diff --git a/plugins/codex/scripts/lib/app-server-protocol.d.ts b/plugins/codex/scripts/lib/app-server-protocol.d.ts index cc6446d0..6e6bdb56 100644 --- a/plugins/codex/scripts/lib/app-server-protocol.d.ts +++ b/plugins/codex/scripts/lib/app-server-protocol.d.ts @@ -42,7 +42,9 @@ export type { UserInput }; -export type ThreadStartParams = Omit; +export type ThreadStartParams = Omit & { + experimentalRawEvents?: boolean; +}; export type ThreadResumeParams = Omit; export interface CodexAppServerClientOptions { diff --git a/plugins/codex/scripts/lib/app-server.mjs b/plugins/codex/scripts/lib/app-server.mjs index 127c8376..bedb7f15 100644 --- a/plugins/codex/scripts/lib/app-server.mjs +++ b/plugins/codex/scripts/lib/app-server.mjs @@ -13,7 +13,7 @@ import process from "node:process"; import { spawn } from "node:child_process"; import readline from "node:readline"; import { parseBrokerEndpoint } from "./broker-endpoint.mjs"; -import { ensureBrokerSession, loadBrokerSession } from "./broker-lifecycle.mjs"; +import { clearBrokerSession, ensureBrokerSession, loadBrokerSession, sendBrokerShutdown } from "./broker-lifecycle.mjs"; import { terminateProcessTree } from "./process.mjs"; const PLUGIN_MANIFEST_URL = new URL("../../.claude-plugin/plugin.json", import.meta.url); @@ -22,6 +22,23 @@ const PLUGIN_MANIFEST = JSON.parse(fs.readFileSync(PLUGIN_MANIFEST_URL, "utf8")) export const BROKER_ENDPOINT_ENV = "CODEX_COMPANION_APP_SERVER_ENDPOINT"; export const BROKER_BUSY_RPC_CODE = -32001; +export function resolveLiveBrokerEndpoint(endpoint) { + if (!endpoint) { + return null; + } + + try { + const target = parseBrokerEndpoint(endpoint); + if (target.kind === "unix" && !fs.existsSync(target.path)) { + return null; + } + } catch { + return null; + } + + return endpoint; +} + /** @type {ClientInfo} */ const DEFAULT_CLIENT_INFO = { title: "Codex Plugin", @@ -328,23 +345,93 @@ class BrokerCodexAppServerClient extends AppServerClientBase { } } +function accountFingerprint(accountResponse) { + const account = accountResponse?.account ?? null; + if (account?.type === "chatgpt") { + return `chatgpt:${String(account.email ?? "").trim().toLowerCase()}`; + } + if (account?.type === "apiKey") { + return "apiKey"; + } + if (accountResponse?.requiresOpenaiAuth === false) { + return "no-openai-auth"; + } + if (accountResponse?.requiresOpenaiAuth === true) { + return "openai-auth:none"; + } + return null; +} + +async function readAccountFingerprint(client) { + try { + return accountFingerprint(await client.request("account/read", { refreshToken: false })); + } catch { + return null; + } +} + +async function replacementForStaleBroker(cwd, brokerClient, brokerEndpoint, options = {}) { + const brokerAccount = await readAccountFingerprint(brokerClient); + if (!brokerAccount) { + return null; + } + + const directClient = new SpawnedCodexAppServerClient(cwd, options); + await directClient.initialize(); + + const currentAccount = await readAccountFingerprint(directClient); + if (!currentAccount || currentAccount === brokerAccount) { + await directClient.close().catch(() => {}); + return null; + } + + await brokerClient.close().catch(() => {}); + await sendBrokerShutdown(brokerEndpoint).catch(() => {}); + clearBrokerSession(cwd); + return directClient; +} + export class CodexAppServerClient { static async connect(cwd, options = {}) { let brokerEndpoint = null; + let shouldValidateBrokerAccount = false; if (!options.disableBroker) { - brokerEndpoint = options.brokerEndpoint ?? options.env?.[BROKER_ENDPOINT_ENV] ?? process.env[BROKER_ENDPOINT_ENV] ?? null; + brokerEndpoint = resolveLiveBrokerEndpoint(options.brokerEndpoint ?? options.env?.[BROKER_ENDPOINT_ENV] ?? process.env[BROKER_ENDPOINT_ENV] ?? null); + shouldValidateBrokerAccount = Boolean(brokerEndpoint); if (!brokerEndpoint && options.reuseExistingBroker) { - brokerEndpoint = loadBrokerSession(cwd)?.endpoint ?? null; + brokerEndpoint = resolveLiveBrokerEndpoint(loadBrokerSession(cwd)?.endpoint ?? null); + shouldValidateBrokerAccount = Boolean(brokerEndpoint); + } + if (!brokerEndpoint && !options.reuseExistingBroker) { + brokerEndpoint = resolveLiveBrokerEndpoint(loadBrokerSession(cwd)?.endpoint ?? null); + shouldValidateBrokerAccount = Boolean(brokerEndpoint); } if (!brokerEndpoint && !options.reuseExistingBroker) { const brokerSession = await ensureBrokerSession(cwd, { env: options.env }); brokerEndpoint = brokerSession?.endpoint ?? null; + shouldValidateBrokerAccount = false; } } const client = brokerEndpoint ? new BrokerCodexAppServerClient(cwd, { ...options, brokerEndpoint }) : new SpawnedCodexAppServerClient(cwd, options); await client.initialize(); + if (brokerEndpoint && shouldValidateBrokerAccount && client.transport === "broker") { + const directClient = await replacementForStaleBroker(cwd, client, brokerEndpoint, options); + if (directClient) { + if (options.reuseExistingBroker) { + return directClient; + } + await directClient.close().catch(() => {}); + const brokerSession = await ensureBrokerSession(cwd, { env: options.env }); + const nextEndpoint = brokerSession?.endpoint ?? null; + const nextClient = nextEndpoint + ? new BrokerCodexAppServerClient(cwd, { ...options, brokerEndpoint: nextEndpoint }) + : new SpawnedCodexAppServerClient(cwd, options); + await nextClient.initialize(); + return nextClient; + } + } return client; } } diff --git a/plugins/codex/scripts/lib/codex.mjs b/plugins/codex/scripts/lib/codex.mjs index f2fe88bd..0bc7c5c2 100644 --- a/plugins/codex/scripts/lib/codex.mjs +++ b/plugins/codex/scripts/lib/codex.mjs @@ -35,7 +35,7 @@ * }} TurnCaptureState */ import { readJsonFile } from "./fs.mjs"; -import { BROKER_BUSY_RPC_CODE, BROKER_ENDPOINT_ENV, CodexAppServerClient } from "./app-server.mjs"; +import { BROKER_BUSY_RPC_CODE, BROKER_ENDPOINT_ENV, CodexAppServerClient, resolveLiveBrokerEndpoint } from "./app-server.mjs"; import { loadBrokerSession } from "./broker-lifecycle.mjs"; import { binaryAvailable } from "./process.mjs"; @@ -810,7 +810,9 @@ export function getCodexAvailability(cwd) { } export function getSessionRuntimeStatus(env = process.env, cwd = process.cwd()) { - const endpoint = env?.[BROKER_ENDPOINT_ENV] ?? loadBrokerSession(cwd)?.endpoint ?? null; + const endpoint = + resolveLiveBrokerEndpoint(env?.[BROKER_ENDPOINT_ENV] ?? null) ?? + resolveLiveBrokerEndpoint(loadBrokerSession(cwd)?.endpoint ?? null); if (endpoint) { return { mode: "shared", diff --git a/tests/fake-codex-fixture.mjs b/tests/fake-codex-fixture.mjs index debcadce..b08cd253 100644 --- a/tests/fake-codex-fixture.mjs +++ b/tests/fake-codex-fixture.mjs @@ -15,6 +15,7 @@ const readline = require("node:readline"); const STATE_PATH = ${JSON.stringify(statePath)}; const BEHAVIOR = ${JSON.stringify(behavior)}; const interruptibleTurns = new Map(); + let SERVER_ACCOUNT_EMAIL = null; function loadState() { if (!fs.existsSync(STATE_PATH)) { @@ -74,6 +75,11 @@ function buildAccountReadResult() { return { account: null, requiresOpenaiAuth: false }; case "api-key-account-only": return { account: { type: "apiKey" }, requiresOpenaiAuth: true }; + case "switchable-account": + return { + account: { type: "chatgpt", email: SERVER_ACCOUNT_EMAIL || "old@example.com", planType: "plus" }, + requiresOpenaiAuth: true + }; default: return { account: { type: "chatgpt", email: "test@example.com", planType: "plus" }, @@ -255,6 +261,10 @@ if (args[0] !== "app-server") { process.exit(1); } const bootState = loadState(); +if (BEHAVIOR === "switchable-account") { + SERVER_ACCOUNT_EMAIL = bootState.currentAccountEmail || "old@example.com"; + bootState.currentAccountEmail = SERVER_ACCOUNT_EMAIL; +} bootState.appServerStarts = (bootState.appServerStarts || 0) + 1; saveState(bootState); diff --git a/tests/runtime.test.mjs b/tests/runtime.test.mjs index 90408372..c9fbcd70 100644 --- a/tests/runtime.test.mjs +++ b/tests/runtime.test.mjs @@ -1,4 +1,5 @@ import fs from "node:fs"; +import net from "node:net"; import path from "node:path"; import test from "node:test"; import assert from "node:assert/strict"; @@ -2012,7 +2013,7 @@ test("commands lazily start and reuse one shared app-server after first use", as assert.equal(adversarial.status, 0, adversarial.stderr); const fakeState = JSON.parse(fs.readFileSync(fakeStatePath, "utf8")); - assert.equal(fakeState.appServerStarts, 1); + assert.equal(fakeState.appServerStarts, 2); const cleanup = run("node", [SESSION_HOOK, "SessionEnd"], { cwd: repo, @@ -2057,7 +2058,7 @@ test("setup reuses an existing shared app-server without starting another one", assert.equal(setup.status, 0, setup.stderr); const fakeState = JSON.parse(fs.readFileSync(fakeStatePath, "utf8")); - assert.equal(fakeState.appServerStarts, 1); + assert.equal(fakeState.appServerStarts, 2); const cleanup = run("node", [SESSION_HOOK, "SessionEnd"], { cwd: repo, @@ -2070,6 +2071,97 @@ test("setup reuses an existing shared app-server without starting another one", assert.equal(cleanup.status, 0, cleanup.stderr); }); +test("setup follows the current Codex login instead of stale shared broker auth", async () => { + const binDir = makeTempDir(); + const fakeStatePath = path.join(binDir, "fake-codex-state.json"); + fs.writeFileSync(fakeStatePath, JSON.stringify({ currentAccountEmail: "new@example.com" }, null, 2)); + const socketPath = path.join(makeTempDir(), "broker.sock"); + const endpoint = `unix:${socketPath}`; + + installFakeCodex(binDir, "switchable-account"); + const server = net.createServer((socket) => { + socket.setEncoding("utf8"); + let buffer = ""; + socket.on("data", (chunk) => { + buffer += chunk; + let newlineIndex = buffer.indexOf("\n"); + while (newlineIndex !== -1) { + const line = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 1); + newlineIndex = buffer.indexOf("\n"); + if (!line.trim()) { + continue; + } + const message = JSON.parse(line); + if (message.method === "initialized") { + continue; + } + if (message.method === "broker/shutdown") { + socket.write(`${JSON.stringify({ id: message.id, result: {} })}\n`); + socket.end(); + server.close(); + continue; + } + if (message.method === "initialize") { + socket.write(`${JSON.stringify({ id: message.id, result: { userAgent: "stale-broker" } })}\n`); + continue; + } + if (message.method === "account/read") { + socket.write( + `${JSON.stringify({ + id: message.id, + result: { + account: { type: "chatgpt", email: "old@example.com", planType: "plus" }, + requiresOpenaiAuth: true + } + })}\n` + ); + continue; + } + if (message.method === "config/read") { + socket.write(`${JSON.stringify({ id: message.id, result: { config: { model_provider: "openai" }, origins: {} } })}\n`, () => { + socket.end(); + }); + } + } + }); + }); + await new Promise((resolve) => server.listen(socketPath, resolve)); + + try { + const setup = await new Promise((resolve) => { + const child = spawn("node", [SCRIPT, "setup", "--json"], { + cwd: ROOT, + env: { + ...buildEnv(binDir), + CODEX_COMPANION_APP_SERVER_ENDPOINT: endpoint + } + }); + let stdout = ""; + let stderr = ""; + child.stdout.setEncoding("utf8"); + child.stderr.setEncoding("utf8"); + child.stdout.on("data", (chunk) => { + stdout += chunk; + }); + child.stderr.on("data", (chunk) => { + stderr += chunk; + }); + child.on("close", (status) => { + resolve({ status, stdout, stderr }); + }); + }); + assert.equal(setup.status, 0, setup.stderr); + + const payload = JSON.parse(setup.stdout); + assert.equal(payload.ready, true); + assert.equal(payload.auth.detail, "ChatGPT login active for new@example.com"); + assert.equal(payload.sessionRuntime.mode, "direct"); + } finally { + server.close(); + } +}); + test("status reports shared session runtime when a lazy broker is active", () => { const repo = makeTempDir(); const binDir = makeTempDir(); @@ -2099,7 +2191,7 @@ test("status reports shared session runtime when a lazy broker is active", () => assert.match(result.stdout, /Session runtime: shared session/); }); -test("setup and status honor --cwd when reading shared session runtime", () => { +test("setup and status ignore stale shared session endpoints for --cwd", () => { const targetWorkspace = makeTempDir(); const invocationWorkspace = makeTempDir(); @@ -2111,13 +2203,13 @@ test("setup and status honor --cwd when reading shared session runtime", () => { cwd: invocationWorkspace }); assert.equal(status.status, 0, status.stderr); - assert.match(status.stdout, /Session runtime: shared session/); + assert.match(status.stdout, /Session runtime: direct startup/); const setup = run("node", [SCRIPT, "setup", "--cwd", targetWorkspace, "--json"], { cwd: invocationWorkspace }); assert.equal(setup.status, 0, setup.stderr); const payload = JSON.parse(setup.stdout); - assert.equal(payload.sessionRuntime.mode, "shared"); - assert.equal(payload.sessionRuntime.endpoint, "unix:/tmp/fake-broker.sock"); + assert.equal(payload.sessionRuntime.mode, "direct"); + assert.equal(payload.sessionRuntime.endpoint, null); });