From 712f210aa95c1d849ad9386eeeb53321bc511eae Mon Sep 17 00:00:00 2001 From: vansin Date: Sun, 28 Jun 2026 09:03:11 +0800 Subject: [PATCH] feat(#261 P1 redirect): unify runtime timeout + cross-runtime result classifier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the planned point-fixes (codex turn timeout / grok handshake decouple) with two shared utils that cover all runtime call sites at once. util ① — src/util/timeout.ts (158 LOC) - withTimeout(fn(signal) => Promise, ms, label, opts?) — one signature - TimeoutError carries label + ms for log-side surfacing - externalSignal forwarding lets callers participate in caller-supplied cancellation - ms<=0 sentinel preserves CLAUDE_TIMEOUT_MS=0 "disabled" behaviour - resolveTimeoutMs({envValue,flagValue,defaultMs,minMs,maxMs}) — env>flag>default precedence with clamping, generalises resolveGrokAcpTimeout util ② — src/runtime/classify-result.ts (157 LOC) - classifyRuntimeResult({result,usage,totalCostUsd,errorMessage}) → success | soft-fail-empty | soft-fail-quota | error - Folds #267 isRateLimitOrQuotaError into one cross-runtime decision surface; adds a strict empty-result check and a three-zero silent-reject rule (in=0 AND out=0 AND cost=0). - Detection is intentionally strict: output_tokens=0 ALONE is not a flag-trigger when result text is non-empty, because codex's usage field is not reliably reported across providers. Empty result OR the full three-zero triple flag; nothing else does. - formatClassificationError() — canonical "执行出错: ..." shape that the IM bridge / dashboard parse to flag failure rows - Re-exports building blocks so direct callers (auth-error fast-fail in cli.ts) don't bounce through the wider surface 5 consumers wired - claude (cli.ts processWithClaude): for-await loop now under withTimeout(CLAUDE_TIMEOUT_MS), success branch delegates to classifyRuntimeResult so empty-result rebrand is gone - codex (cli.ts processWithCodex): ZERO wall-clock pre-fix → now withTimeout(CODEX_TIMEOUT_MS, "codex-turn"); SDK TurnOptions.signal propagates the abort cleanly. TimeoutError + 429 paths short-circuit the thread rebuild loop. Classifier flags silent-reject reply. - grok handshake (runtime/grok-build-acp/runtime.ts): new handshakeTimeoutMs option (default min(45s, timeoutMs)) for initialize+authenticate+session/new|load — decoupled from the 300s session/prompt deadline so a wedged handshake fails fast. cli.ts only forwards an explicit env / flag value; absent both, it passes undefined so the runtime default applies (back-compat for callers that set tight GROK_ACP_TIMEOUT_MS). - telegram getUpdates (cli.ts): wrapped in withTimeout(45s) with AbortSignal propagated into fetch — TCP wedge can no longer pin the poll loop forever - think() queue: implicitly bounded — each runtime call inside the queue is now under its own withTimeout, no separate wrap needed Verification - Unit tests: 29 (timeout: happy/timeout/sentinel/external-signal/ resolver precedence/clamping/null) + 26 (classify: error precedence / three-zero strict / empty-result rule / codex false-positive guard / vendor hint routing / formatter shape) = 55 new - Full agent-node suite: 274 → 329 pass, 0 fail - Functional smoke: 5 cases (timeout fires in 102ms / three-zero detected / 429 routes to vendor dashboard / format msg starts with expected prefix / real success classifies correctly) — all PASS - bun build clean (0.39 MB cli.js, 106 modules) Why one PR not split (per redirect dispatch) Pure helpers + tests + glue together = single review pass. Splitting would force the reviewer to re-context twice for what is logically one cross-cutting change. #267 followed the same pattern. --- agent-node/src/cli.ts | 239 +++++++++++++----- .../src/runtime/classify-result.test.ts | 223 ++++++++++++++++ agent-node/src/runtime/classify-result.ts | 172 +++++++++++++ .../src/runtime/grok-build-acp/runtime.ts | 28 +- agent-node/src/util/timeout.test.ts | 238 +++++++++++++++++ agent-node/src/util/timeout.ts | 158 ++++++++++++ 6 files changed, 991 insertions(+), 67 deletions(-) create mode 100644 agent-node/src/runtime/classify-result.test.ts create mode 100644 agent-node/src/runtime/classify-result.ts create mode 100644 agent-node/src/util/timeout.test.ts create mode 100644 agent-node/src/util/timeout.ts diff --git a/agent-node/src/cli.ts b/agent-node/src/cli.ts index 09f4806c..fc8c19ff 100644 --- a/agent-node/src/cli.ts +++ b/agent-node/src/cli.ts @@ -44,9 +44,13 @@ import { CurrentAliasResolver } from "./runtime/current-alias"; import { delegationTargetExists } from "./runtime/delegation-precheck"; import { isRateLimitOrQuotaError, - isEmptyResultSoftFailure, quotaRemediationHint, } from "./runtime/claude-error-classify"; +import { + classifyRuntimeResult, + formatClassificationError, +} from "./runtime/classify-result"; +import { withTimeout, TimeoutError, resolveTimeoutMs } from "./util/timeout"; const home = homedir(); @@ -314,6 +318,45 @@ const CLAUDE_MAX_RETRIES = parseInt( opts["claude-max-retries"] || process.env.CLAUDE_MAX_RETRIES || fileConfig.flags?.claudeMaxRetries || fileConfig.claudeMaxRetries || "2" ); +// #261 P1 redirect (2026-06-28) — codex previously had zero wall-clock +// guard, so a wedged turn could hang forever (no abort path, no retry). +// Mirror CLAUDE_TIMEOUT_MS shape but default 300s, settable via env / +// flag for the parity flags listed in docs/runbooks/. resolveTimeoutMs +// honours `0` as "disabled" so power users can opt out. +const CODEX_TIMEOUT_MS = resolveTimeoutMs({ + envValue: opts["codex-timeout-ms"] || process.env.CODEX_TIMEOUT_MS, + flagValue: typeof fileConfig.flags?.codexTimeoutMs === "number" ? fileConfig.flags.codexTimeoutMs : undefined, + defaultMs: 300_000, +}).valueMs; +// Grok handshake (initialize + authenticate + session/new) is decoupled +// from the prompt timeout. Pre-redirect both shared the same 300s knob, +// so a stuck handshake hid behind the prompt deadline. +// +// Back-compat: when no explicit env / flag is set, leave this `undefined` +// and let `runGrokAcpTurn` apply its own default `min(45s, timeoutMs)`. +// Forcing a fixed 45s here would silently widen the handshake deadline +// for users who already set a tight `GROK_ACP_TIMEOUT_MS` (e.g. 10s) — +// they expect handshake ≤ timeoutMs, not handshake = 45s. +const GROK_HANDSHAKE_TIMEOUT_MS: number | undefined = (() => { + const envSet = process.env.GROK_HANDSHAKE_TIMEOUT_MS !== undefined + && process.env.GROK_HANDSHAKE_TIMEOUT_MS !== ""; + const flagSet = typeof fileConfig.flags?.grokHandshakeTimeoutMs === "number"; + if (!envSet && !flagSet) return undefined; + return resolveTimeoutMs({ + envValue: process.env.GROK_HANDSHAKE_TIMEOUT_MS, + flagValue: typeof fileConfig.flags?.grokHandshakeTimeoutMs === "number" + ? fileConfig.flags.grokHandshakeTimeoutMs + : undefined, + defaultMs: 45_000, + }).valueMs; +})(); +// Telegram getUpdates long-poll (server-side 30s) needs a client-side +// safety net so a wedged TCP socket / DPI drop doesn't pin the poll +// loop forever. 45s leaves 15s headroom over the server timeout. +const TELEGRAM_GETUPDATES_TIMEOUT_MS = resolveTimeoutMs({ + envValue: process.env.TELEGRAM_GETUPDATES_TIMEOUT_MS, + defaultMs: 45_000, +}).valueMs; const NEW_SESSION = opts["new-session"] === "true"; const SESSION_ID = NEW_SESSION ? "" : ( RUNTIME === "grok" @@ -1522,56 +1565,70 @@ async function processWithClaude(task: string, from: string, images?: string[]): // abort controller + timeout window. On auth-class error, short-circuit // (fast-fail, no retry). On transient error / timeout, backoff 4s, 8s // (+ jitter to spread herd retries across the vendor queue) and retry. + // + // #261 P1 redirect (2026-06-28) — wrap the for-await loop in withTimeout + // so the deadline / abort / cleanup contract matches codex / grok / + // telegram. The SDK's `query()` watches `options.abortController.signal` + // for cancellation, so we still create an AbortController inside the + // factory and forward withTimeout's signal into it. let lastErr: string = ""; let timedOutFinal = false; for (let attempt = 0; attempt <= CLAUDE_MAX_RETRIES; attempt++) { let timedOut = false; - let timer: ReturnType | undefined; - const ac = new AbortController(); - options.abortController = ac; - if (CLAUDE_TIMEOUT_MS > 0) { - timer = setTimeout(() => { timedOut = true; ac.abort(); }, CLAUDE_TIMEOUT_MS); - } const attemptStart = Date.now(); try { - result = ""; // reset accumulator for this attempt - for await (const message of query({ prompt, options })) { - const m = message as any; - if (m.type === "system" && m.subtype === "init") { - claudeSessionId = m.session_id; - log(`[claude] session=${m.session_id?.slice(0, 8)} model=${MODEL || "default"} attempt=${attempt + 1}`); - writebackSession(m.session_id); - } - if (m.type === "result") { - const dt = Date.now() - t0; - const u = m.usage || {}; - log(`[claude] ${m.subtype} | ${dt}ms | $${m.total_cost_usd?.toFixed(4) || "?"} | in=${u.input_tokens || 0} out=${u.output_tokens || 0} | turns=${m.num_turns}${attempt > 0 ? ` | attempt=${attempt + 1}` : ""}`); - if (m.subtype === "success") { - // #261 P1-① (2026-06-28) — empty-result soft-failure gate. - // Pre-fix `m.result || "任务完成"` silently rebranded an - // empty vendor reply as "task complete" — root cause of the - // M3 incident where a silent rate-limit produced 0 output - // tokens but the upstream caller saw "任务完成" and moved on. - // Now: classifier surfaces the empty reply as a soft fail - // with a vendor-specific hint, so the next hop / operator - // sees the truth. - if (isEmptyResultSoftFailure(m)) { - const hint = quotaRemediationHint(process.env.ANTHROPIC_BASE_URL); - log(`[claude] ✗ vendor returned EMPTY result despite success subtype (in=${u.input_tokens || 0}, out=${u.output_tokens || 0}, result.len=${String(m.result || "").length}) — treating as soft fail`); - result = `执行出错: vendor 返回空响应 (in=${u.input_tokens || 0} out=${u.output_tokens || 0}) — 疑似 vendor 静默限流/配额. ${hint}`; - } else { - result = m.result; + result = await withTimeout(async (signal) => { + let inner = ""; + const ac = new AbortController(); + options.abortController = ac; + const forward = () => ac.abort(); + signal.addEventListener("abort", forward, { once: true }); + try { + for await (const message of query({ prompt, options })) { + const m = message as any; + if (m.type === "system" && m.subtype === "init") { + claudeSessionId = m.session_id; + log(`[claude] session=${m.session_id?.slice(0, 8)} model=${MODEL || "default"} attempt=${attempt + 1}`); + writebackSession(m.session_id); + } + if (m.type === "result") { + const dt = Date.now() - t0; + const u = m.usage || {}; + log(`[claude] ${m.subtype} | ${dt}ms | $${m.total_cost_usd?.toFixed(4) || "?"} | in=${u.input_tokens || 0} out=${u.output_tokens || 0} | turns=${m.num_turns}${attempt > 0 ? ` | attempt=${attempt + 1}` : ""}`); + if (m.subtype === "success") { + // #261 P1 redirect (2026-06-28) — delegate to classifyRuntimeResult + // which folds the empty-result rule from #267 + the in=0 & out=0 + // & cost=0 silent-reject rule into one decision shared with + // codex / grok. Pre-fix `m.result || "任务完成"` silently + // rebranded an empty vendor reply as "task complete" — the M3 + // incident shape. Now a non-success classification surfaces a + // soft-fail string the upstream caller can act on. + const cls = classifyRuntimeResult( + { result: m.result, usage: m.usage, totalCostUsd: m.total_cost_usd }, + { baseUrl: process.env.ANTHROPIC_BASE_URL }, + ); + if (cls.kind === "success") { + inner = m.result; + } else { + log(`[claude] ✗ ${cls.reason || cls.kind} (in=${u.input_tokens || 0}, out=${u.output_tokens || 0}, cost=${m.total_cost_usd ?? "?"})`); + inner = formatClassificationError(cls, { runtime: "claude-agent-sdk", usage: m.usage }); + } + } else { + inner = `执行出错: ${m.error || m.result || "未知错误"}`; + } } - } else { - result = `执行出错: ${m.error || m.result || "未知错误"}`; } + return inner; + } finally { + signal.removeEventListener("abort", forward); } - } - if (timer) clearTimeout(timer); + }, CLAUDE_TIMEOUT_MS, `claude-attempt-${attempt + 1}/${CLAUDE_MAX_RETRIES + 1}`); return result; } catch (err: any) { - if (timer) clearTimeout(timer); - const msg = String(err?.message || err).slice(0, 300); + timedOut = err instanceof TimeoutError; + const msg = err instanceof TimeoutError + ? `timed out after ${err.timeoutMs}ms` + : String(err?.message || err).slice(0, 300); const attemptDt = Date.now() - attemptStart; // Fast-fail on auth errors — no point retrying with the same bad key. @@ -1764,33 +1821,76 @@ async function processWithCodex(task: string, from: string, images?: string[]): : promptText; const t0 = Date.now(); try { - const { events } = await codexThread.runStreamed(input); - let finalResponse = ""; - let usage: any = null; - let itemCount = 0; - for await (const ev of events) { - if (ev.type === "item.started") { - const it = ev.item as any; - debug(`[codex] ${it.type}${it.command ? `: ${it.command.slice(0, 60)}` : it.tool ? `: ${it.server}/${it.tool}` : ""}`); - } else if (ev.type === "item.completed") { - itemCount++; - const it = ev.item as any; - if (it.type === "agent_message") finalResponse = it.text || ""; - if (it.type === "command_execution") debug(`[codex] cmd exit=${it.exit_code} | ${it.aggregated_output?.slice(0, 80)}`); - if (it.type === "reasoning") debug(`[codex] thinking: ${it.text?.slice(0, 80)}`); - if (it.type === "mcp_tool_call") debug(`[codex] mcp: ${it.server}/${it.tool} → ${it.status}`); - } else if (ev.type === "turn.completed") { - usage = ev.usage; - } - } + // #261 P1 redirect (2026-06-28) — wrap the codex turn in withTimeout. + // Pre-fix: codex had ZERO wall-clock guard — a wedged turn (vendor + // outage, dropped TCP) would hang the agent-node forever with no + // abort path. Codex SDK's `TurnOptions.signal` lets us propagate + // cancellation cleanly when the deadline fires. + const outcome = await withTimeout( + async (signal) => { + const { events } = await codexThread.runStreamed(input, { signal }); + let finalResponse = ""; + let usage: any = null; + let itemCount = 0; + for await (const ev of events) { + if (ev.type === "item.started") { + const it = ev.item as any; + debug(`[codex] ${it.type}${it.command ? `: ${it.command.slice(0, 60)}` : it.tool ? `: ${it.server}/${it.tool}` : ""}`); + } else if (ev.type === "item.completed") { + itemCount++; + const it = ev.item as any; + if (it.type === "agent_message") finalResponse = it.text || ""; + if (it.type === "command_execution") debug(`[codex] cmd exit=${it.exit_code} | ${it.aggregated_output?.slice(0, 80)}`); + if (it.type === "reasoning") debug(`[codex] thinking: ${it.text?.slice(0, 80)}`); + if (it.type === "mcp_tool_call") debug(`[codex] mcp: ${it.server}/${it.tool} → ${it.status}`); + } else if (ev.type === "turn.completed") { + usage = ev.usage; + } + } + return { finalResponse, usage, itemCount }; + }, + CODEX_TIMEOUT_MS, + "codex-turn", + ); const dt = Date.now() - t0; - const inTokens = usage?.input_tokens || 0; - log(`[codex] done | ${dt}ms | in=${inTokens} out=${usage?.output_tokens || 0} | items=${itemCount}`); + const inTokens = outcome.usage?.input_tokens || 0; + log(`[codex] done | ${dt}ms | in=${inTokens} out=${outcome.usage?.output_tokens || 0} | items=${outcome.itemCount}`); if (codexThread?.id) writebackSession(codexThread.id); // Auto-compact 由 Codex CLI 原生处理(model_auto_compact_token_limit=200000) - return finalResponse || "(无回复)"; + // #261 P1 redirect — classify codex turn result so silent reject + // (in=0 & out=0) or empty reply surfaces as a soft failure too, + // not just `"(无回复)"`. Mirrors the claude path so the + // upstream caller sees one consistent error shape. + const cls = classifyRuntimeResult( + { result: outcome.finalResponse, usage: outcome.usage }, + { baseUrl: process.env.OPENAI_BASE_URL || process.env.OPENAI_API_BASE }, + ); + if (cls.kind !== "success") { + log(`[codex] ✗ ${cls.reason || cls.kind} (in=${inTokens}, out=${outcome.usage?.output_tokens || 0})`); + return formatClassificationError(cls, { runtime: "codex-sdk", usage: outcome.usage }); + } + return outcome.finalResponse || "(无回复)"; } catch (e: any) { + // #261 P1 redirect — TimeoutError gets a specific message so the + // upstream caller can distinguish "vendor wedged" from "vendor + // errored". Codex SDK aborts cleanly on signal, but the wedged-TCP + // case (no events flowing) is exactly what the timeout catches. + if (e instanceof TimeoutError) { + log(`[codex] ✗ ${e.message}; reset thread for next turn`); + codexThread = null; + return `执行出错: codex-sdk 调用超时 (${Math.round(CODEX_TIMEOUT_MS / 1000)}s) — 检查 OPENAI_BASE_URL / vendor 负载`; + } + // #261 P1 redirect — fast-fail on quota the same way claude does, so + // a 429-flooded codex node doesn't keep tearing down + rebuilding + // its thread on every backoff cycle (rebuild is expensive — full + // codex CLI spawn). Mirrors the claude-runtime fast-fail path. + const msg0 = String(e?.message || e).slice(0, 300); + if (isRateLimitOrQuotaError(msg0)) { + const hint = quotaRemediationHint(process.env.OPENAI_BASE_URL); + log(`[codex] ✗ vendor rate-limit/quota: ${msg0.slice(0, 150)}`); + return `执行出错: codex 限流/配额耗尽 (${msg0.slice(0, 80)}) — ${hint}`; + } log(`codex thread error: ${e.message}, 重建`); const codex = new Codex({ config: CODEX_CONFIG }); codexThread = codex.startThread({ @@ -2124,6 +2224,10 @@ async function processWithGrok(task: string, from: string, images?: string[]): P flagValue: fileConfig.flags?.grokAcpTimeoutMs, defaultMs: 300000, }).valueMs, + // #261 P1 redirect — handshake decoupled from prompt deadline. + // 45s default keeps wedged initialize / authenticate / session + // bounded without affecting long-running prompts. + handshakeTimeoutMs: GROK_HANDSHAKE_TIMEOUT_MS, drainMs: resolveGrokAcpTimeout({ envValue: process.env.GROK_ACP_DRAIN_MS, flagValue: fileConfig.flags?.grokAcpDrainMs, @@ -2936,7 +3040,16 @@ async function connectTelegram(channel: TelegramChannel) { log(`Telegram polling: ${channel.dir}`); while (true) { try { - const res = await fetch(`${tg.apiBase}/getUpdates?offset=${tg.offset}&timeout=30`); + // #261 P1 redirect (2026-06-28) — wrap getUpdates in withTimeout so + // a wedged TCP socket / DPI drop can't pin the poll loop forever. + // Server-side timeout is 30s; client deadline is 45s leaving 15s + // headroom. AbortSignal propagates into fetch so the dangling + // request is torn down on deadline, not just the await. + const res = await withTimeout( + (signal) => fetch(`${tg.apiBase}/getUpdates?offset=${tg.offset}&timeout=30`, { signal }), + TELEGRAM_GETUPDATES_TIMEOUT_MS, + "telegram-getUpdates", + ); const data = await res.json() as any; if (!data.ok) throw new Error(data.description || "getUpdates failed"); for (const update of data.result || []) { diff --git a/agent-node/src/runtime/classify-result.test.ts b/agent-node/src/runtime/classify-result.test.ts new file mode 100644 index 00000000..14ebddc7 --- /dev/null +++ b/agent-node/src/runtime/classify-result.test.ts @@ -0,0 +1,223 @@ +// Cross-runtime classifier coverage. The two underlying detectors +// (`isRateLimitOrQuotaError` + `isEmptyResultSoftFailure`) ship with +// 32 unit tests in claude-error-classify.test.ts already — these tests +// focus on the *combination*: precedence between the kinds, the +// in=0 & out=0 & cost=0 silent-reject rule, the formatClassificationError +// shape that the IM bridge / dashboard parse. + +import { describe, expect, test } from "bun:test"; +import { + classifyRuntimeResult, + formatClassificationError, + type ClassificationResult, +} from "./classify-result"; + +describe("classifyRuntimeResult — error precedence", () => { + test("quota error msg → soft-fail-quota (highest precedence)", () => { + const r = classifyRuntimeResult({ errorMessage: "HTTP 429 rate_limit", result: "" }); + expect(r.kind).toBe("soft-fail-quota"); + expect(r.reason).toContain("429"); + }); + + test("non-quota error → hard error", () => { + const r = classifyRuntimeResult({ errorMessage: "ECONNREFUSED" }); + expect(r.kind).toBe("error"); + expect(r.reason).toBe("ECONNREFUSED"); + }); + + test("auth error msg (401) → hard error (NOT quota — auth has its own path)", () => { + const r = classifyRuntimeResult({ errorMessage: "401 invalid_api_key" }); + expect(r.kind).toBe("error"); + }); + + test("error msg outranks empty result (don't double-classify)", () => { + const r = classifyRuntimeResult({ errorMessage: "boom", result: "" }); + expect(r.kind).toBe("error"); + }); +}); + +describe("classifyRuntimeResult — in=0 & out=0 & cost=0 silent reject", () => { + test("all three zero → soft-fail-empty (even when result text present)", () => { + const r = classifyRuntimeResult({ + result: "stale text from previous turn", + usage: { input_tokens: 0, output_tokens: 0 }, + totalCostUsd: 0, + }); + expect(r.kind).toBe("soft-fail-empty"); + expect(r.reason).toContain("in=0 out=0 cost=0"); + }); + + test("in=0 & out=0 but cost field MISSING + non-empty result → success (codex usage unreliable)", () => { + // Per the strict-rule refactor: cost-field-missing should NOT + // promote a non-empty result to soft-fail. Codex sometimes reports + // zero output_tokens for healthy turns; previously the OR-rule + // false-flagged these. The three-zero rule still catches the real + // silent-reject shape because the M3 episode reports all three. + const r = classifyRuntimeResult({ + result: "some text", + usage: { input_tokens: 0, output_tokens: 0 }, + }); + expect(r.kind).toBe("success"); + }); + + test("in=0 & cost=0 but out>0 → NOT silent reject (vendor returned something)", () => { + const r = classifyRuntimeResult({ + result: "real reply", + usage: { input_tokens: 0, output_tokens: 50 }, + totalCostUsd: 0, + }); + expect(r.kind).toBe("success"); + }); + + test("normal turn (all signals positive) → success", () => { + const r = classifyRuntimeResult({ + result: "Done.", + usage: { input_tokens: 100, output_tokens: 50 }, + totalCostUsd: 0.0042, + }); + expect(r.kind).toBe("success"); + }); + + test("non-empty result + output_tokens=0 + cost missing → success (codex false-positive guard)", () => { + // Direct regression test for the 通信牛 CHANGE_REQ on classify + // three-zero: a codex turn that returned text but didn't report + // output token usage must NOT be flagged as soft-fail-empty. + const r = classifyRuntimeResult({ + result: "Reply from codex", + usage: { input_tokens: 50, output_tokens: 0 }, + }); + expect(r.kind).toBe("success"); + }); +}); + +describe("classifyRuntimeResult — empty-result rule (strict)", () => { + test("empty string result + non-zero tokens → soft-fail-empty", () => { + const r = classifyRuntimeResult({ + result: "", + usage: { input_tokens: 100, output_tokens: 5 }, + }); + expect(r.kind).toBe("soft-fail-empty"); + }); + + test("null result + non-zero tokens → soft-fail-empty", () => { + const r = classifyRuntimeResult({ + result: null, + usage: { input_tokens: 100, output_tokens: 5 }, + }); + expect(r.kind).toBe("soft-fail-empty"); + }); + + test("undefined result, missing usage → soft-fail-empty (empty result alone is enough)", () => { + const r = classifyRuntimeResult({}); + expect(r.kind).toBe("soft-fail-empty"); + }); + + test("single-char '0' result + tokens → success (not empty)", () => { + const r = classifyRuntimeResult({ + result: "0", + usage: { input_tokens: 10, output_tokens: 1 }, + }); + expect(r.kind).toBe("success"); + }); + + test("result text present + missing usage → success (don't penalise unreported usage)", () => { + const r = classifyRuntimeResult({ result: "Reply." }); + expect(r.kind).toBe("success"); + }); + + test("empty string result + cost present + tokens → soft-fail-empty (text emptiness is the signal)", () => { + const r = classifyRuntimeResult({ + result: "", + usage: { input_tokens: 100, output_tokens: 5 }, + totalCostUsd: 0.001, + }); + expect(r.kind).toBe("soft-fail-empty"); + }); +}); + +describe("classifyRuntimeResult — vendor hint routing via baseUrl", () => { + test("quota error with deepseek baseUrl → deepseek dashboard hint", () => { + const r = classifyRuntimeResult( + { errorMessage: "rate_limit hit" }, + { baseUrl: "https://api.deepseek.com/anthropic" }, + ); + expect(r.hint).toContain("deepseek"); + }); + + test("quota error with intern baseUrl → intern hint", () => { + const r = classifyRuntimeResult( + { errorMessage: "Token Plan 上限 reached" }, + { baseUrl: "https://chat.intern-ai.org.cn" }, + ); + expect(r.hint).toContain("intern-ai"); + }); + + test("empty result with anthropic baseUrl → anthropic hint", () => { + const r = classifyRuntimeResult( + { result: "", usage: { output_tokens: 0 } }, + { baseUrl: "https://api.anthropic.com" }, + ); + expect(r.hint).toContain("anthropic"); + }); + + test("missing baseUrl → generic hint", () => { + const r = classifyRuntimeResult({ errorMessage: "429" }); + expect(r.hint).toContain("vendor"); + }); +}); + +describe("formatClassificationError — message shape (parsed by IM bridge)", () => { + test("soft-fail-quota → 执行出错: 限流/配额耗尽", () => { + const c: ClassificationResult = { + kind: "soft-fail-quota", + reason: "HTTP 429", + hint: "→ 检查 platform.deepseek.com 配额", + }; + const s = formatClassificationError(c, { runtime: "codex" }); + expect(s).toMatch(/^执行出错: codex 限流\/配额耗尽/); + expect(s).toContain("deepseek"); + }); + + test("soft-fail-empty → 执行出错: 返回空响应 with in/out", () => { + const c: ClassificationResult = { + kind: "soft-fail-empty", + reason: "empty", + hint: "→ 检查 vendor 配额", + }; + const s = formatClassificationError(c, { runtime: "grok", usage: { input_tokens: 99, output_tokens: 0 } }); + expect(s).toContain("grok 返回空响应"); + expect(s).toContain("in=99 out=0"); + }); + + test("error kind → 执行出错: ", () => { + const c: ClassificationResult = { kind: "error", reason: "ECONNREFUSED" }; + const s = formatClassificationError(c, { runtime: "claude" }); + expect(s).toBe("执行出错: claude — ECONNREFUSED"); + }); + + test("success kind → empty string (caller should not call this; defensive)", () => { + const c: ClassificationResult = { kind: "success" }; + const s = formatClassificationError(c, { runtime: "claude" }); + expect(s).toBe(""); + }); + + test("missing usage in context → in=0 out=0 fallback", () => { + const c: ClassificationResult = { kind: "soft-fail-empty" }; + const s = formatClassificationError(c, { runtime: "claude" }); + expect(s).toContain("in=0 out=0"); + }); + + test("missing hint on quota → no trailing dash artifact", () => { + const c: ClassificationResult = { kind: "soft-fail-quota", reason: "429" }; + const s = formatClassificationError(c, { runtime: "claude" }); + expect(s).not.toMatch(/—\s*$/); + }); + + test("reason longer than 80 chars is truncated on quota path", () => { + const longReason = "x".repeat(200); + const c: ClassificationResult = { kind: "soft-fail-quota", reason: longReason }; + const s = formatClassificationError(c, { runtime: "claude" }); + // 80-char slice + closing paren + remaining template chars + expect(s.length).toBeLessThan(200); + }); +}); diff --git a/agent-node/src/runtime/classify-result.ts b/agent-node/src/runtime/classify-result.ts new file mode 100644 index 00000000..6bb21afb --- /dev/null +++ b/agent-node/src/runtime/classify-result.ts @@ -0,0 +1,172 @@ +// Cross-runtime result classifier. Folds the #267 claude +// `isRateLimitOrQuotaError` regex plus the in=0 & out=0 & cost=0 +// silent-reject shape AND a strict empty-result check into one +// decision surface that claude / codex / grok all consume. +// +// Why centralise: the "vendor accepted a request, reports success, +// returns nothing" shape is not specific to the claude runtime. Codex +// turns and Grok ACP prompts can produce the same shape (silent +// rate-limit, vendor outage, billing cutoff). Each runtime +// re-implementing the heuristic produces drift; one util keeps the +// detection rules in lockstep AND keeps the call sites under 5 lines +// each so the read-review cost stays low. +// +// Detection strategy is deliberately strict to avoid false positives: +// codex's `usage.output_tokens` value is not reliably reported across +// model providers, so an `output_tokens === 0` signal ALONE is not +// enough to flag a soft failure. Two signals must agree: +// - result is empty (null / undefined / "") — flag (regardless of usage) +// - in=0 AND out=0 AND cost=0 — flag (three-zero silent reject) +// A non-empty result with `output_tokens === 0` is treated as success +// (the previous OR-rule false-positived on codex turns). +// +// Pure helper. Re-exports the building blocks so direct +// `isRateLimitOrQuotaError` users (the auth-error fast-fail in cli.ts) +// don't have to bounce through this module's surface. + +import { + isRateLimitOrQuotaError, + quotaRemediationHint, +} from "./claude-error-classify"; + +export type ClassificationKind = + | "success" + | "soft-fail-empty" // vendor accepted, returned nothing / 0 tokens + | "soft-fail-quota" // rate limit / quota / overload (operator action) + | "error"; // hard failure (network / 5xx / parser) + +/** + * Shape every runtime produces after a turn. Each field is optional — + * the classifier degrades gracefully when a runtime doesn't report a + * given dimension (e.g. codex sometimes lacks `totalCostUsd`). + */ +export interface RuntimeResultLike { + /** Final text reply. Empty string / null / undefined all count as empty. */ + result?: string | null; + /** SDK usage block. `output_tokens === 0` is the silent-reject signal. */ + usage?: { + input_tokens?: number; + output_tokens?: number; + [k: string]: unknown; + } | null; + /** Anthropic-style cost field. `=== 0` co-confirms silent reject. */ + totalCostUsd?: number | null; + /** Set when the runtime caught an exception during the turn. */ + errorMessage?: string | null; +} + +export interface ClassificationResult { + kind: ClassificationKind; + /** Short machine-readable reason; safe to log. */ + reason?: string; + /** Vendor-specific remediation hint (URL / action) when applicable. */ + hint?: string; +} + +/** + * Decide whether a runtime turn was a real success, a soft failure + * (quota / empty result), or a hard error. + * + * Precedence (most specific first): + * 1. errorMessage matching quota/rate-limit → soft-fail-quota + * 2. errorMessage otherwise present → error + * 3. in=0 & out=0 & cost=0 (all three) → soft-fail-empty + * (the "vendor silently dropped the request" shape) + * 4. result strictly empty (null / undefined / "") → soft-fail-empty + * 5. otherwise → success + * + * Note on rule 4: `output_tokens === 0` ALONE is intentionally NOT a + * flag-trigger when the result text is non-empty. Codex providers vary + * widely in usage reporting fidelity (some report zero output even when + * a real reply was returned), so the previous "result-text OR + * output_tokens === 0" rule false-positived on healthy codex turns. + * Empty result remains a hard flag regardless of usage. + * + * @param m the runtime turn outcome + * @param opts.baseUrl vendor base URL used to route the remediation + * hint (e.g. `process.env.ANTHROPIC_BASE_URL` for claude; + * `OPENAI_BASE_URL` for codex; `XAI_BASE_URL` for grok). + */ +export function classifyRuntimeResult( + m: RuntimeResultLike, + opts?: { baseUrl?: string | null }, +): ClassificationResult { + const baseUrl = opts?.baseUrl; + if (m.errorMessage) { + if (isRateLimitOrQuotaError(m.errorMessage)) { + return { + kind: "soft-fail-quota", + reason: m.errorMessage, + hint: quotaRemediationHint(baseUrl), + }; + } + return { kind: "error", reason: m.errorMessage }; + } + + // Three-zero silent reject — `in=0 AND out=0 AND cost=0`. The 2026- + // 06-26 M3 episode surfaced this exact triple as an upstream 429 + // that the SDK silently retried into; runtime treated the turn as + // successful because no error was thrown. Requiring ALL three signals + // to agree avoids false-positiving runtimes that simply don't report + // cost (codex turn.completed sometimes omits it). + const usage = m.usage ?? {}; + const inTokens = usage.input_tokens ?? null; + const outTokens = usage.output_tokens ?? null; + const cost = m.totalCostUsd ?? null; + if (inTokens === 0 && outTokens === 0 && cost === 0) { + return { + kind: "soft-fail-empty", + reason: "vendor returned in=0 out=0 cost=0 — upstream silent reject (suspect 429/quota/auth without explicit error)", + hint: quotaRemediationHint(baseUrl), + }; + } + + // Strict empty-result check. Only the literal absence of text counts: + // null / undefined / empty string. A `result === "0"` single-char + // reply is success. A non-empty result paired with a suspicious + // `output_tokens === 0` is also success (per usage-reliability + // caveat in the module header). + const resultEmpty = m.result === null || m.result === undefined || m.result === ""; + if (resultEmpty) { + return { + kind: "soft-fail-empty", + reason: "empty vendor result despite success signal", + hint: quotaRemediationHint(baseUrl), + }; + } + + return { kind: "success" }; +} + +/** + * Convenience: format a user-surfaced error string for a non-success + * classification. Centralised so the message shape stays consistent + * across runtimes (the upstream caller / IM bridge / dashboard parse + * the prefix `执行出错:` to flag failure rows). + */ +export function formatClassificationError( + c: ClassificationResult, + context: { runtime: string; usage?: RuntimeResultLike["usage"] }, +): string { + const inT = context.usage?.input_tokens ?? 0; + const outT = context.usage?.output_tokens ?? 0; + switch (c.kind) { + case "soft-fail-quota": + return `执行出错: ${context.runtime} 限流/配额耗尽 (${(c.reason || "").slice(0, 80)})${c.hint ? ` — ${c.hint}` : ""}`; + case "soft-fail-empty": + return `执行出错: ${context.runtime} 返回空响应 (in=${inT} out=${outT}) — 疑似 vendor 静默限流/配额. ${c.hint || ""}`.trim(); + case "error": + return `执行出错: ${context.runtime} — ${(c.reason || "未知错误").slice(0, 200)}`; + case "success": + // Caller should only invoke this on non-success; defensive default. + return ""; + } +} + +// Re-export building blocks so callers that just want the regex (e.g. +// the catch-block fast-fail in cli.ts) don't have to depend on this +// wider surface. +export { + isRateLimitOrQuotaError, + quotaRemediationHint, +}; diff --git a/agent-node/src/runtime/grok-build-acp/runtime.ts b/agent-node/src/runtime/grok-build-acp/runtime.ts index c56cae85..01cd63ab 100644 --- a/agent-node/src/runtime/grok-build-acp/runtime.ts +++ b/agent-node/src/runtime/grok-build-acp/runtime.ts @@ -58,6 +58,15 @@ export interface GrokAcpTurnOptions { cwd?: string; sessionId?: string; timeoutMs?: number; + /** + * #261 P1 redirect (2026-06-28) — handshake deadline (initialize + + * authenticate + session/new|load). Decoupled from `timeoutMs` so a + * stuck handshake fails fast (~45s) instead of hiding behind the + * 300s prompt deadline. Defaults to min(45_000, timeoutMs) so a + * caller that sets a tight `timeoutMs` doesn't accidentally get a + * loose handshake deadline. + */ + handshakeTimeoutMs?: number; drainMs?: number; binary?: string; env?: NodeJS.ProcessEnv; @@ -110,6 +119,11 @@ interface InitializeResponse { */ export async function runGrokAcpTurn(opts: GrokAcpTurnOptions): Promise { const timeoutMs = opts.timeoutMs ?? 120_000; + // #261 P1 redirect — handshake (init+auth+session) is a separate + // class of operation from the unbounded prompt stream and must fail + // fast. Defaults to min(45s, timeoutMs) so legacy callers that only + // set a tight `timeoutMs` still get a coherent (≤ timeoutMs) deadline. + const handshakeTimeoutMs = opts.handshakeTimeoutMs ?? Math.min(45_000, timeoutMs); const drainMs = opts.drainMs ?? 15_000; const childEnv = { ...process.env, ...opts.env }; const client = new GrokAcpClient(); @@ -199,9 +213,15 @@ export async function runGrokAcpTurn(opts: GrokAcpTurnOptions): Promise("initialize", initParams, timeoutMs); + // #261 P1 redirect (2026-06-28) — handshake (initialize / authenticate + // / session/new|load) now uses `handshakeTimeoutMs` (default 45s), + // decoupled from the long-running `session/prompt` deadline. Pre-fix + // all three handshake requests inherited the 300s prompt timeout, so + // a wedged `initialize` would burn the full prompt window before the + // operator saw a failure signal. + const init = await client.request("initialize", initParams, handshakeTimeoutMs); const authMethod = selectAuthMethod(init, childEnv); - await client.request("authenticate", { methodId: authMethod, meta: { headless: true } }, timeoutMs); + await client.request("authenticate", { methodId: authMethod, meta: { headless: true } }, handshakeTimeoutMs); // #204 — pass through the caller's mcpServers list; empty array preserves // pre-#204 behaviour so existing callers that don't yet build the entry @@ -217,8 +237,8 @@ export async function runGrokAcpTurn(opts: GrokAcpTurnOptions): Promise("session/load", { sessionId: opts.sessionId, cwd: opts.cwd, mcpServers, ...sessionExtra }, timeoutMs) - : await client.request("session/new", { cwd: opts.cwd, mcpServers, ...sessionExtra }, timeoutMs); + ? await client.request("session/load", { sessionId: opts.sessionId, cwd: opts.cwd, mcpServers, ...sessionExtra }, handshakeTimeoutMs) + : await client.request("session/new", { cwd: opts.cwd, mcpServers, ...sessionExtra }, handshakeTimeoutMs); const sessionId = extractSessionId(session) ?? opts.sessionId; if (!sessionId) throw new Error("Grok ACP session response did not include sessionId"); diff --git a/agent-node/src/util/timeout.test.ts b/agent-node/src/util/timeout.test.ts new file mode 100644 index 00000000..34ef794d --- /dev/null +++ b/agent-node/src/util/timeout.test.ts @@ -0,0 +1,238 @@ +// Unit coverage for the runtime timeout primitive. Pure helper — no +// network, no real subprocess. The withTimeout / resolveTimeoutMs pair +// is consumed by 5 runtime call sites (claude / codex / grok handshake / +// telegram getUpdates / think queue), so the regression surface is the +// behaviour of these tests, not the call sites individually. + +import { describe, expect, test } from "bun:test"; +import { withTimeout, TimeoutError, resolveTimeoutMs } from "./timeout"; + +const tick = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +describe("withTimeout — happy path (factory wins)", () => { + test("resolves with factory value when fn settles before deadline", async () => { + const result = await withTimeout(async () => "done", 1000, "happy"); + expect(result).toBe("done"); + }); + + test("passes a non-aborted signal when fn finishes promptly", async () => { + let observed: boolean | undefined; + await withTimeout( + async (signal) => { + observed = signal.aborted; + return "ok"; + }, + 500, + "signal-shape", + ); + expect(observed).toBe(false); + }); + + test("returns objects, not just strings", async () => { + const r = await withTimeout(async () => ({ a: 1, b: "x" }), 200); + expect(r).toEqual({ a: 1, b: "x" }); + }); + + test("propagates fn's rejection unchanged (not wrapped)", async () => { + await expect( + withTimeout(async () => { throw new Error("inner boom"); }, 500, "rejector"), + ).rejects.toThrow("inner boom"); + }); +}); + +describe("withTimeout — timeout path (timer wins)", () => { + test("rejects with TimeoutError when fn outlasts deadline", async () => { + let caught: unknown; + try { + await withTimeout(async () => { await tick(200); return "late"; }, 30, "slow"); + } catch (e) { caught = e; } + expect(caught).toBeInstanceOf(TimeoutError); + expect((caught as TimeoutError).label).toBe("slow"); + expect((caught as TimeoutError).timeoutMs).toBe(30); + }); + + test("TimeoutError message includes label + ms", () => { + const err = new TimeoutError(45_000, "grok-handshake"); + expect(err.message).toContain("grok-handshake"); + expect(err.message).toContain("45000"); + }); + + test("TimeoutError without label still works", () => { + const err = new TimeoutError(100); + expect(err.message).toContain("100ms"); + expect(err.message).not.toContain("undefined"); + }); + + test("fires AbortSignal on timeout so factory can cancel in-flight work", async () => { + let abortedSeen = false; + try { + await withTimeout( + async (signal) => { + signal.addEventListener("abort", () => { abortedSeen = true; }); + await tick(200); + return "never"; + }, + 30, + "cancellable", + ); + } catch { /* expected timeout */ } + // Give the abort event one more tick to fire before asserting. + await tick(10); + expect(abortedSeen).toBe(true); + }); +}); + +describe("withTimeout — zero / negative deadline sentinel", () => { + test("timeoutMs=0 disables the timer (CLAUDE_TIMEOUT_MS=0 sentinel)", async () => { + const result = await withTimeout(async () => { await tick(50); return "untimed"; }, 0, "no-deadline"); + expect(result).toBe("untimed"); + }); + + test("timeoutMs<0 also disables (defensive)", async () => { + const result = await withTimeout(async () => "untimed-neg", -100); + expect(result).toBe("untimed-neg"); + }); + + test("untimed call still receives a non-aborted signal", async () => { + let abortedAtStart: boolean | undefined; + await withTimeout( + async (signal) => { abortedAtStart = signal.aborted; return "ok"; }, + 0, + ); + expect(abortedAtStart).toBe(false); + }); +}); + +describe("withTimeout — externalSignal propagation", () => { + test("forwards external abort into factory signal", async () => { + const outer = new AbortController(); + let innerAborted = false; + const p = withTimeout( + async (signal) => { + signal.addEventListener("abort", () => { innerAborted = true; }); + await tick(200); + return "ok"; + }, + 1000, + "ext-abort", + { externalSignal: outer.signal }, + ); + outer.abort(); + try { await p; } catch { /* fn never threw, just observed abort */ } + // Use Promise.race fallback if fn happens to finish first; for safety + // wait briefly and check the observed flag. + await tick(10); + expect(innerAborted).toBe(true); + }); + + test("already-aborted external signal aborts immediately", async () => { + const outer = new AbortController(); + outer.abort(); + let observedAborted = false; + await withTimeout( + async (signal) => { observedAborted = signal.aborted; return "ok"; }, + 1000, + undefined, + { externalSignal: outer.signal }, + ); + expect(observedAborted).toBe(true); + }); +}); + +describe("withTimeout — cleanup", () => { + test("clears timer on successful return (no dangling handles)", async () => { + // Indirect test: after happy-path return, the timer must not still + // fire and crash the test. We assert by running another short test + // after the first one finishes well before the deadline. + await withTimeout(async () => "first", 5000); + await tick(20); + // If the timer hadn't been cleared we'd still have it scheduled — + // bun's test runner would warn about open handles. The mere fact + // that this second call also resolves cleanly is the assertion. + const r = await withTimeout(async () => "second", 5000); + expect(r).toBe("second"); + }); +}); + +describe("resolveTimeoutMs — precedence", () => { + test("env wins over flag and default", () => { + const r = resolveTimeoutMs({ envValue: "1234", flagValue: 5000, defaultMs: 10000 }); + expect(r).toEqual({ valueMs: 1234, source: "env", clamped: false }); + }); + + test("flag wins when env is missing", () => { + const r = resolveTimeoutMs({ envValue: undefined, flagValue: 5000, defaultMs: 10000 }); + expect(r).toEqual({ valueMs: 5000, source: "flag", clamped: false }); + }); + + test("default wins when env and flag both missing", () => { + const r = resolveTimeoutMs({ defaultMs: 10000 }); + expect(r).toEqual({ valueMs: 10000, source: "default", clamped: false }); + }); + + test("flag wins when env is empty string (treated as unset)", () => { + const r = resolveTimeoutMs({ envValue: "", flagValue: 5000, defaultMs: 10000 }); + expect(r.source).toBe("flag"); + }); + + test("flag wins when env is non-numeric garbage", () => { + const r = resolveTimeoutMs({ envValue: "asdf", flagValue: 5000, defaultMs: 10000 }); + expect(r.source).toBe("flag"); + }); + + test("flag wins when env is negative", () => { + const r = resolveTimeoutMs({ envValue: "-1", flagValue: 5000, defaultMs: 10000 }); + expect(r.source).toBe("flag"); + }); + + test("default wins when flag is NaN", () => { + const r = resolveTimeoutMs({ flagValue: NaN, defaultMs: 10000 }); + expect(r.source).toBe("default"); + }); + + test("zero is honoured (not treated as unset) — env=0 disables timeout", () => { + const r = resolveTimeoutMs({ envValue: "0", flagValue: 5000, defaultMs: 10000 }); + expect(r.valueMs).toBe(0); + expect(r.source).toBe("env"); + }); + + test("zero is honoured at flag level too", () => { + const r = resolveTimeoutMs({ flagValue: 0, defaultMs: 10000 }); + expect(r.valueMs).toBe(0); + expect(r.source).toBe("flag"); + }); +}); + +describe("resolveTimeoutMs — clamping", () => { + test("clamps below minMs and reports clamped=true", () => { + const r = resolveTimeoutMs({ envValue: "1", defaultMs: 10000, minMs: 100 }); + expect(r).toEqual({ valueMs: 100, source: "env", clamped: true }); + }); + + test("clamps above maxMs and reports clamped=true", () => { + const r = resolveTimeoutMs({ flagValue: 999_999_999, defaultMs: 10000, maxMs: 60_000 }); + expect(r).toEqual({ valueMs: 60_000, source: "flag", clamped: true }); + }); + + test("in-bounds value is not clamped", () => { + const r = resolveTimeoutMs({ envValue: "5000", defaultMs: 10000, minMs: 100, maxMs: 60_000 }); + expect(r.clamped).toBe(false); + }); + + test("default value also gets clamped (configuration sanity)", () => { + const r = resolveTimeoutMs({ defaultMs: 999, maxMs: 500 }); + expect(r).toEqual({ valueMs: 500, source: "default", clamped: true }); + }); +}); + +describe("resolveTimeoutMs — defensive null handling", () => { + test("null envValue is treated as unset", () => { + const r = resolveTimeoutMs({ envValue: null, defaultMs: 100 }); + expect(r.source).toBe("default"); + }); + + test("null flagValue is treated as unset", () => { + const r = resolveTimeoutMs({ flagValue: null, defaultMs: 100 }); + expect(r.source).toBe("default"); + }); +}); diff --git a/agent-node/src/util/timeout.ts b/agent-node/src/util/timeout.ts new file mode 100644 index 00000000..a62cf700 --- /dev/null +++ b/agent-node/src/util/timeout.ts @@ -0,0 +1,158 @@ +// Unified timeout primitive for runtime turns + telegram long-poll + think() +// queue. Single shape consumed by claude / codex / grok / telegram so the +// timeout semantics (and the abort-signal contract that propagates into +// fetch / AbortController-aware code) are identical across every call site. +// +// Pre-redirect: each runtime carried its own deadline shape — claude an +// inline AbortController + setTimeout pair, codex no wall-clock deadline at +// all (zero guard against a wedged turn), grok-build-acp a single 300s +// `timeoutMs` reused for BOTH handshake (initialize / authenticate / +// session/new) and the long-running session/prompt (the latter rightly +// stays on its idle-timeout path; the former should fail fast at ~45s). +// Telegram getUpdates relied on the server-side `timeout=30` and was +// unprotected against a wedged TCP socket. +// +// This util gives each call site: +// - a hard wall-clock deadline + a TimeoutError carrying the label +// - an AbortSignal it can plug into fetch() / AbortController-aware libs +// - the zero-deadline sentinel (`timeoutMs <= 0` disables the timer) +// +// Plus `resolveTimeoutMs(envValue, flagValue, defaultMs)` for the +// "env > flag > default" precedence already in use in +// resolveGrokAcpTimeout, generalised so future runtime knobs follow the +// same precedence without each runtime re-implementing it. + +/** + * Error thrown when {@link withTimeout} hits its deadline. Carries the + * label + ms so log/handler code can surface the failing call site + * without re-stringifying the message. + */ +export class TimeoutError extends Error { + readonly label?: string; + readonly timeoutMs: number; + constructor(timeoutMs: number, label?: string) { + super(`${label ? `[${label}] ` : ""}timed out after ${timeoutMs}ms`); + this.name = "TimeoutError"; + this.label = label; + this.timeoutMs = timeoutMs; + } +} + +/** + * Race `fn(signal)` against a wall-clock timer. The factory receives an + * AbortSignal that fires when the timeout elapses so the caller can + * cancel in-flight work (fetch / AbortController-aware SDKs). + * + * Resolves with the factory's value if it settles first. + * Rejects with {@link TimeoutError} if the timer fires first. + * + * If `timeoutMs <= 0`, the timeout is disabled (`fn` runs with a + * never-aborted signal). This preserves the pre-existing + * `CLAUDE_TIMEOUT_MS=0 → no deadline` sentinel. + * + * If a pre-existing AbortSignal is supplied via `opts.externalSignal`, + * abortion from that signal is propagated into the factory's signal + * (allowing callers to participate in caller-supplied cancellation). + */ +export interface WithTimeoutOpts { + externalSignal?: AbortSignal; +} + +export async function withTimeout( + fn: (signal: AbortSignal) => Promise, + timeoutMs: number, + label?: string, + opts?: WithTimeoutOpts, +): Promise { + const ac = new AbortController(); + let onExternalAbort: (() => void) | undefined; + if (opts?.externalSignal) { + if (opts.externalSignal.aborted) ac.abort(); + else { + onExternalAbort = () => ac.abort(); + opts.externalSignal.addEventListener("abort", onExternalAbort, { once: true }); + } + } + + if (timeoutMs <= 0) { + try { + return await fn(ac.signal); + } finally { + if (onExternalAbort && opts?.externalSignal) { + opts.externalSignal.removeEventListener("abort", onExternalAbort); + } + } + } + + let timer: ReturnType | undefined; + const timeoutPromise = new Promise((_, reject) => { + timer = setTimeout(() => { + ac.abort(); + reject(new TimeoutError(timeoutMs, label)); + }, timeoutMs); + }); + try { + return await Promise.race([fn(ac.signal), timeoutPromise]); + } finally { + if (timer) clearTimeout(timer); + if (onExternalAbort && opts?.externalSignal) { + opts.externalSignal.removeEventListener("abort", onExternalAbort); + } + } +} + +/** + * Resolve a timeout knob from (env > flag > default) with bounds + * clamping. Mirrors and generalises `resolveGrokAcpTimeout`. + * + * - `envValue`: raw string from `process.env.X` (parsed as integer ms). + * Non-numeric / negative / empty values fall through to flag. + * - `flagValue`: number from `config.json.flags.X`. Non-finite / negative + * values fall through to default. + * - `defaultMs`: required fallback. + * - `minMs` / `maxMs`: optional bounds (clamping reported via `clamped`). + * + * Returns the resolved value plus the source label (useful for boot + * logging) and a `clamped` flag (useful for "config out of bounds" + * warnings). + */ +export interface TimeoutResolveOpts { + envValue?: string | undefined | null; + flagValue?: number | undefined | null; + defaultMs: number; + minMs?: number; + maxMs?: number; +} + +export interface TimeoutResolveResult { + valueMs: number; + source: "env" | "flag" | "default"; + clamped: boolean; +} + +export function resolveTimeoutMs(opts: TimeoutResolveOpts): TimeoutResolveResult { + const minMs = opts.minMs ?? 0; + const maxMs = opts.maxMs ?? Number.MAX_SAFE_INTEGER; + const clamp = (n: number): { value: number; clamped: boolean } => { + if (n < minMs) return { value: minMs, clamped: true }; + if (n > maxMs) return { value: maxMs, clamped: true }; + return { value: n, clamped: false }; + }; + if (opts.envValue !== undefined && opts.envValue !== null && opts.envValue !== "") { + const n = Number(opts.envValue); + if (Number.isFinite(n) && n >= 0) { + const c = clamp(n); + return { valueMs: c.value, source: "env", clamped: c.clamped }; + } + } + if ( + typeof opts.flagValue === "number" && + Number.isFinite(opts.flagValue) && + opts.flagValue >= 0 + ) { + const c = clamp(opts.flagValue); + return { valueMs: c.value, source: "flag", clamped: c.clamped }; + } + const c = clamp(opts.defaultMs); + return { valueMs: c.value, source: "default", clamped: c.clamped }; +}