Source code for astropix_analysis.fmt

# Copyright (C) 2025 the astropix team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Data format description for the astropix chip.
"""

from __future__ import annotations
from abc import ABC, abstractmethod
from enum import IntEnum
import struct
import time
import typing

import astropy.table
import numpy as np

from astropix_analysis import logger

# Lookup table mapping every possible byte value to the value obtained by
# reversing the order of its 8 bits (e.g., 0b00000001 -> 0b10000000). We
# pre-compute this once and forever, at import time, to speed up the computation
# at runtime and avoid doing the same calculation over and over again.
_BIT_REVERSE_TABLE = bytes.maketrans(
    bytes(range(256)),
    bytes(int(f'{i:08b}'[::-1], 2) for i in range(256))
)


def reverse_bit_order(data: typing.Union[bytes, bytearray]) -> typing.Union[bytes, bytearray]:
    """Return a copy of the input buffer with the bit order reversed within
    each byte.

    Note the byte order is left untouched, and the input object is not modified:
    the function returns a new object of the same type as the input (``bytes``
    in, ``bytes`` out; ``bytearray`` in, ``bytearray`` out).

    Arguments
    ---------
    data : bytes or bytearray
        The input binary buffer.

    Returns
    -------
    bytes or bytearray
        A new buffer, with the same length as the input, where each byte has
        its bits in reverse order.
    """
    return data.translate(_BIT_REVERSE_TABLE)
class BitPattern(str):

    """Small convenience class representing a bit pattern, that we can slice
    (interpreting the result as the binary representation of an integer) without
    caring about the byte boundaries.

    The object is a plain string of ``'0'`` and ``'1'`` characters (8 per input
    byte, most significant bit first), with the indexing operator overloaded to
    return integers.

    This is not very memory-efficient and probably not blazingly fast either, but
    it allows to reason about the incoming bits in a straightforward fashion, and
    I doubt we will ever need to optimize this. (If that is the case, there are
    probably ways, using either numpy or the bitarray third-party package.)

    Arguments
    ---------
    data : bytes
        The binary representation of the bit pattern.
    """

    def __new__(cls, data: bytes) -> 'BitPattern':
        """Strings are immutable, so use __new__ to start.

        Each input byte is rendered as a zero-padded, 8-character binary string,
        and the chunks are concatenated in the order of the input bytes.
        """
        return super().__new__(cls, ''.join(f'{byte:08b}' for byte in data))

    def __getitem__(self, index: typing.Union[int, slice]) -> int:
        """Slice the underlying string and convert to integer in base 2.

        Arguments
        ---------
        index : int or slice
            The position(s) of the bit(s) to be extracted.

        Returns
        -------
        int
            The value of the selected bits, interpreted as an unsigned integer.

        Raises
        ------
        ValueError
            If the selection is empty (``int()`` cannot parse an empty string).
        """
        return int(super().__getitem__(index), 2)
def hitclass(cls: type) -> type:
    """Class decorator finalizing the definition of a concrete hit class.

    The decorator reads the ``_LAYOUT`` class variable, a dictionary mapping each
    field name to an ``(index, type)`` pair, and derives from it a few class
    variables that are needed to unpack the binary data and/or to write the hit
    to different data formats. Doing this in a decorator means the bookkeeping
    happens once, when the type is created, rather than every time an instance
    of the class is created.

    The class variables being set are:

    * ``ATTRIBUTE_NAMES``: a tuple with all the field names, in the order in
      which they appear in ``_LAYOUT``;
    * ``_ATTR_IDX_DICT``: a dictionary mapping the field name to the index into
      the input binary buffer, limited to the fields whose index is not None
      (fields with a None index are not encoded in the input buffer, and are
      calculated at construction time);
    * ``_ATTR_TYPE_DICT``: a dictionary mapping each field name to the
      corresponding data type for the purpose of writing it to a binary file
      (e.g., in HDF5 or FITS format).

    Arguments
    ---------
    cls : type
        The class to be decorated, which must define ``_LAYOUT``.

    Returns
    -------
    type
        The very same class object, with the additional class variables set.
    """
    # pylint: disable=protected-access
    layout = cls._LAYOUT
    index_map = {}
    type_map = {}
    for field_name, (field_index, field_type) in layout.items():
        type_map[field_name] = field_type
        # Fields that are not read from the binary buffer have no index.
        if field_index is not None:
            index_map[field_name] = field_index
    cls.ATTRIBUTE_NAMES = tuple(layout)
    cls._ATTR_IDX_DICT = index_map
    cls._ATTR_TYPE_DICT = type_map
    return cls
class AbstractAstroPixHit(ABC):

    """Abstract base class for a generic AstroPix hit.

    While the original decode routine was working in terms of the various bytes
    in the binary representation of the hit, since there seem to be no meaning
    altogether in the byte boundaries (at least for AstroPix 4), and the various
    fields are arbitrary subsets of a multi-byte word, it seemed more natural to
    describe the hit as a sequence of fields, each one with its own length in bits.

    Note this is an abstract class that cannot be instantiated. (Note the
    ``__init__()`` special method is abstract and needs to be overloaded, based on
    the assumption that for concrete classes we always want to calculate derived
    quantities based on the raw ones parsed from the input binary buffer.)

    Concrete subclasses must by contract:

    * overload the ``_SIZE``;
    * overload the ``_LAYOUT``;
    * be decorated with the ``@hitclass`` decorator.

    Arguments
    ---------
    data : bytearray
        The portion of a full AstroPix readout representing a single hit.
    """

    # These first two class variables must be overridden by concrete subclasses:
    # ``_SIZE`` is the hit size and ``_LAYOUT`` maps each field name to an
    # ``(index, type)`` pair, where the index (an integer, a slice, or None for
    # derived quantities) addresses the bits of the field in the binary buffer...
    _SIZE = 0
    _LAYOUT = {}
    # ... while these get populated automatically once the subclass is decorated
    # with @hitclass (and still we initialize them here to empty containers to
    # make the linters happy.)
    ATTRIBUTE_NAMES = ()
    _ATTR_IDX_DICT = {}
    _ATTR_TYPE_DICT = {}

    @abstractmethod
    def __init__(self, data: bytearray) -> None:
        """Constructor.

        This sets one instance attribute for each of the fields listed in
        ``_ATTR_IDX_DICT``, i.e., for all the fields that are encoded in the
        binary buffer. Concrete classes are supposed to call this first, and
        then calculate the derived quantities.
        """
        # Since we don't need the underlying bit pattern to be mutable, turn the
        # bytearray object into a bytes object.
        self._data = bytes(data)
        # Build a bit pattern to extract the fields and loop over the hit fields
        # to set all the class members.
        bit_pattern = BitPattern(self._data)
        for name, idx in self._ATTR_IDX_DICT.items():
            setattr(self, name, bit_pattern[idx])

    @staticmethod
    def gray_to_decimal(gray: int) -> int:
        """Convert a Gray code (integer) to decimal.

        A Gray code (or reflected binary code) is a binary numeral system where
        two consecutive values differ by only one bit, which makes it useful in
        error correction and minimizing logic transitions in digital circuits.
        This function is provided as a convenience to translate counter values
        encoded in Gray code into actual decimal values.

        Arguments
        ---------
        gray : int
            The (non-negative) Gray-encoded value.

        Returns
        -------
        int
            The corresponding decimal value.
        """
        # The most significant bit is the same in the two representations...
        decimal = gray
        mask = gray
        # ... and each of the following bits is the XOR of all the Gray bits
        # at that position or above, which we accumulate one shift at a time.
        while mask:
            mask >>= 1
            decimal ^= mask
        return decimal

    @classmethod
    def empty_table(cls, attribute_names: list[str] | None = None) -> astropy.table.Table:
        """Return an astropy empty table with the proper column types for the
        concrete hit type.

        Note this is checking that all the attribute names are valid and tries
        and raise a useful exception if that is not the case.

        Arguments
        ---------
        attribute_names : list of str, optional
            The names of the hit attributes to be used as table columns (all
            the attributes in ``ATTRIBUTE_NAMES`` if None).

        Raises
        ------
        RuntimeError
            If any of the names is not a valid attribute for the hit class.
        """
        if attribute_names is None:
            attribute_names = cls.ATTRIBUTE_NAMES
        for name in attribute_names:
            if name not in cls.ATTRIBUTE_NAMES:
                raise RuntimeError(f'Invalid attribute "{name}" for {cls.__name__}---'
                                   f'valid attributes are {cls.ATTRIBUTE_NAMES}')
        types = [cls._ATTR_TYPE_DICT[name] for name in attribute_names]
        return astropy.table.Table(names=attribute_names, dtype=types)

    def attribute_values(self, attribute_names: list[str] | None = None) -> list:
        """Return the value of the hit attributes for a given set of attribute names.

        Arguments
        ---------
        attribute_names : list of str, optional
            The names of the hit attributes (all the attributes in
            ``ATTRIBUTE_NAMES`` if None).

        Returns
        -------
        list
            The attribute values, in the same order as the input names.
        """
        if attribute_names is None:
            attribute_names = self.ATTRIBUTE_NAMES
        return [getattr(self, name) for name in attribute_names]

    def __eq__(self, other: object) -> bool:
        """Comparison operator---this is handy in the unit tests.

        Two hits are equal if the underlying binary data are the same. Comparing
        a hit with anything that is not a hit returns ``NotImplemented``, so that
        Python can fall back to its default handling (and, e.g., ``hit == None``
        evaluates to False rather than raising an exception).

        Note that, since the class defines ``__eq__()`` and not ``__hash__()``,
        hit objects are not hashable.
        """
        if not isinstance(other, AbstractAstroPixHit):
            return NotImplemented
        return self._data == other._data

    def dict(self) -> dict:
        """Return the hit content as a dict---this will be essentially identical
        to the ``__dict__`` dunder method, except for the order of the field.

        The dictionary contains all the attributes in ``ATTRIBUTE_NAMES``, plus
        the underlying binary data under the ``raw_data`` key.

        .. warning::
            This will be slow, so do not abuse it. It is good for printing :-)
        """
        _dict = {key: getattr(self, key) for key in self.ATTRIBUTE_NAMES}
        _dict['raw_data'] = self._data
        return _dict

    def __str__(self) -> str:
        """String formatting.
        """
        fields = ', '.join(f'{key} = {value}' for key, value in self.dict().items())
        return f'{self.__class__.__name__}({fields})'
@hitclass
class AstroPix3Hit(AbstractAstroPixHit):

    """Class describing an AstroPix3 hit.

    The hit is a 5-byte word, and the time over threshold (TOT) is encoded in
    two separate fields (``tot_msb`` and ``tot_lsb``) that are combined at
    construction time into ``tot_dec`` (in clock cycles) and ``tot_us`` (in us).

    .. warning::
        This is copied from decode.py and totally untested.
    """

    _SIZE = 5
    # Field name -> (position within the bit pattern, data type). Fields with
    # a None position are derived quantities, calculated in the constructor.
    _LAYOUT = {
        'chip_id': (slice(0, 5), np.uint8),
        'payload': (slice(5, 8), np.uint8),
        'column': (8, np.uint8),
        'location': (slice(10, 16), np.uint8),
        'timestamp': (slice(16, 24), np.uint8),
        'tot_msb': (slice(28, 32), np.uint8),
        'tot_lsb': (slice(32, 40), np.uint8),
        'tot_dec': (None, np.uint16),
        'tot_us': (None, np.float32),
    }

    # Conversion factor between clock cycles and us.
    CLOCK_CYCLES_PER_US = 200.

    def __init__(self, data: bytearray) -> None:
        """Constructor.

        Arguments
        ---------
        data : bytearray
            The portion of the readout representing the hit.
        """
        # pylint: disable=no-member
        super().__init__(data)
        # Glue together the most and least significant parts of the TOT, the
        # latter being 8-bit wide, and convert the result to physical units.
        tot_counts = (self.tot_msb << 8) + self.tot_lsb
        self.tot_dec = tot_counts
        self.tot_us = tot_counts / self.CLOCK_CYCLES_PER_US
@hitclass
class AstroPix4Hit(AbstractAstroPixHit):

    """Class describing an AstroPix4 hit.

    The hit is an 8-byte word carrying two timestamps, each one split into a
    coarse and a fine counter. On top of the fields encoded in the binary data,
    the hit carries the identifier and the timestamp of the parent readout, as
    well as its position in the decoding sequence within the readout.

    Arguments
    ---------
    data : bytearray
        The portion of the readout representing the hit.

    readout_id : int
        The sequential identifier of the parent readout.

    timestamp : int
        The timestamp of the parent readout.

    decoding_order : int
        The position of the hit in the decoding sequence within the readout.
    """

    _SIZE = 8
    # Field name -> (position within the bit pattern, data type). Fields with
    # a None position are not read from the binary data, but set in the
    # constructor.
    _LAYOUT = {
        'chip_id': (slice(0, 5), np.uint8),
        'payload': (slice(5, 8), np.uint8),
        'readout_id': (None, np.uint32),
        'timestamp': (None, np.uint64),
        'decoding_order': (None, np.uint8),
        'row': (slice(8, 13), np.uint8),
        'column': (slice(13, 18), np.uint8),
        'ts_neg1': (18, np.uint8),
        'ts_coarse1': (slice(19, 33), np.uint16),
        'ts_fine1': (slice(33, 36), np.uint8),
        'ts_tdc1': (slice(36, 41), np.uint8),
        'ts_neg2': (41, np.uint8),
        'ts_coarse2': (slice(42, 56), np.uint16),
        'ts_fine2': (slice(56, 59), np.uint8),
        'ts_tdc2': (slice(59, 64), np.uint8),
        'ts_dec1': (None, np.uint32),
        'ts_dec2': (None, np.uint32),
        'tot_us': (None, np.float64),
    }

    # Conversion factor between clock cycles and us.
    CLOCK_CYCLES_PER_US = 20.
    # Number of distinct values of the composed (14 + 3 bit) timestamp counter.
    CLOCK_ROLLOVER = 2**17

    def __init__(self, data: bytearray, readout_id: int, timestamp: int,
                 decoding_order: int) -> None:
        """Constructor.
        """
        # pylint: disable=no-member
        super().__init__(data)
        self.decoding_order = decoding_order
        # Decimal values of the two timestamps, in clock cycles.
        first_ts = self._compose_ts(self.ts_coarse1, self.ts_fine1)
        second_ts = self._compose_ts(self.ts_coarse2, self.ts_fine2)
        # If the second timestamp is smaller than the first one, we assume the
        # counter rolled over in between, and we correct for that.
        if second_ts < first_ts:
            second_ts += self.CLOCK_ROLLOVER
        self.ts_dec1 = first_ts
        self.ts_dec2 = second_ts
        # The TOT in us is the difference between the two timestamps.
        self.tot_us = (second_ts - first_ts) / self.CLOCK_CYCLES_PER_US
        self.readout_id = readout_id
        self.timestamp = timestamp

    @staticmethod
    def _compose_ts(ts_coarse: int, ts_fine: int) -> int:
        """Compose the actual decimal representation of the timestamp counter,
        putting together the coarse and fine counters (in Gray code).

        Arguments
        ---------
        ts_coarse : int
            The value of the coarse counter (MSBs) in Gray code.

        ts_fine : int
            The value of the fine counter (3 LSBs) in Gray code.

        Returns
        -------
        int
            The actual decimal value of the timestamp counter, in clock cycles.
        """
        # Make room for the three bits of the fine counter at the bottom of the
        # word, and convert the whole thing from Gray code to decimal.
        gray_code = (ts_coarse << 3) + ts_fine
        return AbstractAstroPixHit.gray_to_decimal(gray_code)
class Decoding(IntEnum):

    """Enum class for all the possible issues that can happen during decoding.

    The value of each member is the position of the corresponding bit in the
    status code of a ``DecodingStatus`` object.

    * ``ORPHAN_BYTES_MATCHED``: the readout has orphan bytes at the beginning
      that were successfully matched with the extra bytes from the previous readout;
    * ``ORPHAN_BYTES_DROPPED``: the readout has orphan bytes at the beginning
      that we were not able to match with the extra bytes from the previous readout;
    * ``ORPHAN_BYTES_NOT_USED``: the readout has orphan bytes at the beginning
      that we could not use because the previous readout had no extra bytes;
    * ``VALID_EXTRA_BYTES``: the readout had extra bytes at the end, starting
      with a valid start-hit byte, that could be potentially matched with the
      beginning of the next buffer;
    * ``INVALID_EXTRA_BYTES``: the readout had extra bytes at the end, but since
      the first byte is not a valid start-hit byte, these cannot effectively be
      used;
    * ``INCOMPLETE_HIT_DROPPED``: the readout had incomplete hit data somewhere
      that we had to drop;
    * ``NOT_ALL_BYTES_VISITED``: at the end of the decoding, some of the bytes
      in the readout had not been assigned a byte type;
    * ``UNKNOWN_FATAL_ERROR``: not set anywhere in this module---presumably
      reserved for unexpected failures (to be confirmed).
    """

    ORPHAN_BYTES_MATCHED = 0
    ORPHAN_BYTES_DROPPED = 1
    ORPHAN_BYTES_NOT_USED = 2
    VALID_EXTRA_BYTES = 3
    INVALID_EXTRA_BYTES = 4
    INCOMPLETE_HIT_DROPPED = 5
    NOT_ALL_BYTES_VISITED = 6
    UNKNOWN_FATAL_ERROR = 7
class DecodingStatus:

    """Small class representing the status of a readout decoding.

    The status is stored as an integer bit mask, where the position of each
    bit is given by the value of the corresponding ``Decoding`` member. A newly
    created object has no bit set.
    """

    def __init__(self) -> None:
        """Constructor.
        """
        self._status_code = 0

    def __bool__(self) -> bool:
        """Evaluate the status as a bool.

        This implements the simple semantics `if(status)` to check if any of the
        status bits is set.
        """
        return self._status_code > 0

    def set(self, bit: Decoding) -> None:
        """Set a status bit.

        Arguments
        ---------
        bit : Decoding
            The status bit to be set.
        """
        self._status_code |= (1 << bit)

    def __getitem__(self, bit: Decoding) -> int:
        """Retrieve the value of a status bit.

        Arguments
        ---------
        bit : Decoding
            The status bit to be retrieved.

        Returns
        -------
        int
            1 if the bit is set, and 0 otherwise. (Note this is deliberately
            kept as an integer, rather than a bool, as the value ends up
            verbatim in the string representation of the object.)
        """
        return (self._status_code >> bit) & 0x1

    def __str__(self) -> str:
        """String formatting.

        The output is a header line with the status code in hexadecimal and
        binary form, followed by one line with the value of each status bit.
        """
        header = f'DecodingStatus {hex(self._status_code)} ({bin(self._status_code)})'
        rows = [f'{bit.name.ljust(25, ".")} {self[bit]}' for bit in Decoding]
        return '\n'.join([header, *rows])
class ByteType(IntEnum):

    """Enum class used to keep track of the byte types within a readout.

    The main purpose of this utility class is to be able to pretty-print readout
    raw data in order to debug the decoding process.

    * ``NOT_ASSIGNED``: byte not assigned (this should never happen unless the
      readout cannot be properly decoded);
    * ``PADDING``: padding byte;
    * ``IDLE``: idle byte;
    * ``HIT_START``: byte signaling the start of a hit (e.g., ``0xe0`` for chip_id = 0);
    * ``HIT``: the byte is part of a valid hit (but not the start byte);
    * ``EXTRA``: the byte is an extra byte at the end of the readout, and possibly
      the initial part of a valid hit that is split across readouts;
    * ``ORPHAN``: the byte is an orphan byte at the beginning of the readout, and
      we might be able to reassemble with the extra bytes from the previous readout;
    * ``DROPPED``: the byte could not be assigned to a valid hit and was dropped.
    """

    NOT_ASSIGNED = 0
    PADDING = 1
    IDLE = 2
    HIT_START = 3
    HIT = 4
    EXTRA = 5
    ORPHAN = 6
    DROPPED = 7

    @classmethod
    def _fmt(cls, byte: str, *escape_codes: int) -> str:
        """Format a single byte using standard ANSI escape sequences for colors
        and formatting.

        Note we always append a reset escape sequence at the end---this will have
        the terminal working a little bit harder, but we are relieved from the
        responsibility to put the terminal back in the original state. (And this
        will never be used in CPU-intensive contexts, anyway.)

        Common color codes:

        | Color   | Code |
        | ------- | ---- |
        | Black   | `30` |
        | Red     | `31` |
        | Green   | `32` |
        | Yellow  | `33` |
        | Blue    | `34` |
        | Magenta | `35` |
        | Cyan    | `36` |
        | White   | `37` |
        | Reset   | `0`  |

        Common background color codes:

        | Color   | Code |
        | ------- | ---- |
        | Black   | `40` |
        | Red     | `41` |
        | Green   | `42` |
        | Yellow  | `43` |
        | Blue    | `44` |
        | Magenta | `45` |
        | Cyan    | `46` |
        | White   | `47` |

        Common text modifiers:

        | Style         | Code |
        | ------------- | ---- |
        | Reset         | `0`  |
        | Bold          | `1`  |
        | Faint         | `2`  |
        | Italic        | `3`  |
        | Underline     | `4`  |
        | Blink         | `5`  |
        | Reverse       | `7`  |
        | Conceal       | `8`  |
        | Strikethrough | `9`  |

        Arguments
        ---------
        byte : str
            The text representation of the byte.

        escape_codes : int
            The escape codes to be applied, combined in the order they are passed.
        """
        sequence = ';'.join(str(code) for code in escape_codes)
        return f'\033[{sequence}m{byte}\033[0m'

    @classmethod
    def format_byte(cls, byte: str, byte_type) -> str:
        """Format a single byte according to its type.

        Byte types with no associated style (e.g., ``NOT_ASSIGNED`` and
        ``PADDING``) are returned unchanged.

        Arguments
        ---------
        byte : str
            The text representation of the byte.

        byte_type : ByteType or int
            The byte type.
        """
        # Note the table is local to the method, as any plain attribute in the
        # class body would be turned into an enum member.
        styles = (
            (cls.HIT_START, (31, 47)),
            (cls.HIT, (30, 47)),
            (cls.IDLE, (33,)),
            (cls.DROPPED, (9,)),
            (cls.EXTRA, (35,)),
            (cls.ORPHAN, (35,)),
        )
        for candidate, escape_codes in styles:
            if byte_type == candidate:
                return cls._fmt(byte, *escape_codes)
        return byte
def readoutclass(cls: type) -> type:
    """Small decorator to support automatic generation of concrete readout classes.

    Decorating concrete readout classes with this allows for some minimal checks
    on the class definition---note this is only done once, when the class is
    defined and not at runtime (every time a class instance is created.)

    More specifically, we make sure that:

    * ``HIT_CLASS`` is overridden with a concrete subclass of ``AbstractAstroPixHit``;
    * ``_UID`` is overridden with an integer.

    Arguments
    ---------
    cls : type
        The readout class to be decorated.

    Returns
    -------
    type
        The very same class object, unchanged.

    Raises
    ------
    TypeError
        If any of the checks on the class definition fails.
    """
    # pylint: disable=protected-access
    if cls.HIT_CLASS is None:
        raise TypeError(f'{cls.__name__} must override HIT_CLASS')
    if cls.HIT_CLASS is AbstractAstroPixHit:
        raise TypeError(f'{cls.__name__}.HIT_CLASS is abstract')
    if not issubclass(cls.HIT_CLASS, AbstractAstroPixHit):
        raise TypeError(f'{cls.__name__}.HIT_CLASS is not a subclass of AbstractAstroPixHit')
    if cls._UID is None:
        raise TypeError(f'{cls.__name__} must override _UID')
    if not isinstance(cls._UID, int):
        # Note it is the identifier, and not the class, that must be an integer.
        raise TypeError(f'{cls.__name__}._UID must be an integer ({cls._UID} is invalid)')
    return cls
class AbstractAstroPixReadout(ABC):

    """Abstract base class for a generic AstroPix readout.

    This is basically a wrapper around the bytearray object that is returned by
    the DAQ board, and it provides some basic functionality to write the readout
    to a binary file.

    A full readout comes in the form of a fixed-length bytearray object that is
    padded at the end with a padding byte (0xff). The hit data are surrounded by a
    (generally variable) number of idle bytes (0xbc), see the documentation of the
    decode() class method.

    The binary representation of a readout (see ``to_bytes()``) is the
    concatenation of: a fixed 3-byte header, the readout identifier, the
    timestamp, the length of the readout data and the readout data themselves,
    stripped of the trailing padding bytes.

    Arguments
    ---------
    readout_data : bytearray
        The readout data from the DAQ board.

    readout_id : int
        A sequential id for the readout, assigned by the host DAQ machine.

    timestamp : int
        A timestamp for the readout, assigned by the host DAQ machine, in ns since
        the epoch, from time.time_ns().
    """

    # The class representing the hit type encoded in the readout, e.g., ``AstroPix4Hit``.
    HIT_CLASS = None

    # A unique identifier for the readout class.
    _UID = None

    # The padding byte used to pad the readout.
    PADDING_BYTE = bytes.fromhex('ff')

    # The idle byte, output by the chip while gathering data.
    IDLE_BYTE = bytes.fromhex('bc')

    # The readout header, which is prepended to the buffer read from the NEXYS board
    # before the thing gets written to disk.
    _HEADER = bytes.fromhex('fedcba')
    _HEADER_SIZE = len(_HEADER)

    # Basic bookkeeping for the additional fields assigned by the host machine
    # (all little-endian: 4 bytes for the readout identifier, 8 bytes for the
    # timestamp and 4 bytes for the length of the readout data).
    _READOUT_ID_FMT = '<L'
    _TIMESTAMP_FMT = '<Q'
    _LENGTH_FMT = '<L'

    def __init__(self, readout_data: bytearray, readout_id: int,
                 timestamp: int | None = None) -> None:
        """Constructor.
        """
        # If the timestamp is None, automatically latch the system time.
        # Note this is done first in order to latch the timestamp as close as
        # possible to the actual readout.
        self.timestamp = self.latch_ns() if timestamp is None else timestamp
        # Strip all the trailing padding bytes from the input bytearray object
        # and turn it into a bytes object to make it immutable.
        self._readout_data = bytes(readout_data.rstrip(self.PADDING_BYTE))
        self.readout_id = readout_id
        # Initialize all the status variable for the decoding. Note the byte
        # mask holds one ``ByteType`` value per byte in the readout, and
        # everything starts as ``ByteType.NOT_ASSIGNED`` (i.e., zero).
        self._decoded = False
        self._decoding_status = DecodingStatus()
        self._extra_bytes = None
        self._byte_mask = np.zeros(len(self._readout_data), dtype=int)
        self._hits = []

    @abstractmethod
    def decode(self, extra_bytes: bytes = None) -> list[AbstractAstroPixHit]:
        """Placeholder for the decoding function---this needs to be reimplemented
        in derived classes.
        """

    def data(self) -> bytes:
        """Return the underlying binary data.
        """
        return self._readout_data

    def decoded(self) -> bool:
        """Return True if the readout has been decoded.
        """
        return self._decoded

    def decoding_status(self) -> DecodingStatus:
        """Return the full decoding status.
        """
        return self._decoding_status

    def extra_bytes(self) -> bytes:
        """Return the extra bytes, if any, at the end of the readout (None if
        there are no extra bytes).
        """
        return self._extra_bytes

    def all_bytes_visited(self) -> bool:
        """Return True if all the bytes have been visited in the decoding process,
        i.e., if there is no byte left with a ``ByteType.NOT_ASSIGNED`` type.
        """
        return np.count_nonzero(self._byte_mask == 0) == 0

    def hits(self) -> list:
        """Return the decoded hits.

        Note this triggers the decoding (with no extra bytes from the previous
        readout) if the readout has not been decoded, yet.
        """
        if not self.decoded():
            self.decode()
        return self._hits

    @classmethod
    def uid(cls) -> int:
        """Return the unique identifier for the readout class.
        """
        return cls._UID

    @staticmethod
    def latch_ns() -> int:
        """Convenience function returning the time of the function call as an
        integer number of nanoseconds since the epoch, i.e., January 1, 1970,
        00:00:00 (UTC) on all platforms.
        """
        return time.time_ns()

    @staticmethod
    def _unpack(fmt: str, data: bytes) -> typing.Any:
        """Convenience function wrapping the struct.unpack() method---this saves
        us the trouble of getting the first (and only) element of a 1-element
        tuple.
        """
        return struct.unpack(fmt, data)[0]

    @staticmethod
    def _unpack_slice(fmt: str, data: bytes, start: int) -> tuple[typing.Any, int]:
        """Convenience function to unpack a specific slice of a bytes object.

        Arguments
        ---------
        fmt : str
            The format string for the field to be unpacked.

        data : bytes
            The full binary buffer.

        start : int
            The position of the first byte of the field within the buffer.

        Returns
        -------
        tuple
            The unpacked value and the position of the first byte after the
            field, so that calls can be conveniently chained.
        """
        size = struct.calcsize(fmt)
        value = AbstractAstroPixReadout._unpack(fmt, data[start:start + size])
        return value, start + size

    @staticmethod
    def _read_and_unpack(input_file: typing.BinaryIO, fmt: str) -> typing.Any:
        """Convenience function to read and unpack a fixed-size field from an
        input file.

        Arguments
        ---------
        input_file : BinaryIO
            A file object opened in "rb" mode.

        fmt : str
            The format string for the field to be read.
        """
        return AbstractAstroPixReadout._unpack(fmt, input_file.read(struct.calcsize(fmt)))

    def to_bytes(self) -> bytes:
        """Convert the readout object to its binary representation.

        This is used, e.g., to write the readout to disk, or to send the object
        over to a network socket.
        """
        parts = [
            self._HEADER,
            struct.pack(self._READOUT_ID_FMT, self.readout_id),
            struct.pack(self._TIMESTAMP_FMT, self.timestamp),
            struct.pack(self._LENGTH_FMT, len(self._readout_data)),
            self._readout_data
        ]
        return b''.join(parts)

    @classmethod
    def from_bytes(cls, data: bytes) -> AbstractAstroPixReadout:
        """Re-assemble the readout object from its binary persistent representation.

        .. note::
            It would be nice to be able to reuse the same code here and in the
            ``from_file()`` call, but the slight issue here, is the variable-size
            part of the event, which makes it not straightforward to read the
            whole thing from file in one shot. Something we might think about, though.

        Arguments
        ---------
        data : bytes
            The binary data representing the readout.

        Raises
        ------
        RuntimeError
            If the header is not the expected one, or if the size of the
            readout data does not match the length field.
        """
        # Make sure the readout header is correct.
        _header = data[:cls._HEADER_SIZE]
        if _header != cls._HEADER:
            raise RuntimeError(f'Invalid readout header ({_header}), expected {cls._HEADER}')
        # Read all the fields.
        cursor = cls._HEADER_SIZE
        readout_id, cursor = cls._unpack_slice(cls._READOUT_ID_FMT, data, cursor)
        timestamp, cursor = cls._unpack_slice(cls._TIMESTAMP_FMT, data, cursor)
        _length, cursor = cls._unpack_slice(cls._LENGTH_FMT, data, cursor)
        data = data[cursor:]
        # Make sure the remaining part of the binary data matches our expectations
        # in terms of its size.
        if len(data) != _length:
            raise RuntimeError(f'Size mismatch: {len(data)} bytes remaining, expected {_length}')
        return cls(data, readout_id, timestamp)

    def write(self, output_file: typing.BinaryIO) -> None:
        """Write the complete readout to a binary file.

        Arguments
        ---------
        output_file : BinaryIO
            A file object opened in "wb" mode.
        """
        output_file.write(self.to_bytes())

    @classmethod
    def from_file(cls, input_file: typing.BinaryIO) -> AbstractAstroPixReadout:
        """Create a Readout object reading the underlying data from an input
        binary file.

        By contract this should return None when there are no more data to be
        read from the input file, so that downstream code can use the information to
        stop iterating over the file.

        Arguments
        ---------
        input_file : BinaryIO
            A file object opened in "rb" mode.

        Raises
        ------
        RuntimeError
            If the header is not the expected one, or if the file ends before
            all the readout data could be read.
        """
        _header = input_file.read(cls._HEADER_SIZE)
        # If the header is empty, this means we are at the end of the file, and we
        # return None to signal that there are no more readouts to be read. This
        # can be used downstream, e.g., to raise a StopIteration exception with
        # the implementation of an iterator protocol.
        if len(_header) == 0:
            return None
        # If the header is not empty, we check that it is what we expect, and raise
        # a RuntimeError if it is not.
        if _header != cls._HEADER:
            raise RuntimeError(f'Invalid readout header ({_header}), expected {cls._HEADER}')
        # Go ahead, read all the fields, and create the readout object.
        readout_id = cls._read_and_unpack(input_file, cls._READOUT_ID_FMT)
        timestamp = cls._read_and_unpack(input_file, cls._TIMESTAMP_FMT)
        _length = cls._read_and_unpack(input_file, cls._LENGTH_FMT)
        data = input_file.read(_length)
        # A file object happily returns less bytes than requested when it hits
        # the end of the file, so we have to make sure that the readout is not
        # truncated (this is the same check that is done in from_bytes()).
        if len(data) != _length:
            raise RuntimeError(f'Size mismatch: {len(data)} bytes read, expected {_length}')
        return cls(data, readout_id, timestamp)

    def _add_hit(self, hit_data: bytes, reverse: bool = True) -> None:
        """Add a hit to readout.

        This will be typically called during the readout decoding.

        Arguments
        ---------
        hit_data : bytes
            The binary data for the hit.

        reverse : bool
            If True, reverse the bit order within each byte before the hit is
            created.
        """
        # pylint: disable=not-callable
        if reverse:
            hit_data = reverse_bit_order(hit_data)
        # The decoding order is the position of the hit within the readout.
        decoding_order = len(self._hits)
        hit = self.HIT_CLASS(hit_data, self.readout_id, self.timestamp, decoding_order)
        self._hits.append(hit)

    def hex(self) -> str:
        """Return a string with the hexadecimal representation of the underlying
        binary data (two hexadecimal digits per byte).
        """
        return self._readout_data.hex()

    def pretty_hex(self) -> str:
        """Return a pretty version of the hexadecimal representation, where the
        hit portion of the readout are colored.
        """
        # pylint: disable=protected-access
        # This uses the underlying ``_byte_mask``, so we have to make sure the
        # thing has been decoded.
        if not self.decoded():
            self.decode()
        hex_bytes = self.hex()
        # Two hexadecimal digits per byte, each pair formatted by byte type.
        return ''.join(ByteType.format_byte(hex_bytes[i * 2:i * 2 + 2], byte_type)
                       for i, byte_type in enumerate(self._byte_mask))

    def pretty_print(self, hits: bool = True) -> str:
        """Full, glorious pretty print of the readout object.

        Arguments
        ---------
        hits : bool
            If True, include the decoded hits in the output.
        """
        text = f'{self}\nRaw: {self.data()}\nHex: {self.pretty_hex()}\n\n'
        if hits:
            for hit in self.decode():
                text = f'{text}{hit}\n'
        text = f'{text}\n{self.decoding_status()}'
        return text

    def __str__(self) -> str:
        """String formatting.
        """
        return f'{self.__class__.__name__}({len(self._readout_data)} bytes, ' \
               f'readout_id = {self.readout_id}, timestamp = {self.timestamp} ns)'
@readoutclass
class AstroPix4Readout(AbstractAstroPixReadout):

    """Class describing an AstroPix 4 readout.
    """

    HIT_CLASS = AstroPix4Hit
    _UID = 4000

    DEFAULT_START_BYTE = bytes.fromhex('e0')

    @staticmethod
    def is_valid_start_byte(byte: bytes) -> bool:
        """Return True if the byte is a valid start byte for Astropix4 hit.

        This, effectively, entails to make sure that the byte is of the form
        `111xxxxx`, where the 5 least significant bits are the chip id.

        An empty bytes object, which is what we get when slicing past the end
        of the readout, is never a valid start byte.

        .. note::
            This assume the byte is before the bit order is reverse, i.e., this
            operates in the space of the data stream from the Nexys board. The
            rational for this is that all the error checking happens at the
            readout level, before the bit order is reversed and before the hit
            is even created.

        .. warning::
            We have an amusing edge case, here, in that 0xff is both the padding
            byte and a valid start byte for Astropix 4. We should probably put
            some thought into this, but we are tentatively saying that 0xff is
            *not* a valid start byte for a hit, in order to keep the decoding
            as simple as possible.

        Arguments
        ---------
        byte : bytes
            A bytes object of length 1 (or 0, past the end of the readout).
        """
        # Note ord() would raise a TypeError on an empty bytes object.
        if not byte:
            return False
        return byte != AbstractAstroPixReadout.PADDING_BYTE and ord(byte) >> 5 == 7

    @staticmethod
    def _invalid_start_byte_msg(start_byte: bytes, position: int) -> str:
        """Generic error message for an invalid start byte.

        Arguments
        ---------
        start_byte : bytes
            The offending byte, as a bytes object of length 1.

        position : int
            The position of the byte within the readout.
        """
        return f'Invalid start byte {start_byte} (0b{ord(start_byte):08b}) @ position {position}'

    def _set_aside_extra_bytes(self, cursor: int) -> None:
        """Flag all the bytes from a given position to the end of the readout
        as extra bytes, i.e., as a hit that is truncated at the end of the readout.

        If the first byte is a valid start byte the extra bytes are cached, so
        that they can be potentially matched with the beginning of the next
        readout.

        Arguments
        ---------
        cursor : int
            The position of the first extra byte within the readout.
        """
        data = self._readout_data[cursor:]
        self._byte_mask[cursor:] = ByteType.EXTRA
        logger.warning(f'Found {len(data)} byte(s) of truncated hit data '
                       f'({data}) at the end of the readout.')
        if self.is_valid_start_byte(data[0:1]):
            self._byte_mask[cursor] = ByteType.HIT_START
            logger.info('Valid start byte, extra bytes set aside for next readout!')
            self._extra_bytes = data
            self._decoding_status.set(Decoding.VALID_EXTRA_BYTES)
        else:
            self._decoding_status.set(Decoding.INVALID_EXTRA_BYTES)

    def decode(self, extra_bytes: bytes = None) -> list[AbstractAstroPixHit]:  # noqa: C901
        """Astropix4 decoding function.

        The decoding is performed only once: all the subsequent calls return
        the list of hits calculated the first time. A readout with no data, or
        containing only idle bytes, decodes to an empty list of hits.

        .. note::
            Note that you always need to address single bytes in the data stream
            with a proper slice, as opposed to an integer, i.e., `data[i:i + 1]`
            instead of `data[i]`, because Python will return an integer, otherwise.

        Arguments
        ---------
        extra_bytes : bytes
            Optional extra bytes from the previous readout that might be re-assembled
            together with the beginning of this readout.

        Returns
        -------
        list
            The list of decoded hits.
        """
        # pylint: disable=not-callable, protected-access, line-too-long, too-many-branches, too-many-statements # noqa
        # If the event has been already decoded, return the list of hits that
        # has been previously calculated.
        if self.decoded():
            return self._hits

        # Ready to start---the cursor indicates the position within the readout.
        self._decoded = True
        num_bytes = len(self._readout_data)
        hit_size = self.HIT_CLASS._SIZE
        cursor = 0

        # Skip the initial idle and padding bytes.
        # (In principle we would only expect idle bytes, here, but it is a
        # known fact that we occasionally get padding bytes interleaved with
        # them, especially when operating at high rate.)
        while self._readout_data[cursor:cursor + 1] in [self.IDLE_BYTE, self.PADDING_BYTE]:
            self._byte_mask[cursor] = ByteType.IDLE
            cursor += 1

        # Look at the first legitimate hit byte---if it is not a valid hit start
        # byte, then we might need to piece the first few bytes of the readout
        # with the leftover of the previous readout. (Note that if we are
        # already at the end of the readout there is nothing left to look at.)
        byte = self._readout_data[cursor:cursor + 1]
        if cursor < num_bytes and not self.is_valid_start_byte(byte):
            logger.warning(self._invalid_start_byte_msg(byte, cursor))
            offset = 1
            # Move forward until we find the next valid start byte, or we get
            # to the end of the readout.
            while cursor + offset < num_bytes and \
                    not self.is_valid_start_byte(self._readout_data[cursor + offset:cursor + offset + 1]):  # noqa: E501
                offset += 1
            # Note we have to strip all the idle bytes at the end, if any.
            # Also note the Jedi trick here: we first set all the bytes in the
            # portion to idle...
            self._byte_mask[cursor:cursor + offset] = ByteType.IDLE
            orphan_bytes = self._readout_data[cursor:cursor + offset].rstrip(self.IDLE_BYTE)
            # ... and then we override the bit mask in the actual orphan part.
            self._byte_mask[cursor:cursor + len(orphan_bytes)] = ByteType.ORPHAN
            logger.info(f'{len(orphan_bytes)} orphan bytes found ({orphan_bytes})...')
            if extra_bytes is not None:
                logger.info('Trying to re-assemble the hit across readouts...')
                data = extra_bytes + orphan_bytes
                if len(data) == hit_size:
                    logger.info('Total size matches---we got a hit!')
                    self._add_hit(data)
                    self._decoding_status.set(Decoding.ORPHAN_BYTES_MATCHED)
                else:
                    self._decoding_status.set(Decoding.ORPHAN_BYTES_DROPPED)
            else:
                self._decoding_status.set(Decoding.ORPHAN_BYTES_NOT_USED)
            cursor += offset

        # And now we can proceed with business as usual.
        while cursor < num_bytes:

            # Skip all the idle bytes and the padding bytes that we encounter.
            # (In principle we would only expect idle bytes, here, but it is a
            # known fact that we occasionally get padding bytes interleaved with
            # them, especially when operating at high rate.)
            while self._readout_data[cursor:cursor + 1] in [self.IDLE_BYTE, self.PADDING_BYTE]:
                self._byte_mask[cursor] = ByteType.IDLE
                cursor += 1

            # Check if we are at the end of the readout.
            if cursor == num_bytes:
                if not self.all_bytes_visited():
                    self._decoding_status.set(Decoding.NOT_ALL_BYTES_VISITED)
                return self._hits

            # Handle the case where the last hit is truncated in the original readout data.
            # If the start byte is valid we put the thing aside in the extra_bytes class
            # member so that, potentially, we have the data available to be matched
            # with the beginning of the next readout.
            # NOTE(review): with the >= a complete hit sitting at the very end of
            # the readout, with no trailing idle bytes, is treated as truncated,
            # too---confirm whether this is intended.
            if cursor + hit_size >= num_bytes:
                self._set_aside_extra_bytes(cursor)
                break

            byte = self._readout_data[cursor:cursor + 1]
            # At this point we do expect a valid start hit for the next event...
            if not self.is_valid_start_byte(byte):
                # ... and if this is not the case, we go forward until we find the
                # next hit start, dropping all the bytes in between.
                logger.warning(self._invalid_start_byte_msg(byte, cursor))
                while cursor < num_bytes and \
                        not self.is_valid_start_byte(self._readout_data[cursor:cursor + 1]):
                    self._byte_mask[cursor] = ByteType.DROPPED
                    cursor += 1
                # If there is no valid start byte in the rest of the readout, all
                # the remaining bytes have been dropped, and we are done.
                if cursor == num_bytes:
                    break

            # We have a tentative 8-byte word, with the correct start byte,
            # representing a hit.
            data = self._readout_data[cursor:cursor + hit_size]

            # Loop over bytes 1--7 (included) in the word to see whether there is
            # any additional valid start byte in the hit.
            for offset in range(1, len(data)):
                byte = data[offset:offset + 1]
                if self.is_valid_start_byte(byte):
                    # At this point we have really two cases:
                    # 1 - this is a legitimate hit containing a start byte by chance;
                    # 2 - this is a truncated hit, and the start byte signals the next hit.
                    # I don't think there is any way we can get this right 100% of the
                    # times, but a sensible thing to try is to move forward by the hit size,
                    # skip all the subsequent idle bytes and see if the next thing in line
                    # is a valid start byte. In that situation we are probably
                    # dealing with case 1.
                    forward_cursor = cursor + hit_size
                    while self._readout_data[forward_cursor:forward_cursor + 1] == self.IDLE_BYTE:
                        forward_cursor += 1
                    if forward_cursor < num_bytes:
                        next_byte = self._readout_data[forward_cursor:forward_cursor + 1]
                        if not self.is_valid_start_byte(next_byte):
                            # Here we are really in case 2, and there is not other thing
                            # we can do except dropping the hit.
                            logger.warning(f'Unexpected start byte {byte} @ position {cursor}+{offset}')  # noqa: E501
                            logger.warning(f'Dropping incomplete hit {data[:offset]}')
                            self._decoding_status.set(Decoding.INCOMPLETE_HIT_DROPPED)
                            self._byte_mask[cursor:cursor + offset] = ByteType.DROPPED
                            cursor = cursor + offset
                            data = self._readout_data[cursor:cursor + hit_size]

            # If, after dropping stuff, we are left with less bytes than we need
            # for a full hit, there is no way we can decode it, and the best we
            # can do is to treat the remaining bytes as a truncated hit.
            if len(data) < hit_size:
                self._set_aside_extra_bytes(cursor)
                break

            # And this should be by far the most common case.
            self._add_hit(data)
            self._byte_mask[cursor:cursor + 1] = ByteType.HIT_START
            self._byte_mask[cursor + 1:cursor + hit_size] = ByteType.HIT
            cursor += hit_size
            while self._readout_data[cursor:cursor + 1] == self.IDLE_BYTE:
                self._byte_mask[cursor] = ByteType.IDLE
                cursor += 1

        if not self.all_bytes_visited():
            self._decoding_status.set(Decoding.NOT_ALL_BYTES_VISITED)
        return self._hits
# Registry of all the concrete readout classes defined in the module...
__READOUT_CLASSES = (AstroPix4Readout, )
# ... and lookup table mapping the unique identifier of each readout class to
# the class itself.
__READOUT_CLASS_DICT = {
    class_.uid(): class_ for class_ in __READOUT_CLASSES
}
def uid_to_readout_class(uid: int) -> type:
    """Map the unique identifier of a readout class to the class itself.

    Arguments
    ---------
    uid : int
        The unique ID of the readout class.

    Returns
    -------
    type
        The readout class registered with the given identifier.

    Raises
    ------
    RuntimeError
        If there is no readout class registered with the given identifier.
    """
    # Note the values in the lookup table are classes, so that None can be
    # safely used to signal a missing key.
    readout_class = __READOUT_CLASS_DICT.get(uid)
    if readout_class is None:
        raise RuntimeError(f'Unknown readout class with identifier {uid}')
    return readout_class