Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/conductor/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,38 @@ def verbose_log_parallel_agent_failed(
_file_console.print(error_msg)


def verbose_log_agent_timeout(
    agent_name: str,
    elapsed: float,
    timeout_seconds: float,
) -> None:
    """Report that an agent hit its timeout on the verbose and/or file console.

    Nothing is printed unless verbose mode is on or a file console is set.

    Args:
        agent_name: Name of the agent that timed out.
        elapsed: Seconds that elapsed before the timeout fired.
        timeout_seconds: The configured timeout limit, in seconds.
    """
    from rich.text import Text

    from conductor.cli.app import is_verbose

    console_enabled = is_verbose()
    if not console_enabled and _file_console is None:
        # No sink is active, so skip building the message entirely.
        return

    # Text.append returns the Text instance, so the segments can be chained.
    line = (
        Text()
        .append(" ⏱ ", style="yellow")
        .append(agent_name, style="yellow bold")
        .append(f" timed out after {elapsed:.1f}s (limit: {timeout_seconds:.0f}s)", style="dim")
    )

    if console_enabled:
        _verbose_console.print(line)
    if _file_console is not None:
        _file_console.print(line)


def verbose_log_parallel_summary(
group_name: str,
success_count: int,
Expand Down Expand Up @@ -665,6 +697,13 @@ def on_event(self, event: WorkflowEvent) -> None:
output_tokens=d.get("output_tokens"),
)

elif t == "agent_timeout":
verbose_log_agent_timeout(
d.get("agent_name", "?"),
d.get("elapsed", 0.0),
d.get("timeout_seconds", 0.0),
)

elif t == "route_taken":
verbose_log_route(d.get("to_agent", "?"))

Expand Down
32 changes: 32 additions & 0 deletions src/conductor/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,29 @@ class AgentDef(BaseModel):
max_depth: 3 # Allow at most 3 levels of recursion
"""

timeout_seconds: float | None = Field(None, ge=1.0)
"""Hard wall-clock timeout for this agent's execution in seconds.

When set, the engine wraps the entire agent execution in
``asyncio.wait_for()``. If exceeded, raises ``AgentTimeoutError``
which is handled by existing error semantics (``fail_fast``,
``continue_on_error``).

The effective timeout is ``min(timeout_seconds, remaining_workflow_timeout)``
so agent timeouts never exceed the workflow-level limit.

Only applies to provider-backed agents (not script, human_gate,
or workflow types). This is a hard cancellation — unlike
``max_session_seconds`` which checks between provider iterations.

Note: Agent-level timeouts are non-retryable. The retry policy
operates inside the provider and is cancelled along with the agent.

Example::

timeout_seconds: 120 # Cancel agent after 2 minutes
"""

max_session_seconds: float | None = Field(None, ge=1.0)
"""Maximum wall-clock duration for this agent's session in seconds.

Expand Down Expand Up @@ -610,6 +633,8 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("human_gate agents cannot have 'dialog'")
if self.max_depth is not None:
raise ValueError("human_gate agents cannot have 'max_depth'")
if self.timeout_seconds is not None:
raise ValueError("human_gate agents cannot have 'timeout_seconds'")
elif self.type == "script":
if not self.command:
raise ValueError("script agents require 'command'")
Expand Down Expand Up @@ -642,6 +667,11 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("script agents cannot have 'dialog'")
if self.max_depth is not None:
raise ValueError("script agents cannot have 'max_depth'")
if self.timeout_seconds is not None:
raise ValueError(
"script agents cannot have 'timeout_seconds' "
"(use 'timeout' for script-specific timeouts)"
)
elif self.type == "workflow":
if not self.workflow:
raise ValueError("workflow agents require 'workflow' path")
Expand All @@ -667,6 +697,8 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("workflow agents cannot have 'retry'")
if self.dialog is not None:
raise ValueError("workflow agents cannot have 'dialog'")
if self.timeout_seconds is not None:
raise ValueError("workflow agents cannot have 'timeout_seconds'")
else:
# Regular agent or human_gate — input_mapping is not valid
if self.input_mapping is not None:
Expand Down
86 changes: 75 additions & 11 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from conductor.engine.usage import UsageTracker, WorkflowUsage
from conductor.events import WorkflowEvent, WorkflowEventEmitter
from conductor.exceptions import (
AgentTimeoutError,
ConductorError,
ExecutionError,
InterruptError,
Expand Down Expand Up @@ -601,6 +602,60 @@ async def _execute_script(self, agent: AgentDef, context: dict[str, Any]) -> Scr
operation_name=f"script '{agent.name}'",
)

async def _execute_with_agent_timeout(
    self,
    agent: AgentDef,
    coro: Any,
) -> Any:
    """Await an agent execution coroutine under the agent's ``timeout_seconds``.

    Behaviour by case:

    * ``agent.timeout_seconds`` is ``None``: the coroutine is awaited as-is.
    * The remaining workflow timeout (``self.limits.get_remaining_timeout()``)
      is less than or equal to the agent timeout: the coroutine is awaited
      without the agent wrapper, so this method never reports an agent
      timeout in that case.
    * Otherwise the coroutine is wrapped in ``asyncio.wait_for()`` with the
      agent timeout. On expiry an ``agent_timeout`` event is emitted via
      ``self._emit`` and ``AgentTimeoutError`` is raised.

    Args:
        agent: Agent definition with optional ``timeout_seconds``.
        coro: The awaitable to execute (typically ``executor.execute(...)``).

    Returns:
        Whatever the awaited coroutine returns.

    Raises:
        AgentTimeoutError: If the agent exceeds its ``timeout_seconds``.
    """
    limit = agent.timeout_seconds
    if limit is None:
        return await coro

    # If the workflow's remaining timeout is stricter, skip the agent wrapper.
    remaining = self.limits.get_remaining_timeout()
    if remaining is not None and remaining <= limit:
        return await coro

    start = _time.monotonic()
    try:
        return await asyncio.wait_for(coro, timeout=limit)
    except asyncio.TimeoutError:
        # Catch asyncio's own exception class explicitly. The bare name
        # ``TimeoutError`` is ambiguous here: conductor.exceptions defines a
        # project ``TimeoutError`` that may shadow the builtin in this module,
        # and before Python 3.11 ``asyncio.TimeoutError`` is not the builtin.
        # NOTE(review): on Python 3.11+ a builtin TimeoutError raised *inside*
        # ``coro`` is the same class and will also land here — confirm that
        # providers do not leak such errors.
        elapsed = _time.monotonic() - start
        self._emit(
            "agent_timeout",
            {
                "agent_name": agent.name,
                "elapsed": elapsed,
                "timeout_seconds": limit,
            },
        )
        # ``from None`` hides the low-level asyncio traceback from users.
        raise AgentTimeoutError(
            agent_name=agent.name,
            elapsed_seconds=elapsed,
            timeout_seconds=limit,
        ) from None

def _build_subworkflow_inputs(
self,
agent: AgentDef,
Expand Down Expand Up @@ -2121,12 +2176,15 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
executor = await self._get_executor_for_agent(agent)
guidance_section = self.context.get_guidance_prompt_section()
event_callback = self._make_event_callback(agent.name)
output = await executor.execute(
output = await self._execute_with_agent_timeout(
agent,
agent_context,
guidance_section=guidance_section,
interrupt_signal=self._interrupt_event,
event_callback=event_callback,
executor.execute(
agent,
agent_context,
guidance_section=guidance_section,
interrupt_signal=self._interrupt_event,
event_callback=event_callback,
),
)
_agent_elapsed = _time.time() - _agent_start

Expand Down Expand Up @@ -2823,10 +2881,13 @@ async def execute_single_agent(agent: AgentDef) -> tuple[str, Any]:
# Execute agent (get executor for multi-provider support)
executor = await self._get_executor_for_agent(agent)
event_callback = self._make_event_callback(agent.name)
output = await executor.execute(
output = await self._execute_with_agent_timeout(
agent,
agent_context,
event_callback=event_callback,
executor.execute(
agent,
agent_context,
event_callback=event_callback,
),
)
_agent_elapsed = _time.time() - _agent_start

Expand Down Expand Up @@ -3246,10 +3307,13 @@ def _item_callback(event_type: str, data: dict[str, Any]) -> None:
self._emit(event_type, data_with_agent)

event_callback = _item_callback if self._event_emitter else None
output = await executor.execute(
output = await self._execute_with_agent_timeout(
for_each_group.agent,
agent_context,
event_callback=event_callback,
executor.execute(
for_each_group.agent,
agent_context,
event_callback=event_callback,
),
)
_item_elapsed = _time.time() - _item_start

Expand Down
39 changes: 39 additions & 0 deletions src/conductor/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,45 @@ def __init__(
super().__init__(message, suggestion, file_path, line_number)


class AgentTimeoutError(TimeoutError):
    """Error for a single agent that ran past its own ``timeout_seconds``.

    This is the per-agent counterpart of the workflow-level TimeoutError it
    derives from: it signals that an agent's ``timeout_seconds`` setting
    triggered a hard cancellation through ``asyncio.wait_for()``.

    Attributes:
        agent_name: Name of the agent that timed out.
    """

    def __init__(
        self,
        *,
        agent_name: str,
        elapsed_seconds: float,
        timeout_seconds: float,
    ) -> None:
        """Build the error message and suggestion for a timed-out agent.

        Args:
            agent_name: Name of the agent that timed out.
            elapsed_seconds: Time that had elapsed when the timeout fired.
            timeout_seconds: The per-agent timeout limit that was configured.
        """
        super().__init__(
            f"Agent '{agent_name}' exceeded its timeout ({timeout_seconds}s)",
            elapsed_seconds=elapsed_seconds,
            timeout_seconds=timeout_seconds,
            current_agent=agent_name,
            suggestion=(
                f"Increase timeout_seconds for agent '{agent_name}' or optimize its execution"
            ),
        )
        # Assigned only after the base initializer has run: ExecutionError.__init__
        # defaults agent_name to None, so an earlier assignment would be overwritten.
        self.agent_name = agent_name


class HumanGateError(ExecutionError):
"""Raised when a human gate encounters an error.

Expand Down
Loading
Loading