-
Notifications
You must be signed in to change notification settings - Fork 8
[P1 #261] runtime utils: withTimeout + classifyRuntimeResult (covers codex/grok/telegram in one pass) #272
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,9 +44,13 @@ import { CurrentAliasResolver } from "./runtime/current-alias"; | |
| import { delegationTargetExists } from "./runtime/delegation-precheck"; | ||
| import { | ||
| isRateLimitOrQuotaError, | ||
| isEmptyResultSoftFailure, | ||
| quotaRemediationHint, | ||
| } from "./runtime/claude-error-classify"; | ||
| import { | ||
| classifyRuntimeResult, | ||
| formatClassificationError, | ||
| } from "./runtime/classify-result"; | ||
| import { withTimeout, TimeoutError, resolveTimeoutMs } from "./util/timeout"; | ||
|
|
||
| const home = homedir(); | ||
|
|
||
|
|
@@ -314,6 +318,45 @@ const CLAUDE_MAX_RETRIES = parseInt( | |
| opts["claude-max-retries"] || process.env.CLAUDE_MAX_RETRIES | ||
| || fileConfig.flags?.claudeMaxRetries || fileConfig.claudeMaxRetries || "2" | ||
| ); | ||
| // #261 P1 redirect (2026-06-28) — codex previously had zero wall-clock | ||
| // guard, so a wedged turn could hang forever (no abort path, no retry). | ||
| // Mirror CLAUDE_TIMEOUT_MS shape but default 300s, settable via env / | ||
| // flag for the parity flags listed in docs/runbooks/. resolveTimeoutMs | ||
| // honours `0` as "disabled" so power users can opt out. | ||
| const CODEX_TIMEOUT_MS = resolveTimeoutMs({ | ||
| envValue: opts["codex-timeout-ms"] || process.env.CODEX_TIMEOUT_MS, | ||
| flagValue: typeof fileConfig.flags?.codexTimeoutMs === "number" ? fileConfig.flags.codexTimeoutMs : undefined, | ||
| defaultMs: 300_000, | ||
| }).valueMs; | ||
| // Grok handshake (initialize + authenticate + session/new) is decoupled | ||
| // from the prompt timeout. Pre-redirect both shared the same 300s knob, | ||
| // so a stuck handshake hid behind the prompt deadline. | ||
| // | ||
| // Back-compat: when no explicit env / flag is set, leave this `undefined` | ||
| // and let `runGrokAcpTurn` apply its own default `min(45s, timeoutMs)`. | ||
| // Forcing a fixed 45s here would silently widen the handshake deadline | ||
| // for users who already set a tight `GROK_ACP_TIMEOUT_MS` (e.g. 10s) — | ||
| // they expect handshake ≤ timeoutMs, not handshake = 45s. | ||
| const GROK_HANDSHAKE_TIMEOUT_MS: number | undefined = (() => { | ||
| const envSet = process.env.GROK_HANDSHAKE_TIMEOUT_MS !== undefined | ||
| && process.env.GROK_HANDSHAKE_TIMEOUT_MS !== ""; | ||
| const flagSet = typeof fileConfig.flags?.grokHandshakeTimeoutMs === "number"; | ||
| if (!envSet && !flagSet) return undefined; | ||
| return resolveTimeoutMs({ | ||
| envValue: process.env.GROK_HANDSHAKE_TIMEOUT_MS, | ||
| flagValue: typeof fileConfig.flags?.grokHandshakeTimeoutMs === "number" | ||
| ? fileConfig.flags.grokHandshakeTimeoutMs | ||
| : undefined, | ||
| defaultMs: 45_000, | ||
| }).valueMs; | ||
| })(); | ||
| // Telegram getUpdates long-poll (server-side 30s) needs a client-side | ||
| // safety net so a wedged TCP socket / DPI drop doesn't pin the poll | ||
| // loop forever. 45s leaves 15s headroom over the server timeout. | ||
| const TELEGRAM_GETUPDATES_TIMEOUT_MS = resolveTimeoutMs({ | ||
| envValue: process.env.TELEGRAM_GETUPDATES_TIMEOUT_MS, | ||
| defaultMs: 45_000, | ||
| }).valueMs; | ||
| const NEW_SESSION = opts["new-session"] === "true"; | ||
| const SESSION_ID = NEW_SESSION ? "" : ( | ||
| RUNTIME === "grok" | ||
|
|
@@ -1522,56 +1565,70 @@ async function processWithClaude(task: string, from: string, images?: string[]): | |
| // abort controller + timeout window. On auth-class error, short-circuit | ||
| // (fast-fail, no retry). On transient error / timeout, backoff 4s, 8s | ||
| // (+ jitter to spread herd retries across the vendor queue) and retry. | ||
| // | ||
| // #261 P1 redirect (2026-06-28) — wrap the for-await loop in withTimeout | ||
| // so the deadline / abort / cleanup contract matches codex / grok / | ||
| // telegram. The SDK's `query()` watches `options.abortController.signal` | ||
| // for cancellation, so we still create an AbortController inside the | ||
| // factory and forward withTimeout's signal into it. | ||
| let lastErr: string = ""; | ||
| let timedOutFinal = false; | ||
| for (let attempt = 0; attempt <= CLAUDE_MAX_RETRIES; attempt++) { | ||
| let timedOut = false; | ||
| let timer: ReturnType<typeof setTimeout> | undefined; | ||
| const ac = new AbortController(); | ||
| options.abortController = ac; | ||
| if (CLAUDE_TIMEOUT_MS > 0) { | ||
| timer = setTimeout(() => { timedOut = true; ac.abort(); }, CLAUDE_TIMEOUT_MS); | ||
| } | ||
| const attemptStart = Date.now(); | ||
| try { | ||
| result = ""; // reset accumulator for this attempt | ||
| for await (const message of query({ prompt, options })) { | ||
| const m = message as any; | ||
| if (m.type === "system" && m.subtype === "init") { | ||
| claudeSessionId = m.session_id; | ||
| log(`[claude] session=${m.session_id?.slice(0, 8)} model=${MODEL || "default"} attempt=${attempt + 1}`); | ||
| writebackSession(m.session_id); | ||
| } | ||
| if (m.type === "result") { | ||
| const dt = Date.now() - t0; | ||
| const u = m.usage || {}; | ||
| log(`[claude] ${m.subtype} | ${dt}ms | $${m.total_cost_usd?.toFixed(4) || "?"} | in=${u.input_tokens || 0} out=${u.output_tokens || 0} | turns=${m.num_turns}${attempt > 0 ? ` | attempt=${attempt + 1}` : ""}`); | ||
| if (m.subtype === "success") { | ||
| // #261 P1-① (2026-06-28) — empty-result soft-failure gate. | ||
| // Pre-fix `m.result || "任务完成"` silently rebranded an | ||
| // empty vendor reply as "task complete" — root cause of the | ||
| // M3 incident where a silent rate-limit produced 0 output | ||
| // tokens but the upstream caller saw "任务完成" and moved on. | ||
| // Now: classifier surfaces the empty reply as a soft fail | ||
| // with a vendor-specific hint, so the next hop / operator | ||
| // sees the truth. | ||
| if (isEmptyResultSoftFailure(m)) { | ||
| const hint = quotaRemediationHint(process.env.ANTHROPIC_BASE_URL); | ||
| log(`[claude] ✗ vendor returned EMPTY result despite success subtype (in=${u.input_tokens || 0}, out=${u.output_tokens || 0}, result.len=${String(m.result || "").length}) — treating as soft fail`); | ||
| result = `执行出错: vendor 返回空响应 (in=${u.input_tokens || 0} out=${u.output_tokens || 0}) — 疑似 vendor 静默限流/配额. ${hint}`; | ||
| } else { | ||
| result = m.result; | ||
| result = await withTimeout(async (signal) => { | ||
| let inner = ""; | ||
| const ac = new AbortController(); | ||
| options.abortController = ac; | ||
| const forward = () => ac.abort(); | ||
| signal.addEventListener("abort", forward, { once: true }); | ||
| try { | ||
| for await (const message of query({ prompt, options })) { | ||
| const m = message as any; | ||
| if (m.type === "system" && m.subtype === "init") { | ||
| claudeSessionId = m.session_id; | ||
| log(`[claude] session=${m.session_id?.slice(0, 8)} model=${MODEL || "default"} attempt=${attempt + 1}`); | ||
| writebackSession(m.session_id); | ||
| } | ||
| if (m.type === "result") { | ||
| const dt = Date.now() - t0; | ||
| const u = m.usage || {}; | ||
| log(`[claude] ${m.subtype} | ${dt}ms | $${m.total_cost_usd?.toFixed(4) || "?"} | in=${u.input_tokens || 0} out=${u.output_tokens || 0} | turns=${m.num_turns}${attempt > 0 ? ` | attempt=${attempt + 1}` : ""}`); | ||
| if (m.subtype === "success") { | ||
| // #261 P1 redirect (2026-06-28) — delegate to classifyRuntimeResult | ||
| // which folds the empty-result rule from #267 + the in=0 & out=0 | ||
| // & cost=0 silent-reject rule into one decision shared with | ||
| // codex / grok. Pre-fix `m.result || "任务完成"` silently | ||
| // rebranded an empty vendor reply as "task complete" — the M3 | ||
| // incident shape. Now a non-success classification surfaces a | ||
| // soft-fail string the upstream caller can act on. | ||
| const cls = classifyRuntimeResult( | ||
| { result: m.result, usage: m.usage, totalCostUsd: m.total_cost_usd }, | ||
| { baseUrl: process.env.ANTHROPIC_BASE_URL }, | ||
| ); | ||
| if (cls.kind === "success") { | ||
| inner = m.result; | ||
| } else { | ||
| log(`[claude] ✗ ${cls.reason || cls.kind} (in=${u.input_tokens || 0}, out=${u.output_tokens || 0}, cost=${m.total_cost_usd ?? "?"})`); | ||
| inner = formatClassificationError(cls, { runtime: "claude-agent-sdk", usage: m.usage }); | ||
| } | ||
| } else { | ||
| inner = `执行出错: ${m.error || m.result || "未知错误"}`; | ||
| } | ||
| } | ||
| } else { | ||
| result = `执行出错: ${m.error || m.result || "未知错误"}`; | ||
| } | ||
| return inner; | ||
| } finally { | ||
| signal.removeEventListener("abort", forward); | ||
| } | ||
| } | ||
| if (timer) clearTimeout(timer); | ||
| }, CLAUDE_TIMEOUT_MS, `claude-attempt-${attempt + 1}/${CLAUDE_MAX_RETRIES + 1}`); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If Useful? React with 👍 / 👎. |
||
| return result; | ||
| } catch (err: any) { | ||
| if (timer) clearTimeout(timer); | ||
| const msg = String(err?.message || err).slice(0, 300); | ||
| timedOut = err instanceof TimeoutError; | ||
| const msg = err instanceof TimeoutError | ||
| ? `timed out after ${err.timeoutMs}ms` | ||
| : String(err?.message || err).slice(0, 300); | ||
| const attemptDt = Date.now() - attemptStart; | ||
|
|
||
| // Fast-fail on auth errors — no point retrying with the same bad key. | ||
|
|
@@ -1764,33 +1821,76 @@ async function processWithCodex(task: string, from: string, images?: string[]): | |
| : promptText; | ||
| const t0 = Date.now(); | ||
| try { | ||
| const { events } = await codexThread.runStreamed(input); | ||
| let finalResponse = ""; | ||
| let usage: any = null; | ||
| let itemCount = 0; | ||
| for await (const ev of events) { | ||
| if (ev.type === "item.started") { | ||
| const it = ev.item as any; | ||
| debug(`[codex] ${it.type}${it.command ? `: ${it.command.slice(0, 60)}` : it.tool ? `: ${it.server}/${it.tool}` : ""}`); | ||
| } else if (ev.type === "item.completed") { | ||
| itemCount++; | ||
| const it = ev.item as any; | ||
| if (it.type === "agent_message") finalResponse = it.text || ""; | ||
| if (it.type === "command_execution") debug(`[codex] cmd exit=${it.exit_code} | ${it.aggregated_output?.slice(0, 80)}`); | ||
| if (it.type === "reasoning") debug(`[codex] thinking: ${it.text?.slice(0, 80)}`); | ||
| if (it.type === "mcp_tool_call") debug(`[codex] mcp: ${it.server}/${it.tool} → ${it.status}`); | ||
| } else if (ev.type === "turn.completed") { | ||
| usage = ev.usage; | ||
| } | ||
| } | ||
| // #261 P1 redirect (2026-06-28) — wrap the codex turn in withTimeout. | ||
| // Pre-fix: codex had ZERO wall-clock guard — a wedged turn (vendor | ||
| // outage, dropped TCP) would hang the agent-node forever with no | ||
| // abort path. Codex SDK's `TurnOptions.signal` lets us propagate | ||
| // cancellation cleanly when the deadline fires. | ||
| const outcome = await withTimeout( | ||
| async (signal) => { | ||
| const { events } = await codexThread.runStreamed(input, { signal }); | ||
| let finalResponse = ""; | ||
| let usage: any = null; | ||
| let itemCount = 0; | ||
| for await (const ev of events) { | ||
| if (ev.type === "item.started") { | ||
| const it = ev.item as any; | ||
| debug(`[codex] ${it.type}${it.command ? `: ${it.command.slice(0, 60)}` : it.tool ? `: ${it.server}/${it.tool}` : ""}`); | ||
| } else if (ev.type === "item.completed") { | ||
| itemCount++; | ||
| const it = ev.item as any; | ||
| if (it.type === "agent_message") finalResponse = it.text || ""; | ||
| if (it.type === "command_execution") debug(`[codex] cmd exit=${it.exit_code} | ${it.aggregated_output?.slice(0, 80)}`); | ||
| if (it.type === "reasoning") debug(`[codex] thinking: ${it.text?.slice(0, 80)}`); | ||
| if (it.type === "mcp_tool_call") debug(`[codex] mcp: ${it.server}/${it.tool} → ${it.status}`); | ||
| } else if (ev.type === "turn.completed") { | ||
| usage = ev.usage; | ||
| } | ||
| } | ||
| return { finalResponse, usage, itemCount }; | ||
| }, | ||
| CODEX_TIMEOUT_MS, | ||
| "codex-turn", | ||
| ); | ||
| const dt = Date.now() - t0; | ||
| const inTokens = usage?.input_tokens || 0; | ||
| log(`[codex] done | ${dt}ms | in=${inTokens} out=${usage?.output_tokens || 0} | items=${itemCount}`); | ||
| const inTokens = outcome.usage?.input_tokens || 0; | ||
| log(`[codex] done | ${dt}ms | in=${inTokens} out=${outcome.usage?.output_tokens || 0} | items=${outcome.itemCount}`); | ||
| if (codexThread?.id) writebackSession(codexThread.id); | ||
| // Auto-compact 由 Codex CLI 原生处理(model_auto_compact_token_limit=200000) | ||
|
|
||
| return finalResponse || "(无回复)"; | ||
| // #261 P1 redirect — classify codex turn result so silent reject | ||
| // (in=0 & out=0) or empty reply surfaces as a soft failure too, | ||
| // not just `"(无回复)"`. Mirrors the claude path so the | ||
| // upstream caller sees one consistent error shape. | ||
| const cls = classifyRuntimeResult( | ||
| { result: outcome.finalResponse, usage: outcome.usage }, | ||
| { baseUrl: process.env.OPENAI_BASE_URL || process.env.OPENAI_API_BASE }, | ||
| ); | ||
| if (cls.kind !== "success") { | ||
| log(`[codex] ✗ ${cls.reason || cls.kind} (in=${inTokens}, out=${outcome.usage?.output_tokens || 0})`); | ||
| return formatClassificationError(cls, { runtime: "codex-sdk", usage: outcome.usage }); | ||
| } | ||
| return outcome.finalResponse || "(无回复)"; | ||
| } catch (e: any) { | ||
| // #261 P1 redirect — TimeoutError gets a specific message so the | ||
| // upstream caller can distinguish "vendor wedged" from "vendor | ||
| // errored". Codex SDK aborts cleanly on signal, but the wedged-TCP | ||
| // case (no events flowing) is exactly what the timeout catches. | ||
| if (e instanceof TimeoutError) { | ||
| log(`[codex] ✗ ${e.message}; reset thread for next turn`); | ||
| codexThread = null; | ||
| return `执行出错: codex-sdk 调用超时 (${Math.round(CODEX_TIMEOUT_MS / 1000)}s) — 检查 OPENAI_BASE_URL / vendor 负载`; | ||
| } | ||
| // #261 P1 redirect — fast-fail on quota the same way claude does, so | ||
| // a 429-flooded codex node doesn't keep tearing down + rebuilding | ||
| // its thread on every backoff cycle (rebuild is expensive — full | ||
| // codex CLI spawn). Mirrors the claude-runtime fast-fail path. | ||
| const msg0 = String(e?.message || e).slice(0, 300); | ||
| if (isRateLimitOrQuotaError(msg0)) { | ||
| const hint = quotaRemediationHint(process.env.OPENAI_BASE_URL); | ||
| log(`[codex] ✗ vendor rate-limit/quota: ${msg0.slice(0, 150)}`); | ||
| return `执行出错: codex 限流/配额耗尽 (${msg0.slice(0, 80)}) — ${hint}`; | ||
| } | ||
| log(`codex thread error: ${e.message}, 重建`); | ||
| const codex = new Codex({ config: CODEX_CONFIG }); | ||
| codexThread = codex.startThread({ | ||
|
|
@@ -2124,6 +2224,10 @@ async function processWithGrok(task: string, from: string, images?: string[]): P | |
| flagValue: fileConfig.flags?.grokAcpTimeoutMs, | ||
| defaultMs: 300000, | ||
| }).valueMs, | ||
| // #261 P1 redirect — handshake decoupled from prompt deadline. | ||
| // 45s default keeps wedged initialize / authenticate / session | ||
| // bounded without affecting long-running prompts. | ||
| handshakeTimeoutMs: GROK_HANDSHAKE_TIMEOUT_MS, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This affects Grok nodes that set Useful? React with 👍 / 👎. |
||
| drainMs: resolveGrokAcpTimeout({ | ||
| envValue: process.env.GROK_ACP_DRAIN_MS, | ||
| flagValue: fileConfig.flags?.grokAcpDrainMs, | ||
|
|
@@ -2936,7 +3040,16 @@ async function connectTelegram(channel: TelegramChannel) { | |
| log(`Telegram polling: ${channel.dir}`); | ||
| while (true) { | ||
| try { | ||
| const res = await fetch(`${tg.apiBase}/getUpdates?offset=${tg.offset}&timeout=30`); | ||
| // #261 P1 redirect (2026-06-28) — wrap getUpdates in withTimeout so | ||
| // a wedged TCP socket / DPI drop can't pin the poll loop forever. | ||
| // Server-side timeout is 30s; client deadline is 45s leaving 15s | ||
| // headroom. AbortSignal propagates into fetch so the dangling | ||
| // request is torn down on deadline, not just the await. | ||
| const res = await withTimeout( | ||
| (signal) => fetch(`${tg.apiBase}/getUpdates?offset=${tg.offset}&timeout=30`, { signal }), | ||
| TELEGRAM_GETUPDATES_TIMEOUT_MS, | ||
| "telegram-getUpdates", | ||
| ); | ||
|
Comment on lines
+3048
to
+3052
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This only bounds the Useful? React with 👍 / 👎. |
||
| const data = await res.json() as any; | ||
| if (!data.ok) throw new Error(data.description || "getUpdates failed"); | ||
| for (const update of data.result || []) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
CLAUDE_TIMEOUT_MSfires andCLAUDE_MAX_RETRIESis nonzero,withTimeoutrejects as soon as it aborts the signal, so the retry loop can start the nextquery()after the backoff even if the previous SDK turn is still unwinding or running a tool. The old code only calledac.abort()and did not leave thefor awaituntil the SDK settled, so slow/abort-ignoring tools could not overlap; now the same task can be executed twice and duplicate side effects such as file edits orsend_taskcalls. On timeout, either avoid retrying or wait for the aborted attempt to finish cleanup before starting another attempt.Useful? React with 👍 / 👎.