Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions bec_server/bec_server/scan_server/path_optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from pydantic import BaseModel
from scipy.spatial import cKDTree # type: ignore

from bec_server.scan_server.scans.position_generators import Direction


class PathQualityStats(BaseModel):
max_jump: float
Expand Down Expand Up @@ -31,10 +33,11 @@ def optimize_corridor(
self,
positions: np.ndarray,
corridor_size: float | None = None,
sort_axis: int = 1,
fast_axis: Literal[0, 1] = 0,
num_iterations: int = 1,
preferred_direction: int | None = None,
corridor_estimation: Literal["density", "median_distance"] = "median_distance",
first_corridor_direction: Direction | Literal[-1, 1] = Direction.ASCENDING,
snaked: bool = True,
corridor_estimation: Literal["density", "median_distance"] = "density",
):
"""
Optimize positions using a corridor-based approach.
Expand All @@ -45,11 +48,14 @@ def optimize_corridor(
Args:
positions (np.ndarray): Array of positions
corridor_size (float, optional): Width of each corridor. Defaults to None (auto-estimated).
sort_axis (int, optional): Axis along which to create corridors (0 or 1). Defaults to 1.
fast_axis (Literal[0, 1], optional): Axis traversed within each corridor (0 or 1). Defaults to 0.
num_iterations (int, optional): Number of corridor sizes to try. Defaults to 1.
preferred_direction (int | None, optional): Preferred direction for the primary axis (1 or -1).
If None, alternates direction for each corridor.
corridor_estimation (str, optional): Method for estimating corridor size if not provided.
first_corridor_direction (Direction | Literal[-1, 1], optional): Traversal direction of the first
corridor along the fast axis. Positive means ascending, negative means
descending. Defaults to Direction.ASCENDING.
snaked (bool, optional): If True, alternate the traversal direction between
successive corridors. If False, keep the same direction in every corridor.
corridor_estimation (Literal["density", "median_distance"], optional): Method for estimating corridor size if not provided.
Options are "density" or "median_distance". Defaults to "density".
Comment thread
wakonig marked this conversation as resolved.

Returns:
Expand All @@ -70,8 +76,14 @@ def optimize_corridor(
'Choose "density" or "median_distance".'
)

axis_vals = positions[:, sort_axis]
sec_axis = int(not sort_axis)
if not isinstance(first_corridor_direction, Direction):
first_corridor_direction = Direction(first_corridor_direction)

if fast_axis not in (0, 1):
raise ValueError("fast_axis must be 0 or 1")

slow_axis = int(not fast_axis)
axis_vals = positions[:, slow_axis]
Comment thread
wakonig marked this conversation as resolved.

best_length = np.inf
best_path = positions
Expand All @@ -97,17 +109,15 @@ def optimize_corridor(
if block.size == 0:
continue

# Sort within corridor along secondary axis
block = block[np.argsort(positions[block, sec_axis])]
# Sort within corridor along fast axis
block = block[np.argsort(positions[block, fast_axis])]

# Direction handling
if preferred_direction is not None:
if preferred_direction < 0:
block = block[::-1]
else:
# Alternate direction between corridors
if step % 2 == 0:
block = block[::-1]
direction = first_corridor_direction
if snaked and step % 2 == 1:
direction = Direction(-direction)
if direction == Direction.DESCENDING:
block = block[::-1]

index_sorted.append(block)

Expand Down
12 changes: 12 additions & 0 deletions bec_server/bec_server/scan_server/scans/fermat_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from bec_lib.logger import bec_logger
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.scans import position_generators
from bec_server.scan_server.scans.position_generators import Direction
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook

Expand Down Expand Up @@ -191,6 +192,17 @@ def prepare_scan(self):
self.start_positions = self.components.get_start_positions(self.motors)
self.positions += self.start_positions

if self.optim_trajectory:
self.positions = self.components.optimize_trajectory(
self.positions,
optimization_type=self.optim_trajectory,
first_direction=(
Direction.ASCENDING
if self.motor1_start_stop[1] > self.motor1_start_stop[0]
else Direction.DESCENDING
),
)

self.components.check_limits(self.motors, self.positions)

self.update_scan_info(
Expand Down
61 changes: 25 additions & 36 deletions bec_server/bec_server/scan_server/scans/legacy_scans.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from bec_server.scan_server.instruction_handler import InstructionHandler

from ..errors import LimitError, ScanAbortion
from ..path_optimization import PathOptimizerMixin
from ..path_optimization import Direction, PathOptimizerMixin
from ..scan_stubs import ScanStubs

logger = bec_logger.logger
Expand Down Expand Up @@ -90,7 +90,8 @@ def _get_positions_recursively(current_axes):
positions.extend([[val] + sp for sp in sub_positions])
return positions

return np.array(_get_positions_recursively(axes))
positions = _get_positions_recursively(axes[::-1])
return np.array([position[::-1] for position in positions], dtype=float)


# pylint: disable=too-many-arguments
Expand Down Expand Up @@ -195,10 +196,8 @@ def get_hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True)
Generate a 2D hexagonal grid clipped to (start, stop) bounds.

Args:
axes: [(x_start, x_stop, x_step),
(y_start, y_stop, y_step)]
x_step = horizontal spacing between columns
y_step = vertical spacing between rows
axes: [(axis0_start, axis0_stop, axis0_step),
(axis1_start, axis1_stop, axis1_step)]
snaked: if True, reverse direction on alternate rows to minimize travel distance

Returns:
Expand All @@ -207,28 +206,27 @@ def get_hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True)
if len(axes) != 2:
raise ValueError("2D hex grid requires exactly 2 dimensions")

(x0, x1, sx), (y0, y1, sy) = axes
(a0_start, a0_stop, a0_step), (a1_start, a1_stop, a1_step) = axes

points = []

# Number of rows needed
n_rows = int(np.ceil((y1 - y0) / sy)) + 2
# The second axis selects the rows and the first axis is traversed within each row.
n_rows = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2

for row in range(n_rows):
y = y0 + row * sy
axis1 = a1_start + row * a1_step

# Alternate row offset - shift by half the x step
x_offset = (sx / 2) if (row % 2) else 0.0
# Alternate row offset - shift by half the fast-axis step
axis0_offset = (a0_step / 2) if (row % 2) else 0.0

# Number of columns needed
n_cols = int(np.ceil((x1 - x0) / sx)) + 2
n_cols = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2

row_points = []
for col in range(n_cols):
x = x0 + x_offset + col * sx
axis0 = a0_start + axis0_offset + col * a0_step

if x0 <= x <= x1 and y0 <= y <= y1:
row_points.append((x, y))
if a0_start <= axis0 <= a0_stop and a1_start <= axis1 <= a1_stop:
row_points.append((axis0, axis1))

# Reverse every other row if snaking is enabled
if snaked and (row % 2 == 1):
Expand Down Expand Up @@ -520,26 +518,14 @@ def _optimize_trajectory(self):
if not self.optim_trajectory:
return

# Get preferred directions from scan parameters if available
preferred_directions = getattr(self, "preferred_directions", None)

if self.optim_trajectory == "corridor":
# For corridor optimization, we use the primary axis preferred direction
if preferred_directions and len(preferred_directions) > 0:
primary_axis = getattr(self, "sort_axis", 1)
preferred_direction = (
preferred_directions[primary_axis]
if len(preferred_directions) > primary_axis
else None
)
self.positions = self.optimize_corridor(
self.positions,
num_iterations=5,
sort_axis=primary_axis,
preferred_direction=preferred_direction,
)
else:
self.positions = self.optimize_corridor(self.positions, num_iterations=5)
# Corridor traversal direction follows the first pass along the axis within each corridor.
self.positions = self.optimize_corridor(
self.positions,
num_iterations=5,
fast_axis=getattr(self, "fast_axis", 0),
first_corridor_direction=getattr(self, "first_direction", 1),
)
return

if self.optim_trajectory == "shell":
Expand Down Expand Up @@ -1103,6 +1089,9 @@ def __init__(
self.stop_motor2 = stop_motor2
self.step = step
self.spiral_type = spiral_type
self.first_direction = (
Direction.ASCENDING if self.stop_motor1 > self.start_motor1 else Direction.DESCENDING
)

def update_scan_motors(self):
self.scan_motors = [self.motor1, self.motor2]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def __init__(
settling_time (float): settling time in seconds. Default is 0.
settling_time_after_trigger (float): settling time after trigger in seconds. Default is 0.
readout_time (float): readout time in seconds. Default is 0.
snaked (bool): if True, the second axis is traversed in alternating directions
snaked (bool): if True, the first axis is traversed in alternating directions
within each sub-grid. Default is True.
burst_at_each_point (int): number of exposures at each point. Default is 1.

Expand Down
47 changes: 26 additions & 21 deletions bec_server/bec_server/scan_server/scans/position_generators.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from __future__ import annotations

import enum
from collections.abc import Iterator, Sequence

import numpy as np


class Direction(int, enum.Enum):
ASCENDING = 1
DESCENDING = -1


def rotate_points(
points: np.ndarray, angle: float, center: tuple[float, float] | None = None
) -> np.ndarray:
Expand Down Expand Up @@ -263,7 +269,7 @@ def multi_region_grid_positions(
regions (list[tuple[tuple[float, float, int], tuple[float, float, int]]]): Sequence
of paired region definitions. Each entry contains one
``(start, stop, steps)`` tuple for the first motor and one for the second motor.
snaked (bool): If ``True``, reverse traversal of the second axis on alternating positions
snaked (bool): If ``True``, reverse traversal of the first axis on alternating positions
within each sub-grid.

Returns:
Expand All @@ -277,11 +283,11 @@ def multi_region_grid_positions(
axis1_positions = _region_points(*region1)
axis2_positions = _region_points(*region2)

for index, value1 in enumerate(axis1_positions):
current_axis2 = (
axis2_positions[::-1] if snaked and (index % 2 == 1) else axis2_positions
for index, value2 in enumerate(axis2_positions):
current_axis1 = (
axis1_positions[::-1] if snaked and (index % 2 == 1) else axis1_positions
)
for value2 in current_axis2:
for value1 in current_axis1:
positions.append([value1, value2])

return np.asarray(positions, dtype=float)
Expand Down Expand Up @@ -327,7 +333,9 @@ def _get_positions_recursively(current_axes):
positions.extend([[val] + sp for sp in sub_positions])
return positions

return np.array(_get_positions_recursively(_axes_arrays))
# Build the traversal in reverse axis order so the first user-provided axis is the fast one.
positions = _get_positions_recursively(_axes_arrays[::-1])
return np.array([position[::-1] for position in positions], dtype=float)


def fermat_spiral_pos(
Expand Down Expand Up @@ -497,10 +505,8 @@ def hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) ->
Generate a 2D hexagonal grid clipped to (start, stop) bounds.

Args:
axes: [(x_start, x_stop, x_step),
(y_start, y_stop, y_step)]
x_step = horizontal spacing between columns
y_step = vertical spacing between rows
axes: [(axis0_start, axis0_stop, axis0_step),
(axis1_start, axis1_stop, axis1_step)]
snaked: if True, reverse direction on alternate rows to minimize travel distance

Returns:
Expand All @@ -509,28 +515,27 @@ def hex_grid_2d(axes: list[tuple[float, float, float]], snaked: bool = True) ->
if len(axes) != 2:
raise ValueError("2D hex grid requires exactly 2 dimensions")

(x0, x1, sx), (y0, y1, sy) = axes
(a0_start, a0_stop, a0_step), (a1_start, a1_stop, a1_step) = axes

points = []

# Number of rows needed
n_rows = int(np.ceil((y1 - y0) / sy)) + 2
# The second axis selects the rows and the first axis is traversed within each row.
n_rows = int(np.ceil((a1_stop - a1_start) / a1_step)) + 2

for row in range(n_rows):
y = y0 + row * sy
axis1 = a1_start + row * a1_step

# Alternate row offset - shift by half the x step
x_offset = (sx / 2) if (row % 2) else 0.0
# Alternate row offset - shift by half the fast-axis step
axis0_offset = (a0_step / 2) if (row % 2) else 0.0

# Number of columns needed
n_cols = int(np.ceil((x1 - x0) / sx)) + 2
n_cols = int(np.ceil((a0_stop - a0_start) / a0_step)) + 2

row_points = []
for col in range(n_cols):
x = x0 + x_offset + col * sx
axis0 = a0_start + axis0_offset + col * a0_step

if x0 <= x <= x1 and y0 <= y <= y1:
row_points.append((x, y))
if a0_start <= axis0 <= a0_stop and a1_start <= axis1 <= a1_stop:
row_points.append((axis0, axis1))

# Reverse every other row if snaking is enabled
if snaked and (row % 2 == 1):
Expand Down
Loading
Loading