diff --git a/pylabrobot/agrowpumps/__init__.py b/pylabrobot/agrowpumps/__init__.py new file mode 100644 index 00000000000..2cd7f15f42e --- /dev/null +++ b/pylabrobot/agrowpumps/__init__.py @@ -0,0 +1 @@ +from .agrowdosepump_backend import AgrowChannelBackend, AgrowDriver, AgrowDosePumpArray diff --git a/pylabrobot/agrowpumps/agrowdosepump_backend.py b/pylabrobot/agrowpumps/agrowdosepump_backend.py new file mode 100644 index 00000000000..bcc3e9774ba --- /dev/null +++ b/pylabrobot/agrowpumps/agrowdosepump_backend.py @@ -0,0 +1,220 @@ +import asyncio +import logging +import threading +import time +from typing import Dict, List, Optional, Union + +try: + from pymodbus.client import AsyncModbusSerialClient # type: ignore + + _MODBUS_IMPORT_ERROR = None +except ImportError as e: + AsyncModbusSerialClient = None # type: ignore + _MODBUS_IMPORT_ERROR = e + +from pylabrobot.capabilities.pumping.backend import PumpBackend +from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.pumping import PumpingCapability +from pylabrobot.device import Device + +logger = logging.getLogger("pylabrobot") + + +class AgrowDriver: + """Shared Modbus connection for an Agrow pump array. + + Not a DeviceBackend — just the hardware communication layer. + """ + + def __init__(self, port: str, address: Union[int, str]): + if _MODBUS_IMPORT_ERROR is not None: + raise RuntimeError( + "pymodbus is not installed. Install with: pip install pylabrobot[modbus]. " + f"Import error: {_MODBUS_IMPORT_ERROR}" + ) + if not isinstance(port, str): + raise ValueError("Port must be a string") + self.port = port + if address not in range(0, 256): + raise ValueError("Pump address out of range") + self.address = int(address) + self._keep_alive_thread: Optional[threading.Thread] = None + self._pump_index_to_address: Optional[Dict[int, int]] = None + self._modbus: Optional["AsyncModbusSerialClient"] = None + self._num_channels: Optional[int] = None + self._keep_alive_thread_active = False + + @property + def modbus(self) -> "AsyncModbusSerialClient": + if self._modbus is None: + raise RuntimeError("Modbus connection not established") + return self._modbus + + @property + def pump_index_to_address(self) -> Dict[int, int]: + if self._pump_index_to_address is None: + raise RuntimeError("Pump mappings not established") + return self._pump_index_to_address + + @property + def num_channels(self) -> int: + if self._num_channels is None: + raise RuntimeError("Number of channels not established") + return self._num_channels + + def _start_keep_alive_thread(self): + async def keep_alive(): + i = 0 + while self._keep_alive_thread_active: + time.sleep(0.1) + i += 1 + if i == 250: + await self.modbus.read_holding_registers(0, 1, unit=self.address) + i = 0 + + def manage_async_keep_alive(): + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(keep_alive()) + loop.close() + except Exception as e: + logger.error("Error in keep alive thread: %s", e) + + self._keep_alive_thread_active = True + self._keep_alive_thread = threading.Thread(target=manage_async_keep_alive, daemon=True) + self._keep_alive_thread.start() + + async def setup(self): + await self._setup_modbus() + register_return = await self.modbus.read_holding_registers(19, 2, unit=self.address) + self._num_channels = int( + "".join(chr(r // 256) + chr(r % 256) for r in register_return.registers)[2] + ) + self._start_keep_alive_thread() + self._pump_index_to_address = {pump: pump + 100 for pump in range(0, self.num_channels)} + + async def _setup_modbus(self): + if AsyncModbusSerialClient is None: + raise RuntimeError( + "pymodbus is not installed. Install with: pip install pylabrobot[modbus]." + f" Import error: {_MODBUS_IMPORT_ERROR}" + ) + self._modbus = AsyncModbusSerialClient( + port=self.port, + baudrate=115200, + timeout=1, + stopbits=1, + bytesize=8, + parity="E", + retry_on_empty=True, + ) + await self.modbus.connect() + if not self.modbus.connected: + raise ConnectionError("Modbus connection failed during pump setup") + + async def stop(self): + # halt all channels before closing + for pump in self.pump_index_to_address: + address = self.pump_index_to_address[pump] + await self.modbus.write_register(address, 0, unit=self.address) + if self._keep_alive_thread is not None: + self._keep_alive_thread_active = False + self._keep_alive_thread.join() + self.modbus.close() + assert not self.modbus.connected, "Modbus failing to disconnect" + + async def write_speed(self, channel: int, speed: int): + if speed not in range(101): + raise ValueError("Pump speed out of range. Value should be between 0 and 100.") + await self.modbus.write_register( + self.pump_index_to_address[channel], + speed, + unit=self.address, + ) + + +class AgrowChannelBackend(PumpBackend): + """Per-channel PumpBackend adapter that delegates to a shared AgrowDriver.""" + + def __init__(self, connection: AgrowDriver, channel: int): + self._driver = connection + self._channel = channel + + async def setup(self): + pass # lifecycle managed by the device via AgrowDriver + + async def stop(self): + pass # lifecycle managed by the device via AgrowDriver + + async def run_revolutions(self, num_revolutions: float): + raise NotImplementedError( + "Revolution based pumping commands are not available for Agrow pumps." + ) + + async def run_continuously(self, speed: float): + await self._driver.write_speed(self._channel, int(speed)) + + async def halt(self): + await self._driver.write_speed(self._channel, 0) + + def serialize(self): + return { + **super().serialize(), + "port": self._driver.port, + "address": self._driver.address, + "channel": self._channel, + } + + +class AgrowDosePumpArray(Device): + """Agrow dose pump array device. + + Exposes each channel as an individual PumpingCapability via `self.pumps`. + """ + + def __init__( + self, + port: str, + address: Union[int, str], + calibrations: Optional[List[Optional[PumpCalibration]]] = None, + ): + self._driver = AgrowDriver(port=port, address=address) + # We need a DeviceBackend for Device.__init__; use channel 0's adapter as the "primary". + # The real lifecycle is managed via _connection. + self._channel_backends: List[AgrowChannelBackend] = [] + self.pumps: List[PumpingCapability] = [] + self._calibrations = calibrations + # Defer full init until setup() when we know num_channels. + # Pass a placeholder backend — we'll override setup/stop. + super().__init__(backend=AgrowChannelBackend(self._driver, 0)) + + async def setup(self): + await self._driver.setup() + num_channels = self._driver.num_channels + + self._channel_backends = [AgrowChannelBackend(self._driver, ch) for ch in range(num_channels)] + self.pumps = [] + for i, backend in enumerate(self._channel_backends): + cal = None + if self._calibrations is not None and i < len(self._calibrations): + cal = self._calibrations[i] + cap = PumpingCapability(backend=backend, calibration=cal) + self.pumps.append(cap) + + self._capabilities = list(self.pumps) + for cap in self._capabilities: + await cap._on_setup() + self._setup_finished = True + + async def stop(self): + for cap in reversed(self._capabilities): + await cap._on_stop() + await self._driver.stop() + self._setup_finished = False + + def serialize(self): + return { + "port": self._driver.port, + "address": self._driver.address, + } diff --git a/pylabrobot/agrowpumps/agrowdosepump_tests.py b/pylabrobot/agrowpumps/agrowdosepump_tests.py new file mode 100644 index 00000000000..4f353a4a0bf --- /dev/null +++ b/pylabrobot/agrowpumps/agrowdosepump_tests.py @@ -0,0 +1,77 @@ +import unittest +from unittest.mock import AsyncMock + +import pytest + +pytest.importorskip("pymodbus") + +from pymodbus.client import AsyncModbusSerialClient # type: ignore + +from pylabrobot.agrowpumps import AgrowDosePumpArray + + +class SimulatedModbusClient(AsyncModbusSerialClient): + def __init__(self, connected: bool = False): + self._connected = connected + + async def connect(self): + self._connected = True + + @property + def connected(self): + return self._connected + + async def read_holding_registers(self, address: int, count: int, **kwargs): + if "unit" not in kwargs: + raise ValueError("unit must be specified") + if address == 19: + return_register = AsyncMock() + return_register.registers = [16708, 13824, 0, 0, 0, 0, 0][:count] + return return_register + + write_register = AsyncMock() + + def close(self, reconnect=False): + assert not self.connected, "Modbus connection not established" + self._connected = False + + +class TestAgrowPumps(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.device = AgrowDosePumpArray(port="simulated", address=1) + + async def _mock_setup_modbus(): + self.device._driver._modbus = SimulatedModbusClient() + + self.device._driver._setup_modbus = _mock_setup_modbus + await self.device.setup() + + async def asyncTearDown(self): + await self.device.stop() + + async def test_setup(self): + self.assertEqual(self.device._driver.port, "simulated") + self.assertEqual(self.device._driver.address, 1) + self.assertEqual(len(self.device.pumps), 6) + self.assertEqual( + self.device._driver._pump_index_to_address, + {pump: pump + 100 for pump in range(0, 6)}, + ) + + async def test_run_continuously(self): + self.device._driver.modbus.write_register.reset_mock() + await self.device.pumps[0].run_continuously(speed=1) + self.device._driver.modbus.write_register.assert_called_once_with(100, 1, unit=1) + + # invalid speed: cannot be bigger than 100 + with self.assertRaises(ValueError): + await self.device.pumps[0].run_continuously(speed=101) + + async def test_run_revolutions(self): + with self.assertRaises(NotImplementedError): + await self.device.pumps[0].run_revolutions(num_revolutions=1.0) + + async def test_halt_single_channel(self): + self.device._driver.modbus.write_register.reset_mock() + await self.device.pumps[2].halt() + self.device._driver.modbus.write_register.assert_called_once_with(102, 0, unit=1) diff --git a/pylabrobot/capabilities/pumping/__init__.py b/pylabrobot/capabilities/pumping/__init__.py new file mode 100644 index 00000000000..4e3c3a32ec0 --- /dev/null +++ b/pylabrobot/capabilities/pumping/__init__.py @@ -0,0 +1,5 @@ +from .backend import PumpBackend +from .calibration import PumpCalibration +from .chatterbox import PumpChatterboxBackend +from .errors import NotCalibratedError +from .pumping import PumpingCapability diff --git a/pylabrobot/capabilities/pumping/backend.py b/pylabrobot/capabilities/pumping/backend.py new file mode 100644 index 00000000000..448c68adf69 --- /dev/null +++ b/pylabrobot/capabilities/pumping/backend.py @@ -0,0 +1,19 @@ +from abc import ABCMeta, abstractmethod + +from pylabrobot.device import DeviceBackend + + +class PumpBackend(DeviceBackend, metaclass=ABCMeta): + """Abstract backend for a single pump.""" + + @abstractmethod + async def run_revolutions(self, num_revolutions: float): + """Run for a given number of revolutions.""" + + @abstractmethod + async def run_continuously(self, speed: float): + """Run continuously at a given speed. If speed is 0, halt.""" + + @abstractmethod + async def halt(self): + """Halt the pump.""" diff --git a/pylabrobot/capabilities/pumping/calibration.py b/pylabrobot/capabilities/pumping/calibration.py new file mode 100644 index 00000000000..5ae1ba0e9b5 --- /dev/null +++ b/pylabrobot/capabilities/pumping/calibration.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import csv +import json +from typing import Dict, List, Literal, Optional, Union + +from pylabrobot.serializer import SerializableMixin + + +class PumpCalibration(SerializableMixin): + """Calibration for a single pump or pump array + + Attributes: + calibration: The calibration of the pump or pump array. + """ + + def __init__( + self, + calibration: List[Union[float, int]], + calibration_mode: Literal["duration", "revolutions"] = "duration", + ): + """Initialize a PumpCalibration object. + + Args: + calibration: calibration of the pump in pump-specific volume per time/revolution units. + calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for + volume per revolution. Defaults to "duration". + + Raises: + ValueError: if a value in the calibration is outside expected parameters. + """ + + if any(value <= 0 for value in calibration): + raise ValueError("A value in the calibration is is outside expected parameters.") + if calibration_mode not in ["duration", "revolutions"]: + raise ValueError("calibration_mode must be 'duration' or 'revolutions'") + self.calibration = calibration + self.calibration_mode = calibration_mode + + def __getitem__(self, item: int) -> Union[float, int]: + return self.calibration[item] # type: ignore + + def __len__(self) -> int: + """Return the length of the calibration.""" + return len(self.calibration) + + @classmethod + def load_calibration( + cls, + calibration: Optional[Union[dict, list, float, int, str]] = None, + num_items: Optional[int] = None, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a file, dictionary, list, or value. + + Args: + calibration: pump calibration file, dictionary, list, or value. + calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for + volume per revolution. Defaults to "duration". + num_items: number of items in the calibration. Required if calibration is a value. + + Raises: + NotImplementedError: if the calibration filetype or format is not supported. + ValueError: if num_items is not specified when calibration is a value. + """ + + if isinstance(calibration, dict): + return PumpCalibration.load_from_dict( + calibration=calibration, calibration_mode=calibration_mode + ) + if isinstance(calibration, list): + return PumpCalibration.load_from_list( + calibration=calibration, calibration_mode=calibration_mode + ) + if isinstance(calibration, (float, int)): + if num_items is None: + raise ValueError("num_items must be specified if calibration is a value.") + return PumpCalibration.load_from_value( + value=calibration, + num_items=num_items, + calibration_mode=calibration_mode, + ) + if isinstance(calibration, str): + if calibration.endswith(".json"): + return PumpCalibration.load_from_json( + file_path=calibration, calibration_mode=calibration_mode + ) + if calibration.endswith(".csv"): + return PumpCalibration.load_from_csv( + file_path=calibration, calibration_mode=calibration_mode + ) + raise NotImplementedError("Calibration filetype not supported.") + raise NotImplementedError("Calibration format not supported.") + + def serialize(self) -> dict: + return { + "calibration": self.calibration, + "calibration_mode": self.calibration_mode, + } + + @classmethod + def deserialize(cls, data: dict) -> PumpCalibration: + return cls( + calibration=data["calibration"], + calibration_mode=data["calibration_mode"], + ) + + @classmethod + def load_from_json( + cls, + file_path: str, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a json file.""" + + with open(file_path, "rb") as f: + calibration = json.load(f) + if isinstance(calibration, dict): + calibration = {int(key): float(value) for key, value in calibration.items()} + return PumpCalibration.load_from_dict( + calibration=calibration, calibration_mode=calibration_mode + ) + if isinstance(calibration, list): + return PumpCalibration(calibration=calibration, calibration_mode=calibration_mode) + raise TypeError(f"Calibration pulled from {file_path} is not a dictionary or list.") + + @classmethod + def load_from_csv( + cls, + file_path: str, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a csv file.""" + + with open(file_path, encoding="utf-8", newline="") as f: + csv_file = list(csv.reader(f)) + num_columns = len(csv_file[0]) + if num_columns != 2: + raise ValueError("CSV file must have two columns.") + calibration = {int(row[0]): float(row[1]) for row in csv_file} + return PumpCalibration.load_from_dict( + calibration=calibration, calibration_mode=calibration_mode + ) + + @classmethod + def load_from_dict( + cls, + calibration: Dict[int, Union[int, float]], + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a dictionary (0-indexed).""" + + if sorted(calibration.keys()) != list(range(len(calibration))): + raise ValueError("Keys must be a contiguous range of integers starting at 0.") + calibration_list = [calibration[key] for key in sorted(calibration.keys())] + return cls(calibration=calibration_list, calibration_mode=calibration_mode) + + @classmethod + def load_from_list( + cls, + calibration: List[Union[int, float]], + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a list.""" + return cls(calibration=calibration, calibration_mode=calibration_mode) + + @classmethod + def load_from_value( + cls, + value: Union[float, int], + num_items: int, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a single value applied to all channels.""" + calibration = [value] * num_items + return cls(calibration, calibration_mode) diff --git a/pylabrobot/capabilities/pumping/chatterbox.py b/pylabrobot/capabilities/pumping/chatterbox.py new file mode 100644 index 00000000000..70a25ebd57e --- /dev/null +++ b/pylabrobot/capabilities/pumping/chatterbox.py @@ -0,0 +1,20 @@ +from .backend import PumpBackend + + +class PumpChatterboxBackend(PumpBackend): + """Chatterbox backend for device-free testing.""" + + async def setup(self): + print("Setting up the pump.") + + async def stop(self): + print("Stopping the pump.") + + async def run_revolutions(self, num_revolutions: float): + print(f"Running {num_revolutions} revolutions.") + + async def run_continuously(self, speed: float): + print(f"Running continuously at speed {speed}.") + + async def halt(self): + print("Halting the pump.") diff --git a/pylabrobot/capabilities/pumping/errors.py b/pylabrobot/capabilities/pumping/errors.py new file mode 100644 index 00000000000..64f19783fca --- /dev/null +++ b/pylabrobot/capabilities/pumping/errors.py @@ -0,0 +1,2 @@ +class NotCalibratedError(Exception): + """Error raised when calling a method that requires the pump to be calibrated.""" diff --git a/pylabrobot/capabilities/pumping/pumping.py b/pylabrobot/capabilities/pumping/pumping.py new file mode 100644 index 00000000000..c956d1b4a9a --- /dev/null +++ b/pylabrobot/capabilities/pumping/pumping.py @@ -0,0 +1,85 @@ +import asyncio +from typing import Optional, Union + +from pylabrobot.capabilities.capability import Capability, need_capability_ready + +from .backend import PumpBackend +from .calibration import PumpCalibration + + +class PumpingCapability(Capability): + """Single-pump capability.""" + + def __init__( + self, + backend: PumpBackend, + calibration: Optional[PumpCalibration] = None, + ): + super().__init__(backend=backend) + self.backend: PumpBackend = backend + if calibration is not None and len(calibration) != 1: + raise ValueError("Calibration may only have a single item for this pump") + self.calibration = calibration + + @need_capability_ready + async def run_revolutions(self, num_revolutions: float): + """Run for a given number of revolutions. + + Args: + num_revolutions: number of revolutions to run. + """ + await self.backend.run_revolutions(num_revolutions=num_revolutions) + + @need_capability_ready + async def run_continuously(self, speed: float): + """Run continuously at a given speed. If speed is 0, the pump will be halted. + + Args: + speed: speed in rpm/pump-specific units. + """ + await self.backend.run_continuously(speed=speed) + + @need_capability_ready + async def run_for_duration(self, speed: Union[float, int], duration: Union[float, int]): + """Run the pump at specified speed for the specified duration. + + Args: + speed: speed in rpm/pump-specific units. + duration: duration in seconds. + """ + if duration < 0: + raise ValueError("Duration must be positive.") + await self.run_continuously(speed=speed) + await asyncio.sleep(duration) + await self.run_continuously(speed=0) + + @need_capability_ready + async def pump_volume(self, speed: Union[float, int], volume: Union[float, int]): + """Run the pump at specified speed for the specified volume. Requires calibration. + + Args: + speed: speed in rpm/pump-specific units. + volume: volume to pump. + """ + if self.calibration is None: + raise TypeError( + "Pump is not calibrated. Volume based pumping and related functions unavailable." + ) + if self.calibration.calibration_mode == "duration": + duration = volume / self.calibration[0] + await self.run_for_duration(speed=speed, duration=duration) + elif self.calibration.calibration_mode == "revolutions": + num_revolutions = volume / self.calibration[0] + await self.run_revolutions(num_revolutions=num_revolutions) + else: + raise ValueError("Calibration mode not recognized.") + + @need_capability_ready + async def halt(self): + """Halt the pump.""" + await self.backend.halt() + + async def _on_stop(self): + if self._setup_finished: + await self.backend.halt() + await super()._on_stop() diff --git a/pylabrobot/capabilities/pumping/pumping_tests.py b/pylabrobot/capabilities/pumping/pumping_tests.py new file mode 100644 index 00000000000..969c58c85f1 --- /dev/null +++ b/pylabrobot/capabilities/pumping/pumping_tests.py @@ -0,0 +1,78 @@ +import unittest +from unittest.mock import AsyncMock, Mock + +from pylabrobot.capabilities.pumping.backend import PumpBackend +from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.pumping import PumpingCapability + + +class TestPumpingCapability(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.mock_backend = Mock(spec=PumpBackend) + self.mock_backend.run_revolutions = AsyncMock() + self.mock_backend.run_continuously = AsyncMock() + self.mock_backend.halt = AsyncMock() + self.test_calibration = PumpCalibration.load_calibration(1, num_items=1) + + async def _make_cap(self, calibration=None): + cap = PumpingCapability(backend=self.mock_backend, calibration=calibration) + await cap._on_setup() + return cap + + async def test_setup(self): + cap = await self._make_cap() + self.assertIsNone(cap.calibration) + self.assertTrue(cap.setup_finished) + + async def test_run_revolutions(self): + cap = await self._make_cap() + await cap.run_revolutions(num_revolutions=1) + self.mock_backend.run_revolutions.assert_called_once_with(num_revolutions=1) + + async def test_run_continuously(self): + cap = await self._make_cap() + await cap.run_continuously(speed=100) + self.mock_backend.run_continuously.assert_called_once_with(speed=100) + + async def test_halt(self): + cap = await self._make_cap() + await cap.halt() + self.mock_backend.halt.assert_called_once() + + async def test_run_for_duration(self): + cap = await self._make_cap() + await cap.run_for_duration(speed=1, duration=0) + self.mock_backend.run_continuously.assert_called_with(speed=0) + + async def test_run_invalid_duration(self): + cap = await self._make_cap() + with self.assertRaises(ValueError): + await cap.run_for_duration(speed=1, duration=-1) + + async def test_pump_volume_duration_mode(self): + cap = await self._make_cap(calibration=self.test_calibration) + cap.calibration.calibration_mode = "duration" + cap.run_for_duration = AsyncMock() + await cap.pump_volume(speed=1, volume=1) + cap.run_for_duration.assert_called_once_with(speed=1, duration=1.0) + + async def test_pump_volume_revolutions_mode(self): + cap = await self._make_cap(calibration=self.test_calibration) + cap.calibration.calibration_mode = "revolutions" + cap.run_revolutions = AsyncMock() + await cap.pump_volume(speed=1, volume=1) + cap.run_revolutions.assert_called_once_with(num_revolutions=1.0) + + async def test_pump_volume_no_calibration(self): + cap = await self._make_cap() + with self.assertRaises(TypeError): + await cap.pump_volume(speed=1, volume=1) + + async def test_not_setup_raises(self): + cap = PumpingCapability(backend=self.mock_backend) + with self.assertRaises(RuntimeError): + await cap.run_continuously(speed=1) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/cole_parmer/__init__.py b/pylabrobot/cole_parmer/__init__.py new file mode 100644 index 00000000000..76b87527436 --- /dev/null +++ b/pylabrobot/cole_parmer/__init__.py @@ -0,0 +1 @@ +from .masterflex_backend import MasterflexBackend, MasterflexPump diff --git a/pylabrobot/cole_parmer/masterflex_backend.py b/pylabrobot/cole_parmer/masterflex_backend.py new file mode 100644 index 00000000000..988538ed968 --- /dev/null +++ b/pylabrobot/cole_parmer/masterflex_backend.py @@ -0,0 +1,101 @@ +try: + import serial # type: ignore + + HAS_SERIAL = True +except ImportError as e: + HAS_SERIAL = False + _SERIAL_IMPORT_ERROR = e + +from typing import Optional + +from pylabrobot.capabilities.pumping.backend import PumpBackend +from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.pumping import PumpingCapability +from pylabrobot.device import Device +from pylabrobot.io.serial import Serial + + +class MasterflexBackend(PumpBackend): + """Backend for the Cole Parmer Masterflex L/S pump + + tested on: + 07551-20 + + should be same as: + 07522-20 + 07522-30 + 07551-30 + 07575-30 + 07575-40 + + Documentation available at: + - https://pim-resources.coleparmer.com/instruction-manual/a-1299-1127b-en.pdf + - https://web.archive.org/web/20210924061132/https://pim-resources.coleparmer.com/ + instruction-manual/a-1299-1127b-en.pdf + """ + + def __init__(self, com_port: str): + if not HAS_SERIAL: + raise RuntimeError( + "pyserial is not installed. Install with: pip install pylabrobot[serial]. " + f"Import error: {_SERIAL_IMPORT_ERROR}" + ) + self.com_port = com_port + self.io = Serial( + port=self.com_port, + baudrate=4800, + timeout=1, + parity=serial.PARITY_ODD, + stopbits=serial.STOPBITS_ONE, + bytesize=serial.SEVENBITS, + human_readable_device_name="Masterflex Pump", + ) + + async def setup(self): + await self.io.setup() + await self.io.write(b"\x05") # Enquiry; ready to send. + await self.io.write(b"\x05P02\r") + + def serialize(self): + return {**super().serialize(), "com_port": self.com_port} + + async def stop(self): + await self.io.stop() + + async def send_command(self, command: str): + command = "\x02P02" + command + "\x0d" + await self.io.write(command.encode()) + return self.io.read() + + async def run_revolutions(self, num_revolutions: float): + num_revolutions = round(num_revolutions, 2) + cmd = f"V{num_revolutions}G" + await self.send_command(cmd) + + async def run_continuously(self, speed: float): + if speed == 0: + await self.halt() + return + + direction = "+" if speed > 0 else "-" + speed_int = int(abs(speed)) + cmd = f"S{direction}{speed_int}G0" + await self.send_command(cmd) + + async def halt(self): + await self.send_command("H") + + +class MasterflexPump(Device): + """Cole Parmer Masterflex L/S pump.""" + + def __init__( + self, + com_port: str, + calibration: Optional[PumpCalibration] = None, + ): + backend = MasterflexBackend(com_port=com_port) + super().__init__(backend=backend) + self._backend: MasterflexBackend = backend + self.pumping = PumpingCapability(backend=backend, calibration=calibration) + self._capabilities = [self.pumping]