diff --git a/.codecov.yml b/.codecov.yml new file mode 100644 index 00000000..d7fe2f8d --- /dev/null +++ b/.codecov.yml @@ -0,0 +1,2 @@ +ignore: + - "src/tickit_devices/eiger/stream/stream2.py" diff --git a/pyproject.toml b/pyproject.toml index 9add3847..8c5ea5ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,8 @@ dependencies = [ "typing_extensions", "softioc", "pydantic>1", - "apischema" + "apischema", + "cbor2", ] dynamic = ["version"] license.file = "LICENSE" @@ -68,6 +69,7 @@ version_file = "src/tickit_devices/_version.py" [tool.mypy] ignore_missing_imports = true # Ignore missing stubs in imported modules +exclude = ["src/tickit_devices/eiger/stream/stream2.py"] [tool.pytest.ini_options] # Run pytest with all our checkers, and don't spam us with massive tracebacks on error @@ -84,6 +86,7 @@ testpaths = "docs src tests" [tool.coverage.run] data_file = "/tmp/tickit_devices.coverage" +omit = ["src/tickit_devices/eiger/stream/stream2.py"] [tool.coverage.paths] # Tests are run from installed location, map back to the src directory diff --git a/src/tickit_devices/eiger/__init__.py b/src/tickit_devices/eiger/__init__.py index cbb21ba1..44f5f3ee 100644 --- a/src/tickit_devices/eiger/__init__.py +++ b/src/tickit_devices/eiger/__init__.py @@ -8,6 +8,7 @@ from tickit_devices.eiger.eiger import EigerDevice from tickit_devices.eiger.eiger_adapters import EigerRESTAdapter, EigerZMQAdapter +from tickit_devices.eiger.stream.stream_config import CBOR_STREAM, LEGACY_STREAM @pydantic.v1.dataclasses.dataclass @@ -16,8 +17,9 @@ class Eiger(ComponentConfig): host: str = "0.0.0.0" port: int = 8081 - zmq_host: str = "127.0.0.1" - zmq_port: int = 9999 + stream_host: str = "127.0.0.1" + stream_legacy_port: int = 9999 + stream_cbor_port: int = 31001 def __call__(self) -> Component: # noqa: D102 logging.getLogger("aiohttp.access").setLevel(logging.WARNING) @@ -31,10 +33,17 @@ def __call__(self) -> Component: # noqa: D102 ), ), AdapterContainer( - EigerZMQAdapter(device), + EigerZMQAdapter(device.streams[LEGACY_STREAM]), ZeroMqPushIo( - self.zmq_host, - self.zmq_port, + self.stream_host, + self.stream_legacy_port, + ), + ), + AdapterContainer( + EigerZMQAdapter(device.streams[CBOR_STREAM]), + ZeroMqPushIo( + self.stream_host, + self.stream_cbor_port, ), ), ] diff --git a/src/tickit_devices/eiger/data/stream2/end.cbor b/src/tickit_devices/eiger/data/stream2/end.cbor new file mode 100644 index 00000000..0530124a --- /dev/null +++ b/src/tickit_devices/eiger/data/stream2/end.cbor @@ -0,0 +1 @@ +ÙÙ÷£dtypecendiseries_id<þpseries_unique_idx01HBV3JPF9T4ZDPADX6EMK6XMZ \ No newline at end of file diff --git a/src/tickit_devices/eiger/data/stream2/image.cbor b/src/tickit_devices/eiger/data/stream2/image.cbor new file mode 100644 index 00000000..a5d8cb46 Binary files /dev/null and b/src/tickit_devices/eiger/data/stream2/image.cbor differ diff --git a/src/tickit_devices/eiger/data/stream2/start.cbor b/src/tickit_devices/eiger/data/stream2/start.cbor new file mode 100644 index 00000000..dada14ee Binary files /dev/null and b/src/tickit_devices/eiger/data/stream2/start.cbor differ diff --git a/src/tickit_devices/eiger/eiger.py b/src/tickit_devices/eiger/eiger.py index fa02b826..22b78f9e 100644 --- a/src/tickit_devices/eiger/eiger.py +++ b/src/tickit_devices/eiger/eiger.py @@ -1,5 +1,6 @@ import asyncio import logging +from collections.abc import Mapping from queue import Queue from tickit.core.device import Device, DeviceUpdate @@ -13,6 +14,13 @@ from tickit_devices.eiger.monitor.monitor_config import MonitorConfig from tickit_devices.eiger.monitor.monitor_status import MonitorStatus from tickit_devices.eiger.stream.eiger_stream import EigerStream +from tickit_devices.eiger.stream.eiger_stream_2 import EigerStream2 +from tickit_devices.eiger.stream.stream_config import ( + CBOR_STREAM, + LEGACY_STREAM, + StreamConfig, +) +from tickit_devices.eiger.stream.stream_status import StreamStatus from .eiger_status import EigerStatus, State @@ -35,7 +43,8 @@ class EigerDevice(Device): settings: EigerSettings status: EigerStatus - stream: EigerStream + stream: EigerStream | EigerStream2 + streams: Mapping[str, EigerStream | EigerStream2] _num_frames_left: int _data_queue: Queue @@ -49,7 +58,7 @@ def __init__( self, settings: EigerSettings | None = None, status: EigerStatus | None = None, - stream: EigerStream | None = None, + stream: EigerStream | EigerStream2 | None = None, ) -> None: """Construct a new eiger. @@ -61,7 +70,13 @@ def __init__( self.settings = settings or EigerSettings() self.status = status or EigerStatus() - self.stream = stream or EigerStream(callback_period=SimTime(int(1e9))) + self.stream_status: StreamStatus = StreamStatus() + self.stream_config: StreamConfig = StreamConfig() + self.streams = { + LEGACY_STREAM: stream or EigerStream(callback_period=SimTime(int(1e9))), + CBOR_STREAM: stream or EigerStream2(callback_period=SimTime(int(1e9))), + } + self.stream = self.streams[CBOR_STREAM] self.filewriter_status: FileWriterStatus = FileWriterStatus() self.filewriter_config: FileWriterConfig = FileWriterConfig() @@ -102,8 +117,11 @@ async def arm(self) -> None: Required for triggering. """ + self.stream = self.streams[self.stream_config.format] self._series_id += 1 - self.stream.begin_series(self.settings, self._series_id) + self.stream.begin_series( + self.settings, self._series_id, self.stream_config.header_detail + ) self._num_frames_left = self.settings.nimages self._num_triggers_left = self.settings.ntrigger self._set_state(State.READY) @@ -224,3 +242,73 @@ def _set_state(self, state: State) -> None: def _is_in_state(self, state: State) -> bool: return self.get_state() is state + + +def get_changed_parameters(key: str) -> list[str]: + """Get the list of parameters that may have changed as a result of putting + to the parameter provided. + + Args: + key: string key of the changed parameter within the detector subsystem + + Returns: + list[str]: a list of keys which may have been changed after a PUT request + """ + match key: + case "auto_summation": + return ["auto_summation", "frame_count_time"] + case "count_time" | "frame_time": + return [ + "bit_depth_image", + "bit_depth_readout", + "count_time", + "countrate_correction_count_cutoff", + "frame_count_time", + "frame_time", + ] + case "flatfield": + return ["flatfield", "threshold/1/flatfield"] + case "incident_energy" | "photon_energy": + return [ + "element", + "flatfield", + "incident_energy", + "photon_energy", + "threshold/1/energy", + "threshold/1/flatfield", + "threshold/2/energy", + "threshold/2/flatfield", + "threshold_energy", + "wavelength", + ] + case "pixel_mask": + return ["pixel_mask", "threshold/1/pixel_mask"] + case "threshold/1/flatfield": + return ["flatfield", "threshold/1/flatfield"] + case "roi_mode": + return ["count_time", "frame_time", "roi_mode"] + case "threshold_energy" | "threshold/1/energy": + return [ + "flatfield", + "threshold/1/energy", + "threshold/1/flatfield", + "threshold/2/flatfield", + "threshold_energy", + ] + case "threshold/2/energy": + return [ + "flatfield", + "threshold/1/flatfield", + "threshold/2/energy", + "threshold/2/flatfield", + ] + case "threshold/1/mode": + return ["threshold/1/mode", "threshold/difference/mode"] + case "threshold/2/mode": + return ["threshold/2/mode", "threshold/difference/mode"] + case "threshold/1/pixel_mask": + return ["pixel_mask", "threshold/1/pixel_mask"] + case "threshold/difference/mode": + return ["difference_mode"] # replicating API inconsistency + case _: + return [key] diff --git a/src/tickit_devices/eiger/eiger_adapters.py b/src/tickit_devices/eiger/eiger_adapters.py index bac24b5e..f1e927ee 100644 --- a/src/tickit_devices/eiger/eiger_adapters.py +++ b/src/tickit_devices/eiger/eiger_adapters.py @@ -6,8 +6,10 @@ from tickit.adapters.specifications import HttpEndpoint from tickit.adapters.zmq import ZeroMqPushAdapter -from tickit_devices.eiger.eiger import EigerDevice +from tickit_devices.eiger.eiger import EigerDevice, get_changed_parameters from tickit_devices.eiger.eiger_schema import SequenceComplete, construct_value +from tickit_devices.eiger.stream.eiger_stream import EigerStream +from tickit_devices.eiger.stream.eiger_stream_2 import EigerStream2 API_VERSION = "1.8.0" DETECTOR_API = f"detector/api/{API_VERSION}" @@ -73,7 +75,10 @@ async def put_config(self, request: web.Request) -> web.Response: self.device.settings[param] = attr LOGGER.debug("Set " + str(param) + " to " + str(attr)) - return web.json_response(serialize([param])) + + changed_parameters = get_changed_parameters(param) + + return web.json_response(serialize(changed_parameters)) else: LOGGER.debug("Eiger has no config variable: " + str(param)) return web.json_response(status=404) @@ -121,15 +126,14 @@ async def put_threshold_config(self, request: web.Request) -> web.Response: config = self.device.settings.threshold_config if threshold in config and hasattr(config[threshold], param): attr = response["value"] - - LOGGER.debug( - f"Changing to {str(attr)} for threshold/{threshold}{str(param)}" - ) - config[threshold][param] = attr LOGGER.debug(f"Set threshold/{threshold}{str(param)} to {str(attr)}") - return web.json_response(serialize([param])) + + full_param = f"threshold/{threshold}/{param}" + changed_parameters = get_changed_parameters(full_param) + + return web.json_response(serialize(changed_parameters)) else: LOGGER.debug("Eiger has no config variable: " + str(param)) return web.json_response(status=404) @@ -319,8 +323,8 @@ async def get_stream_status(self, request: web.Request) -> web.Response: """ param = request.match_info["param"] - if hasattr(self.device.stream.status, param): - return web.json_response(construct_value(self.device.stream.status, param)) + if hasattr(self.device.stream_status, param): + return web.json_response(construct_value(self.device.stream_status, param)) else: return web.json_response(status=404) @@ -337,8 +341,8 @@ async def get_stream_config(self, request: web.Request) -> web.Response: """ param = request.match_info["param"] - if hasattr(self.device.stream.config, param): - return web.json_response(construct_value(self.device.stream.config, param)) + if hasattr(self.device.stream_config, param): + return web.json_response(construct_value(self.device.stream_config, param)) else: return web.json_response(status=404) @@ -358,15 +362,15 @@ async def put_stream_config(self, request: web.Request) -> web.Response: response = await request.json() - if hasattr(self.device.stream.config, param): + if hasattr(self.device.stream_config, param): attr = response["value"] LOGGER.debug(f"Changing to {attr} for {param}") - self.device.stream.config[param] = attr + self.device.stream_config[param] = attr LOGGER.debug("Set " + str(param) + " to " + str(attr)) - return web.json_response(serialize([param])) + return web.json_response([]) else: LOGGER.debug("Eiger has no config variable: " + str(param)) return web.json_response(status=404) @@ -413,7 +417,7 @@ async def put_monitor_config(self, request: web.Request) -> web.Response: self.device.monitor_config[param] = attr LOGGER.debug("Set " + str(param) + " to " + str(attr)) - return web.json_response(serialize([param])) + return web.json_response([]) else: LOGGER.debug("Eiger has no config variable: " + str(param)) return web.json_response(status=404) @@ -480,7 +484,7 @@ async def put_filewriter_config(self, request: web.Request) -> web.Response: self.device.filewriter_config[param] = attr LOGGER.debug("Set " + str(param) + " to " + str(attr)) - return web.json_response(serialize([param])) + return web.json_response([]) else: LOGGER.debug("Eiger has no config variable: " + str(param)) return web.json_response(status=404) @@ -511,11 +515,11 @@ class EigerZMQAdapter(ZeroMqPushAdapter): device: EigerDevice - def __init__(self, device: EigerDevice) -> None: + def __init__(self, stream: EigerStream | EigerStream2) -> None: super().__init__() - self.device = device + self.stream = stream def after_update(self) -> None: """Updates IOC values immediately following a device update.""" - if buffered_data := list(self.device.stream.consume_data()): + if buffered_data := list(self.stream.consume_data()): self.add_message_to_stream(buffered_data) diff --git a/src/tickit_devices/eiger/eiger_schema.py b/src/tickit_devices/eiger/eiger_schema.py index 8a1cb50e..2216e3c3 100644 --- a/src/tickit_devices/eiger/eiger_schema.py +++ b/src/tickit_devices/eiger/eiger_schema.py @@ -33,26 +33,26 @@ def field_config(**kwargs) -> Mapping[str, Any]: class AccessMode(Enum): """Possible access modes for field metadata.""" - READ_ONLY: str = "r" - WRITE_ONLY: str = "w" - READ_WRITE: str = "rw" + READ_ONLY = "r" + WRITE_ONLY = "w" + READ_WRITE = "rw" class ValueType(Enum): """Possible value types for field metadata.""" - FLOAT: str = "float" - INT: str = "int" - UINT: str = "uint" - STRING: str = "string" - STR_LIST: str = "string[]" - BOOL: str = "bool" - FLOAT_GRID: str = "float[][]" - UINT_GRID: str = "uint[][]" - DATE: str = "date" - DATETIME: str = "datetime" - NONE: str = "none" - STATE: str = "State" + FLOAT = "float" + INT = "int" + UINT = "uint" + STRING = "string" + STR_LIST = "string[]" + BOOL = "bool" + FLOAT_GRID = "float[][]" + UINT_GRID = "uint[][]" + DATE = "date" + DATETIME = "datetime" + NONE = "none" + STATE = "State" # diff --git a/src/tickit_devices/eiger/eiger_settings.py b/src/tickit_devices/eiger/eiger_settings.py index c47bca41..12d5ae35 100644 --- a/src/tickit_devices/eiger/eiger_settings.py +++ b/src/tickit_devices/eiger/eiger_settings.py @@ -84,6 +84,7 @@ def config_keys() -> list[str]: "threshold_energy", "total_flux", "trigger_mode", + "trigger_start_delay", "two_theta_increment", "two_theta_start", "virtual_pixel_correction_applied", @@ -98,41 +99,41 @@ def config_keys() -> list[str]: class KA_Energy(Enum): """Possible element K-alpha energies for samples.""" - Li: float = 54.3 - Be: float = 108.5 - B: float = 183.3 - C: float = 277.0 - N: float = 392.4 - O: float = 524.9 # noqa: E741 - F: float = 676.8 - Ne: float = 848.6 - Na: float = 1040.98 - Mg: float = 1253.6 - Al: float = 1486.7 - Si: float = 1739.98 - P: float = 2013.7 - S: float = 2307.84 - Cl: float = 2622.39 - Ar: float = 2957.7 - K: float = 3313.8 - Ca: float = 3691.68 - Sc: float = 4090.6 - Ti: float = 4510.84 - V: float = 4952.2 - Cr: float = 5414.72 - Mn: float = 5898.75 - Fe: float = 6403.84 - Co: float = 6930.32 - Ni: float = 7478.15 - Cu: float = 8047.78 - Zn: float = 8638.86 + Li = 54.3 + Be = 108.5 + B = 183.3 + C = 277.0 + N = 392.4 + O = 524.9 # noqa: E741 + F = 676.8 + Ne = 848.6 + Na = 1040.98 + Mg = 1253.6 + Al = 1486.7 + Si = 1739.98 + P = 2013.7 + S = 2307.84 + Cl = 2622.39 + Ar = 2957.7 + K = 3313.8 + Ca = 3691.68 + Sc = 4090.6 + Ti = 4510.84 + V = 4952.2 + Cr = 5414.72 + Mn = 5898.75 + Fe = 6403.84 + Co = 6930.32 + Ni = 7478.15 + Cu = 8047.78 + Zn = 8638.86 @dataclass class Threshold: """Data container for a single threshold configuration.""" - energy: float = field(default=6729, metadata=rw_float()) + energy: float = field(default=6729.0, metadata=rw_float()) mode: str = field( default="enabled", metadata=rw_str(allowed_values=["enabled", "disabled"]) ) @@ -220,7 +221,7 @@ class EigerSettings: frame_count_time: float = field(default=0.01, metadata=ro_float()) frame_time: float = field(default=0.12, metadata=rw_float()) frame_period: float = field(default=0.12, metadata=rw_float()) - incident_energy: float = field(default=13458, metadata=rw_float()) + incident_energy: float = field(default=13458.0, metadata=rw_float()) incident_particle_type: str = field(default="photons", metadata=ro_str()) instrument_name: str = field(default="", metadata=rw_str()) kappa_increment: float = field(default=0.0, metadata=rw_float()) @@ -256,6 +257,7 @@ class EigerSettings: allowed_values=["eies", "exte", "extg", "exts", "inte", "ints"] ), ) + trigger_start_delay: float = field(default=0.0, metadata=rw_float(min=0.0)) two_theta_increment: float = field(default=0.0, metadata=rw_float()) two_theta_start: float = field(default=0.0, metadata=rw_float()) virtual_pixel_correction_applied: bool = field(default=True, metadata=rw_bool()) @@ -270,7 +272,7 @@ class EigerSettings: def __post_init__(self): self._threshold_config = { "1": Threshold(), - "2": Threshold(energy=18841), + "2": Threshold(energy=18841.0), "difference": ThresholdDifference(), } diff --git a/src/tickit_devices/eiger/stream/eiger_stream.py b/src/tickit_devices/eiger/stream/eiger_stream.py index 6fac5782..cb3fe456 100644 --- a/src/tickit_devices/eiger/stream/eiger_stream.py +++ b/src/tickit_devices/eiger/stream/eiger_stream.py @@ -17,8 +17,6 @@ ImageHeader, ) from tickit_devices.eiger.eiger_settings import EigerSettings -from tickit_devices.eiger.stream.stream_config import StreamConfig -from tickit_devices.eiger.stream.stream_status import StreamStatus LOGGER = logging.getLogger(__name__) @@ -29,8 +27,6 @@ class EigerStream: """Simulation of an Eiger stream.""" - status: StreamStatus - config: StreamConfig callback_period: SimTime _message_buffer: Queue[_Message] @@ -41,21 +37,21 @@ class Outputs(TypedDict): ... def __init__(self, callback_period: int = int(1e9)) -> None: """An Eiger Stream constructor.""" - self.status = StreamStatus() - self.config = StreamConfig() self.callback_period = SimTime(callback_period) self._message_buffer = Queue() - def begin_series(self, settings: EigerSettings, series_id: int) -> None: + def begin_series( + self, settings: EigerSettings, series_id: int, header_detail: str + ) -> None: """Send the headers marking the beginning of the acquisition series. Args: settings: Current detector configuration, a snapshot may be sent with the headers. series_id: ID for the acquisition series. + header_detail: Header detail for start message - "none", "basic" or "all" """ - header_detail = self.config.header_detail header = AcquisitionSeriesHeader( header_detail=header_detail, series=series_id, @@ -64,7 +60,7 @@ def begin_series(self, settings: EigerSettings, series_id: int) -> None: if header_detail != "none": config_header = settings.filtered( - ["flatfield", "pixelmask" "countrate_correction_table"] + ["flatfield", "pixelmask", "countrate_correction_table"] ) self._buffer(config_header) diff --git a/src/tickit_devices/eiger/stream/eiger_stream_2.py b/src/tickit_devices/eiger/stream/eiger_stream_2.py new file mode 100644 index 00000000..5064cd91 --- /dev/null +++ b/src/tickit_devices/eiger/stream/eiger_stream_2.py @@ -0,0 +1,175 @@ +import base64 +import logging +from collections.abc import Iterable +from pathlib import Path +from queue import Queue +from typing import Any, TypedDict + +import cbor2 +import numpy as np +from tickit.core.typedefs import SimTime + +from tickit_devices.eiger.data.dummy_image import Image +from tickit_devices.eiger.eiger_settings import EigerSettings +from tickit_devices.eiger.stream.stream2 import stream2_tag_decoder + +LOGGER = logging.getLogger(__name__) +DATA_PATH = Path(__file__).parent.parent / "data" / "stream2" +STREAM_SETTINGS_MAP = { + # Direct Mappings + # TODO: These are ints, but they should be floats + # https://github.com/DiamondLightSource/tickit-devices/issues/120 + "beam_center_x": "beam_center_x", + "beam_center_y": "beam_center_y", + "count_time": "count_time", + "frame_time": "frame_time", + "sensor_material": "sensor_material", + "sensor_thickness": "sensor_thickness", + # TODO: This is broken because it is now a map of thresholds + # https://github.com/DiamondLightSource/tickit-devices/issues/121 + # threshold_energy="threshold_energy", + # Indirect Mappings + "countrate_correction_enabled": "countrate_correction_applied", + "detector_description": "description", + "detector_serial_number": "detector_number", + "flatfield_enabled": "flatfield_correction_applied", + "image_size_x": "x_pixels_in_detector", + "image_size_y": "y_pixels_in_detector", + "incident_energy": "threshold_energy", + "incident_wavelength": "wavelength", + "pixel_mask_enabled": "pixel_mask_applied", + "pixel_size_x": "x_pixel_size", + "pixel_size_y": "y_pixel_size", + "saturation_value": "countrate_correction_count_cutoff", +} +START_ALL_FIELDS = ["flatfield", "pixel_mask", "countrate_correction_lookup_table"] +GONIO_AXES = ["chi", "kappa", "omega", "phi", "two_theta"] + + +def _load_messages(): + start = image = end = None + with open(DATA_PATH / "start.cbor", "rb") as f: + start = cbor2.load(f, tag_hook=stream2_tag_decoder) + + # Populate missing large datasets + sensor_shape = (start["image_size_y"], start["image_size_x"]) + # we need a base64 encoded array of 4 bit integers, numpy can't provide uint4s + # we can construct it manually for the trivial zero case + start["countrate_correction_lookup_table"] = base64.b64encode(65536 // 2 * b"\x00") + start["flatfield"]["threshold_1"] = base64.b64encode( + np.prod(sensor_shape) // 2 * b"\x00" # 2 pixels per byte + ) + start["pixel_mask"]["threshold_1"] = start["flatfield"]["threshold_1"] # copy value + + with open(DATA_PATH / "image.cbor", "rb") as f: + image = cbor2.load(f) + with open(DATA_PATH / "end.cbor", "rb") as f: + end = cbor2.load(f) + + return start, image, end + + +class EigerStream2: + """Simulation of an Eiger stream.""" + + callback_period: SimTime + + _message_buffer: Queue[bytes] + + class Inputs(TypedDict): + """No inputs.""" + + class Outputs(TypedDict): + """No outputs.""" + + def __init__(self, callback_period: int = int(1e9)) -> None: + """Eiger Stream2 constructor.""" + self.callback_period = SimTime(callback_period) + + self._message_buffer = Queue() + + self._start, self._image, self._end = _load_messages() + + def begin_series( + self, settings: EigerSettings, series_id: int, header_detail: str + ) -> None: + """Send the start message marking the beginning of the acquisition series. + + Args: + settings: Current detector configuration, a snapshot may be sent with the + headers. + series_id: ID for the acquisition series. + header_detail: Header detail for start message - 'none', 'basic' or 'all' + """ + if header_detail == "all": + # Use loaded message in place + start = self._start + else: + # Make a copy with "all" fields removed + start = {k: v for k, v in self._start.items() if k not in START_ALL_FIELDS} + + # Update message with current state + # TODO: Check what fields should be updated from current state + # https://github.com/DiamondLightSource/tickit-devices/issues/122 + start["number_of_images"] = settings.nimages * settings.ntrigger + for stream_field, setting in STREAM_SETTINGS_MAP.items(): + if stream_field not in start: + start[stream_field] = getattr(settings, setting) + for axis in [a for a in GONIO_AXES if a not in start["goniometer"]]: + # get default values for axes not in start message + # TODO: Captured cbor start message should have all axes? + start["goniometer"][axis] = {} + start["goniometer"][axis]["start"] = float( + getattr(settings, f"{axis}_start") + ) + start["goniometer"][axis]["increment"] = float( + getattr(settings, f"{axis}_increment") + ) + + start["series_id"] = series_id + + self._buffer(cbor_dumps(start)) + + def insert_image(self, image: Image, series_id: int) -> None: + """Send headers and an data blob for a single image. + + Args: + image: The image with associated metadata + series_id: ID for the acquisition series. + """ + self._image["series_id"] = series_id + self._image["image_id"] = image.index + + self._buffer(cbor_dumps(self._image)) + + def end_series(self, series_id: int) -> None: + """Send footer marking the end of an acquisition series. + + Args: + series_id: ID of the series to end. + """ + self._end["series_id"] = series_id + self._buffer(cbor_dumps(self._end)) + + def consume_data(self) -> Iterable[bytes]: + """Consume all headers and data buffered by other methods. + + Returns: + Iterable[_Message]: Iterable of headers and data + """ + while not self._message_buffer.empty(): + message = self._message_buffer.get() + yield message + + def _buffer(self, message: bytes) -> None: + self._message_buffer.put_nowait(message) + + +def cbor_dumps(message: dict[str, Any]) -> bytes: + """Serialize dictionary to cbor, including headers. + + Args: + message: Message to be serialized + + """ + return cbor2.dumps(cbor2.CBORTag(55799, message)) diff --git a/src/tickit_devices/eiger/stream/stream2.py b/src/tickit_devices/eiger/stream/stream2.py new file mode 100644 index 00000000..94881693 --- /dev/null +++ b/src/tickit_devices/eiger/stream/stream2.py @@ -0,0 +1,60 @@ +import cbor2 +import numpy as np + + +def decode_multi_dim_array(tag, column_major): + dimensions, contents = tag.value + if isinstance(contents, list): + array = np.empty((len(contents),), dtype=object) + array[:] = contents + elif isinstance(contents, np.ndarray | np.generic): + array = contents + else: + raise cbor2.CBORDecodeValueError("expected array or typed array") + return array.reshape(dimensions, order="F" if column_major else "C") + + +def decode_typed_array(tag, dtype): + if not isinstance(tag.value, bytes): + raise cbor2.CBORDecodeValueError("expected byte string in typed array") + return np.frombuffer(tag.value, dtype=dtype) + + +def decode_dectris_compression(tag): + _algorithm, _elem_size, encoded = tag.value + return encoded + + +tag_decoders = { + 40: lambda tag: decode_multi_dim_array(tag, column_major=False), + 64: lambda tag: decode_typed_array(tag, dtype="u1"), + 65: lambda tag: decode_typed_array(tag, dtype=">u2"), + 66: lambda tag: decode_typed_array(tag, dtype=">u4"), + 67: lambda tag: decode_typed_array(tag, dtype=">u8"), + 68: lambda tag: decode_typed_array(tag, dtype="u1"), + 69: lambda tag: decode_typed_array(tag, dtype=" list[str]: return ["format", "header_appendix", "header_detail", "image_appendix", "mode"] +LEGACY_STREAM = "legacy" +CBOR_STREAM = "cbor" + + @dataclass class StreamConfig: """Eiger stream configuration taken from the API spec.""" @@ -15,6 +19,10 @@ class StreamConfig: mode: str = field( default="enabled", metadata=rw_str(allowed_values=["enabled", "disabled"]) ) + format: str = field( + default=LEGACY_STREAM, + metadata=rw_str(allowed_values=[LEGACY_STREAM, CBOR_STREAM]), + ) header_detail: str = field( default="basic", metadata=rw_str(allowed_values=["none", "basic", "all"]) ) diff --git a/src/tickit_devices/zebra/__init__.py b/src/tickit_devices/zebra/__init__.py index a8d0e1a4..80205c0d 100644 --- a/src/tickit_devices/zebra/__init__.py +++ b/src/tickit_devices/zebra/__init__.py @@ -13,7 +13,7 @@ def _default() -> dict[str, int]: - return {k: 0 for k in param_types.keys()} + return dict.fromkeys(param_types.keys(), 0) @pydantic.v1.dataclasses.dataclass diff --git a/tests/eiger/test_eiger.py b/tests/eiger/test_eiger.py index ff2acdce..9d183588 100644 --- a/tests/eiger/test_eiger.py +++ b/tests/eiger/test_eiger.py @@ -112,7 +112,7 @@ async def test_armed_eiger_starts_series(eiger: EigerDevice, mock_stream: Mock): await eiger.initialize() eiger.settings.trigger_mode = "ints" await eiger.arm() - mock_stream.begin_series.assert_called_once_with(eiger.settings, 1) + mock_stream.begin_series.assert_called_once_with(eiger.settings, 1, "basic") @pytest.mark.asyncio @@ -123,7 +123,7 @@ async def test_disarmed_eiger_starts_and_ends_series( eiger.settings.trigger_mode = "ints" await eiger.arm() await eiger.disarm() - mock_stream.begin_series.assert_called_once_with(eiger.settings, 1) + mock_stream.begin_series.assert_called_once_with(eiger.settings, 1, "basic") mock_stream.end_series.assert_called_once_with(1) @@ -135,7 +135,7 @@ async def test_cancelled_eiger_starts_and_ends_series( eiger.settings.trigger_mode = "ints" await eiger.arm() await eiger.cancel() - mock_stream.begin_series.assert_called_once_with(eiger.settings, 1) + mock_stream.begin_series.assert_called_once_with(eiger.settings, 1, "basic") mock_stream.end_series.assert_called_once_with(1) @@ -164,7 +164,7 @@ async def test_acquire_frames_in_ints_mode( update = eiger.update(SimTime(0.0), {}) assert update.call_at is None - mock_stream.begin_series.assert_called_with(eiger.settings, series) + mock_stream.begin_series.assert_called_with(eiger.settings, series, "basic") assert mock_stream.begin_series.call_count == series if num_frames > 0: mock_stream.insert_image.assert_called_with(ANY, series) @@ -203,7 +203,7 @@ async def test_acquire_frames_in_exts_mode( update = eiger.update(SimTime(0.0), {}) assert update.call_at is None - mock_stream.begin_series.assert_called_with(eiger.settings, series) + mock_stream.begin_series.assert_called_with(eiger.settings, series, "basic") assert mock_stream.begin_series.call_count == series if num_frames > 0: mock_stream.insert_image.assert_called_with(ANY, series) diff --git a/tests/eiger/test_eiger_adapters.py b/tests/eiger/test_eiger_adapters.py index 205c26f7..779fd2ca 100644 --- a/tests/eiger/test_eiger_adapters.py +++ b/tests/eiger/test_eiger_adapters.py @@ -1,7 +1,7 @@ import pytest from pytest_mock import MockerFixture -from tickit_devices.eiger.eiger import EigerDevice +from tickit_devices.eiger.eiger import EigerDevice, get_changed_parameters from tickit_devices.eiger.eiger_adapters import EigerRESTAdapter, EigerZMQAdapter @@ -12,7 +12,7 @@ def test_after_update(mocker: MockerFixture) -> None: device_mock = mocker.MagicMock() device_mock.stream.consume_data.side_effect = [test_data, []] - zmq_adapter = EigerZMQAdapter(device_mock) + zmq_adapter = EigerZMQAdapter(device_mock.stream) add_mock = mocker.patch.object(zmq_adapter, "add_message_to_stream") # Test after_update only calls add_message_to_stream with non-empty data @@ -72,3 +72,60 @@ async def test_rest_adapter_command_404(mocker: MockerFixture): assert (await eiger_adapter.trigger_eiger(request)).status == 404 assert (await eiger_adapter.cancel_eiger(request)).status == 404 assert (await eiger_adapter.abort_eiger(request)).status == 404 + + +@pytest.mark.asyncio +async def test_detector_put_responses(mocker: MockerFixture): + # test special keys have non-trivial response + custom_response_keys = [ + "auto_summation", + "count_time", + "frame_time", + "flatfield", + "incident_energy", + "photon_energy", + "pixel_mask", + "threshold/1/flatfield", + "roi_mode", + "threshold_energy", + "threshold/1/energy", + "threshold/2/energy", + "threshold/1/mode", + "threshold/2/mode", + "threshold/1/pixel_mask", + "threshold/difference/mode", + ] + + for key in custom_response_keys: + assert get_changed_parameters(key) != [key] + + assert get_changed_parameters("phi_start") == ["phi_start"] + assert get_changed_parameters("threshold/1/pixel_mask") == [ + "pixel_mask", + "threshold/1/pixel_mask", + ] + assert get_changed_parameters("threshold/difference/mode") == ["difference_mode"] + + eiger_adapter = EigerRESTAdapter(EigerDevice()) + + request = mocker.MagicMock() + request.json = mocker.AsyncMock() + + request.match_info = {"parameter_name": "count_time", "value": 1.0} + response = await eiger_adapter.put_config(request) + assert response.body == ( + b'["bit_depth_image", "bit_depth_readout", "count_time",' + b' "countrate_correction_count_cutoff",' + b' "frame_count_time", "frame_time"]' + ) + # trivial case just returns the single parameter + request.match_info = {"parameter_name": "phi_start", "value": 1.0} + response = await eiger_adapter.put_config(request) + assert response.body == b'["phi_start"]' + + # test threshold responses work + + request.match_info = {"parameter_name": "mode", "threshold": "1", "value": 1} + request.json = mocker.AsyncMock(return_value={"value": 1}) + response = await eiger_adapter.put_threshold_config(request) + assert response.body == b'["threshold/1/mode", "threshold/difference/mode"]' diff --git a/tests/eiger/test_eiger_stream.py b/tests/eiger/test_eiger_stream.py index ad83c9d9..d9c3e995 100644 --- a/tests/eiger/test_eiger_stream.py +++ b/tests/eiger/test_eiger_stream.py @@ -33,7 +33,7 @@ def stream() -> EigerStream: ] EIGER_SETTINGS_HEADER = EigerSettings().filtered( - ["flatfield", "pixelmask" "countrate_correction_table"] + ["flatfield", "pixelmask", "countrate_correction_table"] ) X_SIZE = EIGER_SETTINGS_HEADER["x_pixels_in_detector"] Y_SIZE = EIGER_SETTINGS_HEADER["y_pixels_in_detector"] @@ -90,8 +90,7 @@ def test_begin_series_produces_correct_headers( expected_headers: list[BaseModel | bytes | Mapping[str, Any]], ) -> None: settings = EigerSettings() - stream.config.header_detail = header_detail - stream.begin_series(settings, TEST_SERIES_ID) + stream.begin_series(settings, TEST_SERIES_ID, header_detail) blobs = list(stream.consume_data()) for a, b in zip(expected_headers, blobs, strict=True): @@ -119,8 +118,7 @@ def test_end_series_produces_correct_headers( def test_data_buffered(stream: EigerStream) -> None: settings = EigerSettings() - stream.config.header_detail = "all" - stream.begin_series(settings, TEST_SERIES_ID) + stream.begin_series(settings, TEST_SERIES_ID, "all") image = Image.create_dummy_image(0, (X_SIZE, Y_SIZE)) stream.insert_image(image, TEST_SERIES_ID) stream.end_series(TEST_SERIES_ID) diff --git a/tests/eiger/test_eiger_stream_2.py b/tests/eiger/test_eiger_stream_2.py new file mode 100644 index 00000000..aa3058e8 --- /dev/null +++ b/tests/eiger/test_eiger_stream_2.py @@ -0,0 +1,198 @@ +import datetime +from typing import Any + +import cbor2 +import pytest + +from tickit_devices.eiger.data.dummy_image import Image +from tickit_devices.eiger.eiger import EigerDevice +from tickit_devices.eiger.eiger_settings import EigerSettings +from tickit_devices.eiger.stream.eiger_stream_2 import EigerStream2 + + +@pytest.fixture +def stream() -> EigerStream2: + return EigerStream2() + + +@pytest.fixture +def eiger() -> EigerDevice: + return EigerDevice() + + +TEST_SERIES_ID = 15614 + +EIGER_SETTINGS_HEADER = EigerSettings().filtered( + ["flatfield", "pixelmask", "countrate_correction_table"] +) +X_SIZE = EIGER_SETTINGS_HEADER["x_pixels_in_detector"] +Y_SIZE = EIGER_SETTINGS_HEADER["y_pixels_in_detector"] + +ALL_FIELDS = ["flatfield", "pixel_mask", "countrate_correction_lookup_table"] + +BASIC_START_MESSAGE: dict[str, Any] = { + "type": "start", + "series_id": 15614, + "series_unique_id": "01HBV3JPF9T4ZDPADX6EMK6XMZ", + "arm_date": datetime.datetime( + 2023, + 10, + 3, + 17, + 47, + 48, + 329000, + tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)), + ), + "beam_center_x": 2049.3840906675064, + "beam_center_y": 2163.621048575148, + "channels": ["threshold_1"], + "count_time": 0.004317472232502031, + "countrate_correction_lookup_table": b"\x00\x00\x00\x00", + "countrate_correction_enabled": True, + "detector_description": "Dectris EIGER2 Si 16M", + "detector_serial_number": "E-32-0117", + "detector_translation": [ + 0.15370380680006296, + 0.1622715786431361, + -0.23715919962215962, + ], + "flatfield": {"threshold_1": b"\x00\x00\x00\x00"}, + "flatfield_enabled": True, + "frame_time": 0.004317572232502031, + "goniometer": { + "chi": {"increment": 0.0, "start": 30.0}, + "omega": {"increment": 0.1, "start": 0.0}, + "phi": {"increment": 0.0, "start": 0.0}, + }, + "image_size_x": 4148, + "image_size_y": 4362, + "incident_energy": 13500.299829398293, + "incident_wavelength": 0.918381073013, + "number_of_images": 1, + "pixel_mask": {"threshold_1": b"\x00\x00\x00\x00"}, + "pixel_mask_enabled": True, + "pixel_size_x": 7.5e-05, + "pixel_size_y": 7.5e-05, + "saturation_value": 21517, + "sensor_material": "Si", + "sensor_thickness": 0.00045, + "threshold_energy": { + "threshold_1": 6750.149914699146, + "threshold_2": 18900.41976115761, + }, + "virtual_pixel_interpolation_enabled": True, +} + + +# This does not include 'image_id' or 'data' as these are tested separately +IMAGE_MESSAGE = { + "type": "image", + "series_id": 15614, + "series_unique_id": "01HBV3JPF9T4ZDPADX6EMK6XMZ", + "real_time": [215873, 50000000], + "series_date": datetime.datetime( + 2023, + 10, + 3, + 17, + 47, + 49, + 434000, + tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)), + ), + "start_time": [0, 50000000], + "stop_time": [215873, 50000000], +} + + +END_MESSAGE = { + "type": "end", + "series_id": 15614, + "series_unique_id": "01HBV3JPF9T4ZDPADX6EMK6XMZ", +} + + +@pytest.mark.parametrize( + "header_detail", + [("none"), ("basic")], +) +def test_begin_series_message_basic(stream: EigerStream2, header_detail: str) -> None: + settings = EigerSettings() + + stream.begin_series(settings, TEST_SERIES_ID, header_detail) + data = list(stream.consume_data())[0] + assert isinstance(data, bytes) + message = cbor2.loads(data) + for axis, start_value in BASIC_START_MESSAGE["goniometer"].items(): + assert start_value == message["goniometer"][axis] + message.pop("goniometer") + reduced_start_message = BASIC_START_MESSAGE.copy() + reduced_start_message.pop("goniometer") + for f in ALL_FIELDS: + reduced_start_message.pop(f) + assert message == reduced_start_message + assert not any(f in message for f in ALL_FIELDS) + + +def test_begin_series_message_all(stream: EigerStream2) -> None: + settings = EigerSettings() + stream.begin_series(settings, TEST_SERIES_ID, "all") + + message = cbor2.loads(list(stream.consume_data())[0]) + + for axis, start_value in BASIC_START_MESSAGE["goniometer"].items(): + assert start_value == message["goniometer"][axis] + + message.pop("goniometer") + reduced_start_message = BASIC_START_MESSAGE.copy() + reduced_start_message.pop("goniometer") + # the ALL_FIELDS entries get set to default value, overwriting initial message + for f in ALL_FIELDS: + assert f in message + message.pop(f) + reduced_start_message.pop(f) + + assert message == reduced_start_message + + +def test_insert_image_produces_correct_message(stream: EigerStream2) -> None: + for i in range(2): + image = Image.create_dummy_image(i, (X_SIZE, Y_SIZE)) + + stream.insert_image(image, TEST_SERIES_ID) + message = cbor2.loads(list(stream.consume_data())[0]) + + # Image data is too big to compare - just sanity check size + assert message["data"]["threshold_1"].value[0] == [Y_SIZE, X_SIZE] + del message["data"] + + # Check image_id and remove it to compare against generic expected message + assert message["image_id"] == i + del message["image_id"] + + assert message == IMAGE_MESSAGE + + +def test_end_series_produces_correct_message(stream: EigerStream2) -> None: + stream.end_series(TEST_SERIES_ID) + + message = stream.consume_data() + message = cbor2.loads(list(stream.consume_data())[0]) + + assert message == END_MESSAGE + + +def test_data_buffered(stream: EigerStream2) -> None: + settings = EigerSettings() + image = Image.create_dummy_image(0, (X_SIZE, Y_SIZE)) + + stream.begin_series(settings, TEST_SERIES_ID, "basic") + stream.insert_image(image, TEST_SERIES_ID) + stream.insert_image(image, TEST_SERIES_ID) + stream.insert_image(image, TEST_SERIES_ID) + stream.end_series(TEST_SERIES_ID) + + messages = [cbor2.loads(b) for b in stream.consume_data()] + + assert [m["type"] for m in messages] == ["start", "image", "image", "image", "end"] diff --git a/tests/eiger/test_eiger_system.py b/tests/eiger/test_eiger_system.py index d77a13ff..7dd20b95 100644 --- a/tests/eiger/test_eiger_system.py +++ b/tests/eiger/test_eiger_system.py @@ -87,7 +87,7 @@ async def get_status(status, expected): json={"value": "enabled"}, timeout=REQUEST_TIMEOUT, ) as response: - assert ["mode"] == (await response.json()) + assert [] == (await response.json()) # Test filewriter, monitor and stream endpoints async with session.get( @@ -108,7 +108,7 @@ async def get_status(status, expected): json={"value": "enabled"}, timeout=REQUEST_TIMEOUT, ) as response: - assert ["mode"] == (await response.json()) + assert [] == (await response.json()) async with session.get( MONITOR_URL + "status/error", @@ -128,7 +128,7 @@ async def get_status(status, expected): json={"value": "enabled"}, timeout=REQUEST_TIMEOUT, ) as response: - assert ["mode"] == (await response.json()) + assert [] == (await response.json()) async with session.put( DETECTOR_URL + "config/threshold/1/energy", @@ -136,7 +136,13 @@ async def get_status(status, expected): json={"value": 6829}, timeout=REQUEST_TIMEOUT, ) as response: - assert ["energy"] == (await response.json()) + assert [ + "flatfield", + "threshold/1/energy", + "threshold/1/flatfield", + "threshold/2/flatfield", + "threshold_energy", + ] == (await response.json()) async with session.get( DETECTOR_URL + "config/threshold/1/energy", @@ -151,7 +157,7 @@ async def get_status(status, expected): json={"value": "enabled"}, timeout=REQUEST_TIMEOUT, ) as response: - assert ["mode"] == (await response.json()) + assert ["difference_mode"] == (await response.json()) async with session.get( DETECTOR_URL + "config/threshold/difference/mode",