diff --git a/bec_server/bec_server/scan_server/path_optimization.py b/bec_server/bec_server/scan_server/path_optimization.py index cfa1ba15c..f85bcfa2e 100644 --- a/bec_server/bec_server/scan_server/path_optimization.py +++ b/bec_server/bec_server/scan_server/path_optimization.py @@ -4,6 +4,8 @@ from pydantic import BaseModel from scipy.spatial import cKDTree # type: ignore +from bec_server.scan_server.scans.position_generators import Direction + class PathQualityStats(BaseModel): max_jump: float @@ -31,10 +33,11 @@ def optimize_corridor( self, positions: np.ndarray, corridor_size: float | None = None, - sort_axis: int = 1, + fast_axis: Literal[0, 1] = 0, num_iterations: int = 1, - preferred_direction: int | None = None, - corridor_estimation: Literal["density", "median_distance"] = "median_distance", + first_corridor_direction: Direction | Literal[-1, 1] = Direction.ASCENDING, + snaked: bool = True, + corridor_estimation: Literal["density", "median_distance"] = "density", ): """ Optimize positions using a corridor-based approach. @@ -45,11 +48,14 @@ def optimize_corridor( Args: positions (np.ndarray): Array of positions corridor_size (float, optional): Width of each corridor. Defaults to None (auto-estimated). - sort_axis (int, optional): Axis along which to create corridors (0 or 1). Defaults to 1. + fast_axis (Literal[0, 1], optional): Axis traversed within each corridor (0 or 1). Defaults to 0. num_iterations (int, optional): Number of corridor sizes to try. Defaults to 1. - preferred_direction (int | None, optional): Preferred direction for the primary axis (1 or -1). - If None, alternates direction for each corridor. - corridor_estimation (str, optional): Method for estimating corridor size if not provided. + first_corridor_direction (Direction | Literal[-1, 1], optional): Traversal direction of the first + corridor along the fast axis. Positive means ascending, negative means + descending. Defaults to Direction.ASCENDING. + snaked (bool, optional): If True, alternate the traversal direction between + successive corridors. If False, keep the same direction in every corridor. + corridor_estimation (Literal["density", "median_distance"], optional): Method for estimating corridor size if not provided. Options are "density" or "median_distance". Defaults to "density". Returns: @@ -70,8 +76,14 @@ def optimize_corridor( 'Choose "density" or "median_distance".' ) - axis_vals = positions[:, sort_axis] - sec_axis = int(not sort_axis) + if not isinstance(first_corridor_direction, Direction): + first_corridor_direction = Direction(first_corridor_direction) + + if fast_axis not in (0, 1): + raise ValueError("fast_axis must be 0 or 1") + + slow_axis = int(not fast_axis) + axis_vals = positions[:, slow_axis] best_length = np.inf best_path = positions @@ -97,17 +109,15 @@ def optimize_corridor( if block.size == 0: continue - # Sort within corridor along secondary axis - block = block[np.argsort(positions[block, sec_axis])] + # Sort within corridor along fast axis + block = block[np.argsort(positions[block, fast_axis])] # Direction handling - if preferred_direction is not None: - if preferred_direction < 0: - block = block[::-1] - else: - # Alternate direction between corridors - if step % 2 == 0: - block = block[::-1] + direction = first_corridor_direction + if snaked and step % 2 == 1: + direction = Direction(-direction) + if direction == Direction.DESCENDING: + block = block[::-1] index_sorted.append(block) diff --git a/bec_server/bec_server/scan_server/scans/fermat_scan.py b/bec_server/bec_server/scan_server/scans/fermat_scan.py index 58433927a..3957ddf68 100644 --- a/bec_server/bec_server/scan_server/scans/fermat_scan.py +++ b/bec_server/bec_server/scan_server/scans/fermat_scan.py @@ -24,6 +24,7 @@ from bec_lib.logger import bec_logger from bec_lib.scan_args import ScanArgument, Units from bec_server.scan_server.scans import position_generators +from bec_server.scan_server.scans.position_generators import Direction from bec_server.scan_server.scans.scan_base import ScanBase, ScanType from bec_server.scan_server.scans.scan_modifier import scan_hook @@ -191,6 +192,17 @@ def prepare_scan(self): self.start_positions = self.components.get_start_positions(self.motors) self.positions += self.start_positions + if self.optim_trajectory: + self.positions = self.components.optimize_trajectory( + self.positions, + optimization_type=self.optim_trajectory, + first_direction=( + Direction.ASCENDING + if self.motor1_start_stop[1] > self.motor1_start_stop[0] + else Direction.DESCENDING + ), + ) + self.components.check_limits(self.motors, self.positions) self.update_scan_info( diff --git a/bec_server/bec_server/scan_server/scans/legacy_scans.py b/bec_server/bec_server/scan_server/scans/legacy_scans.py index 00abe5c44..0348a5f67 100644 --- a/bec_server/bec_server/scan_server/scans/legacy_scans.py +++ b/bec_server/bec_server/scan_server/scans/legacy_scans.py @@ -19,7 +19,7 @@ from bec_server.scan_server.instruction_handler import InstructionHandler from ..errors import LimitError, ScanAbortion -from ..path_optimization import PathOptimizerMixin +from ..path_optimization import Direction, PathOptimizerMixin from ..scan_stubs import ScanStubs logger = bec_logger.logger @@ -90,7 +90,8 @@ def _get_positions_recursively(current_axes): positions.extend([[val] + sp for sp in sub_positions]) return positions - return np.array(_get_positions_recursively(axes)) + positions = _get_positions_recursively(axes[::-1]) + return np.array([position[::-1] for position in positions], dtype=float) # pylint: disable=too-many-arguments @@ -195,10 +196,8 @@ def get_hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) Generate a 2D hexagonal grid clipped to (start, stop) bounds. Args: - axes: [(x_start, x_stop, x_step), - (y_start, y_stop, y_step)] - x_step = horizontal spacing between columns - y_step = vertical spacing between rows + axes: [(axis0_start, axis0_stop, axis0_step), + (axis1_start, axis1_stop, axis1_step)] snaked: if True, reverse direction on alternate rows to minimize travel distance Returns: @@ -207,28 +206,27 @@ def get_hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) if len(axes) != 2: raise ValueError("2D hex grid requires exactly 2 dimensions") - (x0, x1, sx), (y0, y1, sy) = axes + (a0_start, a0_stop, a0_step), (a1_start, a1_stop, a1_step) = axes points = [] - # Number of rows needed - n_rows = int(np.ceil((y1 - y0) / sy)) + 2 + # The second axis selects the rows and the first axis is traversed within each row. + n_rows = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2 for row in range(n_rows): - y = y0 + row * sy + axis1 = a1_start + row * a1_step - # Alternate row offset - shift by half the x step - x_offset = (sx / 2) if (row % 2) else 0.0 + # Alternate row offset - shift by half the fast-axis step + axis0_offset = (a0_step / 2) if (row % 2) else 0.0 - # Number of columns needed - n_cols = int(np.ceil((x1 - x0) / sx)) + 2 + n_cols = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2 row_points = [] for col in range(n_cols): - x = x0 + x_offset + col * sx + axis0 = a0_start + axis0_offset + col * a0_step - if x0 <= x <= x1 and y0 <= y <= y1: - row_points.append((x, y)) + if a0_start <= axis0 <= a0_stop and a1_start <= axis1 <= a1_stop: + row_points.append((axis0, axis1)) # Reverse every other row if snaking is enabled if snaked and (row % 2 == 1): @@ -520,26 +518,14 @@ def _optimize_trajectory(self): if not self.optim_trajectory: return - # Get preferred directions from scan parameters if available - preferred_directions = getattr(self, "preferred_directions", None) - if self.optim_trajectory == "corridor": - # For corridor optimization, we use the primary axis preferred direction - if preferred_directions and len(preferred_directions) > 0: - primary_axis = getattr(self, "sort_axis", 1) - preferred_direction = ( - preferred_directions[primary_axis] - if len(preferred_directions) > primary_axis - else None - ) - self.positions = self.optimize_corridor( - self.positions, - num_iterations=5, - sort_axis=primary_axis, - preferred_direction=preferred_direction, - ) - else: - self.positions = self.optimize_corridor(self.positions, num_iterations=5) + # Corridor traversal direction follows the first pass along the axis within each corridor. + self.positions = self.optimize_corridor( + self.positions, + num_iterations=5, + fast_axis=getattr(self, "fast_axis", 0), + first_corridor_direction=getattr(self, "first_direction", 1), + ) return if self.optim_trajectory == "shell": @@ -1103,6 +1089,9 @@ def __init__( self.stop_motor2 = stop_motor2 self.step = step self.spiral_type = spiral_type + self.first_direction = ( + Direction.ASCENDING if self.stop_motor1 > self.start_motor1 else Direction.DESCENDING + ) def update_scan_motors(self): self.scan_motors = [self.motor1, self.motor2] diff --git a/bec_server/bec_server/scan_server/scans/multi_region_grid_scan.py b/bec_server/bec_server/scan_server/scans/multi_region_grid_scan.py index 70a715cba..ad4009bc4 100644 --- a/bec_server/bec_server/scan_server/scans/multi_region_grid_scan.py +++ b/bec_server/bec_server/scan_server/scans/multi_region_grid_scan.py @@ -95,7 +95,7 @@ def __init__( settling_time (float): settling time in seconds. Default is 0. settling_time_after_trigger (float): settling time after trigger in seconds. Default is 0. readout_time (float): readout time in seconds. Default is 0. - snaked (bool): if True, the second axis is traversed in alternating directions + snaked (bool): if True, the first axis is traversed in alternating directions within each sub-grid. Default is True. burst_at_each_point (int): number of exposures at each point. Default is 1. diff --git a/bec_server/bec_server/scan_server/scans/position_generators.py b/bec_server/bec_server/scan_server/scans/position_generators.py index 585529e98..ed21ec664 100644 --- a/bec_server/bec_server/scan_server/scans/position_generators.py +++ b/bec_server/bec_server/scan_server/scans/position_generators.py @@ -1,10 +1,16 @@ from __future__ import annotations +import enum from collections.abc import Iterator, Sequence import numpy as np +class Direction(int, enum.Enum): + ASCENDING = 1 + DESCENDING = -1 + + def rotate_points( points: np.ndarray, angle: float, center: tuple[float, float] | None = None ) -> np.ndarray: @@ -263,7 +269,7 @@ def multi_region_grid_positions( regions (list[tuple[tuple[float, float, int], tuple[float, float, int]]]): Sequence of paired region definitions. Each entry contains one ``(start, stop, steps)`` tuple for the first motor and one for the second motor. - snaked (bool): If ``True``, reverse traversal of the second axis on alternating positions + snaked (bool): If ``True``, reverse traversal of the first axis on alternating positions within each sub-grid. Returns: @@ -277,11 +283,11 @@ def multi_region_grid_positions( axis1_positions = _region_points(*region1) axis2_positions = _region_points(*region2) - for index, value1 in enumerate(axis1_positions): - current_axis2 = ( - axis2_positions[::-1] if snaked and (index % 2 == 1) else axis2_positions + for index, value2 in enumerate(axis2_positions): + current_axis1 = ( + axis1_positions[::-1] if snaked and (index % 2 == 1) else axis1_positions ) - for value2 in current_axis2: + for value1 in current_axis1: positions.append([value1, value2]) return np.asarray(positions, dtype=float) @@ -327,7 +333,9 @@ def _get_positions_recursively(current_axes): positions.extend([[val] + sp for sp in sub_positions]) return positions - return np.array(_get_positions_recursively(_axes_arrays)) + # Build the traversal in reverse axis order so the first user-provided axis is the fast one. + positions = _get_positions_recursively(_axes_arrays[::-1]) + return np.array([position[::-1] for position in positions], dtype=float) def fermat_spiral_pos( @@ -497,10 +505,8 @@ def hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) -> Generate a 2D hexagonal grid clipped to (start, stop) bounds. Args: - axes: [(x_start, x_stop, x_step), - (y_start, y_stop, y_step)] - x_step = horizontal spacing between columns - y_step = vertical spacing between rows + axes: [(axis0_start, axis0_stop, axis0_step), + (axis1_start, axis1_stop, axis1_step)] snaked: if True, reverse direction on alternate rows to minimize travel distance Returns: @@ -509,28 +515,27 @@ def hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) -> if len(axes) != 2: raise ValueError("2D hex grid requires exactly 2 dimensions") - (x0, x1, sx), (y0, y1, sy) = axes + (a0_start, a0_stop, a0_step), (a1_start, a1_stop, a1_step) = axes points = [] - # Number of rows needed - n_rows = int(np.ceil((y1 - y0) / sy)) + 2 + # The second axis selects the rows and the first axis is traversed within each row. + n_rows = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2 for row in range(n_rows): - y = y0 + row * sy + axis1 = a1_start + row * a1_step - # Alternate row offset - shift by half the x step - x_offset = (sx / 2) if (row % 2) else 0.0 + # Alternate row offset - shift by half the fast-axis step + axis0_offset = (a0_step / 2) if (row % 2) else 0.0 - # Number of columns needed - n_cols = int(np.ceil((x1 - x0) / sx)) + 2 + n_cols = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2 row_points = [] for col in range(n_cols): - x = x0 + x_offset + col * sx + axis0 = a0_start + axis0_offset + col * a0_step - if x0 <= x <= x1 and y0 <= y <= y1: - row_points.append((x, y)) + if a0_start <= axis0 <= a0_stop and a1_start <= axis1 <= a1_stop: + row_points.append((axis0, axis1)) # Reverse every other row if snaking is enabled if snaked and (row % 2 == 1): diff --git a/bec_server/bec_server/scan_server/scans/scan_components.py b/bec_server/bec_server/scan_server/scans/scan_components.py index 83ac2bb5a..732dc27f1 100644 --- a/bec_server/bec_server/scan_server/scans/scan_components.py +++ b/bec_server/bec_server/scan_server/scans/scan_components.py @@ -8,6 +8,7 @@ from bec_lib.device import DeviceBase from bec_server.scan_server.errors import LimitError from bec_server.scan_server.path_optimization import PathOptimizerMixin +from bec_server.scan_server.scans.position_generators import Direction if TYPE_CHECKING: from bec_server.scan_server.scans.scan_base import ScanBase @@ -159,49 +160,43 @@ def optimize_trajectory( self, positions: np.ndarray, optimization_type: Literal["corridor", "shell", "nearest"] = "corridor", - primary_axis: int = 1, - preferred_directions: list[int] | None = None, + fast_axis: Literal[0, 1] = 0, + first_direction: Literal[-1, 1] | Direction = Direction.ASCENDING, + snaked: bool = True, corridor_size: int | None = None, num_iterations: int = 5, ) -> np.ndarray: """ Optimize the trajectory of the scan by reordering the positions. This can help to minimize the movement time of the motors. + + Important note: The optimization is only done for 2D scans. For higher-dimensional scans, the positions are not reordered and simply + returned as they are. + The optimization can be done in different ways, depending on the optimization_type parameter: - - "corridor": optimize the trajectory in a corridor-like way, where the scan moves back and forth along the primary axis. This is typically a good choice for grid scans. If preferred_directions are provided, the optimizer will try to optimize the trajectory in a way that minimizes the movement in the non-preferred direction. + - "corridor": optimize the trajectory in a corridor-like way, where the scan progresses between corridors along the slow axis and traverses each corridor along the fast axis. This is typically a good choice for grid scans. If first_direction is provided, it sets the traversal direction of the first corridor and snaked controls whether later corridors alternate direction, producing a snaked path along the fast axis. - "shell": optimize the trajectory in a shell-like way, where the scan moves in a spiral from the outside to the inside. This is typically a good choice for round scans. - "nearest": optimize the trajectory by always moving to the nearest next point. This is typically a good choice for random scans. Args: positions (np.ndarray): Array of positions to optimize, shape (num_points, num_motors). optimization_type (str, optional): Type of optimization to perform. Defaults to "corridor". - primary_axis (int, optional): Primary axis for corridor optimization. Defaults to 1. - preferred_directions (list[int] | None, optional): List of preferred directions for the non-primary axes. Each entry should be -1, 0, or 1, indicating the preferred direction of movement along that axis. The length of the list should be equal to the number of non-primary axes. Defaults to None, which means no preferred directions. + fast_axis (int, optional): Fast axis for corridor optimization. Defaults to 0. + first_direction (Direction | Literal[-1, 1], optional): Traversal direction for the first corridor along the fast axis. Positive means ascending, negative means descending. Defaults to Direction.ASCENDING. + snaked (bool, optional): If True, alternate direction between corridors. Defaults to True. corridor_size (int | None, optional): Size of the corridor for corridor optimization. Defaults to None, which means the default corridor size will be used. Returns: np.ndarray: Optimized array of positions, shape (num_points, num_motors). """ if optimization_type == "corridor": - if preferred_directions is None or len(preferred_directions) == 0: - positions = self._path_optimizer.optimize_corridor( - positions, - num_iterations=num_iterations, - corridor_size=corridor_size, - sort_axis=primary_axis, - ) - else: - preferred_direction = ( - preferred_directions[primary_axis] - if len(preferred_directions) > primary_axis - else None - ) - positions = self._path_optimizer.optimize_corridor( - positions, - num_iterations=num_iterations, - sort_axis=primary_axis, - preferred_direction=preferred_direction, - corridor_size=corridor_size, - ) + positions = self._path_optimizer.optimize_corridor( + positions, + num_iterations=num_iterations, + corridor_size=corridor_size, + fast_axis=fast_axis, + first_corridor_direction=first_direction, + snaked=snaked, + ) elif optimization_type == "shell": positions = self._path_optimizer.optimize_shell( diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py index f4c59fa6c..4af7b5789 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py @@ -1,3 +1,5 @@ +from unittest import mock + import numpy as np import pytest @@ -44,3 +46,50 @@ def test_fermat_scan_prepare_scan_updates_scan_info_and_queue(v4_scan_assembler) assert len(read_messages) == 1 assert read_messages[0].device == ["samz"] assert read_messages[0].metadata["readout_priority"] == "baseline" + + +def test_fermat_scan_prepare_scan_uses_first_axis_as_corridor_axis(v4_scan_assembler): + scan = v4_scan_assembler( + "_v4_fermat_scan", + "samx", + -1.0, + 1.0, + "samy", + -2.0, + 2.0, + step=0.5, + optim_trajectory="corridor", + relative=False, + ) + optimized = np.array([[1.0, 0.0], [0.0, 1.0]]) + scan.components.optimize_trajectory = mock.MagicMock(return_value=optimized) + + scan.prepare_scan() + + scan.components.optimize_trajectory.assert_called_once() + _, kwargs = scan.components.optimize_trajectory.call_args + assert kwargs["optimization_type"] == "corridor" + assert kwargs["first_direction"] == 1 + np.testing.assert_allclose(scan.positions, optimized) + + +def test_fermat_scan_prepare_scan_uses_first_axis_range_for_preferred_direction(v4_scan_assembler): + scan = v4_scan_assembler( + "_v4_fermat_scan", + "samx", + 1.0, + -1.0, + "samy", + -4.0, + 4.0, + step=0.5, + optim_trajectory="corridor", + relative=False, + ) + optimized = np.array([[1.0, 0.0], [0.0, 1.0]]) + scan.components.optimize_trajectory = mock.MagicMock(return_value=optimized) + + scan.prepare_scan() + + _, kwargs = scan.components.optimize_trajectory.call_args + assert kwargs["first_direction"] == -1 diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_grid_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_grid_scan.py index 8cd5511a8..29bad0347 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_grid_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_grid_scan.py @@ -35,3 +35,16 @@ def test_grid_scan_prepare_scan_updates_scan_info_and_queue(v4_scan_assembler): assert scan.scan_info.scan_report_instructions == [ {"scan_progress": {"points": 15, "show_table": False}} ] + + +def test_grid_scan_uses_first_axis_as_fast_axis(v4_scan_assembler): + scan = v4_scan_assembler( + "_v4_grid_scan", "samx", 0.0, 2.0, 3, "samy", 0.0, 1.0, 2, snaked=True, relative=False + ) + + scan.prepare_scan() + + expected_positions = np.array( + [[0.0, 0.0], [1.0, 0.0], [2.0, 0.0], [2.0, 1.0], [1.0, 1.0], [0.0, 1.0]] + ) + assert np.array_equal(scan.positions, expected_positions) diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py index 5dfd01163..2082e8498 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py @@ -39,6 +39,17 @@ def test_hexagonal_scan_prepare_scan_updates_scan_info_and_queue(v4_scan_assembl assert np.array_equal(scan.scan_info.positions, expected_positions) +def test_hexagonal_scan_uses_first_axis_as_fast_axis(v4_scan_assembler): + scan = v4_scan_assembler( + "_v4_hexagonal_scan", "samx", 0.0, 2.0, 1.0, "samy", 0.0, 1.0, 1.0, relative=False + ) + + scan.prepare_scan() + + expected_positions = np.array([[0.0, 0.0], [1.0, 0.0], [2.0, 0.0], [1.5, 1.0], [0.5, 1.0]]) + assert np.array_equal(scan.positions, expected_positions) + + def test_hexagonal_scan_prepare_scan_offsets_positions_when_relative(v4_scan_assembler): scan = v4_scan_assembler( "_v4_hexagonal_scan", "samx", -1.0, 1.0, 1.0, "samy", -1.0, 1.0, 1.0, relative=True diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_multi_region_grid_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_multi_region_grid_scan.py index 607847203..a70b17739 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_multi_region_grid_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_multi_region_grid_scan.py @@ -43,17 +43,17 @@ def test_multi_region_grid_scan_prepare_scan_updates_scan_info_and_queue(v4_scan expected_positions = np.array( [ [-3.0, -2.0], + [-1.0, -2.0], + [-1.0, 0.0], [-3.0, 0.0], [-3.0, 2.0], [-1.0, 2.0], - [-1.0, 0.0], - [-1.0, -2.0], [1.0, -2.0], + [3.0, -2.0], + [3.0, 0.0], [1.0, 0.0], [1.0, 2.0], [3.0, 2.0], - [3.0, 0.0], - [3.0, -2.0], ] ) assert np.allclose(scan.positions, expected_positions) @@ -80,17 +80,17 @@ def test_multi_region_grid_scan_prepare_scan_offsets_positions_when_relative(v4_ expected_positions = np.array( [ [-2.0, -3.0], + [0.0, -3.0], + [0.0, -1.0], [-2.0, -1.0], [-2.0, 1.0], [0.0, 1.0], - [0.0, -1.0], - [0.0, -3.0], [2.0, -3.0], + [4.0, -3.0], + [4.0, -1.0], [2.0, -1.0], [2.0, 1.0], [4.0, 1.0], - [4.0, -1.0], - [4.0, -3.0], ] ) assert scan.start_positions == [1.0, -1.0] diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_position_generators.py b/bec_server/tests/tests_scan_server/scans_v4/test_position_generators.py index f13210873..2146b0420 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_position_generators.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_position_generators.py @@ -99,17 +99,17 @@ def test_multi_region_grid_positions_builds_snaked_grid(): positions, [ [-3.0, -2.0], + [-1.0, -2.0], + [-1.0, 0.0], [-3.0, 0.0], [-3.0, 2.0], [-1.0, 2.0], - [-1.0, 0.0], - [-1.0, -2.0], [1.0, -2.0], + [3.0, -2.0], + [3.0, 0.0], [1.0, 0.0], [1.0, 2.0], [3.0, 2.0], - [3.0, 0.0], - [3.0, -2.0], ], ) diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py b/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py index 29178fe1d..47ec4c4ce 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py @@ -99,7 +99,7 @@ def test_get_start_positions_supports_motor_names_and_instances(v4_scan_assemble assert start_positions == [1.25, -3.5] -def test_optimize_trajectory_uses_corridor_defaults_without_preferred_direction(v4_scan_assembler): +def test_optimize_trajectory_uses_corridor_defaults(v4_scan_assembler): scan = v4_scan_assembler("_v4_mv", "samx", 1.5, "samy", -2.0, relative=False) positions = np.array([[0.0, 1.0], [1.0, 0.0]]) optimized = np.array([[1.0, 0.0], [0.0, 1.0]]) @@ -110,12 +110,17 @@ def test_optimize_trajectory_uses_corridor_defaults_without_preferred_direction( ) scan.components._path_optimizer.optimize_corridor.assert_called_once_with( - positions, num_iterations=7, corridor_size=4, sort_axis=1 + positions, + num_iterations=7, + corridor_size=4, + fast_axis=0, + first_corridor_direction=1, + snaked=True, ) np.testing.assert_allclose(result, optimized) -def test_optimize_trajectory_passes_primary_axis_preference_for_corridor(v4_scan_assembler): +def test_optimize_trajectory_passes_first_direction_for_corridor(v4_scan_assembler): scan = v4_scan_assembler("_v4_mv", "samx", 1.5, "samy", -2.0, relative=False) positions = np.array([[0.0, 1.0], [1.0, 0.0]]) scan.components._path_optimizer.optimize_corridor = mock.MagicMock(return_value=positions) @@ -123,14 +128,47 @@ def test_optimize_trajectory_passes_primary_axis_preference_for_corridor(v4_scan scan.components.optimize_trajectory( positions, optimization_type="corridor", - primary_axis=1, - preferred_directions=[-1, 1], + fast_axis=0, + first_direction=-1, + snaked=True, corridor_size=2, num_iterations=3, ) scan.components._path_optimizer.optimize_corridor.assert_called_once_with( - positions, num_iterations=3, sort_axis=1, preferred_direction=1, corridor_size=2 + positions, + num_iterations=3, + fast_axis=0, + first_corridor_direction=-1, + snaked=True, + corridor_size=2, + ) + + +def test_optimize_trajectory_passes_first_direction_when_first_axis_is_corridor_axis( + v4_scan_assembler, +): + scan = v4_scan_assembler("_v4_mv", "samx", 1.5, "samy", -2.0, relative=False) + positions = np.array([[0.0, 1.0], [1.0, 0.0]]) + scan.components._path_optimizer.optimize_corridor = mock.MagicMock(return_value=positions) + + scan.components.optimize_trajectory( + positions, + optimization_type="corridor", + fast_axis=1, + first_direction=1, + snaked=True, + corridor_size=2, + num_iterations=3, + ) + + scan.components._path_optimizer.optimize_corridor.assert_called_once_with( + positions, + num_iterations=3, + fast_axis=1, + first_corridor_direction=1, + snaked=True, + corridor_size=2, ) diff --git a/bec_server/tests/tests_scan_server/test_legacy_scans.py b/bec_server/tests/tests_scan_server/test_legacy_scans.py index c13eabdfa..f59c1b41f 100644 --- a/bec_server/tests/tests_scan_server/test_legacy_scans.py +++ b/bec_server/tests/tests_scan_server/test_legacy_scans.py @@ -494,7 +494,7 @@ def offset_mock(): "async": [], }, "num_points": 4, - "positions": [[-5.0, -5.0], [-5.0, 5.0], [5.0, 5.0], [5.0, -5.0]], + "positions": [[-5.0, -5.0], [5.0, -5.0], [5.0, 5.0], [-5.0, 5.0]], "scan_name": "grid_scan", "scan_type": "step", }, @@ -543,7 +543,7 @@ def offset_mock(): ), messages.DeviceInstructionMessage( metadata={"readout_priority": "monitored"}, - device="samy", + device="samx", action="set", parameter={"value": np.float64(5.0)}, ), @@ -561,7 +561,7 @@ def offset_mock(): ), messages.DeviceInstructionMessage( metadata={"readout_priority": "monitored"}, - device="samx", + device="samy", action="set", parameter={"value": np.float64(5.0)}, ), @@ -579,7 +579,7 @@ def offset_mock(): ), messages.DeviceInstructionMessage( metadata={"readout_priority": "monitored"}, - device="samy", + device="samx", action="set", parameter={"value": np.float64(-5.0)}, ), @@ -1123,29 +1123,29 @@ def test_round_scan_positions(in_args, reference_positions): @pytest.mark.parametrize( "in_args,reference_positions,snaked", [ - ([list(range(2)), list(range(2))], [[0, 0], [0, 1], [1, 1], [1, 0]], True), - ([list(range(2)), list(range(3))], [[0, 0], [0, 1], [0, 2], [1, 0], [1, 1], [1, 2]], False), + ([list(range(2)), list(range(2))], [[0, 0], [1, 0], [1, 1], [0, 1]], True), + ([list(range(2)), list(range(3))], [[0, 0], [1, 0], [0, 1], [1, 1], [0, 2], [1, 2]], False), ( [list(range(3)), list(range(3)), list(range(2))], [ [0, 0, 0], - [0, 0, 1], - [0, 1, 1], - [0, 1, 0], - [0, 2, 0], - [0, 2, 1], - [1, 2, 1], - [1, 2, 0], - [1, 1, 0], - [1, 1, 1], - [1, 0, 1], [1, 0, 0], [2, 0, 0], - [2, 0, 1], - [2, 1, 1], [2, 1, 0], + [1, 1, 0], + [0, 1, 0], + [0, 2, 0], + [1, 2, 0], [2, 2, 0], [2, 2, 1], + [1, 2, 1], + [0, 2, 1], + [0, 1, 1], + [1, 1, 1], + [2, 1, 1], + [2, 0, 1], + [1, 0, 1], + [0, 0, 1], ], True, ), diff --git a/bec_server/tests/tests_scan_server/test_path_optimization.py b/bec_server/tests/tests_scan_server/test_path_optimization.py index 8fc819afb..d16fbc1a8 100644 --- a/bec_server/tests/tests_scan_server/test_path_optimization.py +++ b/bec_server/tests/tests_scan_server/test_path_optimization.py @@ -257,3 +257,20 @@ def test_optimize_corridor_raises_corridor_estimation(): optim.optimize_corridor( positions_orig, num_iterations=10, corridor_estimation="invalid_method" ) + + +def test_optimize_corridor_respects_first_corridor_direction_and_snaking(): + optim = PathOptimizerMixin() + positions = np.asarray([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]) + + snaked = optim.optimize_corridor( + positions, corridor_size=1.0, fast_axis=1, first_corridor_direction=1, snaked=True + ) + non_snaked = optim.optimize_corridor( + positions, corridor_size=1.0, fast_axis=1, first_corridor_direction=1, snaked=False + ) + + np.testing.assert_allclose(snaked, np.asarray([[0.0, 0.0], [0.0, 1.0], [1.0, 1.0], [1.0, 0.0]])) + np.testing.assert_allclose( + non_snaked, np.asarray([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]) + )