fmt — Data format#
This module contains the basic definitions of the data structures involved in writing and reading persistent astropix data.
Warning
While this is supposed to be chip-version- and setup-agnostic, at this time everything in the package has essentially been tested only on a setup with a single v4 astropix chip. Help is welcome to make sure that things here make sense in other contexts, too.
There are two basic data structures we deal with in this module: the readout and the hit—a hit is a single hit from an astropix chip, while a readout is a full binary chunk of data coming from the readout board, which generally contains a variable number of hits. Accordingly, we provide two abstract base classes, AbstractAstroPixReadout and AbstractAstroPixHit, from which actual concrete classes can be derived for the different chip and DAQ versions.
If you are in a hurry and want to get a sense of how things work, the following snippets illustrate the main facilities that the module provides. At data-taking time you programmatically build concrete readout objects starting from the binary buffers that the host machine receives from the DAQ board, and you can write them straight into an output file
from astropix_analysis.fileio import FileHeader, apx_open
from astropix_analysis.fmt import AstroPix4Readout

# Initialization...
header = FileHeader(AstroPix4Readout)
with apx_open('path/to/the/output/file', 'wb', header) as output_file:
    # ...and event loop
    readout_id = 0
    while True:
        # Get the data from the board (astro representing the DAQ board interface).
        readout_data = astro.get_readout()
        if readout_data:
            # Note the readout_id and (internally) the timestamp are assigned
            # by the host machine.
            readout = AstroPix4Readout(readout_data, readout_id)
            readout.write(output_file)
            readout_id += 1
You can then read the readout objects back from file and have them re-assembled into fully fledged instances of the proper class.
See also
There are in fact a few subtleties here, dealing with the file header and how readout objects are read back from file, which are fully described in the fileio — File I/O section.
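Just to fix ideas, here is a minimal sketch of the read-back loop, relying on the from_file() hook documented below and glossing over the file header, which needs to be dealt with first as described in the fileio section:

from astropix_analysis.fmt import AstroPix4Readout

with open('path/to/the/output/file', 'rb') as input_file:
    # (We assume here that the file header has already been consumed.)
    while True:
        readout = AstroPix4Readout.from_file(input_file)
        # By contract, from_file() returns None when the file is exhausted.
        if readout is None:
            break
        for hit in readout.decode():
            print(hit)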
By default, when you create a readout object, nothing fancy happens with the underlying binary data, besides the fact that the trailing padding bytes b'\xff' are stripped away in order to avoid writing unnecessary stuff to disk. That does not mean that the readout object is not aware of its own structure: the base class provides a decode() method, unpacking the binary data and returning a list of hits, so that you can do things like
hits = readout.decode()
for hit in hits:
    print(hit)
This can be used whether the readout is constructed programmatically from binary data or read back from file, and you can imagine using the decode function, e.g., for
monitoring and/or doing some sanity checks during the data acquisition;
reading back the binary data from disk and converting them into a format that is more amenable to analysis (see the sketch below).
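As a sketch of the second use case, assuming that readouts is any iterable of readout objects (e.g., read back from file as above), the decoded hits can be poured into an astropy table using the empty_table() and attribute_values() facilities described in the module documentation below:

from astropix_analysis.fmt import AstroPix4Hit

# Any subset of the hit attribute names works here.
columns = ['readout_id', 'chip_id', 'row', 'column', 'tot_us']
table = AstroPix4Hit.empty_table(columns)
for readout in readouts:
    for hit in readout.decode():
        table.add_row(hit.attribute_values(columns))
print(table)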
Readout structures#
The vast majority of the readout machinery is coded into the abstract base class
AbstractAstroPixReadout. A glance at the concrete
class AstroPix4Readout shows that, in fact, the only
things you really need to do are to:
define the HIT_CLASS class variable: this indicates which type of hit structure the readout contains (note that HIT_CLASS should be a concrete subclass of AbstractAstroPixHit; this ensures that any class instance is able to decode itself);
define the _UID class variable: this gets included in the header of Astropix binary files and guarantees that we have all the information we need to parse the binary data (note that, in order to be able to read old files, the contract here is that hit structures with a given _UID never change, and we create new structures with a different _UID instead);
overload the decode() abstract method, responsible for extracting the hits from the readout (this is typically where most of the logic, and code, is needed).
@readoutclass
class AstroPix4Readout(AbstractAstroPixReadout):
"""Class describing an AstroPix 4 readout.
"""
HIT_CLASS = AstroPix4Hit
_UID = 4000
DEFAULT_START_BYTE = bytes.fromhex('e0')
@staticmethod
def is_valid_start_byte(byte: bytes) -> bool:
"""Return True if the byte is a valid start byte for Astropix4 hit.
This, effectively, entiles to make sure that the byte is of the form `111xxxxx`,
where the 5 least significant bits are the chip id.
.. note::
This assumes the byte is taken before the bit order is reversed, i.e., this operates
in the space of the data stream from the Nexys board. The rationale for
this is that all the error checking happens at the readout level, before
the bit order is reversed and before the hit is even created.
.. warning::
We have an amusing edge case, here, in that 0xff is both the padding byte
and a valid start byte for Astropix 4. We should probably put some thought
into this, but we are tentatively saying that 0xff is *not* a valid
start byte for a hit, in order to keep the decoding as simple as possible.
"""
return byte != AbstractAstroPixReadout.PADDING_BYTE and ord(byte) >> 5 == 7
@staticmethod
def _invalid_start_byte_msg(start_byte: bytes, position: int) -> str:
"""Generic error message for an invalid start byte.
"""
return f'Invalid start byte {start_byte} (0b{ord(start_byte):08b}) @ position {position}'
def decode(self, extra_bytes: bytes = None) -> list[AbstractAstroPixHit]: # noqa: C901
"""Astropix4 decoding function.
.. note::
Note that you always need to address single bytes in the data stream with
a proper slice, as opposed to an integer index, i.e., `data[i:i + 1]` instead
of `data[i]`, because Python would return an integer otherwise.
Arguments
---------
extra_bytes : bytes
Optional extra bytes from the previous readout that might be re-assembled
together with the beginning of this readout.
"""
# pylint: disable=not-callable, protected-access, line-too-long, too-many-branches, too-many-statements # noqa
# If the readout has already been decoded, return the list of hits
# that was previously calculated.
if self.decoded():
return self._hits
# Ready to start---the cursor indicates the position within the readout.
self._decoded = True
cursor = 0
# Skip the initial idle and padding bytes.
# (In principle we would only expect idle bytes, here, but it is a
# known fact that we occasionally get padding bytes interleaved with
# them, especially when operating at high rate.)
while self._readout_data[cursor:cursor + 1] in [self.IDLE_BYTE, self.PADDING_BYTE]:
self._byte_mask[cursor] = ByteType.IDLE
cursor += 1
# Look at the first legitimate hit byte---if it is not a valid hit start
# byte, then we might need to piece the first few bytes of the readout
# with the leftover of the previous readout.
byte = self._readout_data[cursor:cursor + 1]
if not self.is_valid_start_byte(byte):
logger.warning(self._invalid_start_byte_msg(byte, cursor))
offset = 1
# Move forward until we find the next valid start byte.
while not self.is_valid_start_byte(self._readout_data[cursor + offset:cursor + offset + 1]): # noqa: E501
offset += 1
# Note we have to strip all the idle bytes at the end, if any.
# Also note the Jedi trick here: we first set all the bytes in the
# portion to idle...
self._byte_mask[cursor:cursor + offset] = ByteType.IDLE
orphan_bytes = self._readout_data[cursor:cursor + offset].rstrip(self.IDLE_BYTE)
# ... and then we override the bit mask in the actual orphan part.
self._byte_mask[cursor:cursor + len(orphan_bytes)] = ByteType.ORPHAN
logger.info(f'{len(orphan_bytes)} orphan bytes found ({orphan_bytes})...')
if extra_bytes is not None:
logger.info('Trying to re-assemble the hit across readouts...')
data = extra_bytes + orphan_bytes
if len(data) == self.HIT_CLASS._SIZE:
logger.info('Total size matches---we got a hit!')
self._add_hit(data)
self._decoding_status.set(Decoding.ORPHAN_BYTES_MATCHED)
else:
self._decoding_status.set(Decoding.ORPHAN_BYTES_DROPPED)
else:
self._decoding_status.set(Decoding.ORPHAN_BYTES_NOT_USED)
cursor += offset
# And now we can proceed with business as usual.
while cursor < len(self._readout_data):
# Skip all the idle bytes and the padding bytes that we encounter.
# (In principle we would only expect idle bytes, here, but it is a
# known fact that we occasionally get padding bytes interleaved with
# them, especially when operating at high rate.)
while self._readout_data[cursor:cursor + 1] in [self.IDLE_BYTE, self.PADDING_BYTE]:
self._byte_mask[cursor] = ByteType.IDLE
cursor += 1
# Check if we are at the end of the readout.
if cursor == len(self._readout_data):
if not self.all_bytes_visited():
self._decoding_status.set(Decoding.NOT_ALL_BYTES_VISITED)
return self._hits
# Handle the case where the last hit is truncated in the original readout data.
# If the start byte is valid we put the thing aside in the extra_bytes class
# member so that, potentially, we have the data available to be matched
# with the beginning of the next readout.
if cursor + self.HIT_CLASS._SIZE >= len(self._readout_data):
data = self._readout_data[cursor:]
self._byte_mask[cursor:] = ByteType.EXTRA
logger.warning(f'Found {len(data)} byte(s) of truncated hit data '
f'({data}) at the end of the readout.')
if self.is_valid_start_byte(data[0:1]):
self._byte_mask[cursor] = ByteType.HIT_START
logger.info('Valid start byte, extra bytes set aside for next readout!')
self._extra_bytes = data
self._decoding_status.set(Decoding.VALID_EXTRA_BYTES)
else:
self._decoding_status.set(Decoding.INVALID_EXTRA_BYTES)
break
byte = self._readout_data[cursor:cursor + 1]
# At this point we do expect a valid start byte for the next hit...
if not self.is_valid_start_byte(byte):
# ... and if this is not the case, we go forward until we find the
# next hit start, dropping all the bytes in between.
logger.warning(self._invalid_start_byte_msg(byte, cursor))
while not self.is_valid_start_byte(self._readout_data[cursor:cursor + 1]):
self._byte_mask[cursor] = ByteType.DROPPED
cursor += 1
# We have a tentative 8-byte word, with the correct start byte,
# representing a hit.
data = self._readout_data[cursor:cursor + self.HIT_CLASS._SIZE]
# Loop over bytes 1--7 (included) in the word to see whether there is
# any additional valid start byte in the hit.
for offset in range(1, len(data)):
byte = data[offset:offset + 1]
if self.is_valid_start_byte(byte):
# At this point we have really two cases:
# 1 - this is a legitimate hit containing a start byte by chance;
# 2 - this is a truncated hit, and the start byte signals the next hit.
# I don't think there is any way we can get this right 100% of the
# time, but a sensible thing to try is to move forward by the hit size,
# skip all the subsequent idle bytes and see if the next thing in line
# is a valid start byte. In that situation we are probably
# dealing with case 1.
forward_cursor = cursor + self.HIT_CLASS._SIZE
while self._readout_data[forward_cursor:forward_cursor + 1] == self.IDLE_BYTE:
forward_cursor += 1
if forward_cursor < len(self._readout_data):
byte = self._readout_data[forward_cursor:forward_cursor + 1]
if not self.is_valid_start_byte(byte):
# Here we are really in case 2, and there is nothing else
# we can do except drop the hit.
logger.warning(f'Unexpected start byte {byte} @ position {cursor}+{offset}') # noqa: E501
logger.warning(f'Dropping incomplete hit {data[:offset]}')
self._decoding_status.set(Decoding.INCOMPLETE_HIT_DROPPED)
self._byte_mask[cursor:cursor + offset] = ByteType.DROPPED
cursor = cursor + offset
data = self._readout_data[cursor:cursor + self.HIT_CLASS._SIZE]
# And this should be by far the most common case.
self._add_hit(data)
self._byte_mask[cursor:cursor + 1] = ByteType.HIT_START
self._byte_mask[cursor + 1:cursor + self.HIT_CLASS._SIZE] = ByteType.HIT
cursor += self.HIT_CLASS._SIZE
while self._readout_data[cursor:cursor + 1] == self.IDLE_BYTE:
self._byte_mask[cursor] = ByteType.IDLE
cursor += 1
if not self.all_bytes_visited():
self._decoding_status.set(Decoding.NOT_ALL_BYTES_VISITED)
return self._hits
The constructor of the base class accepts three arguments, namely:
the underlying binary data, coming from the DAQ board;
a readout identifier, which is generally assigned by the host machine within the data acquisition event loop;
a timestamp, also assigned by the host machine, expressed as nanoseconds since the epoch (January 1, 1970, 00:00:00 (UTC)).
def __init__(self, readout_data: bytearray, readout_id: int,
timestamp: int = None) -> None:
"""Constructor.
"""
# If the timestamp is None, automatically latch the system time.
# Note this is done first in order to latch the timestamp as close as
# possible to the actual readout.
self.timestamp = self.latch_ns() if timestamp is None else timestamp
# Strip all the trailing padding bytes from the input bytearray object
# and turn it into a bytes object to make it immutable.
self._readout_data = bytes(readout_data.rstrip(self.PADDING_BYTE))
self.readout_id = readout_id
# Initialize all the status variables for the decoding.
self._decoded = False
self._decoding_status = DecodingStatus()
self._extra_bytes = None
self._byte_mask = np.zeros(len(self._readout_data), dtype=int)
self._hits = []
When instantiating readout objects programmatically (e.g., in the data acquisition
event loop), you can typically omit the timestamp argument, as the latter
gets automatically latched within the class constructor, i.e.
readout = AstroPix4Readout(readout_data, readout_id)
On the other hand, when a readout object is read back from file, the signature with all three parameters is obviously used.
Decoding#
Readout structures are equipped with all the necessary tools to keep track of the full decoding process, particularly through the following members:
_decoded: a bool flag that is asserted once the readout has been decoded, so that additional calls to the decode() method return the pre-compiled list of hits without running the decoding again;
_decoding_status: a custom object containing all the information about the decoding process—see Decoding and DecodingStatus;
_extra_bytes: the possible extra bytes at the end of the readout, that we might be able to match with the beginning of the next readout;
_byte_mask: a mask mapping each byte in the readout to its role within the readout—see ByteType.
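Incidentally, the byte mask is what powers the pretty_hex() facility documented below and, assuming readout is any readout object at hand, a quick way to eyeball how the decoding went is

readout.decode()
# Color-coded hexadecimal dump of the raw readout data, handy to spot
# idle, dropped or orphan bytes at a glance.
print(readout.pretty_hex())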
class Decoding(IntEnum):
"""Enum class for all the possible issue that can happen during decoding.
* ``ORPHAN_BYTES_MATCHED``: the readout has orphan bytes at the beginning that
were successfully matched with the extra bytes from the previous readout;
* ``ORPHAN_BYTES_DROPPED``: the readout has orphan bytes at the beginning that
we were not able to match with the extra bytes from the previous readout;
* ``ORPHAN_BYTES_NOT_USED``: the readout has orphan bytes at the beginning
that we could not use because the previous readout had no extra bytes;
* ``VALID_EXTRA_BYTES``: the readout had extra bytes at the end, starting
with a valid start-hit byte, that could be potentially matched with the
beginning of the next buffer;
* ``INVALID_EXTRA_BYTES``: the readout had extra bytes at the end, but since
the first byte is not a valid start-hit byte, these cannot effectively be used;
* ``INCOMPLETE_HIT_DROPPED``: the readout had incomplete hit data somewhere
that we had to drop.
"""
ORPHAN_BYTES_MATCHED = 0
ORPHAN_BYTES_DROPPED = 1
ORPHAN_BYTES_NOT_USED = 2
VALID_EXTRA_BYTES = 3
INVALID_EXTRA_BYTES = 4
INCOMPLETE_HIT_DROPPED = 5
NOT_ALL_BYTES_VISITED = 6
UNKNOWN_FATAL_ERROR = 7
class ByteType(IntEnum):
"""Enum class used to keep track of the byte types within a readout.
The main purpose of this utility class is to be able to pretty-print readout
raw data in order to debug the decoding process.
* ``NOT_ASSIGNED``: byte not assigned (this should never happen unless the readout
cannot be properly decoded);
* ``PADDING``: padding byte;
* ``IDLE``: idle byte;
* ``HIT_START``: byte signaling the start of a hit (e.g., ``0xe0`` for chip_id = 0);
* ``HIT``: the byte is part of a valid hit (but not the start byte);
* ``EXTRA``: the byte is an extra byte at the end of the readout, and possibly
the initial part of a valid hit that is split across readouts;
* ``ORPHAN``: the byte is an orphan byte at the beginning of the readout, and
we might be able to reassemble with the extra bytes from the previous readout;
* ``DROPPED``: the byte could not be assigned to a valid hit and was dropped.
"""
NOT_ASSIGNED = 0
PADDING = 1
IDLE = 2
HIT_START = 3
HIT = 4
EXTRA = 5
ORPHAN = 6
DROPPED = 7
@classmethod
def _fmt(cls, byte: str, *escape_codes: int) -> str:
"""Format a single byte using standard ANSI escape sequences for colors
and formatting.
Note we always append a reset escape sequence at the end---this will have
the terminal working a little bit harder, but we are relieved from the
responsibility to put the terminal back in the original state. (And this
will never be used in CPU-intensive contexts, anyway.)
Common color codes:
| Color | Code |
| ------- | ---- |
| Black | `30` |
| Red | `31` |
| Green | `32` |
| Yellow | `33` |
| Blue | `34` |
| Magenta | `35` |
| Cyan | `36` |
| White | `37` |
| Reset | `0` |
Common background color codes:
| Color | Code |
| ------- | ---- |
| Black | `40` |
| Red | `41` |
| Green | `42` |
| Yellow | `43` |
| Blue | `44` |
| Magenta | `45` |
| Cyan | `46` |
| White | `47` |
Common text modifiers:
| Style | Code |
| ------------- | ---- |
| Reset | `0` |
| Bold | `1` |
| Faint | `2` |
| Italic | `3` |
| Underline | `4` |
| Blink | `5` |
| Reverse | `7` |
| Conceal | `8` |
| Strikethrough | `9` |
"""
return f'\033[{";".join([str(code) for code in escape_codes])}m{byte}\033[0m'
@classmethod
def format_byte(cls, byte: str, byte_type) -> str:
"""Format a single byte according to its type.
"""
if byte_type == cls.HIT_START:
return cls._fmt(byte, 31, 47)
if byte_type == cls.HIT:
return cls._fmt(byte, 30, 47)
if byte_type == cls.IDLE:
return cls._fmt(byte, 33)
if byte_type == cls.DROPPED:
return cls._fmt(byte, 9)
if byte_type in (cls.EXTRA, cls.ORPHAN):
return cls._fmt(byte, 35)
return byte
Hit structures#
Likewise, all concrete hit classes derive from AbstractAstroPixHit,
although defining concrete subclasses is slightly more complex in this case, as
the following definition shows.
@hitclass
class AstroPix4Hit(AbstractAstroPixHit):
"""Class describing an AstroPix4 hit.
"""
_SIZE = 8
_LAYOUT = {
'chip_id': (slice(0, 5), np.uint8),
'payload': (slice(5, 8), np.uint8),
'readout_id': (None, np.uint32),
'timestamp': (None, np.uint64),
'decoding_order': (None, np.uint8),
'row': (slice(8, 13), np.uint8),
'column': (slice(13, 18), np.uint8),
'ts_neg1': (18, np.uint8),
'ts_coarse1': (slice(19, 33), np.uint16),
'ts_fine1': (slice(33, 36), np.uint8),
'ts_tdc1': (slice(36, 41), np.uint8),
'ts_neg2': (41, np.uint8),
'ts_coarse2': (slice(42, 56), np.uint16),
'ts_fine2': (slice(56, 59), np.uint8),
'ts_tdc2': (slice(59, 64), np.uint8),
'ts_dec1': (None, np.uint32),
'ts_dec2': (None, np.uint32),
'tot_us': (None, np.float64)
}
CLOCK_CYCLES_PER_US = 20.
CLOCK_ROLLOVER = 2**17
def __init__(self, data: bytearray, readout_id: int, timestamp: int,
decoding_order: int) -> None:
"""Constructor.
"""
# pylint: disable=no-member
super().__init__(data)
self.decoding_order = decoding_order
# Calculate the values of the two timestamps in clock cycles.
self.ts_dec1 = self._compose_ts(self.ts_coarse1, self.ts_fine1)
self.ts_dec2 = self._compose_ts(self.ts_coarse2, self.ts_fine2)
# Take into account possible rollovers.
if self.ts_dec2 < self.ts_dec1:
self.ts_dec2 += self.CLOCK_ROLLOVER
# Calculate the actual TOT in us.
self.tot_us = (self.ts_dec2 - self.ts_dec1) / self.CLOCK_CYCLES_PER_US
self.readout_id = readout_id
self.timestamp = timestamp
@staticmethod
def _compose_ts(ts_coarse: int, ts_fine: int) -> int:
"""Compose the actual decimal representation of the timestamp counter,
putting together the coarse and fine counters (in Gray code).
Arguments
---------
ts_coarse : int
The value of the coarse counter (MSBs) in Gray code.
ts_fine : int
The value of the fine counter (3 LSBs) in Gray code.
Returns
-------
int
The actual decimal value of the timestamp counter, in clock cycles.
"""
return AbstractAstroPixHit.gray_to_decimal((ts_coarse << 3) + ts_fine)
In a nutshell, all you have to do is:
override the _SIZE class variable: this indicates the total size in bytes of the binary buffer, as it comes from the hardware, representing a hit;
override the _LAYOUT class variable: this is a dictionary mapping the name of each field to the corresponding slice in the input binary buffer and to the desired data type when the field is to be written to persistent memory (it goes without saying that the fields should be defined in the same order in which they occur in the frame); for instance, the specification 'column': (slice(8, 16), np.uint8) indicates that the 8 bits 8:16 in the input buffer need to be mapped into a class attribute hit.column, and that the latter, when written to binary output, is represented as an 8-bit unsigned integer; when the slice is None, the corresponding field is not read from the input binary buffer, but is calculated in the constructor based on the raw quantities (and, still, the output type is obviously relevant);
decorate the concrete class with the @hitclass decorator, which calculates at the time of type creation (and not every time a class instance is created) some useful quantities that allow for streamlining the hit manipulation.
The layout machinery is designed to avoid addressing the underlying binary data
with hard-coded indices and to make it easier to reason about the hit structure.
Under the hood, it leverages the small convenience class
BitPattern.
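Just to give a flavor of the idea, and assuming the natural slicing interface that the BitPattern description below suggests (the actual numbers here are immaterial), a field specification is simply a slice applied to the bit pattern wrapping the hit data:

from astropix_analysis.fmt import BitPattern

# Wrap two bytes of (made-up) hit data: slicing returns the integer encoded
# by the selected bits, with no need to worry about the byte boundary at bit 8.
pattern = BitPattern(b'\xe0\x02')
chip_id = pattern[0:5]    # first five bits of the pattern
payload = pattern[5:8]    # following three bits
print(chip_id, payload)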
Hit objects come equipped with all the facilities to represent themselves, retrieve a subset of their attributes in a programmatic fashion, and interface to astropy tables to support structured binary output:
print(hit)
AstroPix4Hit(chip_id = 0, payload = 7, readout_id = 9, timestamp = 1753255537403740300,
decoding_order = 0, row = 0, column = 9, ts_neg1 = 0, ts_coarse1 = 14381,
ts_fine1 = 3, ts_tdc1 = 0, ts_neg2 = 1, ts_coarse2 = 11055, ts_fine2 = 5,
ts_tdc2 = 0, ts_dec1 = 97869, ts_dec2 = 102825, tot_us = 247.8,
raw_data = b'\x07\x02\\\x16\xb0k/\xa0')
print(hit.attribute_values(['chip_id', 'row', 'column']))
[0, 0, 9]
Module documentation#
Data format description for the astropix chip.
- astropix_analysis.fmt.reverse_bit_order(data: bytearray) None[source]#
Reverses the bit order within a bytearray.
- class astropix_analysis.fmt.BitPattern(data: bytes)[source]#
Small convenience class representing a bit pattern, that we can slice (interpreting the result as the binary representation of an integer) without caring about the byte boundaries.
This is not very memory-efficient and probably not blazingly fast either, but it allows us to reason about the incoming bits in a straightforward fashion, and I doubt we will ever need to optimize this. (If that is the case, there are probably ways, using either numpy or the bitarray third-party package.)
- Parameters:
data (bytes) – The binary representation of the bit pattern.
- astropix_analysis.fmt.hitclass(cls: type) type[source]#
Small decorator to support automatic generation of concrete hit classes.
Here we simply calculate some useful class variables that are needed to unpack the binary data and/or write the hit to different data formats. Having a decorator allows us to do the operation once and for all at the time the type is created, as opposed to doing it over and over again each time an instance of the class is created. More specifically:
ATTRIBUTE_NAMES is a tuple containing all the hit field names that can be used, e.g., for printing out the hit itself;
_ATTR_IDX_DICT is a dictionary mapping the name of each attribute to the corresponding slice of the input binary buffer—note it does not include the attributes that are not encoded in the input buffer but are calculated at construction time; this facilitates unpacking the input buffer;
_ATTR_TYPE_DICT is a dictionary mapping the name of each class attribute to the corresponding data type for the purpose of writing it to a binary file (e.g., in HDF5 or FITS format).
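For instance, for the AstroPix4Hit class shipped with the module (whose full set of class variables is listed further down), the first few attribute names read

from astropix_analysis.fmt import AstroPix4Hit

print(AstroPix4Hit.ATTRIBUTE_NAMES[:4])
# ('chip_id', 'payload', 'readout_id', 'timestamp')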
- class astropix_analysis.fmt.AbstractAstroPixHit(data: bytearray)[source]#
Abstract base class for a generic AstroPix hit.
While the original decode routine was working in terms of the various bytes in the binary representation of the hit, since there seems to be no meaning altogether in the byte boundaries (at least for AstroPix 4), and the various fields are arbitrary subsets of a multi-byte word, it seemed more natural to describe the hit as a sequence of fields, each one with its own length in bits.
Note this is an abstract class that cannot be instantiated. (The __init__() special method is abstract and needs to be overloaded, based on the assumption that for concrete classes we always want to calculate derived quantities from the raw ones parsed from the input binary buffer.) Concrete subclasses must, by contract:
overload _SIZE;
overload _LAYOUT;
be decorated with the @hitclass decorator.
- Parameters:
data (bytearray) – The portion of a full AstroPix readout representing a single hit.
- _SIZE = 0#
- _LAYOUT = {}#
- ATTRIBUTE_NAMES = ()#
- _ATTR_IDX_DICT = {}#
- _ATTR_TYPE_DICT = {}#
- static gray_to_decimal(gray: int) int[source]#
Convert a Gray code (integer) to decimal.
A Gray code (or reflected binary code) is a binary numeral system where two consecutive values differ by only one bit, which makes it useful in error correction and minimizing logic transitions in digital circuits. This function is provided as a convenience to translate counter values encoded in Gray code into actual decimal values.
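As a small worked example, the 4-bit Gray code 0110 decodes to 4: the MSB is copied, and each subsequent bit is XORed with the previously decoded one (0, then 0^1=1, 1^1=0, 0^0=0, i.e., binary 0100).

from astropix_analysis.fmt import AbstractAstroPixHit

print(AbstractAstroPixHit.gray_to_decimal(0b0110))  # 4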
- classmethod empty_table(attribute_names: list[str] = None) Table[source]#
Return an astropy empty table with the proper column types for the concrete hit type.
Note this checks that all the attribute names are valid and tries to raise a useful exception if that is not the case.
- Parameters:
attribute_names (list[str]) – The names of the hit attributes.
- attribute_values(attribute_names: list[str] = None) list[source]#
Return the value of the hit attributes for a given set of attribute names.
- Parameters:
attribute_names (list[str]) – The names of the hit attributes.
- dict() dict[source]#
Return the hit content as a dict—this will be essentially identical to the __dict__ dunder attribute, except for the order of the fields.
Warning
This will be slow, so do not abuse it. It is good for printing :-)
- class astropix_analysis.fmt.AstroPix3Hit(data: bytearray)[source]#
Class describing an AstroPix3 hit.
Warning
This is copied from decode.py and totally untested.
- _SIZE = 5#
- _LAYOUT = {'chip_id': (slice(0, 5, None), <class 'numpy.uint8'>), 'column': (8, <class 'numpy.uint8'>), 'location': (slice(10, 16, None), <class 'numpy.uint8'>), 'payload': (slice(5, 8, None), <class 'numpy.uint8'>), 'timestamp': (slice(16, 24, None), <class 'numpy.uint8'>), 'tot_dec': (None, <class 'numpy.uint16'>), 'tot_lsb': (slice(32, 40, None), <class 'numpy.uint8'>), 'tot_msb': (slice(28, 32, None), <class 'numpy.uint8'>), 'tot_us': (None, <class 'numpy.float32'>)}#
- CLOCK_CYCLES_PER_US = 200.0#
- ATTRIBUTE_NAMES = ('chip_id', 'payload', 'column', 'location', 'timestamp', 'tot_msb', 'tot_lsb', 'tot_dec', 'tot_us')#
- _ATTR_IDX_DICT = {'chip_id': slice(0, 5, None), 'column': 8, 'location': slice(10, 16, None), 'payload': slice(5, 8, None), 'timestamp': slice(16, 24, None), 'tot_lsb': slice(32, 40, None), 'tot_msb': slice(28, 32, None)}#
- _ATTR_TYPE_DICT = {'chip_id': <class 'numpy.uint8'>, 'column': <class 'numpy.uint8'>, 'location': <class 'numpy.uint8'>, 'payload': <class 'numpy.uint8'>, 'timestamp': <class 'numpy.uint8'>, 'tot_dec': <class 'numpy.uint16'>, 'tot_lsb': <class 'numpy.uint8'>, 'tot_msb': <class 'numpy.uint8'>, 'tot_us': <class 'numpy.float32'>}#
- class astropix_analysis.fmt.AstroPix4Hit(data: bytearray, readout_id: int, timestamp: int, decoding_order: int)[source]#
Class describing an AstroPix4 hit.
- _SIZE = 8#
- _LAYOUT = {'chip_id': (slice(0, 5, None), <class 'numpy.uint8'>), 'column': (slice(13, 18, None), <class 'numpy.uint8'>), 'decoding_order': (None, <class 'numpy.uint8'>), 'payload': (slice(5, 8, None), <class 'numpy.uint8'>), 'readout_id': (None, <class 'numpy.uint32'>), 'row': (slice(8, 13, None), <class 'numpy.uint8'>), 'timestamp': (None, <class 'numpy.uint64'>), 'tot_us': (None, <class 'numpy.float64'>), 'ts_coarse1': (slice(19, 33, None), <class 'numpy.uint16'>), 'ts_coarse2': (slice(42, 56, None), <class 'numpy.uint16'>), 'ts_dec1': (None, <class 'numpy.uint32'>), 'ts_dec2': (None, <class 'numpy.uint32'>), 'ts_fine1': (slice(33, 36, None), <class 'numpy.uint8'>), 'ts_fine2': (slice(56, 59, None), <class 'numpy.uint8'>), 'ts_neg1': (18, <class 'numpy.uint8'>), 'ts_neg2': (41, <class 'numpy.uint8'>), 'ts_tdc1': (slice(36, 41, None), <class 'numpy.uint8'>), 'ts_tdc2': (slice(59, 64, None), <class 'numpy.uint8'>)}#
- CLOCK_CYCLES_PER_US = 20.0#
- CLOCK_ROLLOVER = 131072#
- static _compose_ts(ts_coarse: int, ts_fine: int) int[source]#
Compose the actual decimal representation of the timestamp counter, putting together the coarse and fine counters (in Gray code).
- Parameters:
ts_coarse (int) – The value of the coarse counter (MSBs) in Gray code.
ts_fine (int) – The value of the fine counter (3 LSBs) in Gray code.
- Returns:
The actual decimal value of the timestamp counter, in clock cycles.
- Return type:
int
- ATTRIBUTE_NAMES = ('chip_id', 'payload', 'readout_id', 'timestamp', 'decoding_order', 'row', 'column', 'ts_neg1', 'ts_coarse1', 'ts_fine1', 'ts_tdc1', 'ts_neg2', 'ts_coarse2', 'ts_fine2', 'ts_tdc2', 'ts_dec1', 'ts_dec2', 'tot_us')#
- _ATTR_IDX_DICT = {'chip_id': slice(0, 5, None), 'column': slice(13, 18, None), 'payload': slice(5, 8, None), 'row': slice(8, 13, None), 'ts_coarse1': slice(19, 33, None), 'ts_coarse2': slice(42, 56, None), 'ts_fine1': slice(33, 36, None), 'ts_fine2': slice(56, 59, None), 'ts_neg1': 18, 'ts_neg2': 41, 'ts_tdc1': slice(36, 41, None), 'ts_tdc2': slice(59, 64, None)}#
- _ATTR_TYPE_DICT = {'chip_id': <class 'numpy.uint8'>, 'column': <class 'numpy.uint8'>, 'decoding_order': <class 'numpy.uint8'>, 'payload': <class 'numpy.uint8'>, 'readout_id': <class 'numpy.uint32'>, 'row': <class 'numpy.uint8'>, 'timestamp': <class 'numpy.uint64'>, 'tot_us': <class 'numpy.float64'>, 'ts_coarse1': <class 'numpy.uint16'>, 'ts_coarse2': <class 'numpy.uint16'>, 'ts_dec1': <class 'numpy.uint32'>, 'ts_dec2': <class 'numpy.uint32'>, 'ts_fine1': <class 'numpy.uint8'>, 'ts_fine2': <class 'numpy.uint8'>, 'ts_neg1': <class 'numpy.uint8'>, 'ts_neg2': <class 'numpy.uint8'>, 'ts_tdc1': <class 'numpy.uint8'>, 'ts_tdc2': <class 'numpy.uint8'>}#
- class astropix_analysis.fmt.Decoding(*values)[source]#
Enum class for all the possible issues that can happen during decoding.
ORPHAN_BYTES_MATCHED: the readout has orphan bytes at the beginning that were successfully matched with the extra bytes from the previous readout;
ORPHAN_BYTES_DROPPED: the readout has orphan bytes at the beginning that we were not able to match with the extra bytes from the previous readout;
ORPHAN_BYTES_NOT_USED: the readout has orphan bytes at the beginning that we could not use because the previous readout had no extra bytes;
VALID_EXTRA_BYTES: the readout had extra bytes at the end, starting with a valid start-hit byte, that could potentially be matched with the beginning of the next buffer;
INVALID_EXTRA_BYTES: the readout had extra bytes at the end, but since the first byte is not a valid start-hit byte, these cannot effectively be used;
INCOMPLETE_HIT_DROPPED: the readout had incomplete hit data somewhere that we had to drop.
- ORPHAN_BYTES_MATCHED = 0#
- ORPHAN_BYTES_DROPPED = 1#
- ORPHAN_BYTES_NOT_USED = 2#
- VALID_EXTRA_BYTES = 3#
- INVALID_EXTRA_BYTES = 4#
- INCOMPLETE_HIT_DROPPED = 5#
- NOT_ALL_BYTES_VISITED = 6#
- UNKNOWN_FATAL_ERROR = 7#
- class astropix_analysis.fmt.DecodingStatus[source]#
Small class representing the status of a readout decoding.
- class astropix_analysis.fmt.ByteType(*values)[source]#
Enum class used to keep track of the byte types within a readout.
The main purpose of this utility class is to be able to pretty-print readout raw data in order to debug the decoding process.
NOT_ASSIGNED: byte not assigned (this should never happen unless the readout cannot be properly decoded);
PADDING: padding byte;
IDLE: idle byte;
HIT_START: byte signaling the start of a hit (e.g., 0xe0 for chip_id = 0);
HIT: the byte is part of a valid hit (but not the start byte);
EXTRA: the byte is an extra byte at the end of the readout, and possibly the initial part of a valid hit that is split across readouts;
ORPHAN: the byte is an orphan byte at the beginning of the readout, which we might be able to reassemble with the extra bytes from the previous readout;
DROPPED: the byte could not be assigned to a valid hit and was dropped.
- NOT_ASSIGNED = 0#
- PADDING = 1#
- IDLE = 2#
- HIT_START = 3#
- HIT = 4#
- EXTRA = 5#
- ORPHAN = 6#
- DROPPED = 7#
- classmethod _fmt(byte: str, *escape_codes: int) str[source]#
Format a single byte using standard ANSI escape sequences for colors and formatting.
Note we always append a reset escape sequence at the end—this will have the terminal working a little bit harder, but we are relieved from the responsibility to put the terminal back in the original state. (And this will never be used in CPU-intensive contexts, anyway.)
Common color codes:
| Color | Code |
| ------- | ---- |
| Black | 30 |
| Red | 31 |
| Green | 32 |
| Yellow | 33 |
| Blue | 34 |
| Magenta | 35 |
| Cyan | 36 |
| White | 37 |
| Reset | 0 |
Common background color codes:
| Color | Code |
| ------- | ---- |
| Black | 40 |
| Red | 41 |
| Green | 42 |
| Yellow | 43 |
| Blue | 44 |
| Magenta | 45 |
| Cyan | 46 |
| White | 47 |
Common text modifiers:
| Style | Code |
| ------------- | ---- |
| Reset | 0 |
| Bold | 1 |
| Faint | 2 |
| Italic | 3 |
| Underline | 4 |
| Blink | 5 |
| Reverse | 7 |
| Conceal | 8 |
| Strikethrough | 9 |
- astropix_analysis.fmt.readoutclass(cls: type) type[source]#
Small decorator to support automatic generation of concrete readout classes.
Decorating concrete readout classes with this allows for some minimal checks on the class definition—note this is only done once, when the class is defined, and not at runtime (i.e., every time a class instance is created).
- class astropix_analysis.fmt.AbstractAstroPixReadout(readout_data: bytearray, readout_id: int, timestamp: int = None)[source]#
Abstract base class for a generic AstroPix readout.
This is basically a wrapper around the bytearray object that is returned by the DAQ board, and it provides some basic functionality to write the readout to a binary file.
A full readout comes in the form of a fixed-length bytearray object that is padded at the end with a padding byte (0xff). The hit data are surrounded by a (generally variable) number of idle bytes (0xbc), see the documentation of the decode() class method.
- Parameters:
readout_data (bytearray) – The readout data from the DAQ board.
readout_id (int) – A sequential id for the readout, assigned by the host DAQ machine.
timestamp (int) – A timestamp for the readout, assigned by the host DAQ machine, in ns since the epoch, from time.time_ns().
- HIT_CLASS = None#
- _UID = None#
- PADDING_BYTE = b'\xff'#
- IDLE_BYTE = b'\xbc'#
- _HEADER = b'\xfe\xdc\xba'#
- _HEADER_SIZE = 3#
- _READOUT_ID_FMT = '<L'#
- _TIMESTAMP_FMT = '<Q'#
- _LENGTH_FMT = '<L'#
- abstractmethod decode(extra_bytes: bytes = None) list[AbstractAstroPixHit][source]#
Placeholder for the decoding function—this needs to be reimplemented in derived classes.
- extra_bytes() bytes[source]#
Return the extra bytes, if any, at the end of the readout (None if there are no extra bytes).
- all_bytes_visited() bool[source]#
Return True if all the bytes have been visited in the decoding process.
- static latch_ns() int[source]#
Convenience function returning the time of the function call as an integer number of nanoseconds since the epoch, i.e., January 1, 1970, 00:00:00 (UTC) on all platforms.
- static _unpack(fmt: str, data: bytes) Any[source]#
Convenience function wrapping the struct.unpack() method—this saves us the trouble of getting the first (and only) element of a 1-element tuple.
- static _unpack_slice(fmt: str, data: bytes, start: int) Any[source]#
Convenience function to unpack a specific slice of a bytes object.
- static _read_and_unpack(input_file: BinaryIO, fmt: str) Any[source]#
Convenience function to read and unpack a fixed-size field from an input file.
- Parameters:
input_file (BinaryIO) – A file object opened in “rb” mode.
fmt (str) – The format string for the field to be read.
- to_bytes() bytes[source]#
Convert the readout object to its binary representation.
This is used, e.g., to write the readout to disk, or to send the object over a network socket.
- classmethod from_bytes(data: bytes) AbstractAstroPixReadout[source]#
Re-assemble the readout object from its binary persistent representation.
Note
It would be nice to be able to reuse the same code here and in the from_file() call, but the slight issue here is the variable-size part of the event, which makes it not straightforward to read the whole thing from file in one shot. Something we might think about, though.
- Parameters:
data (bytes) – The binary data representing the readout.
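As a sketch of the intended round trip, with readout being any existing AstroPix4Readout instance, the output of to_bytes() can be fed straight back to from_bytes():

blob = readout.to_bytes()
clone = AstroPix4Readout.from_bytes(blob)
print(clone.readout_id == readout.readout_id)  # expected to be True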
- write(output_file: BinaryIO) None[source]#
Write the complete readout to a binary file.
- Parameters:
output_file (BinaryIO) – A file object opened in “wb” mode.
- classmethod from_file(input_file: BinaryIO) AbstractAstroPixReadout[source]#
Create a Readout object reading the underlying data from an input binary file.
By contract this should return None when there are no more data to be read from the input file, so that downstream code can use the information to stop iterating over the file.
- Parameters:
input_file (BinaryIO) – A file object opened in “rb” mode.
- _add_hit(hit_data: bytes, reverse: bool = True) None[source]#
Add a hit to the readout.
This will be typically called during the readout decoding.
- hex() str[source]#
Return a string with the hexadecimal representation of the underlying binary data (two hexadecimal digits per byte).
- pretty_hex()[source]#
Return a pretty version of the hexadecimal representation, where the hit portions of the readout are colored.
- class astropix_analysis.fmt.AstroPix4Readout(readout_data: bytearray, readout_id: int, timestamp: int = None)[source]#
Class describing an AstroPix 4 readout.
- HIT_CLASS#
alias of
AstroPix4Hit
- _UID = 4000#
- DEFAULT_START_BYTE = b'\xe0'#
- static is_valid_start_byte(byte: bytes) bool[source]#
Return True if the byte is a valid start byte for an Astropix4 hit.
This effectively amounts to making sure that the byte is of the form 111xxxxx, where the 5 least significant bits are the chip id.
Note
This assumes the byte is taken before the bit order is reversed, i.e., this operates in the space of the data stream from the Nexys board. The rationale for this is that all the error checking happens at the readout level, before the bit order is reversed and before the hit is even created.
Warning
We have an amusing edge case, here, in that 0xff is both the padding byte and a valid start byte for Astropix 4. We should probably put some thought into this, but we are tentatively saying that 0xff is not a valid start byte for a hit, in order to keep the decoding as simple as possible.
- static _invalid_start_byte_msg(start_byte: bytes, position: int) str[source]#
Generic error message for an invalid start byte.
- decode(extra_bytes: bytes = None) list[AbstractAstroPixHit][source]#
Astropix4 decoding function.
Note
Note that you always need to address single bytes in the data stream with a proper slice, as opposed to an integer index, i.e., data[i:i + 1] instead of data[i], because Python would return an integer otherwise.
- Parameters:
extra_bytes (bytes) – Optional extra bytes from the previous readout that might be re-assembled together with the beginning of this readout.