diff --git a/utama_core/custom_referee/profiles/profile_loader.py b/utama_core/custom_referee/profiles/profile_loader.py index b3228143..6ae4f1cf 100644 --- a/utama_core/custom_referee/profiles/profile_loader.py +++ b/utama_core/custom_referee/profiles/profile_loader.py @@ -79,6 +79,9 @@ class AutoAdvanceConfig: # NORMAL_START → FORCE_START after kickoff_timeout_seconds if ball hasn't # moved (catches a stuck kickoff). normal_start_to_force: bool = True + # NORMAL_START|FORCE_START → BALL_OBSCURED when ball is missing for + # _BALL_OBSCURED_TIMEOUT seconds (single-camera occlusion recovery). + ball_obscured_recovery: bool = False @dataclass @@ -194,6 +197,7 @@ def _parse_profile(data: dict) -> RefereeProfile: direct_free_to_normal=aa.get("direct_free_to_normal", True), ball_placement_to_next=aa.get("ball_placement_to_next", True), normal_start_to_force=aa.get("normal_start_to_force", True), + ball_obscured_recovery=aa.get("ball_obscured_recovery", False), ) game = GameConfig( half_duration_seconds=game_d.get("half_duration_seconds", 300.0), diff --git a/utama_core/custom_referee/state_machine.py b/utama_core/custom_referee/state_machine.py index f194bdcd..abdbba4e 100644 --- a/utama_core/custom_referee/state_machine.py +++ b/utama_core/custom_referee/state_machine.py @@ -24,6 +24,10 @@ _KICKER_READY_DIST = 0.3 # metres — kicker must be within this distance to trigger free kick start _PLACEMENT_DONE_DIST = 0.15 # metres — ball within this dist of target → placement complete _AUTO_ADVANCE_DELAY = 2.0 # seconds — readiness must be sustained this long before play starts +_BALL_OBSCURED_TIMEOUT = 5.0 # seconds of missing ball during play before scatter triggers +_BALL_VISIBLE_DEBOUNCE = 0.5 # seconds ball must be continuously visible before resuming play +_APPROACH_DONE_DIST = 0.25 # metres — closest friendly robot to ball to trigger FORCE_START +_PLAY_COMMANDS = frozenset({RefereeCommand.NORMAL_START, RefereeCommand.FORCE_START}) class GameStateMachine: @@ -124,6 +128,10 @@ def __init__( # Per-transition enable flags (default: all on). self._auto_advance = auto_advance if auto_advance is not None else AutoAdvanceConfig() + # Ball-obscured recovery timers. + self._ball_missing_since: float = math.inf + self._ball_visible_since: float = math.inf + # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ @@ -355,6 +363,55 @@ def step( self._last_transition_time = current_time logger.info("Auto-advanced STOP → FORCE_START after goal (force-start profile mode)") + # ---------------------------------------------------------------- + # Auto-advance 6a: NORMAL_START|FORCE_START → BALL_OBSCURED + # Fires when ball has been None for > _BALL_OBSCURED_TIMEOUT. + # GoalRule/OutOfBoundsRule take priority: if they detect a violation + # the command transitions away and the timer resets naturally. + # ---------------------------------------------------------------- + if self._auto_advance.ball_obscured_recovery and self.command in _PLAY_COMMANDS: + if game_frame is not None and game_frame.ball is None: + if self._ball_missing_since == math.inf: + self._ball_missing_since = current_time + elif (current_time - self._ball_missing_since) >= _BALL_OBSCURED_TIMEOUT: + logger.info("Ball missing for %.1fs — issuing BALL_OBSCURED", _BALL_OBSCURED_TIMEOUT) + self.command = RefereeCommand.BALL_OBSCURED + self.command_counter += 1 + self.command_timestamp = current_time + self._ball_missing_since = math.inf + self._ball_visible_since = math.inf + self._last_transition_time = current_time + else: + self._ball_missing_since = math.inf + elif self.command != RefereeCommand.BALL_OBSCURED: + self._ball_missing_since = math.inf + + # ---------------------------------------------------------------- + # Auto-advance 6b: BALL_OBSCURED → FORCE_START + # Fires when ball is visible for > _BALL_VISIBLE_DEBOUNCE AND the + # closest friendly robot is within _APPROACH_DONE_DIST of the ball. + # ---------------------------------------------------------------- + if self._auto_advance.ball_obscured_recovery and self.command == RefereeCommand.BALL_OBSCURED: + if game_frame is not None and game_frame.ball is not None: + if self._ball_visible_since == math.inf: + self._ball_visible_since = current_time + ball_pos = game_frame.ball.p + closest_dist = min( + (math.hypot(r.p.x - ball_pos.x, r.p.y - ball_pos.y) for r in game_frame.friendly_robots.values()), + default=math.inf, + ) + if ( + current_time - self._ball_visible_since + ) >= _BALL_VISIBLE_DEBOUNCE and closest_dist <= _APPROACH_DONE_DIST: + logger.info("Ball recovered — auto-advancing BALL_OBSCURED → FORCE_START") + self.command = RefereeCommand.FORCE_START + self.command_counter += 1 + self.command_timestamp = current_time + self._ball_visible_since = math.inf + self._last_transition_time = current_time + else: + self._ball_visible_since = math.inf + return self._generate_referee_data(current_time) def _all_robots_clear(self, game_frame: "GameFrame") -> bool: @@ -472,6 +529,8 @@ def seed_clock(self, timestamp: float) -> None: self._last_transition_time = timestamp - _TRANSITION_COOLDOWN # allow transitions immediately self._stop_entered_time = timestamp - self._stop_duration_seconds # won't auto-advance yet self._prepare_entered_time = timestamp - self._prepare_duration_seconds # same + self._ball_missing_since = math.inf + self._ball_visible_since = math.inf def set_command(self, command: RefereeCommand, timestamp: float) -> None: """Manual override — for operator use or test scripting. diff --git a/utama_core/entities/referee/referee_command.py b/utama_core/entities/referee/referee_command.py index 1d95b081..e0da1977 100644 --- a/utama_core/entities/referee/referee_command.py +++ b/utama_core/entities/referee/referee_command.py @@ -22,6 +22,7 @@ class RefereeCommand(Enum): GOAL_BLUE = 15 # deprecated BALL_PLACEMENT_YELLOW = 16 BALL_PLACEMENT_BLUE = 17 + BALL_OBSCURED = 18 @staticmethod def from_id(command_id: int): diff --git a/utama_core/strategy/referee/actions.py b/utama_core/strategy/referee/actions.py index facc9093..9fad6abe 100644 --- a/utama_core/strategy/referee/actions.py +++ b/utama_core/strategy/referee/actions.py @@ -601,6 +601,57 @@ def update(self) -> py_trees.common.Status: ) +# --------------------------------------------------------------------------- +# BALL_OBSCURED — scatter east/west, then approach ball from north +# --------------------------------------------------------------------------- + +_SCATTER_X_RATIO = 0.4 # fraction of half-length for scatter targets +_CAMERA_APPROACH_OFFSET = Vector2D(0.0, 0.3) # camera is south; approach ball from north (+y) + + +class BallObscuredStep(AbstractBehaviour): + """Two-phase recovery when the ball is obscured from the single south-facing camera. + + Scatter phase (ball is None): robots move to x-axis positions (east/west) to clear + the camera's north-south line of sight. + + Approach phase (ball reappears): the closest robot approaches from the north + (camera-safe direction) while others hold their scatter position. The state machine + auto-advances to FORCE_START once the robot is within 0.25 m of the ball. + """ + + def update(self) -> py_trees.common.Status: + game = self.blackboard.game + motion_controller = self.blackboard.motion_controller + robot_ids = sorted(game.friendly_robots.keys()) + scatter_x = _field_half_length(game) * _SCATTER_X_RATIO + + if game.ball is None: + scatter_targets = [ + _clamp_to_field(Vector2D(-scatter_x, 0.0), game), + _clamp_to_field(Vector2D(+scatter_x, 0.0), game), + ] + for i, robot_id in enumerate(robot_ids): + target = scatter_targets[i % 2] + oren = game.friendly_robots[robot_id].p.angle_to(target) + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, target, oren) + else: + ball_pos = Vector2D(game.ball.p.x, game.ball.p.y) + approacher_id = min( + robot_ids, + key=lambda rid: game.friendly_robots[rid].p.distance_to(ball_pos), + ) + approach_target = _clamp_to_field(ball_pos + _CAMERA_APPROACH_OFFSET, game) + for robot_id in robot_ids: + if robot_id == approacher_id: + oren = game.friendly_robots[robot_id].p.angle_to(ball_pos) + self.blackboard.cmd_map[robot_id] = move(game, motion_controller, robot_id, approach_target, oren) + else: + self.blackboard.cmd_map[robot_id] = empty_command(False) + + return py_trees.common.Status.RUNNING + + # --------------------------------------------------------------------------- # Helper: resolve bilateral commands # --------------------------------------------------------------------------- diff --git a/utama_core/strategy/referee/tree.py b/utama_core/strategy/referee/tree.py index e77f3d6b..018ac128 100644 --- a/utama_core/strategy/referee/tree.py +++ b/utama_core/strategy/referee/tree.py @@ -19,6 +19,7 @@ from utama_core.entities.referee.referee_command import RefereeCommand from utama_core.strategy.referee.actions import ( + BallObscuredStep, BallPlacementOursStep, BallPlacementTheirsStep, DirectFreeOursStep, @@ -54,12 +55,13 @@ def build_referee_override_tree() -> py_trees.composites.Selector: Priority order (top = highest): 1. HALT — immediate stop, no exceptions - 2. STOP — slowed stop, keep distance from ball - 3. TIMEOUT — idle (same as STOP) - 4. BALL_PLACEMENT — ours or theirs - 5. PREPARE_KICKOFF - 6. PREPARE_PENALTY - 7. DIRECT_FREE + 2. BALL_OBSCURED — scatter east/west, approach ball from north + 3. STOP — slowed stop, keep distance from ball + 4. TIMEOUT — idle (same as STOP) + 5. BALL_PLACEMENT — ours or theirs + 6. PREPARE_KICKOFF + 7. PREPARE_PENALTY + 8. DIRECT_FREE """ override = py_trees.composites.Selector(name="RefereeOverride", memory=False) @@ -72,7 +74,16 @@ def build_referee_override_tree() -> py_trees.composites.Selector: ) ) - # 2. STOP + # 2. BALL_OBSCURED — scatter east/west to clear camera sightline, then approach from north + override.add_child( + _make_subtree( + "BallObscured", + CheckRefereeCommand(RefereeCommand.BALL_OBSCURED), + BallObscuredStep(name="BallObscuredStep"), + ) + ) + + # 3. STOP override.add_child( _make_subtree( "Stop", @@ -81,7 +92,7 @@ def build_referee_override_tree() -> py_trees.composites.Selector: ) ) - # 3. TIMEOUT (yellow or blue — same behaviour: idle) + # 4. TIMEOUT (yellow or blue — same behaviour: idle) override.add_child( _make_subtree( "Timeout", diff --git a/utama_core/tests/referee/test_referee_unit.py b/utama_core/tests/referee/test_referee_unit.py index ce99eb69..9d31a848 100644 --- a/utama_core/tests/referee/test_referee_unit.py +++ b/utama_core/tests/referee/test_referee_unit.py @@ -908,9 +908,9 @@ def test_root_is_selector(self): def test_root_name(self): assert self.tree.name == "RefereeOverride" - def test_has_eleven_children(self): - # HALT, STOP, TIMEOUT, BALL_PLACEMENT×2, KICKOFF×2, PENALTY×2, DIRECT_FREE×2 - assert len(self.tree.children) == 11 + def test_has_twelve_children(self): + # HALT, BALL_OBSCURED, STOP, TIMEOUT, BALL_PLACEMENT×2, KICKOFF×2, PENALTY×2, DIRECT_FREE×2 + assert len(self.tree.children) == 12 def test_each_child_is_sequence(self): for child in self.tree.children: @@ -929,13 +929,18 @@ def test_halt_is_first(self): condition = first_seq.children[0] assert RefereeCommand.HALT in condition.expected_commands - def test_stop_is_second(self): + def test_ball_obscured_is_second(self): second_seq = self.tree.children[1] condition = second_seq.children[0] + assert RefereeCommand.BALL_OBSCURED in condition.expected_commands + + def test_stop_is_third(self): + third_seq = self.tree.children[2] + condition = third_seq.children[0] assert RefereeCommand.STOP in condition.expected_commands def test_timeout_handles_both_colours(self): - timeout_seq = self.tree.children[2] + timeout_seq = self.tree.children[3] condition = timeout_seq.children[0] assert RefereeCommand.TIMEOUT_YELLOW in condition.expected_commands assert RefereeCommand.TIMEOUT_BLUE in condition.expected_commands