From da2d5aa8ad5f40ef88741017e7c04769c0533489 Mon Sep 17 00:00:00 2001 From: cullysu Date: Thu, 14 May 2026 15:08:01 +0800 Subject: [PATCH 1/4] fix codex app-server stdio bridge --- install-weixin-service-after-login.cmd | 5 + package-lock.json | 55 ++++- src/core/bridge_coordinator.ts | 7 +- src/providers/codex/app_client.ts | 218 +++++++++++------- src/providers/codex/cli_command.ts | 49 ++++ .../codex/experimental_features_manager.ts | 43 +++- src/providers/codex/review_runner.ts | 13 +- start-weixin-login.cmd | 8 + start-weixin-serve.cmd | 8 + test/providers/codex/app_client.test.ts | 121 ++++++++++ .../experimental_features_manager.test.ts | 37 +++ 11 files changed, 463 insertions(+), 101 deletions(-) create mode 100644 install-weixin-service-after-login.cmd create mode 100644 src/providers/codex/cli_command.ts create mode 100644 start-weixin-login.cmd create mode 100644 start-weixin-serve.cmd diff --git a/install-weixin-service-after-login.cmd b/install-weixin-service-after-login.cmd new file mode 100644 index 0000000..b0057b4 --- /dev/null +++ b/install-weixin-service-after-login.cmd @@ -0,0 +1,5 @@ +@echo off +setlocal +cd /d "%~dp0" +powershell -NoProfile -ExecutionPolicy Bypass -File ".\scripts\service\install-windows-task.ps1" -DefaultCwd "D:\cully\Documents" +powershell -NoProfile -ExecutionPolicy Bypass -File ".\scripts\service\status-windows-task.ps1" diff --git a/package-lock.json b/package-lock.json index 9452276..87852f1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1115,6 +1115,50 @@ "node": ">=18.0.0" } }, + "node_modules/express": { + "version": "5.2.1", + "resolved": "https://registry.npmmirror.com/express/-/express-5.2.1.tgz", + "integrity": "sha512-hIS4idWWai69NezIdRt2xFVofaF4j+6INOpJlVOLDO8zXGpUVEVzIYk12UUi2JzjEzWL3IOAxcTubgz9Po0yXw==", + "license": "MIT", + "optional": true, + "dependencies": { + "accepts": "^2.0.0", + "body-parser": "^2.2.1", + "content-disposition": "^1.0.0", + "content-type": "^1.0.5", + "cookie": "^0.7.1", + "cookie-signature": "^1.2.1", + "debug": "^4.4.0", + "depd": "^2.0.0", + "encodeurl": "^2.0.0", + "escape-html": "^1.0.3", + "etag": "^1.8.1", + "finalhandler": "^2.1.0", + "fresh": "^2.0.0", + "http-errors": "^2.0.0", + "merge-descriptors": "^2.0.0", + "mime-types": "^3.0.0", + "on-finished": "^2.4.1", + "once": "^1.4.0", + "parseurl": "^1.3.3", + "proxy-addr": "^2.0.7", + "qs": "^6.14.0", + "range-parser": "^1.2.1", + "router": "^2.2.0", + "send": "^1.1.0", + "serve-static": "^2.2.0", + "statuses": "^2.0.1", + "type-is": "^2.0.1", + "vary": "^1.1.2" + }, + "engines": { + "node": ">= 18" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/express" + } + }, "node_modules/express-rate-limit": { "version": "8.4.1", "resolved": "https://registry.npmjs.org/express-rate-limit/-/express-rate-limit-8.4.1.tgz", @@ -1360,6 +1404,16 @@ "node": ">= 0.4" } }, + "node_modules/hono": { + "version": "4.12.18", + "resolved": "https://registry.npmmirror.com/hono/-/hono-4.12.18.tgz", + "integrity": "sha512-RWzP96k/yv0PQfyXnWjs6zot20TqfpfsNXhOnev8d1InAxubW93L11/oNUc3tQqn2G0bSdAOBpX+2uDFHV7kdQ==", + "license": "MIT", + "optional": true, + "engines": { + "node": ">=16.9.0" + } + }, "node_modules/http-errors": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/http-errors/-/http-errors-2.0.1.tgz", @@ -2340,7 +2394,6 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-4.3.6.tgz", "integrity": "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/src/core/bridge_coordinator.ts b/src/core/bridge_coordinator.ts index 8da181c..7b5dde0 100644 --- a/src/core/bridge_coordinator.ts +++ b/src/core/bridge_coordinator.ts @@ -9050,7 +9050,12 @@ export class BridgeCoordinator { if (!normalizedName) { return false; } - const features = await this.listCodexExperimentalFeatures(); + let features: CodexExperimentalFeatureInfo[]; + try { + features = await this.listCodexExperimentalFeatures(); + } catch { + return false; + } return features.some((feature) => feature.name.toLowerCase() === normalizedName && feature.enabled); } diff --git a/src/providers/codex/app_client.ts b/src/providers/codex/app_client.ts index fa54fa4..ac8ce93 100644 --- a/src/providers/codex/app_client.ts +++ b/src/providers/codex/app_client.ts @@ -6,6 +6,7 @@ import path from 'node:path'; import { spawn, type ChildProcess } from 'node:child_process'; import { writeSequencedStderrLine } from '../../core/sequenced_stderr.js'; import { readCodexAccountIdentity } from './auth_state.js'; +import { createCodexCliLaunchSpec } from './cli_command.js'; import type { ProviderAppInfo, ProviderApprovalRequest, @@ -278,6 +279,8 @@ interface ProgressState { lastAssistantActivityAt: number; } +type CodexAppServerTransport = 'auto' | 'websocket' | 'stdio'; + interface CodexAppClientOptions { codexCliBin: string; codexCliArgs?: string[]; @@ -289,6 +292,7 @@ interface CodexAppClientOptions { clientInfo?: CodexClientInfo; spawnImpl?: typeof spawn; webSocketFactory?: (url: string) => WebSocket; + appServerTransport?: CodexAppServerTransport | string | null; platform?: NodeJS.Platform; logger?: CodexAppLogger; turnPollSleep?: (ms: number) => Promise; @@ -329,6 +333,8 @@ export class CodexAppClient extends EventEmitter { webSocketFactory: (url: string) => WebSocket; + appServerTransport: CodexAppServerTransport; + platform: NodeJS.Platform; logger: CodexAppLogger; @@ -341,6 +347,10 @@ export class CodexAppClient extends EventEmitter { socket: WebSocket | null; + transportKind: 'websocket' | 'stdio' | null; + + stdioLineBuffer: string; + pending: Map; pendingApprovals: Map; @@ -374,6 +384,7 @@ export class CodexAppClient extends EventEmitter { }, spawnImpl = spawn, webSocketFactory = (url) => new WebSocket(url), + appServerTransport = normalizeCodexAppServerTransport(process.env.CODEX_APP_SERVER_TRANSPORT), platform = process.platform, logger = createNoopLogger(), turnPollSleep = sleep, @@ -390,6 +401,7 @@ export class CodexAppClient extends EventEmitter { this.clientInfo = clientInfo; this.spawnImpl = spawnImpl; this.webSocketFactory = webSocketFactory; + this.appServerTransport = normalizeCodexAppServerTransport(appServerTransport); this.platform = platform; this.logger = logger; this.turnPollSleep = turnPollSleep; @@ -397,6 +409,8 @@ export class CodexAppClient extends EventEmitter { this.child = null; this.socket = null; + this.transportKind = null; + this.stdioLineBuffer = ''; this.pending = new Map(); this.pendingApprovals = new Map(); this.approvedExecutions = new Map(); @@ -420,8 +434,18 @@ export class CodexAppClient extends EventEmitter { return this.connected; } + isTransportConnected(): boolean { + if (!this.connected) { + return false; + } + if (this.transportKind === 'stdio') { + return Boolean(this.child?.stdin?.writable); + } + return Boolean(this.socket && this.socket.readyState === WebSocket.OPEN); + } + async start(): Promise { - if (this.connected) { + if (this.isTransportConnected()) { return; } if (this.startPromise) { @@ -441,6 +465,8 @@ export class CodexAppClient extends EventEmitter { this.connected = false; this.socket?.close(); this.socket = null; + this.transportKind = null; + this.stdioLineBuffer = ''; this.childStartError = null; this.childStderrTail = []; const child = this.child; @@ -1018,21 +1044,26 @@ export class CodexAppClient extends EventEmitter { } this.childStartError = null; this.childStderrTail = []; - this.port = await reservePort(); + this.stdioLineBuffer = ''; + const transportKind = this.resolveAppServerTransportKind(); + this.port = transportKind === 'websocket' ? await reservePort() : null; const featureArgs = this.enabledFeatures.flatMap((feature) => ['--enable', feature]); + const appServerArgs = transportKind === 'websocket' + ? [...this.codexCliArgs, 'app-server', ...featureArgs, '--listen', `ws://127.0.0.1:${this.port}`] + : [...this.codexCliArgs, 'app-server', ...featureArgs]; const launchSpec = createCodexAppServerLaunchSpec({ command: this.codexCliBin, - args: [...this.codexCliArgs, 'app-server', ...featureArgs, '--listen', `ws://127.0.0.1:${this.port}`], + args: appServerArgs, platform: this.platform, }); try { this.child = launchSpec.args ? this.spawnImpl(launchSpec.command, launchSpec.args, { - stdio: ['ignore', 'pipe', 'pipe'], + stdio: transportKind === 'stdio' ? ['pipe', 'pipe', 'pipe'] : ['ignore', 'pipe', 'pipe'], ...launchSpec.options, }) : this.spawnImpl(launchSpec.command, { - stdio: ['ignore', 'pipe', 'pipe'], + stdio: transportKind === 'stdio' ? ['pipe', 'pipe', 'pipe'] : ['ignore', 'pipe', 'pipe'], ...launchSpec.options, }); } catch (error) { @@ -1046,12 +1077,16 @@ export class CodexAppClient extends EventEmitter { command: launchSpec.displayCommand, spawnCommand: launchSpec.command, spawnArgs: launchSpec.args, + transportKind, port: this.port, codexCliArgs: this.codexCliArgs, enabledFeatures: this.enabledFeatures, autolaunch: this.autolaunch, launchCommand: this.launchCommand, }); + if (transportKind === 'stdio') { + this.child.stdout?.on('data', (chunk) => this.handleStdioData(chunk)); + } this.child.stderr?.on('data', (chunk) => { const text = String(chunk).trim(); if (text) { @@ -1069,11 +1104,39 @@ export class CodexAppClient extends EventEmitter { this.child.on('exit', () => { this.connected = false; this.socket = null; + this.transportKind = null; }); - await this.connectWebSocket(); + if (transportKind === 'stdio') { + this.transportKind = 'stdio'; + this.connected = true; + } else { + await this.connectWebSocket(); + } await this.initialize(); } + resolveAppServerTransportKind(): 'websocket' | 'stdio' { + if (this.appServerTransport === 'stdio') { + return 'stdio'; + } + return 'websocket'; + } + + handleStdioData(chunk: unknown): void { + this.stdioLineBuffer += Buffer.isBuffer(chunk) ? chunk.toString('utf8') : String(chunk ?? ''); + for (;;) { + const newlineIndex = this.stdioLineBuffer.indexOf('\n'); + if (newlineIndex < 0) { + break; + } + const line = this.stdioLineBuffer.slice(0, newlineIndex).trim(); + this.stdioLineBuffer = this.stdioLineBuffer.slice(newlineIndex + 1); + if (line) { + this.handleMessage(line); + } + } + } + async connectWebSocket(): Promise { const url = `ws://127.0.0.1:${this.port}`; const started = Date.now(); @@ -1097,6 +1160,7 @@ export class CodexAppClient extends EventEmitter { }; ws.addEventListener('open', () => { this.socket = ws; + this.transportKind = 'websocket'; this.connected = true; ws.addEventListener('message', (message) => this.handleMessage(String(message.data))); ws.addEventListener('close', () => { @@ -1139,7 +1203,7 @@ export class CodexAppClient extends EventEmitter { } async request(method: string, params: any, { timeoutMs = 30_000 }: { timeoutMs?: number } = {}): Promise { - if (!this.socket || !this.connected) { + if (!this.isTransportConnected()) { await this.start(); } const id = String(++this.requestId); @@ -1190,6 +1254,13 @@ export class CodexAppClient extends EventEmitter { } send(payload: any): void { + if (this.transportKind === 'stdio') { + if (!this.child?.stdin?.writable) { + throw new Error('Codex app-server stdio is not open'); + } + this.child.stdin.write(`${JSON.stringify(payload)}\n`); + return; + } if (!this.socket || this.socket.readyState !== WebSocket.OPEN) { throw new Error('Codex app-server socket is not open'); } @@ -1470,8 +1541,9 @@ export class CodexAppClient extends EventEmitter { let lastTurnSnapshotKey = null; let stableTerminalReadCount = 0; let pollCount = 0; - let includeTurnsUnsupported = false; - let includeTurnsUnsupportedAt = 0; + let includeTurnsUnsupported = this.transportKind === 'stdio'; + let includeTurnsUnsupportedAt = includeTurnsUnsupported ? this.turnPollNow() : 0; + let threadSummaryForFallback: ProviderThreadSummary | null = null; let pendingApprovalWaitLogged = false; let lastPendingApprovalCount = 0; const terminalSettleMs = computeTerminalSettleMs(timeoutMs); @@ -1552,50 +1624,50 @@ export class CodexAppClient extends EventEmitter { lastPendingApprovalCount = pendingApprovalCount; } pollCount += 1; - let thread = null; - try { - thread = await this.readThread(threadId, !includeTurnsUnsupported); - } catch (error) { - if (isThreadMaterializationPendingError(error)) { - this.logDebug('turn_poll_retry', { - threadId, - turnId, - pollCount, - reason: 'thread_materialization_pending', - }); - await this.turnPollSleep(1000); - continue; - } - if (isRequestTimeoutError(error)) { - this.logDebug('turn_poll_retry', { - threadId, - turnId, - pollCount, - reason: 'thread_read_timeout', - }); - await this.turnPollSleep(1000); - continue; - } - if (isIncludeTurnsUnsupportedError(error)) { - includeTurnsUnsupported = true; - includeTurnsUnsupportedAt ||= this.turnPollNow(); - this.logDebug('turn_poll_retry', { - threadId, - turnId, - pollCount, - reason: 'thread_read_include_turns_unsupported', - }); - try { - thread = await this.readThread(threadId, false); - } catch (fallbackError) { - if (isThreadMaterializationPendingError(fallbackError) || isRequestTimeoutError(fallbackError)) { - await this.turnPollSleep(250); - continue; + let thread = threadSummaryForFallback; + if (!includeTurnsUnsupported) { + try { + thread = await this.readThread(threadId, true); + threadSummaryForFallback = thread; + } catch (error) { + if (isThreadMaterializationPendingError(error)) { + this.logDebug('turn_poll_retry', { + threadId, + turnId, + pollCount, + reason: 'thread_materialization_pending', + }); + await this.turnPollSleep(1000); + continue; + } + if (isRequestTimeoutError(error)) { + this.logDebug('turn_poll_retry', { + threadId, + turnId, + pollCount, + reason: 'thread_read_timeout', + }); + await this.turnPollSleep(1000); + continue; + } + if (isIncludeTurnsUnsupportedError(error)) { + includeTurnsUnsupported = true; + includeTurnsUnsupportedAt ||= this.turnPollNow(); + try { + thread = await this.readThread(threadId, false); + threadSummaryForFallback = thread; + } catch { + thread = threadSummaryForFallback; } - throw fallbackError; + this.logDebug('turn_poll_retry', { + threadId, + turnId, + pollCount, + reason: 'thread_read_include_turns_unsupported', + }); + } else { + throw error; } - } else { - throw error; } } const turn = includeTurnsUnsupported @@ -2338,6 +2410,14 @@ function normalizeStringList(value: unknown): string[] { : []; } +function normalizeCodexAppServerTransport(value: unknown): CodexAppServerTransport { + const normalized = typeof value === 'string' ? value.trim().toLowerCase() : ''; + if (normalized === 'stdio' || normalized === 'websocket') { + return normalized; + } + return 'auto'; +} + function normalizeBoolean(value: unknown): boolean | null { return typeof value === 'boolean' ? value : null; } @@ -3181,10 +3261,6 @@ function isTerminalNotificationForThread( if (method === 'turncompleted') { return true; } - if (method === 'itemcompleted') { - const notificationTurnId = extractNotificationTurnId(notification?.params ?? null); - return !notificationTurnId || notificationTurnId === turnId; - } return false; } @@ -4062,22 +4138,7 @@ function createCodexAppServerLaunchSpec({ options?: Record; displayCommand: string; } { - if (platform === 'win32' && /\.(cmd|bat)$/iu.test(command)) { - return { - command: buildWindowsShellCommandLine([command, ...args]), - args: null, - options: { - shell: true, - windowsHide: true, - }, - displayCommand: command, - }; - } - return { - command, - args, - displayCommand: command, - }; + return createCodexCliLaunchSpec({ command, args, platform }); } function createCodexLaunchError({ @@ -4132,21 +4193,6 @@ function createCodexConnectTimeoutError({ return new Error(`Timed out connecting to ${url} after launching "${command}".${detail}`); } -function buildWindowsShellCommandLine(parts: string[]): string { - return parts.map(quoteWindowsShellArgument).join(' '); -} - -function quoteWindowsShellArgument(value: string): string { - const normalized = String(value ?? ''); - if (!normalized) { - return '""'; - } - if (!/[\s"]/u.test(normalized)) { - return normalized; - } - return `"${normalized.replace(/"/g, '""')}"`; -} - async function reservePort(): Promise { return new Promise((resolve, reject) => { const server = net.createServer(); diff --git a/src/providers/codex/cli_command.ts b/src/providers/codex/cli_command.ts new file mode 100644 index 0000000..99ca4b8 --- /dev/null +++ b/src/providers/codex/cli_command.ts @@ -0,0 +1,49 @@ +export interface CodexCliLaunchSpec { + command: string; + args: string[] | null; + options: Record; + displayCommand: string; +} + +export function createCodexCliLaunchSpec({ + command, + args, + platform = process.platform, +}: { + command: string; + args: string[]; + platform?: NodeJS.Platform; +}): CodexCliLaunchSpec { + if (platform === 'win32' && /\.(cmd|bat)$/iu.test(command)) { + return { + command: buildWindowsShellCommandLine([command, ...args]), + args: null, + options: { + shell: true, + windowsHide: true, + }, + displayCommand: command, + }; + } + return { + command, + args, + options: {}, + displayCommand: command, + }; +} + +function buildWindowsShellCommandLine(parts: string[]): string { + return parts.map(quoteWindowsShellArgument).join(' '); +} + +function quoteWindowsShellArgument(value: string): string { + const normalized = String(value ?? ''); + if (!normalized) { + return '""'; + } + if (!/[\s"]/u.test(normalized)) { + return normalized; + } + return `"${normalized.replace(/"/g, '""')}"`; +} diff --git a/src/providers/codex/experimental_features_manager.ts b/src/providers/codex/experimental_features_manager.ts index df4e9aa..88e7ef3 100644 --- a/src/providers/codex/experimental_features_manager.ts +++ b/src/providers/codex/experimental_features_manager.ts @@ -1,4 +1,5 @@ import { execFileSync } from 'node:child_process'; +import { createCodexCliLaunchSpec } from './cli_command.js'; export interface CodexExperimentalFeatureInfo { name: string; @@ -14,6 +15,7 @@ export interface CodexExperimentalFeatureCatalogEntry { interface CodexExperimentalFeaturesManagerOptions { execFileSyncImpl?: typeof execFileSync; env?: NodeJS.ProcessEnv; + platform?: NodeJS.Platform; } export class CodexExperimentalFeaturesManager { @@ -21,12 +23,16 @@ export class CodexExperimentalFeaturesManager { private readonly env: NodeJS.ProcessEnv; + private readonly platform: NodeJS.Platform; + constructor({ execFileSyncImpl = execFileSync, env = process.env, + platform = process.platform, }: CodexExperimentalFeaturesManagerOptions = {}) { this.execFileSyncImpl = execFileSyncImpl; this.env = env; + this.platform = platform; } async listFeatures({ @@ -35,10 +41,7 @@ export class CodexExperimentalFeaturesManager { codexCliBin?: string | null; } = {}): Promise { const resolvedCliBin = normalizeCodexCliBin(codexCliBin); - const output = this.execFileSyncImpl(resolvedCliBin, ['features', 'list'], { - encoding: 'utf8', - env: this.env, - }); + const output = this.execCodexCliSync(resolvedCliBin, ['features', 'list']); return parseCodexFeaturesListOutput(output); } @@ -48,9 +51,7 @@ export class CodexExperimentalFeaturesManager { codexCliBin?: string | null; } = {}): Promise { const resolvedCliBin = normalizeCodexCliBin(codexCliBin); - this.execFileSyncImpl(resolvedCliBin, ['features', 'enable', featureName], { - encoding: 'utf8', - env: this.env, + this.execCodexCliSync(resolvedCliBin, ['features', 'enable', featureName], { stdio: 'pipe', }); } @@ -61,12 +62,32 @@ export class CodexExperimentalFeaturesManager { codexCliBin?: string | null; } = {}): Promise { const resolvedCliBin = normalizeCodexCliBin(codexCliBin); - this.execFileSyncImpl(resolvedCliBin, ['features', 'disable', featureName], { - encoding: 'utf8', - env: this.env, + this.execCodexCliSync(resolvedCliBin, ['features', 'disable', featureName], { stdio: 'pipe', }); } + + private execCodexCliSync( + codexCliBin: string, + args: string[], + options: { stdio?: 'pipe' } = {}, + ): string { + const launchSpec = createCodexCliLaunchSpec({ + command: codexCliBin, + args, + platform: this.platform, + }); + const execOptions = { + encoding: 'utf8', + env: this.env, + ...options, + ...launchSpec.options, + }; + if (launchSpec.args) { + return this.execFileSyncImpl(launchSpec.command, launchSpec.args, execOptions as any); + } + return (this.execFileSyncImpl as any)(launchSpec.command, execOptions); + } } export function parseCodexFeaturesListOutput(output: string): CodexExperimentalFeatureInfo[] { @@ -112,7 +133,7 @@ export function getPublicCodexExperimentalFeatures( } function parseCodexFeatureListLine(line: string): CodexExperimentalFeatureInfo | null { - const parts = line.split(/\s{2,}/u).map((part) => part.trim()).filter(Boolean); + const parts = line.split(/\t+|\s{2,}/u).map((part) => part.trim()).filter(Boolean); if (parts.length !== 3) { return null; } diff --git a/src/providers/codex/review_runner.ts b/src/providers/codex/review_runner.ts index 4a19b53..101fff6 100644 --- a/src/providers/codex/review_runner.ts +++ b/src/providers/codex/review_runner.ts @@ -1,6 +1,7 @@ import crypto from 'node:crypto'; import { spawn, type ChildProcess } from 'node:child_process'; import { normalizeLocale } from '../../i18n/index.js'; +import { createCodexCliLaunchSpec } from './cli_command.js'; import type { ProviderReviewTarget, ProviderThreadSummary, @@ -116,11 +117,19 @@ export class CodexCliReviewRunner implements CodexReviewRunnerLike { target, locale, }); - const child = this.spawnImpl(codexCliBin, args, { + const launchSpec = createCodexCliLaunchSpec({ + command: codexCliBin, + args, + }); + const spawnOptions = { cwd, env: process.env, stdio: ['ignore', 'pipe', 'pipe'], - }); + ...launchSpec.options, + }; + const child = launchSpec.args + ? this.spawnImpl(launchSpec.command, launchSpec.args, spawnOptions as any) + : this.spawnImpl(launchSpec.command, spawnOptions as any); state.child = child; const stdoutChunks: string[] = []; diff --git a/start-weixin-login.cmd b/start-weixin-login.cmd new file mode 100644 index 0000000..b8b81ce --- /dev/null +++ b/start-weixin-login.cmd @@ -0,0 +1,8 @@ +@echo off +setlocal +cd /d "%~dp0" +set "CODEX_REAL_BIN=C:\Users\cully\AppData\Roaming\npm\codex.cmd" +set "CODEX_APP_SERVER_TRANSPORT=stdio" +set "CODEXBRIDGE_DEFAULT_CWD=D:\cully\Documents" +set "CODEXBRIDGE_LOCALE=zh-CN" +npm run weixin:login -- --timeout-sec 480 diff --git a/start-weixin-serve.cmd b/start-weixin-serve.cmd new file mode 100644 index 0000000..f6359a8 --- /dev/null +++ b/start-weixin-serve.cmd @@ -0,0 +1,8 @@ +@echo off +setlocal +cd /d "%~dp0" +set "CODEX_REAL_BIN=C:\Users\cully\AppData\Roaming\npm\codex.cmd" +set "CODEX_APP_SERVER_TRANSPORT=stdio" +set "CODEXBRIDGE_DEFAULT_CWD=D:\cully\Documents" +set "CODEXBRIDGE_LOCALE=zh-CN" +npm run weixin:serve -- --cwd "D:\cully\Documents" diff --git a/test/providers/codex/app_client.test.ts b/test/providers/codex/app_client.test.ts index c32b1c3..4ccf38a 100644 --- a/test/providers/codex/app_client.test.ts +++ b/test/providers/codex/app_client.test.ts @@ -1405,7 +1405,11 @@ test('CodexAppClient times out individual JSON-RPC requests and clears pending s }); client.connected = true; + client.transportKind = 'websocket'; client.socket = {} as unknown as WebSocket; + Object.defineProperty(client.socket, 'readyState', { + value: WebSocket.OPEN, + }); client.send = (() => {}) as any; await assert.rejects( @@ -1641,6 +1645,123 @@ test('CodexAppClient falls back to progress text when ephemeral threads reject i }]); }); +test('CodexAppClient stdio wait ignores intermediate item completion until turn completion', async () => { + let nowMs = 0; + let sentStartup = false; + let sentFinal = false; + const client = new CodexAppClient({ + codexCliBin: 'codex', + turnPollNow: () => nowMs, + turnPollSleep: async (ms) => { + nowMs += ms; + if (!sentStartup) { + sentStartup = true; + client.emit('notification', { + method: 'item/started', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + item: { + id: 'startup-item', + type: 'agentMessage', + phase: 'commentary', + }, + }, + }); + client.emit('notification', { + method: 'item/agentMessage/delta', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + itemId: 'startup-item', + phase: 'commentary', + delta: 'Using startup workflow.', + }, + }); + client.emit('notification', { + method: 'item/completed', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + item: { + id: 'startup-item', + type: 'agentMessage', + phase: 'commentary', + text: 'Using startup workflow.', + }, + }, + }); + } + if (!sentFinal && nowMs >= 750) { + sentFinal = true; + client.emit('notification', { + method: 'item/started', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + item: { + id: 'final-item', + type: 'agentMessage', + phase: 'final_answer', + }, + }, + }); + client.emit('notification', { + method: 'item/agentMessage/delta', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + itemId: 'final-item', + phase: 'final_answer', + delta: 'OK', + }, + }); + client.emit('notification', { + method: 'item/completed', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + item: { + id: 'final-item', + type: 'agentMessage', + phase: 'final_answer', + text: 'OK', + }, + }, + }); + client.emit('notification', { + method: 'turn/completed', + params: { + threadId: 'thread-1', + }, + }); + } + }, + }); + client.transportKind = 'stdio'; + + client.request = async (method) => { + if (method === 'turn/start') { + return { turn: { id: 'turn-1' } }; + } + throw new Error(`Unexpected method: ${method}`); + }; + + const result = await client.startTurn({ + threadId: 'thread-1', + inputText: 'hello', + model: 'gpt-5.4', + effort: null, + collaborationMode: 'default', + timeoutMs: 2500, + }); + + assert.equal(result.outputText, 'OK'); + assert.equal(result.outputState, 'complete'); + assert.equal(result.finalSource, 'progress_only'); + assert.equal(sentFinal, true); +}); + test('CodexAppClient waits for assistant output after a terminal turn initially contains no visible items', async () => { const client = new CodexAppClient({ codexCliBin: 'codex', diff --git a/test/providers/codex/experimental_features_manager.test.ts b/test/providers/codex/experimental_features_manager.test.ts index f7107a6..f7871c3 100644 --- a/test/providers/codex/experimental_features_manager.test.ts +++ b/test/providers/codex/experimental_features_manager.test.ts @@ -48,6 +48,20 @@ test('getPublicCodexExperimentalFeatures keeps the official experimental menu or ); }); +test('parseCodexFeaturesListOutput parses tab-delimited Codex CLI output', () => { + const output = [ + 'undo\tstable\tfalse', + 'shell_tool\tstable\ttrue', + 'web_search_cached\texperimental\tfalse', + ].join('\n'); + + assert.deepEqual(parseCodexFeaturesListOutput(output), [ + { name: 'undo', maturity: 'stable', enabled: false }, + { name: 'shell_tool', maturity: 'stable', enabled: true }, + { name: 'web_search_cached', maturity: 'experimental', enabled: false }, + ]); +}); + test('CodexExperimentalFeaturesManager delegates list/enable/disable to codex features commands', async () => { const calls: Array<{ command: string; args: string[] }> = []; const manager = new CodexExperimentalFeaturesManager({ @@ -73,3 +87,26 @@ test('CodexExperimentalFeaturesManager delegates list/enable/disable to codex fe { command: '/opt/codex/bin/codex', args: ['features', 'disable', 'memories'] }, ]); }); + +test('CodexExperimentalFeaturesManager wraps Windows cmd shims through the shell', async () => { + const calls: Array<{ command: string; argsOrOptions: any; options: any }> = []; + const manager = new CodexExperimentalFeaturesManager({ + platform: 'win32', + execFileSyncImpl: ((command: string, argsOrOptions: any, options: any) => { + calls.push({ command, argsOrOptions, options }); + return 'memories experimental false\n'; + }) as any, + }); + + const features = await manager.listFeatures({ + codexCliBin: 'C:\\Program Files\\Codex\\codex.cmd', + }); + + assert.deepEqual(features, [ + { name: 'memories', maturity: 'experimental', enabled: false }, + ]); + assert.equal(calls[0]?.command, '"C:\\Program Files\\Codex\\codex.cmd" features list'); + assert.equal(calls[0]?.argsOrOptions?.shell, true); + assert.equal(calls[0]?.argsOrOptions?.windowsHide, true); + assert.equal(calls[0]?.options, undefined); +}); From 7609fa943a583cffb581a907d3a12ae0ee4bc399 Mon Sep 17 00:00:00 2001 From: cullysu Date: Thu, 14 May 2026 18:09:17 +0800 Subject: [PATCH 2/4] fix wechat bridge delivery timeouts --- src/platforms/weixin/official/api.ts | 201 +++++++++++++++++- src/platforms/weixin/official/transport.ts | 27 ++- src/platforms/weixin/plugin.ts | 2 + src/providers/codex/app_client.ts | 73 +++++++ .../weixin/official/transport.test.ts | 30 +++ test/providers/codex/app_client.test.ts | 40 ++++ 6 files changed, 359 insertions(+), 14 deletions(-) diff --git a/src/platforms/weixin/official/api.ts b/src/platforms/weixin/official/api.ts index 89296e4..d165fd3 100644 --- a/src/platforms/weixin/official/api.ts +++ b/src/platforms/weixin/official/api.ts @@ -1,4 +1,6 @@ import { createI18n } from '../../../i18n/index.js'; +import dns from 'node:dns/promises'; +import https from 'node:https'; import type { BaseInfo, GetConfigReq, @@ -221,12 +223,27 @@ async function requestJson(params: RawRequestOptions & { baseUrl: string; token?: string | null; }): Promise { - const i18n = createI18n(params.locale); - const fetchImpl = params.fetchImpl ?? (globalThis.fetch as WeixinOfficialFetch | undefined); - if (typeof fetchImpl !== 'function') { + const fetchImpl = params.fetchImpl; + if (fetchImpl !== undefined && typeof fetchImpl !== 'function') { + const i18n = createI18n(params.locale); throw new Error(i18n.t('platform.weixin.official.missingFetchImplementation')); } + if (fetchImpl) { + return requestJsonWithFetch({ + ...params, + fetchImpl, + }); + } + return requestJsonWithAddressRotation(params); +} + +async function requestJsonWithFetch(params: RawRequestOptions & { + baseUrl: string; + token?: string | null; + fetchImpl: WeixinOfficialFetch; +}): Promise { + const i18n = createI18n(params.locale); const abortController = new AbortController(); const timer = setTimeout(() => abortController.abort(), params.timeoutMs); const startTime = Date.now(); @@ -239,7 +256,7 @@ async function requestJson(params: RawRequestOptions & { }); try { - const response = await fetchImpl(joinUrl(params.baseUrl, params.endpoint), { + const response = await params.fetchImpl(joinUrl(params.baseUrl, params.endpoint), { method: params.method, body: params.body, signal: abortController.signal, @@ -281,6 +298,182 @@ async function requestJson(params: RawRequestOptions & { } } +async function requestJsonWithAddressRotation(params: RawRequestOptions & { + baseUrl: string; + token?: string | null; +}): Promise { + const i18n = createI18n(params.locale); + const url = new URL(joinUrl(params.baseUrl, params.endpoint)); + const addresses = await resolveHostAddresses(url.hostname); + const startTime = Date.now(); + const deadline = startTime + params.timeoutMs; + let lastError: unknown = null; + debugWeixinHttp('request_start', { + method: params.method, + endpoint: params.endpoint, + timeoutMs: params.timeoutMs, + authorized: params.authorized ?? true, + bodyLength: typeof params.body === 'string' ? Buffer.byteLength(params.body, 'utf8') : 0, + }); + + for (const address of addresses) { + const remainingMs = deadline - Date.now(); + if (remainingMs <= 0) { + break; + } + try { + const response = await requestJsonOverHttpsAddress({ + url, + address, + params, + timeoutMs: Math.min(20_000, remainingMs), + }); + debugWeixinHttp('request_end', { + method: params.method, + endpoint: params.endpoint, + status: response.status, + ok: response.status >= 200 && response.status < 300, + durationMs: Date.now() - startTime, + responseLength: response.raw.length, + responsePreview: previewResponse(response.raw), + }); + if (response.status < 200 || response.status >= 300) { + throw new Error(i18n.t('platform.weixin.official.ilinkHttpError', { + method: params.method, + endpoint: params.endpoint, + status: response.status, + response: response.raw.slice(0, 200), + })); + } + return response.raw ? JSON.parse(response.raw) as T : {} as T; + } catch (error) { + lastError = error; + if (!isRetryableNetworkError(error)) { + break; + } + debugWeixinHttp('request_retry', { + method: params.method, + endpoint: params.endpoint, + address, + elapsedMs: Date.now() - startTime, + error: error instanceof Error ? (error.stack || error.message) : String(error), + }); + } + } + + debugWeixinHttp('request_error', { + method: params.method, + endpoint: params.endpoint, + durationMs: Date.now() - startTime, + error: lastError instanceof Error ? (lastError.stack || lastError.message) : String(lastError ?? 'unknown error'), + }); + if (lastError instanceof Error) { + throw lastError; + } + throw new Error(String(lastError ?? 'Weixin request failed')); +} + +async function resolveHostAddresses(hostname: string): Promise { + try { + const records = await dns.lookup(hostname, { all: true }); + const addresses = records + .map((record) => record.address) + .filter((address) => typeof address === 'string' && address.trim()); + return [...new Set(addresses)].length > 0 ? [...new Set(addresses)] : [hostname]; + } catch { + return [hostname]; + } +} + +function requestJsonOverHttpsAddress({ + url, + address, + params, + timeoutMs, +}: { + url: URL; + address: string; + params: RawRequestOptions & { + baseUrl: string; + token?: string | null; + }; + timeoutMs: number; +}): Promise<{ status: number; raw: string }> { + const headers = buildHeaders({ + token: params.token ?? null, + authorized: params.authorized ?? true, + extraHeaders: { + ...(params.headers ?? {}), + Host: url.hostname, + }, + }); + return new Promise((resolve, reject) => { + let settled = false; + const request = https.request({ + protocol: 'https:', + hostname: address, + port: url.port ? Number(url.port) : 443, + method: params.method, + path: `${url.pathname}${url.search}`, + headers, + servername: url.hostname, + }, (response) => { + const chunks: Buffer[] = []; + response.on('data', (chunk) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + response.on('end', () => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + resolve({ + status: Number(response.statusCode ?? 0), + raw: Buffer.concat(chunks).toString('utf8'), + }); + }); + }); + + const timer = setTimeout(() => { + if (settled) { + return; + } + const error = new Error(`HTTPS request timed out after ${timeoutMs}ms`); + (error as NodeJS.ErrnoException).code = 'ETIMEDOUT'; + request.destroy(error); + }, timeoutMs); + + request.on('error', (error) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + reject(error); + }); + if (params.body) { + request.write(params.body); + } + request.end(); + }); +} + +function isRetryableNetworkError(error: unknown): boolean { + const code = typeof error === 'object' && error && 'code' in error + ? String((error as { code?: unknown }).code ?? '') + : ''; + return [ + 'ECONNRESET', + 'ECONNREFUSED', + 'EHOSTUNREACH', + 'ENETUNREACH', + 'ETIMEDOUT', + 'UND_ERR_CONNECT_TIMEOUT', + 'UND_ERR_HEADERS_TIMEOUT', + ].includes(code); +} + function joinUrl(baseUrl: string, endpoint: string): string { const normalizedBase = String(baseUrl).replace(/\/+$/u, ''); const normalizedEndpoint = String(endpoint).replace(/^\/+/u, ''); diff --git a/src/platforms/weixin/official/transport.ts b/src/platforms/weixin/official/transport.ts index 877fc2b..8df1294 100644 --- a/src/platforms/weixin/official/transport.ts +++ b/src/platforms/weixin/official/transport.ts @@ -19,6 +19,7 @@ import type { } from './types.js'; const DEFAULT_LONG_POLL_TIMEOUT_MS = 35_000; +const DEFAULT_GLOBAL_FETCH = globalThis.fetch as WeixinOfficialFetch | undefined; export interface WeixinOfficialTransport { baseUrl: string; @@ -31,6 +32,7 @@ export interface WeixinOfficialTransport { text: string; contextToken?: string | null; clientId: string; + timeoutMs?: number; }): Promise; sendTyping(params: { toUserId: string; @@ -57,7 +59,7 @@ export interface WeixinOfficialTransport { export function createWeixinOfficialTransport({ baseUrl, token = null, - fetchImpl = globalThis.fetch as WeixinOfficialFetch | undefined, + fetchImpl, locale = null, }: { baseUrl: string; @@ -66,28 +68,33 @@ export function createWeixinOfficialTransport({ locale?: string | null; }): WeixinOfficialTransport { const normalizedBaseUrl = String(baseUrl).replace(/\/+$/u, ''); + const effectiveFetchImpl = fetchImpl + ?? (globalThis.fetch !== DEFAULT_GLOBAL_FETCH + ? globalThis.fetch as WeixinOfficialFetch | undefined + : undefined); return { baseUrl: normalizedBaseUrl, token, - fetch: fetchImpl, + fetch: effectiveFetchImpl, locale, async getUpdates({ syncCursor = '', timeoutMs = DEFAULT_LONG_POLL_TIMEOUT_MS } = {}) { return getUpdates({ baseUrl: normalizedBaseUrl, token, - fetchImpl, + fetchImpl: effectiveFetchImpl, locale, timeoutMs, get_updates_buf: syncCursor, }); }, - async sendMessage({ toUserId, text, contextToken = null, clientId }) { + async sendMessage({ toUserId, text, contextToken = null, clientId, timeoutMs }) { return sendMessage({ baseUrl: normalizedBaseUrl, token, - fetchImpl, + fetchImpl: effectiveFetchImpl, locale, + timeoutMs, msg: { from_user_id: '', to_user_id: toUserId, @@ -108,7 +115,7 @@ export function createWeixinOfficialTransport({ return sendTyping({ baseUrl: normalizedBaseUrl, token, - fetchImpl, + fetchImpl: effectiveFetchImpl, locale, ilink_user_id: toUserId, typing_ticket: typingTicket, @@ -119,7 +126,7 @@ export function createWeixinOfficialTransport({ return getConfig({ baseUrl: normalizedBaseUrl, token, - fetchImpl, + fetchImpl: effectiveFetchImpl, locale, ilink_user_id: userId, ...(contextToken ? { context_token: contextToken } : {}), @@ -128,7 +135,7 @@ export function createWeixinOfficialTransport({ async getBotQr({ botType = DEFAULT_ILINK_BOT_TYPE } = {}) { return getBotQr({ baseUrl: normalizedBaseUrl, - fetchImpl, + fetchImpl: effectiveFetchImpl, locale, botType, }); @@ -136,7 +143,7 @@ export function createWeixinOfficialTransport({ async getQrStatus({ qrcode, baseUrlOverride = null }) { return getQrStatus({ baseUrl: baseUrlOverride ? String(baseUrlOverride).replace(/\/+$/u, '') : normalizedBaseUrl, - fetchImpl, + fetchImpl: effectiveFetchImpl, locale, qrcode, }); @@ -150,7 +157,7 @@ export function createWeixinOfficialTransport({ opts: { baseUrl: normalizedBaseUrl, token, - fetchImpl, + fetchImpl: effectiveFetchImpl, locale, contextToken, }, diff --git a/src/platforms/weixin/plugin.ts b/src/platforms/weixin/plugin.ts index f553ca5..fecbddb 100644 --- a/src/platforms/weixin/plugin.ts +++ b/src/platforms/weixin/plugin.ts @@ -44,6 +44,7 @@ import type { const TYPING_START = 1; const TYPING_STOP = 2; +const WEIXIN_SEND_MESSAGE_TIMEOUT_MS = 30_000; interface WeixinScope { chatType: 'group' | 'dm'; @@ -480,6 +481,7 @@ export class WeixinPlatformPlugin implements Pick { if (isTerminalNotificationForThread(notification, threadId, turnId)) { sawTerminalNotification = true; } + if (isErrorNotificationForThreadTurn(notification, threadId, turnId)) { + terminalRuntimeError = extractNotificationErrorMessage(notification) + ?? 'Codex app-server reported an error for this turn'; + sawTerminalNotification = true; + return; + } const progress = extractProgressUpdate(notification, turnId, itemOutputKinds, progressState); if (!progress) { return; @@ -1623,6 +1630,23 @@ export class CodexAppClient extends EventEmitter { pendingApprovalWaitLogged = false; lastPendingApprovalCount = pendingApprovalCount; } + if (terminalRuntimeError) { + const result = { + turnId, + threadId, + title: threadSummaryForFallback?.title ?? null, + outputText: '', + outputArtifacts: [], + outputMedia: [], + outputState: 'provider_error', + previewText: progressState.finalAnswerText, + finalSource: 'notification_error', + status: null, + errorMessage: terminalRuntimeError, + }; + this.logDebug('turn_wait_return', summarizeTurnResultForDebug(result)); + return result; + } pollCount += 1; let thread = threadSummaryForFallback; if (!includeTurnsUnsupported) { @@ -1683,6 +1707,23 @@ export class CodexAppClient extends EventEmitter { turn: summarizeTurnSnapshot(turn), progress: summarizeProgressState(progressState), }); + if (terminalRuntimeError) { + const result = { + turnId, + threadId, + title: thread?.title ?? null, + outputText: '', + outputArtifacts: [], + outputMedia: [], + outputState: 'provider_error', + previewText: progressState.finalAnswerText, + finalSource: 'notification_error', + status: turn?.status ?? null, + errorMessage: terminalRuntimeError, + }; + this.logDebug('turn_wait_return', summarizeTurnResultForDebug(result)); + return result; + } if (includeTurnsUnsupported) { const previewText = progressState.finalAnswerText || progressState.commentaryText; const settleAnchor = Math.max( @@ -3264,6 +3305,38 @@ function isTerminalNotificationForThread( return false; } +function isErrorNotificationForThreadTurn( + notification: any, + threadId: string, + turnId: string, +): boolean { + if (!notification || !isErrorNotificationMethod(notification.method)) { + return false; + } + if (extractThreadIdFromNotification(notification) !== threadId) { + return false; + } + const notificationTurnId = extractNotificationTurnId(notification?.params ?? null); + return !notificationTurnId || notificationTurnId === turnId; +} + +function isErrorNotificationMethod(method: unknown): boolean { + const normalized = String(method ?? '').replace(/[^a-z]/gi, '').toLowerCase(); + return normalized === 'error' + || normalized === 'streamerror' + || normalized.endsWith('error'); +} + +function extractNotificationErrorMessage(notification: any): string | null { + const params = notification?.params ?? null; + const message = extractTextCandidate(params?.error) + ?? extractTextCandidate(params?.message) + ?? extractTextCandidate(params?.details) + ?? extractTextCandidate(params?.event?.error) + ?? extractTextCandidate(notification?.error); + return typeof message === 'string' && message.trim() ? message.trim() : null; +} + function computeTerminalSettleMs(timeoutMs) { const numericTimeout = Number(timeoutMs || 0); if (!Number.isFinite(numericTimeout) || numericTimeout <= 0) { diff --git a/test/platforms/weixin/official/transport.test.ts b/test/platforms/weixin/official/transport.test.ts index bf72c91..1d9376d 100644 --- a/test/platforms/weixin/official/transport.test.ts +++ b/test/platforms/weixin/official/transport.test.ts @@ -224,6 +224,36 @@ test('WeixinOfficialTransport.sendMessage and getConfig use Hermes-compatible pa assert.equal(configBody.context_token, 'ctx-1'); }); +test('WeixinOfficialTransport.sendMessage forwards explicit timeoutMs to the request layer', async () => { + const { fetchImpl } = createFetchMock([{ body: { ret: 0 } }]); + const observedTimeouts: number[] = []; + const originalSetTimeout = globalThis.setTimeout; + (globalThis as any).setTimeout = (handler: any, timeout?: any, ...args: any[]) => { + observedTimeouts.push(Number(timeout)); + return originalSetTimeout(handler, timeout, ...args); + }; + + try { + const transport = createWeixinOfficialTransport({ + baseUrl: 'https://ilink.example.com', + token: 'bot-token', + fetchImpl, + }); + + await transport.sendMessage({ + toUserId: 'wxid_sender', + text: 'hello', + contextToken: 'ctx-1', + clientId: 'client-1', + timeoutMs: 30_000, + }); + } finally { + globalThis.setTimeout = originalSetTimeout; + } + + assert.ok(observedTimeouts.includes(30_000)); +}); + test('WeixinOfficialTransport.sendMediaFile transcodes JPEG inputs before upload and sends the image item downstream', async (t) => { if (!hasFfmpeg()) { t.skip('ffmpeg/ffprobe not available'); diff --git a/test/providers/codex/app_client.test.ts b/test/providers/codex/app_client.test.ts index 4ccf38a..3556bd8 100644 --- a/test/providers/codex/app_client.test.ts +++ b/test/providers/codex/app_client.test.ts @@ -3274,6 +3274,46 @@ test('CodexAppClient surfaces exhausted subscription credits from session rate l assert.equal(result.errorMessage, 'Codex subscription credits are exhausted (premium balance 0).'); }); +test('CodexAppClient returns provider_error immediately when an error notification arrives for the active stdio turn', async () => { + const client = new CodexAppClient({ + codexCliBin: 'codex', + }); + client.transportKind = 'stdio'; + + client.request = async (method) => { + if (method === 'turn/start') { + setTimeout(() => { + client.emit('notification', { + method: 'error', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + error: { + message: 'HTTP 503 Service Unavailable', + }, + }, + }); + }, 0); + return { turn: { id: 'turn-1' } }; + } + throw new Error(`Unexpected method: ${method}`); + }; + + const result = await client.startTurn({ + threadId: 'thread-1', + inputText: 'hello', + model: 'gpt-5.4', + effort: null, + collaborationMode: 'default', + timeoutMs: 2500, + }); + + assert.equal(result.outputText, ''); + assert.equal(result.outputState, 'provider_error'); + assert.equal(result.finalSource, 'notification_error'); + assert.equal(result.errorMessage, 'HTTP 503 Service Unavailable'); +}); + test('CodexAppClient returns partial commentary instead of timing out when assistant activity exists without a final answer', async () => { const client = new CodexAppClient({ codexCliBin: 'codex', From f96e2c38edb67f89473af009cffe16c7bed57a6b Mon Sep 17 00:00:00 2001 From: cullysu Date: Thu, 14 May 2026 19:46:49 +0800 Subject: [PATCH 3/4] fix codexbridge wechat retry and native model selection --- src/core/bridge_coordinator.ts | 25 +++ src/providers/codex/app_client.ts | 56 ++++- src/providers/codex/plugin.ts | 3 + test/core/bridge_coordinator.test.ts | 54 +++++ test/providers/codex/app_client.test.ts | 231 ++++++++++++++++++++ test/providers/openai_native/plugin.test.ts | 73 +++++++ 6 files changed, 440 insertions(+), 2 deletions(-) diff --git a/src/core/bridge_coordinator.ts b/src/core/bridge_coordinator.ts index 7b5dde0..d524b5e 100644 --- a/src/core/bridge_coordinator.ts +++ b/src/core/bridge_coordinator.ts @@ -11378,6 +11378,20 @@ export class BridgeCoordinator { result, context: turnArtifactContext, }); + if (shouldRecoverFromProviderTurnResult(finalizedResult)) { + const errorMessage = finalizedResult.errorMessage || 'Codex turn failed with a recoverable provider error'; + debugCoordinator('turn_result_recoverable_provider_error', { + platform: scopeRef.platform, + scopeId: scopeRef.externalScopeId, + bridgeSessionId: session.id, + threadId: finalizedResult?.threadId ?? session.codexThreadId, + turnId: finalizedResult?.turnId ?? null, + outputState: finalizedResult?.outputState ?? null, + finalSource: finalizedResult?.finalSource ?? null, + errorMessage, + }); + throw new Error(errorMessage); + } debugCoordinator('turn_result_finalized', { platform: scopeRef.platform, scopeId: scopeRef.externalScopeId, @@ -20410,6 +20424,17 @@ function shouldAutoRebindAfterRecoveryFailure(error) { return isStaleThreadError(error) || isResumeRetryableError(error); } +function shouldRecoverFromProviderTurnResult(result) { + if (!result || result.outputState !== 'provider_error') { + return false; + } + const errorMessage = typeof result.errorMessage === 'string' ? result.errorMessage : ''; + if (!errorMessage.trim()) { + return false; + } + return shouldAutoRebindAfterRecoveryFailure(new Error(errorMessage)); +} + function isApprovedExecutionStallError(error) { const message = error instanceof Error ? error.message : String(error); return /Approval was accepted, but the approved /i.test(message) diff --git a/src/providers/codex/app_client.ts b/src/providers/codex/app_client.ts index f691040..8c5f7b3 100644 --- a/src/providers/codex/app_client.ts +++ b/src/providers/codex/app_client.ts @@ -1561,8 +1561,24 @@ export class CodexAppClient extends EventEmitter { sawTerminalNotification = true; } if (isErrorNotificationForThreadTurn(notification, threadId, turnId)) { - terminalRuntimeError = extractNotificationErrorMessage(notification) - ?? 'Codex app-server reported an error for this turn'; + const notificationErrorMessage = extractNotificationErrorMessage(notification); + if (!notificationErrorMessage) { + this.logDebug('turn_wait_unclassified_error_notification', { + threadId, + turnId, + method: notification?.method ?? null, + }); + return; + } + if (isTransientNotificationErrorMessage(notificationErrorMessage)) { + this.logDebug('turn_wait_transient_error_notification', { + threadId, + turnId, + errorMessage: notificationErrorMessage, + }); + return; + } + terminalRuntimeError = notificationErrorMessage; sawTerminalNotification = true; return; } @@ -2648,12 +2664,19 @@ function summarizeRpcResult(method: string, result: any) { } function summarizeNotificationMessage(message: any) { + const errorMessage = isErrorNotificationMethod(message?.method) + ? extractNotificationErrorMessage(message) + : null; return { method: String(message?.method ?? ''), id: 'id' in (message ?? {}) ? String(message.id ?? '') : null, threadId: extractThreadIdFromNotification(message), turnId: extractNotificationTurnId(message?.params ?? null), itemId: extractItemId(message?.params ?? null), + eventType: typeof message?.params?.event?.type === 'string' + ? message.params.event.type + : null, + errorMessage: truncateDebugText(errorMessage, 160), outputKind: typeof message?.params?.item?.output_kind === 'string' ? message.params.item.output_kind : null, @@ -3333,10 +3356,39 @@ function extractNotificationErrorMessage(notification: any): string | null { ?? extractTextCandidate(params?.message) ?? extractTextCandidate(params?.details) ?? extractTextCandidate(params?.event?.error) + ?? extractTextCandidate(params?.event?.message) + ?? extractTextCandidate(params?.event?.details) + ?? extractTextCandidate(params?.event?.msg) + ?? extractTextCandidate(params?.msg?.error) + ?? extractTextCandidate(params?.msg?.message) + ?? extractTextCandidate(params?.msg?.details) + ?? extractTextCandidate(params?.msg) ?? extractTextCandidate(notification?.error); return typeof message === 'string' && message.trim() ? message.trim() : null; } +function isTransientNotificationErrorMessage(message: string | null): boolean { + if (!message) { + return false; + } + const normalized = message.trim().replace(/\s+/g, ' ').toLowerCase(); + if (!normalized) { + return false; + } + const hasAttemptCounter = /\b\d+\s*\/\s*\d+\b/.test(normalized); + const looksLikeRetry = + /\breconnecting\b/.test(normalized) + || /\bretrying\b/.test(normalized) + || /\bretry\b/.test(normalized); + const looksTerminal = + /\b(exhausted|failed|failure|fatal|giving up|unavailable|forbidden|unauthorized)\b/.test(normalized) + || /\bhttp\s+\d{3}\b/.test(normalized); + if (hasAttemptCounter && looksLikeRetry && !looksTerminal) { + return true; + } + return /\b(stream|connection|socket)\b.*\b(disconnected|closed|reset)\b.*\bretrying\b/.test(normalized); +} + function computeTerminalSettleMs(timeoutMs) { const numericTimeout = Number(timeoutMs || 0); if (!Number.isFinite(numericTimeout) || numericTimeout <= 0) { diff --git a/src/providers/codex/plugin.ts b/src/providers/codex/plugin.ts index eef7491..518a447 100644 --- a/src/providers/codex/plugin.ts +++ b/src/providers/codex/plugin.ts @@ -629,6 +629,9 @@ export class CodexProviderPlugin { defaultReasoningEffort: null, }; } + if (providerProfile.providerKind === 'openai-native') { + return null; + } const models = await client.listModels(); return models.find((model) => model.isDefault) ?? models[0] diff --git a/test/core/bridge_coordinator.test.ts b/test/core/bridge_coordinator.test.ts index 60f98d8..cd1101d 100644 --- a/test/core/bridge_coordinator.test.ts +++ b/test/core/bridge_coordinator.test.ts @@ -1502,6 +1502,60 @@ test('bridge coordinator auto-rebinds to a new session when stale thread resume ); }); +test('bridge coordinator auto-rebinds when Codex returns a stale-thread provider_error result', async () => { + const { runtime, openai } = makeRuntime(); + const original = await runtime.services.bridgeCoordinator.handleInboundEvent({ + platform: 'weixin', + externalScopeId: 'wx-user-provider-error-stale-thread', + text: 'hello codexbridge', + }); + + const staleThreadId = original.session.codexThreadId; + const originalStartTurn = openai.startTurn.bind(openai); + openai.startTurn = async (args) => { + if (args.bridgeSession.codexThreadId === staleThreadId) { + openai.startTurnCalls.push({ + providerProfile: args.providerProfile, + bridgeSession: args.bridgeSession, + sessionSettings: args.sessionSettings, + event: args.event, + inputText: args.inputText, + }); + return { + outputText: '', + outputArtifacts: [], + outputMedia: [], + outputState: 'provider_error', + previewText: '', + finalSource: 'notification_error', + status: null, + errorMessage: `thread not found: ${staleThreadId}`, + turnId: `${staleThreadId}-turn-stale`, + threadId: staleThreadId, + title: original.session.title, + }; + } + return originalStartTurn(args); + }; + + const result = await runtime.services.bridgeCoordinator.handleInboundEvent({ + platform: 'weixin', + externalScopeId: 'wx-user-provider-error-stale-thread', + text: 'hello after provider error', + }); + + assert.match(result.messages[0]?.text ?? '', /openai: hello after provider error/); + assert.equal(openai.startThreadCalls.length, 2); + assert.ok(openai.resumeThreadCalls.length >= 1); + + const rebound = runtime.services.bridgeSessions.resolveScopeSession({ + platform: 'weixin', + externalScopeId: 'wx-user-provider-error-stale-thread', + }); + assert.notEqual(rebound?.id, original.session?.bridgeSessionId); + assert.notEqual(rebound?.codexThreadId, original.session?.codexThreadId); +}); + test('bridge coordinator recreates a scope session when Codex reports a damaged rollout file', async () => { const { runtime, openai } = makeRuntime(); const original = await runtime.services.bridgeCoordinator.handleInboundEvent({ diff --git a/test/providers/codex/app_client.test.ts b/test/providers/codex/app_client.test.ts index 3556bd8..aab8455 100644 --- a/test/providers/codex/app_client.test.ts +++ b/test/providers/codex/app_client.test.ts @@ -3314,6 +3314,237 @@ test('CodexAppClient returns provider_error immediately when an error notificati assert.equal(result.errorMessage, 'HTTP 503 Service Unavailable'); }); +test('CodexAppClient extracts terminal error messages from codex event notifications', async () => { + const client = new CodexAppClient({ + codexCliBin: 'codex', + }); + client.transportKind = 'stdio'; + + client.request = async (method) => { + if (method === 'turn/start') { + setTimeout(() => { + client.emit('notification', { + method: 'codex/event/error', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + event: { + type: 'error', + message: 'HTTP 503 Service Unavailable', + }, + }, + }); + }, 0); + return { turn: { id: 'turn-1' } }; + } + throw new Error(`Unexpected method: ${method}`); + }; + + const result = await client.startTurn({ + threadId: 'thread-1', + inputText: 'hello', + model: 'gpt-5.4', + effort: null, + collaborationMode: 'default', + timeoutMs: 2500, + }); + + assert.equal(result.outputText, ''); + assert.equal(result.outputState, 'provider_error'); + assert.equal(result.finalSource, 'notification_error'); + assert.equal(result.errorMessage, 'HTTP 503 Service Unavailable'); +}); + +test('CodexAppClient ignores transient reconnect notifications and still returns the final answer for the active stdio turn', async () => { + let nowMs = 0; + let sleepCount = 0; + let transientEmitted = false; + let finalEmitted = false; + const client = new CodexAppClient({ + codexCliBin: 'codex', + turnPollNow: () => nowMs, + turnPollSleep: async (ms) => { + nowMs += ms; + sleepCount += 1; + if (!transientEmitted) { + transientEmitted = true; + client.emit('notification', { + method: 'codex/event/error', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + event: { + type: 'error', + message: 'Reconnecting... 1/5', + }, + }, + }); + } else if (!finalEmitted && sleepCount >= 2) { + finalEmitted = true; + client.emit('notification', { + method: 'item/started', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + item: { + id: 'item-1', + type: 'agentMessage', + phase: 'final_answer', + }, + }, + }); + client.emit('notification', { + method: 'item/agentMessage/delta', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + itemId: 'item-1', + delta: '恢复后的最终回答', + }, + }); + client.emit('notification', { + method: 'item/completed', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + item: { + id: 'item-1', + }, + }, + }); + client.emit('notification', { + method: 'turn/completed', + params: { + threadId: 'thread-1', + }, + }); + } + }, + }); + client.transportKind = 'stdio'; + + client.request = async (method) => { + if (method === 'turn/start') { + return { turn: { id: 'turn-1' } }; + } + throw new Error(`Unexpected method: ${method}`); + }; + + const result = await client.startTurn({ + threadId: 'thread-1', + inputText: 'hello', + model: 'gpt-5.4', + effort: null, + collaborationMode: 'default', + timeoutMs: 2500, + }); + + assert.equal(result.outputText, '恢复后的最终回答'); + assert.equal(result.outputState, 'complete'); + assert.equal(result.finalSource, 'progress_only'); + assert.equal(result.errorMessage, undefined); +}); + +test('CodexAppClient ignores message-less stream errors and waits for retry output', async () => { + let nowMs = 0; + let sleepCount = 0; + let streamErrorEmitted = false; + let transientEmitted = false; + let finalEmitted = false; + const client = new CodexAppClient({ + codexCliBin: 'codex', + turnPollNow: () => nowMs, + turnPollSleep: async (ms) => { + nowMs += ms; + sleepCount += 1; + if (!streamErrorEmitted) { + streamErrorEmitted = true; + client.emit('notification', { + method: 'codex/event/stream_error', + params: { + conversationId: 'thread-1', + id: '1', + msg: { + type: 'stream_error', + }, + }, + }); + } else if (!transientEmitted) { + transientEmitted = true; + client.emit('notification', { + method: 'error', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + message: 'Reconnecting... 1/5', + }, + }); + } else if (!finalEmitted && sleepCount >= 3) { + finalEmitted = true; + client.emit('notification', { + method: 'item/started', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + item: { + id: 'item-1', + type: 'agentMessage', + phase: 'final_answer', + }, + }, + }); + client.emit('notification', { + method: 'item/agentMessage/delta', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + itemId: 'item-1', + delta: 'OK', + }, + }); + client.emit('notification', { + method: 'item/completed', + params: { + threadId: 'thread-1', + turnId: 'turn-1', + item: { + id: 'item-1', + }, + }, + }); + client.emit('notification', { + method: 'turn/completed', + params: { + threadId: 'thread-1', + }, + }); + } + }, + }); + client.transportKind = 'stdio'; + + client.request = async (method) => { + if (method === 'turn/start') { + return { turn: { id: 'turn-1' } }; + } + throw new Error(`Unexpected method: ${method}`); + }; + + const result = await client.startTurn({ + threadId: 'thread-1', + inputText: 'hello', + model: 'gpt-5.4', + effort: null, + collaborationMode: 'default', + timeoutMs: 2500, + }); + + assert.equal(result.outputText, 'OK'); + assert.equal(result.outputState, 'complete'); + assert.equal(result.finalSource, 'progress_only'); + assert.equal(result.errorMessage, undefined); +}); + test('CodexAppClient returns partial commentary instead of timing out when assistant activity exists without a final answer', async () => { const client = new CodexAppClient({ codexCliBin: 'codex', diff --git a/test/providers/openai_native/plugin.test.ts b/test/providers/openai_native/plugin.test.ts index b2de6c8..c7a0dad 100644 --- a/test/providers/openai_native/plugin.test.ts +++ b/test/providers/openai_native/plugin.test.ts @@ -63,3 +63,76 @@ test('OpenAINativeProviderPlugin delegates thread creation through CodexProvider ['startThread', 'openai-default', '/tmp/openai'], ]); }); + +test('OpenAINativeProviderPlugin leaves the model unset unless the bridge session explicitly selects one', async () => { + const calls: any[] = []; + const plugin = new OpenAINativeProviderPlugin({ + clientFactory: () => ({ + async start() { + calls.push(['start']); + }, + async startThread(params: any) { + calls.push(['startThread', params.model ?? null]); + return { + threadId: 'thread-openai-2', + cwd: params.cwd ?? null, + title: params.title ?? null, + }; + }, + async startTurn(params: any) { + calls.push(['startTurn', params.model ?? null]); + return { + outputText: 'done', + threadId: params.threadId, + title: null, + }; + }, + async listModels() { + calls.push(['listModels']); + return [{ + id: 'gpt-5.1-codex-max', + model: 'gpt-5.1-codex-max', + displayName: 'GPT-5.1 Codex Max', + description: '', + isDefault: true, + supportedReasoningEfforts: ['medium'], + defaultReasoningEffort: 'medium', + }]; + }, + }), + }); + + const providerProfile = makeProfile(); + const session = { + id: 'bridge-session-1', + providerProfileId: providerProfile.id, + codexThreadId: 'thread-openai-2', + cwd: '/tmp/openai', + title: null, + createdAt: Date.now(), + updatedAt: Date.now(), + }; + + await plugin.startThread({ + providerProfile, + cwd: '/tmp/openai', + }); + await plugin.startTurn({ + providerProfile, + bridgeSession: session, + sessionSettings: null, + event: { + platform: 'weixin', + externalScopeId: 'wx-user-1', + text: 'hello', + }, + inputText: 'hello', + }); + + assert.deepEqual(calls, [ + ['start'], + ['startThread', null], + ['start'], + ['startTurn', null], + ]); +}); From 4bf75443ed98119df231c3e726bf832a08effe18 Mon Sep 17 00:00:00 2001 From: cullysu Date: Thu, 14 May 2026 20:27:23 +0800 Subject: [PATCH 4/4] fix wechat active turn cleanup --- src/core/bridge_coordinator.ts | 54 ++++++- test/core/bridge_coordinator.test.ts | 217 +++++++++++++++++++++++++++ 2 files changed, 268 insertions(+), 3 deletions(-) diff --git a/src/core/bridge_coordinator.ts b/src/core/bridge_coordinator.ts index d524b5e..529c772 100644 --- a/src/core/bridge_coordinator.ts +++ b/src/core/bridge_coordinator.ts @@ -1183,7 +1183,8 @@ export class BridgeCoordinator { }); return explicitPluginIssueResponse; } - this.activeTurns?.beginScopeTurn(scopeRef); + const localActiveTurn = this.activeTurns?.beginScopeTurn(scopeRef) ?? null; + let localTurnFinished = false; let session = null; try { const locale = this.resolveScopeLocale(scopeRef, effectiveEvent); @@ -1240,6 +1241,7 @@ export class BridgeCoordinator { errorMessage: result.errorMessage ?? '', }, }; + localTurnFinished = isTurnResultLocallyFinished(result); return response; } catch (error) { const failure = classifyTurnFailure(error, this.currentI18n); @@ -1264,9 +1266,13 @@ export class BridgeCoordinator { errorMessage: failure.errorMessage ?? '', }, }; + localTurnFinished = isTurnResultLocallyFinished(failure); return response; } finally { - await this.releaseActiveTurnIfStillRunning(scopeRef); + await this.releaseActiveTurnIfStillRunning(scopeRef, { + localTurnFinished, + expectedActiveTurn: localActiveTurn, + }); } } @@ -10852,11 +10858,28 @@ export class BridgeCoordinator { return this.activeTurns?.resolveScopeTurn(scopeRef) ?? null; } - async releaseActiveTurnIfStillRunning(scopeRef) { + async releaseActiveTurnIfStillRunning(scopeRef, { localTurnFinished = false, expectedActiveTurn = null } = {}) { + const currentActiveTurn = this.activeTurns?.resolveScopeTurn(scopeRef) ?? null; + if (expectedActiveTurn && currentActiveTurn !== expectedActiveTurn) { + return; + } const activeTurn = await this.reconcileActiveTurn(scopeRef); if (!activeTurn) { return; } + if (expectedActiveTurn && activeTurn !== expectedActiveTurn) { + return; + } + if (localTurnFinished && !hasPendingApproval(activeTurn)) { + debugCoordinator('active_turn_released_after_local_finish', { + platform: scopeRef.platform, + scopeId: scopeRef.externalScopeId, + threadId: activeTurn.threadId ?? null, + turnId: activeTurn.turnId ?? null, + }); + this.activeTurns?.endScopeTurn(scopeRef); + return; + } if (activeTurn.turnId || hasPendingApproval(activeTurn)) { return; } @@ -11226,6 +11249,20 @@ export class BridgeCoordinator { } } } + if ( + active + && interruptErrors.length > 0 + && interruptErrors.every((error) => isInterruptRequestTimeoutError(error)) + ) { + debugCoordinator('active_turn_released_after_interrupt_timeout', { + platform: scopeRef.platform, + scopeId: scopeRef.externalScopeId, + threadId: active.threadId ?? session.codexThreadId ?? null, + turnId: active.turnId ?? null, + interruptErrors, + }); + this.activeTurns?.endScopeTurn(scopeRef); + } const settled = waitForSettleMs > 0 ? await this.waitForThreadToStop(scopeRef, session, waitForSettleMs) @@ -20435,6 +20472,17 @@ function shouldRecoverFromProviderTurnResult(result) { return shouldAutoRebindAfterRecoveryFailure(new Error(errorMessage)); } +function isTurnResultLocallyFinished(result) { + const outputState = String(result?.outputState ?? 'complete').trim().toLowerCase(); + return outputState !== 'partial'; +} + +function isInterruptRequestTimeoutError(errorMessage) { + const normalized = String(errorMessage ?? '').trim().toLowerCase(); + return normalized.includes('timed out waiting for codex json-rpc response to turn/interrupt') + || normalized.includes('timeout waiting for codex json-rpc response to turn/interrupt'); +} + function isApprovedExecutionStallError(error) { const message = error instanceof Error ? error.message : String(error); return /Approval was accepted, but the approved /i.test(message) diff --git a/test/core/bridge_coordinator.test.ts b/test/core/bridge_coordinator.test.ts index cd1101d..5fc3d56 100644 --- a/test/core/bridge_coordinator.test.ts +++ b/test/core/bridge_coordinator.test.ts @@ -2177,6 +2177,50 @@ test('stale active turns are reconciled before starting a new conversation turn' assert.equal(runtime.services.activeTurns.resolveScopeTurn(scopeRef), null); }); +test('completed provider results release the local active turn even when thread status remains running', async () => { + const { runtime, openai } = makeRuntime(); + const originalStartTurn = openai.startTurn.bind(openai); + const scopeRef = { + platform: 'weixin', + externalScopeId: 'wx-user-complete-local-release-1', + }; + + openai.startTurn = async ({ bridgeSession, inputText, onTurnStarted = null }) => { + if (inputText !== 'complete from progress') { + return originalStartTurn({ bridgeSession, inputText, onTurnStarted }); + } + const thread = openai.threads.get(bridgeSession.codexThreadId); + assert.ok(thread); + const turnId = `${bridgeSession.codexThreadId}-turn-running-but-returned`; + await onTurnStarted?.({ + turnId, + threadId: bridgeSession.codexThreadId, + }); + thread.turns = [{ + id: turnId, + status: 'running', + error: null, + items: [], + }]; + return { + outputText: 'final answer from progress', + outputState: 'complete', + finalSource: 'progress_only', + turnId, + threadId: bridgeSession.codexThreadId, + title: bridgeSession.title, + }; + }; + + const result = await runtime.services.bridgeCoordinator.handleInboundEvent({ + ...scopeRef, + text: 'complete from progress', + }); + + assert.equal(result.messages[0]?.text ?? '', 'final answer from progress'); + assert.equal(runtime.services.activeTurns.resolveScopeTurn(scopeRef), null); +}); + test('conversation turns remain blocked when the previous provider turn is still running', async () => { const { runtime, openai } = makeRuntime(); const scopeRef = { @@ -5574,6 +5618,179 @@ test('/stop interrupts the active turn once the provider has issued a turn id', assert.equal(firstResult.meta?.codexTurn?.outputState, 'interrupted'); }); +test('/stop clears the local active turn when Codex interrupt RPC times out', async () => { + const { runtime, openai } = makeRuntime(); + const originalStartTurn = openai.startTurn.bind(openai); + const scopeRef = { + platform: 'weixin', + externalScopeId: 'wx-user-stop-timeout-release-1', + }; + let releaseTurn: (value?: unknown) => void = () => {}; + const turnGate = new Promise((resolve) => { + releaseTurn = resolve; + }); + + openai.startTurn = async ({ bridgeSession, inputText, onTurnStarted = null }) => { + if (inputText !== 'first turn') { + return originalStartTurn({ bridgeSession, inputText, onTurnStarted }); + } + const thread = openai.threads.get(bridgeSession.codexThreadId); + assert.ok(thread); + const turnId = `${bridgeSession.codexThreadId}-turn-timeout-stop`; + await onTurnStarted?.({ + turnId, + threadId: bridgeSession.codexThreadId, + }); + thread.turns = [{ + id: turnId, + status: 'running', + error: null, + items: [], + }]; + await turnGate; + thread.turns = [{ + id: turnId, + status: 'interrupted', + error: 'Conversation interrupted', + items: [], + }]; + return { + outputText: '', + outputState: 'interrupted', + turnId, + threadId: bridgeSession.codexThreadId, + title: bridgeSession.title, + }; + }; + openai.interruptTurn = async (params) => { + openai.interruptTurnCalls.push(params); + throw new Error('Timed out waiting for Codex JSON-RPC response to turn/interrupt'); + }; + + const firstTurn = runtime.services.bridgeCoordinator.handleInboundEvent({ + ...scopeRef, + text: 'first turn', + }); + + await waitForCondition(() => runtime.services.activeTurns.resolveScopeTurn(scopeRef)?.turnId); + + const stop = await runtime.services.bridgeCoordinator.handleInboundEvent({ + ...scopeRef, + text: '/stop', + }); + + assert.match(stop.messages.map((message) => message.text ?? '').join('\n'), /turn\/interrupt/); + assert.equal(runtime.services.activeTurns.resolveScopeTurn(scopeRef), null); + + const next = await runtime.services.bridgeCoordinator.handleInboundEvent({ + ...scopeRef, + text: 'after stop timeout', + }); + + assert.equal(next.messages[0]?.text ?? '', 'openai: after stop timeout'); + + releaseTurn(); + await firstTurn; +}); + +test('/stop timeout cleanup does not clear a newly started second turn', async () => { + const { runtime, openai } = makeRuntime(); + const originalStartTurn = openai.startTurn.bind(openai); + const scopeRef = { + platform: 'weixin', + externalScopeId: 'wx-user-stop-timeout-race-1', + }; + let releaseFirst: (value?: unknown) => void = () => {}; + const firstGate = new Promise((resolve) => { + releaseFirst = resolve; + }); + const secondTurnId = 'race-second-turn'; + + openai.startTurn = async ({ bridgeSession, inputText, onTurnStarted = null }) => { + if (inputText === 'first turn') { + const thread = openai.threads.get(bridgeSession.codexThreadId); + assert.ok(thread); + const turnId = `${bridgeSession.codexThreadId}-turn-timeout-race-first`; + await onTurnStarted?.({ + turnId, + threadId: bridgeSession.codexThreadId, + }); + thread.turns = [{ + id: turnId, + status: 'running', + error: null, + items: [], + }]; + await firstGate; + thread.turns = [{ + id: turnId, + status: 'interrupted', + error: 'Conversation interrupted', + items: [], + }]; + return { + outputText: '', + outputState: 'interrupted', + turnId, + threadId: bridgeSession.codexThreadId, + title: bridgeSession.title, + }; + } + if (inputText === 'second turn') { + const thread = openai.threads.get(bridgeSession.codexThreadId); + assert.ok(thread); + await onTurnStarted?.({ + turnId: secondTurnId, + threadId: bridgeSession.codexThreadId, + }); + thread.turns = [{ + id: secondTurnId, + status: 'running', + error: null, + items: [], + }]; + return { + outputText: 'second partial output', + outputState: 'partial', + finalSource: 'progress_only', + turnId: secondTurnId, + threadId: bridgeSession.codexThreadId, + title: bridgeSession.title, + }; + } + return originalStartTurn({ bridgeSession, inputText, onTurnStarted }); + }; + openai.interruptTurn = async () => { + throw new Error('Timed out waiting for Codex JSON-RPC response to turn/interrupt'); + }; + + const firstTurn = runtime.services.bridgeCoordinator.handleInboundEvent({ + ...scopeRef, + text: 'first turn', + }); + + await waitForCondition(() => runtime.services.activeTurns.resolveScopeTurn(scopeRef)?.turnId); + + await runtime.services.bridgeCoordinator.handleInboundEvent({ + ...scopeRef, + text: '/stop', + }); + + const second = await runtime.services.bridgeCoordinator.handleInboundEvent({ + ...scopeRef, + text: 'second turn', + }); + + assert.equal(second.meta?.codexTurn?.outputState, 'partial'); + assert.equal(runtime.services.activeTurns.resolveScopeTurn(scopeRef)?.turnId, secondTurnId); + + releaseFirst(); + await firstTurn; + + assert.equal(runtime.services.activeTurns.resolveScopeTurn(scopeRef)?.turnId, secondTurnId); + runtime.services.activeTurns.endScopeTurn(scopeRef); +}); + test('/interrupt remains a hidden compatibility alias and can queue an interrupt before turn startup completes', async () => { const { runtime, openai } = makeRuntime(); const scopeRef = {