Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 102 additions & 16 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
GateResult,
HumanGateHandler,
MaxIterationsHandler,
MaxIterationsPromptResult,
)
from conductor.gates.interrupt import InterruptAction, InterruptHandler, InterruptResult
from conductor.providers.base import AgentOutput
Expand Down Expand Up @@ -2442,20 +2443,63 @@ async def _check_iteration_with_prompt(self, agent_name: str) -> None:

Raises:
MaxIterationsError: If limit exceeded and user chooses not to continue.

Emits:
``iteration_limit_reached`` before the gate, and
``iteration_limit_resolved`` after — even when the prompt raises an
unexpected exception (with ``aborted=True``). See issue #134.
"""
try:
self.limits.check_iteration(agent_name)
except MaxIterationsError:
# Prompt user for more iterations
await self._suspend_listener()
# Surface the gate to subscribers (web dashboard, JSONL log) before
# blocking on the console prompt — otherwise the workflow appears
# silently stalled when monitored via --web. See issue #134.
recent_history = self.limits.execution_history[-5:]
self._emit(
"iteration_limit_reached",
{
"agent_name": agent_name,
"current_iteration": self.limits.current_iteration,
"max_iterations": self.limits.max_iterations,
"agent_history": recent_history,
"possible_loop": len(set(recent_history[-3:])) <= 1
and len(recent_history) >= 3,
"skip_gates": self.max_iterations_handler.skip_gates,
},
)

# Wrap resolved emission in an outer finally so the dashboard gate
# always closes — even if the prompt itself raises (EOFError on
# non-TTY, KeyboardInterrupt race, asyncio.CancelledError, etc.).
# Without this guarantee, the original #134 symptom recurs on the
# error path.
result: MaxIterationsPromptResult | None = None
try:
result = await self.max_iterations_handler.handle_limit_reached(
current_iteration=self.limits.current_iteration,
max_iterations=self.limits.max_iterations,
agent_history=self.limits.execution_history,
)
await self._suspend_listener()
try:
result = await self.max_iterations_handler.handle_limit_reached(
current_iteration=self.limits.current_iteration,
max_iterations=self.limits.max_iterations,
agent_history=self.limits.execution_history,
)
finally:
await self._resume_listener()
finally:
await self._resume_listener()
self._emit(
"iteration_limit_resolved",
{
"agent_name": agent_name,
"continue_execution": (
result.continue_execution if result is not None else False
),
"additional_iterations": (
result.additional_iterations if result is not None else 0
),
"aborted": result is None,
},
)

if result.continue_execution:
self.limits.increase_limit(result.additional_iterations)
# Re-check should now pass
Expand All @@ -2477,20 +2521,62 @@ async def _check_parallel_group_iteration_with_prompt(

Raises:
MaxIterationsError: If limit exceeded and user chooses not to continue.

Emits:
``iteration_limit_reached`` before the gate, and
``iteration_limit_resolved`` after — even when the prompt raises an
unexpected exception (with ``aborted=True``). See issue #134.
"""
try:
self.limits.check_parallel_group_iteration(group_name, agent_count)
except MaxIterationsError:
# Prompt user for more iterations
await self._suspend_listener()
# See _check_iteration_with_prompt — same dashboard-visibility fix
# for parallel groups (issue #134). Note: agent_history here is
# the cross-group execution list, so the possible_loop heuristic
# may surface false positives for true parallel patterns.
recent_history = self.limits.execution_history[-5:]
self._emit(
"iteration_limit_reached",
{
"group_name": group_name,
"agent_count": agent_count,
"current_iteration": self.limits.current_iteration,
"max_iterations": self.limits.max_iterations,
"agent_history": recent_history,
"possible_loop": len(set(recent_history[-3:])) <= 1
and len(recent_history) >= 3,
"skip_gates": self.max_iterations_handler.skip_gates,
},
)

# Wrap resolved emission in an outer finally so the dashboard gate
# always closes — see _check_iteration_with_prompt for the rationale.
result: MaxIterationsPromptResult | None = None
try:
result = await self.max_iterations_handler.handle_limit_reached(
current_iteration=self.limits.current_iteration,
max_iterations=self.limits.max_iterations,
agent_history=self.limits.execution_history,
)
await self._suspend_listener()
try:
result = await self.max_iterations_handler.handle_limit_reached(
current_iteration=self.limits.current_iteration,
max_iterations=self.limits.max_iterations,
agent_history=self.limits.execution_history,
)
finally:
await self._resume_listener()
finally:
await self._resume_listener()
self._emit(
"iteration_limit_resolved",
{
"group_name": group_name,
"continue_execution": (
result.continue_execution if result is not None else False
),
"additional_iterations": (
result.additional_iterations if result is not None else 0
),
"aborted": result is None,
},
)

if result.continue_execution:
self.limits.increase_limit(result.additional_iterations)
# Re-check should now pass
Expand Down
6 changes: 5 additions & 1 deletion src/conductor/gates/human.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,5 +377,9 @@ def _ask_int() -> int:

value = await asyncio.to_thread(_ask_int)
return max(0, value) # Ensure non-negative
except (ValueError, KeyboardInterrupt):
except (ValueError, KeyboardInterrupt, EOFError):
# EOFError fires when stdin is not a TTY (CI, ``< /dev/null``,
# containers without an attached terminal). Treat it as
# "stop" — same as the user typing 0 — so the dashboard's
# iteration_limit_resolved event still fires (issue #134).
return 0
35 changes: 27 additions & 8 deletions src/conductor/web/frontend/src/components/layout/StatusBar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export function StatusBar() {
const wsStatus = useWorkflowStore((s) => s.wsStatus);
const workflowFailure = useWorkflowStore((s) => s.workflowFailure);
const lastEventTime = useWorkflowStore((s) => s.lastEventTime);
const iterationLimitGate = useWorkflowStore((s) => s.iterationLimitGate);
const elapsed = useElapsedTimer();

// "Last activity X ago" — ticks every second while running
Expand All @@ -33,6 +34,11 @@ export function StatusBar() {
const isFailed = workflowStatus === 'failed';

const statusText = (() => {
if (iterationLimitGate && workflowStatus === 'running') {
const target = iterationLimitGate.agent_name ?? iterationLimitGate.group_name ?? 'workflow';
const auto = iterationLimitGate.skip_gates ? ' — auto-stopping' : ' — awaiting console input';
return `Iteration limit reached: ${target} ${iterationLimitGate.current_iteration}/${iterationLimitGate.max_iterations}${auto}`;
}
switch (workflowStatus) {
case 'pending':
return 'Waiting for workflow\u2026';
Expand All @@ -56,12 +62,15 @@ export function StatusBar() {
}
})();

const statusDotColor = {
pending: 'bg-[var(--pending)]',
running: 'bg-[var(--running)] animate-pulse',
completed: 'bg-[var(--completed)]',
failed: 'bg-[var(--failed)]',
}[workflowStatus];
const isAwaitingIterationGate = iterationLimitGate != null && workflowStatus === 'running';
const statusDotColor = isAwaitingIterationGate
? 'bg-[var(--waiting)] animate-pulse'
: ({
pending: 'bg-[var(--pending)]',
running: 'bg-[var(--running)] animate-pulse',
completed: 'bg-[var(--completed)]',
failed: 'bg-[var(--failed)]',
}[workflowStatus]);

const wsIndicator = (() => {
switch (wsStatus) {
Expand Down Expand Up @@ -102,11 +111,21 @@ export function StatusBar() {
'flex items-center gap-4 px-4 py-1.5 border-t text-xs flex-shrink-0 transition-colors duration-300',
isFailed
? 'bg-red-950/50 border-red-500/30'
: 'bg-[var(--surface)] border-[var(--border)]',
: isAwaitingIterationGate
? 'bg-amber-950/30 border-amber-500/30'
: 'bg-[var(--surface)] border-[var(--border)]',
)}
>
<span className={cn('w-2 h-2 rounded-full flex-shrink-0', statusDotColor)} />
<span className={cn(isFailed ? 'text-red-300' : 'text-[var(--text)]')}>
<span
className={cn(
isFailed
? 'text-red-300'
: isAwaitingIterationGate
? 'text-amber-200'
: 'text-[var(--text)]',
)}
>
{statusText}
</span>
{agentsTotal > 0 && (
Expand Down
87 changes: 87 additions & 0 deletions src/conductor/web/frontend/src/stores/workflow-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import type {
SubworkflowStartedData,
SubworkflowCompletedData,
SubworkflowFailedData,
IterationLimitReachedData,
IterationLimitResolvedData,
} from '@/types/events';

export interface ActivityEntry {
Expand Down Expand Up @@ -268,6 +270,8 @@ interface WorkflowState {
workflowOutput: unknown | null;
lastEventTime: number | null;
isPaused: boolean;
/** Set when the engine is blocked on a max-iterations gate (issue #134). */
iterationLimitGate: IterationLimitReachedData | null;

// --- Subworkflow depth tracking ---
/** Current nesting depth: 0 = root workflow events are active */
Expand Down Expand Up @@ -499,6 +503,7 @@ export const useWorkflowStore = create<WorkflowState>((set, get) => ({
workflowOutput: null,
lastEventTime: null,
isPaused: false,
iterationLimitGate: null,
wfDepth: 0,
subworkflowContexts: [],
activeContextPath: [],
Expand Down Expand Up @@ -690,6 +695,7 @@ export const useWorkflowStore = create<WorkflowState>((set, get) => ({
parallelGroups: [],
forEachGroups: [],
isPaused: false,
iterationLimitGate: null,
lastEventTime: null,
activeDialog: null,
dialogEngaged: false,
Expand Down Expand Up @@ -1343,6 +1349,9 @@ const eventHandlers: Record<string, (state: MutableState, data: Record<string, u
const data = _data as { output?: unknown };
state.workflowStatus = 'completed';
state.isPaused = false;
// Clear any iteration-limit gate that wasn't paired with a resolved
// event (defense-in-depth — see issue #134).
state.iterationLimitGate = null;
state.workflowOutput = data.output ?? null;
if (state.nodes['$end']) {
state.nodes['$end']!.status = 'completed';
Expand Down Expand Up @@ -1381,6 +1390,9 @@ const eventHandlers: Record<string, (state: MutableState, data: Record<string, u
// Root workflow failed
state.workflowStatus = 'failed';
state.isPaused = false;
// Clear any lingering iteration-limit gate so the StatusBar doesn't
// keep an orphan banner around (defense-in-depth — see issue #134).
state.iterationLimitGate = null;
state.workflowFailedAgent = data.agent_name || null;
if (data.agent_name && state.nodes[data.agent_name]) {
state.nodes[data.agent_name]!.status = 'failed';
Expand Down Expand Up @@ -1601,6 +1613,56 @@ const eventHandlers: Record<string, (state: MutableState, data: Record<string, u
state.isPaused = false;
},

iteration_limit_reached: (state, _data) => {
const data = _data as unknown as IterationLimitReachedData;
// Reuse the event payload directly so the slice always tracks the
// canonical shape (no inline type drift — see types/events.ts).
state.iterationLimitGate = data;
const target = data.agent_name ?? data.group_name;
if (target) {
const nd = ensureNode(state.nodes, target);
nd.activity.push({
type: 'iteration_limit_reached',
icon: '⚠',
label: 'Iteration limit',
text: `Reached ${data.current_iteration}/${data.max_iterations} iterations — ${
data.skip_gates ? 'auto-stopping (--skip-gates)' : 'awaiting console input'
}`,
});
replaceNode(state.nodes, target);
} else if (typeof console !== 'undefined') {
console.warn(
'[workflow-store] iteration_limit_reached event missing both agent_name and group_name',
data,
);
}
},

iteration_limit_resolved: (state, _data) => {
const data = _data as unknown as IterationLimitResolvedData;
state.iterationLimitGate = null;
const target = data.agent_name ?? data.group_name;
if (target) {
const nd = ensureNode(state.nodes, target);
nd.activity.push({
type: 'iteration_limit_resolved',
icon: data.continue_execution ? '▶' : '■',
label: 'Iteration limit',
text: data.aborted
? 'Gate aborted unexpectedly — stopping workflow'
: data.continue_execution
? `Continuing with ${data.additional_iterations} more iteration(s)`
: 'Stopping workflow',
});
replaceNode(state.nodes, target);
} else if (typeof console !== 'undefined') {
console.warn(
'[workflow-store] iteration_limit_resolved event missing both agent_name and group_name',
data,
);
}
},

dialog_started: (state, _data) => {
const data = _data as unknown as DialogStartedData;
const nd = ensureNode(state.nodes, data.agent_name);
Expand Down Expand Up @@ -1717,6 +1779,31 @@ function buildLogEntry(event: WorkflowEvent): LogEntry | null {
case 'agent_resumed':
return { timestamp: ts, level: 'info', source: String(d.agent_name), message: 'Agent resumed — re-executing' };

case 'iteration_limit_reached': {
const target = (d.agent_name ?? d.group_name ?? 'workflow') as string;
const auto = d.skip_gates ? ' — auto-stopping (--skip-gates)' : ' — awaiting console input';
return {
timestamp: ts,
level: 'warning',
source: String(target),
message: `Iteration limit reached (${d.current_iteration}/${d.max_iterations})${auto}`,
};
}

case 'iteration_limit_resolved': {
const target = (d.agent_name ?? d.group_name ?? 'workflow') as string;
const continued = Boolean(d.continue_execution);
const additional = (d.additional_iterations as number) ?? 0;
return {
timestamp: ts,
level: continued ? 'info' : 'warning',
source: String(target),
message: continued
? `Iteration limit resolved — continuing with ${additional} more`
: 'Iteration limit resolved — stopping workflow',
};
}

case 'dialog_started':
return { timestamp: ts, level: 'warning', source: String(d.agent_name), message: 'Dialog started — waiting for user…' };

Expand Down
Loading
Loading