From 9881de35f208e720d663aa1492b109f0e72f7d17 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 14 Apr 2026 11:51:51 +0200 Subject: [PATCH 1/4] fix: corridor optimization should respect fast axis --- .../scan_server/path_optimization.py | 50 ++++++++++++------- .../scan_server/scans/fermat_scan.py | 12 +++++ .../scan_server/scans/legacy_scans.py | 31 ++++-------- .../scan_server/scans/scan_components.py | 40 ++++++--------- .../scans_v4/test_fermat_scan.py | 49 ++++++++++++++++++ .../scans_v4/test_scan_components.py | 50 ++++++++++++++++--- .../test_path_optimization.py | 17 +++++++ 7 files changed, 180 insertions(+), 69 deletions(-) diff --git a/bec_server/bec_server/scan_server/path_optimization.py b/bec_server/bec_server/scan_server/path_optimization.py index cfa1ba15c..bb41877fe 100644 --- a/bec_server/bec_server/scan_server/path_optimization.py +++ b/bec_server/bec_server/scan_server/path_optimization.py @@ -1,3 +1,4 @@ +import enum from typing import Literal import numpy as np @@ -14,6 +15,11 @@ class PathQualityStats(BaseModel): model_config = {"arbitrary_types_allowed": True} +class Direction(int, enum.Enum): + ASCENDING = 1 + DESCENDING = -1 + + class PathOptimizerMixin: def get_radius(self, pos: np.ndarray) -> np.ndarray: """ @@ -31,10 +37,11 @@ def optimize_corridor( self, positions: np.ndarray, corridor_size: float | None = None, - sort_axis: int = 1, + fast_axis: Literal[0, 1] = 1, num_iterations: int = 1, - preferred_direction: int | None = None, - corridor_estimation: Literal["density", "median_distance"] = "median_distance", + first_corridor_direction: Direction | Literal[-1, 1] = Direction.ASCENDING, + snaked: bool = True, + corridor_estimation: Literal["density", "median_distance"] = "density", ): """ Optimize positions using a corridor-based approach. @@ -45,11 +52,14 @@ def optimize_corridor( Args: positions (np.ndarray): Array of positions corridor_size (float, optional): Width of each corridor. Defaults to None (auto-estimated). - sort_axis (int, optional): Axis along which to create corridors (0 or 1). Defaults to 1. + fast_axis (Literal[0, 1], optional): Axis traversed within each corridor (0 or 1). Defaults to 1. num_iterations (int, optional): Number of corridor sizes to try. Defaults to 1. - preferred_direction (int | None, optional): Preferred direction for the primary axis (1 or -1). - If None, alternates direction for each corridor. - corridor_estimation (str, optional): Method for estimating corridor size if not provided. + first_corridor_direction (Direction | Literal[-1, 1], optional): Traversal direction of the first + corridor along the fast axis. Positive means ascending, negative means + descending. Defaults to Direction.ASCENDING. + snaked (bool, optional): If True, alternate the traversal direction between + successive corridors. If False, keep the same direction in every corridor. + corridor_estimation (Literal["density", "median_distance"], optional): Method for estimating corridor size if not provided. Options are "density" or "median_distance". Defaults to "density". Returns: @@ -70,8 +80,14 @@ def optimize_corridor( 'Choose "density" or "median_distance".' ) - axis_vals = positions[:, sort_axis] - sec_axis = int(not sort_axis) + if not isinstance(first_corridor_direction, Direction): + first_corridor_direction = Direction(first_corridor_direction) + + if fast_axis not in (0, 1): + raise ValueError("fast_axis must be 0 or 1") + + slow_axis = int(not fast_axis) + axis_vals = positions[:, slow_axis] best_length = np.inf best_path = positions @@ -97,17 +113,15 @@ def optimize_corridor( if block.size == 0: continue - # Sort within corridor along secondary axis - block = block[np.argsort(positions[block, sec_axis])] + # Sort within corridor along fast axis + block = block[np.argsort(positions[block, fast_axis])] # Direction handling - if preferred_direction is not None: - if preferred_direction < 0: - block = block[::-1] - else: - # Alternate direction between corridors - if step % 2 == 0: - block = block[::-1] + direction = first_corridor_direction + if snaked and step % 2 == 1: + direction = Direction(-direction) + if direction == Direction.DESCENDING: + block = block[::-1] index_sorted.append(block) diff --git a/bec_server/bec_server/scan_server/scans/fermat_scan.py b/bec_server/bec_server/scan_server/scans/fermat_scan.py index 58433927a..aa0532524 100644 --- a/bec_server/bec_server/scan_server/scans/fermat_scan.py +++ b/bec_server/bec_server/scan_server/scans/fermat_scan.py @@ -23,6 +23,7 @@ from bec_lib.device import DeviceBase from bec_lib.logger import bec_logger from bec_lib.scan_args import ScanArgument, Units +from bec_server.scan_server.path_optimization import Direction from bec_server.scan_server.scans import position_generators from bec_server.scan_server.scans.scan_base import ScanBase, ScanType from bec_server.scan_server.scans.scan_modifier import scan_hook @@ -191,6 +192,17 @@ def prepare_scan(self): self.start_positions = self.components.get_start_positions(self.motors) self.positions += self.start_positions + if self.optim_trajectory: + self.positions = self.components.optimize_trajectory( + self.positions, + optimization_type=self.optim_trajectory, + first_direction=( + Direction.ASCENDING + if self.motor2_start_stop[1] > self.motor2_start_stop[0] + else Direction.DESCENDING + ), + ) + self.components.check_limits(self.motors, self.positions) self.update_scan_info( diff --git a/bec_server/bec_server/scan_server/scans/legacy_scans.py b/bec_server/bec_server/scan_server/scans/legacy_scans.py index 00abe5c44..0f31fc7ac 100644 --- a/bec_server/bec_server/scan_server/scans/legacy_scans.py +++ b/bec_server/bec_server/scan_server/scans/legacy_scans.py @@ -19,7 +19,7 @@ from bec_server.scan_server.instruction_handler import InstructionHandler from ..errors import LimitError, ScanAbortion -from ..path_optimization import PathOptimizerMixin +from ..path_optimization import Direction, PathOptimizerMixin from ..scan_stubs import ScanStubs logger = bec_logger.logger @@ -520,26 +520,14 @@ def _optimize_trajectory(self): if not self.optim_trajectory: return - # Get preferred directions from scan parameters if available - preferred_directions = getattr(self, "preferred_directions", None) - if self.optim_trajectory == "corridor": - # For corridor optimization, we use the primary axis preferred direction - if preferred_directions and len(preferred_directions) > 0: - primary_axis = getattr(self, "sort_axis", 1) - preferred_direction = ( - preferred_directions[primary_axis] - if len(preferred_directions) > primary_axis - else None - ) - self.positions = self.optimize_corridor( - self.positions, - num_iterations=5, - sort_axis=primary_axis, - preferred_direction=preferred_direction, - ) - else: - self.positions = self.optimize_corridor(self.positions, num_iterations=5) + # Corridor traversal direction follows the first pass along the axis within each corridor. + self.positions = self.optimize_corridor( + self.positions, + num_iterations=5, + fast_axis=getattr(self, "fast_axis", 1), + first_corridor_direction=getattr(self, "first_direction", 1), + ) return if self.optim_trajectory == "shell": @@ -1103,6 +1091,9 @@ def __init__( self.stop_motor2 = stop_motor2 self.step = step self.spiral_type = spiral_type + self.first_direction = ( + Direction.ASCENDING if self.stop_motor2 > self.start_motor2 else Direction.DESCENDING + ) def update_scan_motors(self): self.scan_motors = [self.motor1, self.motor2] diff --git a/bec_server/bec_server/scan_server/scans/scan_components.py b/bec_server/bec_server/scan_server/scans/scan_components.py index 83ac2bb5a..889417eb0 100644 --- a/bec_server/bec_server/scan_server/scans/scan_components.py +++ b/bec_server/bec_server/scan_server/scans/scan_components.py @@ -159,49 +159,39 @@ def optimize_trajectory( self, positions: np.ndarray, optimization_type: Literal["corridor", "shell", "nearest"] = "corridor", - primary_axis: int = 1, - preferred_directions: list[int] | None = None, + fast_axis: int = 1, + first_direction: int = 1, + snaked: bool = True, corridor_size: int | None = None, num_iterations: int = 5, ) -> np.ndarray: """ Optimize the trajectory of the scan by reordering the positions. This can help to minimize the movement time of the motors. The optimization can be done in different ways, depending on the optimization_type parameter: - - "corridor": optimize the trajectory in a corridor-like way, where the scan moves back and forth along the primary axis. This is typically a good choice for grid scans. If preferred_directions are provided, the optimizer will try to optimize the trajectory in a way that minimizes the movement in the non-preferred direction. + - "corridor": optimize the trajectory in a corridor-like way, where the scan progresses between corridors along the slow axis and traverses each corridor along the fast axis. This is typically a good choice for grid scans. If first_direction is provided, it sets the traversal direction of the first corridor and snaked controls whether later corridors alternate direction, producing a snaked path along the fast axis. - "shell": optimize the trajectory in a shell-like way, where the scan moves in a spiral from the outside to the inside. This is typically a good choice for round scans. - "nearest": optimize the trajectory by always moving to the nearest next point. This is typically a good choice for random scans. Args: positions (np.ndarray): Array of positions to optimize, shape (num_points, num_motors). optimization_type (str, optional): Type of optimization to perform. Defaults to "corridor". - primary_axis (int, optional): Primary axis for corridor optimization. Defaults to 1. - preferred_directions (list[int] | None, optional): List of preferred directions for the non-primary axes. Each entry should be -1, 0, or 1, indicating the preferred direction of movement along that axis. The length of the list should be equal to the number of non-primary axes. Defaults to None, which means no preferred directions. + fast_axis (int, optional): Fast axis for corridor optimization. Defaults to 1. + first_direction (int, optional): Traversal direction for the first corridor along the fast axis. Positive means ascending, negative means descending. Defaults to 1. + snaked (bool, optional): If True, alternate direction between corridors. Defaults to True. corridor_size (int | None, optional): Size of the corridor for corridor optimization. Defaults to None, which means the default corridor size will be used. Returns: np.ndarray: Optimized array of positions, shape (num_points, num_motors). """ if optimization_type == "corridor": - if preferred_directions is None or len(preferred_directions) == 0: - positions = self._path_optimizer.optimize_corridor( - positions, - num_iterations=num_iterations, - corridor_size=corridor_size, - sort_axis=primary_axis, - ) - else: - preferred_direction = ( - preferred_directions[primary_axis] - if len(preferred_directions) > primary_axis - else None - ) - positions = self._path_optimizer.optimize_corridor( - positions, - num_iterations=num_iterations, - sort_axis=primary_axis, - preferred_direction=preferred_direction, - corridor_size=corridor_size, - ) + positions = self._path_optimizer.optimize_corridor( + positions, + num_iterations=num_iterations, + corridor_size=corridor_size, + fast_axis=fast_axis, + first_corridor_direction=first_direction, + snaked=snaked, + ) elif optimization_type == "shell": positions = self._path_optimizer.optimize_shell( diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py index f4c59fa6c..34d019694 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py @@ -1,3 +1,5 @@ +from unittest import mock + import numpy as np import pytest @@ -44,3 +46,50 @@ def test_fermat_scan_prepare_scan_updates_scan_info_and_queue(v4_scan_assembler) assert len(read_messages) == 1 assert read_messages[0].device == ["samz"] assert read_messages[0].metadata["readout_priority"] == "baseline" + + +def test_fermat_scan_prepare_scan_uses_first_axis_as_corridor_axis(v4_scan_assembler): + scan = v4_scan_assembler( + "_v4_fermat_scan", + "samx", + -1.0, + 1.0, + "samy", + -2.0, + 2.0, + step=0.5, + optim_trajectory="corridor", + relative=False, + ) + optimized = np.array([[1.0, 0.0], [0.0, 1.0]]) + scan.components.optimize_trajectory = mock.MagicMock(return_value=optimized) + + scan.prepare_scan() + + scan.components.optimize_trajectory.assert_called_once() + _, kwargs = scan.components.optimize_trajectory.call_args + assert kwargs["optimization_type"] == "corridor" + assert kwargs["first_direction"] == 1 + np.testing.assert_allclose(scan.positions, optimized) + + +def test_fermat_scan_prepare_scan_uses_fast_axis_range_for_preferred_direction(v4_scan_assembler): + scan = v4_scan_assembler( + "_v4_fermat_scan", + "samx", + -1.0, + 1.0, + "samy", + 4.0, + -4.0, + step=0.5, + optim_trajectory="corridor", + relative=False, + ) + optimized = np.array([[1.0, 0.0], [0.0, 1.0]]) + scan.components.optimize_trajectory = mock.MagicMock(return_value=optimized) + + scan.prepare_scan() + + _, kwargs = scan.components.optimize_trajectory.call_args + assert kwargs["first_direction"] == -1 diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py b/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py index 29178fe1d..89a262a41 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py @@ -99,7 +99,7 @@ def test_get_start_positions_supports_motor_names_and_instances(v4_scan_assemble assert start_positions == [1.25, -3.5] -def test_optimize_trajectory_uses_corridor_defaults_without_preferred_direction(v4_scan_assembler): +def test_optimize_trajectory_uses_corridor_defaults(v4_scan_assembler): scan = v4_scan_assembler("_v4_mv", "samx", 1.5, "samy", -2.0, relative=False) positions = np.array([[0.0, 1.0], [1.0, 0.0]]) optimized = np.array([[1.0, 0.0], [0.0, 1.0]]) @@ -110,12 +110,17 @@ def test_optimize_trajectory_uses_corridor_defaults_without_preferred_direction( ) scan.components._path_optimizer.optimize_corridor.assert_called_once_with( - positions, num_iterations=7, corridor_size=4, sort_axis=1 + positions, + num_iterations=7, + corridor_size=4, + fast_axis=1, + first_corridor_direction=1, + snaked=True, ) np.testing.assert_allclose(result, optimized) -def test_optimize_trajectory_passes_primary_axis_preference_for_corridor(v4_scan_assembler): +def test_optimize_trajectory_passes_first_direction_for_corridor(v4_scan_assembler): scan = v4_scan_assembler("_v4_mv", "samx", 1.5, "samy", -2.0, relative=False) positions = np.array([[0.0, 1.0], [1.0, 0.0]]) scan.components._path_optimizer.optimize_corridor = mock.MagicMock(return_value=positions) @@ -123,14 +128,47 @@ def test_optimize_trajectory_passes_primary_axis_preference_for_corridor(v4_scan scan.components.optimize_trajectory( positions, optimization_type="corridor", - primary_axis=1, - preferred_directions=[-1, 1], + fast_axis=0, + first_direction=-1, + snaked=True, corridor_size=2, num_iterations=3, ) scan.components._path_optimizer.optimize_corridor.assert_called_once_with( - positions, num_iterations=3, sort_axis=1, preferred_direction=1, corridor_size=2 + positions, + num_iterations=3, + fast_axis=0, + first_corridor_direction=-1, + snaked=True, + corridor_size=2, + ) + + +def test_optimize_trajectory_passes_first_direction_when_first_axis_is_corridor_axis( + v4_scan_assembler, +): + scan = v4_scan_assembler("_v4_mv", "samx", 1.5, "samy", -2.0, relative=False) + positions = np.array([[0.0, 1.0], [1.0, 0.0]]) + scan.components._path_optimizer.optimize_corridor = mock.MagicMock(return_value=positions) + + scan.components.optimize_trajectory( + positions, + optimization_type="corridor", + fast_axis=1, + first_direction=1, + snaked=True, + corridor_size=2, + num_iterations=3, + ) + + scan.components._path_optimizer.optimize_corridor.assert_called_once_with( + positions, + num_iterations=3, + fast_axis=1, + first_corridor_direction=1, + snaked=True, + corridor_size=2, ) diff --git a/bec_server/tests/tests_scan_server/test_path_optimization.py b/bec_server/tests/tests_scan_server/test_path_optimization.py index 8fc819afb..d16fbc1a8 100644 --- a/bec_server/tests/tests_scan_server/test_path_optimization.py +++ b/bec_server/tests/tests_scan_server/test_path_optimization.py @@ -257,3 +257,20 @@ def test_optimize_corridor_raises_corridor_estimation(): optim.optimize_corridor( positions_orig, num_iterations=10, corridor_estimation="invalid_method" ) + + +def test_optimize_corridor_respects_first_corridor_direction_and_snaking(): + optim = PathOptimizerMixin() + positions = np.asarray([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]) + + snaked = optim.optimize_corridor( + positions, corridor_size=1.0, fast_axis=1, first_corridor_direction=1, snaked=True + ) + non_snaked = optim.optimize_corridor( + positions, corridor_size=1.0, fast_axis=1, first_corridor_direction=1, snaked=False + ) + + np.testing.assert_allclose(snaked, np.asarray([[0.0, 0.0], [0.0, 1.0], [1.0, 1.0], [1.0, 0.0]])) + np.testing.assert_allclose( + non_snaked, np.asarray([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]) + ) From 147bc5d097d5948fdee3067437d733a1eb1eea30 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 14 Apr 2026 11:52:50 +0200 Subject: [PATCH 2/4] fix: hexagonal scan should use second axis as fast axis --- .../scan_server/scans/legacy_scans.py | 27 +++++++++---------- .../scan_server/scans/position_generators.py | 27 +++++++++---------- .../scans_v4/test_hexagonal_scan.py | 11 ++++++++ .../tests_scan_server/test_legacy_scans.py | 16 +++++------ 4 files changed, 43 insertions(+), 38 deletions(-) diff --git a/bec_server/bec_server/scan_server/scans/legacy_scans.py b/bec_server/bec_server/scan_server/scans/legacy_scans.py index 0f31fc7ac..0e33e3b08 100644 --- a/bec_server/bec_server/scan_server/scans/legacy_scans.py +++ b/bec_server/bec_server/scan_server/scans/legacy_scans.py @@ -195,10 +195,8 @@ def get_hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) Generate a 2D hexagonal grid clipped to (start, stop) bounds. Args: - axes: [(x_start, x_stop, x_step), - (y_start, y_stop, y_step)] - x_step = horizontal spacing between columns - y_step = vertical spacing between rows + axes: [(axis0_start, axis0_stop, axis0_step), + (axis1_start, axis1_stop, axis1_step)] snaked: if True, reverse direction on alternate rows to minimize travel distance Returns: @@ -207,28 +205,27 @@ def get_hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) if len(axes) != 2: raise ValueError("2D hex grid requires exactly 2 dimensions") - (x0, x1, sx), (y0, y1, sy) = axes + (a0_start, a0_stop, a0_step), (a1_start, a1_stop, a1_step) = axes points = [] - # Number of rows needed - n_rows = int(np.ceil((y1 - y0) / sy)) + 2 + # The first axis is the slow row axis and the second axis is the fast axis. + n_rows = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2 for row in range(n_rows): - y = y0 + row * sy + axis0 = a0_start + row * a0_step - # Alternate row offset - shift by half the x step - x_offset = (sx / 2) if (row % 2) else 0.0 + # Alternate row offset - shift by half the fast-axis step + axis1_offset = (a1_step / 2) if (row % 2) else 0.0 - # Number of columns needed - n_cols = int(np.ceil((x1 - x0) / sx)) + 2 + n_cols = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2 row_points = [] for col in range(n_cols): - x = x0 + x_offset + col * sx + axis1 = a1_start + axis1_offset + col * a1_step - if x0 <= x <= x1 and y0 <= y <= y1: - row_points.append((x, y)) + if a0_start <= axis0 <= a0_stop and a1_start <= axis1 <= a1_stop: + row_points.append((axis0, axis1)) # Reverse every other row if snaking is enabled if snaked and (row % 2 == 1): diff --git a/bec_server/bec_server/scan_server/scans/position_generators.py b/bec_server/bec_server/scan_server/scans/position_generators.py index 585529e98..e79c94a97 100644 --- a/bec_server/bec_server/scan_server/scans/position_generators.py +++ b/bec_server/bec_server/scan_server/scans/position_generators.py @@ -497,10 +497,8 @@ def hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) -> Generate a 2D hexagonal grid clipped to (start, stop) bounds. Args: - axes: [(x_start, x_stop, x_step), - (y_start, y_stop, y_step)] - x_step = horizontal spacing between columns - y_step = vertical spacing between rows + axes: [(axis0_start, axis0_stop, axis0_step), + (axis1_start, axis1_stop, axis1_step)] snaked: if True, reverse direction on alternate rows to minimize travel distance Returns: @@ -509,28 +507,27 @@ def hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) -> if len(axes) != 2: raise ValueError("2D hex grid requires exactly 2 dimensions") - (x0, x1, sx), (y0, y1, sy) = axes + (a0_start, a0_stop, a0_step), (a1_start, a1_stop, a1_step) = axes points = [] - # Number of rows needed - n_rows = int(np.ceil((y1 - y0) / sy)) + 2 + # The first axis is the slow row axis and the second axis is the fast axis. + n_rows = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2 for row in range(n_rows): - y = y0 + row * sy + axis0 = a0_start + row * a0_step - # Alternate row offset - shift by half the x step - x_offset = (sx / 2) if (row % 2) else 0.0 + # Alternate row offset - shift by half the fast-axis step + axis1_offset = (a1_step / 2) if (row % 2) else 0.0 - # Number of columns needed - n_cols = int(np.ceil((x1 - x0) / sx)) + 2 + n_cols = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2 row_points = [] for col in range(n_cols): - x = x0 + x_offset + col * sx + axis1 = a1_start + axis1_offset + col * a1_step - if x0 <= x <= x1 and y0 <= y <= y1: - row_points.append((x, y)) + if a0_start <= axis0 <= a0_stop and a1_start <= axis1 <= a1_stop: + row_points.append((axis0, axis1)) # Reverse every other row if snaking is enabled if snaked and (row % 2 == 1): diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py index 5dfd01163..c9a1c4a57 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py @@ -39,6 +39,17 @@ def test_hexagonal_scan_prepare_scan_updates_scan_info_and_queue(v4_scan_assembl assert np.array_equal(scan.scan_info.positions, expected_positions) +def test_hexagonal_scan_uses_second_axis_as_fast_axis(v4_scan_assembler): + scan = v4_scan_assembler( + "_v4_hexagonal_scan", "samx", 0.0, 2.0, 1.0, "samy", 0.0, 1.0, 1.0, relative=False + ) + + scan.prepare_scan() + + expected_positions = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.5], [2.0, 0.0], [2.0, 1.0]]) + assert np.array_equal(scan.positions, expected_positions) + + def test_hexagonal_scan_prepare_scan_offsets_positions_when_relative(v4_scan_assembler): scan = v4_scan_assembler( "_v4_hexagonal_scan", "samx", -1.0, 1.0, 1.0, "samy", -1.0, 1.0, 1.0, relative=True diff --git a/bec_server/tests/tests_scan_server/test_legacy_scans.py b/bec_server/tests/tests_scan_server/test_legacy_scans.py index c13eabdfa..69e079411 100644 --- a/bec_server/tests/tests_scan_server/test_legacy_scans.py +++ b/bec_server/tests/tests_scan_server/test_legacy_scans.py @@ -2287,17 +2287,17 @@ def run(self): "axes,snaked,reference_positions", [ # Simple 2x2 grid with snaking - ([(0, 1, 1), (0, 1, 1)], True, [[0, 0], [1, 0], [0.5, 1]]), + ([(0, 1, 1), (0, 1, 1)], True, [[0, 0], [0, 1], [1, 0.5]]), # Simple 2x2 grid without snaking - ([(0, 1, 1), (0, 1, 1)], False, [[0, 0], [1, 0], [0.5, 1]]), + ([(0, 1, 1), (0, 1, 1)], False, [[0, 0], [0, 1], [1, 0.5]]), # 3x2 grid with different step sizes and snaking ( [(0, 2, 1), (0, 1, 0.5)], True, - [[0, 0], [1, 0], [2, 0], [1.5, 0.5], [0.5, 0.5], [0, 1], [1, 1], [2, 1]], + [[0, 0], [0, 0.5], [0, 1], [1, 0.75], [1, 0.25], [2, 0], [2, 0.5], [2, 1]], ), # Small grid with exact boundaries - ([(0, 0.5, 0.5), (0, 0.5, 0.5)], True, [[0, 0], [0.5, 0], [0.25, 0.5]]), + ([(0, 0.5, 0.5), (0, 0.5, 0.5)], True, [[0, 0], [0, 0.5], [0.5, 0.25]]), ], ) def test_get_hex_grid_2d(axes, snaked, reference_positions): @@ -2397,11 +2397,11 @@ def test_hexagonal_scan_snaking(): request_snaked = HexagonalScan( "samx", 0, - 2, + 1, 1, "samy", 0, - 1, + 2, 1, device_manager=device_manager, exp_time=0.1, @@ -2414,11 +2414,11 @@ def test_hexagonal_scan_snaking(): request_unsnaked = HexagonalScan( "samx", 0, - 2, + 1, 1, "samy", 0, - 1, + 2, 1, device_manager=device_manager, exp_time=0.1, From 43b0104df09521fed5930408b41ef952e08bc991 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 28 Apr 2026 14:19:03 +0200 Subject: [PATCH 3/4] f Co-authored-by: Copilot --- .../scan_server/path_optimization.py | 12 +++---- .../scan_server/scans/fermat_scan.py | 4 +-- .../scan_server/scans/legacy_scans.py | 19 +++++------ .../scans/multi_region_grid_scan.py | 2 +- .../scan_server/scans/position_generators.py | 32 ++++++++++++------- .../scan_server/scans/scan_components.py | 13 +++++--- .../scans_v4/test_fermat_scan.py | 7 ++-- .../scans_v4/test_grid_scan.py | 13 ++++++++ .../scans_v4/test_hexagonal_scan.py | 4 +-- .../scans_v4/test_multi_region_grid_scan.py | 16 +++++----- .../scans_v4/test_position_generators.py | 8 ++--- .../scans_v4/test_scan_components.py | 2 +- .../tests_scan_server/test_legacy_scans.py | 16 +++++----- 13 files changed, 86 insertions(+), 62 deletions(-) diff --git a/bec_server/bec_server/scan_server/path_optimization.py b/bec_server/bec_server/scan_server/path_optimization.py index bb41877fe..f85bcfa2e 100644 --- a/bec_server/bec_server/scan_server/path_optimization.py +++ b/bec_server/bec_server/scan_server/path_optimization.py @@ -1,10 +1,11 @@ -import enum from typing import Literal import numpy as np from pydantic import BaseModel from scipy.spatial import cKDTree # type: ignore +from bec_server.scan_server.scans.position_generators import Direction + class PathQualityStats(BaseModel): max_jump: float @@ -15,11 +16,6 @@ class PathQualityStats(BaseModel): model_config = {"arbitrary_types_allowed": True} -class Direction(int, enum.Enum): - ASCENDING = 1 - DESCENDING = -1 - - class PathOptimizerMixin: def get_radius(self, pos: np.ndarray) -> np.ndarray: """ @@ -37,7 +33,7 @@ def optimize_corridor( self, positions: np.ndarray, corridor_size: float | None = None, - fast_axis: Literal[0, 1] = 1, + fast_axis: Literal[0, 1] = 0, num_iterations: int = 1, first_corridor_direction: Direction | Literal[-1, 1] = Direction.ASCENDING, snaked: bool = True, @@ -52,7 +48,7 @@ def optimize_corridor( Args: positions (np.ndarray): Array of positions corridor_size (float, optional): Width of each corridor. Defaults to None (auto-estimated). - fast_axis (Literal[0, 1], optional): Axis traversed within each corridor (0 or 1). Defaults to 1. + fast_axis (Literal[0, 1], optional): Axis traversed within each corridor (0 or 1). Defaults to 0. num_iterations (int, optional): Number of corridor sizes to try. Defaults to 1. first_corridor_direction (Direction | Literal[-1, 1], optional): Traversal direction of the first corridor along the fast axis. Positive means ascending, negative means diff --git a/bec_server/bec_server/scan_server/scans/fermat_scan.py b/bec_server/bec_server/scan_server/scans/fermat_scan.py index aa0532524..3957ddf68 100644 --- a/bec_server/bec_server/scan_server/scans/fermat_scan.py +++ b/bec_server/bec_server/scan_server/scans/fermat_scan.py @@ -23,8 +23,8 @@ from bec_lib.device import DeviceBase from bec_lib.logger import bec_logger from bec_lib.scan_args import ScanArgument, Units -from bec_server.scan_server.path_optimization import Direction from bec_server.scan_server.scans import position_generators +from bec_server.scan_server.scans.position_generators import Direction from bec_server.scan_server.scans.scan_base import ScanBase, ScanType from bec_server.scan_server.scans.scan_modifier import scan_hook @@ -198,7 +198,7 @@ def prepare_scan(self): optimization_type=self.optim_trajectory, first_direction=( Direction.ASCENDING - if self.motor2_start_stop[1] > self.motor2_start_stop[0] + if self.motor1_start_stop[1] > self.motor1_start_stop[0] else Direction.DESCENDING ), ) diff --git a/bec_server/bec_server/scan_server/scans/legacy_scans.py b/bec_server/bec_server/scan_server/scans/legacy_scans.py index 0e33e3b08..0348a5f67 100644 --- a/bec_server/bec_server/scan_server/scans/legacy_scans.py +++ b/bec_server/bec_server/scan_server/scans/legacy_scans.py @@ -90,7 +90,8 @@ def _get_positions_recursively(current_axes): positions.extend([[val] + sp for sp in sub_positions]) return positions - return np.array(_get_positions_recursively(axes)) + positions = _get_positions_recursively(axes[::-1]) + return np.array([position[::-1] for position in positions], dtype=float) # pylint: disable=too-many-arguments @@ -209,20 +210,20 @@ def get_hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) points = [] - # The first axis is the slow row axis and the second axis is the fast axis. - n_rows = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2 + # The second axis selects the rows and the first axis is traversed within each row. + n_rows = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2 for row in range(n_rows): - axis0 = a0_start + row * a0_step + axis1 = a1_start + row * a1_step # Alternate row offset - shift by half the fast-axis step - axis1_offset = (a1_step / 2) if (row % 2) else 0.0 + axis0_offset = (a0_step / 2) if (row % 2) else 0.0 - n_cols = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2 + n_cols = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2 row_points = [] for col in range(n_cols): - axis1 = a1_start + axis1_offset + col * a1_step + axis0 = a0_start + axis0_offset + col * a0_step if a0_start <= axis0 <= a0_stop and a1_start <= axis1 <= a1_stop: row_points.append((axis0, axis1)) @@ -522,7 +523,7 @@ def _optimize_trajectory(self): self.positions = self.optimize_corridor( self.positions, num_iterations=5, - fast_axis=getattr(self, "fast_axis", 1), + fast_axis=getattr(self, "fast_axis", 0), first_corridor_direction=getattr(self, "first_direction", 1), ) return @@ -1089,7 +1090,7 @@ def __init__( self.step = step self.spiral_type = spiral_type self.first_direction = ( - Direction.ASCENDING if self.stop_motor2 > self.start_motor2 else Direction.DESCENDING + Direction.ASCENDING if self.stop_motor1 > self.start_motor1 else Direction.DESCENDING ) def update_scan_motors(self): diff --git a/bec_server/bec_server/scan_server/scans/multi_region_grid_scan.py b/bec_server/bec_server/scan_server/scans/multi_region_grid_scan.py index 70a715cba..ad4009bc4 100644 --- a/bec_server/bec_server/scan_server/scans/multi_region_grid_scan.py +++ b/bec_server/bec_server/scan_server/scans/multi_region_grid_scan.py @@ -95,7 +95,7 @@ def __init__( settling_time (float): settling time in seconds. Default is 0. settling_time_after_trigger (float): settling time after trigger in seconds. Default is 0. readout_time (float): readout time in seconds. Default is 0. - snaked (bool): if True, the second axis is traversed in alternating directions + snaked (bool): if True, the first axis is traversed in alternating directions within each sub-grid. Default is True. burst_at_each_point (int): number of exposures at each point. Default is 1. diff --git a/bec_server/bec_server/scan_server/scans/position_generators.py b/bec_server/bec_server/scan_server/scans/position_generators.py index e79c94a97..ed21ec664 100644 --- a/bec_server/bec_server/scan_server/scans/position_generators.py +++ b/bec_server/bec_server/scan_server/scans/position_generators.py @@ -1,10 +1,16 @@ from __future__ import annotations +import enum from collections.abc import Iterator, Sequence import numpy as np +class Direction(int, enum.Enum): + ASCENDING = 1 + DESCENDING = -1 + + def rotate_points( points: np.ndarray, angle: float, center: tuple[float, float] | None = None ) -> np.ndarray: @@ -263,7 +269,7 @@ def multi_region_grid_positions( regions (list[tuple[tuple[float, float, int], tuple[float, float, int]]]): Sequence of paired region definitions. Each entry contains one ``(start, stop, steps)`` tuple for the first motor and one for the second motor. - snaked (bool): If ``True``, reverse traversal of the second axis on alternating positions + snaked (bool): If ``True``, reverse traversal of the first axis on alternating positions within each sub-grid. Returns: @@ -277,11 +283,11 @@ def multi_region_grid_positions( axis1_positions = _region_points(*region1) axis2_positions = _region_points(*region2) - for index, value1 in enumerate(axis1_positions): - current_axis2 = ( - axis2_positions[::-1] if snaked and (index % 2 == 1) else axis2_positions + for index, value2 in enumerate(axis2_positions): + current_axis1 = ( + axis1_positions[::-1] if snaked and (index % 2 == 1) else axis1_positions ) - for value2 in current_axis2: + for value1 in current_axis1: positions.append([value1, value2]) return np.asarray(positions, dtype=float) @@ -327,7 +333,9 @@ def _get_positions_recursively(current_axes): positions.extend([[val] + sp for sp in sub_positions]) return positions - return np.array(_get_positions_recursively(_axes_arrays)) + # Build the traversal in reverse axis order so the first user-provided axis is the fast one. + positions = _get_positions_recursively(_axes_arrays[::-1]) + return np.array([position[::-1] for position in positions], dtype=float) def fermat_spiral_pos( @@ -511,20 +519,20 @@ def hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) -> points = [] - # The first axis is the slow row axis and the second axis is the fast axis. - n_rows = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2 + # The second axis selects the rows and the first axis is traversed within each row. + n_rows = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2 for row in range(n_rows): - axis0 = a0_start + row * a0_step + axis1 = a1_start + row * a1_step # Alternate row offset - shift by half the fast-axis step - axis1_offset = (a1_step / 2) if (row % 2) else 0.0 + axis0_offset = (a0_step / 2) if (row % 2) else 0.0 - n_cols = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2 + n_cols = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2 row_points = [] for col in range(n_cols): - axis1 = a1_start + axis1_offset + col * a1_step + axis0 = a0_start + axis0_offset + col * a0_step if a0_start <= axis0 <= a0_stop and a1_start <= axis1 <= a1_stop: row_points.append((axis0, axis1)) diff --git a/bec_server/bec_server/scan_server/scans/scan_components.py b/bec_server/bec_server/scan_server/scans/scan_components.py index 889417eb0..732dc27f1 100644 --- a/bec_server/bec_server/scan_server/scans/scan_components.py +++ b/bec_server/bec_server/scan_server/scans/scan_components.py @@ -8,6 +8,7 @@ from bec_lib.device import DeviceBase from bec_server.scan_server.errors import LimitError from bec_server.scan_server.path_optimization import PathOptimizerMixin +from bec_server.scan_server.scans.position_generators import Direction if TYPE_CHECKING: from bec_server.scan_server.scans.scan_base import ScanBase @@ -159,14 +160,18 @@ def optimize_trajectory( self, positions: np.ndarray, optimization_type: Literal["corridor", "shell", "nearest"] = "corridor", - fast_axis: int = 1, - first_direction: int = 1, + fast_axis: Literal[0, 1] = 0, + first_direction: Literal[-1, 1] | Direction = Direction.ASCENDING, snaked: bool = True, corridor_size: int | None = None, num_iterations: int = 5, ) -> np.ndarray: """ Optimize the trajectory of the scan by reordering the positions. This can help to minimize the movement time of the motors. + + Important note: The optimization is only done for 2D scans. For higher-dimensional scans, the positions are not reordered and simply + returned as they are. + The optimization can be done in different ways, depending on the optimization_type parameter: - "corridor": optimize the trajectory in a corridor-like way, where the scan progresses between corridors along the slow axis and traverses each corridor along the fast axis. This is typically a good choice for grid scans. If first_direction is provided, it sets the traversal direction of the first corridor and snaked controls whether later corridors alternate direction, producing a snaked path along the fast axis. - "shell": optimize the trajectory in a shell-like way, where the scan moves in a spiral from the outside to the inside. This is typically a good choice for round scans. @@ -175,8 +180,8 @@ def optimize_trajectory( Args: positions (np.ndarray): Array of positions to optimize, shape (num_points, num_motors). optimization_type (str, optional): Type of optimization to perform. Defaults to "corridor". - fast_axis (int, optional): Fast axis for corridor optimization. Defaults to 1. - first_direction (int, optional): Traversal direction for the first corridor along the fast axis. Positive means ascending, negative means descending. Defaults to 1. + fast_axis (int, optional): Fast axis for corridor optimization. Defaults to 0. + first_direction (Direction | Literal[-1, 1], optional): Traversal direction for the first corridor along the fast axis. Positive means ascending, negative means descending. Defaults to Direction.ASCENDING. snaked (bool, optional): If True, alternate direction between corridors. Defaults to True. corridor_size (int | None, optional): Size of the corridor for corridor optimization. Defaults to None, which means the default corridor size will be used. Returns: diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py index 34d019694..85ab8f76f 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py @@ -69,19 +69,20 @@ def test_fermat_scan_prepare_scan_uses_first_axis_as_corridor_axis(v4_scan_assem scan.components.optimize_trajectory.assert_called_once() _, kwargs = scan.components.optimize_trajectory.call_args assert kwargs["optimization_type"] == "corridor" + assert kwargs["fast_axis"] == 0 assert kwargs["first_direction"] == 1 np.testing.assert_allclose(scan.positions, optimized) -def test_fermat_scan_prepare_scan_uses_fast_axis_range_for_preferred_direction(v4_scan_assembler): +def test_fermat_scan_prepare_scan_uses_first_axis_range_for_preferred_direction(v4_scan_assembler): scan = v4_scan_assembler( "_v4_fermat_scan", "samx", - -1.0, 1.0, + -1.0, "samy", - 4.0, -4.0, + 4.0, step=0.5, optim_trajectory="corridor", relative=False, diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_grid_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_grid_scan.py index 8cd5511a8..29bad0347 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_grid_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_grid_scan.py @@ -35,3 +35,16 @@ def test_grid_scan_prepare_scan_updates_scan_info_and_queue(v4_scan_assembler): assert scan.scan_info.scan_report_instructions == [ {"scan_progress": {"points": 15, "show_table": False}} ] + + +def test_grid_scan_uses_first_axis_as_fast_axis(v4_scan_assembler): + scan = v4_scan_assembler( + "_v4_grid_scan", "samx", 0.0, 2.0, 3, "samy", 0.0, 1.0, 2, snaked=True, relative=False + ) + + scan.prepare_scan() + + expected_positions = np.array( + [[0.0, 0.0], [1.0, 0.0], [2.0, 0.0], [2.0, 1.0], [1.0, 1.0], [0.0, 1.0]] + ) + assert np.array_equal(scan.positions, expected_positions) diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py index c9a1c4a57..2082e8498 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_hexagonal_scan.py @@ -39,14 +39,14 @@ def test_hexagonal_scan_prepare_scan_updates_scan_info_and_queue(v4_scan_assembl assert np.array_equal(scan.scan_info.positions, expected_positions) -def test_hexagonal_scan_uses_second_axis_as_fast_axis(v4_scan_assembler): +def test_hexagonal_scan_uses_first_axis_as_fast_axis(v4_scan_assembler): scan = v4_scan_assembler( "_v4_hexagonal_scan", "samx", 0.0, 2.0, 1.0, "samy", 0.0, 1.0, 1.0, relative=False ) scan.prepare_scan() - expected_positions = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.5], [2.0, 0.0], [2.0, 1.0]]) + expected_positions = np.array([[0.0, 0.0], [1.0, 0.0], [2.0, 0.0], [1.5, 1.0], [0.5, 1.0]]) assert np.array_equal(scan.positions, expected_positions) diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_multi_region_grid_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_multi_region_grid_scan.py index 607847203..a70b17739 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_multi_region_grid_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_multi_region_grid_scan.py @@ -43,17 +43,17 @@ def test_multi_region_grid_scan_prepare_scan_updates_scan_info_and_queue(v4_scan expected_positions = np.array( [ [-3.0, -2.0], + [-1.0, -2.0], + [-1.0, 0.0], [-3.0, 0.0], [-3.0, 2.0], [-1.0, 2.0], - [-1.0, 0.0], - [-1.0, -2.0], [1.0, -2.0], + [3.0, -2.0], + [3.0, 0.0], [1.0, 0.0], [1.0, 2.0], [3.0, 2.0], - [3.0, 0.0], - [3.0, -2.0], ] ) assert np.allclose(scan.positions, expected_positions) @@ -80,17 +80,17 @@ def test_multi_region_grid_scan_prepare_scan_offsets_positions_when_relative(v4_ expected_positions = np.array( [ [-2.0, -3.0], + [0.0, -3.0], + [0.0, -1.0], [-2.0, -1.0], [-2.0, 1.0], [0.0, 1.0], - [0.0, -1.0], - [0.0, -3.0], [2.0, -3.0], + [4.0, -3.0], + [4.0, -1.0], [2.0, -1.0], [2.0, 1.0], [4.0, 1.0], - [4.0, -1.0], - [4.0, -3.0], ] ) assert scan.start_positions == [1.0, -1.0] diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_position_generators.py b/bec_server/tests/tests_scan_server/scans_v4/test_position_generators.py index f13210873..2146b0420 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_position_generators.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_position_generators.py @@ -99,17 +99,17 @@ def test_multi_region_grid_positions_builds_snaked_grid(): positions, [ [-3.0, -2.0], + [-1.0, -2.0], + [-1.0, 0.0], [-3.0, 0.0], [-3.0, 2.0], [-1.0, 2.0], - [-1.0, 0.0], - [-1.0, -2.0], [1.0, -2.0], + [3.0, -2.0], + [3.0, 0.0], [1.0, 0.0], [1.0, 2.0], [3.0, 2.0], - [3.0, 0.0], - [3.0, -2.0], ], ) diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py b/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py index 89a262a41..47ec4c4ce 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_scan_components.py @@ -113,7 +113,7 @@ def test_optimize_trajectory_uses_corridor_defaults(v4_scan_assembler): positions, num_iterations=7, corridor_size=4, - fast_axis=1, + fast_axis=0, first_corridor_direction=1, snaked=True, ) diff --git a/bec_server/tests/tests_scan_server/test_legacy_scans.py b/bec_server/tests/tests_scan_server/test_legacy_scans.py index 69e079411..a9eae2d16 100644 --- a/bec_server/tests/tests_scan_server/test_legacy_scans.py +++ b/bec_server/tests/tests_scan_server/test_legacy_scans.py @@ -494,7 +494,7 @@ def offset_mock(): "async": [], }, "num_points": 4, - "positions": [[-5.0, -5.0], [-5.0, 5.0], [5.0, 5.0], [5.0, -5.0]], + "positions": [[-5.0, -5.0], [5.0, -5.0], [5.0, 5.0], [-5.0, 5.0]], "scan_name": "grid_scan", "scan_type": "step", }, @@ -543,7 +543,7 @@ def offset_mock(): ), messages.DeviceInstructionMessage( metadata={"readout_priority": "monitored"}, - device="samy", + device="samx", action="set", parameter={"value": np.float64(5.0)}, ), @@ -561,7 +561,7 @@ def offset_mock(): ), messages.DeviceInstructionMessage( metadata={"readout_priority": "monitored"}, - device="samx", + device="samy", action="set", parameter={"value": np.float64(5.0)}, ), @@ -579,7 +579,7 @@ def offset_mock(): ), messages.DeviceInstructionMessage( metadata={"readout_priority": "monitored"}, - device="samy", + device="samx", action="set", parameter={"value": np.float64(-5.0)}, ), @@ -2397,11 +2397,11 @@ def test_hexagonal_scan_snaking(): request_snaked = HexagonalScan( "samx", 0, - 1, + 2, 1, "samy", 0, - 2, + 1, 1, device_manager=device_manager, exp_time=0.1, @@ -2414,11 +2414,11 @@ def test_hexagonal_scan_snaking(): request_unsnaked = HexagonalScan( "samx", 0, - 1, + 2, 1, "samy", 0, - 2, + 1, 1, device_manager=device_manager, exp_time=0.1, From c00855b8fdc77f485f071258bd2eb1d11f64bb4b Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Tue, 28 Apr 2026 14:36:25 +0200 Subject: [PATCH 4/4] f --- .../scans_v4/test_fermat_scan.py | 1 - .../tests_scan_server/test_legacy_scans.py | 36 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py b/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py index 85ab8f76f..4af7b5789 100644 --- a/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py +++ b/bec_server/tests/tests_scan_server/scans_v4/test_fermat_scan.py @@ -69,7 +69,6 @@ def test_fermat_scan_prepare_scan_uses_first_axis_as_corridor_axis(v4_scan_assem scan.components.optimize_trajectory.assert_called_once() _, kwargs = scan.components.optimize_trajectory.call_args assert kwargs["optimization_type"] == "corridor" - assert kwargs["fast_axis"] == 0 assert kwargs["first_direction"] == 1 np.testing.assert_allclose(scan.positions, optimized) diff --git a/bec_server/tests/tests_scan_server/test_legacy_scans.py b/bec_server/tests/tests_scan_server/test_legacy_scans.py index a9eae2d16..f59c1b41f 100644 --- a/bec_server/tests/tests_scan_server/test_legacy_scans.py +++ b/bec_server/tests/tests_scan_server/test_legacy_scans.py @@ -1123,29 +1123,29 @@ def test_round_scan_positions(in_args, reference_positions): @pytest.mark.parametrize( "in_args,reference_positions,snaked", [ - ([list(range(2)), list(range(2))], [[0, 0], [0, 1], [1, 1], [1, 0]], True), - ([list(range(2)), list(range(3))], [[0, 0], [0, 1], [0, 2], [1, 0], [1, 1], [1, 2]], False), + ([list(range(2)), list(range(2))], [[0, 0], [1, 0], [1, 1], [0, 1]], True), + ([list(range(2)), list(range(3))], [[0, 0], [1, 0], [0, 1], [1, 1], [0, 2], [1, 2]], False), ( [list(range(3)), list(range(3)), list(range(2))], [ [0, 0, 0], - [0, 0, 1], - [0, 1, 1], - [0, 1, 0], - [0, 2, 0], - [0, 2, 1], - [1, 2, 1], - [1, 2, 0], - [1, 1, 0], - [1, 1, 1], - [1, 0, 1], [1, 0, 0], [2, 0, 0], - [2, 0, 1], - [2, 1, 1], [2, 1, 0], + [1, 1, 0], + [0, 1, 0], + [0, 2, 0], + [1, 2, 0], [2, 2, 0], [2, 2, 1], + [1, 2, 1], + [0, 2, 1], + [0, 1, 1], + [1, 1, 1], + [2, 1, 1], + [2, 0, 1], + [1, 0, 1], + [0, 0, 1], ], True, ), @@ -2287,17 +2287,17 @@ def run(self): "axes,snaked,reference_positions", [ # Simple 2x2 grid with snaking - ([(0, 1, 1), (0, 1, 1)], True, [[0, 0], [0, 1], [1, 0.5]]), + ([(0, 1, 1), (0, 1, 1)], True, [[0, 0], [1, 0], [0.5, 1]]), # Simple 2x2 grid without snaking - ([(0, 1, 1), (0, 1, 1)], False, [[0, 0], [0, 1], [1, 0.5]]), + ([(0, 1, 1), (0, 1, 1)], False, [[0, 0], [1, 0], [0.5, 1]]), # 3x2 grid with different step sizes and snaking ( [(0, 2, 1), (0, 1, 0.5)], True, - [[0, 0], [0, 0.5], [0, 1], [1, 0.75], [1, 0.25], [2, 0], [2, 0.5], [2, 1]], + [[0, 0], [1, 0], [2, 0], [1.5, 0.5], [0.5, 0.5], [0, 1], [1, 1], [2, 1]], ), # Small grid with exact boundaries - ([(0, 0.5, 0.5), (0, 0.5, 0.5)], True, [[0, 0], [0, 0.5], [0.5, 0.25]]), + ([(0, 0.5, 0.5), (0, 0.5, 0.5)], True, [[0, 0], [0.5, 0], [0.25, 0.5]]), ], ) def test_get_hex_grid_2d(axes, snaked, reference_positions):