diff --git a/src/dodal/beamlines/i10_1.py b/src/dodal/beamlines/i10_1.py index 92fc9f4685e..01dffe47e19 100644 --- a/src/dodal/beamlines/i10_1.py +++ b/src/dodal/beamlines/i10_1.py @@ -5,9 +5,11 @@ from dodal.devices.beamlines.i10_1 import ( ElectromagnetMagnetField, ElectromagnetStage, + HighFieldMagnet, I10JScalerCard, ) from dodal.devices.current_amplifiers import SR570, CurrentAmpDet +from dodal.devices.motors import XYPitchStage from dodal.devices.temperture_controller.lakeshore.lakeshore import Lakeshore336 from dodal.log import set_beamline as set_log_beamline from dodal.utils import BeamlinePrefix, get_beamline_name @@ -122,3 +124,23 @@ def em_temperature_controller() -> Lakeshore336: return Lakeshore336( prefix=f"{PREFIX.beamline_prefix}-EA-TCTRL-41:", ) + + +"""I10J Hight Field Magnet Devices""" + + +@devices.factory() +def high_field_magnet_stage() -> XYPitchStage: + return XYPitchStage( + prefix=f"{PREFIX.beamline_prefix}-EA-MAG-01:", + x_infix="X", + y_infix="INSERT:Y", + pitch_infix="INSERT:ROTY", + ) + + +@devices.factory() +def high_field_magnet() -> HighFieldMagnet: + return HighFieldMagnet( + prefix=f"{PREFIX.beamline_prefix}-EA-SMC-01:", + ) diff --git a/src/dodal/devices/beamlines/i10_1/__init__.py b/src/dodal/devices/beamlines/i10_1/__init__.py index b562d0a9a59..1f6b94f0cdf 100644 --- a/src/dodal/devices/beamlines/i10_1/__init__.py +++ b/src/dodal/devices/beamlines/i10_1/__init__.py @@ -1,9 +1,11 @@ from .electromagnet.magnet import ElectromagnetMagnetField from .electromagnet.stages import ElectromagnetStage +from .high_field_magnet.high_field_magnet import HighFieldMagnet from .scaler_cards import I10JScalerCard __all__ = [ "ElectromagnetMagnetField", "I10JScalerCard", "ElectromagnetStage", + "HighFieldMagnet", ] diff --git a/src/dodal/devices/beamlines/i10_1/high_field_magnet/__init__.py b/src/dodal/devices/beamlines/i10_1/high_field_magnet/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/dodal/devices/beamlines/i10_1/high_field_magnet/high_field_magnet.py b/src/dodal/devices/beamlines/i10_1/high_field_magnet/high_field_magnet.py new file mode 100644 index 00000000000..393882132fa --- /dev/null +++ b/src/dodal/devices/beamlines/i10_1/high_field_magnet/high_field_magnet.py @@ -0,0 +1,215 @@ +from __future__ import annotations + +import asyncio + +from bluesky.protocols import ( + Flyable, + Locatable, + Location, + Preparable, + Reading, + Stoppable, + Subscribable, +) +from ophyd_async.core import ( + DEFAULT_TIMEOUT, + AsyncStatus, + Callback, + StandardReadable, + StandardReadableFormat, + StrictEnum, + SubsetEnum, + WatchableAsyncStatus, + WatcherUpdate, + derived_signal_r, + error_if_none, + observe_value, + set_and_wait_for_other_value, + soft_signal_rw, +) +from ophyd_async.epics.core import epics_signal_r, epics_signal_rw, epics_signal_w +from pydantic import BaseModel, Field + + +class HighFieldMangetSweepTypes(StrictEnum): + FAST = "Fast" + SLOW = "Slow" + + +class HighFieldMagnetStatus(SubsetEnum): + HOLD = "Hold" + TO_SETPOINT = "To Setpoint" + TO_ZERO = "To Zero" + CLAMP = "Clamp" + + +class HighFieldMagnetStatusRBV(SubsetEnum): + HOLD = "Hold" + TO_SETPOINT = "To Setpoint" + TO_ZERO = "To Zero" + CLAMPED = "Clamped" + + +class FlyMagInfo(BaseModel): + """Minimal set of information required to fly high field magnet.""" + + start_position: float = Field(frozen=True) + + end_position: float = Field(frozen=True) + + sweep_rate: float = Field(frozen=True, gt=0) + + +class HighFieldMagnet( + StandardReadable, + Locatable[float], + Stoppable, + Flyable, + Preparable, + Subscribable[float], +): + def __init__( + self, prefix: str, field_tolerance: float = 0.01, name: str = "" + ) -> None: + with self.add_children_as_readables(StandardReadableFormat.CONFIG_SIGNAL): + self.sweep_rate = epics_signal_rw( + float, + read_pv=prefix + "RBV:FIELDsweep_rate", + write_pv=prefix + "SET:FIELDsweep_rate", + ) + self.sweep_type = epics_signal_rw( + HighFieldMangetSweepTypes, + read_pv=prefix + "STS:SWEEPMODE:TYPE", + write_pv=prefix + "SET:SWEEPMODE:TYPE", + ) + self.set_move_readback = epics_signal_r( + HighFieldMagnetStatusRBV, + read_pv=prefix + "STS:ACTIVITY", + ) + self.ramp_up_time = soft_signal_rw(datatype=float, initial_value=1.0) + self.field_tolerance = soft_signal_rw(float, initial_value=field_tolerance) + with self.add_children_as_readables(StandardReadableFormat.HINTED_SIGNAL): + self.user_readback = epics_signal_r(float, prefix + "RBV:DEMANDFIELD") + + self.set_mode = epics_signal_w( + HighFieldMagnetStatus, + write_pv=prefix + "SET:ACTIVITY", + ) + self.user_setpoint = epics_signal_rw( + float, + read_pv=prefix + "RBV:SETPOINTFIELD", + write_pv=prefix + "SET:SETPOINTFIELD", + ) + + self.within_tolerance = derived_signal_r( + raw_to_derived=self._within_tolerance, + setpoint=self.user_setpoint, + readback=self.user_readback, + tolerance=self.field_tolerance, + ) + + self._set_success = True + + self._fly_info: FlyMagInfo | None = None + + self._fly_status: WatchableAsyncStatus | None = None + + super().__init__(name=name) + + def _within_tolerance( + self, setpoint: float, readback: float, tolerance: float + ) -> bool: + """Check if the readback is within the tolerance of the setpoint.""" + return abs(setpoint - readback) < abs(tolerance) + + def set_name(self, name: str, *, child_name_separator: str | None = None) -> None: + super().set_name(name, child_name_separator=child_name_separator) + self.user_readback.set_name(name) + + async def locate(self) -> Location[float]: + setpoint, readback = await asyncio.gather( + self.user_setpoint.get_value(), self.user_readback.get_value() + ) + return Location(setpoint=setpoint, readback=readback) + + async def stop(self, success=False): + self._set_success = success + await self.user_readback.get_value() + await self.user_setpoint.set(await self.user_readback.get_value()) + + def subscribe_reading(self, function: Callback[dict[str, Reading[float]]]) -> None: + self.user_readback.subscribe_reading(function) + + subscribe = subscribe_reading + + def clear_sub(self, function: Callback[dict[str, Reading[float]]]) -> None: + """Unsubscribe.""" + self.user_readback.clear_sub(function) + + @WatchableAsyncStatus.wrap + async def set( + self, + new_position: float, + ): + self._set_success = True + ( + old_position, + sweep_rate, + ramp_up_time, + ) = await asyncio.gather( + self.user_readback.get_value(), + self.sweep_rate.get_value(), + self.ramp_up_time.get_value(), + ) + + try: + timeout = ( + abs((new_position - old_position) / sweep_rate) + + 2 * ramp_up_time + + DEFAULT_TIMEOUT + ) + except ZeroDivisionError as error: + msg = "Magnet has zero sweep_rate." + raise ValueError(msg) from error + + move_status = AsyncStatus( + set_and_wait_for_other_value( + set_signal=self.user_setpoint, + set_value=new_position, + match_signal=self.within_tolerance, + match_value=True, + timeout=timeout, + ) + ) + async for current_position in observe_value( + self.user_readback, done_status=move_status + ): + yield WatcherUpdate( + current=current_position, + initial=old_position, + target=new_position, + name=self.name, + ) + if not self._set_success: + raise RuntimeError("Field change was stopped") + + @AsyncStatus.wrap + async def prepare(self, value: FlyMagInfo) -> None: + """Move to the beginning of a suitable run-up distance ready for a fly scan.""" + self._fly_info = value + + await self.set(value.start_position) + + await self.sweep_rate.set(abs(value.sweep_rate)) + + @AsyncStatus.wrap + async def kickoff(self): + fly_info = error_if_none( + self._fly_info, "Magnet must be prepared before attempting to kickoff" + ) + + self._fly_status = self.set(fly_info.end_position) + + def complete(self) -> WatchableAsyncStatus: + fly_status = error_if_none(self._fly_status, "kickoff not called") + return fly_status diff --git a/tests/devices/beamlines/i10_1/high_field_magnet/__init__.py b/tests/devices/beamlines/i10_1/high_field_magnet/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/devices/beamlines/i10_1/high_field_magnet/test_high_field_magnet.py b/tests/devices/beamlines/i10_1/high_field_magnet/test_high_field_magnet.py new file mode 100644 index 00000000000..fd5ea075fbb --- /dev/null +++ b/tests/devices/beamlines/i10_1/high_field_magnet/test_high_field_magnet.py @@ -0,0 +1,179 @@ +import pytest +from ophyd_async.core import ( + AsyncStatus, + init_devices, + set_mock_value, +) +from ophyd_async.testing import assert_reading, wait_for_pending_wakeups + +from dodal.devices.beamlines.i10_1.high_field_magnet.high_field_magnet import ( + FlyMagInfo, + HighFieldMagnet, +) + + +@pytest.fixture +async def high_field_magnet() -> HighFieldMagnet: + async with init_devices(mock=True): + magnet = HighFieldMagnet(prefix="TEST:") + return magnet + + +async def test_within_tolerance_when_in_range(high_field_magnet: HighFieldMagnet): + result = high_field_magnet._within_tolerance( + setpoint=10.0, readback=10.005, tolerance=0.01 + ) + assert result is True + + +async def test_within_tolerance_when_outside_range(high_field_magnet: HighFieldMagnet): + result = high_field_magnet._within_tolerance( + setpoint=10.0, readback=10.02, tolerance=0.01 + ) + assert result is False + + +async def test_within_tolerance_with_negative_tolerance( + high_field_magnet: HighFieldMagnet, +): + result = high_field_magnet._within_tolerance( + setpoint=10.0, readback=9.995, tolerance=-0.01 + ) + assert result is True + + +async def test_locate(high_field_magnet: HighFieldMagnet): + set_mock_value(high_field_magnet.user_setpoint, 5.0) + set_mock_value(high_field_magnet.user_readback, 5.0) + + location = await high_field_magnet.locate() + assert location["setpoint"] == 5.0 + assert location["readback"] == 5.0 + + +async def test_stop_success(high_field_magnet: HighFieldMagnet): + set_mock_value(high_field_magnet.user_readback, 7.5) + set_mock_value(high_field_magnet.user_readback, 1.5) + await high_field_magnet.stop() + assert high_field_magnet._set_success is False + assert await high_field_magnet.user_setpoint.get_value() == 1.5 + + +async def test_set_raises_runtime_error_when_stopped( + high_field_magnet: HighFieldMagnet, +): + set_mock_value(high_field_magnet.user_readback, -5.0) + set_mock_value(high_field_magnet.sweep_rate, 1.0) + set_mock_value(high_field_magnet.ramp_up_time, 1.0) + status = high_field_magnet.set(5.0) + + assert status is not None + with pytest.raises(RuntimeError, match="Field change was stopped"): + await wait_for_pending_wakeups() + await high_field_magnet.stop(success=False) + set_mock_value(high_field_magnet.user_readback, 5.0) + await status + + +async def test_subscribe_and_clear_sub(high_field_magnet: HighFieldMagnet): + """Test subscribe_reading and clear_sub for callbacks.""" + readings = [] + + def callback(reading_dict): + readings.append(reading_dict) + + high_field_magnet.subscribe_reading(callback) + + set_mock_value(high_field_magnet.user_readback, 3.0) + + high_field_magnet.clear_sub(callback) + + assert callable(callback) + + +async def test_set_raises_on_zero_sweep_rate(high_field_magnet: HighFieldMagnet): + set_mock_value(high_field_magnet.user_readback, 0.0) + set_mock_value(high_field_magnet.sweep_rate, 0.0) + + with pytest.raises(ValueError, match="Magnet has zero sweep_rate."): + status = high_field_magnet.set(10.0) + await status + + +async def test_set_calculates_correct_timeout(high_field_magnet: HighFieldMagnet): + set_mock_value(high_field_magnet.user_readback, 0.0) + set_mock_value(high_field_magnet.sweep_rate, 1.0) + set_mock_value(high_field_magnet.ramp_up_time, 1.0) + set_mock_value(high_field_magnet.user_setpoint, 0.0) + + # Start the set operation + status = high_field_magnet.set(10.0) + assert isinstance(status, AsyncStatus) or hasattr(status, "watch") + + +async def test_set_returns_watchable_async_status(high_field_magnet: HighFieldMagnet): + set_mock_value(high_field_magnet.user_readback, 0.0) + set_mock_value(high_field_magnet.sweep_rate, 1.0) + + status = high_field_magnet.set(5.0) + assert status is not None + + +async def test_prepare(high_field_magnet: HighFieldMagnet): + set_mock_value(high_field_magnet.user_readback, 0.0) + set_mock_value(high_field_magnet.sweep_rate, 1.0) + + fly_info = FlyMagInfo(start_position=1.0, end_position=10.0, sweep_rate=2.0) + await high_field_magnet.prepare(fly_info) + assert high_field_magnet._fly_info == fly_info + assert await high_field_magnet.user_setpoint.get_value() == 1.0 + assert await high_field_magnet.sweep_rate.get_value() == 2.0 + + +async def test_kickoff_without_prepare_raises(high_field_magnet: HighFieldMagnet): + with pytest.raises(RuntimeError, match="Magnet must be prepared"): + status = high_field_magnet.kickoff() + await status + + +async def test_kickoff_after_prepare(high_field_magnet: HighFieldMagnet): + set_mock_value(high_field_magnet.user_readback, 0.0) + set_mock_value(high_field_magnet.sweep_rate, 1.0) + + fly_info = FlyMagInfo(start_position=1.0, end_position=10.0, sweep_rate=2.0) + + prepare_status = high_field_magnet.prepare(fly_info) + await prepare_status + + kickoff_status = high_field_magnet.kickoff() + assert isinstance(kickoff_status, AsyncStatus) + + +async def test_complete_without_kickoff_raises(high_field_magnet: HighFieldMagnet): + with pytest.raises(RuntimeError, match="kickoff not called"): + high_field_magnet.complete() + + +async def test_complete_after_kickoff(high_field_magnet: HighFieldMagnet): + set_mock_value(high_field_magnet.user_readback, 0.0) + set_mock_value(high_field_magnet.sweep_rate, 1.0) + + fly_info = FlyMagInfo(start_position=1.0, end_position=10.0, sweep_rate=2.0) + + prepare_status = high_field_magnet.prepare(fly_info) + await prepare_status + + kickoff_status = high_field_magnet.kickoff() + await kickoff_status + + complete_status = high_field_magnet.complete() + assert complete_status is high_field_magnet._fly_status + + +async def test_read(high_field_magnet: HighFieldMagnet): + set_mock_value(high_field_magnet.user_setpoint, 5.0) + set_mock_value(high_field_magnet.user_readback, 5.0) + await assert_reading( + high_field_magnet, + expected_reading={"magnet": {"value": 5.0}}, + )