Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 176 additions & 63 deletions agent-node/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ import { CurrentAliasResolver } from "./runtime/current-alias";
import { delegationTargetExists } from "./runtime/delegation-precheck";
import {
isRateLimitOrQuotaError,
isEmptyResultSoftFailure,
quotaRemediationHint,
} from "./runtime/claude-error-classify";
import {
classifyRuntimeResult,
formatClassificationError,
} from "./runtime/classify-result";
import { withTimeout, TimeoutError, resolveTimeoutMs } from "./util/timeout";

const home = homedir();

Expand Down Expand Up @@ -314,6 +318,45 @@ const CLAUDE_MAX_RETRIES = parseInt(
opts["claude-max-retries"] || process.env.CLAUDE_MAX_RETRIES
|| fileConfig.flags?.claudeMaxRetries || fileConfig.claudeMaxRetries || "2"
);
// #261 P1 redirect (2026-06-28) — wall-clock deadline for a codex turn.
// Codex previously had no such guard, so a wedged turn could hang forever
// (no abort path, no retry). Same shape as CLAUDE_TIMEOUT_MS, but with a
// 300s default. Sources: `opts["codex-timeout-ms"]` (falling back to the
// CODEX_TIMEOUT_MS env var when that option is empty) and the numeric
// `flags.codexTimeoutMs` config entry; the parity flags are listed in
// docs/runbooks/. resolveTimeoutMs honours `0` as "disabled", so power
// users can opt out of the deadline entirely.
const { valueMs: CODEX_TIMEOUT_MS } = resolveTimeoutMs({
  envValue: opts["codex-timeout-ms"] || process.env.CODEX_TIMEOUT_MS,
  flagValue:
    typeof fileConfig.flags?.codexTimeoutMs === "number"
      ? fileConfig.flags.codexTimeoutMs
      : undefined,
  defaultMs: 300_000,
});
// Deadline for the Grok handshake phase (initialize + authenticate +
// session/new), kept separate from the prompt timeout. Before the redirect
// both phases shared the same 300s knob, so a stuck handshake hid behind
// the prompt deadline.
//
// Back-compat: with neither the env var nor the config flag explicitly set,
// this stays `undefined` so `runGrokAcpTurn` applies its own default of
// `min(45s, timeoutMs)`. Pinning a fixed 45s here would silently widen the
// handshake deadline for users who already run a tight
// `GROK_ACP_TIMEOUT_MS` (e.g. 10s) — they expect handshake ≤ timeoutMs,
// not handshake = 45s.
const GROK_HANDSHAKE_TIMEOUT_MS: number | undefined = (() => {
  const rawEnv = process.env.GROK_HANDSHAKE_TIMEOUT_MS;
  const rawFlag = fileConfig.flags?.grokHandshakeTimeoutMs;
  // Only a numeric config entry counts as "set"; anything else is ignored.
  const flagMs = typeof rawFlag === "number" ? rawFlag : undefined;
  // An empty-string env var is treated the same as an absent one.
  const hasEnv = rawEnv !== undefined && rawEnv !== "";
  // Nothing configured → leave the default to runGrokAcpTurn.
  if (!hasEnv && flagMs === undefined) return undefined;
  const { valueMs } = resolveTimeoutMs({
    envValue: rawEnv,
    flagValue: flagMs,
    defaultMs: 45_000,
  });
  return valueMs;
})();
// Client-side safety net for the Telegram getUpdates long-poll. The server
// holds the request for 30s; without a local deadline a wedged TCP socket
// or DPI drop could pin the poll loop forever. The 45s default leaves 15s
// of headroom over the server-side timeout. Overridable through the
// TELEGRAM_GETUPDATES_TIMEOUT_MS env var.
const { valueMs: TELEGRAM_GETUPDATES_TIMEOUT_MS } = resolveTimeoutMs({
  envValue: process.env.TELEGRAM_GETUPDATES_TIMEOUT_MS,
  defaultMs: 45_000,
});
const NEW_SESSION = opts["new-session"] === "true";
const SESSION_ID = NEW_SESSION ? "" : (
RUNTIME === "grok"
Expand Down Expand Up @@ -1522,56 +1565,70 @@ async function processWithClaude(task: string, from: string, images?: string[]):
// abort controller + timeout window. On auth-class error, short-circuit
// (fast-fail, no retry). On transient error / timeout, backoff 4s, 8s
// (+ jitter to spread herd retries across the vendor queue) and retry.
//
// #261 P1 redirect (2026-06-28) — wrap the for-await loop in withTimeout
// so the deadline / abort / cleanup contract matches codex / grok /
// telegram. The SDK's `query()` watches `options.abortController.signal`
// for cancellation, so we still create an AbortController inside the
// factory and forward withTimeout's signal into it.
let lastErr: string = "";
let timedOutFinal = false;
for (let attempt = 0; attempt <= CLAUDE_MAX_RETRIES; attempt++) {
let timedOut = false;
let timer: ReturnType<typeof setTimeout> | undefined;
const ac = new AbortController();
options.abortController = ac;
if (CLAUDE_TIMEOUT_MS > 0) {
timer = setTimeout(() => { timedOut = true; ac.abort(); }, CLAUDE_TIMEOUT_MS);
}
const attemptStart = Date.now();
try {
result = ""; // reset accumulator for this attempt
for await (const message of query({ prompt, options })) {
const m = message as any;
if (m.type === "system" && m.subtype === "init") {
claudeSessionId = m.session_id;
log(`[claude] session=${m.session_id?.slice(0, 8)} model=${MODEL || "default"} attempt=${attempt + 1}`);
writebackSession(m.session_id);
}
if (m.type === "result") {
const dt = Date.now() - t0;
const u = m.usage || {};
log(`[claude] ${m.subtype} | ${dt}ms | $${m.total_cost_usd?.toFixed(4) || "?"} | in=${u.input_tokens || 0} out=${u.output_tokens || 0} | turns=${m.num_turns}${attempt > 0 ? ` | attempt=${attempt + 1}` : ""}`);
if (m.subtype === "success") {
// #261 P1-① (2026-06-28) — empty-result soft-failure gate.
// Pre-fix `m.result || "任务完成"` silently rebranded an
// empty vendor reply as "task complete" — root cause of the
// M3 incident where a silent rate-limit produced 0 output
// tokens but the upstream caller saw "任务完成" and moved on.
// Now: classifier surfaces the empty reply as a soft fail
// with a vendor-specific hint, so the next hop / operator
// sees the truth.
if (isEmptyResultSoftFailure(m)) {
const hint = quotaRemediationHint(process.env.ANTHROPIC_BASE_URL);
log(`[claude] ✗ vendor returned EMPTY result despite success subtype (in=${u.input_tokens || 0}, out=${u.output_tokens || 0}, result.len=${String(m.result || "").length}) — treating as soft fail`);
result = `执行出错: vendor 返回空响应 (in=${u.input_tokens || 0} out=${u.output_tokens || 0}) — 疑似 vendor 静默限流/配额. ${hint}`;
} else {
result = m.result;
result = await withTimeout(async (signal) => {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid overlapping Claude retries after timeout

When CLAUDE_TIMEOUT_MS fires and CLAUDE_MAX_RETRIES is nonzero, withTimeout rejects as soon as it aborts the signal, so the retry loop can start the next query() after the backoff even if the previous SDK turn is still unwinding or running a tool. The old code only called ac.abort() and did not leave the for await until the SDK settled, so slow/abort-ignoring tools could not overlap; now the same task can be executed twice and duplicate side effects such as file edits or send_task calls. On timeout, either avoid retrying or wait for the aborted attempt to finish cleanup before starting another attempt.

Useful? React with 👍 / 👎.

let inner = "";
const ac = new AbortController();
options.abortController = ac;
const forward = () => ac.abort();
signal.addEventListener("abort", forward, { once: true });
try {
for await (const message of query({ prompt, options })) {
const m = message as any;
if (m.type === "system" && m.subtype === "init") {
claudeSessionId = m.session_id;
log(`[claude] session=${m.session_id?.slice(0, 8)} model=${MODEL || "default"} attempt=${attempt + 1}`);
writebackSession(m.session_id);
}
if (m.type === "result") {
const dt = Date.now() - t0;
const u = m.usage || {};
log(`[claude] ${m.subtype} | ${dt}ms | $${m.total_cost_usd?.toFixed(4) || "?"} | in=${u.input_tokens || 0} out=${u.output_tokens || 0} | turns=${m.num_turns}${attempt > 0 ? ` | attempt=${attempt + 1}` : ""}`);
if (m.subtype === "success") {
// #261 P1 redirect (2026-06-28) — delegate to classifyRuntimeResult
// which folds the empty-result rule from #267 + the in=0 & out=0
// & cost=0 silent-reject rule into one decision shared with
// codex / grok. Pre-fix `m.result || "任务完成"` silently
// rebranded an empty vendor reply as "task complete" — the M3
// incident shape. Now a non-success classification surfaces a
// soft-fail string the upstream caller can act on.
const cls = classifyRuntimeResult(
{ result: m.result, usage: m.usage, totalCostUsd: m.total_cost_usd },
{ baseUrl: process.env.ANTHROPIC_BASE_URL },
);
if (cls.kind === "success") {
inner = m.result;
} else {
log(`[claude] ✗ ${cls.reason || cls.kind} (in=${u.input_tokens || 0}, out=${u.output_tokens || 0}, cost=${m.total_cost_usd ?? "?"})`);
inner = formatClassificationError(cls, { runtime: "claude-agent-sdk", usage: m.usage });
}
} else {
inner = `执行出错: ${m.error || m.result || "未知错误"}`;
}
}
} else {
result = `执行出错: ${m.error || m.result || "未知错误"}`;
}
return inner;
} finally {
signal.removeEventListener("abort", forward);
}
}
if (timer) clearTimeout(timer);
}, CLAUDE_TIMEOUT_MS, `claude-attempt-${attempt + 1}/${CLAUDE_MAX_RETRIES + 1}`);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Validate Claude timeout before passing it through

If CLAUDE_TIMEOUT_MS, --claude-timeout-ms, or the config value is a non-numeric string, parseInt produces NaN; the old CLAUDE_TIMEOUT_MS > 0 guard simply disabled the timer in that case, but withTimeout only treats <= 0 as disabled and passes NaN to setTimeout, which fires immediately. A typo in the timeout knob will therefore make every Claude attempt time out instantly; resolve or validate this value before calling withTimeout.

Useful? React with 👍 / 👎.

return result;
} catch (err: any) {
if (timer) clearTimeout(timer);
const msg = String(err?.message || err).slice(0, 300);
timedOut = err instanceof TimeoutError;
const msg = err instanceof TimeoutError
? `timed out after ${err.timeoutMs}ms`
: String(err?.message || err).slice(0, 300);
const attemptDt = Date.now() - attemptStart;

// Fast-fail on auth errors — no point retrying with the same bad key.
Expand Down Expand Up @@ -1764,33 +1821,76 @@ async function processWithCodex(task: string, from: string, images?: string[]):
: promptText;
const t0 = Date.now();
try {
const { events } = await codexThread.runStreamed(input);
let finalResponse = "";
let usage: any = null;
let itemCount = 0;
for await (const ev of events) {
if (ev.type === "item.started") {
const it = ev.item as any;
debug(`[codex] ${it.type}${it.command ? `: ${it.command.slice(0, 60)}` : it.tool ? `: ${it.server}/${it.tool}` : ""}`);
} else if (ev.type === "item.completed") {
itemCount++;
const it = ev.item as any;
if (it.type === "agent_message") finalResponse = it.text || "";
if (it.type === "command_execution") debug(`[codex] cmd exit=${it.exit_code} | ${it.aggregated_output?.slice(0, 80)}`);
if (it.type === "reasoning") debug(`[codex] thinking: ${it.text?.slice(0, 80)}`);
if (it.type === "mcp_tool_call") debug(`[codex] mcp: ${it.server}/${it.tool} → ${it.status}`);
} else if (ev.type === "turn.completed") {
usage = ev.usage;
}
}
// #261 P1 redirect (2026-06-28) — wrap the codex turn in withTimeout.
// Pre-fix: codex had ZERO wall-clock guard — a wedged turn (vendor
// outage, dropped TCP) would hang the agent-node forever with no
// abort path. Codex SDK's `TurnOptions.signal` lets us propagate
// cancellation cleanly when the deadline fires.
const outcome = await withTimeout(
async (signal) => {
const { events } = await codexThread.runStreamed(input, { signal });
let finalResponse = "";
let usage: any = null;
let itemCount = 0;
for await (const ev of events) {
if (ev.type === "item.started") {
const it = ev.item as any;
debug(`[codex] ${it.type}${it.command ? `: ${it.command.slice(0, 60)}` : it.tool ? `: ${it.server}/${it.tool}` : ""}`);
} else if (ev.type === "item.completed") {
itemCount++;
const it = ev.item as any;
if (it.type === "agent_message") finalResponse = it.text || "";
if (it.type === "command_execution") debug(`[codex] cmd exit=${it.exit_code} | ${it.aggregated_output?.slice(0, 80)}`);
if (it.type === "reasoning") debug(`[codex] thinking: ${it.text?.slice(0, 80)}`);
if (it.type === "mcp_tool_call") debug(`[codex] mcp: ${it.server}/${it.tool} → ${it.status}`);
} else if (ev.type === "turn.completed") {
usage = ev.usage;
}
}
return { finalResponse, usage, itemCount };
},
CODEX_TIMEOUT_MS,
"codex-turn",
);
const dt = Date.now() - t0;
const inTokens = usage?.input_tokens || 0;
log(`[codex] done | ${dt}ms | in=${inTokens} out=${usage?.output_tokens || 0} | items=${itemCount}`);
const inTokens = outcome.usage?.input_tokens || 0;
log(`[codex] done | ${dt}ms | in=${inTokens} out=${outcome.usage?.output_tokens || 0} | items=${outcome.itemCount}`);
if (codexThread?.id) writebackSession(codexThread.id);
// Auto-compact 由 Codex CLI 原生处理(model_auto_compact_token_limit=200000)

return finalResponse || "(无回复)";
// #261 P1 redirect — classify codex turn result so silent reject
// (in=0 & out=0) or empty reply surfaces as a soft failure too,
// not just `"(无回复)"`. Mirrors the claude path so the
// upstream caller sees one consistent error shape.
const cls = classifyRuntimeResult(
{ result: outcome.finalResponse, usage: outcome.usage },
{ baseUrl: process.env.OPENAI_BASE_URL || process.env.OPENAI_API_BASE },
);
if (cls.kind !== "success") {
log(`[codex] ✗ ${cls.reason || cls.kind} (in=${inTokens}, out=${outcome.usage?.output_tokens || 0})`);
return formatClassificationError(cls, { runtime: "codex-sdk", usage: outcome.usage });
}
return outcome.finalResponse || "(无回复)";
} catch (e: any) {
// #261 P1 redirect — TimeoutError gets a specific message so the
// upstream caller can distinguish "vendor wedged" from "vendor
// errored". Codex SDK aborts cleanly on signal, but the wedged-TCP
// case (no events flowing) is exactly what the timeout catches.
if (e instanceof TimeoutError) {
log(`[codex] ✗ ${e.message}; reset thread for next turn`);
codexThread = null;
return `执行出错: codex-sdk 调用超时 (${Math.round(CODEX_TIMEOUT_MS / 1000)}s) — 检查 OPENAI_BASE_URL / vendor 负载`;
}
// #261 P1 redirect — fast-fail on quota the same way claude does, so
// a 429-flooded codex node doesn't keep tearing down + rebuilding
// its thread on every backoff cycle (rebuild is expensive — full
// codex CLI spawn). Mirrors the claude-runtime fast-fail path.
const msg0 = String(e?.message || e).slice(0, 300);
if (isRateLimitOrQuotaError(msg0)) {
const hint = quotaRemediationHint(process.env.OPENAI_BASE_URL);
log(`[codex] ✗ vendor rate-limit/quota: ${msg0.slice(0, 150)}`);
return `执行出错: codex 限流/配额耗尽 (${msg0.slice(0, 80)}) — ${hint}`;
}
log(`codex thread error: ${e.message}, 重建`);
const codex = new Codex({ config: CODEX_CONFIG });
codexThread = codex.startThread({
Expand Down Expand Up @@ -2124,6 +2224,10 @@ async function processWithGrok(task: string, from: string, images?: string[]): P
flagValue: fileConfig.flags?.grokAcpTimeoutMs,
defaultMs: 300000,
}).valueMs,
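// #261 P1 redirect — handshake decoupled from prompt deadline.
// GROK_HANDSHAKE_TIMEOUT_MS is `undefined` unless the env var or
// flags.grokHandshakeTimeoutMs is explicitly set; in that case
// runGrokAcpTurn applies its own `min(45s, timeoutMs)` default, keeping
// a wedged initialize / authenticate / session/new bounded without
// affecting long-running prompts.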
handshakeTimeoutMs: GROK_HANDSHAKE_TIMEOUT_MS,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Respect shorter Grok prompt timeouts for handshakes

This affects Grok nodes that set GROK_ACP_TIMEOUT_MS or flags.grokAcpTimeoutMs below 45s: this explicit default handshakeTimeoutMs bypasses runGrokAcpTurn's min(45_000, timeoutMs) fallback, so a node that previously bounded initialize/auth/session by (for example) 10s now waits the full 45s unless it also discovers and sets the new handshake knob. Compute the handshake default from the resolved prompt timeout, or only pass this option when the new knob is explicitly configured.

Useful? React with 👍 / 👎.

drainMs: resolveGrokAcpTimeout({
envValue: process.env.GROK_ACP_DRAIN_MS,
flagValue: fileConfig.flags?.grokAcpDrainMs,
Expand Down Expand Up @@ -2936,7 +3040,16 @@ async function connectTelegram(channel: TelegramChannel) {
log(`Telegram polling: ${channel.dir}`);
while (true) {
try {
const res = await fetch(`${tg.apiBase}/getUpdates?offset=${tg.offset}&timeout=30`);
// #261 P1 redirect (2026-06-28) — wrap getUpdates in withTimeout so
// a wedged TCP socket / DPI drop can't pin the poll loop forever.
// Server-side timeout is 30s; client deadline is 45s leaving 15s
// headroom. AbortSignal propagates into fetch so the dangling
// request is torn down on deadline, not just the await.
const res = await withTimeout(
(signal) => fetch(`${tg.apiBase}/getUpdates?offset=${tg.offset}&timeout=30`, { signal }),
TELEGRAM_GETUPDATES_TIMEOUT_MS,
"telegram-getUpdates",
);
Comment on lines +3048 to +3052

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep the Telegram body read under the timeout

This only bounds the fetch() promise, which resolves when response headers arrive; if Telegram or an intermediate connection sends headers and then stalls before the JSON body completes, the timer is cleared and the following res.json() can still pin the polling loop indefinitely. Keep the body read inside the withTimeout callback (return the parsed JSON there) so the abort signal and 45s deadline cover the entire getUpdates request.

Useful? React with 👍 / 👎.

const data = await res.json() as any;
if (!data.ok) throw new Error(data.description || "getUpdates failed");
for (const update of data.result || []) {
Expand Down
Loading
Loading