From 50351d026ca17b47ecd91378465a16be9371dcfd Mon Sep 17 00:00:00 2001 From: David Perl Date: Mon, 4 May 2026 15:28:16 +0200 Subject: [PATCH 1/6] feat: scan interlock actor --- bec_lib/bec_lib/bl_state_manager.py | 12 +++- bec_lib/bec_lib/builtin_actor_hli.py | 35 ++++++++++ bec_lib/bec_lib/client.py | 2 + bec_lib/bec_lib/endpoints.py | 9 +++ bec_lib/bec_lib/messages.py | 10 ++- bec_server/bec_server/actors/actor.py | 57 ++++++++++++++- .../actors/builtin_actor_manager.py | 70 +++++++++++++++++++ .../bec_server/actors/scan_interlock.py | 38 ++++++++++ .../bec_server/scan_server/scan_server.py | 7 ++ 9 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 bec_lib/bec_lib/builtin_actor_hli.py create mode 100644 bec_server/bec_server/actors/builtin_actor_manager.py create mode 100644 bec_server/bec_server/actors/scan_interlock.py diff --git a/bec_lib/bec_lib/bl_state_manager.py b/bec_lib/bec_lib/bl_state_manager.py index 8204dcb0d..ae2a5d364 100644 --- a/bec_lib/bec_lib/bl_state_manager.py +++ b/bec_lib/bec_lib/bl_state_manager.py @@ -48,7 +48,7 @@ class BeamlineStateGet(TypedDict): TypedDict for the return value of the get method of a beamline state client. """ - status: str + status: messages.BlStateStatus label: str @@ -200,6 +200,16 @@ def delete(self, state_name: str) -> None: self._delete_state(state_name) self._publish_states() + def get_status_by_name(self, name: str) -> messages.BlStateStatus | None: + """ + Get current value of a given state, or None if it does not exist. + Args: + state_name (str): The name of the state for which to get the value. + """ + if not isinstance(state := getattr(self, name, None), BeamlineStateClientBase): + return + return state.get()["status"] + def show_all(self): """ Pretty print all beamline states using rich. diff --git a/bec_lib/bec_lib/builtin_actor_hli.py b/bec_lib/bec_lib/builtin_actor_hli.py new file mode 100644 index 000000000..e85bd384d --- /dev/null +++ b/bec_lib/bec_lib/builtin_actor_hli.py @@ -0,0 +1,35 @@ +from typing import TYPE_CHECKING + +from bec_lib.endpoints import MessageEndpoints +from bec_lib.messages import BuiltinActorStateChangeMessage + +if TYPE_CHECKING: + from bec_lib.client import BECClient + +VAR_PREFIX = "_BuiltinActors" + + +def builtin_actor_enabled_var(actor_name: str): + return f"{VAR_PREFIX}/enabled/{actor_name}" + + +class BuiltinActorHli: + def __init__(self, client: "BECClient") -> None: + self._client = client + + def _notify(self, actor_name): + self._client.connector.send( + MessageEndpoints.builtin_actor_notification(), + BuiltinActorStateChangeMessage(actor_name=actor_name), + ) + + def check_enabled(self, actor_name: str): + return bool(self._client.get_global_var(builtin_actor_enabled_var(actor_name))) + + def set_enabled(self, actor_name: str): + self._client.set_global_var(builtin_actor_enabled_var(actor_name), True) + self._notify(actor_name) + + def set_disabled(self, actor_name: str): + self._client.set_global_var(builtin_actor_enabled_var(actor_name), False) + self._notify(actor_name) diff --git a/bec_lib/bec_lib/client.py b/bec_lib/bec_lib/client.py index 689f41f1b..d09b6d7f0 100644 --- a/bec_lib/bec_lib/client.py +++ b/bec_lib/bec_lib/client.py @@ -21,6 +21,7 @@ from bec_lib.alarm_handler import AlarmHandler, Alarms from bec_lib.bec_service import BECService from bec_lib.bl_state_manager import BeamlineStateManager +from bec_lib.builtin_actor_hli import BuiltinActorHli from bec_lib.callback_handler import CallbackHandler, EventType from bec_lib.config_helper import ConfigHelperUser from bec_lib.dap_plugins import DAPPlugins @@ -163,6 +164,7 @@ def __init__( self._system_user = "" self.beamline_states = None self.messaging: MessagingContainer = None # type: ignore + self.builtin_actors = BuiltinActorHli(self) def __new__(cls, *args, forced=False, **kwargs): if forced or BECClient._client is None: diff --git a/bec_lib/bec_lib/endpoints.py b/bec_lib/bec_lib/endpoints.py index 930e3bbb0..332b3650f 100644 --- a/bec_lib/bec_lib/endpoints.py +++ b/bec_lib/bec_lib/endpoints.py @@ -1679,6 +1679,15 @@ def actor_request_response() -> EndpointInfo: message_op=MessageOp.SEND, ) + @staticmethod + def builtin_actor_notification() -> EndpointInfo: + endpoint = f"{EndpointType.INTERNAL.value}/actor/builtin/state_change" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.BuiltinActorStateChangeMessage, + message_op=MessageOp.SEND, + ) + @staticmethod def gui_registry_state(gui_id: str): """ diff --git a/bec_lib/bec_lib/messages.py b/bec_lib/bec_lib/messages.py index 76d351568..a900d1c3f 100644 --- a/bec_lib/bec_lib/messages.py +++ b/bec_lib/bec_lib/messages.py @@ -1980,6 +1980,9 @@ class GameLeaderboardMessage(BECMessage): leaderboard: list[GameScoreMessage] +BlStateStatus = Literal["valid", "invalid", "warning", "unknown"] + + class BeamlineStateMessage(BECMessage): """ Message for beamline state updates @@ -1992,7 +1995,7 @@ class BeamlineStateMessage(BECMessage): msg_type: ClassVar[str] = "beamline_state_message" name: str - status: Literal["valid", "invalid", "warning", "unknown"] + status: BlStateStatus label: str timestamp: float = Field(default_factory=time.time) @@ -2026,6 +2029,11 @@ class AvailableBeamlineStatesMessage(BECMessage): states: list[BeamlineStateConfig] +class BuiltinActorStateChangeMessage(BECMessage): + msg_type: ClassVar[str] = "actor_start_request" + actor_name: str + + class ActorStartRequestMessage(BECMessage): """Specify an actor class by module and name, to be instantiated and started by the actor manager.""" diff --git a/bec_server/bec_server/actors/actor.py b/bec_server/bec_server/actors/actor.py index e59bca1c3..70cf577d8 100644 --- a/bec_server/bec_server/actors/actor.py +++ b/bec_server/bec_server/actors/actor.py @@ -3,12 +3,13 @@ import time from abc import ABC, abstractmethod from threading import Event +from typing import Callable from bec_lib.actors import ActorActionTable from bec_lib.client import BECClient from bec_lib.endpoints import EndpointInfo, MessageEndpoints from bec_lib.logger import bec_logger -from bec_lib.messages import ProcedureWorkerStatus +from bec_lib.messages import BeamlineStateMessage, BlStateStatus, ProcedureWorkerStatus from bec_server.procedures.oop_worker_base import push_status logger = bec_logger.logger @@ -70,11 +71,15 @@ def __init__(self, client: BECClient, name: str, exec_id: str): logger.info(f"Setting up {self.__class__.__name__}: {self._endpoints}.") for endpoint in self._endpoints: logger.info(f"Connecting {self.__class__.__name__} to '{endpoint.endpoint}'") - client.connector.register(endpoint, cb=self.evaluate) + for cb in self.default_monitor_callbacks(): + client.connector.register(endpoint, cb=cb) def default_monitor_endpoints(self) -> set[EndpointInfo]: return set() + def default_monitor_callbacks(self) -> list[Callable]: + return [self.evaluate] + def evaluate(self, *_, **__): if (now := time.monotonic()) < self.last_evaluated + self.min_delay_s: return @@ -90,6 +95,54 @@ def run(self): self.push_status(ProcedureWorkerStatus.IDLE) +class BlStateActor(SubscriptionActor): + """ + Base for actors which respond to changes in beamline states. + + If all current values of states in state_table match the value in the table, + self.all_match_action() is called. If not, self.some_mismatch_action() is called. + """ + + state_table: dict[str, BlStateStatus] + + def __init__(self, client: BECClient, name: str, exec_id: str): + self.action_table = { + self.all_states_match: self.all_match_action, + self.not_all_states_match: self.some_mismatch_action, + } + super().__init__(client, name, exec_id) + self.state_cache: dict[str, BlStateStatus] = {} + for state in self.state_table: + status = self.client.beamline_states.get_status_by_name(state) + if status is None: + logger.warning(f"Beamline state actor could not get the status of {state}") + continue + self.state_cache[state] = status + + def all_states_match(self, client: BECClient): + for state, status in self.state_table.items(): + if self.state_cache.get(state) != status: + return False + return True + + def not_all_states_match(self, client: BECClient): + return not self.all_states_match(client) + + def all_match_action(self, client: BECClient): + pass + + def some_mismatch_action(self, client: BECClient): + pass + + def default_monitor_endpoints(self) -> set[EndpointInfo]: + return {MessageEndpoints.beamline_state(state) for state in self.state_table} + + def evaluate(self, msg_dict: dict): + msg: BeamlineStateMessage = msg_dict["data"] + self.state_cache[msg.name] = msg.status + return super().evaluate() + + class PollingActor(ActorBase): """An actor which evaluates its conditions after a certain time interval.""" diff --git a/bec_server/bec_server/actors/builtin_actor_manager.py b/bec_server/bec_server/actors/builtin_actor_manager.py new file mode 100644 index 000000000..a0010aee2 --- /dev/null +++ b/bec_server/bec_server/actors/builtin_actor_manager.py @@ -0,0 +1,70 @@ +from threading import Event, Thread + +from bec_lib.client import BECClient, ServiceConfig +from bec_lib.connector import MessageObject +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger +from bec_lib.messages import BuiltinActorStateChangeMessage +from bec_server.actors.actor import ActorBase +from bec_server.actors.scan_interlock import ScanInterlockActor + +logger = bec_logger.logger + + +class BuiltinActorManager: + """A simple manager for builtin actors which are always available - only handles enabling and + disabling""" + + def __init__(self, bootstrap_server: str) -> None: + host, port = bootstrap_server.split(":") + self._client = BECClient( + config=ServiceConfig(config={"redis": {"host": host, "port": port}}) + ) + self._client.start() + self._actors_threads_and_stops: dict[str, tuple[ActorBase, Thread, Event]] = {} + self._builtin_actors = {cls.__name__: cls for cls in (ScanInterlockActor,)} + self._start_all() + self._client.connector.register( + MessageEndpoints.builtin_actor_notification(), cb=self._on_state_changed + ) + + def _on_state_changed(self, msg_obj: MessageObject): + msg: BuiltinActorStateChangeMessage = msg_obj.value # type:ignore + logger.info(f"Received state change notification {msg.actor_name}") + if msg.actor_name not in self._builtin_actors: + logger.error(f"Actor {msg.actor_name} does not exist!") + return + if self._client.builtin_actors.check_enabled(msg.actor_name): + self._start_actor(self._builtin_actors[msg.actor_name]) + else: + self._stop_actor(msg.actor_name) + + def _start_all(self): + for actor_class in self._builtin_actors.values(): + if self._client.builtin_actors.check_enabled(actor_class.__name__): + self._start_actor(actor_class) + + def _start_actor(self, actor_class: type[ActorBase]): + name = actor_class.__name__ + logger.info(f"Starting {name}") + if name in self._actors_threads_and_stops: + logger.warning(f"Actor {name} is already active!") + return + actor = actor_class(self._client, name=name, exec_id=name) + t = Thread(target=actor.run) + self._actors_threads_and_stops[name] = (actor, t, actor.stop_event) + t.start() + + def _stop_actor(self, actor_name: str): + logger.info(f"Stopping {actor_name}") + if (entry := self._actors_threads_and_stops.get(actor_name)) is None: + logger.warning(f"Actor {actor_name} is not active!") + return + _, t, event = entry + event.set() + t.join() + + def shutdown(self): + for actor in self._actors_threads_and_stops: + self._stop_actor(actor) + self._client.shutdown() diff --git a/bec_server/bec_server/actors/scan_interlock.py b/bec_server/bec_server/actors/scan_interlock.py new file mode 100644 index 000000000..24831bfc1 --- /dev/null +++ b/bec_server/bec_server/actors/scan_interlock.py @@ -0,0 +1,38 @@ +from uuid import uuid4 + +from bec_lib.client import BECClient +from bec_server.actors.actor import BlStateActor + + +class ScanInterlockActor(BlStateActor): + """Sets a scan lock on the primary queue if any of the the states in the state_table don't match + the required value. Removes the lock if all of them match.""" + + state_table = {"samx_within_limits": "valid"} + + def __init__(self, client: BECClient, name: str, exec_id: str): + super().__init__(client, name, exec_id) + self.lock_id: str | None = None + + def some_mismatch_action(self, client: BECClient): + if self.client.queue is None or self.lock_id is not None: + return + self.lock_id = str(uuid4()) + self.client.queue.add_queue_lock( + queue="primary", reason="ScanInterlockActor", lock_id=self.lock_id + ) + + def all_match_action(self, client: BECClient): + self._unlock() + + def _unlock(self): + if self.client.queue is None or self.lock_id is None: + return + try: + self.client.queue.remove_queue_lock(queue="primary", lock_id=self.lock_id) + finally: + self.lock_id = None + + def run(self): + super().run() + self._unlock() diff --git a/bec_server/bec_server/scan_server/scan_server.py b/bec_server/bec_server/scan_server/scan_server.py index a205637e4..24901355d 100644 --- a/bec_server/bec_server/scan_server/scan_server.py +++ b/bec_server/bec_server/scan_server/scan_server.py @@ -10,6 +10,8 @@ from bec_lib.logger import bec_logger from bec_lib.scan_number_container import ScanNumberContainer from bec_lib.service_config import ServiceConfig +from bec_server.actors.builtin_actor_manager import BuiltinActorManager +from bec_server.actors.manager import ActorManager from bec_server.procedures.container_utils import podman_available from bec_server.procedures.container_worker import ContainerProcedureWorker from bec_server.procedures.manager import ProcedureManager @@ -40,6 +42,7 @@ def __init__(self, config: ServiceConfig, connector_cls: type[RedisConnector]): self._start_procedure_manager( use_subprocess_proc_worker=config.model.procedures.use_subprocess_worker ) + self._start_actor_managers() self.status = messages.BECStatus.RUNNING self.beamline_states = None self._start_beamline_state_manager() @@ -83,6 +86,10 @@ def _start_procedure_manager(self, use_subprocess_proc_worker: bool = False): ) self.proc_manager = ProcedureManager(self.bootstrap_server, procedure_worker) + def _start_actor_managers(self): + self.actor_manager = ActorManager(self.bootstrap_server) + self.builtin_actor_manager = BuiltinActorManager(self.bootstrap_server) + def _alarm_callback(self, msg): msg = msg.value queue = msg.metadata.get("queue", "primary") From cba2e90c9f32d42517e4049be8d7c96b1edfe8dc Mon Sep 17 00:00:00 2001 From: perl_d Date: Fri, 8 May 2026 17:14:22 +0200 Subject: [PATCH 2/6] feat: add and remove states from interlock --- .../tests/end-2-end/test_actors_e2e.py | 100 ++++++++++++ bec_lib/bec_lib/builtin_actor_hli.py | 69 ++++++++- bec_lib/bec_lib/endpoints.py | 18 +++ bec_lib/bec_lib/messages.py | 22 +++ bec_server/bec_server/actors/actor.py | 27 ++-- .../actors/builtin_actor_manager.py | 5 +- .../bec_server/actors/scan_interlock.py | 45 +++++- .../bec_server/scan_server/scan_server.py | 2 + .../bec_server/scan_server/tests/utils.py | 4 + .../test_builtin_actor_manager.py | 145 ++++++++++++++++++ .../tests_scan_server/test_scan_interlock.py | 140 +++++++++++++++++ pytest_bec_e2e/pytest_bec_e2e/plugin.py | 1 + 12 files changed, 563 insertions(+), 15 deletions(-) create mode 100644 bec_ipython_client/tests/end-2-end/test_actors_e2e.py create mode 100644 bec_server/tests/tests_scan_server/test_builtin_actor_manager.py create mode 100644 bec_server/tests/tests_scan_server/test_scan_interlock.py diff --git a/bec_ipython_client/tests/end-2-end/test_actors_e2e.py b/bec_ipython_client/tests/end-2-end/test_actors_e2e.py new file mode 100644 index 000000000..c28c0a63c --- /dev/null +++ b/bec_ipython_client/tests/end-2-end/test_actors_e2e.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import pytest + +from bec_lib.bl_states import DeviceWithinLimitsStateConfig +from bec_lib.builtin_actor_hli import BuiltinActorHli +from bec_lib.logger import bec_logger +from bec_lib.messages import ScanQueueStatus, ScanQueueStatusMessage + +logger = bec_logger.logger + +if TYPE_CHECKING: # pragma: no cover + from ophyd_devices.sim.sim_test_devices import SimDeviceWithSignalDelay + + from bec_ipython_client.main import BECIPythonClient + + +@pytest.fixture +def bec_with_delay_device(bec_ipython_client_fixture): + bec = bec_ipython_client_fixture + bec.builtin_actors.scan_interlock.enabled = True + dev = bec.device_manager.devices + config = { + "ramp_up": { + "deviceClass": "ophyd_devices.sim.sim_test_devices.SimDeviceWithSignalDelay", + "deviceConfig": {}, + "readoutPriority": "baseline", + "deviceTags": {"signal delay", "ramp up"}, + "enabled": True, + "readOnly": False, + } + } + bec.device_manager.config_helper.send_config_request(action="add", config=config) + dev.ramp_up.min_val.put(0) + dev.ramp_up.max_val.put(400) + dev.ramp_up.value.put(400) + dev.ramp_up.delay.put(1) + dev.ramp_up.rampup_time.put(3) + yield bec, dev.ramp_up + dev.ramp_up.stop() + + +@pytest.fixture +def ramp_up_bl_state(bec_with_delay_device): + bec, _ = bec_with_delay_device + ramp_up_state_config = DeviceWithinLimitsStateConfig( + name="beam_intensity_sufficient", device="ramp_up", low_limit=200 + ) + bec.beamline_states.add(ramp_up_state_config) + yield bec_with_delay_device + bec.beamline_states.delete("beam_intensity_sufficient") + bec.builtin_actors.scan_interlock.clear_all() + + +def _wait_for(pred, timeout=10, retries=100): + for i in range(retries): + if pred(): + return True + time.sleep(timeout / retries) + return pred() + + +# pylint: disable=protected-accesstest +@pytest.mark.timeout(100) +def test_scan_interlock( + ramp_up_bl_state: tuple[BECIPythonClient, SimDeviceWithSignalDelay], bec_with_delay_device +): + bec, ramp_up = ramp_up_bl_state + actors: BuiltinActorHli = bec.builtin_actors + assert bec.beamline_states.beam_intensity_sufficient.get()["status"] == "valid" + assert actors.scan_interlock.enabled + current_q_status_msg: ScanQueueStatus = bec.queue.queue_storage.current_scan_queue["primary"] + assert current_q_status_msg.status == "RUNNING" + actors.scan_interlock.add_state_to_interlock("beam_intensity_sufficient", "valid") + + assert _wait_for(lambda: "beam_intensity_sufficient" in actors.scan_interlock.states_watched) + + ramp_up.start.set(1) + + def _beam_down(): + return (ramp_up.value.get() < 200) and bec.beamline_states.beam_intensity_sufficient.get()[ + "status" + ] == "invalid" + + def _beam_up(): + return (ramp_up.value.get() > 200) and bec.beamline_states.beam_intensity_sufficient.get()[ + "status" + ] == "valid" + + assert _wait_for(_beam_down) + assert _wait_for( + lambda: bec.queue.queue_storage.current_scan_queue["primary"].status == "RUNNING" + ) + assert _wait_for(_beam_up) + assert _wait_for( + lambda: bec.queue.queue_storage.current_scan_queue["primary"].status == "RUNNING" + ) diff --git a/bec_lib/bec_lib/builtin_actor_hli.py b/bec_lib/bec_lib/builtin_actor_hli.py index e85bd384d..46ddb2e4b 100644 --- a/bec_lib/bec_lib/builtin_actor_hli.py +++ b/bec_lib/bec_lib/builtin_actor_hli.py @@ -1,7 +1,11 @@ from typing import TYPE_CHECKING from bec_lib.endpoints import MessageEndpoints -from bec_lib.messages import BuiltinActorStateChangeMessage +from bec_lib.messages import ( + BlStateStatus, + BuiltinActorStateChangeMessage, + ScanInterlockModifyStateTableMessage, +) if TYPE_CHECKING: from bec_lib.client import BECClient @@ -13,9 +17,72 @@ def builtin_actor_enabled_var(actor_name: str): return f"{VAR_PREFIX}/enabled/{actor_name}" +class ScanInterlockHli: + def __init__(self, client: "BECClient", parent: "BuiltinActorHli") -> None: + self._client = client + self._parent = parent + self._actor_name = "ScanInterlockActor" + + @property + def enabled(self): + return self._parent.check_enabled(self._actor_name) + + @enabled.setter + def enabled(self, enabled: bool): + if enabled: + self._parent.set_enabled(self._actor_name) + else: + self._parent.set_disabled(self._actor_name) + + @property + def states_watched(self): + """Return the table of beamline states currently watched by the scan interlock actor""" + return self._client.connector.get(MessageEndpoints.scan_interlock_states()).states_watched + + def add_state_to_interlock(self, state_name: str, required_value: BlStateStatus): + """ + Add a beamline state and its status to watch to the ScanInterlockActor. If the state no + longer has this status, an interlock will be placed on the primary scan queue. + Args: + state_name (str): the state to watch + status (Literal["valid","invalid","warning","unknown"]): the status to watch for. + """ + self._client.connector.xadd( + MessageEndpoints.modify_interlock_table(), + { + "data": ScanInterlockModifyStateTableMessage( + action="add", state_name=state_name, status=required_value + ) + }, + ) + + def remove_state_from_interlock(self, state_name: str): + """ + No longer watch the given state for the scan interlock. + Args: + state_name (str): the state to watch + """ + self._client.connector.xadd( + MessageEndpoints.modify_interlock_table(), + {"data": ScanInterlockModifyStateTableMessage(action="remove", state_name=state_name)}, + ) + + def clear_all(self): + """ + Remove all beamline states from the interlock watch table + Args: + state_name (str): the state to watch + """ + self._client.connector.xadd( + MessageEndpoints.modify_interlock_table(), + {"data": ScanInterlockModifyStateTableMessage(action="remove_all")}, + ) + + class BuiltinActorHli: def __init__(self, client: "BECClient") -> None: self._client = client + self.scan_interlock = ScanInterlockHli(self._client, self) def _notify(self, actor_name): self._client.connector.send( diff --git a/bec_lib/bec_lib/endpoints.py b/bec_lib/bec_lib/endpoints.py index 332b3650f..079f759cb 100644 --- a/bec_lib/bec_lib/endpoints.py +++ b/bec_lib/bec_lib/endpoints.py @@ -1688,6 +1688,24 @@ def builtin_actor_notification() -> EndpointInfo: message_op=MessageOp.SEND, ) + @staticmethod + def modify_interlock_table() -> EndpointInfo: + endpoint = f"{EndpointType.INTERNAL.value}/actor/builtin/scan_interlock/table_mod" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ScanInterlockModifyStateTableMessage, + message_op=MessageOp.STREAM, + ) + + @staticmethod + def scan_interlock_states() -> EndpointInfo: + endpoint = f"{EndpointType.INFO.value}/actor/builtin/scan_interlock/current_states_watched" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ScanInterlockStateTableContent, + message_op=MessageOp.KEY_VALUE, + ) + @staticmethod def gui_registry_state(gui_id: str): """ diff --git a/bec_lib/bec_lib/messages.py b/bec_lib/bec_lib/messages.py index a900d1c3f..aa4eb9d4e 100644 --- a/bec_lib/bec_lib/messages.py +++ b/bec_lib/bec_lib/messages.py @@ -2034,6 +2034,28 @@ class BuiltinActorStateChangeMessage(BECMessage): actor_name: str +class ScanInterlockModifyStateTableMessage(BECMessage): + msg_type: ClassVar[str] = "scan_interlock_modify_state_table" + action: Literal["add", "remove", "remove_all"] + state_name: str | None = None + status: BlStateStatus | None = None + + @model_validator(mode="after") + def _validate(self): + if self.action == "add" and (self.status is None or self.state_name is None): + raise ValueError("Must specify a name and status when adding a state") + if self.action in ["remove", "remove_all"] and self.status is not None: + raise ValueError("May not specify a status when removing a state") + if self.action == "remove_all" and self.state_name is not None: + raise ValueError("May not specify a state when removing all states") + return self + + +class ScanInterlockStateTableContent(BECMessage): + msg_type: ClassVar[str] = "scan_interlock_state_table_content" + states_watched: dict[str, BlStateStatus] + + class ActorStartRequestMessage(BECMessage): """Specify an actor class by module and name, to be instantiated and started by the actor manager.""" diff --git a/bec_server/bec_server/actors/actor.py b/bec_server/bec_server/actors/actor.py index 70cf577d8..4e336b7f4 100644 --- a/bec_server/bec_server/actors/actor.py +++ b/bec_server/bec_server/actors/actor.py @@ -2,7 +2,7 @@ import time from abc import ABC, abstractmethod -from threading import Event +from threading import Event, RLock from typing import Callable from bec_lib.actors import ActorActionTable @@ -106,23 +106,30 @@ class BlStateActor(SubscriptionActor): state_table: dict[str, BlStateStatus] def __init__(self, client: BECClient, name: str, exec_id: str): + self.state_table_lock = RLock() self.action_table = { self.all_states_match: self.all_match_action, self.not_all_states_match: self.some_mismatch_action, } super().__init__(client, name, exec_id) self.state_cache: dict[str, BlStateStatus] = {} - for state in self.state_table: - status = self.client.beamline_states.get_status_by_name(state) - if status is None: - logger.warning(f"Beamline state actor could not get the status of {state}") - continue - self.state_cache[state] = status + self._update_cache() + + def _update_cache(self): + with self.state_table_lock: + for state in self.state_table: + status = self.client.beamline_states.get_status_by_name(state) + if status is None: + logger.warning(f"Beamline state actor could not get the status of {state}") + continue + self.state_cache[state] = status def all_states_match(self, client: BECClient): - for state, status in self.state_table.items(): - if self.state_cache.get(state) != status: - return False + with self.state_table_lock: + for state, status in self.state_table.items(): + if self.state_cache.get(state) != status: + logger.info(f"Beamline state {state} out of bounds: expected={status}") + return False return True def not_all_states_match(self, client: BECClient): diff --git a/bec_server/bec_server/actors/builtin_actor_manager.py b/bec_server/bec_server/actors/builtin_actor_manager.py index a0010aee2..954d238e8 100644 --- a/bec_server/bec_server/actors/builtin_actor_manager.py +++ b/bec_server/bec_server/actors/builtin_actor_manager.py @@ -18,7 +18,8 @@ class BuiltinActorManager: def __init__(self, bootstrap_server: str) -> None: host, port = bootstrap_server.split(":") self._client = BECClient( - config=ServiceConfig(config={"redis": {"host": host, "port": port}}) + config=ServiceConfig(config={"redis": {"host": host, "port": port}}), + name="BuiltinActors", ) self._client.start() self._actors_threads_and_stops: dict[str, tuple[ActorBase, Thread, Event]] = {} @@ -29,7 +30,7 @@ def __init__(self, bootstrap_server: str) -> None: ) def _on_state_changed(self, msg_obj: MessageObject): - msg: BuiltinActorStateChangeMessage = msg_obj.value # type:ignore + msg: BuiltinActorStateChangeMessage = msg_obj.value # type: ignore logger.info(f"Received state change notification {msg.actor_name}") if msg.actor_name not in self._builtin_actors: logger.error(f"Actor {msg.actor_name} does not exist!") diff --git a/bec_server/bec_server/actors/scan_interlock.py b/bec_server/bec_server/actors/scan_interlock.py index 24831bfc1..2f0af404b 100644 --- a/bec_server/bec_server/actors/scan_interlock.py +++ b/bec_server/bec_server/actors/scan_interlock.py @@ -1,18 +1,59 @@ from uuid import uuid4 from bec_lib.client import BECClient +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger +from bec_lib.messages import ScanInterlockModifyStateTableMessage, ScanInterlockStateTableContent from bec_server.actors.actor import BlStateActor +logger = bec_logger.logger + class ScanInterlockActor(BlStateActor): """Sets a scan lock on the primary queue if any of the the states in the state_table don't match the required value. Removes the lock if all of them match.""" - state_table = {"samx_within_limits": "valid"} - def __init__(self, client: BECClient, name: str, exec_id: str): + self.state_table = {} super().__init__(client, name, exec_id) self.lock_id: str | None = None + self._update_watched_states_in_redis() + self.client.connector.register( + MessageEndpoints.modify_interlock_table(), cb=self._on_state_modification + ) + + def _update_watched_states_in_redis(self): + self.client.connector.set( + MessageEndpoints.scan_interlock_states(), + ScanInterlockStateTableContent(states_watched=self.state_table), + ) + + def _on_state_modification(self, msg_dict: dict): + msg: ScanInterlockModifyStateTableMessage = msg_dict["data"] + with self.state_table_lock: + if msg.action == "add": + logger.info(f"Adding {msg.state_name} to the scan interlock actor") + if msg.state_name not in self.state_table: + self.client.connector.register( + MessageEndpoints.beamline_state(msg.state_name), cb=self.evaluate + ) + self.state_table[msg.state_name] = msg.status # type: ignore # msg is validated + elif msg.action == "remove_all": + for state in self.state_table: + self.client.connector.unregister(MessageEndpoints.beamline_state(state)) + self.state_table = {} + self.state_cache = {} + else: + logger.info(f"Removing {msg.state_name} from the scan interlock actor") + if msg.state_name in self.state_table: + self.client.connector.unregister( + MessageEndpoints.beamline_state(msg.state_name) + ) + del self.state_table[msg.state_name] + del self.state_cache[msg.state_name] + self._update_cache() + self._update_watched_states_in_redis() + super(BlStateActor, self).evaluate() def some_mismatch_action(self, client: BECClient): if self.client.queue is None or self.lock_id is not None: diff --git a/bec_server/bec_server/scan_server/scan_server.py b/bec_server/bec_server/scan_server/scan_server.py index 24901355d..52b85815f 100644 --- a/bec_server/bec_server/scan_server/scan_server.py +++ b/bec_server/bec_server/scan_server/scan_server.py @@ -122,6 +122,8 @@ def dataset_number(self, val: int): def shutdown(self, per_thread_timeout_s: float | None = None) -> None: """shutdown the scan server""" + self.builtin_actor_manager.shutdown() + self.actor_manager.shutdown() self.proc_manager.shutdown() self.device_manager.shutdown() self.queue_manager.shutdown() diff --git a/bec_server/bec_server/scan_server/tests/utils.py b/bec_server/bec_server/scan_server/tests/utils.py index 866eb4e3e..2e41da9ed 100644 --- a/bec_server/bec_server/scan_server/tests/utils.py +++ b/bec_server/bec_server/scan_server/tests/utils.py @@ -34,6 +34,10 @@ def __init__(self, device_manager: DMMock) -> None: self.scan_worker = WorkerMock() self.proc_manager = ProcManagerMock() + def _start_actor_managers(self): + self.actor_manager = ProcManagerMock() + self.builtin_actor_manager = ProcManagerMock() + def _start_metrics_emitter(self): pass diff --git a/bec_server/tests/tests_scan_server/test_builtin_actor_manager.py b/bec_server/tests/tests_scan_server/test_builtin_actor_manager.py new file mode 100644 index 000000000..9b821269f --- /dev/null +++ b/bec_server/tests/tests_scan_server/test_builtin_actor_manager.py @@ -0,0 +1,145 @@ +from threading import Event +from unittest.mock import MagicMock, patch + +import pytest + +from bec_server.actors.builtin_actor_manager import BuiltinActorManager + + +class DummyActor: + def __init__(self, client, name, exec_id): + self.client = client + self.name = name + self.exec_id = exec_id + self.stop_event = Event() + + def run(self): + pass + + +@pytest.fixture +def mocked_manager(): + with ( + patch("bec_server.actors.builtin_actor_manager.BECClient") as mock_client_cls, + patch.object(BuiltinActorManager, "_start_all"), + ): + mock_client = MagicMock() + mock_client.connector = MagicMock() + mock_client.builtin_actors = MagicMock() + mock_client_cls.return_value = mock_client + + manager = BuiltinActorManager("localhost:6379") + yield manager, mock_client + + +def test_init_registers_callback(mocked_manager): + manager, mock_client = mocked_manager + + mock_client.start.assert_called_once() + + mock_client.connector.register.assert_called_once() + args, kwargs = mock_client.connector.register.call_args + + assert "cb" in kwargs + assert kwargs["cb"] == manager._on_state_changed + + +def test_start_actor_starts_thread(mocked_manager): + manager, _ = mocked_manager + + with patch("bec_server.actors.builtin_actor_manager.Thread") as mock_thread_cls: + mock_thread = MagicMock() + mock_thread_cls.return_value = mock_thread + + manager._start_actor(DummyActor) + + assert "DummyActor" in manager._actors_threads_and_stops + + actor, thread, stop_event = manager._actors_threads_and_stops["DummyActor"] + + assert isinstance(actor, DummyActor) + assert thread == mock_thread + assert stop_event == actor.stop_event + + mock_thread.start.assert_called_once() + + +def test_start_actor_does_not_duplicate(mocked_manager): + manager, _ = mocked_manager + + with patch("bec_server.actors.builtin_actor_manager.Thread"): + manager._start_actor(DummyActor) + manager._start_actor(DummyActor) + + assert len(manager._actors_threads_and_stops) == 1 + + +def test_stop_actor_sets_event_and_joins(mocked_manager): + manager, _ = mocked_manager + + actor = DummyActor(None, "DummyActor", "DummyActor") + mock_thread = MagicMock() + + manager._actors_threads_and_stops["DummyActor"] = (actor, mock_thread, actor.stop_event) + + manager._stop_actor("DummyActor") + + assert actor.stop_event.is_set() + mock_thread.join.assert_called_once() + + +def test_stop_actor_missing_is_noop(mocked_manager): + manager, _ = mocked_manager + + # Should not raise + manager._stop_actor("MissingActor") + + +def test_on_state_changed_starts_enabled_actor(mocked_manager): + manager, mock_client = mocked_manager + + manager._builtin_actors = {"DummyActor": DummyActor} + + msg = MagicMock() + msg.value.actor_name = "DummyActor" + + mock_client.builtin_actors.check_enabled.return_value = True + + with patch.object(manager, "_start_actor") as mock_start: + manager._on_state_changed(msg) + + mock_start.assert_called_once_with(DummyActor) + + +def test_on_state_changed_unknown_actor(mocked_manager): + manager, _ = mocked_manager + + msg = MagicMock() + msg.value.actor_name = "UnknownActor" + + with ( + patch.object(manager, "_start_actor") as mock_start, + patch.object(manager, "_stop_actor") as mock_stop, + ): + manager._on_state_changed(msg) + + mock_start.assert_not_called() + mock_stop.assert_not_called() + + +def test_shutdown_stops_all_and_shuts_down_client(mocked_manager): + manager, mock_client = mocked_manager + + manager._actors_threads_and_stops = { + "Actor1": (MagicMock(), MagicMock(), Event()), + "Actor2": (MagicMock(), MagicMock(), Event()), + } + + with patch.object(manager, "_stop_actor") as mock_stop: + manager.shutdown() + + mock_stop.assert_any_call("Actor1") + mock_stop.assert_any_call("Actor2") + assert mock_stop.call_count == 2 + + mock_client.shutdown.assert_called_once() diff --git a/bec_server/tests/tests_scan_server/test_scan_interlock.py b/bec_server/tests/tests_scan_server/test_scan_interlock.py new file mode 100644 index 000000000..8d8778ddc --- /dev/null +++ b/bec_server/tests/tests_scan_server/test_scan_interlock.py @@ -0,0 +1,140 @@ +import threading +from unittest.mock import MagicMock, patch + +import pytest + +from bec_server.actors.scan_interlock import ScanInterlockActor + + +@pytest.fixture +def mock_client(): + client = MagicMock() + client.connector = MagicMock() + client.queue = MagicMock() + return client + + +@pytest.fixture +def actor(mock_client): + + actor = ScanInterlockActor( + client=mock_client, name="ScanInterlockActor", exec_id="ScanInterlockActor" + ) + + actor.client = mock_client + actor.state_table = {} + actor.state_cache = {} + actor.state_table_lock = threading.RLock() + + return actor + + +class TestScanInterlockActor: + def test_update_watched_states_in_redis(self, actor, mock_client): + mock_client.connector.set.reset_mock() + with patch( + "bec_server.actors.scan_interlock.ScanInterlockStateTableContent" + ) as mock_content: + actor._update_watched_states_in_redis() + + mock_client.connector.set.assert_called_once() + mock_content.assert_called_once_with(states_watched=actor.state_table) + + def test_on_state_modification_remove_all(self, actor, mock_client): + actor.state_table = {"beam_ok": "valid", "vacuum_ok": "valid"} + actor.state_cache = {"beam_ok": "valid", "vacuum_ok": "valid"} + actor._update_cache = MagicMock() + + msg = MagicMock() + msg.action = "remove_all" + msg.state_name = None + msg.status = None + + with patch("bec_server.actors.actor.BlStateActor.evaluate"): + actor._on_state_modification({"data": msg}) + + assert actor.state_table == {} + assert actor.state_cache == {} + + assert mock_client.connector.unregister.call_count == 2 + + def test_on_state_modification_add(self, actor, mock_client): + actor._update_cache = MagicMock() + mock_client.connector.register.reset_mock() + msg = MagicMock(action="add", state_name="beam_ok", status="valid") + actor._on_state_modification({"data": msg}) + assert actor.state_table["beam_ok"] == "valid" + mock_client.connector.register.assert_called_once() + actor._update_cache.assert_called_once() + + def test_on_state_modification_remove(self, actor, mock_client): + actor.state_table["beam_ok"] = "valid" + actor.state_cache["beam_ok"] = "valid" + actor._update_cache = MagicMock() + + msg = MagicMock(action="remove", state_name="beam_ok", status=None) + + with patch("bec_server.actors.actor.BlStateActor.evaluate"): + actor._on_state_modification({"data": msg}) + + assert "beam_ok" not in actor.state_table + assert "beam_ok" not in actor.state_cache + + mock_client.connector.unregister.assert_called_once() + + def test_on_state_modification_remove_missing(self, actor, mock_client): + actor._update_cache = MagicMock() + msg = MagicMock(action="remove", state_name="missing", status=None) + + with patch("bec_server.actors.actor.BlStateActor.evaluate"): + actor._on_state_modification({"data": msg}) + + mock_client.connector.unregister.assert_not_called() + + def test_some_mismatch_action_adds_lock(self, actor, mock_client): + actor.lock_id = None + + with patch("bec_server.actors.scan_interlock.uuid4", return_value="uuid-1234"): + actor.some_mismatch_action(mock_client) + assert actor.lock_id == "uuid-1234" + mock_client.queue.add_queue_lock.assert_called_once_with( + queue="primary", reason="ScanInterlockActor", lock_id="uuid-1234" + ) + + def test_some_mismatch_action_skips_if_lock_exists(self, actor, mock_client): + actor.lock_id = "existing-lock" + actor.some_mismatch_action(mock_client) + mock_client.queue.add_queue_lock.assert_not_called() + + def test_some_mismatch_action_skips_if_no_queue(self, actor, mock_client): + actor.client.queue = None + actor.some_mismatch_action(mock_client) + assert actor.lock_id is None + + def test_all_match_action_unlocks(self, actor): + with patch.object(actor, "_unlock") as mock_unlock: + actor.all_match_action(actor.client) + + mock_unlock.assert_called_once() + + def test_unlock_removes_lock(self, actor, mock_client): + actor.lock_id = "lock-123" + actor._unlock() + mock_client.queue.remove_queue_lock.assert_called_once_with( + queue="primary", lock_id="lock-123" + ) + assert actor.lock_id is None + + def test_unlock_skips_without_lock(self, actor, mock_client): + actor.lock_id = None + actor._unlock() + mock_client.queue.remove_queue_lock.assert_not_called() + + def test_unlock_keeps_lock_if_remove_fails(self, actor, mock_client): + actor.lock_id = "lock-123" + mock_client.queue.remove_queue_lock.side_effect = RuntimeError("boom") + + with pytest.raises(RuntimeError): + actor._unlock() + + assert actor.lock_id is None diff --git a/pytest_bec_e2e/pytest_bec_e2e/plugin.py b/pytest_bec_e2e/pytest_bec_e2e/plugin.py index d1bab2190..61f3f492d 100644 --- a/pytest_bec_e2e/pytest_bec_e2e/plugin.py +++ b/pytest_bec_e2e/pytest_bec_e2e/plugin.py @@ -243,6 +243,7 @@ def bec_ipython_client_fixture(bec_ipython_client_with_demo_config): bec = bec_ipython_client_with_demo_config bec.queue.request_queue_reset() bec.queue.request_scan_continuation() + bec.builtin_actors.scan_interlock.enabled = False wait_for_empty_queue(bec) yield bec From e1eae9c838df2e20d17c710fb40b37b9e114db69 Mon Sep 17 00:00:00 2001 From: David Perl Date: Tue, 19 May 2026 10:21:41 +0200 Subject: [PATCH 3/6] feat: interlock restarts scans --- bec_lib/bec_lib/configs/demo_config.yaml | 8 ++++ .../bec_server/actors/scan_interlock.py | 38 +++++++++++++------ .../tests_scan_server/test_scan_interlock.py | 1 + 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/bec_lib/bec_lib/configs/demo_config.yaml b/bec_lib/bec_lib/configs/demo_config.yaml index c1eaba96b..0580ddb9b 100644 --- a/bec_lib/bec_lib/configs/demo_config.yaml +++ b/bec_lib/bec_lib/configs/demo_config.yaml @@ -2160,3 +2160,11 @@ waveform: enabled: true readOnly: false softwareTrigger: true +ramp_up: + deviceClass: ophyd_devices.sim.sim_test_devices.SimDeviceWithSignalDelay + readoutPriority: "baseline" + deviceTags: + - signal delay + - ramp up + enabled: true + readOnly: false diff --git a/bec_server/bec_server/actors/scan_interlock.py b/bec_server/bec_server/actors/scan_interlock.py index 2f0af404b..77632b727 100644 --- a/bec_server/bec_server/actors/scan_interlock.py +++ b/bec_server/bec_server/actors/scan_interlock.py @@ -14,10 +14,16 @@ class ScanInterlockActor(BlStateActor): the required value. Removes the lock if all of them match.""" def __init__(self, client: BECClient, name: str, exec_id: str): - self.state_table = {} + self._LOCK_ID = "ScanInterlockActor" + states_msg: ScanInterlockStateTableContent | None = client.connector.get( + MessageEndpoints.scan_interlock_states() + ) + if states_msg is not None: + self.state_table = states_msg.states_watched + else: + self.state_table = {} + super().__init__(client, name, exec_id) - self.lock_id: str | None = None - self._update_watched_states_in_redis() self.client.connector.register( MessageEndpoints.modify_interlock_table(), cb=self._on_state_modification ) @@ -55,24 +61,34 @@ def _on_state_modification(self, msg_dict: dict): self._update_watched_states_in_redis() super(BlStateActor, self).evaluate() + @property + def mismatched_states(self): + """A list of all the states which are out of spec""" + with self.state_table_lock: + return [ + state_name + for state_name, expected_state in self.state_table.items() + if (current_state := self.state_cache.get(state_name)) is not None + and current_state != expected_state + ] + def some_mismatch_action(self, client: BECClient): - if self.client.queue is None or self.lock_id is not None: + if self.client.queue is None: return - self.lock_id = str(uuid4()) self.client.queue.add_queue_lock( - queue="primary", reason="ScanInterlockActor", lock_id=self.lock_id + queue="primary", + reason=f"Interlock for beamline states: {self.mismatched_states}", + lock_id=self._LOCK_ID, ) + self.client.queue.request_scan_restart() def all_match_action(self, client: BECClient): self._unlock() def _unlock(self): - if self.client.queue is None or self.lock_id is None: + if self.client.queue is None: return - try: - self.client.queue.remove_queue_lock(queue="primary", lock_id=self.lock_id) - finally: - self.lock_id = None + self.client.queue.remove_queue_lock(queue="primary", lock_id=self._LOCK_ID) def run(self): super().run() diff --git a/bec_server/tests/tests_scan_server/test_scan_interlock.py b/bec_server/tests/tests_scan_server/test_scan_interlock.py index 8d8778ddc..eb407ed7a 100644 --- a/bec_server/tests/tests_scan_server/test_scan_interlock.py +++ b/bec_server/tests/tests_scan_server/test_scan_interlock.py @@ -10,6 +10,7 @@ def mock_client(): client = MagicMock() client.connector = MagicMock() + client.connector.get.return_value = None client.queue = MagicMock() return client From 82c678ec10f5740f3b856482249d697c1e1be255 Mon Sep 17 00:00:00 2001 From: perl_d Date: Wed, 20 May 2026 14:21:28 +0200 Subject: [PATCH 4/6] test: test that scan is restarted --- .../tests/end-2-end/test_actors_e2e.py | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/bec_ipython_client/tests/end-2-end/test_actors_e2e.py b/bec_ipython_client/tests/end-2-end/test_actors_e2e.py index c28c0a63c..e9c6be442 100644 --- a/bec_ipython_client/tests/end-2-end/test_actors_e2e.py +++ b/bec_ipython_client/tests/end-2-end/test_actors_e2e.py @@ -8,7 +8,7 @@ from bec_lib.bl_states import DeviceWithinLimitsStateConfig from bec_lib.builtin_actor_hli import BuiltinActorHli from bec_lib.logger import bec_logger -from bec_lib.messages import ScanQueueStatus, ScanQueueStatusMessage +from bec_lib.messages import ScanQueueStatus logger = bec_logger.logger @@ -23,22 +23,11 @@ def bec_with_delay_device(bec_ipython_client_fixture): bec = bec_ipython_client_fixture bec.builtin_actors.scan_interlock.enabled = True dev = bec.device_manager.devices - config = { - "ramp_up": { - "deviceClass": "ophyd_devices.sim.sim_test_devices.SimDeviceWithSignalDelay", - "deviceConfig": {}, - "readoutPriority": "baseline", - "deviceTags": {"signal delay", "ramp up"}, - "enabled": True, - "readOnly": False, - } - } - bec.device_manager.config_helper.send_config_request(action="add", config=config) dev.ramp_up.min_val.put(0) dev.ramp_up.max_val.put(400) dev.ramp_up.value.put(400) - dev.ramp_up.delay.put(1) - dev.ramp_up.rampup_time.put(3) + dev.ramp_up.delay.put(2) + dev.ramp_up.rampup_time.put(2) yield bec, dev.ramp_up dev.ramp_up.stop() @@ -78,8 +67,6 @@ def test_scan_interlock( assert _wait_for(lambda: "beam_intensity_sufficient" in actors.scan_interlock.states_watched) - ramp_up.start.set(1) - def _beam_down(): return (ramp_up.value.get() < 200) and bec.beamline_states.beam_intensity_sufficient.get()[ "status" @@ -90,11 +77,25 @@ def _beam_up(): "status" ] == "valid" + ramp_up.start.set(1) + scan = bec.scans.line_scan( + "samx", -5, 5, steps=10, exp_time=0.5, relative=False, hide_report=True + ) + assert _wait_for(_beam_down) assert _wait_for( - lambda: bec.queue.queue_storage.current_scan_queue["primary"].status == "RUNNING" + lambda: bec.queue.queue_storage.current_scan_queue["primary"].status == "LOCKED" ) + assert scan.status == "STOPPED" assert _wait_for(_beam_up) assert _wait_for( lambda: bec.queue.queue_storage.current_scan_queue["primary"].status == "RUNNING" ) + + def second_scan_has_run(): + return ( + bec.history[-2].metadata["bec"]["status"] == "aborted" + and bec.history[-1].metadata["bec"]["status"] == "closed" + ) + + assert _wait_for(second_scan_has_run) From bd8a204d0723f589e762e4bdee335136f494ab67 Mon Sep 17 00:00:00 2001 From: perl_d Date: Wed, 20 May 2026 14:43:22 +0200 Subject: [PATCH 5/6] test: simplify scan interlock unit tests --- .../tests_scan_server/test_scan_interlock.py | 36 ++++--------------- 1 file changed, 7 insertions(+), 29 deletions(-) diff --git a/bec_server/tests/tests_scan_server/test_scan_interlock.py b/bec_server/tests/tests_scan_server/test_scan_interlock.py index eb407ed7a..9d1bdd13d 100644 --- a/bec_server/tests/tests_scan_server/test_scan_interlock.py +++ b/bec_server/tests/tests_scan_server/test_scan_interlock.py @@ -93,24 +93,18 @@ def test_on_state_modification_remove_missing(self, actor, mock_client): mock_client.connector.unregister.assert_not_called() def test_some_mismatch_action_adds_lock(self, actor, mock_client): - actor.lock_id = None - - with patch("bec_server.actors.scan_interlock.uuid4", return_value="uuid-1234"): - actor.some_mismatch_action(mock_client) - assert actor.lock_id == "uuid-1234" + actor.some_mismatch_action(mock_client) mock_client.queue.add_queue_lock.assert_called_once_with( - queue="primary", reason="ScanInterlockActor", lock_id="uuid-1234" + queue="primary", + reason="Interlock for beamline states: []", + lock_id="ScanInterlockActor", ) - def test_some_mismatch_action_skips_if_lock_exists(self, actor, mock_client): - actor.lock_id = "existing-lock" - actor.some_mismatch_action(mock_client) - mock_client.queue.add_queue_lock.assert_not_called() - def test_some_mismatch_action_skips_if_no_queue(self, actor, mock_client): + add_queue_lock = mock_client.queue.add_queue_lock actor.client.queue = None actor.some_mismatch_action(mock_client) - assert actor.lock_id is None + add_queue_lock.assert_not_called() def test_all_match_action_unlocks(self, actor): with patch.object(actor, "_unlock") as mock_unlock: @@ -119,23 +113,7 @@ def test_all_match_action_unlocks(self, actor): mock_unlock.assert_called_once() def test_unlock_removes_lock(self, actor, mock_client): - actor.lock_id = "lock-123" actor._unlock() mock_client.queue.remove_queue_lock.assert_called_once_with( - queue="primary", lock_id="lock-123" + queue="primary", lock_id="ScanInterlockActor" ) - assert actor.lock_id is None - - def test_unlock_skips_without_lock(self, actor, mock_client): - actor.lock_id = None - actor._unlock() - mock_client.queue.remove_queue_lock.assert_not_called() - - def test_unlock_keeps_lock_if_remove_fails(self, actor, mock_client): - actor.lock_id = "lock-123" - mock_client.queue.remove_queue_lock.side_effect = RuntimeError("boom") - - with pytest.raises(RuntimeError): - actor._unlock() - - assert actor.lock_id is None From f2bb265c1c8617db8813fc35592dc364dd54671a Mon Sep 17 00:00:00 2001 From: perl_d Date: Wed, 20 May 2026 15:04:39 +0200 Subject: [PATCH 6/6] fix: handle no interlock data in redis --- bec_lib/bec_lib/builtin_actor_hli.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bec_lib/bec_lib/builtin_actor_hli.py b/bec_lib/bec_lib/builtin_actor_hli.py index 46ddb2e4b..9f843b01a 100644 --- a/bec_lib/bec_lib/builtin_actor_hli.py +++ b/bec_lib/bec_lib/builtin_actor_hli.py @@ -35,9 +35,11 @@ def enabled(self, enabled: bool): self._parent.set_disabled(self._actor_name) @property - def states_watched(self): + def states_watched(self) -> dict[str, BlStateStatus]: """Return the table of beamline states currently watched by the scan interlock actor""" - return self._client.connector.get(MessageEndpoints.scan_interlock_states()).states_watched + if msg := self._client.connector.get(MessageEndpoints.scan_interlock_states()): + return msg.states_watched + return {} def add_state_to_interlock(self, state_name: str, required_value: BlStateStatus): """