Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion plugins/codex/scripts/lib/app-server-protocol.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ export type {
UserInput
};

export type ThreadStartParams = Omit<RawThreadStartParams, "persistExtendedHistory">;
export type ThreadStartParams = Omit<RawThreadStartParams, "persistExtendedHistory"> & {
experimentalRawEvents?: boolean;
};
export type ThreadResumeParams = Omit<RawThreadResumeParams, "persistExtendedHistory">;

export interface CodexAppServerClientOptions {
Expand Down
93 changes: 90 additions & 3 deletions plugins/codex/scripts/lib/app-server.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import process from "node:process";
import { spawn } from "node:child_process";
import readline from "node:readline";
import { parseBrokerEndpoint } from "./broker-endpoint.mjs";
import { ensureBrokerSession, loadBrokerSession } from "./broker-lifecycle.mjs";
import { clearBrokerSession, ensureBrokerSession, loadBrokerSession, sendBrokerShutdown } from "./broker-lifecycle.mjs";
import { terminateProcessTree } from "./process.mjs";

const PLUGIN_MANIFEST_URL = new URL("../../.claude-plugin/plugin.json", import.meta.url);
Expand All @@ -22,6 +22,23 @@ const PLUGIN_MANIFEST = JSON.parse(fs.readFileSync(PLUGIN_MANIFEST_URL, "utf8"))
export const BROKER_ENDPOINT_ENV = "CODEX_COMPANION_APP_SERVER_ENDPOINT";
export const BROKER_BUSY_RPC_CODE = -32001;

export function resolveLiveBrokerEndpoint(endpoint) {
if (!endpoint) {
return null;
}

try {
const target = parseBrokerEndpoint(endpoint);
if (target.kind === "unix" && !fs.existsSync(target.path)) {
return null;
}
} catch {
return null;
}

return endpoint;
}

/** @type {ClientInfo} */
const DEFAULT_CLIENT_INFO = {
title: "Codex Plugin",
Expand Down Expand Up @@ -328,23 +345,93 @@ class BrokerCodexAppServerClient extends AppServerClientBase {
}
}

function accountFingerprint(accountResponse) {
const account = accountResponse?.account ?? null;
if (account?.type === "chatgpt") {
return `chatgpt:${String(account.email ?? "").trim().toLowerCase()}`;
}
if (account?.type === "apiKey") {
return "apiKey";
}
if (accountResponse?.requiresOpenaiAuth === false) {
return "no-openai-auth";
}
if (accountResponse?.requiresOpenaiAuth === true) {
return "openai-auth:none";
}
return null;
}

async function readAccountFingerprint(client) {
try {
return accountFingerprint(await client.request("account/read", { refreshToken: false }));
} catch {
return null;
}
}

async function replacementForStaleBroker(cwd, brokerClient, brokerEndpoint, options = {}) {
const brokerAccount = await readAccountFingerprint(brokerClient);
if (!brokerAccount) {
return null;
}

const directClient = new SpawnedCodexAppServerClient(cwd, options);
await directClient.initialize();

const currentAccount = await readAccountFingerprint(directClient);
if (!currentAccount || currentAccount === brokerAccount) {
await directClient.close().catch(() => {});
return null;
}

await brokerClient.close().catch(() => {});
await sendBrokerShutdown(brokerEndpoint).catch(() => {});
clearBrokerSession(cwd);
return directClient;
}

export class CodexAppServerClient {
static async connect(cwd, options = {}) {
let brokerEndpoint = null;
let shouldValidateBrokerAccount = false;
if (!options.disableBroker) {
brokerEndpoint = options.brokerEndpoint ?? options.env?.[BROKER_ENDPOINT_ENV] ?? process.env[BROKER_ENDPOINT_ENV] ?? null;
brokerEndpoint = resolveLiveBrokerEndpoint(options.brokerEndpoint ?? options.env?.[BROKER_ENDPOINT_ENV] ?? process.env[BROKER_ENDPOINT_ENV] ?? null);
shouldValidateBrokerAccount = Boolean(brokerEndpoint);
if (!brokerEndpoint && options.reuseExistingBroker) {
brokerEndpoint = loadBrokerSession(cwd)?.endpoint ?? null;
brokerEndpoint = resolveLiveBrokerEndpoint(loadBrokerSession(cwd)?.endpoint ?? null);
shouldValidateBrokerAccount = Boolean(brokerEndpoint);
}
if (!brokerEndpoint && !options.reuseExistingBroker) {
brokerEndpoint = resolveLiveBrokerEndpoint(loadBrokerSession(cwd)?.endpoint ?? null);
shouldValidateBrokerAccount = Boolean(brokerEndpoint);
}
if (!brokerEndpoint && !options.reuseExistingBroker) {
const brokerSession = await ensureBrokerSession(cwd, { env: options.env });
brokerEndpoint = brokerSession?.endpoint ?? null;
shouldValidateBrokerAccount = false;
}
}
const client = brokerEndpoint
? new BrokerCodexAppServerClient(cwd, { ...options, brokerEndpoint })
: new SpawnedCodexAppServerClient(cwd, options);
await client.initialize();
if (brokerEndpoint && shouldValidateBrokerAccount && client.transport === "broker") {
const directClient = await replacementForStaleBroker(cwd, client, brokerEndpoint, options);
if (directClient) {
if (options.reuseExistingBroker) {
return directClient;
}
await directClient.close().catch(() => {});
const brokerSession = await ensureBrokerSession(cwd, { env: options.env });
const nextEndpoint = brokerSession?.endpoint ?? null;
const nextClient = nextEndpoint
? new BrokerCodexAppServerClient(cwd, { ...options, brokerEndpoint: nextEndpoint })
: new SpawnedCodexAppServerClient(cwd, options);
await nextClient.initialize();
return nextClient;
}
}
return client;
}
}
6 changes: 4 additions & 2 deletions plugins/codex/scripts/lib/codex.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* }} TurnCaptureState
*/
import { readJsonFile } from "./fs.mjs";
import { BROKER_BUSY_RPC_CODE, BROKER_ENDPOINT_ENV, CodexAppServerClient } from "./app-server.mjs";
import { BROKER_BUSY_RPC_CODE, BROKER_ENDPOINT_ENV, CodexAppServerClient, resolveLiveBrokerEndpoint } from "./app-server.mjs";
import { loadBrokerSession } from "./broker-lifecycle.mjs";
import { binaryAvailable } from "./process.mjs";

Expand Down Expand Up @@ -810,7 +810,9 @@ export function getCodexAvailability(cwd) {
}

export function getSessionRuntimeStatus(env = process.env, cwd = process.cwd()) {
const endpoint = env?.[BROKER_ENDPOINT_ENV] ?? loadBrokerSession(cwd)?.endpoint ?? null;
const endpoint =
resolveLiveBrokerEndpoint(env?.[BROKER_ENDPOINT_ENV] ?? null) ??
resolveLiveBrokerEndpoint(loadBrokerSession(cwd)?.endpoint ?? null);
if (endpoint) {
return {
mode: "shared",
Expand Down
10 changes: 10 additions & 0 deletions tests/fake-codex-fixture.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const readline = require("node:readline");
const STATE_PATH = ${JSON.stringify(statePath)};
const BEHAVIOR = ${JSON.stringify(behavior)};
const interruptibleTurns = new Map();
let SERVER_ACCOUNT_EMAIL = null;

function loadState() {
if (!fs.existsSync(STATE_PATH)) {
Expand Down Expand Up @@ -74,6 +75,11 @@ function buildAccountReadResult() {
return { account: null, requiresOpenaiAuth: false };
case "api-key-account-only":
return { account: { type: "apiKey" }, requiresOpenaiAuth: true };
case "switchable-account":
return {
account: { type: "chatgpt", email: SERVER_ACCOUNT_EMAIL || "old@example.com", planType: "plus" },
requiresOpenaiAuth: true
};
default:
return {
account: { type: "chatgpt", email: "test@example.com", planType: "plus" },
Expand Down Expand Up @@ -255,6 +261,10 @@ if (args[0] !== "app-server") {
process.exit(1);
}
const bootState = loadState();
if (BEHAVIOR === "switchable-account") {
SERVER_ACCOUNT_EMAIL = bootState.currentAccountEmail || "old@example.com";
bootState.currentAccountEmail = SERVER_ACCOUNT_EMAIL;
}
bootState.appServerStarts = (bootState.appServerStarts || 0) + 1;
saveState(bootState);

Expand Down
104 changes: 98 additions & 6 deletions tests/runtime.test.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fs from "node:fs";
import net from "node:net";
import path from "node:path";
import test from "node:test";
import assert from "node:assert/strict";
Expand Down Expand Up @@ -2012,7 +2013,7 @@ test("commands lazily start and reuse one shared app-server after first use", as
assert.equal(adversarial.status, 0, adversarial.stderr);

const fakeState = JSON.parse(fs.readFileSync(fakeStatePath, "utf8"));
assert.equal(fakeState.appServerStarts, 1);
assert.equal(fakeState.appServerStarts, 2);

const cleanup = run("node", [SESSION_HOOK, "SessionEnd"], {
cwd: repo,
Expand Down Expand Up @@ -2057,7 +2058,7 @@ test("setup reuses an existing shared app-server without starting another one",
assert.equal(setup.status, 0, setup.stderr);

const fakeState = JSON.parse(fs.readFileSync(fakeStatePath, "utf8"));
assert.equal(fakeState.appServerStarts, 1);
assert.equal(fakeState.appServerStarts, 2);

const cleanup = run("node", [SESSION_HOOK, "SessionEnd"], {
cwd: repo,
Expand All @@ -2070,6 +2071,97 @@ test("setup reuses an existing shared app-server without starting another one",
assert.equal(cleanup.status, 0, cleanup.stderr);
});

test("setup follows the current Codex login instead of stale shared broker auth", async () => {
const binDir = makeTempDir();
const fakeStatePath = path.join(binDir, "fake-codex-state.json");
fs.writeFileSync(fakeStatePath, JSON.stringify({ currentAccountEmail: "new@example.com" }, null, 2));
const socketPath = path.join(makeTempDir(), "broker.sock");
const endpoint = `unix:${socketPath}`;

installFakeCodex(binDir, "switchable-account");
const server = net.createServer((socket) => {
socket.setEncoding("utf8");
let buffer = "";
socket.on("data", (chunk) => {
buffer += chunk;
let newlineIndex = buffer.indexOf("\n");
while (newlineIndex !== -1) {
const line = buffer.slice(0, newlineIndex);
buffer = buffer.slice(newlineIndex + 1);
newlineIndex = buffer.indexOf("\n");
if (!line.trim()) {
continue;
}
const message = JSON.parse(line);
if (message.method === "initialized") {
continue;
}
if (message.method === "broker/shutdown") {
socket.write(`${JSON.stringify({ id: message.id, result: {} })}\n`);
socket.end();
server.close();
continue;
}
if (message.method === "initialize") {
socket.write(`${JSON.stringify({ id: message.id, result: { userAgent: "stale-broker" } })}\n`);
continue;
}
if (message.method === "account/read") {
socket.write(
`${JSON.stringify({
id: message.id,
result: {
account: { type: "chatgpt", email: "old@example.com", planType: "plus" },
requiresOpenaiAuth: true
}
})}\n`
);
continue;
}
if (message.method === "config/read") {
socket.write(`${JSON.stringify({ id: message.id, result: { config: { model_provider: "openai" }, origins: {} } })}\n`, () => {
socket.end();
});
}
}
});
});
await new Promise((resolve) => server.listen(socketPath, resolve));

try {
const setup = await new Promise((resolve) => {
const child = spawn("node", [SCRIPT, "setup", "--json"], {
cwd: ROOT,
env: {
...buildEnv(binDir),
CODEX_COMPANION_APP_SERVER_ENDPOINT: endpoint
}
});
let stdout = "";
let stderr = "";
child.stdout.setEncoding("utf8");
child.stderr.setEncoding("utf8");
child.stdout.on("data", (chunk) => {
stdout += chunk;
});
child.stderr.on("data", (chunk) => {
stderr += chunk;
});
child.on("close", (status) => {
resolve({ status, stdout, stderr });
});
});
assert.equal(setup.status, 0, setup.stderr);

const payload = JSON.parse(setup.stdout);
assert.equal(payload.ready, true);
assert.equal(payload.auth.detail, "ChatGPT login active for new@example.com");
assert.equal(payload.sessionRuntime.mode, "direct");
} finally {
server.close();
}
});

test("status reports shared session runtime when a lazy broker is active", () => {
const repo = makeTempDir();
const binDir = makeTempDir();
Expand Down Expand Up @@ -2099,7 +2191,7 @@ test("status reports shared session runtime when a lazy broker is active", () =>
assert.match(result.stdout, /Session runtime: shared session/);
});

test("setup and status honor --cwd when reading shared session runtime", () => {
test("setup and status ignore stale shared session endpoints for --cwd", () => {
const targetWorkspace = makeTempDir();
const invocationWorkspace = makeTempDir();

Expand All @@ -2111,13 +2203,13 @@ test("setup and status honor --cwd when reading shared session runtime", () => {
cwd: invocationWorkspace
});
assert.equal(status.status, 0, status.stderr);
assert.match(status.stdout, /Session runtime: shared session/);
assert.match(status.stdout, /Session runtime: direct startup/);

const setup = run("node", [SCRIPT, "setup", "--cwd", targetWorkspace, "--json"], {
cwd: invocationWorkspace
});
assert.equal(setup.status, 0, setup.stderr);
const payload = JSON.parse(setup.stdout);
assert.equal(payload.sessionRuntime.mode, "shared");
assert.equal(payload.sessionRuntime.endpoint, "unix:/tmp/fake-broker.sock");
assert.equal(payload.sessionRuntime.mode, "direct");
assert.equal(payload.sessionRuntime.endpoint, null);
});