Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion plugins/codex/scripts/lib/app-server.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ const PLUGIN_MANIFEST = JSON.parse(fs.readFileSync(PLUGIN_MANIFEST_URL, "utf8"))
export const BROKER_ENDPOINT_ENV = "CODEX_COMPANION_APP_SERVER_ENDPOINT";
export const BROKER_BUSY_RPC_CODE = -32001;

const DEFAULT_RPC_TIMEOUT_MS = 30_000;

function resolveRpcTimeoutMs() {
const raw = process.env.CODEX_APP_SERVER_RPC_TIMEOUT_MS;
if (raw === undefined || raw === "") {
return DEFAULT_RPC_TIMEOUT_MS;
}
const parsed = Number(raw);
if (!Number.isFinite(parsed) || parsed < 0) {
return DEFAULT_RPC_TIMEOUT_MS;
}
return parsed;
}

/** @type {ClientInfo} */
const DEFAULT_CLIENT_INFO = {
title: "Codex Plugin",
Expand Down Expand Up @@ -91,7 +105,26 @@ class AppServerClientBase {
this.nextId += 1;

return new Promise((resolve, reject) => {
this.pending.set(id, { resolve, reject, method });
const timeoutMs = resolveRpcTimeoutMs();
let timer = null;
const wrappedResolve = (value) => {
if (timer) clearTimeout(timer);
resolve(value);
};
const wrappedReject = (err) => {
if (timer) clearTimeout(timer);
reject(err);
};
if (timeoutMs > 0) {
timer = setTimeout(() => {
if (this.pending.get(id)?.reject === wrappedReject) {
this.pending.delete(id);
}
reject(new Error(`codex app-server ${method} request timed out after ${timeoutMs}ms`));
}, timeoutMs);
timer.unref?.();
}
this.pending.set(id, { resolve: wrappedResolve, reject: wrappedReject, method });
this.sendMessage({ id, method, params });
});
}
Expand Down
47 changes: 47 additions & 0 deletions plugins/codex/scripts/lib/codex.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,54 @@ function applyTurnNotification(state, message) {
}
}

const DEFAULT_TURN_IDLE_TIMEOUT_MS = 240_000;

function resolveTurnIdleTimeoutMs() {
const raw = process.env.CODEX_TURN_IDLE_TIMEOUT_MS;
if (raw === undefined || raw === "") {
return DEFAULT_TURN_IDLE_TIMEOUT_MS;
}
const parsed = Number(raw);
if (!Number.isFinite(parsed) || parsed < 0) {
return DEFAULT_TURN_IDLE_TIMEOUT_MS;
}
return parsed;
}

async function captureTurn(client, threadId, startRequest, options = {}) {
const state = createTurnCaptureState(threadId, options);
const previousHandler = client.notificationHandler;

const idleTimeoutMs = resolveTurnIdleTimeoutMs();
let idleTimer = null;
const clearIdle = () => {
if (idleTimer) {
clearTimeout(idleTimer);
idleTimer = null;
}
};
const armIdle = () => {
if (idleTimeoutMs <= 0 || state.completed) {
return;
}
clearIdle();
idleTimer = setTimeout(() => {
idleTimer = null;
if (state.completed) {
return;
}
state.completed = true;
clearCompletionTimer(state);
const tid = state.turnId ?? "(pending)";
state.rejectCompletion(
new Error(`codex turn idle timeout after ${idleTimeoutMs}ms (thread=${state.threadId}, turn=${tid})`)
);
}, idleTimeoutMs);
idleTimer.unref?.();
};

client.setNotificationHandler((message) => {
armIdle();
if (!state.turnId) {
state.bufferedNotifications.push(message);
return;
Expand All @@ -575,8 +618,11 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
applyTurnNotification(state, message);
});

armIdle();

try {
const response = await startRequest();
armIdle();
options.onResponse?.(response, state);
state.turnId = response.turn?.id ?? null;
if (state.turnId) {
Expand All @@ -599,6 +645,7 @@ async function captureTurn(client, threadId, startRequest, options = {}) {

return await state.completion;
} finally {
clearIdle();
clearCompletionTimer(state);
client.setNotificationHandler(previousHandler ?? null);
}
Expand Down
20 changes: 20 additions & 0 deletions tests/fake-codex-fixture.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ rl.on("line", (line) => {
break;

case "thread/start": {
if (BEHAVIOR === "hang-rpc-thread-start") {
// Intentionally drop the request: never respond, never error.
break;
}
if (BEHAVIOR === "auth-run-fails") {
throw new Error("authentication expired; run codex login");
}
Expand Down Expand Up @@ -368,6 +372,22 @@ rl.on("line", (line) => {
}

case "turn/start": {
if (BEHAVIOR === "hang-after-turn-start") {
const thread = ensureThread(state, message.params.threadId);
const hangTurnId = nextTurnId(state);
thread.updatedAt = now();
state.lastTurnStart = {
threadId: message.params.threadId,
turnId: hangTurnId,
model: message.params.model ?? null,
effort: message.params.effort ?? null,
prompt: ""
};
saveState(state);
send({ id: message.id, result: { turn: buildTurn(hangTurnId) } });
// Never emit turn/completed or any further notifications for this turn.
break;
}
const thread = ensureThread(state, message.params.threadId);
const prompt = (message.params.input || [])
.filter((item) => item.type === "text")
Expand Down
52 changes: 52 additions & 0 deletions tests/runtime.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2121,3 +2121,55 @@ test("setup and status honor --cwd when reading shared session runtime", () => {
assert.equal(payload.sessionRuntime.mode, "shared");
assert.equal(payload.sessionRuntime.endpoint, "unix:/tmp/fake-broker.sock");
});

test("task fails fast when an app-server JSON-RPC request never receives a response", () => {
const repo = makeTempDir();
const binDir = makeTempDir();
installFakeCodex(binDir, "hang-rpc-thread-start");
initGitRepo(repo);
fs.writeFileSync(path.join(repo, "README.md"), "hello\n");
run("git", ["add", "README.md"], { cwd: repo });
run("git", ["commit", "-m", "init"], { cwd: repo });

const env = {
...buildEnv(binDir),
CODEX_APP_SERVER_RPC_TIMEOUT_MS: "200"
};
const startedAt = Date.now();
const result = run("node", [SCRIPT, "task", "rpc hang check"], {
cwd: repo,
env
});
const elapsed = Date.now() - startedAt;

assert.notEqual(result.status, 0);
assert.match(result.stderr + result.stdout, /timed out after 200ms/);
// Should fail well before any default 30s timeout — give plenty of slack for
// node startup + IPC, but assert we did not silently wait minutes.
assert.ok(elapsed < 15000, `expected fast timeout, took ${elapsed}ms`);
});

test("task fails fast when the app-server stops emitting notifications mid-turn", () => {
const repo = makeTempDir();
const binDir = makeTempDir();
installFakeCodex(binDir, "hang-after-turn-start");
initGitRepo(repo);
fs.writeFileSync(path.join(repo, "README.md"), "hello\n");
run("git", ["add", "README.md"], { cwd: repo });
run("git", ["commit", "-m", "init"], { cwd: repo });

const env = {
...buildEnv(binDir),
CODEX_TURN_IDLE_TIMEOUT_MS: "200"
};
const startedAt = Date.now();
const result = run("node", [SCRIPT, "task", "turn idle hang check"], {
cwd: repo,
env
});
const elapsed = Date.now() - startedAt;

assert.notEqual(result.status, 0);
assert.match(result.stderr + result.stdout, /turn idle timeout after 200ms/);
assert.ok(elapsed < 15000, `expected fast turn-idle timeout, took ${elapsed}ms`);
});