Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions bec_ipython_client/tests/end-2-end/test_actors_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from __future__ import annotations

import time
from typing import TYPE_CHECKING

import pytest

from bec_lib.bl_states import DeviceWithinLimitsStateConfig
from bec_lib.builtin_actor_hli import BuiltinActorHli
from bec_lib.logger import bec_logger
from bec_lib.messages import ScanQueueStatus

logger = bec_logger.logger

if TYPE_CHECKING: # pragma: no cover
from ophyd_devices.sim.sim_test_devices import SimDeviceWithSignalDelay

from bec_ipython_client.main import BECIPythonClient


@pytest.fixture
def bec_with_delay_device(bec_ipython_client_fixture):
bec = bec_ipython_client_fixture
bec.builtin_actors.scan_interlock.enabled = True
dev = bec.device_manager.devices
dev.ramp_up.min_val.put(0)
dev.ramp_up.max_val.put(400)
dev.ramp_up.value.put(400)
dev.ramp_up.delay.put(2)
dev.ramp_up.rampup_time.put(2)
yield bec, dev.ramp_up
dev.ramp_up.stop()


@pytest.fixture
def ramp_up_bl_state(bec_with_delay_device):
bec, _ = bec_with_delay_device
ramp_up_state_config = DeviceWithinLimitsStateConfig(
name="beam_intensity_sufficient", device="ramp_up", low_limit=200
)
bec.beamline_states.add(ramp_up_state_config)
yield bec_with_delay_device
bec.beamline_states.delete("beam_intensity_sufficient")
bec.builtin_actors.scan_interlock.clear_all()


def _wait_for(pred, timeout=10, retries=100):
for i in range(retries):
if pred():
return True
time.sleep(timeout / retries)
return pred()


# pylint: disable=protected-accesstest
@pytest.mark.timeout(100)
def test_scan_interlock(
ramp_up_bl_state: tuple[BECIPythonClient, SimDeviceWithSignalDelay], bec_with_delay_device
Comment thread
d-perl marked this conversation as resolved.
):
bec, ramp_up = ramp_up_bl_state
actors: BuiltinActorHli = bec.builtin_actors
assert bec.beamline_states.beam_intensity_sufficient.get()["status"] == "valid"
assert actors.scan_interlock.enabled
current_q_status_msg: ScanQueueStatus = bec.queue.queue_storage.current_scan_queue["primary"]
assert current_q_status_msg.status == "RUNNING"
actors.scan_interlock.add_state_to_interlock("beam_intensity_sufficient", "valid")

assert _wait_for(lambda: "beam_intensity_sufficient" in actors.scan_interlock.states_watched)

def _beam_down():
return (ramp_up.value.get() < 200) and bec.beamline_states.beam_intensity_sufficient.get()[
"status"
] == "invalid"

def _beam_up():
return (ramp_up.value.get() > 200) and bec.beamline_states.beam_intensity_sufficient.get()[
"status"
] == "valid"

ramp_up.start.set(1)
scan = bec.scans.line_scan(
"samx", -5, 5, steps=10, exp_time=0.5, relative=False, hide_report=True
)

assert _wait_for(_beam_down)
assert _wait_for(
lambda: bec.queue.queue_storage.current_scan_queue["primary"].status == "LOCKED"
)
assert scan.status == "STOPPED"
assert _wait_for(_beam_up)
assert _wait_for(
lambda: bec.queue.queue_storage.current_scan_queue["primary"].status == "RUNNING"
)

def second_scan_has_run():
return (
bec.history[-2].metadata["bec"]["status"] == "aborted"
and bec.history[-1].metadata["bec"]["status"] == "closed"
)

assert _wait_for(second_scan_has_run)
12 changes: 11 additions & 1 deletion bec_lib/bec_lib/bl_state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class BeamlineStateGet(TypedDict):
TypedDict for the return value of the get method of a beamline state client.
"""

status: str
status: messages.BlStateStatus
label: str


Expand Down Expand Up @@ -200,6 +200,16 @@ def delete(self, state_name: str) -> None:
self._delete_state(state_name)
self._publish_states()

def get_status_by_name(self, name: str) -> messages.BlStateStatus | None:
Comment thread
d-perl marked this conversation as resolved.
"""
Get current value of a given state, or None if it does not exist.
Args:
state_name (str): The name of the state for which to get the value.
"""
if not isinstance(state := getattr(self, name, None), BeamlineStateClientBase):
return
return state.get()["status"]

def show_all(self):
"""
Pretty print all beamline states using rich.
Expand Down
104 changes: 104 additions & 0 deletions bec_lib/bec_lib/builtin_actor_hli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from typing import TYPE_CHECKING

from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import (
BlStateStatus,
BuiltinActorStateChangeMessage,
ScanInterlockModifyStateTableMessage,
)

if TYPE_CHECKING:
from bec_lib.client import BECClient

VAR_PREFIX = "_BuiltinActors"


def builtin_actor_enabled_var(actor_name: str):
return f"{VAR_PREFIX}/enabled/{actor_name}"


class ScanInterlockHli:
def __init__(self, client: "BECClient", parent: "BuiltinActorHli") -> None:
self._client = client
self._parent = parent
self._actor_name = "ScanInterlockActor"

@property
def enabled(self):
return self._parent.check_enabled(self._actor_name)

@enabled.setter
def enabled(self, enabled: bool):
if enabled:
self._parent.set_enabled(self._actor_name)
else:
self._parent.set_disabled(self._actor_name)

@property
def states_watched(self) -> dict[str, BlStateStatus]:
"""Return the table of beamline states currently watched by the scan interlock actor"""
if msg := self._client.connector.get(MessageEndpoints.scan_interlock_states()):
return msg.states_watched
return {}

def add_state_to_interlock(self, state_name: str, required_value: BlStateStatus):
"""
Add a beamline state and its status to watch to the ScanInterlockActor. If the state no
longer has this status, an interlock will be placed on the primary scan queue.
Args:
state_name (str): the state to watch
status (Literal["valid","invalid","warning","unknown"]): the status to watch for.
"""
self._client.connector.xadd(
MessageEndpoints.modify_interlock_table(),
{
"data": ScanInterlockModifyStateTableMessage(
action="add", state_name=state_name, status=required_value
)
},
)

def remove_state_from_interlock(self, state_name: str):
"""
No longer watch the given state for the scan interlock.
Args:
state_name (str): the state to watch
"""
self._client.connector.xadd(
MessageEndpoints.modify_interlock_table(),
{"data": ScanInterlockModifyStateTableMessage(action="remove", state_name=state_name)},
)

def clear_all(self):
"""
Remove all beamline states from the interlock watch table
Args:
state_name (str): the state to watch
"""
self._client.connector.xadd(
MessageEndpoints.modify_interlock_table(),
{"data": ScanInterlockModifyStateTableMessage(action="remove_all")},
)


class BuiltinActorHli:
def __init__(self, client: "BECClient") -> None:
self._client = client
self.scan_interlock = ScanInterlockHli(self._client, self)

def _notify(self, actor_name):
self._client.connector.send(
MessageEndpoints.builtin_actor_notification(),
BuiltinActorStateChangeMessage(actor_name=actor_name),
)

def check_enabled(self, actor_name: str):
return bool(self._client.get_global_var(builtin_actor_enabled_var(actor_name)))

def set_enabled(self, actor_name: str):
self._client.set_global_var(builtin_actor_enabled_var(actor_name), True)
self._notify(actor_name)

def set_disabled(self, actor_name: str):
self._client.set_global_var(builtin_actor_enabled_var(actor_name), False)
self._notify(actor_name)
2 changes: 2 additions & 0 deletions bec_lib/bec_lib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from bec_lib.alarm_handler import AlarmHandler, Alarms
from bec_lib.bec_service import BECService
from bec_lib.bl_state_manager import BeamlineStateManager
from bec_lib.builtin_actor_hli import BuiltinActorHli
from bec_lib.callback_handler import CallbackHandler, EventType
from bec_lib.config_helper import ConfigHelperUser
from bec_lib.dap_plugins import DAPPlugins
Expand Down Expand Up @@ -163,6 +164,7 @@ def __init__(
self._system_user = ""
self.beamline_states = None
self.messaging: MessagingContainer = None # type: ignore
self.builtin_actors = BuiltinActorHli(self)

def __new__(cls, *args, forced=False, **kwargs):
if forced or BECClient._client is None:
Expand Down
8 changes: 8 additions & 0 deletions bec_lib/bec_lib/configs/demo_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2160,3 +2160,11 @@ waveform:
enabled: true
readOnly: false
softwareTrigger: true
ramp_up:
deviceClass: ophyd_devices.sim.sim_test_devices.SimDeviceWithSignalDelay
readoutPriority: "baseline"
deviceTags:
- signal delay
- ramp up
enabled: true
readOnly: false
27 changes: 27 additions & 0 deletions bec_lib/bec_lib/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -1679,6 +1679,33 @@ def actor_request_response() -> EndpointInfo:
message_op=MessageOp.SEND,
)

@staticmethod
def builtin_actor_notification() -> EndpointInfo:
endpoint = f"{EndpointType.INTERNAL.value}/actor/builtin/state_change"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.BuiltinActorStateChangeMessage,
message_op=MessageOp.SEND,
)

@staticmethod
def modify_interlock_table() -> EndpointInfo:
endpoint = f"{EndpointType.INTERNAL.value}/actor/builtin/scan_interlock/table_mod"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.ScanInterlockModifyStateTableMessage,
message_op=MessageOp.STREAM,
)

@staticmethod
def scan_interlock_states() -> EndpointInfo:
endpoint = f"{EndpointType.INFO.value}/actor/builtin/scan_interlock/current_states_watched"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.ScanInterlockStateTableContent,
message_op=MessageOp.KEY_VALUE,
)

@staticmethod
def gui_registry_state(gui_id: str):
"""
Expand Down
32 changes: 31 additions & 1 deletion bec_lib/bec_lib/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -1980,6 +1980,9 @@ class GameLeaderboardMessage(BECMessage):
leaderboard: list[GameScoreMessage]


BlStateStatus = Literal["valid", "invalid", "warning", "unknown"]


class BeamlineStateMessage(BECMessage):
"""
Message for beamline state updates
Expand All @@ -1992,7 +1995,7 @@ class BeamlineStateMessage(BECMessage):

msg_type: ClassVar[str] = "beamline_state_message"
name: str
status: Literal["valid", "invalid", "warning", "unknown"]
status: BlStateStatus
label: str
timestamp: float = Field(default_factory=time.time)

Expand Down Expand Up @@ -2026,6 +2029,33 @@ class AvailableBeamlineStatesMessage(BECMessage):
states: list[BeamlineStateConfig]


class BuiltinActorStateChangeMessage(BECMessage):
msg_type: ClassVar[str] = "actor_start_request"
actor_name: str


class ScanInterlockModifyStateTableMessage(BECMessage):
msg_type: ClassVar[str] = "scan_interlock_modify_state_table"
action: Literal["add", "remove", "remove_all"]
state_name: str | None = None
status: BlStateStatus | None = None

@model_validator(mode="after")
def _validate(self):
if self.action == "add" and (self.status is None or self.state_name is None):
raise ValueError("Must specify a name and status when adding a state")
if self.action in ["remove", "remove_all"] and self.status is not None:
raise ValueError("May not specify a status when removing a state")
if self.action == "remove_all" and self.state_name is not None:
raise ValueError("May not specify a state when removing all states")
return self


class ScanInterlockStateTableContent(BECMessage):
msg_type: ClassVar[str] = "scan_interlock_state_table_content"
states_watched: dict[str, BlStateStatus]


class ActorStartRequestMessage(BECMessage):
"""Specify an actor class by module and name, to be instantiated and started by the actor
manager."""
Expand Down
Loading
Loading