# Copyright (C) 2025 the astropix team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Online monitoring.
"""
from abc import ABC, abstractmethod
import queue
import threading
import time
import numpy as np
from astropix_analysis import __version__
from astropix_analysis.fmt import AbstractAstroPixReadout, AstroPix4Readout
from astropix_analysis.hist import Histogram1d, Matrix2d
from astropix_analysis.plt_ import plt
from astropix_analysis import sock
from astropix_analysis.sock import MulticastReceiver
[docs]
class AbstractMonitor(ABC):
"""Abstract base class for online monitoring applications.
.. warning::
Note this is very rudimentary, and the (rough) result is achieved with
no other GUI facility than the matplotlib canvas. In the future we almost
certainly will want to do this properly, but in the meantime this class
provides a sensible base building block for simple online monitoring
applications.
Arguments
---------
readout_class : type
The type of readout objects we are expecting (e.g., `AstroPix4Readout``).
group : str
The multicast group.
port : int
The multicast port.
"""
def __init__(self, readout_class: type, group: str = sock.LOCAL_HOST,
port: int = sock.DEFAULT_PORT) -> None:
"""Constructor.
"""
self._receiver = MulticastReceiver(readout_class, group, port)
self._readout_buffer = queue.Queue()
self._num_processed_readouts = 0
self._num_processed_hits = 0
self._msg_ax = None
self._axes = None
[docs]
def create_canvas(self, **kwargs):
"""Create the matplotlib canvas that will hold the monitoring plot.
Note we are adding a row at the very top of the subplots that we
are taking over to use it as a message dashboard.
Arguments
---------
kwargs : dict
Any keyword argument accepted by ``plt.subplot()``.
"""
plt.ion()
# Retrieve the number of rows and increase it by one unity.
nrows = kwargs.get('nrows', 1)
nrows += 1
kwargs['nrows'] = nrows
# Setup the height ratios.
ratios = kwargs.get('height_ratios')
height_ratios = [0.1]
if ratios is None:
height_ratios += [1.] * (nrows - 1)
else:
height_ratios += list(ratios)
kwargs['height_ratios'] = height_ratios
# Create the canvas.
kwargs.setdefault('num', f'Astropix Monitor {__version__}')
_, axes = plt.subplots(**kwargs)
# Switch the axes off for the first row of subplots.
for ax in axes[0]:
ax.axis('off')
# Get pointers to the relevant objects.
self._msg_ax = axes[0, 0]
self._axes = axes[1:]
[docs]
def display_message(self, x, y, text, **kwargs) -> None:
"""Display a message.
"""
self._msg_ax.cla()
self._msg_ax.axis('off')
self._msg_ax.text(x, y, text, **kwargs)
[docs]
def _listen(self) -> None:
"""Listening function to be started on a separate thread.
Note the leading underscore---this is generally not intended to
be called directly.
"""
while True:
self._readout_buffer.put(self._receiver.receive())
[docs]
def start_listening(self) -> None:
"""Start listening for readouts over the UDP socket on a new thread.
"""
# Note by default Python threads are non-daemon, meaning: Python will
# wait for them to finish before the program exits. With ``daemon=True``,
# the thread becomes a background worker that won't block the program
# from exiting---and since we don't care about dropping packets, this
# is what we want, here.
threading.Thread(target=self._listen, daemon=True).start()
[docs]
def start(self, refresh_interval: float = 0.5, update_pause: float = 0.005):
"""Start the monitoring.
This means that we start listening to the UDP socket on a new thread,
and we begin popping readouts from the underlying buffer and updating the
display.
Arguments
---------
refresh_interval : float
The plot refresh interval in s.
update_pause : float
The post-update pause, in s, at the end of the update.
"""
self.setup()
self.start_listening()
# You will notice there is some heuristics, here. Rather than sleep for the
# given refresh interval, empty the buffer and update the display, we
# are continuosly emptying the buffer, and this is the timeout that we
# use for the ``get()`` call. Roughly speaking, if the timeout is 5 times
# smaller than the refresh interval, this means that we are always poking
# the buffer 5 times for each cycle---and more if we are taking data at
# high rate. Note that calling ``get_nowait()`` instead would cause this
# function to use 100% of the CPU.
read_timeout = 0.2 * refresh_interval
try:
while True:
last_update = time.time()
while time.time() - last_update < refresh_interval:
try:
readout = self._readout_buffer.get(timeout=read_timeout)
self.process_readout(readout)
self._num_processed_readouts += 1
self._num_processed_hits += len(readout.hits())
except queue.Empty:
continue
self.update_display()
# And, apparently, this is matplotlib's built-in way to keep the live
# plot responsive and visible when doing real-time updates. Take this
# out and the plots will just refuse to update :-)
plt.pause(update_pause)
except KeyboardInterrupt:
print('Done, bye!')
[docs]
def setup(self) -> None:
"""Setup the monitor.
This is a general placeholder for the operations that need to be
performed each time the monitor in restarted.
"""
[docs]
@abstractmethod
def process_readout(self, readout: AbstractAstroPixReadout) -> None:
"""Process a single readout.
"""
[docs]
@abstractmethod
def update_display(self) -> None:
"""Update the matplotlib display.
"""
[docs]
class AstroPix4SimpleMonitor(AbstractMonitor):
"""Simple monitor for Astropix 4 readouts.
"""
NUM_COLS = 16
NUM_ROWS = 35
def __init__(self, group: str = sock.LOCAL_HOST, port: int = sock.DEFAULT_PORT) -> None:
"""Overloaded constructor.
"""
super().__init__(AstroPix4Readout, group, port)
self.tot_hist = Histogram1d(np.linspace(0., 500., 100), 'TOT [$\\mu$s]')
self.hit_map = Matrix2d(self.NUM_COLS, self.NUM_ROWS)
self.create_canvas(ncols=2, figsize=(12, 7), width_ratios=(1., 0.5))
self.tot_ax, self.hit_ax = self._axes[0]
[docs]
def process_readout(self, readout: AbstractAstroPixReadout):
"""Overloaded method.
"""
for hit in readout.decode():
self.tot_hist.fill(hit.tot_us)
self.hit_map.fill(hit.column, hit.row)
[docs]
def update_display(self) -> None:
"""Overloaded method.
"""
message = f'Connected to address {self._receiver.address()}\n'\
f'{self._num_processed_readouts} readouts '\
f'({self._num_processed_hits} hits) processed'
self.display_message(0., 0.9, message, ha='left', va='top')
self.tot_ax.cla()
self.tot_hist.draw(self.tot_ax)
self.hit_ax.cla()
self.hit_map.draw(self.hit_ax)