From a451c2db87cbda8c7353a915f0977a1cf6ab27ea Mon Sep 17 00:00:00 2001 From: f0reachARR Date: Thu, 20 Nov 2025 12:41:48 +0900 Subject: [PATCH 1/5] Fix parameter passing error * run.default with params now works written in README.md --- replay_testing/decorators/run.py | 4 ++-- replay_testing/models.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/replay_testing/decorators/run.py b/replay_testing/decorators/run.py index 7d11e50..bd78d1a 100644 --- a/replay_testing/decorators/run.py +++ b/replay_testing/decorators/run.py @@ -52,5 +52,5 @@ def parameterize(parameters: list[ReplayRunParams]): return run(parameters=parameters) @staticmethod - def default(params: dict = {}): - return run(parameters=[ReplayRunParams(name='default', params=params)]) + def default(params: ReplayRunParams = ReplayRunParams(name='default')): + return run(parameters=[params]) diff --git a/replay_testing/models.py b/replay_testing/models.py index 4c55758..fdb6501 100644 --- a/replay_testing/models.py +++ b/replay_testing/models.py @@ -34,7 +34,7 @@ class RunnerArgs(BaseModel): class ReplayRunParams(BaseModel): name: str - params: dict + params: dict = {} runner_args: RunnerArgs = RunnerArgs() From 13a9a9f6397c693653559c31f8a3724b896ddc15 Mon Sep 17 00:00:00 2001 From: f0reachARR Date: Thu, 20 Nov 2025 12:45:14 +0900 Subject: [PATCH 2/5] Add ignore_playback_finish options * To meet our requirements, controlling shutdown condition in `Run` class is needed --- replay_testing/models.py | 1 + replay_testing/replay_runner.py | 28 ++++++++++++++++------------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/replay_testing/models.py b/replay_testing/models.py index fdb6501..4054874 100644 --- a/replay_testing/models.py +++ b/replay_testing/models.py @@ -36,6 +36,7 @@ class ReplayRunParams(BaseModel): name: str params: dict = {} runner_args: RunnerArgs = RunnerArgs() + ignore_playback_finish: bool = False class Mcap(BaseModel): diff --git a/replay_testing/replay_runner.py b/replay_testing/replay_runner.py index bb7af2b..9e78a7e 100644 --- a/replay_testing/replay_runner.py +++ b/replay_testing/replay_runner.py @@ -139,26 +139,30 @@ def _create_run_launch_description( output='screen', ) - # Event handler to gracefully exit when the process finishes - on_exit_handler = RegisterEventHandler( - OnProcessExit( - target_action=player_action, - # Shutdown the launch service - on_exit=[launch.actions.EmitEvent(event=Shutdown())], - ) - ) - - # Launch description with the event handler - return LaunchDescription([ + # Launch description + ld = LaunchDescription([ ExecuteProcess( cmd=['ros2', 'bag', 'record', '-s', 'mcap', '-o', str(run_fixture.path), '--all'], output='screen', ), test_ld, player_action, # Add the MCAP playback action - on_exit_handler, # Add the event handler to shutdown after playback finishes ]) + if not params.ignore_playback_finish: + # Event handler to gracefully exit when the process finishes + on_exit_handler = RegisterEventHandler( + OnProcessExit( + target_action=player_action, + # Shutdown the launch service + on_exit=[launch.actions.EmitEvent(event=Shutdown())], + ) + ) + + ld.add_action(on_exit_handler) + + return ld + def filter_fixtures(self) -> list[ReplayFixture]: self._log_stage_start(ReplayTestingPhase.FIXTURES) From 89c944de84471eab1184b85942392351ecbb3eb7 Mon Sep 17 00:00:00 2001 From: f0reachARR Date: Thu, 4 Dec 2025 13:47:05 +0900 Subject: [PATCH 3/5] Add ignore_playback_finish in README --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index 8e67135..4c621c8 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,24 @@ When `use_clock=True` (default), the replay framework will: When `use_clock=False`, the replay will: - Skip `/clock` topic publishing +#### Termination Condition + +By default, the replay will run until all messages from the input MCAP are played back. +After that, the `run` stops immediately and the `analyze` step is executed. + +To wait for `run` to finish instead of stopping immediately after playback, you can set a `ignore_playback_finish` flag in `ReplayRunParams`: + +```python +from replay_testing import ReplayRunParams + +@run.default(params=ReplayRunParams(name='default', ignore_playback_finish=True)) +class Run: + def generate_launch_description(self) -> LaunchDescription: + # Your launch description here + # Note: Every node in the launch description must finish on its own. Otherwise, the test will hang. + pass +``` + ### Analyze `@analyze` The analyze step is run after the mcap from the `run` is recorded and written. It is a basic wrapper over `unittest.TestCase`, so any `unittest` assertions are built in. From a9849ff09d55402306f66e585606004bc65486b2 Mon Sep 17 00:00:00 2001 From: f0reachARR Date: Thu, 4 Dec 2025 15:16:30 +0900 Subject: [PATCH 4/5] Add test for replay finish condition --- .../basic_replay_finish_condition.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 test/replay_tests/basic_replay_finish_condition.py diff --git a/test/replay_tests/basic_replay_finish_condition.py b/test/replay_tests/basic_replay_finish_condition.py new file mode 100644 index 0000000..164bdca --- /dev/null +++ b/test/replay_tests/basic_replay_finish_condition.py @@ -0,0 +1,79 @@ +# Copyright (c) 2025-present Polymath Robotics, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pathlib + +from launch import LaunchDescription +from launch.actions import ExecuteProcess, RegisterEventHandler, EmitEvent +from launch.events import Shutdown +from launch.event_handlers import OnProcessExit + +from replay_testing import ( + LocalFixture, + analyze, + fixtures, + read_messages, + run, + ReplayRunParams, +) + +cmd_vel_only_fixture = pathlib.Path(__file__).parent.parent / 'fixtures' / 'cmd_vel_only.mcap' + + +@fixtures.parameterize([LocalFixture(path=cmd_vel_only_fixture)]) +class FilterFixtures: + required_input_topics = ['/vehicle/cmd_vel'] + expected_output_topics = ['/user/cmd_vel'] + + +@run.default(params=ReplayRunParams(name='default', ignore_playback_finish=True)) +class Run: + def generate_launch_description(self) -> LaunchDescription: + target_process = ExecuteProcess( + cmd=[ + 'ros2', + 'topic', + 'pub', + '-r', + '10', + '-t', + '20', + '/user/cmd_vel', + 'geometry_msgs/msg/Twist', + '{linear: {x: 1.0}, angular: {z: 0.5}}', + ], + name='topic_pub', + output='screen', + ) + on_exit_handler = RegisterEventHandler( + OnProcessExit( + target_action=target_process, + # Shutdown the launch service + on_exit=[EmitEvent(event=Shutdown())], + ) + ) + return LaunchDescription([ + target_process, + on_exit_handler, + ]) + + +@analyze +class AnalyzeBasicReplay: + def test_expected_failure(self): + msgs_it = read_messages(self.reader, topics=['/user/cmd_vel']) + + msgs = [(topic_name, msg, timestamp) for topic_name, msg, timestamp in msgs_it] + assert len(msgs) == 20 From 3516b1c041a8cc2e11de730a3dabecad5a29d5b7 Mon Sep 17 00:00:00 2001 From: f0reachARR Date: Wed, 10 Dec 2025 22:57:00 +0900 Subject: [PATCH 5/5] run pre-commit --- replay_testing/replay_runner.py | 2 +- .../basic_replay_finish_condition.py | 46 +++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/replay_testing/replay_runner.py b/replay_testing/replay_runner.py index 9e78a7e..6644e9f 100644 --- a/replay_testing/replay_runner.py +++ b/replay_testing/replay_runner.py @@ -160,7 +160,7 @@ def _create_run_launch_description( ) ld.add_action(on_exit_handler) - + return ld def filter_fixtures(self) -> list[ReplayFixture]: diff --git a/test/replay_tests/basic_replay_finish_condition.py b/test/replay_tests/basic_replay_finish_condition.py index 164bdca..6e672b9 100644 --- a/test/replay_tests/basic_replay_finish_condition.py +++ b/test/replay_tests/basic_replay_finish_condition.py @@ -16,17 +16,17 @@ import pathlib from launch import LaunchDescription -from launch.actions import ExecuteProcess, RegisterEventHandler, EmitEvent -from launch.events import Shutdown +from launch.actions import EmitEvent, ExecuteProcess, RegisterEventHandler from launch.event_handlers import OnProcessExit +from launch.events import Shutdown from replay_testing import ( LocalFixture, + ReplayRunParams, analyze, fixtures, read_messages, run, - ReplayRunParams, ) cmd_vel_only_fixture = pathlib.Path(__file__).parent.parent / 'fixtures' / 'cmd_vel_only.mcap' @@ -42,28 +42,28 @@ class FilterFixtures: class Run: def generate_launch_description(self) -> LaunchDescription: target_process = ExecuteProcess( - cmd=[ - 'ros2', - 'topic', - 'pub', - '-r', - '10', - '-t', - '20', - '/user/cmd_vel', - 'geometry_msgs/msg/Twist', - '{linear: {x: 1.0}, angular: {z: 0.5}}', - ], - name='topic_pub', - output='screen', - ) + cmd=[ + 'ros2', + 'topic', + 'pub', + '-r', + '10', + '-t', + '20', + '/user/cmd_vel', + 'geometry_msgs/msg/Twist', + '{linear: {x: 1.0}, angular: {z: 0.5}}', + ], + name='topic_pub', + output='screen', + ) on_exit_handler = RegisterEventHandler( - OnProcessExit( - target_action=target_process, - # Shutdown the launch service - on_exit=[EmitEvent(event=Shutdown())], - ) + OnProcessExit( + target_action=target_process, + # Shutdown the launch service + on_exit=[EmitEvent(event=Shutdown())], ) + ) return LaunchDescription([ target_process, on_exit_handler,