From c89b686da7422b1f64242c632e9bbbef4c28e4db Mon Sep 17 00:00:00 2001 From: Brent Rusinow Date: Mon, 4 May 2026 23:51:11 -0700 Subject: [PATCH 1/3] feat: add per-agent timeout_seconds for hard wall-clock timeouts (#82) Add timeout_seconds field to AgentDef that wraps agent execution in asyncio.wait_for() at the engine level. This provides hard cancellation for slow agents without blocking entire workflows. Key behaviors: - Effective timeout = min(agent.timeout_seconds, remaining_workflow_timeout) - When workflow timeout is stricter, it owns the error (no mislabeling) - Emits agent_timeout event with agent name, elapsed time, and limit - Raises AgentTimeoutError (subclass of TimeoutError) for existing error handling semantics (fail_fast, continue_on_error) - Scoped to provider-backed agents only (script uses 'timeout', human_gate/workflow types rejected) - Applied at all execution sites: main loop, parallel groups, for-each groups Schema: - New field: timeout_seconds: float | None (ge=1.0) - Rejected for script, human_gate, and workflow agent types Exception: - New AgentTimeoutError class with agent_name attribute Console: - New verbose_log_agent_timeout handler for agent_timeout events Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/cli/run.py | 39 ++ src/conductor/config/schema.py | 32 ++ src/conductor/engine/workflow.py | 86 +++- src/conductor/exceptions.py | 39 ++ tests/test_engine/test_agent_timeout.py | 575 ++++++++++++++++++++++++ 5 files changed, 760 insertions(+), 11 deletions(-) create mode 100644 tests/test_engine/test_agent_timeout.py diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index 11d1dd3..ef9cc01 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -419,6 +419,38 @@ def verbose_log_parallel_agent_failed( _file_console.print(error_msg) +def verbose_log_agent_timeout( + agent_name: str, + elapsed: float, + timeout_seconds: float, +) -> None: + """Log agent timeout. + + Args: + agent_name: Name of the agent that timed out. + elapsed: Elapsed time in seconds. + timeout_seconds: Configured timeout limit. + """ + from rich.text import Text + + from conductor.cli.app import is_verbose + + should_console = is_verbose() + should_file = _file_console is not None + if not should_console and not should_file: + return + + text = Text() + text.append(" ⏱ ", style="yellow") + text.append(agent_name, style="yellow bold") + text.append(f" timed out after {elapsed:.1f}s (limit: {timeout_seconds:.0f}s)", style="dim") + + if should_console: + _verbose_console.print(text) + if _file_console is not None: + _file_console.print(text) + + def verbose_log_parallel_summary( group_name: str, success_count: int, @@ -665,6 +697,13 @@ def on_event(self, event: WorkflowEvent) -> None: output_tokens=d.get("output_tokens"), ) + elif t == "agent_timeout": + verbose_log_agent_timeout( + d.get("agent_name", "?"), + d.get("elapsed", 0.0), + d.get("timeout_seconds", 0.0), + ) + elif t == "route_taken": verbose_log_route(d.get("to_agent", "?")) diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index eaa6b8f..42b6981 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -567,6 +567,29 @@ class AgentDef(BaseModel): max_depth: 3 # Allow at most 3 levels of recursion """ + timeout_seconds: float | None = Field(None, ge=1.0) + """Hard wall-clock timeout for this agent's execution in seconds. + + When set, the engine wraps the entire agent execution in + ``asyncio.wait_for()``. If exceeded, raises ``AgentTimeoutError`` + which is handled by existing error semantics (``fail_fast``, + ``continue_on_error``). + + The effective timeout is ``min(timeout_seconds, remaining_workflow_timeout)`` + so agent timeouts never exceed the workflow-level limit. + + Only applies to provider-backed agents (not script, human_gate, + or workflow types). This is a hard cancellation — unlike + ``max_session_seconds`` which checks between provider iterations. + + Note: Agent-level timeouts are non-retryable. The retry policy + operates inside the provider and is cancelled along with the agent. + + Example:: + + timeout_seconds: 120 # Cancel agent after 2 minutes + """ + max_session_seconds: float | None = Field(None, ge=1.0) """Maximum wall-clock duration for this agent's session in seconds. @@ -666,6 +689,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("human_gate agents cannot have 'max_depth'") if self.reasoning is not None: raise ValueError("human_gate agents cannot have 'reasoning'") + if self.timeout_seconds is not None: + raise ValueError("human_gate agents cannot have 'timeout_seconds'") elif self.type == "script": if not self.command: raise ValueError("script agents require 'command'") @@ -700,6 +725,11 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("script agents cannot have 'max_depth'") if self.reasoning is not None: raise ValueError("script agents cannot have 'reasoning'") + if self.timeout_seconds is not None: + raise ValueError( + "script agents cannot have 'timeout_seconds' " + "(use 'timeout' for script-specific timeouts)" + ) elif self.type == "workflow": if not self.workflow: raise ValueError("workflow agents require 'workflow' path") @@ -725,6 +755,8 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("workflow agents cannot have 'retry'") if self.dialog is not None: raise ValueError("workflow agents cannot have 'dialog'") + if self.timeout_seconds is not None: + raise ValueError("workflow agents cannot have 'timeout_seconds'") else: # Regular agent or human_gate — input_mapping is not valid if self.input_mapping is not None: diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 324ffc7..bff20f5 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -24,6 +24,7 @@ from conductor.engine.usage import UsageTracker, WorkflowUsage from conductor.events import WorkflowEvent, WorkflowEventEmitter from conductor.exceptions import ( + AgentTimeoutError, ConductorError, ExecutionError, InterruptError, @@ -602,6 +603,60 @@ async def _execute_script(self, agent: AgentDef, context: dict[str, Any]) -> Scr operation_name=f"script '{agent.name}'", ) + async def _execute_with_agent_timeout( + self, + agent: AgentDef, + coro: Any, + ) -> Any: + """Wrap an agent execution coroutine with the agent's timeout_seconds. + + If the agent has ``timeout_seconds`` configured, this wraps the coroutine + in ``asyncio.wait_for()`` with an effective timeout that is the minimum of + the agent's timeout and any remaining workflow-level timeout. + + When the remaining workflow timeout is stricter, the coroutine runs + without the agent timeout wrapper so the existing workflow timeout + path handles the error with correct attribution. + + On timeout, emits an ``agent_timeout`` event and raises ``AgentTimeoutError``. + + Args: + agent: Agent definition with optional ``timeout_seconds``. + coro: The coroutine to execute (typically ``executor.execute(...)``). + + Returns: + Result of the coroutine. + + Raises: + AgentTimeoutError: If the agent exceeds its timeout_seconds. + """ + if agent.timeout_seconds is None: + return await coro + + # If workflow remaining timeout is stricter, let it own the error + remaining = self.limits.get_remaining_timeout() + if remaining is not None and remaining <= agent.timeout_seconds: + return await coro + + start = _time.monotonic() + try: + return await asyncio.wait_for(coro, timeout=agent.timeout_seconds) + except TimeoutError: + elapsed = _time.monotonic() - start + self._emit( + "agent_timeout", + { + "agent_name": agent.name, + "elapsed": elapsed, + "timeout_seconds": agent.timeout_seconds, + }, + ) + raise AgentTimeoutError( + agent_name=agent.name, + elapsed_seconds=elapsed, + timeout_seconds=agent.timeout_seconds, + ) from None + def _build_subworkflow_inputs( self, agent: AgentDef, @@ -2138,12 +2193,15 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: executor = await self._get_executor_for_agent(agent) guidance_section = self.context.get_guidance_prompt_section() event_callback = self._make_event_callback(agent.name) - output = await executor.execute( + output = await self._execute_with_agent_timeout( agent, - agent_context, - guidance_section=guidance_section, - interrupt_signal=self._interrupt_event, - event_callback=event_callback, + executor.execute( + agent, + agent_context, + guidance_section=guidance_section, + interrupt_signal=self._interrupt_event, + event_callback=event_callback, + ), ) _agent_elapsed = _time.time() - _agent_start @@ -2925,10 +2983,13 @@ async def execute_single_agent(agent: AgentDef) -> tuple[str, Any]: # Execute agent (get executor for multi-provider support) executor = await self._get_executor_for_agent(agent) event_callback = self._make_event_callback(agent.name) - output = await executor.execute( + output = await self._execute_with_agent_timeout( agent, - agent_context, - event_callback=event_callback, + executor.execute( + agent, + agent_context, + event_callback=event_callback, + ), ) _agent_elapsed = _time.time() - _agent_start @@ -3348,10 +3409,13 @@ def _item_callback(event_type: str, data: dict[str, Any]) -> None: self._emit(event_type, data_with_agent) event_callback = _item_callback if self._event_emitter else None - output = await executor.execute( + output = await self._execute_with_agent_timeout( for_each_group.agent, - agent_context, - event_callback=event_callback, + executor.execute( + for_each_group.agent, + agent_context, + event_callback=event_callback, + ), ) _item_elapsed = _time.time() - _item_start diff --git a/src/conductor/exceptions.py b/src/conductor/exceptions.py index b49ad78..04fa793 100644 --- a/src/conductor/exceptions.py +++ b/src/conductor/exceptions.py @@ -443,6 +443,45 @@ def __init__( super().__init__(message, suggestion, file_path, line_number) +class AgentTimeoutError(TimeoutError): + """Raised when an individual agent exceeds its per-agent timeout_seconds limit. + + This is distinct from the workflow-level TimeoutError. An AgentTimeoutError + is raised when an agent's ``timeout_seconds`` configuration causes a hard + cancellation via ``asyncio.wait_for()``. + + Attributes: + agent_name: Name of the agent that timed out. + """ + + def __init__( + self, + *, + agent_name: str, + elapsed_seconds: float, + timeout_seconds: float, + ) -> None: + """Initialize an AgentTimeoutError. + + Args: + agent_name: Name of the agent that timed out. + elapsed_seconds: The time elapsed before the timeout. + timeout_seconds: The configured per-agent timeout limit. + """ + message = f"Agent '{agent_name}' exceeded its timeout ({timeout_seconds}s)" + suggestion = f"Increase timeout_seconds for agent '{agent_name}' or optimize its execution" + super().__init__( + message, + elapsed_seconds=elapsed_seconds, + timeout_seconds=timeout_seconds, + current_agent=agent_name, + suggestion=suggestion, + ) + # Set after super().__init__() because ExecutionError.__init__ + # defaults agent_name to None and would overwrite an earlier assignment. + self.agent_name = agent_name + + class HumanGateError(ExecutionError): """Raised when a human gate encounters an error. diff --git a/tests/test_engine/test_agent_timeout.py b/tests/test_engine/test_agent_timeout.py new file mode 100644 index 0000000..63c1eef --- /dev/null +++ b/tests/test_engine/test_agent_timeout.py @@ -0,0 +1,575 @@ +"""Tests for per-agent timeout_seconds feature (issue #82). + +Tests cover: +- Schema validation: timeout_seconds accepted/rejected per agent type +- Engine: _execute_with_agent_timeout helper behavior +- Integration: agent timeout in main loop, parallel groups, for-each groups +- Event emission: agent_timeout event +- Interaction with workflow-level timeout +""" + +from __future__ import annotations + +import asyncio +from unittest.mock import patch + +import pytest + +from conductor.config.schema import ( + AgentDef, + ContextConfig, + GateOption, + LimitsConfig, + OutputField, + ParallelGroup, + RouteDef, + RuntimeConfig, + WorkflowConfig, + WorkflowDef, +) +from conductor.engine.workflow import WorkflowEngine +from conductor.events import WorkflowEvent, WorkflowEventEmitter +from conductor.exceptions import AgentTimeoutError +from conductor.providers.copilot import CopilotProvider + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class EventCollector: + """Collects events emitted by a WorkflowEventEmitter.""" + + def __init__(self) -> None: + self.events: list[WorkflowEvent] = [] + + def __call__(self, event: WorkflowEvent) -> None: + self.events.append(event) + + def types(self) -> list[str]: + return [e.type for e in self.events] + + def of_type(self, event_type: str) -> list[WorkflowEvent]: + return [e for e in self.events if e.type == event_type] + + +def _make_emitter_and_collector() -> tuple[WorkflowEventEmitter, EventCollector]: + emitter = WorkflowEventEmitter() + collector = EventCollector() + emitter.subscribe(collector) + return emitter, collector + + +# --------------------------------------------------------------------------- +# Schema validation tests +# --------------------------------------------------------------------------- + + +class TestAgentTimeoutSchema: + """Validate timeout_seconds field acceptance/rejection per agent type.""" + + def test_regular_agent_accepts_timeout_seconds(self) -> None: + """Provider-backed agents accept timeout_seconds.""" + agent = AgentDef( + name="fast", + model="gpt-4", + prompt="Do something", + timeout_seconds=30.0, + routes=[RouteDef(to="$end")], + ) + assert agent.timeout_seconds == 30.0 + + def test_regular_agent_without_timeout_seconds(self) -> None: + """timeout_seconds defaults to None.""" + agent = AgentDef( + name="fast", + model="gpt-4", + prompt="Do something", + routes=[RouteDef(to="$end")], + ) + assert agent.timeout_seconds is None + + def test_timeout_seconds_must_be_positive(self) -> None: + """timeout_seconds must be >= 1.0.""" + with pytest.raises(Exception, match="greater than or equal to 1"): + AgentDef( + name="bad", + model="gpt-4", + prompt="Do something", + timeout_seconds=0.5, + routes=[RouteDef(to="$end")], + ) + + def test_script_agent_rejects_timeout_seconds(self) -> None: + """Script agents must use 'timeout', not 'timeout_seconds'.""" + with pytest.raises(ValueError, match="script agents cannot have 'timeout_seconds'"): + AgentDef( + name="script1", + type="script", + command="echo hello", + timeout_seconds=30.0, + routes=[RouteDef(to="$end")], + ) + + def test_human_gate_rejects_timeout_seconds(self) -> None: + """Human gate agents cannot have timeout_seconds.""" + with pytest.raises(ValueError, match="human_gate agents cannot have 'timeout_seconds'"): + AgentDef( + name="gate1", + type="human_gate", + prompt="Choose", + options=[GateOption(label="Yes", value="yes", route="$end")], + timeout_seconds=30.0, + routes=[RouteDef(to="$end")], + ) + + def test_workflow_agent_rejects_timeout_seconds(self) -> None: + """Workflow agents cannot have timeout_seconds.""" + with pytest.raises(ValueError, match="workflow agents cannot have 'timeout_seconds'"): + AgentDef( + name="sub1", + type="workflow", + workflow="./sub.yaml", + timeout_seconds=30.0, + routes=[RouteDef(to="$end")], + ) + + +# --------------------------------------------------------------------------- +# Engine helper tests +# --------------------------------------------------------------------------- + + +class TestExecuteWithAgentTimeout: + """Unit tests for WorkflowEngine._execute_with_agent_timeout.""" + + def _make_engine( + self, + timeout_seconds: float | None = None, + workflow_timeout: int | None = None, + emitter: WorkflowEventEmitter | None = None, + ) -> tuple[WorkflowEngine, AgentDef]: + """Create a minimal engine and agent for timeout testing.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="timeout-test", + entry_point="agent1", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10, timeout_seconds=workflow_timeout), + ), + agents=[ + AgentDef( + name="agent1", + model="gpt-4", + prompt="Test", + timeout_seconds=timeout_seconds, + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ agent1.output.result }}"}, + ) + provider = CopilotProvider(mock_handler=lambda a, p, c: {"result": "ok"}) + engine = WorkflowEngine(config, provider, event_emitter=emitter) + engine.limits.start() + return engine, config.agents[0] + + @pytest.mark.asyncio + async def test_no_timeout_passes_through(self) -> None: + """When timeout_seconds is None, coroutine runs without wrapping.""" + engine, agent = self._make_engine(timeout_seconds=None) + + async def fast_coro(): + return "result" + + result = await engine._execute_with_agent_timeout(agent, fast_coro()) + assert result == "result" + + @pytest.mark.asyncio + async def test_fast_agent_completes_within_timeout(self) -> None: + """Agent that completes within timeout succeeds.""" + engine, agent = self._make_engine(timeout_seconds=5.0) + + async def fast_coro(): + await asyncio.sleep(0.01) + return "done" + + result = await engine._execute_with_agent_timeout(agent, fast_coro()) + assert result == "done" + + @pytest.mark.asyncio + async def test_slow_agent_raises_agent_timeout_error(self) -> None: + """Agent that exceeds timeout_seconds raises AgentTimeoutError.""" + engine, agent = self._make_engine(timeout_seconds=1.0) + + async def slow_coro(): + await asyncio.sleep(10) + return "never" + + with pytest.raises(AgentTimeoutError) as exc_info: + await engine._execute_with_agent_timeout(agent, slow_coro()) + + assert exc_info.value.agent_name == "agent1" + assert exc_info.value.timeout_seconds == 1.0 + assert exc_info.value.elapsed_seconds > 0 + + @pytest.mark.asyncio + async def test_timeout_emits_agent_timeout_event(self) -> None: + """agent_timeout event is emitted when agent times out.""" + emitter, collector = _make_emitter_and_collector() + engine, agent = self._make_engine(timeout_seconds=1.0, emitter=emitter) + + async def slow_coro(): + await asyncio.sleep(10) + + with pytest.raises(AgentTimeoutError): + await engine._execute_with_agent_timeout(agent, slow_coro()) + + timeout_events = collector.of_type("agent_timeout") + assert len(timeout_events) == 1 + assert timeout_events[0].data["agent_name"] == "agent1" + assert timeout_events[0].data["timeout_seconds"] == 1.0 + assert timeout_events[0].data["elapsed"] > 0 + + @pytest.mark.asyncio + async def test_workflow_timeout_stricter_skips_agent_wrapper(self) -> None: + """When workflow remaining timeout <= agent timeout, skip agent wrapper. + + This prevents mislabeling a workflow timeout as an agent timeout. + """ + engine, agent = self._make_engine(timeout_seconds=60.0, workflow_timeout=1) + # Workflow started with 1s timeout. Remaining is ~1s which is < 60s. + # The helper should skip the agent timeout wrapper. + + async def fast_coro(): + return "ok" + + # Should complete without wrapping (no agent timeout applied) + result = await engine._execute_with_agent_timeout(agent, fast_coro()) + assert result == "ok" + + @pytest.mark.asyncio + async def test_agent_timeout_stricter_applies_agent_wrapper(self) -> None: + """When agent timeout is stricter than workflow, agent wrapper is used.""" + engine, agent = self._make_engine(timeout_seconds=1.0, workflow_timeout=3600) + + async def slow_coro(): + await asyncio.sleep(10) + + with pytest.raises(AgentTimeoutError): + await engine._execute_with_agent_timeout(agent, slow_coro()) + + +# --------------------------------------------------------------------------- +# Integration tests: full workflow execution with timeout +# --------------------------------------------------------------------------- + + +class TestAgentTimeoutIntegration: + """End-to-end tests for agent timeout in workflow execution.""" + + @pytest.mark.asyncio + async def test_agent_timeout_in_main_loop(self) -> None: + """Agent with timeout_seconds that exceeds its limit raises AgentTimeoutError.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="timeout-workflow", + entry_point="slow_agent", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="slow_agent", + model="gpt-4", + prompt="Think deeply", + timeout_seconds=1.0, + output={"result": OutputField(type="string")}, + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ slow_agent.output.result }}"}, + ) + + # Make the provider's execute slow by patching it + provider = CopilotProvider(mock_handler=lambda a, p, c: {"result": "ok"}) + engine = WorkflowEngine(config, provider) + + # Patch the executor's execute to add async delay + original_execute = engine.executor.execute + + async def slow_execute(*args, **kwargs): + await asyncio.sleep(10) + return await original_execute(*args, **kwargs) + + with ( + patch.object(engine.executor, "execute", side_effect=slow_execute), + pytest.raises(AgentTimeoutError) as exc_info, + ): + await engine.run({"question": "test"}) + + assert exc_info.value.agent_name == "slow_agent" + + @pytest.mark.asyncio + async def test_agent_timeout_emits_event_in_workflow(self) -> None: + """agent_timeout event is emitted during workflow execution.""" + emitter, collector = _make_emitter_and_collector() + + config = WorkflowConfig( + workflow=WorkflowDef( + name="timeout-workflow", + entry_point="slow_agent", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="slow_agent", + model="gpt-4", + prompt="Think deeply", + timeout_seconds=1.0, + output={"result": OutputField(type="string")}, + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "{{ slow_agent.output.result }}"}, + ) + + provider = CopilotProvider(mock_handler=lambda a, p, c: {"result": "ok"}) + engine = WorkflowEngine(config, provider, event_emitter=emitter) + + original_execute = engine.executor.execute + + async def slow_execute(*args, **kwargs): + await asyncio.sleep(10) + return await original_execute(*args, **kwargs) + + with ( + patch.object(engine.executor, "execute", side_effect=slow_execute), + pytest.raises(AgentTimeoutError), + ): + await engine.run({"question": "test"}) + + assert "agent_timeout" in collector.types() + timeout_event = collector.of_type("agent_timeout")[0] + assert timeout_event.data["agent_name"] == "slow_agent" + assert timeout_event.data["timeout_seconds"] == 1.0 + + @pytest.mark.asyncio + async def test_agent_without_timeout_completes_normally(self) -> None: + """Agent without timeout_seconds executes without wrapping.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="normal-workflow", + entry_point="agent1", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="agent1", + model="gpt-4", + prompt="Answer: {{ workflow.input.question }}", + output={"answer": OutputField(type="string")}, + routes=[RouteDef(to="$end")], + ), + ], + output={"answer": "{{ agent1.output.answer }}"}, + ) + + provider = CopilotProvider(mock_handler=lambda a, p, c: {"answer": "42"}) + engine = WorkflowEngine(config, provider) + result = await engine.run({"question": "test"}) + assert result["answer"] == 42 or result["answer"] == "42" + + +# --------------------------------------------------------------------------- +# Parallel group timeout tests +# --------------------------------------------------------------------------- + + +class TestAgentTimeoutParallel: + """Agent timeout behavior in parallel groups.""" + + @pytest.mark.asyncio + async def test_timeout_in_parallel_group_fail_fast(self) -> None: + """Timed-out agent in fail_fast parallel group fails the group.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="parallel-timeout", + entry_point="group1", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="fast_agent", + model="gpt-4", + prompt="Quick task", + output={"result": OutputField(type="string")}, + ), + AgentDef( + name="slow_agent", + model="gpt-4", + prompt="Slow task", + timeout_seconds=1.0, + output={"result": OutputField(type="string")}, + ), + ], + parallel=[ + ParallelGroup( + name="group1", + agents=["fast_agent", "slow_agent"], + failure_mode="fail_fast", + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "done"}, + ) + + def mock_handler(agent, prompt, context): + return {"result": "ok"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider) + + # Patch only slow_agent's execution to be slow + original_get_executor = engine._get_executor_for_agent + + async def patched_get_executor(agent): + executor = await original_get_executor(agent) + if agent.name == "slow_agent": + original_exec = executor.execute + + async def slow_exec(*args, **kwargs): + await asyncio.sleep(10) + return await original_exec(*args, **kwargs) + + executor.execute = slow_exec + return executor + + with ( + patch.object(engine, "_get_executor_for_agent", side_effect=patched_get_executor), + pytest.raises(Exception) as exc_info, + ): + await engine.run({}) + + # The error should be related to the timeout (either AgentTimeoutError + # or wrapped in an ExecutionError by the parallel group handler) + error_str = str(exc_info.value) + assert "timeout" in error_str.lower() or "slow_agent" in error_str + + @pytest.mark.asyncio + async def test_timeout_in_parallel_group_continue_on_error(self) -> None: + """Timed-out agent in continue_on_error parallel group allows others to succeed.""" + emitter, collector = _make_emitter_and_collector() + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parallel-timeout-continue", + entry_point="group1", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="fast_agent", + model="gpt-4", + prompt="Quick task", + output={"result": OutputField(type="string")}, + ), + AgentDef( + name="slow_agent", + model="gpt-4", + prompt="Slow task", + timeout_seconds=1.0, + output={"result": OutputField(type="string")}, + ), + ], + parallel=[ + ParallelGroup( + name="group1", + agents=["fast_agent", "slow_agent"], + failure_mode="continue_on_error", + routes=[RouteDef(to="$end")], + ), + ], + output={"result": "done"}, + ) + + def mock_handler(agent, prompt, context): + return {"result": "ok"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, event_emitter=emitter) + + original_get_executor = engine._get_executor_for_agent + + async def patched_get_executor(agent): + executor = await original_get_executor(agent) + if agent.name == "slow_agent": + original_exec = executor.execute + + async def slow_exec(*args, **kwargs): + await asyncio.sleep(10) + return await original_exec(*args, **kwargs) + + executor.execute = slow_exec + return executor + + with patch.object(engine, "_get_executor_for_agent", side_effect=patched_get_executor): + result = await engine.run({}) + + # Workflow should complete (continue_on_error) + assert result is not None + # agent_timeout should have been emitted + assert "agent_timeout" in collector.types() + + +# --------------------------------------------------------------------------- +# Exception tests +# --------------------------------------------------------------------------- + + +class TestAgentTimeoutError: + """Tests for AgentTimeoutError exception.""" + + def test_agent_timeout_error_attributes(self) -> None: + """AgentTimeoutError has correct attributes.""" + err = AgentTimeoutError( + agent_name="slow_agent", + elapsed_seconds=15.3, + timeout_seconds=10.0, + ) + assert err.agent_name == "slow_agent" + assert err.elapsed_seconds == 15.3 + assert err.timeout_seconds == 10.0 + assert err.current_agent == "slow_agent" + assert "slow_agent" in str(err) + assert "10" in str(err) + + def test_agent_timeout_error_is_timeout_error(self) -> None: + """AgentTimeoutError inherits from conductor.exceptions.TimeoutError.""" + from conductor.exceptions import TimeoutError as ConductorTimeoutError + + err = AgentTimeoutError( + agent_name="test", + elapsed_seconds=5.0, + timeout_seconds=3.0, + ) + assert isinstance(err, ConductorTimeoutError) + + def test_agent_timeout_error_suggestion(self) -> None: + """AgentTimeoutError has a helpful suggestion.""" + err = AgentTimeoutError( + agent_name="researcher", + elapsed_seconds=120.5, + timeout_seconds=120.0, + ) + assert "timeout_seconds" in err.suggestion + assert "researcher" in err.suggestion From 57331791707dbe27d49e47b190f99f7758cfdbbb Mon Sep 17 00:00:00 2001 From: Brent Rusinow Date: Tue, 5 May 2026 12:47:52 -0700 Subject: [PATCH 2/3] test: add coverage for verbose_log_agent_timeout and ConsoleEventSubscriber Add tests for the agent_timeout console logging path to address CodeCov coverage gaps: - verbose_log_agent_timeout with verbose mode enabled - verbose_log_agent_timeout with verbose mode disabled (no-op path) - verbose_log_agent_timeout file logging dual-write - ConsoleEventSubscriber.on_event agent_timeout dispatch Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/test_cli/test_logging.py | 104 +++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/tests/test_cli/test_logging.py b/tests/test_cli/test_logging.py index 1ddcbda..deb689a 100644 --- a/tests/test_cli/test_logging.py +++ b/tests/test_cli/test_logging.py @@ -626,6 +626,50 @@ def test_verbose_log_parallel_agent_failed(self) -> None: finally: verbose_mode.reset(token) + def test_verbose_log_agent_timeout(self) -> None: + """Test verbose_log_agent_timeout function.""" + from io import StringIO + + from rich.console import Console + + from conductor.cli.run import verbose_log_agent_timeout + + output = StringIO() + token = verbose_mode.set(True) + try: + with patch( + "conductor.cli.run._verbose_console", + Console(file=output, force_terminal=True, no_color=True), + ): + verbose_log_agent_timeout("slow_agent", 15.3, 10.0) + output_text = output.getvalue() + assert "slow_agent" in output_text + assert "15.3" in output_text + assert "10" in output_text + assert "timed out" in output_text + finally: + verbose_mode.reset(token) + + def test_verbose_log_agent_timeout_not_verbose(self) -> None: + """Test verbose_log_agent_timeout is a no-op when not verbose.""" + from io import StringIO + + from rich.console import Console + + from conductor.cli.run import verbose_log_agent_timeout + + output = StringIO() + token = verbose_mode.set(False) + try: + with patch( + "conductor.cli.run._verbose_console", + Console(file=output, force_terminal=True, no_color=True), + ): + verbose_log_agent_timeout("slow_agent", 15.3, 10.0) + assert output.getvalue() == "" + finally: + verbose_mode.reset(token) + def test_verbose_log_parallel_summary_success(self) -> None: """Test verbose_log_parallel_summary for successful execution.""" from io import StringIO @@ -1224,6 +1268,27 @@ def test_parallel_agent_failed_writes_to_file(self, tmp_path: Path) -> None: finally: verbose_mode.reset(token) + def test_agent_timeout_writes_to_file(self, tmp_path: Path) -> None: + """Test that verbose_log_agent_timeout writes to file.""" + from conductor.cli.run import ( + close_file_logging, + init_file_logging, + verbose_log_agent_timeout, + ) + + log_path = tmp_path / "test.log" + token = verbose_mode.set(False) + try: + init_file_logging(log_path) + verbose_log_agent_timeout("slow_agent", 15.3, 10.0) + close_file_logging() + + content = log_path.read_text(encoding="utf-8") + assert "slow_agent" in content + assert "timed out" in content + finally: + verbose_mode.reset(token) + def test_parallel_summary_writes_to_file(self, tmp_path: Path) -> None: """Test that verbose_log_parallel_summary writes to file.""" from conductor.cli.run import ( @@ -1645,3 +1710,42 @@ def test_file_output_no_ansi_for_all_styles(self, tmp_path: Path) -> None: finally: full_mode.reset(token_full) verbose_mode.reset(token_verbose) + + +class TestConsoleEventSubscriberAgentTimeout: + """Test ConsoleEventSubscriber handling of agent_timeout event.""" + + def test_agent_timeout_event_triggers_log(self) -> None: + """ConsoleEventSubscriber dispatches agent_timeout to verbose_log_agent_timeout.""" + import time + from io import StringIO + + from rich.console import Console + + from conductor.cli.run import ConsoleEventSubscriber + from conductor.events import WorkflowEvent + + subscriber = ConsoleEventSubscriber() + + output = StringIO() + token = verbose_mode.set(True) + try: + with patch( + "conductor.cli.run._verbose_console", + Console(file=output, force_terminal=True, no_color=True), + ): + event = WorkflowEvent( + type="agent_timeout", + timestamp=time.time(), + data={ + "agent_name": "slow_researcher", + "elapsed": 120.5, + "timeout_seconds": 120.0, + }, + ) + subscriber.on_event(event) + output_text = output.getvalue() + assert "slow_researcher" in output_text + assert "timed out" in output_text + finally: + verbose_mode.reset(token) From 30058b96378f97039ba1ee138c7aa891fbf8b38c Mon Sep 17 00:00:00 2001 From: Brent Rusinow Date: Wed, 6 May 2026 10:10:25 -0700 Subject: [PATCH 3/3] fix: address PR review feedback - Use 'from e' instead of 'from None' to preserve asyncio.TimeoutError chain for debugging (workflow.py) - Document hard cancellation implications in timeout_seconds docstring: in-flight sessions, MCP tools, HTTP connections may be left inconsistent (schema.py) - Document why both agent_name and current_agent exist on AgentTimeoutError for downstream consumer clarity (exceptions.py) - Pin brittle 'or' assertion to deterministic value (test) - Add TestAgentTimeoutForEach class with fail_fast and continue_on_error integration tests (test) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/conductor/config/schema.py | 6 + src/conductor/engine/workflow.py | 4 +- src/conductor/exceptions.py | 10 +- tests/test_engine/test_agent_timeout.py | 168 +++++++++++++++++++++++- 4 files changed, 181 insertions(+), 7 deletions(-) diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 42b6981..200da59 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -582,6 +582,12 @@ class AgentDef(BaseModel): or workflow types). This is a hard cancellation — unlike ``max_session_seconds`` which checks between provider iterations. + Because this is a hard cancellation, in-flight provider sessions, + MCP tool calls, and HTTP connections receive ``CancelledError`` + mid-flight and may not get a clean shutdown. External state (e.g., + partially-written files, open MCP tool handles) may be left + inconsistent. + Note: Agent-level timeouts are non-retryable. The retry policy operates inside the provider and is cancelled along with the agent. diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index bff20f5..5c965cc 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -641,7 +641,7 @@ async def _execute_with_agent_timeout( start = _time.monotonic() try: return await asyncio.wait_for(coro, timeout=agent.timeout_seconds) - except TimeoutError: + except TimeoutError as e: elapsed = _time.monotonic() - start self._emit( "agent_timeout", @@ -655,7 +655,7 @@ async def _execute_with_agent_timeout( agent_name=agent.name, elapsed_seconds=elapsed, timeout_seconds=agent.timeout_seconds, - ) from None + ) from e def _build_subworkflow_inputs( self, diff --git a/src/conductor/exceptions.py b/src/conductor/exceptions.py index 04fa793..d3bac88 100644 --- a/src/conductor/exceptions.py +++ b/src/conductor/exceptions.py @@ -450,8 +450,10 @@ class AgentTimeoutError(TimeoutError): is raised when an agent's ``timeout_seconds`` configuration causes a hard cancellation via ``asyncio.wait_for()``. - Attributes: - agent_name: Name of the agent that timed out. + The timed-out agent name is available via ``current_agent`` (inherited + from ``TimeoutError``) and ``agent_name`` (inherited from + ``ExecutionError``). Both are set to the same value for consistency + with consumers that check either attribute. """ def __init__( @@ -477,8 +479,8 @@ def __init__( current_agent=agent_name, suggestion=suggestion, ) - # Set after super().__init__() because ExecutionError.__init__ - # defaults agent_name to None and would overwrite an earlier assignment. + # ExecutionError.__init__ defaults agent_name to None — set it here + # so both .agent_name and .current_agent resolve to the same value. self.agent_name = agent_name diff --git a/tests/test_engine/test_agent_timeout.py b/tests/test_engine/test_agent_timeout.py index 63c1eef..e9d86cc 100644 --- a/tests/test_engine/test_agent_timeout.py +++ b/tests/test_engine/test_agent_timeout.py @@ -18,6 +18,7 @@ from conductor.config.schema import ( AgentDef, ContextConfig, + ForEachDef, GateOption, LimitsConfig, OutputField, @@ -383,7 +384,7 @@ async def test_agent_without_timeout_completes_normally(self) -> None: provider = CopilotProvider(mock_handler=lambda a, p, c: {"answer": "42"}) engine = WorkflowEngine(config, provider) result = await engine.run({"question": "test"}) - assert result["answer"] == 42 or result["answer"] == "42" + assert result["answer"] == 42 # --------------------------------------------------------------------------- @@ -573,3 +574,168 @@ def test_agent_timeout_error_suggestion(self) -> None: ) assert "timeout_seconds" in err.suggestion assert "researcher" in err.suggestion + + +# --------------------------------------------------------------------------- +# For-each group timeout tests +# --------------------------------------------------------------------------- + + +class TestAgentTimeoutForEach: + """Agent timeout behavior in for-each groups.""" + + @pytest.mark.asyncio + async def test_timeout_in_for_each_continue_on_error(self) -> None: + """Timed-out agent in continue_on_error for-each group allows others to succeed.""" + emitter, collector = _make_emitter_and_collector() + + config = WorkflowConfig( + workflow=WorkflowDef( + name="for-each-timeout", + entry_point="finder", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=50), + ), + agents=[ + AgentDef( + name="finder", + model="gpt-4", + prompt="Find items", + output={"items": OutputField(type="array")}, + routes=[RouteDef(to="processors")], + ), + ], + for_each=[ + ForEachDef.model_validate( + { + "name": "processors", + "type": "for_each", + "source": "finder.output.items", + "as": "item", + "max_concurrent": 2, + "failure_mode": "continue_on_error", + "agent": { + "name": "processor", + "model": "gpt-4", + "prompt": "Process: {{ item }}", + "timeout_seconds": 1.0, + "output": {"result": {"type": "string"}}, + }, + "routes": [{"to": "$end"}], + } + ), + ], + output={"result": "done"}, + ) + + call_count = 0 + + def mock_handler(agent, prompt, context): + nonlocal call_count + if agent.name == "finder": + return {"items": ["item1", "item2", "item3"]} + return {"result": "processed"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, event_emitter=emitter) + + original_get_executor = engine._get_executor_for_agent + + async def patched_get_executor(agent): + executor = await original_get_executor(agent) + if agent.name == "processor": + original_exec = executor.execute + + async def slow_exec(*args, **kwargs): + nonlocal call_count + call_count += 1 + # Make every other item slow + if call_count % 2 == 0: + await asyncio.sleep(10) + return await original_exec(*args, **kwargs) + + executor.execute = slow_exec + return executor + + with patch.object(engine, "_get_executor_for_agent", side_effect=patched_get_executor): + result = await engine.run({}) + + # Workflow should complete (continue_on_error) + assert result is not None + # At least one agent_timeout should have been emitted + assert "agent_timeout" in collector.types() + + @pytest.mark.asyncio + async def test_timeout_in_for_each_fail_fast(self) -> None: + """Timed-out agent in fail_fast for-each group fails the group.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="for-each-timeout-fail", + entry_point="finder", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=50), + ), + agents=[ + AgentDef( + name="finder", + model="gpt-4", + prompt="Find items", + output={"items": OutputField(type="array")}, + routes=[RouteDef(to="processors")], + ), + ], + for_each=[ + ForEachDef.model_validate( + { + "name": "processors", + "type": "for_each", + "source": "finder.output.items", + "as": "item", + "max_concurrent": 1, + "failure_mode": "fail_fast", + "agent": { + "name": "processor", + "model": "gpt-4", + "prompt": "Process: {{ item }}", + "timeout_seconds": 1.0, + "output": {"result": {"type": "string"}}, + }, + "routes": [{"to": "$end"}], + } + ), + ], + output={"result": "done"}, + ) + + def mock_handler(agent, prompt, context): + if agent.name == "finder": + return {"items": ["item1"]} + return {"result": "processed"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider) + + original_get_executor = engine._get_executor_for_agent + + async def patched_get_executor(agent): + executor = await original_get_executor(agent) + if agent.name == "processor": + original_exec = executor.execute + + async def slow_exec(*args, **kwargs): + await asyncio.sleep(10) + return await original_exec(*args, **kwargs) + + executor.execute = slow_exec + return executor + + with ( + patch.object(engine, "_get_executor_for_agent", side_effect=patched_get_executor), + pytest.raises(Exception) as exc_info, + ): + await engine.run({}) + + error_str = str(exc_info.value) + assert "timeout" in error_str.lower() or "processor" in error_str