diff --git a/plugins/codex/scripts/codex-companion.mjs b/plugins/codex/scripts/codex-companion.mjs
index 35222fd5..fd5d0fd7 100644
--- a/plugins/codex/scripts/codex-companion.mjs
+++ b/plugins/codex/scripts/codex-companion.mjs
@@ -18,7 +18,8 @@ import {
   parseStructuredOutput,
   readOutputSchema,
   runAppServerReview,
-  runAppServerTurn
+  runAppServerTurn,
+  TurnWatchdogError
 } from "./lib/codex.mjs";
 import { readStdinIfPiped } from "./lib/fs.mjs";
 import { collectReviewContext, ensureGitRepository, resolveReviewTarget } from "./lib/git.mjs";
@@ -1021,6 +1022,20 @@ async function main() {
 }
 
 main().catch((error) => {
+  // Watchdog timeouts get one machine-readable JSON line and their own exit code.
+  if (error instanceof TurnWatchdogError) {
+    process.stderr.write(
+      JSON.stringify({
+        error: "TurnWatchdogTimeout",
+        message: error.message,
+        watchdogMs: error.watchdogMs,
+        threadId: error.threadId,
+        turnId: error.turnId
+      }) + "\n"
+    );
+    process.exitCode = error.exitCode ?? 124;
+    return;
+  }
   const message = error instanceof Error ? error.message : String(error);
   process.stderr.write(`${message}\n`);
   process.exitCode = 1;
diff --git a/plugins/codex/scripts/lib/codex.mjs b/plugins/codex/scripts/lib/codex.mjs
index f2fe88bd..dd097415 100644
--- a/plugins/codex/scripts/lib/codex.mjs
+++ b/plugins/codex/scripts/lib/codex.mjs
@@ -293,6 +293,24 @@ function describeCompletedItem(state, item) {
   }
 }
 
+/**
+ * Thrown by captureTurn when no JSON-RPC notification arrives within the
+ * configured watchdog window. Wrappers can detect via `instanceof` or by
+ * `error.code === 'TURN_WATCHDOG_TIMEOUT'` and propagate exit code 124
+ * (matching `timeout(1)` convention).
+ */
+export class TurnWatchdogError extends Error {
+  constructor(message, { watchdogMs, threadId, turnId } = {}) {
+    super(message);
+    this.name = "TurnWatchdogError";
+    this.code = "TURN_WATCHDOG_TIMEOUT";
+    this.exitCode = 124;
+    this.watchdogMs = watchdogMs ?? null;
+    this.threadId = threadId ?? null;
+    this.turnId = turnId ?? null;
+  }
+}
+
 /** @returns {TurnCaptureState} */
 function createTurnCaptureState(threadId, options = {}) {
   let resolveCompletion;
@@ -319,6 +337,13 @@ function createTurnCaptureState(threadId, options = {}) {
     pendingCollaborations: new Set(),
     activeSubagentTurns: new Set(),
     completionTimer: null,
+    watchdogTimer: null,
+    // Disabled (null) unless a positive finite number; clamped because
+    // setTimeout turns delays above 2^31-1 ms into a 1 ms delay.
+    watchdogMs:
+      Number.isFinite(options.watchdogMs) && options.watchdogMs > 0
+        ? Math.min(options.watchdogMs, 2147483647)
+        : null,
     lastAgentMessage: "",
     reviewText: "",
     reasoningSummary: [],
@@ -337,12 +362,63 @@ function clearCompletionTimer(state) {
   }
 }
 
+/** Cancel the pending watchdog timer, if any. */
+function disarmWatchdog(state) {
+  if (state.watchdogTimer) {
+    clearTimeout(state.watchdogTimer);
+    state.watchdogTimer = null;
+  }
+}
+
+/**
+ * (Re)start the silence watchdog. If the timer fires before the turn
+ * completes, the turn is marked completed and its completion promise is
+ * rejected with a TurnWatchdogError. No-op when the watchdog is disabled
+ * or the turn has already completed.
+ */
+function armWatchdog(state) {
+  if (!state.watchdogMs || state.completed) {
+    return;
+  }
+  disarmWatchdog(state);
+  state.watchdogTimer = setTimeout(() => {
+    state.watchdogTimer = null;
+    if (state.completed) {
+      return;
+    }
+    state.completed = true;
+    clearCompletionTimer(state);
+    const message =
+      `Codex turn watchdog fired after ${state.watchdogMs}ms of silence ` +
+      `(thread ${state.threadId}, turn ${state.turnId ?? "pending"}). ` +
+      `No JSON-RPC notification arrived in that window.`;
+    state.rejectCompletion(
+      new TurnWatchdogError(message, {
+        watchdogMs: state.watchdogMs,
+        threadId: state.threadId,
+        turnId: state.turnId
+      })
+    );
+  }, state.watchdogMs);
+  // Do not let the watchdog timer alone keep the event loop alive.
+  state.watchdogTimer.unref?.();
+}
+
+/** Reset the watchdog countdown; called for every incoming notification. */
+function kickWatchdog(state) {
+  if (!state.watchdogMs || state.completed) {
+    return;
+  }
+  armWatchdog(state);
+}
+
 function completeTurn(state, turn = null, options = {}) {
   if (state.completed) {
     return;
   }
 
   clearCompletionTimer(state);
+  disarmWatchdog(state);
   state.completed = true;
 
   if (turn) {
@@ -555,6 +631,8 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
   const previousHandler = client.notificationHandler;
 
   client.setNotificationHandler((message) => {
+    kickWatchdog(state);
+
     if (!state.turnId) {
       state.bufferedNotifications.push(message);
       return;
@@ -576,6 +654,7 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
   });
 
   try {
+    armWatchdog(state);
     const response = await startRequest();
     options.onResponse?.(response, state);
     state.turnId = response.turn?.id ?? null;
@@ -600,6 +679,7 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
     return await state.completion;
   } finally {
     clearCompletionTimer(state);
+    disarmWatchdog(state);
     client.setNotificationHandler(previousHandler ?? null);
   }
 }
@@ -1009,7 +1089,13 @@ export async function runAppServerTurn(cwd, options = {}) {
         effort: options.effort ?? null,
         outputSchema: options.outputSchema ?? null
       }),
-      { onProgress: options.onProgress }
+      {
+        onProgress: options.onProgress,
+        watchdogMs:
+          typeof options.watchdogMs === "number"
+            ? options.watchdogMs
+            : Number(process.env.CODEX_TURN_WATCHDOG_MS) || null
+      }
     );
 
     return {
diff --git a/tests/turn-watchdog.test.mjs b/tests/turn-watchdog.test.mjs
new file mode 100644
index 00000000..6e0a5814
--- /dev/null
+++ b/tests/turn-watchdog.test.mjs
@@ -0,0 +1,31 @@
+import test from "node:test";
+import assert from "node:assert/strict";
+
+import { TurnWatchdogError } from "../plugins/codex/scripts/lib/codex.mjs";
+
+test("TurnWatchdogError carries code, exitCode, and metadata", () => {
+  const err = new TurnWatchdogError("watchdog fired after 600000ms", {
+    watchdogMs: 600000,
+    threadId: "thr_123",
+    turnId: "turn_456"
+  });
+
+  assert.equal(err.name, "TurnWatchdogError");
+  assert.equal(err.code, "TURN_WATCHDOG_TIMEOUT");
+  assert.equal(err.exitCode, 124);
+  assert.equal(err.watchdogMs, 600000);
+  assert.equal(err.threadId, "thr_123");
+  assert.equal(err.turnId, "turn_456");
+  assert.equal(err.message, "watchdog fired after 600000ms");
+  assert.ok(err instanceof Error);
+});
+
+test("TurnWatchdogError defaults metadata to null when omitted", () => {
+  const err = new TurnWatchdogError("silent");
+
+  assert.equal(err.code, "TURN_WATCHDOG_TIMEOUT");
+  assert.equal(err.exitCode, 124);
+  assert.equal(err.watchdogMs, null);
+  assert.equal(err.threadId, null);
+  assert.equal(err.turnId, null);
+});