diff --git a/README.md b/README.md index 8e67135..4c621c8 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,24 @@ When `use_clock=True` (default), the replay framework will: When `use_clock=False`, the replay will: - Skip `/clock` topic publishing +#### Termination Condition + +By default, the replay will run until all messages from the input MCAP are played back. +After that, the `run` stops immediately and the `analyze` step is executed. + +To wait for `run` to finish instead of stopping immediately after playback, you can set a `ignore_playback_finish` flag in `ReplayRunParams`: + +```python +from replay_testing import ReplayRunParams + +@run.default(params=ReplayRunParams(name='default', ignore_playback_finish=True)) +class Run: + def generate_launch_description(self) -> LaunchDescription: + # Your launch description here + # Note: Every node in the launch description must finish on its own. Otherwise, the test will hang. + pass +``` + ### Analyze `@analyze` The analyze step is run after the mcap from the `run` is recorded and written. It is a basic wrapper over `unittest.TestCase`, so any `unittest` assertions are built in. diff --git a/replay_testing/decorators/run.py b/replay_testing/decorators/run.py index 7d11e50..bd78d1a 100644 --- a/replay_testing/decorators/run.py +++ b/replay_testing/decorators/run.py @@ -52,5 +52,5 @@ def parameterize(parameters: list[ReplayRunParams]): return run(parameters=parameters) @staticmethod - def default(params: dict = {}): - return run(parameters=[ReplayRunParams(name='default', params=params)]) + def default(params: ReplayRunParams = ReplayRunParams(name='default')): + return run(parameters=[params]) diff --git a/replay_testing/models.py b/replay_testing/models.py index 4c55758..4054874 100644 --- a/replay_testing/models.py +++ b/replay_testing/models.py @@ -34,8 +34,9 @@ class RunnerArgs(BaseModel): class ReplayRunParams(BaseModel): name: str - params: dict + params: dict = {} runner_args: RunnerArgs = RunnerArgs() + ignore_playback_finish: bool = False class Mcap(BaseModel): diff --git a/replay_testing/replay_runner.py b/replay_testing/replay_runner.py index bb7af2b..6644e9f 100644 --- a/replay_testing/replay_runner.py +++ b/replay_testing/replay_runner.py @@ -139,26 +139,30 @@ def _create_run_launch_description( output='screen', ) - # Event handler to gracefully exit when the process finishes - on_exit_handler = RegisterEventHandler( - OnProcessExit( - target_action=player_action, - # Shutdown the launch service - on_exit=[launch.actions.EmitEvent(event=Shutdown())], - ) - ) - - # Launch description with the event handler - return LaunchDescription([ + # Launch description + ld = LaunchDescription([ ExecuteProcess( cmd=['ros2', 'bag', 'record', '-s', 'mcap', '-o', str(run_fixture.path), '--all'], output='screen', ), test_ld, player_action, # Add the MCAP playback action - on_exit_handler, # Add the event handler to shutdown after playback finishes ]) + if not params.ignore_playback_finish: + # Event handler to gracefully exit when the process finishes + on_exit_handler = RegisterEventHandler( + OnProcessExit( + target_action=player_action, + # Shutdown the launch service + on_exit=[launch.actions.EmitEvent(event=Shutdown())], + ) + ) + + ld.add_action(on_exit_handler) + + return ld + def filter_fixtures(self) -> list[ReplayFixture]: self._log_stage_start(ReplayTestingPhase.FIXTURES) diff --git a/test/replay_tests/basic_replay_finish_condition.py b/test/replay_tests/basic_replay_finish_condition.py new file mode 100644 index 0000000..6e672b9 --- /dev/null +++ b/test/replay_tests/basic_replay_finish_condition.py @@ -0,0 +1,79 @@ +# Copyright (c) 2025-present Polymath Robotics, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pathlib + +from launch import LaunchDescription +from launch.actions import EmitEvent, ExecuteProcess, RegisterEventHandler +from launch.event_handlers import OnProcessExit +from launch.events import Shutdown + +from replay_testing import ( + LocalFixture, + ReplayRunParams, + analyze, + fixtures, + read_messages, + run, +) + +cmd_vel_only_fixture = pathlib.Path(__file__).parent.parent / 'fixtures' / 'cmd_vel_only.mcap' + + +@fixtures.parameterize([LocalFixture(path=cmd_vel_only_fixture)]) +class FilterFixtures: + required_input_topics = ['/vehicle/cmd_vel'] + expected_output_topics = ['/user/cmd_vel'] + + +@run.default(params=ReplayRunParams(name='default', ignore_playback_finish=True)) +class Run: + def generate_launch_description(self) -> LaunchDescription: + target_process = ExecuteProcess( + cmd=[ + 'ros2', + 'topic', + 'pub', + '-r', + '10', + '-t', + '20', + '/user/cmd_vel', + 'geometry_msgs/msg/Twist', + '{linear: {x: 1.0}, angular: {z: 0.5}}', + ], + name='topic_pub', + output='screen', + ) + on_exit_handler = RegisterEventHandler( + OnProcessExit( + target_action=target_process, + # Shutdown the launch service + on_exit=[EmitEvent(event=Shutdown())], + ) + ) + return LaunchDescription([ + target_process, + on_exit_handler, + ]) + + +@analyze +class AnalyzeBasicReplay: + def test_expected_failure(self): + msgs_it = read_messages(self.reader, topics=['/user/cmd_vel']) + + msgs = [(topic_name, msg, timestamp) for topic_name, msg, timestamp in msgs_it] + assert len(msgs) == 20