Source code for astropix_analysis.sock
# Copyright (C) 2025 the astropix team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Network facilities for monitoring applications.
"""
import socket
import struct
import typing
from astropix_analysis import logger
from astropix_analysis.fmt import AbstractAstroPixReadout
LOCAL_HOST = '127.0.0.1'
# Note this is part of the administratively scoped block.
DEFAULT_GROUP = '239.1.1.1'
DEFAULT_PORT = 5007
[docs]
class MulticastSocketBase(socket.socket):
"""Base class for UDP multicast sockets.
Arguments
---------
group : str
The multicast group.
port : int
The multicast port.
"""
def __init__(self, group: str = LOCAL_HOST, port: int = DEFAULT_PORT) -> None:
"""Constructor.
"""
# Cache the address information
self._address = (group, port)
logger.info(f'Creating {self.__class__.__name__} with address {self._address}...')
# AF_INET specify the IPv4 address family, SOCK_DGRAM the socket type,
# and IPPROTO_UDP specifies the UDP protocol.
super().__init__(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
[docs]
def address(self) -> tuple:
"""Return the socket address.
"""
return self._address
[docs]
def set_option(self, identifier: int, value: typing.Any) -> None:
"""Set a specific option for the socket.
Arguments
---------
identifier : int
The numerical identifier for the option.
value : any
The value for the option.
"""
self.setsockopt(socket.IPPROTO_IP, identifier, value)
[docs]
class MulticastSender(MulticastSocketBase):
"""Simple socket class to multicast packets over the network.
Arguments
---------
group : str
The multicast group.
port : int
The multicast port.
ttl : int
The TTL (time-to-live) for multicast packets. TTL controls how far multicast
packets can go. (Each router decrements the TTL by 1, and when TTL hits 0,
the packet is dropped.) Common TTL values are 1 (stay on the local subnet),
2 (allow routing to directly connected subnets), or >2 (allow more hops)
"""
def __init__(self, group: str = LOCAL_HOST, port: int = DEFAULT_PORT,
ttl: int = 2) -> None:
"""Constructor.
"""
super().__init__(group, port)
# Set the TTL (time-to-live) for the multicast packets.
self.set_option(socket.IP_MULTICAST_TTL, ttl)
[docs]
def send_data(self, data: bytes) -> int:
"""Send a packet over the network.
Arguments
---------
data : bytes
The data to be sent.
"""
return self.sendto(data, self._address)
[docs]
def send_readout(self, readout: AbstractAstroPixReadout) -> int:
"""Send a readout over the network.
Arguments
---------
readout : AbstractAstroPixReadout
The readout to be sent.
"""
return self.send_data(readout.to_bytes())
[docs]
class MulticastReceiver(MulticastSocketBase):
"""Simple socket class to receive multicast packets.
Arguments
---------
readout_class : type
The type of readout objects we are expecting (e.g., `AstroPix4Readout``).
group : str
The multicast group.
port : int
The multicast port.
"""
DEFAULT_MAX_PACKET_SIZE = 65535
def __init__(self, readout_class: type, group: str = LOCAL_HOST,
port: int = DEFAULT_PORT) -> None:
"""Constructor.
"""
self._readout_class = readout_class
super().__init__(group, port)
if group == LOCAL_HOST:
self.bind(self._address)
else:
# Bind to all interfaces on port
self.bind(('', port))
# Join the appropriate multicast group.
_mreq = struct.pack('4sl', socket.inet_aton(group), socket.INADDR_ANY)
self.set_option(socket.IP_ADD_MEMBERSHIP, _mreq)
[docs]
def receive(self, max_size: int = DEFAULT_MAX_PACKET_SIZE) -> AbstractAstroPixReadout:
"""Wait for a packet to be available, read the binary data and
return the corresponding readout object.
"""
# The return value of the recvfrom() call is a pair (bytes, address) where
# bytes is a bytes object representing the data received and address is
# the address of the socket sending the data.
data, _ = self.recvfrom(max_size)
return self._readout_class.from_bytes(data)