diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 553a27608fb..d8709dbc066 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -39,12 +39,16 @@ HamiltonLiquidHandler, ) from pylabrobot.liquid_handling.backends.hamilton.common import fill_in_defaults -from pylabrobot.liquid_handling.backends.hamilton.planning import group_by_x_batch_by_xy from pylabrobot.liquid_handling.errors import ChannelizedError from pylabrobot.liquid_handling.liquid_classes.hamilton import ( HamiltonLiquidClass, get_star_liquid_class, ) +from pylabrobot.liquid_handling.pipette_batch_scheduling import ( + ChannelBatch, + plan_batches, + validate_channel_selections, +) from pylabrobot.liquid_handling.standard import ( Drop, DropTipRack, @@ -63,7 +67,6 @@ SingleChannelDispense, ) from pylabrobot.liquid_handling.utils import ( - MIN_SPACING_EDGE, get_tight_single_resource_liquid_op_offsets, get_wide_single_resource_liquid_op_offsets, ) @@ -1347,6 +1350,7 @@ def __init__( self._iswap_version: Optional[str] = None # loaded lazily self._default_1d_symbology: Barcode1DSymbology = "Code 128 (Subset B and C)" + self._x_grouping_tolerance_mm: float = 0.1 self._setup_done = False @@ -1755,21 +1759,14 @@ async def channel_request_y_minimum_spacing(self, channel_idx: int) -> float: return self.y_drive_increment_to_mm(resp["yc"][1]) async def channels_request_y_minimum_spacing(self) -> List[float]: - """Query the minimum Y spacing for all channels in parallel. - - Each channel is addressed on its own module (P1, P2, ...), so the queries - can run concurrently. + """Query all channels for their minimum Y spacing in parallel. Returns: - A list of exact (unrounded) minimum Y spacings in mm, one per channel, - indexed by channel number. + A list of minimum Y spacings in mm, one per channel. """ return list( await asyncio.gather( - *( - self.channel_request_y_minimum_spacing(channel_idx=idx) - for idx in range(self.num_channels) - ) + *(self.channel_request_y_minimum_spacing(i) for i in range(self.num_channels)) ) ) @@ -2035,229 +2032,47 @@ class PressureLLDMode(enum.Enum): LIQUID = 0 FOAM = 1 - async def _move_to_traverse_height( - self, channels: Optional[List[int]] = None, traverse_height: Optional[float] = None - ): - """Move channels to a specified traverse height, if given, otherwise move to full Z safety. - - Args: - channels: Channels to move. If None, all channels are moved. - traverse_height: Absolute Z position in mm. If None, move to full Z safety. - """ - if traverse_height is None: - await self.move_all_channels_in_z_safety() - else: - if channels is None: - channels = list(range(self.num_channels)) - await self.position_channels_in_z_direction( - {channel: traverse_height for channel in channels} - ) - - async def _probe_liquid_heights_batch( - self, - containers: List[Container], - use_channels: List[int], - lld_mode: LLDMode = LLDMode.GAMMA, - search_speed: float = 10.0, - n_replicates: int = 1, - ) -> List[float]: - """Helper for probe_liquid_heights that performs a single batch of liquid level detection using a set of channels. - - Assumes channels are moved to the appropriate traverse height before calling, and does not move channels after completion. - """ - - tip_lengths = [await self.request_tip_len_on_channel(channel_idx=idx) for idx in use_channels] - - detect_func: Callable[..., Any] - if lld_mode == self.LLDMode.GAMMA: - detect_func = self._move_z_drive_to_liquid_surface_using_clld - else: - detect_func = self._search_for_surface_using_plld - - # Compute Z search bounds for this batch - batch_lowest_immers = [ - container.get_absolute_location("c", "c", "cavity_bottom").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - for container, tip_len in zip(containers, tip_lengths) - ] - batch_start_pos = [ - container.get_absolute_location("c", "c", "t").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - + 5 - for container, tip_len in zip(containers, tip_lengths) - ] - - absolute_heights_measurements: Dict[int, List[Optional[float]]] = { - idx: [] for idx in range(len(use_channels)) - } - - # Run n_replicates detection loop for this batch - for _ in range(n_replicates): - errors = await asyncio.gather( - *[ - detect_func( - channel_idx=channel, - lowest_immers_pos=lip, - start_pos_search=sps, - channel_speed=search_speed, - ) - for channel, lip, sps in zip(use_channels, batch_lowest_immers, batch_start_pos) - ], - return_exceptions=True, - ) - - # Get heights for ALL channels, handling failures for channels with no liquid - current_absolute_liquid_heights = await self.request_pip_height_last_lld() - for idx, (channel_idx, error) in enumerate(zip(use_channels, errors)): - if isinstance(error, STARFirmwareError): - error_msg = str(error).lower() - if "no liquid level found" in error_msg or "no liquid was present" in error_msg: - height = None - msg = ( - f"Operation {idx} (channel {channel_idx}): No liquid detected. Could be because there is " - f"no liquid in container {containers[idx].name} or liquid level " - f"is too low." - ) - if lld_mode == self.LLDMode.GAMMA: - msg += " Consider using pressure-based LLD if liquid is believed to exist." - logger.warning(msg) - else: - raise error - elif isinstance(error, Exception): - raise error - else: - height = current_absolute_liquid_heights[channel_idx] - absolute_heights_measurements[idx].append(height) - - # Compute liquid heights relative to well bottom - relative_to_well: List[float] = [] - inconsistent_ops: List[str] = [] - - for idx, container in enumerate(containers): - measurements = absolute_heights_measurements[idx] - valid = [m for m in measurements if m is not None] - cavity_bottom = container.get_absolute_location("c", "c", "cavity_bottom").z - - if len(valid) == 0: - relative_to_well.append(0.0) - elif len(valid) == len(measurements): - relative_to_well.append(sum(valid) / len(valid) - cavity_bottom) - else: - inconsistent_ops.append( - f"Operation {idx}: {len(valid)}/{len(measurements)} replicates detected liquid" - ) - - if inconsistent_ops: - raise RuntimeError( - "Inconsistent liquid detection across replicates. " - "This may indicate liquid levels near the detection limit:\n" + "\n".join(inconsistent_ops) - ) - - return relative_to_well - - def _get_maximum_minimum_spacing_between_channels(self, use_channels: List[int]) -> float: - """Get the maximum of the set of minimum spacing requirements between the channels being used""" - sorted_channels = sorted(use_channels) - max_channel_spacing = max( - self._min_spacing_between(hi, lo) for hi, lo in zip(sorted_channels[1:], sorted_channels[:-1]) - ) - return max_channel_spacing - - def _compute_channels_in_resource_locations( + async def execute_batched( self, - resources: Sequence[Resource], - use_channels: List[int], - offsets: Optional[List[Coordinate]], - ) -> List[Coordinate]: - """Compute absolute locations of resources with given offsets.""" - - # If no offset is provided but we can fit all channels inside a single resource, - # compute the offsets to make that happen using wide spacing. - if offsets is None: - if len(set(resources)) == 1 and len(use_channels) == len(set(use_channels)): - container_size_y = resources[0].get_absolute_size_y() - # For non-consecutive channels (e.g. [0,1,2,5,6,7]), we must account for - # phantom intermediate channels (3,4) that physically exist between them. - # Compute offsets for the full channel range (min to max), then pick only - # the offsets corresponding to the actual channels being used. - max_channel_spacing = self._get_maximum_minimum_spacing_between_channels(use_channels) - num_channels_in_span = max(use_channels) - min(use_channels) + 1 - min_required = MIN_SPACING_EDGE * 2 + (num_channels_in_span - 1) * max_channel_spacing - if container_size_y >= min_required: - all_offsets = get_wide_single_resource_liquid_op_offsets( - resource=resources[0], - num_channels=num_channels_in_span, - min_spacing=max_channel_spacing, - ) - min_ch = min(use_channels) - offsets = [all_offsets[ch - min_ch] for ch in use_channels] - - if num_channels_in_span % 2 != 0: - y_offset = 5.5 - offsets = [offset + Coordinate(0, y_offset, 0) for offset in offsets] - # else: container too small to fit all channels — fall back to center offsets. - # Y sub-batching will serialize channels that can't coexist. - - offsets = offsets or [Coordinate.zero()] * len(resources) - - # Compute positions for all resources - resource_locations = [ - resource.get_location_wrt(self.deck, x="c", y="c", z="b") + offset - for resource, offset in zip(resources, offsets) - ] - - return resource_locations - - async def execute_batched( # TODO: any hamilton liquid handler - self, - func: Callable[[List[int]], Awaitable[None]], - resources: List[Container], - use_channels: Optional[List[int]] = None, - resource_offsets: Optional[List[Coordinate]] = None, + func: Callable[[ChannelBatch], Awaitable[None]], + batches: List[ChannelBatch], min_traverse_height_during_command: Optional[float] = None, - ): - if use_channels is None: - use_channels = list(range(len(resources))) - - # precompute locations and batches - locations = self._compute_channels_in_resource_locations( - resources, use_channels, resource_offsets - ) - x_batches = group_by_x_batch_by_xy( - locations=locations, - use_channels=use_channels, - min_spacing_between_channels=self._min_spacing_between, - ) + ) -> None: + """Execute a Z-axis callback across pre-planned batches with X/Y positioning. - # loop over batches. keep track of channels used in previous batch to ensure they are raised to traverse height before next batch - prev_channels: Optional[List[int]] = None + Handles inter-batch safety: raises channels between batches, moves X when the + X group changes, and positions Y before calling *func*. On error or + KeyboardInterrupt, channels are moved to Z safety before re-raising. + Args: + func: Async callback that receives a ``ChannelBatch`` and performs Z-axis work + (e.g. liquid level detection, z-touch probing). Must not move X or Y. + batches: Pre-planned batches from ``plan_batches()``. + min_traverse_height_during_command: Absolute Z height (mm) for inter-batch + channel raises. ``None`` uses full Z safety. + """ try: - for x_value, x_batch in x_batches.items(): - if prev_channels is not None: - await self._move_to_traverse_height( - channels=prev_channels, traverse_height=min_traverse_height_during_command - ) - await self.move_channel_x(0, x_value) - - for y_batch in x_batch: - if prev_channels is not None: - await self._move_to_traverse_height( - channels=prev_channels, traverse_height=min_traverse_height_during_command + prev_batch: Optional[ChannelBatch] = None + for batch in batches: + if prev_batch is not None: + if min_traverse_height_during_command is None: + await self.move_all_channels_in_z_safety() + else: + await self.position_channels_in_z_direction( + {ch: min_traverse_height_during_command for ch in prev_batch.channels} ) - await self.position_channels_in_y_direction( - {use_channels[idx]: locations[idx].y for idx in y_batch}, - ) - await func(y_batch) + if prev_batch is None or batch.x_position != prev_batch.x_position: + await self.move_channel_x(0, batch.x_position) + + await self.position_channels_in_y_direction(batch.y_positions) + await func(batch) + prev_batch = batch - prev_channels = [use_channels[idx] for idx in y_batch] - except Exception: + except Exception: # firmware errors, RuntimeError, etc. await self.move_all_channels_in_z_safety() raise - except BaseException: + except BaseException: # KeyboardInterrupt, SystemExit — still must raise channels await self.move_all_channels_in_z_safety() raise @@ -2269,12 +2084,13 @@ async def probe_liquid_heights( lld_mode: LLDMode = LLDMode.GAMMA, search_speed: float = 10.0, n_replicates: int = 1, + move_to_z_safety_after: bool = True, # Traverse height parameters (None = full Z safety, float = absolute Z position in mm) min_traverse_height_at_beginning_of_command: Optional[float] = None, min_traverse_height_during_command: Optional[float] = None, z_position_at_end_of_command: Optional[float] = None, - # Deprecated - move_to_z_safety_after: Optional[bool] = None, + # X grouping tolerance (mm) — containers within this distance share an X group + x_grouping_tolerance: Optional[float] = None, ) -> List[float]: """Probe liquid surface heights in containers using liquid level detection. @@ -2282,107 +2098,183 @@ async def probe_liquid_heights( container positions and sensing the liquid surface. Heights are measured from the bottom of each container's cavity. + Uses ``plan_batches`` for X/Y partitioning and auto-spreading, then ``execute_batched`` + to iterate batches with Z safety. + Args: containers: List of Container objects to probe, one per channel. use_channels: Channel indices to use for probing (0-indexed). - resource_offsets: Optional XYZ offsets from container centers. Auto-calculated for single - containers with odd channel counts to avoid center dividers. Defaults to container centers. + resource_offsets: Optional XYZ offsets from container centers. When not provided, + ``plan_batches`` auto-spreads channels targeting the same container. lld_mode: Detection mode - LLDMode(1) for capacitive, LLDMode(2) for pressure-based. Defaults to capacitive. search_speed: Z-axis search speed in mm/s. Default 10.0 mm/s. n_replicates: Number of measurements per channel. Default 1. + move_to_z_safety_after: Whether to move channels to safe Z height after probing. + Set to False when probing is immediately followed by another Z operation (e.g. + aspirate) to avoid unnecessary Z travel. Default True. min_traverse_height_at_beginning_of_command: Absolute Z height (mm) to move involved channels to before the first batch. None (default) uses full Z safety. min_traverse_height_during_command: Absolute Z height (mm) to move involved channels to - between batches (X groups and Y sub-batches). None (default) uses full Z safety. + between batches. None (default) uses full Z safety. z_position_at_end_of_command: Absolute Z height (mm) to move involved channels to after probing. None (default) uses full Z safety. + x_grouping_tolerance: Containers within this X distance (mm) are grouped and probed + together. Defaults to ``_x_grouping_tolerance_mm`` (0.1 mm). Returns: Mean of measured liquid heights for each container (mm from cavity bottom). Raises: - RuntimeError: If channels lack tips. - - Notes: - - All specified channels must have tips attached - - Containers at different X positions are probed in sequential groups (single X carriage) - - For single containers with odd channel counts, Y-offsets are applied to avoid - center dividers (Hamilton 1000 uL spacing: 9mm, offset: 5.5mm) + ValueError: If ``use_channels`` is empty, contains out-of-range indices, contains + duplicates, or if input list lengths don't match. + RuntimeError: If any specified channel lacks a tip. """ - if move_to_z_safety_after is not None: - warnings.warn( - "The 'move_to_z_safety_after' parameter is deprecated and will be removed in a future release. " - "Use 'z_position_at_end_of_command' with an appropriate Z height instead. If not set, " - "the default behavior will be to move to full Z safety after the command.", - DeprecationWarning, - ) - - # Validate parameters. - if use_channels is None: - use_channels = list(range(len(containers))) - if len(use_channels) == 0: - raise ValueError("use_channels must not be empty.") - if not all(0 <= ch < self.num_channels for ch in use_channels): - raise ValueError( - f"All use_channels must be integers in range [0, {self.num_channels - 1}], " - f"got {use_channels}." - ) + if x_grouping_tolerance is None: + x_grouping_tolerance = self._x_grouping_tolerance_mm + if n_replicates < 1: + raise ValueError(f"n_replicates must be >= 1, got {n_replicates}.") if lld_mode not in {self.LLDMode.GAMMA, self.LLDMode.PRESSURE}: raise ValueError(f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {lld_mode}") - if not len(containers) == len(use_channels): - raise ValueError( - "Length of containers and use_channels must match, " - f"got lengths {len(containers)}, {len(use_channels)}." - ) + use_channels = validate_channel_selections( + containers=containers, + use_channels=use_channels, + num_channels=self.num_channels, + ) - # Validate resource_offsets length (if provided) to avoid silent truncation in downstream zips. - if resource_offsets is not None and len(resource_offsets) != len(containers): - raise ValueError( - "Length of resource_offsets must match the length of containers and use_channels, " - f"got lengths {len(resource_offsets)} (resource_offsets) and " - f"{len(containers)} (containers/use_channels)." - ) - # Make sure we have tips on all channels and know their lengths + # Verify tips and query tip lengths tip_presence = await self.request_tip_presence() if not all(tip_presence[idx] for idx in use_channels): raise RuntimeError("All specified channels must have tips attached.") + tip_lengths = [await self.request_tip_len_on_channel(channel_idx=idx) for idx in use_channels] - # Move channels to traverse height - await self._move_to_traverse_height( - channels=use_channels, traverse_height=min_traverse_height_at_beginning_of_command + # TODO: this raises ALL channels to max Z, then lowers involved channels back down — + # wasteful yoyo motion. Should raise uninvolved to safety and involved to + # min_traverse_height_at_beginning_of_command in one pass. Requires a channel-filtered + # version of move_all_channels_in_z_safety. + await self.move_all_channels_in_z_safety() + if min_traverse_height_at_beginning_of_command is not None: + await self.position_channels_in_z_direction( + {ch: min_traverse_height_at_beginning_of_command for ch in use_channels} + ) + + # Compute Z positions + z_cavity_bottom: List[float] = [] + z_top: List[float] = [] + for resource in containers: + z_cavity_bottom.append(resource.get_location_wrt(self.deck, "c", "c", "cavity_bottom").z) + z_top.append(resource.get_location_wrt(self.deck, "c", "c", "t").z) + + batches = plan_batches( + use_channels=use_channels, + targets=containers, + channel_spacings=self._channels_minimum_y_spacing, + x_tolerance=x_grouping_tolerance, + wrt_resource=self.deck, + resource_offsets=resource_offsets, ) - result_by_operation: Dict[int, float] = {} + # Select detection function and kwargs + detect_func: Callable[..., Any] + if lld_mode == self.LLDMode.GAMMA: + detect_func = self._move_z_drive_to_liquid_surface_using_clld + else: + detect_func = self._search_for_surface_using_plld + + # Execute batches + absolute_heights_measurements: Dict[int, List[Optional[float]]] = { + ch: [] for ch in use_channels + } - async def func(batch: List[int]): - liquid_heights = await self._probe_liquid_heights_batch( - containers=[containers[idx] for idx in batch], - use_channels=[use_channels[idx] for idx in batch], - lld_mode=lld_mode, - search_speed=search_speed, - n_replicates=n_replicates, - ) - for idx, height in zip(batch, liquid_heights): - result_by_operation[idx] = height + async def _probe_batch_heights(batch: ChannelBatch) -> None: + batch_lowest_immers = [ + z_cavity_bottom[i] + tip_lengths[i] - self.DEFAULT_TIP_FITTING_DEPTH for i in batch.indices + ] + batch_start_pos = [ + z_top[i] + tip_lengths[i] - self.DEFAULT_TIP_FITTING_DEPTH + self.SEARCH_START_CLEARANCE_MM + for i in batch.indices + ] + + for _ in range(n_replicates): + results = await asyncio.gather( + *[ + detect_func( + channel_idx=channel, + lowest_immers_pos=lip, + start_pos_search=sps, + channel_speed=search_speed, + ) + for channel, lip, sps in zip(batch.channels, batch_lowest_immers, batch_start_pos) + ], + return_exceptions=True, + ) + + current_absolute_liquid_heights = await self.request_pip_height_last_lld() + for local_idx, (ch_idx, result) in enumerate(zip(batch.channels, results)): + orig_idx = batch.indices[local_idx] + if isinstance(result, STARFirmwareError): + error_msg = str(result).lower() + if "no liquid level found" in error_msg or "no liquid was present" in error_msg: + height = None + msg = ( + f"Channel {ch_idx}: No liquid detected. Could be because there is " + f"no liquid in container {containers[orig_idx].name} or liquid level " + f"is too low." + ) + if lld_mode == self.LLDMode.GAMMA: + msg += " Consider using pressure-based LLD if liquid is believed to exist." + logger.warning(msg) + else: + raise result + elif isinstance(result, Exception): + raise result + else: + height = current_absolute_liquid_heights[ch_idx] + absolute_heights_measurements[ch_idx].append(height) await self.execute_batched( - func=func, - resources=containers, - use_channels=use_channels, - resource_offsets=resource_offsets, + func=_probe_batch_heights, + batches=batches, min_traverse_height_during_command=min_traverse_height_during_command, ) - await self._move_to_traverse_height( - channels=use_channels, - traverse_height=z_position_at_end_of_command, - ) + # Compute liquid heights relative to well bottom + relative_to_well: List[float] = [] + inconsistent_channels: List[str] = [] + + for idx, (ch, container) in enumerate(zip(use_channels, containers)): + measurements = absolute_heights_measurements[ch] + valid = [m for m in measurements if m is not None] + cavity_bottom = z_cavity_bottom[idx] - return [result_by_operation[idx] for idx in range(len(containers))] + if len(valid) == 0: + relative_to_well.append(0.0) + elif len(valid) == len(measurements): + relative_to_well.append(sum(valid) / len(valid) - cavity_bottom) + else: + inconsistent_channels.append( + f"Channel {ch}: {len(valid)}/{len(measurements)} replicates detected liquid" + ) + + if inconsistent_channels: + raise RuntimeError( + "Inconsistent liquid detection across replicates. " + "This may indicate liquid levels near the detection limit:\n" + + "\n".join(inconsistent_channels) + ) + + if move_to_z_safety_after: + if z_position_at_end_of_command is None: + await self.move_all_channels_in_z_safety() + else: + await self.position_channels_in_z_direction( + {ch: z_position_at_end_of_command for ch in use_channels} + ) + + return relative_to_well async def probe_liquid_volumes( self, @@ -10483,8 +10375,9 @@ async def clld_probe_y_position_using_channel( # Machine-compatibility check of calculated parameters assert 0 <= max_y_search_pos_increments <= 13_714, ( - "Maximum y search position must be between \n0 and" - + f"{STARBackend.y_drive_increment_to_mm(13_714) + 9} mm, is {max_y_search_pos_increments} mm" + "Maximum y search position must be between 0 and " + + f"{STARBackend.y_drive_increment_to_mm(13_714) + self._channels_minimum_y_spacing[0]:.1f} mm, " + + f"is {STARBackend.y_drive_increment_to_mm(max_y_search_pos_increments):.1f} mm" ) assert 20 <= channel_speed_increments <= 8_000, ( f"LLD search speed must be between \n{STARBackend.y_drive_increment_to_mm(20)}" @@ -11216,6 +11109,7 @@ async def request_tip_len_on_channel(self, channel_idx: int) -> float: MAXIMUM_CHANNEL_Z_POSITION = 334.7 # mm (= z-drive increment 31_200) MINIMUM_CHANNEL_Z_POSITION = 99.98 # mm (= z-drive increment 9_320) DEFAULT_TIP_FITTING_DEPTH = 8 # mm, for 10, 50, 300, 1000 ul Hamilton tips + SEARCH_START_CLEARANCE_MM = 5 # mm above container top for LLD search start position async def ztouch_probe_z_height_using_channel( self, @@ -11407,10 +11301,10 @@ async def get_channels_y_positions(self) -> Dict[int, float]: y_positions = [round(y / 10, 2) for y in resp["ry"]] # sometimes there is (likely) a floating point error and channels are reported to be - # less than their minimum spacing apart (typically 9 mm). (When you set channels using - # position_channels_in_y_direction, it will raise an error.) The minimum y is 6mm, - # so we fix that first (in case that value is misreported). Then, we traverse the - # list in reverse and enforce pairwise minimum spacing. + # closer together than the minimum required spacing. (When you set channels using + # position_channels_in_y_direction, it will raise an error.) We first ensure the last + # channel is not reported in front of the known minimum Y position, then traverse the + # list in reverse and enforce the per-channel minimum spacing. min_y = self.extended_conf.left_arm_min_y_position if y_positions[-1] < min_y - 0.2: raise RuntimeError( @@ -11423,9 +11317,9 @@ async def get_channels_y_positions(self) -> Dict[int, float]: y_positions[-1] = min_y for i in range(len(y_positions) - 2, -1, -1): - spacing = self._min_spacing_between(i, i + 1) - if y_positions[i] - y_positions[i + 1] < spacing: - y_positions[i] = y_positions[i + 1] + spacing + min_diff = self._min_spacing_between(i, i + 1) + if y_positions[i] - y_positions[i + 1] < min_diff: + y_positions[i] = y_positions[i + 1] + min_diff return {channel_idx: y for channel_idx, y in enumerate(y_positions)} @@ -11451,37 +11345,30 @@ async def position_channels_in_y_direction(self, ys: Dict[int, float], make_spac channel_locations[channel_idx] = y if make_space: + # For the channels to the back of `back_channel`, make sure the space between them + # meets the per-pair minimum. We start with the channel closest to `back_channel`, and + # make sure the channel behind it is spaced correctly, updating if needed. use_channels = list(ys.keys()) back_channel = min(use_channels) - front_channel = max(use_channels) + for channel_idx in range(back_channel, 0, -1): + pair_spacing = self._min_spacing_between(channel_idx - 1, channel_idx) + if (channel_locations[channel_idx - 1] - channel_locations[channel_idx]) < pair_spacing: + channel_locations[channel_idx - 1] = channel_locations[channel_idx] + pair_spacing - # Position channels in between used channels + # Position intermediate channels between back_channel and front_channel. + front_channel = max(use_channels) for intermediate_ch in range(back_channel + 1, front_channel): if intermediate_ch not in ys: - channel_locations[intermediate_ch] = channel_locations[ - intermediate_ch - 1 - ] - self._min_spacing_between(intermediate_ch - 1, intermediate_ch) - - # For the channels to the back of `back_channel`, make sure the space between them is - # >=9mm. We start with the channel closest to `back_channel`, and make sure the - # channel behind it is at least 9mm, updating if needed. Iterating from the front (closest - # to `back_channel`) to the back (channel 0), all channels are put at the correct location. - # This order matters because the channel in front of any channel may have been moved in the - # previous iteration. - # Note that if a channel is already spaced at >=9mm, it is not moved. - for channel_idx in range(back_channel, 0, -1): - spacing = self._min_spacing_between(channel_idx - 1, channel_idx) - if (channel_locations[channel_idx - 1] - channel_locations[channel_idx]) < spacing: - channel_locations[channel_idx - 1] = channel_locations[channel_idx] + spacing + pair_spacing = self._min_spacing_between(intermediate_ch - 1, intermediate_ch) + channel_locations[intermediate_ch] = channel_locations[intermediate_ch - 1] - pair_spacing # Similarly for the channels to the front of `front_channel`, make sure they are all - # spaced >= channel_minimum_y_spacing (usually 9mm) apart. This time, we iterate from - # back (closest to `front_channel`) to the front (lh.backend.num_channels - 1), and - # put each channel >= channel_minimum_y_spacing before the one behind it. + # spaced by the per-pair minimum. This time, we iterate from back (closest to + # `front_channel`) to the front (lh.backend.num_channels - 1). for channel_idx in range(front_channel, self.num_channels - 1): - spacing = self._min_spacing_between(channel_idx, channel_idx + 1) - if (channel_locations[channel_idx] - channel_locations[channel_idx + 1]) < spacing: - channel_locations[channel_idx + 1] = channel_locations[channel_idx] - spacing + pair_spacing = self._min_spacing_between(channel_idx, channel_idx + 1) + if (channel_locations[channel_idx] - channel_locations[channel_idx + 1]) < pair_spacing: + channel_locations[channel_idx + 1] = channel_locations[channel_idx] - pair_spacing # Quick checks before movement. if channel_locations[0] > 650: @@ -11567,7 +11454,7 @@ async def pierce_foil( offsets = get_wide_single_resource_liquid_op_offsets( resource=well, num_channels=len(piercing_channels), - min_spacing=self._get_maximum_minimum_spacing_between_channels(piercing_channels), + min_spacing=max(self._channels_minimum_y_spacing), ) else: offsets = get_tight_single_resource_liquid_op_offsets( diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index fc642de8b33..847b9bfdc32 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -12,8 +12,18 @@ MachineConfiguration, STARBackend, ) +from pylabrobot.liquid_handling.pipette_batch_scheduling import ( + plan_batches, + print_batches, + validate_channel_selections, +) +from pylabrobot.resources.container import Container +from pylabrobot.resources.coordinate import Coordinate from pylabrobot.resources.well import Well +# Type alias for nested enum (for cleaner signatures) +LLDMode = STARBackend.LLDMode + _DEFAULT_MACHINE_CONFIGURATION = MachineConfiguration( pip_type_1000ul=True, kb_iswap_installed=True, @@ -213,6 +223,10 @@ async def channel_request_y_minimum_spacing(self, channel_idx: int) -> float: ) return self._channels_minimum_y_spacing[channel_idx] + async def channels_request_y_minimum_spacing(self) -> List[float]: + """Return mock per-channel minimum Y spacings for all channels.""" + return list(self._channels_minimum_y_spacing) + async def move_channel_y(self, channel: int, y: float): print(f"moving channel {channel} to y: {y}") @@ -316,3 +330,73 @@ async def position_channels_in_y_direction(self, ys, make_space=True): async def request_pip_height_last_lld(self): return list(range(12)) + + async def probe_liquid_heights( + self, + containers: List[Container], + use_channels: Optional[List[int]] = None, + resource_offsets: Optional[List[Coordinate]] = None, + lld_mode: LLDMode = LLDMode.GAMMA, + search_speed: float = 10.0, + n_replicates: int = 1, + move_to_z_safety_after: bool = True, + min_traverse_height_at_beginning_of_command: Optional[float] = None, + min_traverse_height_during_command: Optional[float] = None, + z_position_at_end_of_command: Optional[float] = None, + x_grouping_tolerance: Optional[float] = None, + ) -> List[float]: + """Probe liquid heights by computing from tracked container volumes. + + Instead of simulating hardware LLD, this mock computes liquid heights directly from + each container's volume tracker using ``container.compute_height_from_volume()``. + + Args: + containers: List of Container objects to probe, one per channel. + use_channels: Channel indices to use (0-indexed). Defaults to ``[0, ..., len(containers)-1]``. + resource_offsets: Passed to ``plan_batches`` for auto-spreading. See ``plan_batches``. + All other parameters: Accepted for API compatibility but unused in mock. + + Returns: + Liquid heights in mm from cavity bottom for each container, computed from tracked volumes. + + Raises: + ValueError: If ``use_channels`` is empty, contains out-of-range indices, or if + ``containers`` and ``use_channels`` have different lengths. + NoTipError: If any specified channel lacks a tip. + """ + if x_grouping_tolerance is None: + x_grouping_tolerance = self._x_grouping_tolerance_mm + + use_channels = validate_channel_selections( + containers=containers, + use_channels=use_channels, + num_channels=self.num_channels, + ) + + # Validate tip presence using tip tracker + for ch in use_channels: + self.head[ch].get_tip() # Raises NoTipError if no tip + + batches = plan_batches( + use_channels=use_channels, + targets=containers, + channel_spacings=self._channels_minimum_y_spacing, + x_tolerance=x_grouping_tolerance, + wrt_resource=self.deck, + resource_offsets=resource_offsets, + ) + + print_batches(batches, use_channels, containers, label="probe_liquid_heights plan") + + # Compute heights from volume trackers + heights: List[float] = [] + for container in containers: + volume = container.tracker.get_used_volume() + if volume == 0: + heights.append(0.0) + else: + height = container.compute_height_from_volume(volume) + heights.append(height) + + print(f" heights: {[f'{h:.2f}' for h in heights]} mm") + return heights diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index b478ff7b637..1e81f76cb90 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -1,5 +1,6 @@ # mypy: disable-error-code="attr-defined,method-assign" +import contextlib import unittest import unittest.mock from typing import Literal, cast @@ -39,7 +40,10 @@ UnknownHamiltonError, parse_star_fw_string, ) -from .STAR_chatterbox import _DEFAULT_EXTENDED_CONFIGURATION, _DEFAULT_MACHINE_CONFIGURATION +from .STAR_chatterbox import ( + _DEFAULT_EXTENDED_CONFIGURATION, + _DEFAULT_MACHINE_CONFIGURATION, +) class TestSTARResponseParsing(unittest.TestCase): @@ -1530,110 +1534,8 @@ async def test_1000uL_tips(self): tip_rack.unassign() -class TestChannelsMinimumYSpacing(unittest.IsolatedAsyncioTestCase): - """Test that different channel spacing configurations produce different behavior. - - Real firmware VY responses captured from hardware (GitHub issue #822): - - 4-channel 18mm single-rail: PVYidyc194 388 1 (yc[1]=388 → 18.0mm) - - 8-channel 9mm standard: PVYidyc000 194 0 (yc[1]=194 → 9.0mm) - """ - - # -- can_reach_position: reachability shrinks with wider spacing ---------------- - - async def test_can_reach_4ch_18mm_rejects_position_reachable_at_9mm(self): - """A position reachable by channel 0 at 9mm spacing is unreachable at 18mm spacing. - - Channel 0 (backmost) min_y = left_arm_min_y_position + sum(spacings[1..3]) - At 9mm: 6 + 9*3 = 33 → y=33 reachable - At 18mm: 6 + 18*3 = 60 → y=33 unreachable - """ - backend = STARBackend() - backend._num_channels = 4 - backend._extended_conf = _DEFAULT_EXTENDED_CONFIGURATION - - backend._channels_minimum_y_spacing = [9.0] * 4 - self.assertTrue(backend.can_reach_position(0, Coordinate(100, 33, 100))) - - backend._channels_minimum_y_spacing = [18.0] * 4 - self.assertFalse(backend.can_reach_position(0, Coordinate(100, 33, 100))) - - async def test_can_reach_4ch_18mm_rejects_back_channel_too_far_back(self): - """At 18mm spacing, the backmost channel has a lower max_y than at 9mm. - - Channel 3 (frontmost) max_y = pip_maximal_y_position - sum(spacings[0..2]) - At 9mm: 606.5 - 9*3 = 579.5 → y=574 reachable - At 18mm: 606.5 - 18*3 = 552.5 → y=574 unreachable - """ - backend = STARBackend() - backend._num_channels = 4 - backend._extended_conf = _DEFAULT_EXTENDED_CONFIGURATION - - backend._channels_minimum_y_spacing = [9.0] * 4 - self.assertTrue(backend.can_reach_position(3, Coordinate(100, 574, 100))) - - backend._channels_minimum_y_spacing = [18.0] * 4 - self.assertFalse(backend.can_reach_position(3, Coordinate(100, 574, 100))) - - # -- position_channels_in_y_direction: validation rejects tight positions ------- - - def _make_star_backend(self, num_channels, spacings): - """Helper: create a STARBackend with given channel count and spacings, mocking I/O.""" - backend = STARBackend() - backend._num_channels = num_channels - backend._channels_minimum_y_spacing = list(spacings) - backend._extended_conf = _DEFAULT_EXTENDED_CONFIGURATION - backend.id_ = 0 - backend._write_and_read_command = unittest.mock.AsyncMock() - backend.get_channels_y_positions = unittest.mock.AsyncMock() - return backend - - async def test_position_channels_rejects_9mm_gap_when_spacing_is_18mm(self): - """With make_space=False, channels 9mm apart pass validation at 9mm but are rejected at 18mm.""" - spread_positions = {0: 100.0, 1: 91.0, 2: 82.0, 3: 73.0} - - # At 9mm: channels spaced 9mm apart → valid, JY command is sent. - backend_9 = self._make_star_backend(4, [9.0] * 4) - backend_9.get_channels_y_positions.return_value = dict(spread_positions) - await backend_9.position_channels_in_y_direction(spread_positions, make_space=False) - self.assertTrue(backend_9._write_and_read_command.called) - - # At 18mm: same positions → rejected. - backend_18 = self._make_star_backend(4, [18.0] * 4) - backend_18.get_channels_y_positions.return_value = dict(spread_positions) - with self.assertRaises(ValueError): - await backend_18.position_channels_in_y_direction(spread_positions, make_space=False) - - async def test_position_channels_make_space_spreads_wider_at_18mm(self): - """make_space=True pushes non-target channels further apart at 18mm than at 9mm. - - Move only channel 2 to y=40. make_space adjusts channels 3 (in front of channel 2) - to respect minimum spacing. At 9mm it pushes channel 3 to 31, at 18mm to 22. - """ - current = {0: 300.0, 1: 200.0, 2: 100.0, 3: 50.0} - requested = {2: 40.0} - - # At 9mm: channel 3 must be ≤ 40 - 9 = 31. - backend_9 = self._make_star_backend(4, [9.0] * 4) - backend_9.get_channels_y_positions.return_value = dict(current) - await backend_9.position_channels_in_y_direction(dict(requested), make_space=True) - cmd_9mm = backend_9._write_and_read_command.call_args.kwargs["cmd"] - # Channel 3 pushed to 31.0 → 310 increments. - self.assertIn("0310", cmd_9mm) - - # At 18mm: channel 3 must be ≤ 40 - 18 = 22. - backend_18 = self._make_star_backend(4, [18.0] * 4) - backend_18.get_channels_y_positions.return_value = dict(current) - await backend_18.position_channels_in_y_direction(dict(requested), make_space=True) - cmd_18mm = backend_18._write_and_read_command.call_args.kwargs["cmd"] - # Channel 3 pushed to 22.0 → 220 increments. - self.assertIn("0220", cmd_18mm) - - # The JY commands must differ. - self.assertNotEqual(cmd_9mm, cmd_18mm) - - -class STARTestBase(unittest.IsolatedAsyncioTestCase): - """Shared setup for probe/batch/helper tests.""" +class TestProbeLiquidHeights(unittest.IsolatedAsyncioTestCase): + """Tests for probe_liquid_heights: detection dispatch, replicates, error handling.""" async def asyncSetUp(self): self.STAR = STARBackend(read_timeout=1) @@ -1671,179 +1573,73 @@ def _put_tips_on_channels(self, channels): tip = self.tip_rack.get_tip("A1") self.lh.update_head_state({ch: tip for ch in channels}) + def _standard_mocks(self, detect_side_effect=None): + """Return a context manager stack with standard mocks for probe_liquid_heights.""" + mocks = {} + + if detect_side_effect is None: + detect_side_effect = unittest.mock.AsyncMock(return_value=None) + mocks["detect"] = unittest.mock.patch.object( + self.STAR, "_move_z_drive_to_liquid_surface_using_clld", detect_side_effect + ) + mocks["plld"] = unittest.mock.patch.object( + self.STAR, + "_search_for_surface_using_plld", + new_callable=unittest.mock.AsyncMock, + return_value=None, + ) + mocks["pip_height"] = unittest.mock.patch.object( + self.STAR, + "request_pip_height_last_lld", + new_callable=unittest.mock.AsyncMock, + return_value=list(range(12)), + ) + mocks["tip_len"] = unittest.mock.patch.object( + self.STAR, + "request_tip_len_on_channel", + new_callable=unittest.mock.AsyncMock, + return_value=59.9, + ) + mocks["tip_presence"] = unittest.mock.patch.object( + self.STAR, + "request_tip_presence", + new_callable=unittest.mock.AsyncMock, + return_value={i: True for i in range(8)}, + ) + mocks["z_safety"] = unittest.mock.patch.object( + self.STAR, + "move_all_channels_in_z_safety", + new_callable=unittest.mock.AsyncMock, + ) + mocks["move_x"] = unittest.mock.patch.object( + self.STAR, + "move_channel_x", + new_callable=unittest.mock.AsyncMock, + ) + mocks["pos_y"] = unittest.mock.patch.object( + self.STAR, + "position_channels_in_y_direction", + new_callable=unittest.mock.AsyncMock, + ) + mocks["backmost_y"] = unittest.mock.patch.object( + self.STAR.extended_conf, + "pip_maximal_y_position", + 606.5, + ) + return mocks -class TestMoveToTraverseHeight(STARTestBase): - async def test_none_calls_z_safety(self): - with unittest.mock.patch.object( - self.STAR, "move_all_channels_in_z_safety", new_callable=unittest.mock.AsyncMock - ) as mock_z_safety: - await self.STAR._move_to_traverse_height(channels=[0, 1], traverse_height=None) - mock_z_safety.assert_awaited_once() - - async def test_float_calls_position_z(self): - with unittest.mock.patch.object( - self.STAR, "position_channels_in_z_direction", new_callable=unittest.mock.AsyncMock - ) as mock_pos_z: - await self.STAR._move_to_traverse_height(channels=[0, 2], traverse_height=245.0) - mock_pos_z.assert_awaited_once_with({0: 245.0, 2: 245.0}) - - -class TestComputeChannelsInResourceLocations(STARTestBase): - async def test_explicit_offsets(self): - wells = [self.plate.get_item("A1"), self.plate.get_item("B1")] - offsets = [Coordinate(0, 0, 0), Coordinate(0, 0, 0)] - locs = self.STAR._compute_channels_in_resource_locations( - resources=wells, use_channels=[0, 1], offsets=offsets - ) - self.assertEqual(len(locs), 2) - self.assertAlmostEqual(locs[0].x, 298.3, places=1) - self.assertAlmostEqual(locs[0].y, 145.7, places=1) - self.assertAlmostEqual(locs[1].x, 298.3, places=1) - self.assertAlmostEqual(locs[1].y, 136.7, places=1) - - async def test_none_offsets_single_resource(self): - """Same resource twice: both channels get the well center (offset auto-calc falls back to zero for small wells).""" - well = self.plate.get_item("A1") - locs = self.STAR._compute_channels_in_resource_locations( - resources=[well, well], use_channels=[0, 1], offsets=None - ) - self.assertEqual(len(locs), 2) - self.assertAlmostEqual(locs[0].x, 298.3, places=1) - self.assertAlmostEqual(locs[0].y, 145.7, places=1) - self.assertAlmostEqual(locs[1].x, 298.3, places=1) - self.assertAlmostEqual(locs[1].y, 145.7, places=1) - - async def test_none_offsets_different_resources(self): - """Different resources with no offsets: each gets its own center.""" - well_a1 = self.plate.get_item("A1") - well_b1 = self.plate.get_item("B1") - locs = self.STAR._compute_channels_in_resource_locations( - resources=[well_a1, well_b1], use_channels=[0, 1], offsets=None - ) - self.assertEqual(len(locs), 2) - self.assertAlmostEqual(locs[0].x, 298.3, places=1) - self.assertAlmostEqual(locs[0].y, 145.7, places=1) - self.assertAlmostEqual(locs[1].x, 298.3, places=1) - self.assertAlmostEqual(locs[1].y, 136.7, places=1) - - -class TestExecuteBatched(STARTestBase): - async def test_single_batch(self): - well = self.plate.get_item("A1") - calls = [] - - async def func(batch): - calls.append(batch) - - self._put_tips_on_channels([0, 1]) - - with ( - unittest.mock.patch.object(self.STAR, "move_channel_x", new_callable=unittest.mock.AsyncMock), - unittest.mock.patch.object( - self.STAR, "position_channels_in_y_direction", new_callable=unittest.mock.AsyncMock - ), - unittest.mock.patch.object( - self.STAR, "move_all_channels_in_z_safety", new_callable=unittest.mock.AsyncMock - ), - ): - await self.STAR.execute_batched( - func=func, - resources=[well, well], - use_channels=[0, 1], - ) - - all_indices = [i for call in calls for i in call] - self.assertEqual(sorted(all_indices), [0, 1]) - - async def test_different_x_groups(self): - well_a1 = self.plate.get_item("A1") - well_a2 = self.plate.get_item("A2") - calls = [] - - async def func(batch): - calls.append(list(batch)) - - self._put_tips_on_channels([0, 1]) - - with ( - unittest.mock.patch.object(self.STAR, "move_channel_x", new_callable=unittest.mock.AsyncMock), - unittest.mock.patch.object( - self.STAR, "position_channels_in_y_direction", new_callable=unittest.mock.AsyncMock - ), - unittest.mock.patch.object( - self.STAR, "move_all_channels_in_z_safety", new_callable=unittest.mock.AsyncMock - ), - ): - await self.STAR.execute_batched( - func=func, - resources=[well_a1, well_a2], - use_channels=[0, 1], - resource_offsets=[Coordinate.zero(), Coordinate.zero()], - ) - - all_indices = [i for call in calls for i in call] - self.assertEqual(sorted(all_indices), [0, 1]) - - async def test_traverse_height(self): - well_a1 = self.plate.get_item("A1") - well_a2 = self.plate.get_item("A2") - - async def func(batch): - pass - - self._put_tips_on_channels([0, 1]) - - with ( - unittest.mock.patch.object(self.STAR, "move_channel_x", new_callable=unittest.mock.AsyncMock), - unittest.mock.patch.object( - self.STAR, "position_channels_in_y_direction", new_callable=unittest.mock.AsyncMock - ), - unittest.mock.patch.object( - self.STAR, "_move_to_traverse_height", new_callable=unittest.mock.AsyncMock - ) as mock_traverse, - ): - await self.STAR.execute_batched( - func=func, - resources=[well_a1, well_a2], - use_channels=[0, 1], - resource_offsets=[Coordinate.zero(), Coordinate.zero()], - min_traverse_height_during_command=200.0, - ) - - if mock_traverse.await_count > 0: - for call in mock_traverse.call_args_list: - self.assertEqual(call.kwargs.get("traverse_height"), 200.0) - - -class TestProbeLiquidHeightsBatch(STARTestBase): async def test_single_well_returns_height(self): well = self.plate.get_item("A1") self._put_tips_on_channels([0]) - with ( - unittest.mock.patch.object( - self.STAR, - "_move_z_drive_to_liquid_surface_using_clld", - new_callable=unittest.mock.AsyncMock, - return_value=None, - ), - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): - result = await self.STAR._probe_liquid_heights_batch(containers=[well], use_channels=[0]) + mocks = self._standard_mocks() + with contextlib.ExitStack() as stack: + for v in mocks.values(): + stack.enter_context(v) + result = await self.STAR.probe_liquid_heights(containers=[well], use_channels=[0]) # request_pip_height_last_lld returns list(range(12)), so channel 0 gets height 0. - # relative = 0 - cavity_bottom_z = 0 - 186.65 = -186.65 + # relative = 0 - cavity_bottom_z self.assertEqual(len(result), 1) self.assertAlmostEqual(result[0], 0 - well.get_absolute_location("c", "c", "cavity_bottom").z) @@ -1852,26 +1648,11 @@ async def test_n_replicates(self): self._put_tips_on_channels([0]) mock_detect = unittest.mock.AsyncMock(return_value=None) - with ( - unittest.mock.patch.object( - self.STAR, "_move_z_drive_to_liquid_surface_using_clld", mock_detect - ), - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): - await self.STAR._probe_liquid_heights_batch( - containers=[well], use_channels=[0], n_replicates=3 - ) + mocks = self._standard_mocks(detect_side_effect=mock_detect) + with contextlib.ExitStack() as stack: + for v in mocks.values(): + stack.enter_context(v) + await self.STAR.probe_liquid_heights(containers=[well], use_channels=[0], n_replicates=3) self.assertEqual(mock_detect.await_count, 3) @@ -1894,26 +1675,13 @@ async def test_no_liquid_detected_returns_zero(self): async def raise_error(**kwargs): raise error - with ( - unittest.mock.patch.object( - self.STAR, - "_move_z_drive_to_liquid_surface_using_clld", - unittest.mock.AsyncMock(side_effect=raise_error), - ), - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): - result = await self.STAR._probe_liquid_heights_batch(containers=[well], use_channels=[0]) + mocks = self._standard_mocks( + detect_side_effect=unittest.mock.AsyncMock(side_effect=raise_error) + ) + with contextlib.ExitStack() as stack: + for v in mocks.values(): + stack.enter_context(v) + result = await self.STAR.probe_liquid_heights(containers=[well], use_channels=[0]) self.assertEqual(result[0], 0.0) @@ -1942,58 +1710,129 @@ async def side_effect(**kwargs): return None raise error - with ( - unittest.mock.patch.object( - self.STAR, - "_move_z_drive_to_liquid_surface_using_clld", - unittest.mock.AsyncMock(side_effect=side_effect), - ), - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): + mocks = self._standard_mocks( + detect_side_effect=unittest.mock.AsyncMock(side_effect=side_effect) + ) + with contextlib.ExitStack() as stack: + for v in mocks.values(): + stack.enter_context(v) with self.assertRaises(RuntimeError): - await self.STAR._probe_liquid_heights_batch( - containers=[well], use_channels=[0], n_replicates=2 - ) + await self.STAR.probe_liquid_heights(containers=[well], use_channels=[0], n_replicates=2) async def test_pressure_lld_mode(self): well = self.plate.get_item("A1") self._put_tips_on_channels([0]) - with ( - unittest.mock.patch.object( - self.STAR, - "_search_for_surface_using_plld", - new_callable=unittest.mock.AsyncMock, - return_value=None, - ) as mock_plld, - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): - await self.STAR._probe_liquid_heights_batch( + mocks = self._standard_mocks() + with contextlib.ExitStack() as stack: + entered = {k: stack.enter_context(v) for k, v in mocks.items()} + await self.STAR.probe_liquid_heights( containers=[well], use_channels=[0], lld_mode=self.STAR.LLDMode.PRESSURE, ) - mock_plld.assert_awaited_once() + entered["plld"].assert_awaited_once() + + +class TestChannelsMinimumYSpacing(unittest.IsolatedAsyncioTestCase): + """Test that different channel spacing configurations produce different behavior. + + Real firmware VY responses captured from hardware (GitHub issue #822): + - 4-channel 18mm single-rail: PVYidyc194 388 1 (yc[1]=388 → 18.0mm) + - 8-channel 9mm standard: PVYidyc000 194 0 (yc[1]=194 → 9.0mm) + """ + + # -- can_reach_position: reachability shrinks with wider spacing ---------------- + + async def test_can_reach_4ch_18mm_rejects_position_reachable_at_9mm(self): + """A position reachable by channel 0 at 9mm spacing is unreachable at 18mm spacing. + + Channel 0 (backmost) max_y = 601.6 - sum(spacings[0..0]) = 601.6 - 0 = 601.6 (same) + Channel 0 (backmost) min_y = 6 + sum(spacings[1..3]) + At 9mm: 6 + 9*3 = 33 → y=33 reachable + At 18mm: 6 + 18*3 = 60 → y=33 unreachable + """ + backend = STARBackend() + backend._num_channels = 4 + backend._extended_conf = _DEFAULT_EXTENDED_CONFIGURATION + + backend._channels_minimum_y_spacing = [9.0] * 4 + self.assertTrue(backend.can_reach_position(0, Coordinate(100, 33, 100))) + + backend._channels_minimum_y_spacing = [18.0] * 4 + self.assertFalse(backend.can_reach_position(0, Coordinate(100, 33, 100))) + + async def test_can_reach_4ch_18mm_rejects_back_channel_too_far_back(self): + """At 18mm spacing, the backmost channel has a lower max_y than at 9mm. + + Channel 3 (frontmost) max_y = pip_maximal_y_position - sum(spacings[0..2]) + At 9mm: 606.5 - 9*3 = 579.5 → y=574 reachable + At 18mm: 606.5 - 18*3 = 552.5 → y=574 unreachable + """ + backend = STARBackend() + backend._num_channels = 4 + backend._extended_conf = _DEFAULT_EXTENDED_CONFIGURATION + + backend._channels_minimum_y_spacing = [9.0] * 4 + self.assertTrue(backend.can_reach_position(3, Coordinate(100, 574, 100))) + + backend._channels_minimum_y_spacing = [18.0] * 4 + self.assertFalse(backend.can_reach_position(3, Coordinate(100, 574, 100))) + + # -- position_channels_in_y_direction: validation rejects tight positions ------- + + def _make_star_backend(self, num_channels, spacings): + """Helper: create a STARBackend with given channel count and spacings, mocking I/O.""" + backend = STARBackend() + backend._num_channels = num_channels + backend._channels_minimum_y_spacing = list(spacings) + backend._extended_conf = _DEFAULT_EXTENDED_CONFIGURATION + backend.id_ = 0 + backend._write_and_read_command = unittest.mock.AsyncMock() + backend.get_channels_y_positions = unittest.mock.AsyncMock() + return backend + + async def test_position_channels_rejects_9mm_gap_when_spacing_is_18mm(self): + """With make_space=False, channels 9mm apart pass validation at 9mm but are rejected at 18mm.""" + spread_positions = {0: 100.0, 1: 91.0, 2: 82.0, 3: 73.0} + + # At 9mm: channels spaced 9mm apart → valid, JY command is sent. + backend_9 = self._make_star_backend(4, [9.0] * 4) + backend_9.get_channels_y_positions.return_value = dict(spread_positions) + await backend_9.position_channels_in_y_direction(spread_positions, make_space=False) + self.assertTrue(backend_9._write_and_read_command.called) + + # At 18mm: same positions → rejected. + backend_18 = self._make_star_backend(4, [18.0] * 4) + backend_18.get_channels_y_positions.return_value = dict(spread_positions) + with self.assertRaises(ValueError): + await backend_18.position_channels_in_y_direction(spread_positions, make_space=False) + + async def test_position_channels_make_space_spreads_wider_at_18mm(self): + """make_space=True pushes non-target channels further apart at 18mm than at 9mm. + + Move only channel 2 to y=40. make_space adjusts channels 3 (in front of channel 2) + to respect minimum spacing. At 9mm it pushes channel 3 to 31, at 18mm to 22. + """ + current = {0: 300.0, 1: 200.0, 2: 100.0, 3: 50.0} + requested = {2: 40.0} + + # At 9mm: channel 3 must be ≤ 40 - 9 = 31. + backend_9 = self._make_star_backend(4, [9.0] * 4) + backend_9.get_channels_y_positions.return_value = dict(current) + await backend_9.position_channels_in_y_direction(dict(requested), make_space=True) + cmd_9mm = backend_9._write_and_read_command.call_args.kwargs["cmd"] + # Channel 3 pushed to 31.0 → 310 increments. + self.assertIn("0310", cmd_9mm) + + # At 18mm: channel 3 must be ≤ 40 - 18 = 22. + backend_18 = self._make_star_backend(4, [18.0] * 4) + backend_18.get_channels_y_positions.return_value = dict(current) + await backend_18.position_channels_in_y_direction(dict(requested), make_space=True) + cmd_18mm = backend_18._write_and_read_command.call_args.kwargs["cmd"] + # Channel 3 pushed to 22.0 → 220 increments. + self.assertIn("0220", cmd_18mm) + + # The JY commands must differ. + self.assertNotEqual(cmd_9mm, cmd_18mm) diff --git a/pylabrobot/liquid_handling/backends/hamilton/planning.py b/pylabrobot/liquid_handling/backends/hamilton/planning.py deleted file mode 100644 index c050114bfc6..00000000000 --- a/pylabrobot/liquid_handling/backends/hamilton/planning.py +++ /dev/null @@ -1,71 +0,0 @@ -from typing import Callable, Dict, List - -from pylabrobot.liquid_handling.utils import MIN_SPACING_BETWEEN_CHANNELS -from pylabrobot.resources import Coordinate - - -def _default_min_spacing_between(channel1: int, channel2: int) -> float: - """Default minimum spacing between two channels, based on their indices.""" - return MIN_SPACING_BETWEEN_CHANNELS * abs(channel1 - channel2) - - -def group_by_x_batch_by_xy( - locations: List[Coordinate], - use_channels: List[int], - min_spacing_between_channels: Callable[[int, int], float] = _default_min_spacing_between, -) -> Dict[float, List[List[int]]]: - if len(use_channels) == 0: - raise ValueError("use_channels must not be empty.") - if len(locations) == 0: - raise ValueError("locations must not be empty.") - if len(locations) != len(use_channels): - raise ValueError("locations and use_channels must have the same length.") - - # Move channels to traverse height - x_pos, y_pos = zip(*[(loc.x, loc.y) for loc in locations]) - - # Start with a list of indices for each operation. The order is the order of operations as given in the input parameters. - # We will then turn this list of indices into batches of indices that can be executed together, based on their X and Y positions and channel numbers. - indices = list(range(len(locations))) - - # Sort indices by x position. - indices = sorted(indices, key=lambda i: x_pos[i]) - - # Group indices by x position (rounding to 0.1mm to avoid floating point splitting of same-position containers) - # Note that since the indices were already sorted by x position, the groups will also be sorted by x position. - x_groups: Dict[float, List[int]] = {} - for i in indices: - x_rounded = round(x_pos[i], 1) - x_groups.setdefault(x_rounded, []).append(i) - - # Within each x group, sort channels from back (lowest channel index) to front (highest channel index) - for x_group_indices in x_groups.values(): - x_group_indices.sort(key=lambda i: use_channels[i]) - - # Within each x group, batch by y position while respecting minimum y spacing constraint - y_batches: dict[float, List[List[int]]] = {} # x position (group) -> list of batches of indices - for x_group, x_group_indices in x_groups.items(): - y_batches_for_this_x: List[List[int]] = [] # batches of indices for this x group - for i in x_group_indices: - y = y_pos[i] - - # find the first batch that this index can be added to without violating the minimum y spacing constraint - # if no batch is found, create a new batch with this index - for batch in y_batches_for_this_x: - index_min_y = min(batch, key=lambda i: y_pos[i]) - # check min spacing - if y_pos[index_min_y] - y < min_spacing_between_channels( - use_channels[i], use_channels[index_min_y] - ): - continue - # check if channel is already used in this batch - if use_channels[i] in [use_channels[j] for j in batch]: - continue - batch.append(i) - break - else: - y_batches_for_this_x.append([i]) - - y_batches[x_group] = y_batches_for_this_x - - return y_batches diff --git a/pylabrobot/liquid_handling/backends/hamilton/planning_tests.py b/pylabrobot/liquid_handling/backends/hamilton/planning_tests.py deleted file mode 100644 index d13304655c0..00000000000 --- a/pylabrobot/liquid_handling/backends/hamilton/planning_tests.py +++ /dev/null @@ -1,129 +0,0 @@ -import unittest - -from pylabrobot.resources import Coordinate - -from .planning import group_by_x_batch_by_xy - - -class TestGroupByXBatchByXY(unittest.TestCase): - """Tests for group_by_x_batch_by_xy.""" - - def test_single_location(self): - locations = [Coordinate(100.0, 200.0, 0)] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0]) - self.assertEqual(result, {100.0: [[0]]}) - - def test_same_x_different_y_fits_in_one_batch(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 180.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - # y diff = 20 >= 9*1, fits in one batch - self.assertEqual(result, {100.0: [[0, 1]]}) - - def test_same_x_too_close_y_splits_batches(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 195.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - # y diff = 5 < 9*1, separate batches - self.assertEqual(result, {100.0: [[0], [1]]}) - - def test_different_x_groups(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(200.0, 200.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - self.assertEqual(result, {100.0: [[0]], 200.0: [[1]]}) - - def test_x_rounding(self): - locations = [ - Coordinate(100.04, 200.0, 0), - Coordinate(100.02, 180.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - # Both round to 100.0 - self.assertEqual(result, {100.0: [[0, 1]]}) - - def test_two_channels_same_x(self): - locations = [Coordinate(100.0, 200.0, 0), Coordinate(100.0, 180.0, 0)] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - self.assertEqual(result, {100.0: [[0, 1]]}) - - def test_empty_use_channels_raises(self): - with self.assertRaises(ValueError): - group_by_x_batch_by_xy( - locations=[Coordinate(100.0, 200.0, 0)], - use_channels=[], - ) - - def test_non_adjacent_channels(self): - locations = [ - Coordinate(100.0, 300.0, 0), - Coordinate(100.0, 200.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 5]) - # y diff = 100 >= 9*(5-0) = 45, fits in one batch - self.assertEqual(result, {100.0: [[0, 1]]}) - - def test_non_adjacent_channels_too_close(self): - locations = [ - Coordinate(100.0, 240.0, 0), - Coordinate(100.0, 200.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 5]) - # y diff = 40 < 9*(5-0) = 45, separate batches - self.assertEqual(result, {100.0: [[0], [1]]}) - - def test_sorted_by_x(self): - locations = [ - Coordinate(300.0, 200.0, 0), - Coordinate(100.0, 200.0, 0), - Coordinate(200.0, 200.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1, 2]) - self.assertEqual(result, {100.0: [[1]], 200.0: [[2]], 300.0: [[0]]}) - - def test_multiple_batches_in_one_x_group(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 197.0, 0), - Coordinate(100.0, 194.0, 0), - Coordinate(100.0, 191.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1, 2, 3]) - # Each consecutive pair has y diff = 3 < 9, so each in its own batch - self.assertEqual(result, {100.0: [[0], [1], [2], [3]]}) - - def test_duplicate_channels_split_into_separate_batches(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 180.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 0]) - self.assertEqual(result, {100.0: [[0], [1]]}) - - def test_duplicate_channels_three_ops(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 180.0, 0), - Coordinate(100.0, 160.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 0, 0]) - self.assertEqual(result, {100.0: [[0], [1], [2]]}) - - def test_channels_sorted_by_channel_index_within_x_group(self): - locations = [ - Coordinate(100.0, 180.0, 0), # channel 2 - Coordinate(100.0, 200.0, 0), # channel 0 - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[2, 0]) - # Channel 0 (index 1) sorted before channel 2 (index 0) - self.assertEqual(result, {100.0: [[1, 0]]}) - - -if __name__ == "__main__": - unittest.main() diff --git a/pylabrobot/liquid_handling/pipette_batch_scheduling.py b/pylabrobot/liquid_handling/pipette_batch_scheduling.py new file mode 100644 index 00000000000..e637e5b2ad4 --- /dev/null +++ b/pylabrobot/liquid_handling/pipette_batch_scheduling.py @@ -0,0 +1,493 @@ +"""Pipette orchestration: partition channel–target pairs into executable batches. + +Multi-channel liquid handlers have physical constraints (single X carriage, minimum +Y spacing, descending Y order by channel index) that limit which channels can act +simultaneously. + + batches = plan_batches( + use_channels, targets=containers, + channel_spacings=[9.0]*8, x_tolerance=0.1, + wrt_resource=deck, + ) + await backend.execute_batched(func=my_z_callback, batches=batches) +""" + +import math +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple, Union, cast + +from pylabrobot.liquid_handling.utils import ( + MIN_SPACING_EDGE, + get_wide_single_resource_liquid_op_offsets, +) +from pylabrobot.resources.container import Container +from pylabrobot.resources.coordinate import Coordinate +from pylabrobot.resources.resource import Resource + +# --- Data types --- + + +@dataclass +class ChannelBatch: + """A group of channels that can operate simultaneously. + + ``y_positions`` contains entries for active channels and any phantom channels + between non-consecutive active members. + """ + + x_position: float + indices: List[int] + channels: List[int] + y_positions: Dict[int, float] = field(default_factory=dict) # includes phantoms + + +def print_batches( + batches: List[ChannelBatch], + use_channels: List[int], + containers: List["Container"], + label: str = "plan", +) -> None: + """Print a tree view of the batch execution plan. + + Groups batches by X position and shows Y batches nested within each X group. + Active channels are marked with ``*``, phantoms with a space. + + Args: + batches: Output from ``plan_batches()``. + use_channels: Channel indices (parallel with *containers*). + containers: Container objects (parallel with *use_channels*). + label: Header label for the tree. + """ + + ch_to_container = dict(zip(use_channels, containers)) + + x_groups: Dict[float, list] = {} + for b in batches: + x_key = round(b.x_position, 1) + x_groups.setdefault(x_key, []).append(b) + + print(f"{label}:") + xg_keys = list(x_groups.keys()) + for xg_i, x_key in enumerate(xg_keys): + xg_batches = x_groups[x_key] + is_last_xg = xg_i == len(xg_keys) - 1 + xg_branch = "\u2514" if is_last_xg else "\u251c" + xg_cont = " " if is_last_xg else "\u2502" + print(f" {xg_branch}\u2500\u2500 x-group {xg_i + 1} (x={x_key:.1f} mm)") + for yb_i, b in enumerate(xg_batches): + is_last_yb = yb_i == len(xg_batches) - 1 + yb_branch = "\u2514" if is_last_yb else "\u251c" + yb_cont = " " if is_last_yb else "\u2502" + print(f" {xg_cont} {yb_branch}\u2500\u2500 y-batch {yb_i + 1}") + for ch in sorted(b.y_positions.keys()): + is_last_ch = ch == max(b.y_positions.keys()) + ch_branch = "\u2514" if is_last_ch else "\u251c" + active = "*" if ch in b.channels else " " + container_name = f" ({ch_to_container[ch].name})" if ch in ch_to_container else "" + print( + f" {xg_cont} {yb_cont} {ch_branch}\u2500\u2500 {active}ch{ch}: y={b.y_positions[ch]:.1f} mm{container_name}" + ) + + +# --- Spacing helpers --- + + +def _effective_spacing(spacings: List[float], ch_lo: int, ch_hi: int) -> float: + """Max of per-channel spacings across ch_lo..ch_hi (inclusive). + + Used by ``compute_single_container_offsets`` to determine a single uniform spacing + for spreading channels across a wide container. + """ + return max(spacings[ch_lo : ch_hi + 1]) + + +def _span_required(spacings: List[float], ch_lo: int, ch_hi: int) -> float: + """Minimum total Y distance required between channels ch_lo and ch_hi. + + Sums the rounded pairwise spacing for each adjacent pair in the range via + ``_min_spacing_between``, matching what the firmware enforces. + """ + return sum(_min_spacing_between(spacings, ch, ch + 1) for ch in range(ch_lo, ch_hi)) + + +def _min_spacing_between(spacings: List[float], i: int, j: int) -> float: + """Minimum Y spacing between adjacent channels *i* and *j*. + + Takes the larger of the two channels' spacings, then rounds up to 0.1 mm: + ``math.ceil(max(spacings[i], spacings[j]) * 10) / 10``. + + Mirrors ``STARBackend._min_spacing_between`` (which operates on + ``self._channels_minimum_y_spacing`` instead of an explicit list). + """ + return math.ceil(max(spacings[i], spacings[j]) * 10) / 10 + + +# --- Batch partitioning --- + + +@dataclass +class _BatchAccumulator: + """Mutable working state for a batch being built up during partitioning.""" + + indices: List[int] + lo_ch: int + hi_ch: int + lo_y: float + hi_y: float + + +def _channel_fits_batch( + batch: _BatchAccumulator, channel: int, y: float, spacings: List[float] +) -> bool: + """Check whether *channel* at *y* can be added to *batch* without violating spacing. + + Two checks suffice because channels are processed in ascending order, so the candidate + is always the new high end. The (lo → candidate) check covers the full span; the + (hi → candidate) check catches the local gap. + """ + if batch.hi_y - y < _span_required(spacings, batch.hi_ch, channel) - 1e-9: + return False + if batch.lo_y - y < _span_required(spacings, batch.lo_ch, channel) - 1e-9: + return False + return True + + +def _interpolate_phantoms( + channels: List[int], y_positions: Dict[int, float], spacings: List[float] +) -> None: + """Fill in Y positions for phantom channels between non-consecutive batch members. + + Each phantom is placed at its actual pairwise spacing from the previous channel, + so non-uniform spacings are respected (e.g. a wide channel only widens its own gaps). + """ + sorted_chs = sorted(channels) + for k in range(len(sorted_chs) - 1): + ch_lo, ch_hi = sorted_chs[k], sorted_chs[k + 1] + cumulative = 0.0 + for phantom in range(ch_lo + 1, ch_hi): + cumulative += _min_spacing_between(spacings, phantom - 1, phantom) + if phantom not in y_positions: + y_positions[phantom] = y_positions[ch_lo] - cumulative + + +def _partition_into_y_batches( + indices: List[int], + use_channels: List[int], + y_pos: List[float], + spacings: List[float], + x_position: float, +) -> List[ChannelBatch]: + """Partition channels within an X group into minimum parallel-compatible batches. + + Uses greedy first-fit: processes channels in ascending order and assigns each to + the first batch where it fits, or creates a new batch. + """ + + channels_by_index = sorted(indices, key=lambda i: use_channels[i]) + batches: List[_BatchAccumulator] = [] + + for idx in channels_by_index: + channel = use_channels[idx] + y = y_pos[idx] + + assigned = False + for batch in batches: + if _channel_fits_batch(batch, channel, y, spacings): + batch.indices.append(idx) + batch.hi_ch = channel + batch.hi_y = y + assigned = True + break + + if not assigned: + batches.append(_BatchAccumulator(indices=[idx], lo_ch=channel, hi_ch=channel, lo_y=y, hi_y=y)) + + result: List[ChannelBatch] = [] + for batch in batches: + batch_channels = [use_channels[i] for i in batch.indices] + y_positions: Dict[int, float] = {use_channels[i]: y_pos[i] for i in batch.indices} + _interpolate_phantoms(batch_channels, y_positions, spacings) + result.append( + ChannelBatch( + x_position=x_position, + indices=batch.indices, + channels=batch_channels, + y_positions=y_positions, + ) + ) + + return result + + +# --- Input validation and position computation --- + + +# Shift applied to odd channel spans to avoid center divider walls in troughs. +# Will be replaced by per-container no_go_zones (see docs/proposals/container_no_go_zones.md). +_ODD_SPAN_CENTER_AVOIDANCE = 5.5 # mm + + +def _offsets_for_consecutive_group( + container: Container, + use_channels: List[int], + spacing: float, +) -> Optional[List[Coordinate]]: + """Compute spread offsets for a group of channels whose full physical span fits the container.""" + ch_lo, ch_hi = min(use_channels), max(use_channels) + num_physical = ch_hi - ch_lo + 1 + min_required = MIN_SPACING_EDGE * 2 + (num_physical - 1) * spacing + + if container.get_absolute_size_y() < min_required: + return None + + all_offsets = get_wide_single_resource_liquid_op_offsets( + resource=container, + num_channels=num_physical, + min_spacing=spacing, + ) + offsets = [all_offsets[ch - ch_lo] for ch in use_channels] + + # Shift odd channel spans to avoid center divider walls, but only if the + # outermost channel (center + half-spacing) stays within the container. + if num_physical > 1 and num_physical % 2 != 0: + max_offset_y = max(o.y for o in offsets) + container_half_y = container.get_absolute_size_y() / 2 + if max_offset_y + _ODD_SPAN_CENTER_AVOIDANCE + spacing / 2 <= container_half_y: + offsets = [o + Coordinate(0, _ODD_SPAN_CENTER_AVOIDANCE, 0) for o in offsets] + + return offsets + + +def compute_single_container_offsets( + container: Container, + use_channels: List[int], + channel_spacings: Union[float, List[float]], +) -> Optional[List[Coordinate]]: + """Compute spread Y offsets for multiple channels targeting the same container. + + Accounts for the full physical span including phantom intermediate channels. + When the full span doesn't fit, splits active channels into consecutive + sub-groups at gaps in the channel sequence and computes offsets per sub-group. + Each sub-group gets centered spread offsets, so plan_batches will naturally + batch sub-groups that can't coexist into separate Y batches. + + Returns None if even a single pair of adjacent active channels can't fit. + """ + + if len(use_channels) == 0: + return [] + if len(use_channels) == 1: + return [Coordinate.zero()] + + ch_lo, ch_hi = min(use_channels), max(use_channels) + if isinstance(channel_spacings, (int, float)): + spacing = float(channel_spacings) + else: + spacing = _effective_spacing(channel_spacings, ch_lo, ch_hi) + + # Try the full span first (all channels including phantoms fit) + full = _offsets_for_consecutive_group(container, use_channels, spacing) + if full is not None: + return full + + # Full span doesn't fit. Split at gaps in the sorted channel sequence + # into consecutive sub-groups and compute offsets for each independently. + sorted_chs = sorted(use_channels) + groups: List[List[int]] = [[sorted_chs[0]]] + for i in range(1, len(sorted_chs)): + if sorted_chs[i] == sorted_chs[i - 1] + 1: + groups[-1].append(sorted_chs[i]) + else: + groups.append([sorted_chs[i]]) + + # If there's only one consecutive group and it didn't fit above, container is too small + if len(groups) == 1: + return None + + # Compute offsets per sub-group + ch_to_offset: Dict[int, Coordinate] = {} + for group in groups: + group_offsets = _offsets_for_consecutive_group(container, group, spacing) + if group_offsets is None: + return None # even a sub-group doesn't fit + for ch, offset in zip(group, group_offsets): + ch_to_offset[ch] = offset + + # Return in the original use_channels order + return [ch_to_offset[ch] for ch in use_channels] + + +def validate_channel_selections( + containers: List[Container], + use_channels: Optional[List[int]], + num_channels: int, +) -> List[int]: + """Validate and normalize channel selection. + + If *use_channels* is ``None``, defaults to ``[0, 1, ..., len(containers)-1]``. + + Returns: + Validated list of channel indices. + + Raises: + ValueError: If channels are empty, out of range, or contain duplicates. + """ + if use_channels is None: + use_channels = list(range(len(containers))) + if len(use_channels) == 0: + raise ValueError("use_channels must not be empty.") + if not all(0 <= ch < num_channels for ch in use_channels): + raise ValueError( + f"All use_channels must be integers in range [0, {num_channels - 1}], got {use_channels}." + ) + if len(use_channels) != len(set(use_channels)): + raise ValueError("use_channels must not contain duplicates.") + if len(containers) != len(use_channels): + raise ValueError( + f"Length of containers and use_channels must match, " + f"got {len(containers)} and {len(use_channels)}." + ) + return use_channels + + +def compute_positions( + containers: List[Container], + resource_offsets: List[Coordinate], + wrt_resource: Resource, +) -> Tuple[List[float], List[float]]: + """Convert containers and offsets into absolute X/Y positions relative to a resource. + + Each container must be a descendant of *wrt_resource* (checked by + ``Resource.get_location_wrt``, which raises ``ValueError`` if not). + + Returns: + (x_positions, y_positions) — parallel lists of coordinates in mm, + one entry per container. + """ + x_pos: List[float] = [] + y_pos: List[float] = [] + for resource, offset in zip(containers, resource_offsets): + loc = resource.get_location_wrt(wrt_resource, x="c", y="c", z="b") + x_pos.append(loc.x + offset.x) + y_pos.append(loc.y + offset.y) + return x_pos, y_pos + + +# --- Public API --- + + +def plan_batches( + use_channels: List[int], + targets: Union[List[Container], List[Coordinate]], + channel_spacings: Union[float, List[float]], + x_tolerance: float, + wrt_resource: Optional[Resource] = None, + resource_offsets: Optional[List[Coordinate]] = None, +) -> List[ChannelBatch]: + """Partition channel–target pairs into executable batches. + + Targets can be either: + + - **Containers** (with *wrt_resource*): computes X/Y positions from containers relative + to a reference resource. When multiple channels target the same container and *resource_offsets* + is not provided, automatically spreads them using ``compute_single_container_offsets``. + Channels that cannot be spread (container too narrow) stay at the container center + and are serialized into separate batches. + + - **Coordinates**: uses absolute X/Y positions directly. No auto-spreading. + + Groups by X position (within *x_tolerance*), then within each X group partitions + into Y sub-batches respecting per-channel minimum spacing. Computes phantom channel + positions for intermediate channels between non-consecutive batch members. + + Args: + use_channels: Channel indices being used (e.g. [0, 1, 2, 5, 6, 7]). + targets: Either Container objects (requires *wrt_resource*) or Coordinate objects + with absolute X/Y positions. One per entry in *use_channels*. + channel_spacings: Minimum Y spacing per channel (mm). Scalar for uniform, + or a list with one entry per channel on the instrument. + x_tolerance: Positions within this tolerance share an X group. + wrt_resource: Reference resource for computing positions. All containers must + be descendants of this resource. Required when *targets* are Containers. + resource_offsets: Optional XYZ offsets from container centers. When provided, + auto-spreading is disabled and these offsets are used directly. Only valid + when *targets* are Containers. + + Returns: + Flat list of ChannelBatch sorted by ascending X position. + """ + + if len(use_channels) != len(targets): + raise ValueError( + f"use_channels and targets must have the same length, " + f"got {len(use_channels)} and {len(targets)}." + ) + if len(use_channels) == 0: + raise ValueError("use_channels must not be empty.") + + if wrt_resource is not None: + containers = cast(List[Container], targets) + if resource_offsets is not None: + if len(resource_offsets) != len(containers): + raise ValueError( + f"resource_offsets length must match containers, " + f"got {len(resource_offsets)} and {len(containers)}." + ) + offsets = resource_offsets + else: + offsets = [Coordinate.zero()] * len(containers) + x_pos, y_pos = compute_positions(containers, offsets, wrt_resource) + else: + containers = None + coordinates = cast(List[Coordinate], targets) + x_pos = [c.x for c in coordinates] + y_pos = [c.y for c in coordinates] + + # Normalize scalar spacing to per-channel list. + max_ch = max(use_channels) + if isinstance(channel_spacings, (int, float)): + spacings: List[float] = [float(channel_spacings)] * (max_ch + 1) + else: + spacings = list(channel_spacings) + if len(spacings) < max_ch + 1: + spacings.extend([spacings[-1]] * (max_ch + 1 - len(spacings))) + + # Group indices by X position. Sorts by X then merges adjacent positions + # within tolerance into the same group, so positions like 99.99 and 100.01 + # (0.02mm apart) are never split across groups. + sorted_by_x = sorted(range(len(x_pos)), key=lambda i: x_pos[i]) + x_groups: Dict[float, List[int]] = {} + current_key: Optional[float] = None + for i in sorted_by_x: + if current_key is None or abs(x_pos[i] - current_key) > x_tolerance: + current_key = x_pos[i] + x_groups.setdefault(current_key, []).append(i) + + # When multiple channels target the same container, offset their Y positions + # so they can be batched together + adjusted_y = list(y_pos) + if containers is not None and resource_offsets is None: + for indices in x_groups.values(): + container_groups: Dict[int, List[int]] = defaultdict(list) + for idx in indices: + container_groups[id(containers[idx])].append(idx) + for c_indices in container_groups.values(): + if len(c_indices) < 2: + continue + group_channels = [use_channels[i] for i in c_indices] + spread = compute_single_container_offsets( + container=containers[c_indices[0]], + use_channels=group_channels, + channel_spacings=channel_spacings, + ) + if spread is not None: + for i, idx_val in enumerate(c_indices): + adjusted_y[idx_val] += spread[i].y + + result: List[ChannelBatch] = [] + for _, indices in sorted(x_groups.items()): + group_x = x_pos[indices[0]] + result.extend(_partition_into_y_batches(indices, use_channels, adjusted_y, spacings, group_x)) + + return result diff --git a/pylabrobot/liquid_handling/pipette_batch_scheduling_tests.py b/pylabrobot/liquid_handling/pipette_batch_scheduling_tests.py new file mode 100644 index 00000000000..2a5573d66ef --- /dev/null +++ b/pylabrobot/liquid_handling/pipette_batch_scheduling_tests.py @@ -0,0 +1,286 @@ +"""Tests for pipette_batch_scheduling module. + +Tests cover functionality added or changed by this module vs the previous +planning.py: mixed channel spacing, phantom interpolation, container auto-spreading, +Coordinate targets, and compute_single_container_offsets. +""" + +import unittest +from typing import List +from unittest.mock import MagicMock, patch + +from pylabrobot.liquid_handling.pipette_batch_scheduling import ( + _min_spacing_between, + compute_single_container_offsets, + plan_batches, +) +from pylabrobot.resources.container import Container +from pylabrobot.resources.coordinate import Coordinate +from pylabrobot.resources.resource import Resource + + +def _coords(x_pos: List[float], y_pos: List[float]) -> List[Coordinate]: + """Build Coordinate targets from parallel x/y lists.""" + return [Coordinate(x, y, 0) for x, y in zip(x_pos, y_pos)] + + +class TestMixedChannelSpacing(unittest.TestCase): + """Pairwise spacing with non-uniform channel sizes (e.g. 1mL + 5mL).""" + + SPACINGS = [8.98, 8.98, 17.96, 17.96] + + def test_pairwise_rounding(self): + # max(8.98, 17.96) = 17.96 -> ceil(179.6)/10 = 18.0 + self.assertAlmostEqual(_min_spacing_between(self.SPACINGS, 1, 2), 18.0) + # max(8.98, 8.98) = 8.98 -> ceil(89.8)/10 = 9.0 + self.assertAlmostEqual(_min_spacing_between(self.SPACINGS, 0, 1), 9.0) + + def test_mixed_spacing_boundary(self): + # 18.0mm needed between ch1 (1mL) and ch2 (5mL) + batches = plan_batches( + [1, 2], _coords([100.0] * 2, [217.9, 200.0]), self.SPACINGS, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 2) + batches = plan_batches( + [1, 2], _coords([100.0] * 2, [218.0, 200.0]), self.SPACINGS, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 1) + + def test_pairwise_sum_not_uniform_product(self): + # ch0->ch3: 9.0 + 18.0 + 18.0 = 45.0mm pairwise, NOT 3 * 18.0 = 54.0mm uniform + batches = plan_batches( + [0, 3], _coords([100.0] * 2, [250.0, 200.0]), self.SPACINGS, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 1) + + def test_mixed_phantoms_use_pairwise_spacing(self): + batches = plan_batches( + [0, 3], _coords([100.0] * 2, [245.0, 200.0]), self.SPACINGS, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 1) + y = batches[0].y_positions + # ch0->ch1: 9.0mm, ch1->ch2: 18.0mm + self.assertAlmostEqual(y[1], 245.0 - 9.0) + self.assertAlmostEqual(y[2], 245.0 - 9.0 - 18.0) + + +class TestCoreBatching(unittest.TestCase): + """Fundamental X grouping, Y batching, and validation.""" + + S = 9.0 + + def test_spacing_boundary(self): + # Exactly 9mm -> one batch + batches = plan_batches([0, 1], _coords([100.0] * 2, [209.0, 200.0]), self.S, x_tolerance=0.1) + self.assertEqual(len(batches), 1) + # 0.1mm short -> two batches + batches = plan_batches([0, 1], _coords([100.0] * 2, [208.9, 200.0]), self.S, x_tolerance=0.1) + self.assertEqual(len(batches), 2) + + def test_same_y_serializes(self): + batches = plan_batches([0, 1, 2], _coords([100.0] * 3, [200.0] * 3), self.S, x_tolerance=0.1) + self.assertEqual(len(batches), 3) + + def test_x_tolerance_boundary(self): + # Within tolerance -> one group + batches = plan_batches( + [0, 1], _coords([100.0, 100.05], [270.0, 261.0]), self.S, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 1) + # Outside tolerance -> two groups + batches = plan_batches([0, 1], _coords([100.0, 100.2], [270.0, 270.0]), self.S, x_tolerance=0.1) + self.assertEqual(len(batches), 2) + + def test_x_groups_sorted_ascending(self): + batches = plan_batches( + [0, 1, 2], _coords([300.0, 100.0, 200.0], [270.0] * 3), self.S, x_tolerance=0.1 + ) + xs = [b.x_position for b in batches] + self.assertEqual(xs, sorted(xs)) + + def test_empty_raises(self): + with self.assertRaises(ValueError): + plan_batches([], [], self.S, x_tolerance=0.1) + + def test_mismatched_lengths_raises(self): + with self.assertRaises(ValueError): + plan_batches([0, 1], _coords([100.0], [200.0]), self.S, x_tolerance=0.1) + + +class TestPhantomInterpolation(unittest.TestCase): + """Phantom channels between non-consecutive batch members.""" + + def test_phantoms_interpolated_at_spacing(self): + batches = plan_batches( + [0, 1, 2, 5, 6, 7], + _coords([100.0] * 6, [300.0, 291.0, 282.0, 255.0, 246.0, 237.0]), + 9.0, + x_tolerance=0.1, + ) + self.assertEqual(len(batches), 1) + y = batches[0].y_positions + self.assertIn(3, y) + self.assertIn(4, y) + self.assertAlmostEqual(y[3], 282.0 - 9.0) + self.assertAlmostEqual(y[4], 282.0 - 18.0) + + def test_phantoms_only_within_batch(self): + # Split into 2 batches — no phantoms across batches + batches = plan_batches([0, 3], _coords([100.0] * 2, [200.0, 250.0]), 9.0, x_tolerance=0.1) + self.assertEqual(len(batches), 2) + for batch in batches: + self.assertEqual(len(batch.y_positions), 1) + + +class TestCoordinateTargets(unittest.TestCase): + """plan_batches with Coordinate targets (no containers).""" + + def test_coordinate_x_grouping_and_y_batching(self): + batches = plan_batches( + [0, 1, 2, 3], + _coords([100.0, 100.0, 200.0, 200.0], [200.0, 200.0, 270.0, 261.0]), + 9.0, + x_tolerance=0.1, + ) + x100 = [b for b in batches if abs(b.x_position - 100.0) < 0.01] + x200 = [b for b in batches if abs(b.x_position - 200.0) < 0.01] + self.assertEqual(len(x100), 2) # same Y -> serialized + self.assertEqual(len(x200), 1) # 9mm apart -> parallel + + def test_indices_map_back_correctly(self): + use_channels = [3, 7, 0] + batches = plan_batches( + use_channels, _coords([100.0] * 3, [261.0, 237.0, 270.0]), 9.0, x_tolerance=0.1 + ) + all_indices = [idx for b in batches for idx in b.indices] + self.assertEqual(sorted(all_indices), [0, 1, 2]) + for batch in batches: + for idx, ch in zip(batch.indices, batch.channels): + self.assertEqual(use_channels[idx], ch) + + +class TestContainerTargets(unittest.TestCase): + """plan_batches with Container targets and auto-spreading.""" + + S = 9.0 + + def _mock_container(self, cx: float, cy: float, size_y: float = 10.0, name: str = "well"): + c = MagicMock(spec=Container) + c.get_absolute_size_y.return_value = size_y + c.name = name + c.get_location_wrt = MagicMock(return_value=Coordinate(cx, cy, 0)) + return c + + def _mock_deck(self): + return MagicMock(spec=Resource) + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.get_wide_single_resource_liquid_op_offsets" + ) + def test_same_container_auto_spreads(self, mock_offsets): + mock_offsets.return_value = [Coordinate(0, 4.5, 0), Coordinate(0, -4.5, 0)] + trough = self._mock_container(100.0, 200.0, size_y=50.0, name="trough") + deck = self._mock_deck() + batches = plan_batches([0, 1], [trough, trough], self.S, x_tolerance=0.1, wrt_resource=deck) + self.assertEqual(len(batches), 1) + y = batches[0].y_positions + self.assertAlmostEqual(y[0], 200.0 + 4.5) + self.assertAlmostEqual(y[1], 200.0 - 4.5) + + def test_same_narrow_container_serialized(self): + well = self._mock_container(100.0, 200.0, size_y=5.0, name="narrow_well") + deck = self._mock_deck() + batches = plan_batches([0, 1], [well, well], self.S, x_tolerance=0.1, wrt_resource=deck) + self.assertEqual(len(batches), 2) + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.get_wide_single_resource_liquid_op_offsets" + ) + def test_resource_offsets_skips_auto_spreading(self, mock_offsets): + trough = self._mock_container(100.0, 200.0, size_y=50.0, name="trough") + deck = self._mock_deck() + user_offsets = [Coordinate(0, 10.0, 0), Coordinate(0, -10.0, 0)] + batches = plan_batches( + [0, 1], + [trough, trough], + self.S, + x_tolerance=0.1, + wrt_resource=deck, + resource_offsets=user_offsets, + ) + mock_offsets.assert_not_called() + self.assertEqual(len(batches), 1) + y = batches[0].y_positions + self.assertAlmostEqual(y[0], 210.0) + self.assertAlmostEqual(y[1], 190.0) + + +class TestComputeSingleContainerOffsets(unittest.TestCase): + S = 9.0 + + def _mock_container(self, size_y: float): + c = MagicMock(spec=["get_absolute_size_y"]) + c.get_absolute_size_y.return_value = size_y + return c + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.get_wide_single_resource_liquid_op_offsets" + ) + def test_even_span_no_center_offset(self, mock_offsets): + mock_offsets.return_value = [Coordinate(0, 4.5, 0), Coordinate(0, -4.5, 0)] + result = compute_single_container_offsets(self._mock_container(50.0), [0, 1], self.S) + assert result is not None + self.assertAlmostEqual(result[0].y, 4.5) + self.assertAlmostEqual(result[1].y, -4.5) + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.get_wide_single_resource_liquid_op_offsets" + ) + def test_odd_span_center_offset_when_wide_enough(self, mock_offsets): + mock_offsets.return_value = [ + Coordinate(0, 9.0, 0), + Coordinate(0, 0.0, 0), + Coordinate(0, -9.0, 0), + ] + # 50mm: max_offset=9.0, 9.0 + 5.5 + 4.5 = 19.0 <= 25.0 -> shift applied + result = compute_single_container_offsets(self._mock_container(50.0), [0, 1, 2], self.S) + assert result is not None + self.assertAlmostEqual(result[0].y, 9.0 + 5.5) + + def test_container_too_small_returns_none(self): + self.assertIsNone(compute_single_container_offsets(self._mock_container(10.0), [0, 1], self.S)) + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.get_wide_single_resource_liquid_op_offsets" + ) + def test_non_consecutive_uses_full_physical_span(self, mock_offsets): + mock_offsets.return_value = [ + Coordinate(0, 10.0, 0), + Coordinate(0, 0.0, 0), + Coordinate(0, -10.0, 0), + ] + result = compute_single_container_offsets(self._mock_container(50.0), [0, 2], self.S) + assert result is not None + self.assertEqual(len(result), 2) + mock_offsets.assert_called_once_with( + resource=unittest.mock.ANY, num_channels=3, min_spacing=self.S + ) + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.get_wide_single_resource_liquid_op_offsets" + ) + def test_mixed_spacing_uses_effective(self, mock_offsets): + mock_offsets.return_value = [ + Coordinate(0, 18.0, 0), + Coordinate(0, 0.0, 0), + Coordinate(0, -18.0, 0), + ] + result = compute_single_container_offsets(self._mock_container(100.0), [0, 2], [9.0, 9.0, 18.0]) + self.assertIsNotNone(result) + mock_offsets.assert_called_once_with( + resource=unittest.mock.ANY, num_channels=3, min_spacing=18.0 + ) + + +if __name__ == "__main__": + unittest.main()