From 03cf68d09c46acdb82e578247b4787d4cebd0ae3 Mon Sep 17 00:00:00 2001
From: Jason Robert
Date: Wed, 6 May 2026 08:51:53 -0400
Subject: [PATCH 1/2] Bring `conductor resume` to flag parity with `conductor run`

Adds the run-only flags that are meaningful during resumed execution to
the resume command, fixing the broken UX where a workflow started with
`--web` or `--web-bg` could not be resumed with a dashboard.

New flags on resume:
- --provider / -p   runtime provider override
- --metadata / -m   CLI metadata merged on top of YAML metadata
- --web             start a real-time web dashboard
- --web-port        dashboard port (0 = auto-select)
- --web-bg          fork a detached process running resume + dashboard

Intentionally not mirrored:
- --input           restored from checkpoint context
- --workspace-instructions / --instructions
                    instructions_preamble persisted in checkpoint
- --dry-run         incompatible with executing from a saved point

Implementation:
- resume_workflow_async() now wires up the same WorkflowEventEmitter,
  EventLogSubscriber, ConsoleEventSubscriber, WebDashboard lifecycle,
  and RunContext as run_workflow_async().
- Stop-signal handling refactored into shared _execute_with_stop_signal
  used by both _run_with_stop_signal and the new _resume_with_stop_signal.
- New launch_background_resume() in bg_runner.py forks a detached
  `conductor resume` subprocess with the dashboard and writes a PID file
  so `conductor stop` can find it.
- AGENTS.md gains a Run / Resume Parity subsection (mirroring the
  Provider Parity style) so future flag additions stay aligned.

Notes the dashboard caveat in the docstring: on resume, only events from
the resumed agent forward are shown. Events from agents that completed
before the checkpoint were emitted in the original process and are not
replayed.

Tests: 10 new cases covering provider/metadata pass-through, --web flag handling, --web/--web-bg mutex, --web-bg dispatch to launch_background_resume, malformed metadata rejection, and direct unit tests of launch_background_resume command construction. Verification: full suite (2382 passed / 9 skipped), lint clean, format clean. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- AGENTS.md | 34 ++++- src/conductor/cli/app.py | 98 +++++++++++- src/conductor/cli/bg_runner.py | 133 ++++++++++++++++ src/conductor/cli/run.py | 165 +++++++++++++++++++- tests/test_cli/test_resume_command.py | 212 +++++++++++++++++++++++++- 5 files changed, 635 insertions(+), 7 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 7520f17..4122628 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -47,7 +47,11 @@ uv run conductor stop --all # stop all background workflows uv run conductor update # check for and install latest version # Resume a failed workflow from checkpoint -uv run conductor resume workflow.yaml # resume from latest checkpoint +uv run conductor resume workflow.yaml # resume from latest checkpoint +uv run conductor resume workflow.yaml --web # resume with dashboard +uv run conductor resume workflow.yaml --web-bg # resume with background dashboard +uv run conductor resume workflow.yaml --provider copilot +uv run conductor resume workflow.yaml -m tracker=ado uv run conductor checkpoints # list available checkpoints # Validate a workflow @@ -162,3 +166,31 @@ All providers (`copilot.py`, `claude.py`) must maintain feature parity. Any chan - **Reasoning effort**: All providers must accept the unified `reasoning.effort` field (`low` | `medium` | `high` | `xhigh`), translate it to the native API (Copilot `reasoning_effort` on the session; Claude extended `thinking` budget), validate that the selected model supports the requested effort, and raise `ValidationError` with a clear message when it does not. 
Any reasoning/thinking content the model returns must be surfaced via `agent_reasoning` events so the dashboard, JSONL logger, and console subscriber render it consistently. When modifying any provider, check all other providers for the same change. The dashboard, JSONL logger, console subscriber, and workflow engine all depend on consistent behavior across providers. + +### Run / Resume Parity + +The `run` and `resume` commands must accept the same flags wherever a flag is meaningful for a resumed run. When adding a new flag to `run`, add it to `resume` too unless there's a specific reason it cannot apply. + +Flags that **must** be mirrored on both: + +- `--provider` / `-p` — runtime provider override +- `--metadata` / `-m` — CLI metadata merged on top of YAML metadata +- `--skip-gates` — auto-select first option at human gates +- `--log-file` / `-l` — debug log file path (`auto` or explicit) +- `--no-interactive` — disable Esc-to-pause keyboard listener +- `--web` — start the real-time web dashboard +- `--web-port` — dashboard port (0 = auto-select) +- `--web-bg` — fork a detached process running the workflow + dashboard + +Flags intentionally **not** mirrored on `resume` (and why): + +- `--input` / `-i` — workflow inputs are restored from the checkpoint context; supplying them at resume would conflict. +- `--workspace-instructions`, `--instructions` — the `instructions_preamble` is persisted in the checkpoint and restored verbatim; re-supplying would be ambiguous. +- `--dry-run` — resume executes from a saved point and is incompatible with planning-only output. + +Implementation parity rules: + +- The async helpers (`run_workflow_async` and `resume_workflow_async` in `cli/run.py`) must wire up the same event emitter, JSONL event log subscriber, console event subscriber, and `WebDashboard` lifecycle. 
+- The `WorkflowEngine` constructor receives the same kwargs in both paths (`event_emitter`, `web_dashboard`, `run_context`, `interrupt_event`, `keyboard_listener`, `instructions_preamble`). +- Background-process forking lives in `cli/bg_runner.py`. `run --web-bg` calls `launch_background()` and `resume --web-bg` calls `launch_background_resume()`. Both must forward equivalent options and write a PID file via `cli/pid.py`. +- Note: on resume, the dashboard only shows events from the resumed agent forward — events from agents that completed before the checkpoint were emitted in the original process and are not replayed. diff --git a/src/conductor/cli/app.py b/src/conductor/cli/app.py index 6eeaa7e..6a773cf 100644 --- a/src/conductor/cli/app.py +++ b/src/conductor/cli/app.py @@ -688,6 +688,25 @@ def resume( help="Path to a specific checkpoint file to resume from.", ), ] = None, + provider: Annotated[ + str | None, + typer.Option( + "--provider", + "-p", + help="Override the provider specified in the workflow (e.g., 'copilot').", + ), + ] = None, + raw_metadata: Annotated[ + list[str] | None, + typer.Option( + "--metadata", + "-m", + help=( + "Workflow metadata in key=value format. " + "Merged on top of YAML metadata. Can be repeated." + ), + ), + ] = None, skip_gates: Annotated[ bool, typer.Option( @@ -713,6 +732,31 @@ def resume( help="Disable interactive interrupt capability (Esc to pause).", ), ] = False, + web: Annotated[ + bool, + typer.Option( + "--web", + help="Start a real-time web dashboard for workflow visualization.", + ), + ] = False, + web_port: Annotated[ + int, + typer.Option( + "--web-port", + help="Port for the web dashboard (0 = auto-select).", + ), + ] = 0, + web_bg: Annotated[ + bool, + typer.Option( + "--web-bg", + help=( + "Run resumed workflow + dashboard in a background process. " + "Prints the dashboard URL and exits immediately. " + "Does not require --web." 
+ ), + ), + ] = False, ) -> None: """Resume a workflow from a checkpoint after failure. @@ -723,6 +767,11 @@ def resume( Either provide a workflow file (to find the latest checkpoint) or use --from to specify a checkpoint file directly. + Note: when running with --web or --web-bg, the dashboard only shows + events from the resumed agent forward. Agent runs that completed + before the checkpoint were emitted in the original process and are + not replayed. + \b Examples: conductor resume workflow.yaml @@ -730,11 +779,20 @@ def resume( conductor resume workflow.yaml --skip-gates conductor resume workflow.yaml --log-file auto conductor resume workflow.yaml --no-interactive + conductor resume workflow.yaml --provider copilot + conductor resume workflow.yaml --metadata tracker=ado -m work_item_id=1814 + conductor resume workflow.yaml --web + conductor resume workflow.yaml --web --web-port 8080 + conductor resume workflow.yaml --web-bg """ import asyncio import json - from conductor.cli.run import generate_log_path, resume_workflow_async + from conductor.cli.run import ( + generate_log_path, + parse_metadata_flags, + resume_workflow_async, + ) # Validate arguments if workflow is None and from_checkpoint is None: @@ -748,6 +806,10 @@ def resume( ) raise typer.Exit(code=1) + # Validate mutually exclusive flags + if web and web_bg: + raise typer.BadParameter("--web and --web-bg are mutually exclusive") + # Resolve workflow ref if provided resolved_workflow: Path | None = None if workflow is not None: @@ -789,6 +851,11 @@ def resume( ) raise typer.Exit(code=1) + # Parse --metadata key=value flags (no type coercion) + cli_metadata: dict[str, str] = {} + if raw_metadata: + cli_metadata.update(parse_metadata_flags(raw_metadata)) + # Resolve log file path resolved_log_file: Path | None = None if log_file is not None: @@ -798,14 +865,43 @@ def resume( else: resolved_log_file = Path(log_file) + # Handle --web-bg: fork a background process and exit immediately + if web_bg: + from 
conductor.cli.bg_runner import launch_background_resume + + try: + url = launch_background_resume( + workflow_path=resolved_workflow, + checkpoint_path=resolved_checkpoint, + provider_override=provider, + skip_gates=skip_gates, + log_file=resolved_log_file, + web_port=web_port, + metadata=cli_metadata, + ) + console.print(f"[bold cyan]Dashboard:[/bold cyan] {url}") + console.print( + "[dim]Resumed workflow running in background. Dashboard auto-shuts down after " + "workflow completes and all clients disconnect.[/dim]" + ) + except Exception as e: + print_error(e) + raise typer.Exit(code=1) from None + return + try: result = asyncio.run( resume_workflow_async( workflow_path=resolved_workflow, checkpoint_path=resolved_checkpoint, + provider_override=provider, skip_gates=skip_gates, log_file=resolved_log_file, no_interactive=no_interactive, + web=web, + web_port=web_port, + web_bg=web_bg, + metadata=cli_metadata, ) ) diff --git a/src/conductor/cli/bg_runner.py b/src/conductor/cli/bg_runner.py index d415ee9..b3bc25e 100644 --- a/src/conductor/cli/bg_runner.py +++ b/src/conductor/cli/bg_runner.py @@ -180,6 +180,139 @@ def launch_background( return f"http://127.0.0.1:{web_port}" +def launch_background_resume( + *, + workflow_path: Path | None, + checkpoint_path: Path | None, + provider_override: str | None = None, + skip_gates: bool = False, + log_file: Path | None = None, + web_port: int = 0, + metadata: dict[str, str] | None = None, +) -> str: + """Fork a detached child process resuming the workflow with a web dashboard. + + The child executes ``conductor resume --web ...`` + with all the caller-supplied options. The parent waits briefly for the + web server to become reachable, then returns the dashboard URL. + + Either ``workflow_path`` or ``checkpoint_path`` (or both) must be + provided — at least one is required by the resume command. + + Args: + workflow_path: Optional path to the workflow YAML file. 
Used to find + the latest checkpoint when ``checkpoint_path`` is not given. + checkpoint_path: Optional explicit path to a checkpoint file. + provider_override: Optional provider name override. + skip_gates: Whether to auto-select first option at human gates. + log_file: Optional log file path. + web_port: Desired port (0 = auto-select). + metadata: Optional CLI metadata key=value pairs. + + Returns: + The dashboard URL (e.g. ``http://127.0.0.1:8080``). + + Raises: + ValueError: If neither ``workflow_path`` nor ``checkpoint_path`` is + provided. + RuntimeError: If the child process fails to start or the server + doesn't become reachable within the timeout. + """ + if workflow_path is None and checkpoint_path is None: + raise ValueError( + "launch_background_resume requires either workflow_path or checkpoint_path" + ) + + # Resolve port early so we know what URL to return + if web_port == 0: + web_port = _find_free_port() + + # Build the subprocess command + cmd: list[str] = [ + sys.executable, + "-m", + "conductor", + "--silent", # suppress CLI output in the background process + "resume", + ] + + if workflow_path is not None: + cmd.append(str(workflow_path)) + + if checkpoint_path is not None: + cmd.extend(["--from", str(checkpoint_path)]) + + cmd.extend( + [ + "--web", + "--web-port", + str(web_port), + "--no-interactive", + ] + ) + + # Forward metadata + if metadata: + for key, value in metadata.items(): + cmd.extend(["--metadata", f"{key}={_serialize_value(value)}"]) + + if provider_override: + cmd.extend(["--provider", provider_override]) + + if skip_gates: + cmd.append("--skip-gates") + + if log_file: + cmd.extend(["--log-file", str(log_file)]) + + # Launch detached child + kwargs: dict[str, Any] = { + "stdout": subprocess.DEVNULL, + "stderr": subprocess.DEVNULL, + "stdin": subprocess.DEVNULL, + } + + if sys.platform != "win32": + kwargs["start_new_session"] = True + else: + # Windows: CREATE_NEW_PROCESS_GROUP for detachment + kwargs["creationflags"] = 
subprocess.CREATE_NEW_PROCESS_GROUP + + # Set environment variables to signal bg mode to the child + env = os.environ.copy() + env["CONDUCTOR_WEB_BG"] = "1" + env["CONDUCTOR_WEB_PORT"] = str(web_port) + kwargs["env"] = env + + try: + proc = subprocess.Popen(cmd, **kwargs) # noqa: S603 + except Exception as exc: + raise RuntimeError(f"Failed to start background process: {exc}") from exc + + # Wait for the web server to start + if not _wait_for_server(web_port, timeout=15.0): + retcode = proc.poll() + if retcode is not None: + raise RuntimeError( + f"Background process exited immediately with code {retcode}. " + f"Check logs or run without --web-bg for details." + ) + raise RuntimeError( + f"Dashboard did not start within 15 seconds on port {web_port}. " + f"The background process (PID {proc.pid}) may still be starting." + ) + + # Write PID file so `conductor stop` can find this process + from conductor.cli.pid import write_pid_file + + # Use workflow_path if available, otherwise checkpoint_path stem-derived + pid_workflow_ref = workflow_path if workflow_path is not None else checkpoint_path + assert pid_workflow_ref is not None # noqa: S101 # checked above + write_pid_file(proc.pid, web_port, pid_workflow_ref) + + return f"http://127.0.0.1:{web_port}" + + def _serialize_value(value: Any) -> str: """Serialize a value for passing as a CLI --input argument. diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index 492141a..bf5b8ed 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -986,13 +986,56 @@ async def _run_with_stop_signal( Returns: The workflow result dict. + Raises: + ExecutionError: If the workflow was killed via the dashboard. + """ + return await _execute_with_stop_signal(engine.run(inputs), dashboard) + + +async def _resume_with_stop_signal( + engine: Any, + current_agent: str, + dashboard: Any | None, +) -> dict[str, Any]: + """Resume the workflow engine, racing against a dashboard kill signal. 
+ + Mirrors :func:`_run_with_stop_signal` but invokes ``engine.resume()``. + + Args: + engine: The ``WorkflowEngine`` instance with restored state. + current_agent: Name of the agent to resume from. + dashboard: The ``WebDashboard`` instance, or None. + + Returns: + The workflow result dict. + + Raises: + ExecutionError: If the workflow was killed via the dashboard. + """ + return await _execute_with_stop_signal(engine.resume(current_agent), dashboard) + + +async def _execute_with_stop_signal( + engine_coro: Any, + dashboard: Any | None, +) -> dict[str, Any]: + """Execute an engine coroutine, racing against a dashboard kill signal. + + Args: + engine_coro: The coroutine to execute (``engine.run()`` or + ``engine.resume()``). + dashboard: The ``WebDashboard`` instance, or None. + + Returns: + The workflow result dict. + Raises: ExecutionError: If the workflow was killed via the dashboard. """ if dashboard is None: - return await engine.run(inputs) + return await engine_coro - engine_task = asyncio.create_task(engine.run(inputs)) + engine_task = asyncio.create_task(engine_coro) stop_task = asyncio.create_task(dashboard.wait_for_stop()) done, pending = await asyncio.wait( @@ -1471,9 +1514,15 @@ def _print_resume_instructions(engine: WorkflowEngine) -> None: async def resume_workflow_async( workflow_path: Path | None = None, checkpoint_path: Path | None = None, + provider_override: str | None = None, skip_gates: bool = False, log_file: Path | None = None, no_interactive: bool = False, + *, + web: bool = False, + web_port: int = 0, + web_bg: bool = False, + metadata: dict[str, str] | None = None, ) -> dict[str, Any]: """Resume a workflow from a checkpoint. @@ -1485,9 +1534,20 @@ async def resume_workflow_async( the latest checkpoint if ``checkpoint_path`` is not provided. checkpoint_path: Explicit path to a checkpoint file. Takes precedence over ``workflow_path``. + provider_override: Optional provider name to override workflow config + for the resumed run. 
skip_gates: If True, auto-selects first option at human gates. log_file: Optional path to write full debug output to a file. no_interactive: If True, disables the keyboard interrupt listener. + web: If True, start a real-time web dashboard for the resumed run. + Note: the dashboard only shows events from the resumed agent + forward; agent runs that completed before the checkpoint are + not replayed. + web_port: Port for the web dashboard (0 = auto-select). + web_bg: If True, auto-shutdown dashboard after workflow + client + disconnect. + metadata: Optional CLI metadata to merge on top of YAML-declared + metadata for the resumed run. Returns: The workflow output as a dictionary. @@ -1499,6 +1559,7 @@ async def resume_workflow_async( from conductor.engine.checkpoint import CheckpointManager from conductor.engine.context import WorkflowContext from conductor.engine.limits import LimitEnforcer + from conductor.events import WorkflowEventEmitter from conductor.exceptions import CheckpointError start_time = time.time() @@ -1512,6 +1573,11 @@ async def resume_workflow_async( f"[bold yellow]Warning:[/bold yellow] Cannot open log file {log_file}: {e}" ) + # Always create event emitter and JSONL log subscriber (parity with run) + emitter = WorkflowEventEmitter() + event_log_subscriber: Any = None + dashboard: Any = None + try: # Resolve checkpoint file if checkpoint_path is not None: @@ -1558,9 +1624,51 @@ async def resume_workflow_async( f"Checkpoint created: {cp.created_at} (failed at: {cp.failure.get('agent', 'unknown')})" ) + # Start web dashboard now that we have the workflow path + if web: + from conductor.web.server import WebDashboard + + bg_mode = web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1" + dashboard = WebDashboard( + emitter, + host="127.0.0.1", + port=web_port, + bg=bg_mode, + workflow_root=resolved_workflow_path.resolve().parent, + ) + + try: + await dashboard.start() + _verbose_console.print(f"[bold cyan]Dashboard:[/bold cyan] {dashboard.url}") + 
except Exception as e: + _verbose_console.print( + f"[bold yellow]Warning:[/bold yellow] " + f"Dashboard failed to start: {e}. Continuing without dashboard." + ) + dashboard = None + # Load workflow config config = load_config(resolved_workflow_path) + # Merge CLI metadata on top of YAML-declared metadata (parity with run) + if metadata: + config.workflow.metadata.update(metadata) + + # Apply provider override if specified (parity with run) + if provider_override: + verbose_log(f"Provider override: {provider_override}", style="yellow") + config.workflow.runtime.provider = provider_override # type: ignore[assignment] + + # Start JSONL event log subscriber (parity with run) + from conductor.engine.event_log import EventLogSubscriber + + event_log_subscriber = EventLogSubscriber(config.workflow.name) + emitter.subscribe(event_log_subscriber.on_event) + + # Subscribe console output to the event emitter (parity with run) + console_subscriber = ConsoleEventSubscriber() + emitter.subscribe(console_subscriber.on_event) + # Verify the current_agent exists in the workflow agent_names = {a.name for a in config.agents} parallel_names = {g.name for g in config.parallel} if config.parallel else set() @@ -1595,13 +1703,20 @@ async def resume_workflow_async( registry.set_resume_session_ids(cp.copilot_session_ids) # Set up interrupt listener if interactive mode is enabled + # Disabled in --web mode since the CLI isn't used for interaction interrupt_event: asyncio.Event | None = None listener = None - if not no_interactive and sys.stdin.isatty(): + if not no_interactive and not web and sys.stdin.isatty(): from conductor.interrupt.listener import KeyboardListener interrupt_event = asyncio.Event() listener = KeyboardListener(interrupt_event=interrupt_event) + elif web: + # In --web mode: no keyboard listener, but still need interrupt_event + # so POST /api/stop can interrupt the running agent mid-execution + interrupt_event = asyncio.Event() + + from conductor.engine.workflow import 
RunContext engine = WorkflowEngine( config, @@ -1609,18 +1724,30 @@ async def resume_workflow_async( skip_gates=skip_gates, workflow_path=resolved_workflow_path, interrupt_event=interrupt_event, + event_emitter=emitter, keyboard_listener=listener, + web_dashboard=dashboard, instructions_preamble=cp.instructions_preamble, + run_context=RunContext( + run_id=event_log_subscriber.run_id if event_log_subscriber else "", + log_file=str(event_log_subscriber.path) if event_log_subscriber else "", + dashboard_port=(dashboard.port if dashboard is not None else None), + bg_mode=web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1", + ), ) engine.set_context(restored_context) engine.set_limits(restored_limits) + # Share interrupt_event with dashboard so POST /api/stop can abort agents + if dashboard is not None and interrupt_event is not None: + dashboard.set_interrupt_event(interrupt_event) + try: if listener is not None: await listener.start() _verbose_console.print("[dim]Press Esc to interrupt and provide guidance[/dim]") - result = await engine.resume(cp.current_agent) + result = await _resume_with_stop_signal(engine, cp.current_agent, dashboard) except BaseException: _print_resume_instructions(engine) raise @@ -1642,8 +1769,38 @@ async def resume_workflow_async( CheckpointManager.cleanup(cp.file_path) verbose_log(f"Checkpoint cleaned up: {cp.file_path}", style="dim") + # Post-execution dashboard lifecycle (parity with run) + if dashboard is not None: + is_bg = web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1" + if is_bg: + await dashboard.wait_for_clients_disconnect() + else: + _verbose_console.print( + f"\n[bold green]Workflow complete.[/bold green] " + f"Dashboard still running at {dashboard.url} — " + f"press [bold]Ctrl+C[/bold] to exit." 
+ ) + with contextlib.suppress(asyncio.CancelledError): + await asyncio.Event().wait() + return result finally: + # Clean up PID file if this is a background child process + is_bg_child = os.environ.get("CONDUCTOR_WEB_BG") == "1" + if is_bg_child: + from conductor.cli.pid import remove_pid_file_for_current_process + + remove_pid_file_for_current_process() + + # Stop dashboard if it was started + if dashboard is not None: + await dashboard.stop() + + # Close JSONL event log and report path + if event_log_subscriber is not None: + event_log_subscriber.close() + _verbose_console.print(f"[dim]Event log written to: {event_log_subscriber.path}[/dim]") + # Report log file path to stderr and close file logging if log_file is not None and _file_console is not None: _verbose_console.print(f"[dim]Log written to: {log_file}[/dim]") diff --git a/tests/test_cli/test_resume_command.py b/tests/test_cli/test_resume_command.py index beefe5e..64e6829 100644 --- a/tests/test_cli/test_resume_command.py +++ b/tests/test_cli/test_resume_command.py @@ -15,7 +15,7 @@ import json from pathlib import Path -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from typer.testing import CliRunner @@ -215,6 +215,216 @@ def test_resume_handles_execution_error(self, tmp_path: Path) -> None: assert result.exit_code == 1 + def test_resume_with_provider_override(self, tmp_path: Path) -> None: + """Test resume passes --provider through as provider_override.""" + wf_path = _write_workflow(tmp_path) + + with patch( + "conductor.cli.run.resume_workflow_async", new_callable=AsyncMock + ) as mock_resume: + mock_resume.return_value = {"result": "ok"} + runner.invoke(app, ["resume", str(wf_path), "--provider", "claude"]) + + call_kwargs = mock_resume.call_args + assert call_kwargs[1]["provider_override"] == "claude" + + def test_resume_with_metadata(self, tmp_path: Path) -> None: + """Test resume parses --metadata flags into a dict.""" + wf_path = 
_write_workflow(tmp_path) + + with patch( + "conductor.cli.run.resume_workflow_async", new_callable=AsyncMock + ) as mock_resume: + mock_resume.return_value = {"result": "ok"} + runner.invoke( + app, + [ + "resume", + str(wf_path), + "--metadata", + "tracker=ado", + "-m", + "work_item_id=1814", + ], + ) + + call_kwargs = mock_resume.call_args + assert call_kwargs[1]["metadata"] == { + "tracker": "ado", + "work_item_id": "1814", + } + + def test_resume_invalid_metadata_format(self, tmp_path: Path) -> None: + """Test resume rejects malformed --metadata values.""" + wf_path = _write_workflow(tmp_path) + + result = runner.invoke(app, ["resume", str(wf_path), "--metadata", "no_equals"]) + assert result.exit_code != 0 + + def test_resume_with_web(self, tmp_path: Path) -> None: + """Test resume passes --web and --web-port through.""" + wf_path = _write_workflow(tmp_path) + + with patch( + "conductor.cli.run.resume_workflow_async", new_callable=AsyncMock + ) as mock_resume: + mock_resume.return_value = {"result": "ok"} + runner.invoke(app, ["resume", str(wf_path), "--web", "--web-port", "9091"]) + + call_kwargs = mock_resume.call_args + assert call_kwargs[1]["web"] is True + assert call_kwargs[1]["web_port"] == 9091 + assert call_kwargs[1]["web_bg"] is False + + def test_resume_web_and_web_bg_mutually_exclusive(self, tmp_path: Path) -> None: + """Test that --web and --web-bg cannot be combined.""" + wf_path = _write_workflow(tmp_path) + + result = runner.invoke(app, ["resume", str(wf_path), "--web", "--web-bg"]) + assert result.exit_code != 0 + assert "mutually exclusive" in result.output.lower() + + def test_resume_web_bg_invokes_launch_background_resume(self, tmp_path: Path) -> None: + """Test that --web-bg dispatches to launch_background_resume.""" + wf_path = _write_workflow(tmp_path) + + with patch("conductor.cli.bg_runner.launch_background_resume") as mock_launch: + mock_launch.return_value = "http://127.0.0.1:9092" + result = runner.invoke( + app, + [ + "resume", + 
str(wf_path), + "--web-bg", + "--web-port", + "9092", + "--provider", + "copilot", + "-m", + "tracker=ado", + "--skip-gates", + ], + ) + + assert result.exit_code == 0 + assert "http://127.0.0.1:9092" in result.output + assert mock_launch.called + kwargs = mock_launch.call_args[1] + assert kwargs["workflow_path"] == wf_path.resolve() + assert kwargs["checkpoint_path"] is None + assert kwargs["provider_override"] == "copilot" + assert kwargs["skip_gates"] is True + assert kwargs["web_port"] == 9092 + assert kwargs["metadata"] == {"tracker": "ado"} + + def test_resume_web_bg_with_from_checkpoint(self, tmp_path: Path) -> None: + """Test --web-bg forwards --from checkpoint path.""" + wf_path = _write_workflow(tmp_path) + cp_path = _write_checkpoint(tmp_path, wf_path) + + with patch("conductor.cli.bg_runner.launch_background_resume") as mock_launch: + mock_launch.return_value = "http://127.0.0.1:9093" + result = runner.invoke(app, ["resume", "--from", str(cp_path), "--web-bg"]) + + assert result.exit_code == 0 + kwargs = mock_launch.call_args[1] + assert kwargs["workflow_path"] is None + assert kwargs["checkpoint_path"] == cp_path.resolve() + + +# --------------------------------------------------------------------------- +# launch_background_resume tests +# --------------------------------------------------------------------------- + + +class TestLaunchBackgroundResume: + """Tests for the launch_background_resume helper in bg_runner.py.""" + + def test_requires_workflow_or_checkpoint(self) -> None: + """Test that launch_background_resume raises when both args are None.""" + from conductor.cli.bg_runner import launch_background_resume + + with pytest.raises(ValueError, match="workflow_path or checkpoint_path"): + launch_background_resume(workflow_path=None, checkpoint_path=None) + + def test_builds_resume_subcommand_with_workflow(self, tmp_path: Path) -> None: + """Test the subprocess command starts with `conductor resume `.""" + from conductor.cli import bg_runner + + 
wf_path = tmp_path / "wf.yaml" + wf_path.write_text("workflow: {name: x, entry_point: a}\nagents: []\n") + + captured: dict[str, list[str]] = {} + + def _fake_popen(cmd: list[str], **kwargs: object) -> MagicMock: # type: ignore[no-untyped-def] + captured["cmd"] = cmd + proc = MagicMock() + proc.pid = 12345 + proc.poll.return_value = None + return proc + + with ( + patch("conductor.cli.bg_runner.subprocess.Popen", side_effect=_fake_popen), + patch("conductor.cli.bg_runner._wait_for_server", return_value=True), + patch("conductor.cli.pid.write_pid_file"), + ): + url = bg_runner.launch_background_resume( + workflow_path=wf_path, + checkpoint_path=None, + provider_override="copilot", + skip_gates=True, + metadata={"tracker": "ado"}, + web_port=9099, + ) + + assert url == "http://127.0.0.1:9099" + cmd = captured["cmd"] + # `--silent` is global and must precede the subcommand + assert "--silent" in cmd + assert cmd.index("resume") > cmd.index("--silent") + assert str(wf_path) in cmd + assert "--web" in cmd + assert "--web-port" in cmd + assert "9099" in cmd + assert "--no-interactive" in cmd + assert "--provider" in cmd and "copilot" in cmd + assert "--skip-gates" in cmd + assert "--metadata" in cmd + assert "tracker=ado" in cmd + + def test_builds_resume_subcommand_with_from_checkpoint(self, tmp_path: Path) -> None: + """Test --from is forwarded when checkpoint_path is given without workflow_path.""" + from conductor.cli import bg_runner + + cp_path = tmp_path / "cp.json" + cp_path.write_text("{}") + + captured: dict[str, list[str]] = {} + + def _fake_popen(cmd: list[str], **kwargs: object) -> MagicMock: # type: ignore[no-untyped-def] + captured["cmd"] = cmd + proc = MagicMock() + proc.pid = 12345 + proc.poll.return_value = None + return proc + + with ( + patch("conductor.cli.bg_runner.subprocess.Popen", side_effect=_fake_popen), + patch("conductor.cli.bg_runner._wait_for_server", return_value=True), + patch("conductor.cli.pid.write_pid_file"), + ): + 
bg_runner.launch_background_resume( + workflow_path=None, + checkpoint_path=cp_path, + web_port=9100, + ) + + cmd = captured["cmd"] + assert "resume" in cmd + assert "--from" in cmd + from_idx = cmd.index("--from") + assert cmd[from_idx + 1] == str(cp_path) + # --------------------------------------------------------------------------- # Checkpoints command tests From 0181004b8736a9c96509bd3239648f1581dd2e93 Mon Sep 17 00:00:00 2001 From: Jason Robert Date: Wed, 6 May 2026 12:30:42 -0400 Subject: [PATCH 2/2] Address PR review: prevent orphan bg children, fix cancel cleanup, add wiring tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bg_runner.py: - Extract _terminate_child() and _finalize_background_launch() helpers shared by launch_background and launch_background_resume. - On dashboard-startup timeout, terminate the still-running child before raising so it does not orphan holding the port with no PID file. - Wrap write_pid_file in try/except, terminating the child on failure so we never leave a discoverable-only-by-pkill background process. - Replace strippable assert with explicit guard. - Update module docstring to mention both run and resume. - Document that --no-interactive is always appended. run.py: - _execute_with_stop_signal: cancel pending tasks then drain via asyncio.gather(return_exceptions=True). The previous contextlib.suppress(CancelledError) only swallowed CancelledError, so a stored non-CancelledError on the losing task (e.g. dashboard.stop raised) aborted the cleanup loop and leaked the other pending task. 
tests/test_cli/test_resume_command.py: 15 new cases covering - launch_background_resume failure paths and detachment kwargs - _execute_with_stop_signal direct semantics (no-dashboard, engine-wins, stop-wins, losing-task-with-exception regression) - resume_workflow_async wiring without mocking the function itself: dashboard OSError non-fatal, provider_override mutates config, metadata merges into config, RunContext populated with bg_mode and run_id/log_file, --metadata value containing = survives parse 2397 passed, 9 skipped — no regressions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/cli/bg_runner.py | 125 +++++--- src/conductor/cli/run.py | 8 +- tests/test_cli/test_resume_command.py | 438 ++++++++++++++++++++++++++ 3 files changed, 528 insertions(+), 43 deletions(-) diff --git a/src/conductor/cli/bg_runner.py b/src/conductor/cli/bg_runner.py index b3bc25e..9c30986 100644 --- a/src/conductor/cli/bg_runner.py +++ b/src/conductor/cli/bg_runner.py @@ -1,8 +1,9 @@ """Background runner for ``--web-bg`` mode. -When ``conductor run --web-bg`` is used, this module forks a detached child -process that runs the workflow with ``--web`` enabled, then the parent process -prints the dashboard URL and exits immediately. +When ``conductor run --web-bg`` or ``conductor resume --web-bg`` is used, this +module forks a detached child process that runs the workflow with ``--web`` +enabled, then the parent process prints the dashboard URL and exits +immediately. The child process is fully detached (new session on Unix, new process group on Windows) so it outlives the parent. It auto-shuts down after the workflow @@ -12,6 +13,7 @@ from __future__ import annotations +import contextlib import json import os import socket @@ -53,6 +55,73 @@ def _wait_for_server(port: int, timeout: float = 15.0) -> bool: return False +def _terminate_child(proc: subprocess.Popen[Any]) -> None: + """Best-effort terminate a still-running child process. 
+ + Used to avoid orphaned background workflows when post-launch validation + (server reachability, PID file write) fails. Any errors raised while + terminating are swallowed so the original failure surfaces to the caller. + + Args: + proc: The subprocess.Popen handle to terminate. + """ + if proc.poll() is not None: + return + try: + proc.terminate() + try: + proc.wait(timeout=5.0) + except subprocess.TimeoutExpired: + proc.kill() + with contextlib.suppress(Exception): + proc.wait(timeout=2.0) + except Exception: # noqa: BLE001 - cleanup must not raise + pass + + +def _finalize_background_launch( + proc: subprocess.Popen[Any], + web_port: int, + pid_workflow_ref: Path, +) -> None: + """Wait for the dashboard to come up and write the PID file. + + On any failure (server didn't start, child died early, PID write raised), + the still-running child is terminated to avoid orphaned processes holding + the dashboard port without a discoverable PID file. + + Args: + proc: The detached child process. + web_port: The TCP port the child should be listening on. + pid_workflow_ref: Path used to derive the PID file name and recorded + inside it for ``conductor stop`` to display. + + Raises: + RuntimeError: If the child died early, the dashboard didn't start + within the timeout, or the PID file could not be written. + """ + if not _wait_for_server(web_port, timeout=15.0): + retcode = proc.poll() + if retcode is not None: + raise RuntimeError( + f"Background process exited immediately with code {retcode}. " + f"Check logs or run without --web-bg for details." + ) + _terminate_child(proc) + raise RuntimeError( + f"Dashboard did not start within 15 seconds on port {web_port}. " + f"The background process was terminated." 
+ ) + + from conductor.cli.pid import write_pid_file + + try: + write_pid_file(proc.pid, web_port, pid_workflow_ref) + except Exception as exc: + _terminate_child(proc) + raise RuntimeError(f"Failed to write PID file for background process: {exc}") from exc + + def launch_background( *, workflow_path: Path, @@ -158,24 +227,7 @@ def launch_background( except Exception as exc: raise RuntimeError(f"Failed to start background process: {exc}") from exc - # Wait for the web server to start - if not _wait_for_server(web_port, timeout=15.0): - # Check if the process already died - retcode = proc.poll() - if retcode is not None: - raise RuntimeError( - f"Background process exited immediately with code {retcode}. " - f"Check logs or run without --web-bg for details." - ) - raise RuntimeError( - f"Dashboard did not start within 15 seconds on port {web_port}. " - f"The background process (PID {proc.pid}) may still be starting." - ) - - # Write PID file so `conductor stop` can find this process - from conductor.cli.pid import write_pid_file - - write_pid_file(proc.pid, web_port, workflow_path) + _finalize_background_launch(proc, web_port, workflow_path) return f"http://127.0.0.1:{web_port}" @@ -193,8 +245,9 @@ def launch_background_resume( """Fork a detached child process resuming the workflow with a web dashboard. The child executes ``conductor resume --web ...`` - with all the caller-supplied options. The parent waits briefly for the - web server to become reachable, then returns the dashboard URL. + with all the caller-supplied options. ``--no-interactive`` is always + appended since the detached child has no TTY. The parent waits briefly + for the web server to become reachable, then returns the dashboard URL. Either ``workflow_path`` or ``checkpoint_path`` (or both) must be provided — at least one is required by the resume command. 
@@ -289,26 +342,16 @@ def launch_background_resume( except Exception as exc: raise RuntimeError(f"Failed to start background process: {exc}") from exc - # Wait for the web server to start - if not _wait_for_server(web_port, timeout=15.0): - retcode = proc.poll() - if retcode is not None: - raise RuntimeError( - f"Background process exited immediately with code {retcode}. " - f"Check logs or run without --web-bg for details." - ) - raise RuntimeError( - f"Dashboard did not start within 15 seconds on port {web_port}. " - f"The background process (PID {proc.pid}) may still be starting." + # Use workflow_path if available, otherwise fall back to checkpoint_path + # for the PID file name and recorded reference. + pid_workflow_ref = workflow_path if workflow_path is not None else checkpoint_path + if pid_workflow_ref is None: # pragma: no cover - guarded above + _terminate_child(proc) + raise ValueError( + "launch_background_resume requires either workflow_path or checkpoint_path" ) - # Write PID file so `conductor stop` can find this process - from conductor.cli.pid import write_pid_file - - # Use workflow_path if available, otherwise checkpoint_path stem-derived - pid_workflow_ref = workflow_path if workflow_path is not None else checkpoint_path - assert pid_workflow_ref is not None # noqa: S101 # checked above - write_pid_file(proc.pid, web_port, pid_workflow_ref) + _finalize_background_launch(proc, web_port, pid_workflow_ref) return f"http://127.0.0.1:{web_port}" diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index bf5b8ed..11d1dd3 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -1043,10 +1043,14 @@ async def _execute_with_stop_signal( return_when=asyncio.FIRST_COMPLETED, ) + # Cancel any losing task and drain it. Use ``gather(return_exceptions=True)`` + # so a non-CancelledError stored on the losing task (e.g. 
dashboard.stop + # raised, or engine raised right as the kill button fired) does not abort + # the cleanup loop and leak an un-awaited task. for task in pending: task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await task + if pending: + await asyncio.gather(*pending, return_exceptions=True) if engine_task in done: return engine_task.result() diff --git a/tests/test_cli/test_resume_command.py b/tests/test_cli/test_resume_command.py index 64e6829..31ef622 100644 --- a/tests/test_cli/test_resume_command.py +++ b/tests/test_cli/test_resume_command.py @@ -15,6 +15,7 @@ import json from pathlib import Path +from typing import Any from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -638,3 +639,440 @@ async def test_workflow_file_not_found(self, tmp_path: Path) -> None: with pytest.raises(CheckpointError, match="Workflow file not found"): await resume_workflow_async(checkpoint_path=cp_path) + + +# --------------------------------------------------------------------------- +# launch_background_resume failure paths and detachment behavior +# --------------------------------------------------------------------------- + + +class TestLaunchBackgroundResumeFailures: + """Failure paths and detachment kwargs for launch_background_resume.""" + + def test_terminates_child_on_server_timeout(self, tmp_path: Path) -> None: + """If the dashboard never comes up, the still-running child is killed.""" + from conductor.cli import bg_runner + + wf_path = tmp_path / "wf.yaml" + wf_path.write_text("workflow: {name: x, entry_point: a}\nagents: []\n") + + proc = MagicMock() + proc.pid = 4242 + proc.poll.return_value = None # still running + + with ( + patch("conductor.cli.bg_runner.subprocess.Popen", return_value=proc), + patch("conductor.cli.bg_runner._wait_for_server", return_value=False), + patch("conductor.cli.pid.write_pid_file") as mock_write, + pytest.raises(RuntimeError, match="terminated"), + ): + 
bg_runner.launch_background_resume(workflow_path=wf_path, checkpoint_path=None) + + proc.terminate.assert_called_once() + mock_write.assert_not_called() + + def test_reports_immediate_child_exit(self, tmp_path: Path) -> None: + """If the child died before the server came up, surface its exit code.""" + from conductor.cli import bg_runner + + wf_path = tmp_path / "wf.yaml" + wf_path.write_text("workflow: {name: x, entry_point: a}\nagents: []\n") + + proc = MagicMock() + proc.pid = 4243 + proc.poll.return_value = 7 # exited with code 7 + + with ( + patch("conductor.cli.bg_runner.subprocess.Popen", return_value=proc), + patch("conductor.cli.bg_runner._wait_for_server", return_value=False), + patch("conductor.cli.pid.write_pid_file") as mock_write, + pytest.raises(RuntimeError, match="exited immediately with code 7"), + ): + bg_runner.launch_background_resume(workflow_path=wf_path, checkpoint_path=None) + + # Child already dead -> no terminate, no PID file written. + proc.terminate.assert_not_called() + mock_write.assert_not_called() + + def test_terminates_child_on_pid_write_failure(self, tmp_path: Path) -> None: + """If write_pid_file raises, the running child is killed (no orphan).""" + from conductor.cli import bg_runner + + wf_path = tmp_path / "wf.yaml" + wf_path.write_text("workflow: {name: x, entry_point: a}\nagents: []\n") + + proc = MagicMock() + proc.pid = 4244 + proc.poll.return_value = None + + with ( + patch("conductor.cli.bg_runner.subprocess.Popen", return_value=proc), + patch("conductor.cli.bg_runner._wait_for_server", return_value=True), + patch("conductor.cli.pid.write_pid_file", side_effect=OSError("disk full")), + pytest.raises(RuntimeError, match="Failed to write PID file"), + ): + bg_runner.launch_background_resume(workflow_path=wf_path, checkpoint_path=None) + + proc.terminate.assert_called_once() + + def test_pid_file_written_with_workflow_path(self, tmp_path: Path) -> None: + """When workflow_path is provided, the PID file is keyed to it.""" + 
from conductor.cli import bg_runner + + wf_path = tmp_path / "wf.yaml" + wf_path.write_text("workflow: {name: x, entry_point: a}\nagents: []\n") + + proc = MagicMock() + proc.pid = 5555 + proc.poll.return_value = None + + with ( + patch("conductor.cli.bg_runner.subprocess.Popen", return_value=proc), + patch("conductor.cli.bg_runner._wait_for_server", return_value=True), + patch("conductor.cli.pid.write_pid_file") as mock_write, + ): + bg_runner.launch_background_resume( + workflow_path=wf_path, checkpoint_path=None, web_port=9201 + ) + + mock_write.assert_called_once_with(5555, 9201, wf_path) + + def test_pid_file_falls_back_to_checkpoint_path(self, tmp_path: Path) -> None: + """When only checkpoint_path is given, it is used for the PID file ref.""" + from conductor.cli import bg_runner + + cp_path = tmp_path / "cp.json" + cp_path.write_text("{}") + + proc = MagicMock() + proc.pid = 5556 + proc.poll.return_value = None + + with ( + patch("conductor.cli.bg_runner.subprocess.Popen", return_value=proc), + patch("conductor.cli.bg_runner._wait_for_server", return_value=True), + patch("conductor.cli.pid.write_pid_file") as mock_write, + ): + bg_runner.launch_background_resume( + workflow_path=None, checkpoint_path=cp_path, web_port=9202 + ) + + mock_write.assert_called_once_with(5556, 9202, cp_path) + + def test_subprocess_detachment_kwargs(self, tmp_path: Path) -> None: + """Verify Popen is called with detachment + DEVNULL + bg env vars.""" + import sys as _sys + + from conductor.cli import bg_runner + + wf_path = tmp_path / "wf.yaml" + wf_path.write_text("workflow: {name: x, entry_point: a}\nagents: []\n") + + captured: dict[str, object] = {} + + def _fake_popen(cmd: list[str], **kwargs: object) -> MagicMock: + captured.update(kwargs) + captured["cmd"] = cmd + proc = MagicMock() + proc.pid = 1 + proc.poll.return_value = None + return proc + + with ( + patch("conductor.cli.bg_runner.subprocess.Popen", side_effect=_fake_popen), + 
patch("conductor.cli.bg_runner._wait_for_server", return_value=True), + patch("conductor.cli.pid.write_pid_file"), + ): + bg_runner.launch_background_resume( + workflow_path=wf_path, checkpoint_path=None, web_port=9203 + ) + + import subprocess as _sp + + assert captured["stdout"] is _sp.DEVNULL + assert captured["stderr"] is _sp.DEVNULL + assert captured["stdin"] is _sp.DEVNULL + if _sys.platform == "win32": + assert captured["creationflags"] == _sp.CREATE_NEW_PROCESS_GROUP + else: + assert captured["start_new_session"] is True + env = captured["env"] + assert isinstance(env, dict) + assert env["CONDUCTOR_WEB_BG"] == "1" + assert env["CONDUCTOR_WEB_PORT"] == "9203" + + +# --------------------------------------------------------------------------- +# _execute_with_stop_signal tests (used by both run and resume) +# --------------------------------------------------------------------------- + + +class TestExecuteWithStopSignal: + """Direct tests of the shared cancellation helper.""" + + @pytest.mark.asyncio + async def test_returns_engine_result_when_no_dashboard(self) -> None: + from conductor.cli.run import _execute_with_stop_signal + + async def _engine() -> dict[str, str]: + return {"ok": "yes"} + + result = await _execute_with_stop_signal(_engine(), dashboard=None) + assert result == {"ok": "yes"} + + @pytest.mark.asyncio + async def test_returns_engine_result_when_engine_finishes_first(self) -> None: + import asyncio + + from conductor.cli.run import _execute_with_stop_signal + + dashboard = MagicMock() + + async def _never_stop() -> None: + await asyncio.Event().wait() + + dashboard.wait_for_stop = _never_stop + + async def _engine() -> dict[str, str]: + await asyncio.sleep(0) + return {"ok": "yes"} + + result = await _execute_with_stop_signal(_engine(), dashboard=dashboard) + assert result == {"ok": "yes"} + + @pytest.mark.asyncio + async def test_raises_execution_error_when_stop_fires_first(self) -> None: + import asyncio + + from conductor.cli.run import 
_execute_with_stop_signal + from conductor.exceptions import ExecutionError + + dashboard = MagicMock() + + async def _stop() -> None: + return None # stop signal fires immediately + + dashboard.wait_for_stop = _stop + + async def _engine() -> dict[str, str]: + await asyncio.Event().wait() # would block forever + return {} + + with pytest.raises(ExecutionError, match="stopped by user"): + await _execute_with_stop_signal(_engine(), dashboard=dashboard) + + @pytest.mark.asyncio + async def test_losing_task_with_exception_does_not_leak(self) -> None: + """Regression: the cleanup loop must drain a losing task even if + cancelling it surfaces a stored non-CancelledError. With the previous + ``contextlib.suppress(CancelledError)`` the second pending task could + be left un-awaited; ``asyncio.gather(return_exceptions=True)`` fixes it. + """ + import asyncio + + from conductor.cli.run import _execute_with_stop_signal + + dashboard = MagicMock() + + async def _stop_raises() -> None: + # Stop signal "fires" by raising — this lands as a stored exception + # on the losing wait_for_stop task once it's cancelled. + raise RuntimeError("dashboard stop boom") + + dashboard.wait_for_stop = _stop_raises + + async def _engine() -> dict[str, str]: + await asyncio.sleep(0) + return {"ok": "engine won"} + + # Either outcome (engine wins or stop wins) is acceptable; the only + # thing this test guards against is the helper itself raising or + # leaking an un-awaited task warning. 
+ import contextlib as _ctx + + with _ctx.suppress(Exception): + await _execute_with_stop_signal(_engine(), dashboard=dashboard) + + +# --------------------------------------------------------------------------- +# resume_workflow_async wiring tests (no mocking of resume_workflow_async) +# --------------------------------------------------------------------------- + + +def _make_resume_mocks() -> tuple[MagicMock, MagicMock]: + """Create ProviderRegistry + WorkflowEngine mocks for resume_workflow_async.""" + mock_registry = AsyncMock() + mock_registry.__aenter__ = AsyncMock(return_value=mock_registry) + mock_registry.__aexit__ = AsyncMock(return_value=False) + mock_registry.set_resume_session_ids = MagicMock() + + mock_engine = MagicMock() + mock_engine.resume = AsyncMock(return_value={"result": "ok"}) + mock_engine.config = MagicMock() + mock_engine.config.workflow.cost.show_summary = False + mock_engine._last_checkpoint_path = None + mock_engine.set_context = MagicMock() + mock_engine.set_limits = MagicMock() + mock_engine.get_execution_summary = MagicMock(return_value={}) + return mock_registry, mock_engine + + +class TestResumeWiring: + """Verify resume_workflow_async actually wires the new components.""" + + @pytest.mark.asyncio + async def test_dashboard_start_oserror_is_non_fatal(self, tmp_path: Path) -> None: + """Mirror of run-side test: dashboard start failure must not abort resume.""" + from conductor.cli.run import resume_workflow_async + + wf_path = _write_workflow(tmp_path) + cp_path = _write_checkpoint(tmp_path, wf_path) + + mock_dashboard = MagicMock() + mock_dashboard.start = AsyncMock(side_effect=OSError("port busy")) + mock_dashboard.stop = AsyncMock() + + mock_web_module = MagicMock() + mock_web_module.WebDashboard.return_value = mock_dashboard + + mock_registry, mock_engine = _make_resume_mocks() + + import sys as _sys + + with ( + patch.dict(_sys.modules, {"conductor.web.server": mock_web_module}), + patch("conductor.cli.run.ProviderRegistry", 
return_value=mock_registry), + patch("conductor.cli.run.WorkflowEngine", return_value=mock_engine), + patch( + "conductor.cli.run._build_mcp_servers", + new_callable=AsyncMock, + return_value=None, + ), + ): + result = await resume_workflow_async(checkpoint_path=cp_path, web=True) + + assert result == {"result": "ok"} + assert mock_engine.resume.await_count == 1 + + @pytest.mark.asyncio + async def test_provider_override_mutates_config(self, tmp_path: Path) -> None: + """provider_override must overwrite config.workflow.runtime.provider.""" + from conductor.cli.run import resume_workflow_async + + wf_path = _write_workflow(tmp_path) + cp_path = _write_checkpoint(tmp_path, wf_path) + + captured_configs: list[Any] = [] + + def _capture_config(config: Any, **_kwargs: Any) -> Any: # noqa: ANN401 + captured_configs.append(config) + mock_registry, _ = _make_resume_mocks() + return mock_registry + + mock_registry, mock_engine = _make_resume_mocks() + + with ( + patch("conductor.cli.run.ProviderRegistry", side_effect=_capture_config), + patch("conductor.cli.run.WorkflowEngine", return_value=mock_engine), + patch( + "conductor.cli.run._build_mcp_servers", + new_callable=AsyncMock, + return_value=None, + ), + ): + await resume_workflow_async(checkpoint_path=cp_path, provider_override="claude") + + assert captured_configs, "ProviderRegistry was not constructed" + cfg = captured_configs[0] + assert cfg.workflow.runtime.provider == "claude" + + @pytest.mark.asyncio + async def test_metadata_merges_into_config(self, tmp_path: Path) -> None: + """CLI metadata must be merged on top of YAML metadata on resume.""" + from conductor.cli.run import resume_workflow_async + + wf_path = _write_workflow(tmp_path) + cp_path = _write_checkpoint(tmp_path, wf_path) + + captured_configs: list[Any] = [] + + def _capture_config(config: Any, **_kwargs: Any) -> Any: # noqa: ANN401 + captured_configs.append(config) + mock_registry, _ = _make_resume_mocks() + return mock_registry + + _, mock_engine = 
_make_resume_mocks() + + with ( + patch("conductor.cli.run.ProviderRegistry", side_effect=_capture_config), + patch("conductor.cli.run.WorkflowEngine", return_value=mock_engine), + patch( + "conductor.cli.run._build_mcp_servers", + new_callable=AsyncMock, + return_value=None, + ), + ): + await resume_workflow_async( + checkpoint_path=cp_path, metadata={"tracker": "ado", "ticket": "1234"} + ) + + cfg = captured_configs[0] + assert cfg.workflow.metadata["tracker"] == "ado" + assert cfg.workflow.metadata["ticket"] == "1234" + + @pytest.mark.asyncio + async def test_run_context_populated_on_resume(self, tmp_path: Path) -> None: + """RunContext passed to WorkflowEngine must include run_id, log_file, bg_mode.""" + import os as _os + + from conductor.cli.run import resume_workflow_async + + wf_path = _write_workflow(tmp_path) + cp_path = _write_checkpoint(tmp_path, wf_path) + + engine_kwargs: dict[str, Any] = {} + + def _capture_engine(*_args: Any, **kwargs: Any) -> Any: # noqa: ANN401 + engine_kwargs.update(kwargs) + _, mock_engine = _make_resume_mocks() + return mock_engine + + mock_registry, _ = _make_resume_mocks() + + # Force bg_mode via env var (simulates the bg-child code path). 
+ with ( + patch.dict(_os.environ, {"CONDUCTOR_WEB_BG": "1"}, clear=False), + patch("conductor.cli.run.ProviderRegistry", return_value=mock_registry), + patch("conductor.cli.run.WorkflowEngine", side_effect=_capture_engine), + patch( + "conductor.cli.run._build_mcp_servers", + new_callable=AsyncMock, + return_value=None, + ), + ): + await resume_workflow_async(checkpoint_path=cp_path) + + rc = engine_kwargs.get("run_context") + assert rc is not None, f"run_context not passed; got kwargs={list(engine_kwargs)}" + assert rc.bg_mode is True + assert isinstance(rc.run_id, str) and rc.run_id # populated from event log subscriber + assert isinstance(rc.log_file, str) and rc.log_file + # event_emitter must be wired so the dashboard / event log receive events + assert engine_kwargs.get("event_emitter") is not None + + def test_metadata_value_with_equals_sign_via_cli(self, tmp_path: Path) -> None: + """Regression: --metadata key=https://x?a=b must keep the right-hand =.""" + wf_path = _write_workflow(tmp_path) + _write_checkpoint(tmp_path, wf_path) + + with patch( + "conductor.cli.run.resume_workflow_async", new_callable=AsyncMock + ) as mock_resume: + mock_resume.return_value = {"result": "ok"} + result = runner.invoke( + app, + ["resume", str(wf_path), "-m", "url=https://x?a=b&c=d"], + ) + assert result.exit_code == 0, result.output + kwargs = mock_resume.call_args[1] + assert kwargs["metadata"] == {"url": "https://x?a=b&c=d"}