diff --git a/CHANGELOG.md b/CHANGELOG.md index acfe367c..42f220e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ ### Fixed +- 账号"已限速"状态与上游 cachedQuota 双真理漂移彻底修复(free 用户尤甚,本地锁可长达一周不释放):proxy 此前同时维护 `entry.usage.rate_limit_until` + `entry.status === "rate_limited"`(来自 429 retry-after 的本地锁)和 `entry.cachedQuota..limit_reached/reset_at`(来自上游 rate_limits header 被动收集)两套独立信号。`refresh-scheduler.ts:196/236/291` 和 `services/account-mutation.ts:59` 的 `markStatus(_, "active")` 调用会把 status 从 "rate_limited" 翻回 "active" 但不动 `rate_limit_until`,导致后者成为孤儿字段——dashboard 显示"已限速 / 5h 0% 已使用"自相矛盾,且对 free 账号"7d 主窗口"语义下,孤儿 lock 可比上游真实重置时间晚出整整一个周期。修复:(1) 删除 `markRateLimited` / `clearRateLimit` / `markQuotaExhausted` 三个旧方法,新增 `applyRateLimit429(entryId, { retryAfterSec?, resetsAtSec?, countRequest? })`,把 429 retry-after 直接写到 `cachedQuota.rate_limit.{limit_reached=true, reset_at}`,永不缩短已有 reset_at(下一次 passive header 采集会修正 bucket),不再 mutate `entry.status`;(2) `AccountStatus` 枚举去掉 `"rate_limited"`(只剩 `active / expired / quota_exhausted / refreshing / disabled / banned`,纯轮转状态机);(3) 重写 `proxy-error-handler.ts:91` / `proxy-handler.ts:545` 走新方法;(4) `refreshStatus` 删掉 rate-limit 清理分支(`resetExpiredQuotaWindow` 已 cover 窗口到期自动清 `limit_reached`);(5) `isAuthenticated` / `getPoolSummary` 加 `hasReachedCachedQuota` 检查,避免全 quota 耗尽时误报 authenticated;(6) `accounts.json` 一次性 migration:`migrateLegacyRateLimit` 在 `loadPersisted` 里把 `status="rate_limited" + rate_limit_until` 老条目转成 `status="active" + 合成 cachedQuota.rate_limit`(仅当本地 lock 比 cachedQuota 新鲜时才覆盖),下一次 persist 自动丢字段;(7) `web/src/lib/accountStatus.ts` 新增 `derivedStatus(account)`,dashboard `AccountCard` / `AccountList` 都按 cachedQuota → "rate_limited" 派生 badge,"已达上限"账号现在如实显示而不是错标"活跃";新增 `tests/unit/auth/account-pool-rate-limit-429.test.ts`(9 个)+ `tests/unit/auth/account-persistence-migration.test.ts`(7 个),改写约 20 处旧 `markRateLimited` / `markQuotaExhausted` 断言用 `applyRateLimit429` + `isQuotaExhausted(account.quota)`,full suite 1927 
全绿(`src/auth/account-registry.ts`、`src/auth/account-pool.ts`、`src/auth/account-persistence.ts`、`src/auth/types.ts`、`src/auth/quota-skip.ts`、`src/routes/shared/proxy-error-handler.ts`、`src/routes/shared/proxy-handler.ts`、`web/src/lib/accountStatus.ts`、`web/src/components/AccountCard.tsx`、`web/src/components/AccountTable.tsx`、`web/src/components/AccountList.tsx`、`web/src/pages/AccountManagement.tsx`) +- `src/routes/shared/proxy-handler.ts` 对 `EmptyResponseError` 与"上游 reasoning 中途断流"两类错误未区分,前者会 cross-account retry(吃 3 个号),后者重试也一定会再次撞同样的上游 120s 硬上限——一次"上游 reasoning 没在 ~120s 内 emit 任何 output_text"会消耗整池 360s 的号。新增 `UpstreamPrematureCloseError`(`src/translation/codex-event-extractor.ts`),`collectCodexResponse`(`src/translation/codex-to-openai.ts`)检测到 stream 无 `response.completed`/`response.failed`/`error` 任一 terminal 事件时抛新类型,proxy-handler 接住直接 504 fail-fast 不跨号重试。背景:调查中观察到上游 chatgpt.com 在 HTTP/1.1 chunked encoding 的 chunk 之间硬切 TCP,不发 0-length 终止 chunk 也不发 SSE `[DONE]`(hyper 透出 `error decoding response body: unexpected EOF during chunk size line`),触发条件为模型在 `effort=xhigh` 下推理超过 ~120s 仍未开始 output——属于上游 backend 行为不是 proxy bug;proxy 现在做的是不让该故障吃光账号池(`src/translation/codex-event-extractor.ts`、`src/translation/codex-to-openai.ts`、`src/routes/shared/proxy-handler.ts`、`tests/unit/translation/codex-to-openai.test.ts`、`tests/integration/proxy-handler.test.ts`) - `src/utils/debug-dump.ts` 把 dump 文件路径硬编码 `/tmp/codex-proxy-dump-*.jsonl`,Windows 没有 `/tmp` 目录,`fs.appendFileSync` ENOENT 直接被外层 try/catch 吞掉——Windows 用户即使设了 `CODEX_PROXY_DEBUG_DUMP=1` 也拿不到任何 dump 输出,且没有任何 warning。改用 `os.tmpdir()` 解析(macOS `/var/folders/.../T`、Linux `/tmp`、Windows `C:\Users\\AppData\Local\Temp`),跨平台一致。新增 `tests/unit/utils/debug-dump.test.ts` 把"dump path must live under os.tmpdir()"freeze 成显式断言,防止以后再有人为了"看着短"重新硬编码 `/tmp` - v2.0.73 用户在 8080 端口被占时弹 `Uncaught Exception: Error: listen EADDRINUSE: address already in use 127.0.0.1:8080`:根因是 `@hono/node-server.serve()` 同步返回 Server 对象但 `listen()` 是异步,`startServer()` 在 
socket 真正 bind 之前就 resolve 了,main.ts 那个`try { await startServer(...) } catch { startServer({ port: 0 }) }` 的随机端口 fallback 永远不触发——EADDRINUSE 在 `await` 之外异步抛,逃出 catch 范围变成 uncaughtException 弹给 Electron 用户。同一个 race 在 #486 smoke 健康探测里以另一种形态出现过(grep 命中 "Server started" 后 curl 立刻 connect refused),当时只在外层 retry 上吸收了,没动产品代码;这次把根因修在 src 层。新增 `src/utils/await-listening.ts` 暴露 `awaitServerListening(server)`,监听 `listening` / `error` 二选一并自清 listener;`startServer()` 在 `serve()` 后插一行 `await awaitServerListening(server)` 把 bind 错误真正变成 startServer 的拒绝。配套 `tests/unit/utils/await-listening.test.ts` 5 个单测覆盖 listening / error / 已 listening / 双向不泄漏 listener。修复后 main.ts 的随机端口 fallback 真正生效,8080 被占时会自动换一个端口(`src/utils/await-listening.ts`、`src/index.ts`、`tests/unit/utils/await-listening.test.ts`) - Electron auto-updater 真正尊重 `autoUpdate` 选项:`packages/electron/electron/auto-updater.ts` 里 `const isAutoUpdate = options.autoUpdate ?? true` 这个变量声明了但**从来没被使用过**——后续的 `setTimeout(initial check, 30s)` 与 `setInterval(periodic check, 4h)` 直接无条件运行,于是用户即便配了 `autoUpdate: false` 也照样后台 ping 上游 latest release 检查更新。把这两个定时器包到 `if (isAutoUpdate)` 内才让开关真的生效。补 `packages/electron/__tests__/auto-updater.test.ts` 两个 case:(1) `autoUpdate: false` 时 advance fake timer 不会触发任何 `checkForUpdates`;(2) `allowPrerelease: true` 真的写到 `mockAutoUpdater.allowPrerelease` diff --git a/src/auth/account-persistence.ts b/src/auth/account-persistence.ts index 74ab642a..f719dcb3 100644 --- a/src/auth/account-persistence.ts +++ b/src/auth/account-persistence.ts @@ -18,7 +18,77 @@ import { extractUserProfile, isTokenExpired, } from "./jwt-utils.js"; -import type { AccountEntry, AccountsFile } from "./types.js"; +import type { AccountEntry, AccountsFile, CodexQuota } from "./types.js"; + +/** + * Migrate a legacy entry to the new schema: + * status === "rate_limited" + usage.rate_limit_until → status="active" + + * cachedQuota.rate_limit.{limit_reached, reset_at}. 
+ * + * Trust rule: if cachedQuota was fetched AFTER rate_limit_until was last set + * (quotaFetchedAt > rate_limit_until), we treat cachedQuota as ground truth + * and just drop the local lock. Otherwise we synthesize/overwrite the primary + * bucket from rate_limit_until. + * + * Returns true when the entry was mutated. + */ +export function migrateLegacyRateLimit(entry: AccountEntry): boolean { + const usage = entry.usage; + const legacyUntil = usage.rate_limit_until; + // On-disk shape pre-dates the enum narrowing; cast through string to compare + // against the retired "rate_limited" literal without tripping TS no-overlap. + const wasRateLimitedStatus = (entry.status as string) === "rate_limited"; + if (!wasRateLimitedStatus && !legacyUntil) return false; + + let mutated = false; + + if (wasRateLimitedStatus) { + entry.status = "active"; + mutated = true; + } + + if (legacyUntil) { + const untilMs = Date.parse(legacyUntil); + const untilSec = Number.isFinite(untilMs) ? Math.floor(untilMs / 1000) : 0; + const inFuture = Number.isFinite(untilMs) && untilMs > Date.now(); + + const fetchedMs = entry.quotaFetchedAt ? Date.parse(entry.quotaFetchedAt) : NaN; + const cachedQuotaIsFresh = + entry.cachedQuota != null && + Number.isFinite(fetchedMs) && + Number.isFinite(untilMs) && + fetchedMs > untilMs; + + if (inFuture && !cachedQuotaIsFresh) { + const synthesized: CodexQuota = entry.cachedQuota ?? { + plan_type: entry.planType ?? "unknown", + rate_limit: { + allowed: false, + limit_reached: true, + used_percent: 100, + reset_at: untilSec, + limit_window_seconds: usage.limit_window_seconds ?? null, + }, + secondary_rate_limit: null, + code_review_rate_limit: null, + }; + synthesized.rate_limit = { + ...synthesized.rate_limit, + allowed: false, + limit_reached: true, + used_percent: Math.max(synthesized.rate_limit.used_percent ?? 
0, 100), + reset_at: untilSec, + }; + entry.cachedQuota = synthesized; + entry.quotaFetchedAt = new Date().toISOString(); + } + + usage.rate_limit_until = null; + mutated = true; + } + + return mutated; +} export interface AccountPersistence { load(): { entries: AccountEntry[]; needsPersist: boolean }; @@ -212,6 +282,10 @@ function loadPersisted(): { entries: AccountEntry[]; needsPersist: boolean } { entry.quotaFetchedAt = null; needsPersist = true; } + // Migrate legacy rate_limit_until + status="rate_limited" → cachedQuota + if (migrateLegacyRateLimit(entry)) { + needsPersist = true; + } entries.push(entry); } diff --git a/src/auth/account-pool.ts b/src/auth/account-pool.ts index f61c1ca0..2caa1ad3 100644 --- a/src/auth/account-pool.ts +++ b/src/auth/account-pool.ts @@ -164,28 +164,23 @@ export class AccountPool { } } - markRateLimited( + /** + * Single source of truth for "this account just got 429'd". Writes the + * retry-after hint into cachedQuota.rate_limit (primary bucket); pool + * exclusion flows through {@link hasReachedCachedQuota}. See + * AccountRegistry.applyRateLimit429 for full semantics including + * never-shrink-existing-reset_at and bucket-inference fallback. 
+ */ + applyRateLimit429( entryId: string, - options?: { retryAfterSec?: number; countRequest?: boolean }, + options?: { retryAfterSec?: number; resetsAtSec?: number; countRequest?: boolean }, ): void { - if (this.registry.markRateLimited(entryId, this.rateLimitBackoffSeconds, options)) { + if (this.registry.applyRateLimit429(entryId, this.rateLimitBackoffSeconds, options)) { this.lifecycle.clearLock(entryId); this.evictWsPool(entryId); } } - clearRateLimit(entryId: string): void { - if (this.registry.clearRateLimit(entryId)) { - this.lifecycle.clearLock(entryId); - } - } - - markQuotaExhausted(entryId: string, resetAtUnix: number | null): void { - if (this.registry.markQuotaExhausted(entryId, resetAtUnix)) { - this.lifecycle.clearLock(entryId); - } - } - // ── Quota / usage ───────────────────────────────────────────────── recordEmptyResponse(entryId: string): void { diff --git a/src/auth/account-registry.ts b/src/auth/account-registry.ts index 65be9c24..7e4aa4ea 100644 --- a/src/auth/account-registry.ts +++ b/src/auth/account-registry.ts @@ -106,7 +106,6 @@ export class AccountRegistry { cached_tokens: 0, empty_response_count: 0, last_used: null, - rate_limit_until: null, window_request_count: 0, window_input_tokens: 0, window_output_tokens: 0, @@ -186,19 +185,62 @@ export class AccountRegistry { } /** Returns true if the entry was found and mutated. */ - markRateLimited( + /** + * Handle an upstream 429 by writing into cachedQuota.rate_limit (primary + * bucket) as the single source of truth. 429 body carries no bucket marker; + * the next passive header collection on a successful response will overwrite + * with ground truth (which may upgrade this to secondary if needed). + * + * - Synthesizes a minimal cachedQuota if none exists yet (new account). + * - Never shrinks an existing reset_at — if cachedQuota already says we are + * limited further in the future (e.g. weekly bucket), keep that. 
+ * - Does NOT mutate `entry.status`; pool exclusion happens via + * {@link hasReachedCachedQuota}. + * + * Returns true if the entry was found. + */ + applyRateLimit429( entryId: string, backoffSeconds: number, - options?: { retryAfterSec?: number; countRequest?: boolean }, + options?: { retryAfterSec?: number; resetsAtSec?: number; countRequest?: boolean }, ): boolean { const entry = this.accounts.get(entryId); if (!entry) return false; - const backoff = jitter(options?.retryAfterSec ?? backoffSeconds, 0.2); - const until = new Date(Date.now() + backoff * 1000); + const nowSec = Date.now() / 1000; + const explicit = options?.resetsAtSec; + const fromRetry = options?.retryAfterSec != null + ? nowSec + jitter(options.retryAfterSec, 0.2) + : null; + const newResetAt = explicit ?? fromRetry ?? (nowSec + jitter(backoffSeconds, 0.2)); + + const quota: CodexQuota = entry.cachedQuota ?? { + plan_type: entry.planType ?? "unknown", + rate_limit: { + allowed: false, + limit_reached: true, + used_percent: 100, + reset_at: newResetAt, + limit_window_seconds: entry.usage.limit_window_seconds ?? null, + }, + secondary_rate_limit: null, + code_review_rate_limit: null, + }; - entry.status = "rate_limited"; - entry.usage.rate_limit_until = until.toISOString(); + const existingResetAt = quota.rate_limit.reset_at; + const finalResetAt = existingResetAt != null && existingResetAt > newResetAt + ? existingResetAt + : newResetAt; + + quota.rate_limit = { + ...quota.rate_limit, + allowed: false, + limit_reached: true, + used_percent: Math.max(quota.rate_limit.used_percent ?? 0, 100), + reset_at: finalResetAt, + }; + entry.cachedQuota = quota; + entry.quotaFetchedAt = new Date().toISOString(); if (options?.countRequest) { entry.usage.request_count++; @@ -210,38 +252,6 @@ export class AccountRegistry { return true; } - /** Returns true if the entry was found and mutated. 
*/ - clearRateLimit(entryId: string): boolean { - const entry = this.accounts.get(entryId); - if (!entry) return false; - entry.status = "active"; - entry.usage.rate_limit_until = null; - this.schedulePersist(); - return true; - } - - /** Returns true if the entry was found and actually changed. */ - markQuotaExhausted(entryId: string, resetAtUnix: number | null): boolean { - const entry = this.accounts.get(entryId); - if (!entry) return false; - if (entry.status === "disabled" || entry.status === "expired" || entry.status === "banned" || entry.status === "refreshing") return false; - - const until = resetAtUnix - ? new Date(resetAtUnix * 1000).toISOString() - : new Date(Date.now() + 300_000).toISOString(); - - if (entry.status === "rate_limited" && entry.usage.rate_limit_until) { - const existing = new Date(entry.usage.rate_limit_until).getTime(); - const proposed = new Date(until).getTime(); - if (proposed <= existing) return false; - } - - entry.status = "rate_limited"; - entry.usage.rate_limit_until = until; - this.schedulePersist(); - return true; - } - // ── Query ───────────────────────────────────────────────────────── getAccounts(): AccountInfo[] { @@ -268,7 +278,11 @@ export class AccountRegistry { const now = new Date(); for (const entry of this.accounts.values()) { this.refreshStatus(entry, now); - if (entry.status === "active") return true; + // "Authenticated" used to imply "has a usable account". After retiring + // status="rate_limited", we treat any cachedQuota-exhausted account as + // unusable too — otherwise an all-exhausted pool would falsely report + // authenticated and produce confusing 4xx on requests. + if (entry.status === "active" && !hasReachedCachedQuota(entry)) return true; } return false; } @@ -325,6 +339,8 @@ export class AccountRegistry { active: number; expired: number; quota_exhausted: number; + /** Count of accounts whose cachedQuota reports any bucket limit_reached. 
+ * Derived from cachedQuota, NOT from a "rate_limited" status (retired). */ rate_limited: number; refreshing: number; disabled: number; @@ -334,11 +350,14 @@ export class AccountRegistry { let active = 0, expired = 0, quota_exhausted = 0, rate_limited = 0, refreshing = 0, disabled = 0, banned = 0; for (const entry of this.accounts.values()) { this.refreshStatus(entry, now); + if (entry.status === "active" && hasReachedCachedQuota(entry)) { + rate_limited++; + continue; + } switch (entry.status) { case "active": active++; break; case "expired": expired++; break; case "quota_exhausted": quota_exhausted++; break; - case "rate_limited": rate_limited++; break; case "refreshing": refreshing++; break; case "disabled": disabled++; break; case "banned": banned++; break; @@ -459,7 +478,6 @@ export class AccountRegistry { cached_tokens: 0, empty_response_count: 0, last_used: null, - rate_limit_until: null, window_reset_at: entry.usage.window_reset_at ?? null, window_request_count: 0, window_input_tokens: 0, @@ -475,13 +493,6 @@ export class AccountRegistry { // ── Internal ────────────────────────────────────────────────────── refreshStatus(entry: AccountEntry, now: Date): void { - if (entry.status === "rate_limited" && entry.usage.rate_limit_until) { - if (now >= new Date(entry.usage.rate_limit_until)) { - entry.status = "active"; - entry.usage.rate_limit_until = null; - } - } - if (entry.status === "active" && isTokenExpired(entry.token)) { entry.status = "expired"; } diff --git a/src/auth/quota-skip.ts b/src/auth/quota-skip.ts index 31eef69a..1965d978 100644 --- a/src/auth/quota-skip.ts +++ b/src/auth/quota-skip.ts @@ -1,7 +1,13 @@ -import type { AccountEntry } from "./types.js"; +import type { AccountEntry, CodexQuota } from "./types.js"; + +/** True when any of the 3 cachedQuota buckets reports limit_reached. 
*/ +export function isQuotaExhausted(quota: CodexQuota | null | undefined): boolean { + if (!quota) return false; + return quota.rate_limit.limit_reached === true || + quota.secondary_rate_limit?.limit_reached === true || + quota.code_review_rate_limit?.limit_reached === true; +} export function hasReachedCachedQuota(entry: AccountEntry): boolean { - return entry.cachedQuota?.rate_limit.limit_reached === true || - entry.cachedQuota?.secondary_rate_limit?.limit_reached === true || - entry.cachedQuota?.code_review_rate_limit?.limit_reached === true; + return isQuotaExhausted(entry.cachedQuota); } diff --git a/src/auth/types.ts b/src/auth/types.ts index e6f46598..41b3b759 100644 --- a/src/auth/types.ts +++ b/src/auth/types.ts @@ -6,7 +6,6 @@ export type AccountStatus = | "active" | "expired" | "quota_exhausted" - | "rate_limited" | "refreshing" | "disabled" | "banned"; @@ -27,7 +26,12 @@ export interface AccountUsage { image_request_failed_count?: number; empty_response_count: number; last_used: string | null; - rate_limit_until: string | null; + /** + * Legacy local-lock field, retired. Reads survive on disk to support + * in-place migration; new code MUST consult cachedQuota.*.limit_reached + * instead. Removed from runtime mutation by `migrateLegacyRateLimit`. + */ + rate_limit_until?: string | null; /** Tracks the current rate limit window end (Unix seconds). When window rolls over, counters reset. */ window_reset_at?: number | null; /** Per-window request count (resets when window expires). */ diff --git a/src/routes/shared/proxy-error-handler.ts b/src/routes/shared/proxy-error-handler.ts index d25b47d9..64a4919e 100644 --- a/src/routes/shared/proxy-error-handler.ts +++ b/src/routes/shared/proxy-error-handler.ts @@ -75,21 +75,13 @@ export function handleCodexApiError( console.error(`[${tag}] Account ${entryId} | Codex API error:`, err.message); - // 2. Rate-limited + // 2. Rate-limited — write into cachedQuota.rate_limit (single source of + // truth). 
applyRateLimit429 internally never shrinks an existing reset_at, + // so a fresh secondary-window lock survives a stale primary 429. if (err.status === 429) { const retryAfterSec = extractRetryAfterSec(err.body); - - // If cached quota shows limit_reached with a known reset time, use that - // instead of the short default backoff (prevents exhausted accounts from - // cycling back to "active" after 60s only to get 429'd again) - const entry = pool.getEntry(entryId); - const cachedReset = entry?.cachedQuota?.rate_limit?.reset_at; - const effectiveRetry = (entry?.cachedQuota?.rate_limit?.limit_reached && cachedReset) - ? Math.max(retryAfterSec ?? 0, cachedReset - Math.floor(Date.now() / 1000)) - : retryAfterSec; - - pool.markRateLimited(entryId, { retryAfterSec: effectiveRetry ?? undefined, countRequest: true }); - const backoffDisplay = effectiveRetry != null ? Math.round(effectiveRetry) : null; + pool.applyRateLimit429(entryId, { retryAfterSec, countRequest: true }); + const backoffDisplay = retryAfterSec != null ? Math.round(retryAfterSec) : null; console.warn( `[${tag}] Account ${entryId} (${email}) | 429 rate limited` + (backoffDisplay != null ? 
` (resets in ${backoffDisplay}s)` : "") + diff --git a/src/routes/shared/proxy-handler.ts b/src/routes/shared/proxy-handler.ts index 51764afe..7ada4208 100644 --- a/src/routes/shared/proxy-handler.ts +++ b/src/routes/shared/proxy-handler.ts @@ -19,7 +19,7 @@ import { } from "../../proxy/codex-api.js"; import type { CodexResponsesRequest } from "../../proxy/codex-api.js"; import type { UpstreamAdapter } from "../../proxy/upstream-adapter.js"; -import { EmptyResponseError } from "../../translation/codex-event-extractor.js"; +import { EmptyResponseError, UpstreamPrematureCloseError } from "../../translation/codex-event-extractor.js"; import type { AccountPool } from "../../auth/account-pool.js"; import type { CookieJar } from "../../proxy/cookie-jar.js"; import type { ProxyPool } from "../../proxy/proxy-pool.js"; @@ -538,11 +538,13 @@ export async function handleProxyRequest( const windowSec = rl.primary.window_minutes != null ? rl.primary.window_minutes * 60 : null; accountPool.syncRateLimitWindow(entryId, rl.primary.reset_at, windowSec); } - // Proactively mark exhausted accounts so they don't get re-selected + // Proactively mark exhausted accounts so they don't get re-selected. + // updateCachedQuota above already records the truth; this call only + // exists for its side effects (lifecycle.clearLock + WS pool eviction). if (quota.rate_limit.limit_reached && rl.primary?.reset_at != null) { const backoffSec = rl.primary.reset_at - Math.floor(Date.now() / 1000); if (backoffSec > 0) { - accountPool.markRateLimited(entryId, { retryAfterSec: backoffSec }); + accountPool.applyRateLimit429(entryId, { resetsAtSec: rl.primary.reset_at }); } } }; @@ -890,6 +892,20 @@ async function handleNonStreaming( releaseAccount(accountPool, currentEntryId, annotateImageGenOutcome(result.usage, req.expectsImageGen), released); return c.json(result.response); } catch (collectErr) { + // Upstream FIN'd mid-reasoning (typically gpt-5.5 xhigh > 120 s cap). 
+ // Cross-account retry would re-hit the same cap and burn the pool, so + // we fail fast with 504. The proxy can't recover this — the client + // needs to lower reasoning effort or pick a different model. + if (collectErr instanceof UpstreamPrematureCloseError) { + const email = accountPool.getEntry(currentEntryId)?.email ?? "?"; + console.warn( + `[${fmt.tag}] Account ${currentEntryId} (${email}) | upstream premature close (hadReasoning=${collectErr.hadReasoning} events=${collectErr.eventCount}) — failing fast, not retrying`, + ); + releaseAccount(accountPool, currentEntryId, annotateImageGenOutcome(undefined, req.expectsImageGen), released); + c.status(504); + return c.json(fmt.formatError(504, collectErr.message)); + } + if (collectErr instanceof EmptyResponseError && attempt <= MAX_EMPTY_RETRIES) { const email = accountPool.getEntry(currentEntryId)?.email ?? "?"; console.warn( diff --git a/src/translation/codex-event-extractor.ts b/src/translation/codex-event-extractor.ts index 72d76ccb..db821e1e 100644 --- a/src/translation/codex-event-extractor.ts +++ b/src/translation/codex-event-extractor.ts @@ -53,6 +53,32 @@ export class EmptyResponseError extends Error { } } +/** + * Upstream closed the SSE stream without sending `response.completed`, + * `response.failed`, or an `error` event. Observed when gpt-5.5 with + * `effort=xhigh` spends > 120 s in reasoning_summary before producing any + * output_text — the Codex backend caps total response duration and silently + * FINs the connection. + * + * Treated separately from EmptyResponseError because cross-account retry is + * useless (same workload re-hits the same cap on the next account) and just + * burns the pool. The proxy surfaces 504 to the client instead. + */ +export class UpstreamPrematureCloseError extends Error { + constructor( + public readonly responseId: string | null, + public readonly hadReasoning: boolean, + public readonly eventCount: number, + ) { + super( + hadReasoning + ? 
"Upstream closed stream after reasoning without producing output (likely hit response-duration cap)" + : "Upstream closed stream without a terminal event", + ); + this.name = "UpstreamPrematureCloseError"; + } +} + export interface ExtractedEvent { typed: TypedCodexEvent; responseId?: string; diff --git a/src/translation/codex-to-openai.ts b/src/translation/codex-to-openai.ts index a3f10626..6356d1ca 100644 --- a/src/translation/codex-to-openai.ts +++ b/src/translation/codex-to-openai.ts @@ -18,9 +18,15 @@ import type { ChatCompletionToolCall, ChatCompletionChunkToolCall, } from "../types/openai.js"; -import { iterateCodexEvents, EmptyResponseError, type UsageInfo } from "./codex-event-extractor.js"; +import { + iterateCodexEvents, + EmptyResponseError, + UpstreamPrematureCloseError, + type UsageInfo, +} from "./codex-event-extractor.js"; import { reconvertTupleValues } from "./tuple-schema.js"; import { codexApiErrorFromEvent } from "./codex-api-error-from-event.js"; +import { debugDump, debugDumpEnabled } from "../utils/debug-dump.js"; /** Format an SSE chunk for streaming output */ function formatSSE(chunk: ChatCompletionChunk): string { @@ -318,7 +324,20 @@ export async function collectCodexResponse( // Collect tool calls const toolCalls: ChatCompletionToolCall[] = []; + const dumpEnabled = debugDumpEnabled(); + const eventTrace: unknown[] = []; + let sawTerminalEvent = false; + let eventCount = 0; + for await (const evt of iterateCodexEvents(codexApi, rawResponse)) { + if (dumpEnabled) eventTrace.push(evt.typed); + eventCount++; + if ( + evt.typed.type === "response.completed" || + evt.typed.type === "response.failed" + ) { + sawTerminalEvent = true; + } if (evt.responseId) responseId = evt.responseId; if (evt.error) { throw codexApiErrorFromEvent(evt.error); @@ -345,6 +364,29 @@ export async function collectCodexResponse( // Detect empty response (HTTP 200 but no content) if (!fullText && toolCalls.length === 0 && completionTokens === 0) { + if (dumpEnabled) 
{ + debugDump("empty-response-openai", { + model, + responseId, + promptTokens, + completionTokens, + cachedTokens, + reasoningTokens, + fullTextLen: fullText.length, + fullReasoningLen: fullReasoning.length, + toolCallsCount: toolCalls.length, + sawTerminalEvent, + eventCount: eventTrace.length, + events: eventTrace, + }); + } + // Upstream FIN'd without ever sending response.completed / response.failed + // — this is the gpt-5.5 xhigh reasoning > 120 s cap, not a real "empty" + // payload. Surface it distinctly so the proxy-handler can fail fast + // instead of burning a 3-account empty-response retry. + if (!sawTerminalEvent) { + throw new UpstreamPrematureCloseError(responseId, fullReasoning.length > 0, eventCount); + } throw new EmptyResponseError(responseId, { input_tokens: promptTokens, output_tokens: completionTokens }); } diff --git a/tests/_fixtures/sse-streams.ts b/tests/_fixtures/sse-streams.ts index 82cd65be..9ae140e5 100644 --- a/tests/_fixtures/sse-streams.ts +++ b/tests/_fixtures/sse-streams.ts @@ -70,6 +70,20 @@ export function emptyStream(): ExtractedEvent[] { ]; } +/** Upstream cut the SSE stream after producing reasoning summary but before + * any output_text or response.completed terminal event. Observed when + * gpt-5.5 with effort=xhigh thinks past the upstream's 120 s response cap. */ +export function prematureCloseAfterReasoningStream(): ExtractedEvent[] { + return [ + createCreated("resp_pc"), + createInProgress("resp_pc"), + createReasoningDelta("Let me think about this carefully"), + createReasoningDelta("..."), + createReasoningDelta(" still thinking"), + // No response.completed, no response.failed, no error — stream just ends. + ]; +} + /** Stream with multiple tool calls. 
*/ export function multiToolCallStream(): ExtractedEvent[] { return [ diff --git a/tests/e2e/quota-refresh.test.ts b/tests/e2e/quota-refresh.test.ts index 71a37e80..8c729cb8 100644 --- a/tests/e2e/quota-refresh.test.ts +++ b/tests/e2e/quota-refresh.test.ts @@ -135,7 +135,7 @@ describe("E2E: quota auto-refresh", () => { clearWarnings("test-acct-1"); }); - it("markQuotaExhausted causes acquire to skip that account", async () => { + it("applyRateLimit429 causes acquire to skip that account", async () => { const id1 = pool.addAccount(createValidJwt({ accountId: "acct-exhaust-1", email: "exhaust1@test.com", @@ -147,8 +147,8 @@ describe("E2E: quota auto-refresh", () => { planType: "plus", })); - // Exhaust first account - pool.markQuotaExhausted(id1, Math.floor(Date.now() / 1000) + 7200); + // Exhaust first account via cachedQuota path + pool.applyRateLimit429(id1, { resetsAtSec: Math.floor(Date.now() / 1000) + 7200 }); const acquired = pool.acquire(); expect(acquired).not.toBeNull(); diff --git a/tests/integration/account-routing.test.ts b/tests/integration/account-routing.test.ts index f0606f2b..a4bad362 100644 --- a/tests/integration/account-routing.test.ts +++ b/tests/integration/account-routing.test.ts @@ -204,7 +204,7 @@ describe("account-routing integration", () => { // Acquire A and mark it rate limited const acqA = pool.acquire()!; expect(acqA.entryId).toBe(idA); - pool.markRateLimited(acqA.entryId); + pool.applyRateLimit429(acqA.entryId); // Next acquire should skip A and return B const acqB = pool.acquire()!; @@ -219,7 +219,7 @@ describe("account-routing integration", () => { expect(acq.entryId).toBe(id); // Mark rate limited with very short backoff (negative = already expired) - pool.markRateLimited(acq.entryId, { retryAfterSec: -1 }); + pool.applyRateLimit429(acq.entryId, { retryAfterSec: -1 }); // Should auto-recover immediately since backoff has passed const recovered = pool.acquire(); diff --git a/tests/integration/proxy-handler.test.ts 
b/tests/integration/proxy-handler.test.ts index c9ed1842..ccff235b 100644 --- a/tests/integration/proxy-handler.test.ts +++ b/tests/integration/proxy-handler.test.ts @@ -68,13 +68,29 @@ vi.mock("@src/translation/codex-event-extractor.js", () => { this.usage = usage; } } - return { EmptyResponseError }; + class UpstreamPrematureCloseError extends Error { + responseId: string | null; + hadReasoning: boolean; + eventCount: number; + constructor(responseId: string | null, hadReasoning: boolean, eventCount: number) { + super( + hadReasoning + ? "Upstream closed stream after reasoning without producing output (likely hit response-duration cap)" + : "Upstream closed stream without a terminal event", + ); + this.name = "UpstreamPrematureCloseError"; + this.responseId = responseId; + this.hadReasoning = hadReasoning; + this.eventCount = eventCount; + } + } + return { EmptyResponseError, UpstreamPrematureCloseError }; }); // Import after mocks are set up import { handleProxyRequest } from "@src/routes/shared/proxy-handler.js"; import { CodexApiError } from "@src/proxy/codex-api.js"; -import { EmptyResponseError } from "@src/translation/codex-event-extractor.js"; +import { EmptyResponseError, UpstreamPrematureCloseError } from "@src/translation/codex-event-extractor.js"; // ── Helpers ─────────────────────────────────────────────────────────── @@ -83,6 +99,7 @@ function createMockAccountPool(overrides: Record = {}) { acquire: vi.fn(() => ({ entryId: "e1", token: "tok", accountId: "acc1" })), release: vi.fn(), markRateLimited: vi.fn(), + applyRateLimit429: vi.fn(), markStatus: vi.fn(), getEntry: vi.fn(() => ({ email: "test@test.com" })), recordEmptyResponse: vi.fn(), @@ -186,45 +203,45 @@ describe("proxy-handler integration", () => { }); // 3. 
Streaming success - it("returns text/event-stream with SSE chunks for streaming", async () => { - const accountPool = createMockAccountPool(); - const fmt = createMockFormatAdapter(); - const req = createStreamingRequest(); - const { app } = buildTestApp({ accountPool, fmt, req }); + it("returns text/event-stream with SSE chunks for streaming", async () => { + const accountPool = createMockAccountPool(); + const fmt = createMockFormatAdapter(); + const req = createStreamingRequest(); + const { app } = buildTestApp({ accountPool, fmt, req }); const res = await app.request("/test", { method: "POST" }); expect(res.headers.get("Content-Type")).toContain("text/event-stream"); const text = await res.text(); expect(text).toContain("data: {}\n\n"); - expect(text).toContain("data: [DONE]\n\n"); - expect(fmt.streamTranslator).toHaveBeenCalled(); - }); - - it("returns a streaming error event when upstream request fails before SSE starts", async () => { - mockCreateResponse = () => - Promise.reject(new CodexApiError(0, "error sending request for url")); - - const accountPool = createMockAccountPool(); - const fmt = createMockFormatAdapter(); - const req = createStreamingRequest(); - const { app } = buildTestApp({ accountPool, fmt, req }); - - const res = await app.request("/test", { method: "POST" }); - expect(res.status).toBe(200); - expect(res.headers.get("Content-Type")).toContain("text/event-stream"); - - const text = await res.text(); - expect(text).toContain("event: response.failed"); - expect(text).toContain("error sending request for url"); - expect(fmt.formatStreamError).toHaveBeenCalledWith( - 502, - "Codex API error (0): error sending request for url", - ); - expect(accountPool.release).toHaveBeenCalledWith("e1", undefined); - }); - - // 4. 
CodexApiError 429 → markRateLimited with parsed retryAfterSec + fallback to next account + expect(text).toContain("data: [DONE]\n\n"); + expect(fmt.streamTranslator).toHaveBeenCalled(); + }); + + it("returns a streaming error event when upstream request fails before SSE starts", async () => { + mockCreateResponse = () => + Promise.reject(new CodexApiError(0, "error sending request for url")); + + const accountPool = createMockAccountPool(); + const fmt = createMockFormatAdapter(); + const req = createStreamingRequest(); + const { app } = buildTestApp({ accountPool, fmt, req }); + + const res = await app.request("/test", { method: "POST" }); + expect(res.status).toBe(200); + expect(res.headers.get("Content-Type")).toContain("text/event-stream"); + + const text = await res.text(); + expect(text).toContain("event: response.failed"); + expect(text).toContain("error sending request for url"); + expect(fmt.formatStreamError).toHaveBeenCalledWith( + 502, + "Codex API error (0): error sending request for url", + ); + expect(accountPool.release).toHaveBeenCalledWith("e1", undefined); + }); + + // 4. 
CodexApiError 429 → markRateLimited with parsed retryAfterSec + fallback to next account it("handles 429 by parsing resets_in_seconds and falling back to next account", async () => { const body429 = JSON.stringify({ error: { type: "usage_limit_reached", message: "Limit reached", resets_in_seconds: 471284 }, @@ -250,7 +267,7 @@ describe("proxy-handler integration", () => { const res = await app.request("/test", { method: "POST" }); expect(res.status).toBe(200); - expect(accountPool.markRateLimited).toHaveBeenCalledWith("e1", { + expect(accountPool.applyRateLimit429).toHaveBeenCalledWith("e1", { retryAfterSec: 471284, countRequest: true, }); @@ -277,7 +294,7 @@ describe("proxy-handler integration", () => { const res = await app.request("/test", { method: "POST" }); expect(res.status).toBe(429); - expect(accountPool.markRateLimited).toHaveBeenCalledWith("e1", { + expect(accountPool.applyRateLimit429).toHaveBeenCalledWith("e1", { retryAfterSec: undefined, countRequest: true, }); @@ -302,7 +319,7 @@ describe("proxy-handler integration", () => { await app.request("/test", { method: "POST" }); - const call = accountPool.markRateLimited.mock.calls[0] as [string, { retryAfterSec: number; countRequest: boolean }]; + const call = accountPool.applyRateLimit429.mock.calls[0] as [string, { retryAfterSec: number; countRequest: boolean }]; expect(call[0]).toBe("e1"); // Should be approximately 3600 (±5s tolerance for test execution time) expect(call[1].retryAfterSec).toBeGreaterThan(3590); @@ -331,9 +348,9 @@ describe("proxy-handler integration", () => { expect(res.status).toBe(429); // Both accounts marked rate limited - expect(accountPool.markRateLimited).toHaveBeenCalledTimes(2); - expect(accountPool.markRateLimited).toHaveBeenCalledWith("e1", { retryAfterSec: 100, countRequest: true }); - expect(accountPool.markRateLimited).toHaveBeenCalledWith("e2", { retryAfterSec: 100, countRequest: true }); + expect(accountPool.applyRateLimit429).toHaveBeenCalledTimes(2); + 
expect(accountPool.applyRateLimit429).toHaveBeenCalledWith("e1", { retryAfterSec: 100, countRequest: true }); + expect(accountPool.applyRateLimit429).toHaveBeenCalledWith("e2", { retryAfterSec: 100, countRequest: true }); expect(accountPool.release).not.toHaveBeenCalled(); }); @@ -423,8 +440,36 @@ describe("proxy-handler integration", () => { expect(accountPool.release).toHaveBeenCalledWith("e1", undefined); }); + // 8b. Upstream premature close → 504, no cross-account retry + it("fails fast with 504 on UpstreamPrematureCloseError, no retry", async () => { + let acquireCount = 0; + const accountPool = createMockAccountPool({ + acquire: vi.fn(() => { + acquireCount++; + return { entryId: `e${acquireCount}`, token: "tok", accountId: "acc" }; + }), + }); + + let collectCallCount = 0; + const fmt = createMockFormatAdapter({ + collectTranslator: vi.fn(async () => { + collectCallCount++; + throw new UpstreamPrematureCloseError("resp_pc", true, 1920); + }), + }); + + const { app } = buildTestApp({ accountPool, fmt }); + + const res = await app.request("/test", { method: "POST" }); + expect(res.status).toBe(504); + expect(collectCallCount).toBe(1); + expect(acquireCount).toBe(1); + expect(accountPool.recordEmptyResponse).not.toHaveBeenCalled(); + expect(accountPool.release).toHaveBeenCalledWith("e1", undefined); + }); + // 8. 
Empty response retry (non-streaming) → account switch, second succeeds - it("retries with a new account on EmptyResponseError", async () => { + it("retries with a new account on EmptyResponseError", async () => { let callCount = 0; const accountPool = createMockAccountPool({ acquire: vi.fn(() => { @@ -470,52 +515,52 @@ describe("proxy-handler integration", () => { input_tokens: 1, output_tokens: 0, }); - expect(accountPool.release).toHaveBeenCalledWith("e2", { - input_tokens: 5, - output_tokens: 15, - }); - }); - - it("attributes collect CodexApiError after EmptyResponseError retry to the new account", async () => { - let acquireCount = 0; - const accountPool = createMockAccountPool({ - acquire: vi.fn(() => { - acquireCount++; - if (acquireCount === 1) return { entryId: "e1", token: "tok1", accountId: "acc1" }; - return { entryId: "e2", token: "tok2", accountId: "acc2" }; - }), - }); - - let collectCallCount = 0; - const fmt = createMockFormatAdapter({ - collectTranslator: vi.fn(async () => { - collectCallCount++; - if (collectCallCount === 1) { - throw new EmptyResponseError( - "resp_empty", - { input_tokens: 1, output_tokens: 0 }, - ); - } - throw new CodexApiError(422, JSON.stringify({ - error: { type: "invalid_request_error", message: "bad retry collect" }, - })); - }), - }); - - const { app } = buildTestApp({ accountPool, fmt }); - - const res = await app.request("/test", { method: "POST" }); - expect(res.status).toBe(422); - - expect(accountPool.recordEmptyResponse).toHaveBeenCalledWith("e1"); - expect(accountPool.release).toHaveBeenCalledWith("e1", { - input_tokens: 1, - output_tokens: 0, - }); - expect(accountPool.release).toHaveBeenCalledWith("e2", undefined); - }); - - // 9. 
Empty response retries exhausted → 502 + expect(accountPool.release).toHaveBeenCalledWith("e2", { + input_tokens: 5, + output_tokens: 15, + }); + }); + + it("attributes collect CodexApiError after EmptyResponseError retry to the new account", async () => { + let acquireCount = 0; + const accountPool = createMockAccountPool({ + acquire: vi.fn(() => { + acquireCount++; + if (acquireCount === 1) return { entryId: "e1", token: "tok1", accountId: "acc1" }; + return { entryId: "e2", token: "tok2", accountId: "acc2" }; + }), + }); + + let collectCallCount = 0; + const fmt = createMockFormatAdapter({ + collectTranslator: vi.fn(async () => { + collectCallCount++; + if (collectCallCount === 1) { + throw new EmptyResponseError( + "resp_empty", + { input_tokens: 1, output_tokens: 0 }, + ); + } + throw new CodexApiError(422, JSON.stringify({ + error: { type: "invalid_request_error", message: "bad retry collect" }, + })); + }), + }); + + const { app } = buildTestApp({ accountPool, fmt }); + + const res = await app.request("/test", { method: "POST" }); + expect(res.status).toBe(422); + + expect(accountPool.recordEmptyResponse).toHaveBeenCalledWith("e1"); + expect(accountPool.release).toHaveBeenCalledWith("e1", { + input_tokens: 1, + output_tokens: 0, + }); + expect(accountPool.release).toHaveBeenCalledWith("e2", undefined); + }); + + // 9. 
Empty response retries exhausted → 502 it("returns 502 when all empty response retries are exhausted", async () => { const emptyUsage = { input_tokens: 0, output_tokens: 0 }; let acquireCount = 0; diff --git a/tests/stress/secondary-quota.test.ts b/tests/stress/secondary-quota.test.ts index 35decfe1..a72bfa77 100644 --- a/tests/stress/secondary-quota.test.ts +++ b/tests/stress/secondary-quota.test.ts @@ -15,6 +15,7 @@ import { } from "@helpers/e2e-setup.js"; import { buildTextStreamChunks } from "@helpers/sse.js"; import { createValidJwt } from "@helpers/jwt.js"; +import { isQuotaExhausted } from "@src/auth/quota-skip.js"; import type { TlsTransportResponse } from "@src/tls/transport.js"; import { Hono } from "hono"; @@ -229,9 +230,9 @@ describe("secondary quota rotation", () => { expect(res.status).toBe(200); } - // Verify accounts 0,1 are now marked rate_limited (proactive marking) + // Verify accounts 0,1 are now cachedQuota-exhausted (proactive marking) const afterPrime = ctx.accountPool.getAccounts(); - const limited = afterPrime.filter((a) => a.status === "rate_limited"); + const limited = afterPrime.filter((a) => isQuotaExhausted(a.quota)); expect(limited.length).toBeGreaterThanOrEqual(2); // Record account 2's request count after priming @@ -253,10 +254,10 @@ describe("secondary quota rotation", () => { const acct2After = afterConcurrent.find((a) => a.id === ctx.entryIds[2])!; expect(acct2After.usage.request_count - acct2CountBefore).toBe(3); - // Accounts 0,1 should still be rate_limited with no new requests + // Accounts 0,1 should still be cachedQuota-exhausted with no new requests for (const id of [ctx.entryIds[0], ctx.entryIds[1]]) { const acct = afterConcurrent.find((a) => a.id === id)!; - expect(acct.status).toBe("rate_limited"); + expect(isQuotaExhausted(acct.quota)).toBe(true); } }); @@ -327,8 +328,8 @@ describe("secondary quota rotation", () => { const accounts = ctx.accountPool.getAccounts(); const acct0 = accounts.find((a) => a.id === 
ctx.entryIds[0])!; const acct1 = accounts.find((a) => a.id === ctx.entryIds[1])!; - expect(acct0.status).toBe("rate_limited"); - expect(acct1.status).toBe("rate_limited"); + expect(isQuotaExhausted(acct0.quota)).toBe(true); + expect(isQuotaExhausted(acct1.quota)).toBe(true); const acct2Before = accounts.find((a) => a.id === ctx.entryIds[2])!; const countBefore = acct2Before.usage.request_count; @@ -370,13 +371,14 @@ describe("secondary quota rotation", () => { await chatRequest(ctx.app, defaultBody()); await chatRequest(ctx.app, defaultBody()); - // Account 0 should be proactively marked + // Account 0 should be proactively marked via cachedQuota const acct0 = ctx.accountPool.getAccounts().find((a) => a.id === ctx.entryIds[0])!; - expect(acct0.status).toBe("rate_limited"); + expect(isQuotaExhausted(acct0.quota)).toBe(true); // Account 1 should be healthy const acct1 = ctx.accountPool.getAccounts().find((a) => a.id === ctx.entryIds[1])!; expect(acct1.status).toBe("active"); + expect(isQuotaExhausted(acct1.quota)).toBe(false); const acct1CountBefore = acct1.usage.request_count; @@ -455,15 +457,18 @@ describe("secondary quota rotation", () => { await chatRequest(ctx.app, defaultBody()); const acct0 = ctx.accountPool.getAccounts().find((a) => a.id === ctx.entryIds[0])!; - expect(acct0.status).toBe("rate_limited"); - - // rate_limit_until should reflect secondaryResetAt (±30% for jitter + timing) - expect(acct0.usage.rate_limit_until).not.toBeNull(); - const rateLimitUntilSec = new Date(acct0.usage.rate_limit_until!).getTime() / 1000; - const expectedBackoff = secondaryResetAt - nowUnix(); - const actualBackoff = rateLimitUntilSec - nowUnix(); - expect(actualBackoff).toBeGreaterThan(expectedBackoff * 0.6); - expect(actualBackoff).toBeLessThan(expectedBackoff * 1.5); + // Both buckets flagged: secondary from passive header collection, + // primary from the 429 retry-after. Pool excludes via hasReachedCachedQuota. 
+ expect(isQuotaExhausted(acct0.quota)).toBe(true); + expect(acct0.quota?.secondary_rate_limit?.limit_reached).toBe(true); + + // Account stays excluded at least until the secondary window resets + // (the longer of the two locks). The primary reset_at from the 429 + // retry-after is irrelevant for pool exclusion as long as the secondary + // is still limit_reached. + const secondaryReset = acct0.quota?.secondary_rate_limit?.reset_at; + expect(secondaryReset).toBeGreaterThanOrEqual(secondaryResetAt - 5); + expect(secondaryReset).toBeLessThanOrEqual(secondaryResetAt + 5); }); }); @@ -475,17 +480,17 @@ describe("secondary quota rotation", () => { vi.mocked(getMockTransport().post).mockClear(); }); - it("returns 401 when all accounts are rate_limited (not authenticated)", async () => { + it("returns 401 when all accounts are cachedQuota-exhausted (not authenticated)", async () => { ctx = buildApp(3); - // Mark all accounts as rate_limited with long backoff + // Mark all accounts as quota-exhausted with long backoff for (const id of ctx.entryIds) { - ctx.accountPool.markRateLimited(id, { retryAfterSec: 7200 }); + ctx.accountPool.applyRateLimit429(id, { retryAfterSec: 7200 }); } // Verify all marked const accounts = ctx.accountPool.getAccounts(); - expect(accounts.every((a) => a.status === "rate_limited")).toBe(true); + expect(accounts.every((a) => isQuotaExhausted(a.quota))).toBe(true); // Fire 6 concurrent requests — all should get 401 (isAuthenticated=false) const responses = await Promise.all( @@ -522,7 +527,7 @@ describe("secondary quota rotation", () => { }, code_review_rate_limit: null, }); - ctx.accountPool.markRateLimited(id, { retryAfterSec: 604800 }); + ctx.accountPool.applyRateLimit429(id, { retryAfterSec: 604800 }); } const responses = await Promise.all( diff --git a/tests/unit/auth/account-persistence-migration.test.ts b/tests/unit/auth/account-persistence-migration.test.ts new file mode 100644 index 00000000..700e047f --- /dev/null +++ 
b/tests/unit/auth/account-persistence-migration.test.ts @@ -0,0 +1,161 @@ +/** + * Tests for the legacy rate_limit_until → cachedQuota migration applied + * during AccountPersistence.load(). + * + * The proxy previously kept a local lock in `entry.usage.rate_limit_until` + * plus `entry.status === "rate_limited"`. We are retiring both in favour of + * `cachedQuota.rate_limit.limit_reached/reset_at` as the single source of + * truth. On load, legacy entries must be coerced so existing deployments + * survive a restart. + */ + +import { describe, it, expect } from "vitest"; +import type { AccountEntry } from "@src/auth/types.js"; +import { migrateLegacyRateLimit } from "@src/auth/account-persistence.js"; + +function makeEntry(overrides?: Partial): AccountEntry { + return { + id: "e1", + token: "tok", + refreshToken: null, + email: null, + accountId: null, + userId: null, + label: null, + planType: "plus", + proxyApiKey: "pk", + status: "active", + usage: { + request_count: 0, + input_tokens: 0, + output_tokens: 0, + cached_tokens: 0, + empty_response_count: 0, + last_used: null, + rate_limit_until: null, + }, + addedAt: new Date().toISOString(), + cachedQuota: null, + quotaFetchedAt: null, + ...overrides, + }; +} + +describe("migrateLegacyRateLimit", () => { + it("legacy status=rate_limited + future rate_limit_until → status=active + synthesized cachedQuota.rate_limit", () => { + const future = new Date(Date.now() + 3_600_000).toISOString(); // +1 h + const futureSec = Math.floor(new Date(future).getTime() / 1000); + const entry = makeEntry({ status: "rate_limited", usage: { ...makeEntry().usage, rate_limit_until: future } }); + + const changed = migrateLegacyRateLimit(entry); + + expect(changed).toBe(true); + expect(entry.status).toBe("active"); + expect(entry.usage.rate_limit_until).toBeNull(); + expect(entry.cachedQuota?.rate_limit.limit_reached).toBe(true); + expect(entry.cachedQuota?.rate_limit.reset_at).toBe(futureSec); + }); + + it("synthesizes cachedQuota 
when missing", () => { + const future = new Date(Date.now() + 3_600_000).toISOString(); + const entry = makeEntry({ + status: "rate_limited", + usage: { ...makeEntry().usage, rate_limit_until: future }, + cachedQuota: null, + }); + + migrateLegacyRateLimit(entry); + + expect(entry.cachedQuota).not.toBeNull(); + expect(entry.cachedQuota?.plan_type).toBe("plus"); + }); + + it("trusts fresh cachedQuota (fetched after rate_limit_until was set) — drops local lock, leaves cachedQuota alone", () => { + const oldUntil = new Date(Date.now() - 1_000).toISOString(); // already past + const freshFetchedAt = new Date(Date.now() - 500).toISOString(); + const entry = makeEntry({ + status: "rate_limited", + usage: { ...makeEntry().usage, rate_limit_until: oldUntil }, + cachedQuota: { + plan_type: "plus", + rate_limit: { + allowed: true, + limit_reached: false, + used_percent: 0, + reset_at: Math.floor(Date.now() / 1000) + 18000, + limit_window_seconds: 18000, + }, + secondary_rate_limit: null, + code_review_rate_limit: null, + }, + quotaFetchedAt: freshFetchedAt, + }); + + migrateLegacyRateLimit(entry); + + expect(entry.status).toBe("active"); + expect(entry.usage.rate_limit_until).toBeNull(); + expect(entry.cachedQuota?.rate_limit.limit_reached).toBe(false); // not overwritten + }); + + it("overwrites stale cachedQuota when rate_limit_until is newer than quotaFetchedAt", () => { + const recentUntil = new Date(Date.now() + 3_600_000).toISOString(); + const oldFetchedAt = new Date(Date.now() - 10_000_000).toISOString(); + const entry = makeEntry({ + status: "rate_limited", + usage: { ...makeEntry().usage, rate_limit_until: recentUntil }, + cachedQuota: { + plan_type: "plus", + rate_limit: { + allowed: true, + limit_reached: false, + used_percent: 0, + reset_at: Math.floor(Date.now() / 1000) + 100, + limit_window_seconds: 18000, + }, + secondary_rate_limit: null, + code_review_rate_limit: null, + }, + quotaFetchedAt: oldFetchedAt, + }); + + migrateLegacyRateLimit(entry); + + 
expect(entry.cachedQuota?.rate_limit.limit_reached).toBe(true); + expect(entry.cachedQuota?.rate_limit.reset_at).toBe(Math.floor(new Date(recentUntil).getTime() / 1000)); + }); + + it("status=active with orphan rate_limit_until in past → just clears the field", () => { + const past = new Date(Date.now() - 60_000).toISOString(); + const entry = makeEntry({ + status: "active", + usage: { ...makeEntry().usage, rate_limit_until: past }, + }); + + const changed = migrateLegacyRateLimit(entry); + + expect(changed).toBe(true); + expect(entry.status).toBe("active"); + expect(entry.usage.rate_limit_until).toBeNull(); + expect(entry.cachedQuota).toBeNull(); // didn't synth, lock was already irrelevant + }); + + it("entry without legacy fields is a no-op", () => { + const entry = makeEntry({ status: "active" }); + + const changed = migrateLegacyRateLimit(entry); + + expect(changed).toBe(false); + expect(entry.status).toBe("active"); + expect(entry.usage.rate_limit_until).toBeNull(); + }); + + it("status=rate_limited with no rate_limit_until just coerces status", () => { + const entry = makeEntry({ status: "rate_limited" }); + + migrateLegacyRateLimit(entry); + + expect(entry.status).toBe("active"); + expect(entry.cachedQuota).toBeNull(); + }); +}); diff --git a/tests/unit/auth/account-pool-config-di.test.ts b/tests/unit/auth/account-pool-config-di.test.ts index ccbb71f5..b3c84017 100644 --- a/tests/unit/auth/account-pool-config-di.test.ts +++ b/tests/unit/auth/account-pool-config-di.test.ts @@ -112,7 +112,7 @@ describe("constructor DI: initialToken", () => { }); describe("constructor DI: rateLimitBackoffSeconds", () => { - it("uses injected backoff in markRateLimited", () => { + it("uses injected backoff in applyRateLimit429 when no retry-after hint provided", () => { const pool = new AccountPool({ persistence: createMemoryPersistence(), rotationStrategy: "least_used", @@ -124,20 +124,17 @@ describe("constructor DI: rateLimitBackoffSeconds", () => { pool.addAccount(token); 
const acq = pool.acquire()!; - const before = Date.now(); - pool.markRateLimited(acq.entryId); - const after = Date.now(); + const beforeSec = Math.floor(Date.now() / 1000); + pool.applyRateLimit429(acq.entryId); + const afterSec = Math.ceil(Date.now() / 1000); - // Account should be rate-limited const info = pool.getAccounts()[0]; - expect(info.status).toBe("rate_limited"); - - // rate_limit_until should be ~5 seconds from now (jitter adds ±20%) - const until = new Date(info.usage.rate_limit_until!).getTime(); - const expectedMin = before + 5 * 1000 * 0.7; // generous margin for jitter - const expectedMax = after + 5 * 1000 * 1.3; - expect(until).toBeGreaterThanOrEqual(expectedMin); - expect(until).toBeLessThanOrEqual(expectedMax); + expect(info.quota?.rate_limit.limit_reached).toBe(true); + + // reset_at should be ~5 seconds from now (jitter adds ±20%) + const resetAt = info.quota!.rate_limit.reset_at!; + expect(resetAt).toBeGreaterThanOrEqual(beforeSec + 5 * 0.7); + expect(resetAt).toBeLessThanOrEqual(afterSec + 5 * 1.3); }); it("retryAfterSec overrides instance backoff", () => { @@ -152,13 +149,13 @@ describe("constructor DI: rateLimitBackoffSeconds", () => { pool.addAccount(token); const acq = pool.acquire()!; - const before = Date.now(); - pool.markRateLimited(acq.entryId, { retryAfterSec: 120 }); + const beforeSec = Math.floor(Date.now() / 1000); + pool.applyRateLimit429(acq.entryId, { retryAfterSec: 120 }); const info = pool.getAccounts()[0]; - const until = new Date(info.usage.rate_limit_until!).getTime(); + const resetAt = info.quota!.rate_limit.reset_at!; // Should be ~120s, not ~5s - expect(until).toBeGreaterThan(before + 60_000); + expect(resetAt).toBeGreaterThan(beforeSec + 60); }); }); diff --git a/tests/unit/auth/account-pool-has-available.test.ts b/tests/unit/auth/account-pool-has-available.test.ts index c702f6f5..d0af29ad 100644 --- a/tests/unit/auth/account-pool-has-available.test.ts +++ b/tests/unit/auth/account-pool-has-available.test.ts @@ 
-90,7 +90,7 @@ describe("AccountPool.hasAvailableAccounts", () => { it("returns false when all accounts are rate-limited", () => { const id = pool.addAccount("token-a"); - pool.markRateLimited(id, {}); + pool.applyRateLimit429(id); expect(pool.hasAvailableAccounts()).toBe(false); }); @@ -170,11 +170,11 @@ describe("AccountPool.hasAvailableAccounts", () => { expect(pool.hasAvailableAccounts()).toBe(false); }); - it("refreshes rate_limit_until and counts expired accounts correctly", () => { + it("auto-clears cachedQuota.rate_limit.limit_reached after reset_at passes", () => { const id = pool.addAccount("token-a"); - // Mark rate-limited with a past timestamp so refreshStatus will flip to active - pool.markRateLimited(id, { retryAfterSec: -1 }); - // Despite being marked rate_limited, refreshStatus should recover it + // Apply a 429 with negative retry-after so reset_at is in the past; + // resetExpiredQuotaWindow (called inside refreshStatus) should auto-clear. + pool.applyRateLimit429(id, { retryAfterSec: -1 }); expect(pool.hasAvailableAccounts()).toBe(true); }); diff --git a/tests/unit/auth/account-pool-quota.test.ts b/tests/unit/auth/account-pool-quota.test.ts index 678c1e21..2470cfa5 100644 --- a/tests/unit/auth/account-pool-quota.test.ts +++ b/tests/unit/auth/account-pool-quota.test.ts @@ -1,7 +1,7 @@ /** * Tests for AccountPool quota-related methods: * - updateCachedQuota() - * - markQuotaExhausted() + * - applyRateLimit429() (replaces the retired markQuotaExhausted/markRateLimited) * - toInfo() populating cached quota */ @@ -11,6 +11,7 @@ import { createValidJwt } from "@helpers/jwt.js"; import { createMockConfig } from "@helpers/config.js"; import { setConfigForTesting, resetConfigForTesting } from "@src/config.js"; import { AccountPool } from "@src/auth/account-pool.js"; +import { hasReachedCachedQuota } from "@src/auth/quota-skip.js"; import type { CodexQuota } from "@src/auth/types.js"; function makeQuota(overrides?: Partial): CodexQuota { @@ -58,81 +59,74 
@@ describe("AccountPool quota methods", () => { }); }); - describe("markQuotaExhausted", () => { - it("sets status to rate_limited with reset time", () => { + describe("applyRateLimit429 (replaces markQuotaExhausted)", () => { + it("marks primary cachedQuota.rate_limit as limit_reached with provided reset_at", () => { const id = pool.addAccount(createValidJwt({ accountId: "a2" })); const resetAt = Math.floor(Date.now() / 1000) + 7200; - pool.markQuotaExhausted(id, resetAt); + pool.applyRateLimit429(id, { resetsAtSec: resetAt }); const entry = pool.getEntry(id); - expect(entry?.status).toBe("rate_limited"); - expect(entry?.usage.rate_limit_until).toBeTruthy(); + expect(entry?.cachedQuota?.rate_limit.limit_reached).toBe(true); + expect(entry?.cachedQuota?.rate_limit.reset_at).toBe(resetAt); + expect(hasReachedCachedQuota(entry!)).toBe(true); }); - it("uses fallback when resetAt is null", () => { + it("uses default backoff when no retry/reset hint provided", () => { const id = pool.addAccount(createValidJwt({ accountId: "a3" })); - pool.markQuotaExhausted(id, null); + pool.applyRateLimit429(id); const entry = pool.getEntry(id); - expect(entry?.status).toBe("rate_limited"); - expect(entry?.usage.rate_limit_until).toBeTruthy(); + expect(entry?.cachedQuota?.rate_limit.limit_reached).toBe(true); + expect(entry?.cachedQuota?.rate_limit.reset_at).toBeGreaterThan(Math.floor(Date.now() / 1000)); }); - it("does not override disabled status", () => { + it("does not override disabled status (account stays disabled)", () => { const id = pool.addAccount(createValidJwt({ accountId: "a4" })); pool.markStatus(id, "disabled"); - pool.markQuotaExhausted(id, Math.floor(Date.now() / 1000) + 3600); + pool.applyRateLimit429(id, { resetsAtSec: Math.floor(Date.now() / 1000) + 3600 }); const entry = pool.getEntry(id); - expect(entry?.status).toBe("disabled"); // unchanged + expect(entry?.status).toBe("disabled"); }); it("does not override expired status", () => { const id = 
pool.addAccount(createValidJwt({ accountId: "a5" })); pool.markStatus(id, "expired"); - pool.markQuotaExhausted(id, Math.floor(Date.now() / 1000) + 3600); + pool.applyRateLimit429(id, { resetsAtSec: Math.floor(Date.now() / 1000) + 3600 }); const entry = pool.getEntry(id); - expect(entry?.status).toBe("expired"); // unchanged + expect(entry?.status).toBe("expired"); }); - it("extends rate_limit_until on already rate_limited account", () => { + it("extends existing lock — never shrinks reset_at when re-applied", () => { const id = pool.addAccount(createValidJwt({ accountId: "a6" })); - // Simulate 429 backoff (short) - pool.markRateLimited(id, { retryAfterSec: 60 }); - const entryBefore = pool.getEntry(id); - expect(entryBefore?.status).toBe("rate_limited"); - const shortUntil = new Date(entryBefore!.usage.rate_limit_until!).getTime(); - - // Quota refresh discovers exhaustion — much longer reset - const resetAt = Math.floor(Date.now() / 1000) + 7200; // 2 hours - pool.markQuotaExhausted(id, resetAt); - - const entryAfter = pool.getEntry(id); - expect(entryAfter?.status).toBe("rate_limited"); - const longUntil = new Date(entryAfter!.usage.rate_limit_until!).getTime(); - expect(longUntil).toBeGreaterThan(shortUntil); + + // First 429 with short retry-after + pool.applyRateLimit429(id, { retryAfterSec: 60 }); + const shortResetAt = pool.getEntry(id)!.cachedQuota!.rate_limit.reset_at!; + + // Second 429 (e.g. discovered weekly bucket exhausted) with much longer reset + const longResetAt = Math.floor(Date.now() / 1000) + 7200; + pool.applyRateLimit429(id, { resetsAtSec: longResetAt }); + + expect(pool.getEntry(id)!.cachedQuota!.rate_limit.reset_at).toBe(longResetAt); + expect(longResetAt).toBeGreaterThan(shortResetAt); }); - it("does not shorten existing rate_limit_until", () => { + it("does not shrink existing reset_at when a shorter 429 arrives", () => { const id = pool.addAccount(createValidJwt({ accountId: "a7" })); - // Mark with long reset (e.g. 
7-day quota) - const longResetAt = Math.floor(Date.now() / 1000) + 86400; // 24 hours - pool.markQuotaExhausted(id, longResetAt); + const longResetAt = Math.floor(Date.now() / 1000) + 86400; + pool.applyRateLimit429(id, { resetsAtSec: longResetAt }); - const entryBefore = pool.getEntry(id); - const originalUntil = entryBefore!.usage.rate_limit_until; + // Shorter retry-after should NOT replace the longer lock + const shortResetAt = Math.floor(Date.now() / 1000) + 3600; + pool.applyRateLimit429(id, { resetsAtSec: shortResetAt }); - // Try to mark with shorter reset (e.g. 5-hour quota) - const shortResetAt = Math.floor(Date.now() / 1000) + 3600; // 1 hour - pool.markQuotaExhausted(id, shortResetAt); - - const entryAfter = pool.getEntry(id); - expect(entryAfter!.usage.rate_limit_until).toBe(originalUntil); // unchanged + expect(pool.getEntry(id)!.cachedQuota!.rate_limit.reset_at).toBe(longResetAt); }); }); @@ -190,12 +184,11 @@ describe("AccountPool quota methods", () => { }); describe("acquire skips exhausted accounts", () => { - it("skips rate_limited (quota exhausted) account", () => { + it("skips account marked via applyRateLimit429", () => { const id1 = pool.addAccount(createValidJwt({ accountId: "b1" })); const id2 = pool.addAccount(createValidJwt({ accountId: "b2" })); - // Exhaust first account - pool.markQuotaExhausted(id1, Math.floor(Date.now() / 1000) + 7200); + pool.applyRateLimit429(id1, { resetsAtSec: Math.floor(Date.now() / 1000) + 7200 }); const acquired = pool.acquire(); expect(acquired).not.toBeNull(); @@ -205,7 +198,7 @@ describe("AccountPool quota methods", () => { it("returns null when all accounts exhausted", () => { const id1 = pool.addAccount(createValidJwt({ accountId: "c1" })); - pool.markQuotaExhausted(id1, Math.floor(Date.now() / 1000) + 7200); + pool.applyRateLimit429(id1, { resetsAtSec: Math.floor(Date.now() / 1000) + 7200 }); const acquired = pool.acquire(); expect(acquired).toBeNull(); @@ -257,6 +250,7 @@ describe("AccountPool quota 
methods", () => { limit_reached: true, used_percent: 100, reset_at: Math.floor(Date.now() / 1000) + 3600, + limit_window_seconds: 3600, }, })); @@ -300,6 +294,7 @@ describe("AccountPool quota methods", () => { limit_reached: true, used_percent: 100, reset_at: nowSec - 10, + limit_window_seconds: 3600, }, })); @@ -313,7 +308,7 @@ describe("AccountPool quota methods", () => { expect(codeReview).toBeDefined(); expect(codeReview?.limit_reached).toBe(false); expect(codeReview?.used_percent).toBe(0); - expect(codeReview?.reset_at).toBeNull(); + expect(codeReview?.reset_at).toBeGreaterThan(nowSec); }); it("allows cached exhausted accounts when skip_exhausted is false", () => { diff --git a/tests/unit/auth/account-pool-rate-limit-429.test.ts b/tests/unit/auth/account-pool-rate-limit-429.test.ts new file mode 100644 index 00000000..c7726cb3 --- /dev/null +++ b/tests/unit/auth/account-pool-rate-limit-429.test.ts @@ -0,0 +1,150 @@ +/** + * Tests for AccountPool.applyRateLimit429 — the new single-source-of-truth + * path for handling upstream 429 responses. Writes into cachedQuota instead + * of mutating status/rate_limit_until, so the dual-truth bug between local + * lock and upstream quota window can no longer arise. 
+ */ + +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { createMemoryPersistence } from "@helpers/account-pool-factory.js"; +import { createValidJwt } from "@helpers/jwt.js"; +import { createMockConfig } from "@helpers/config.js"; +import { setConfigForTesting, resetConfigForTesting } from "@src/config.js"; +import { AccountPool } from "@src/auth/account-pool.js"; +import { hasReachedCachedQuota } from "@src/auth/quota-skip.js"; +import type { CodexQuota } from "@src/auth/types.js"; + +function makeQuota(overrides?: Partial): CodexQuota { + return { + plan_type: "plus", + rate_limit: { + allowed: true, + limit_reached: false, + used_percent: 42, + reset_at: Math.floor(Date.now() / 1000) + 3600, + limit_window_seconds: 18000, + }, + secondary_rate_limit: null, + code_review_rate_limit: null, + ...overrides, + }; +} + +describe("AccountPool.applyRateLimit429", () => { + let pool: AccountPool; + + beforeEach(() => { + setConfigForTesting(createMockConfig()); + pool = new AccountPool({ persistence: createMemoryPersistence() }); + }); + afterEach(() => { + resetConfigForTesting(); + }); + + it("writes limit_reached=true and reset_at into primary cachedQuota.rate_limit", () => { + const id = pool.addAccount(createValidJwt({ accountId: "a1", planType: "plus" })); + const before = Math.floor(Date.now() / 1000); + + pool.applyRateLimit429(id, { retryAfterSec: 600 }); + + const entry = pool.getEntry(id); + expect(entry?.cachedQuota?.rate_limit.limit_reached).toBe(true); + expect(entry?.cachedQuota?.rate_limit.used_percent).toBeGreaterThanOrEqual(100); + // 20% jitter applied to retry-after to spread retries; allow a wide window. + const resetAt = entry?.cachedQuota?.rate_limit.reset_at ?? 
0; + expect(resetAt).toBeGreaterThanOrEqual(before + 600 * 0.8 - 1); + expect(resetAt).toBeLessThanOrEqual(before + 600 * 1.2 + 1); + }); + + it("synthesizes cachedQuota when account has none yet", () => { + const id = pool.addAccount(createValidJwt({ accountId: "a2", planType: "plus" })); + expect(pool.getEntry(id)?.cachedQuota).toBeNull(); + + pool.applyRateLimit429(id, { retryAfterSec: 300 }); + + const entry = pool.getEntry(id); + expect(entry?.cachedQuota).not.toBeNull(); + expect(entry?.cachedQuota?.rate_limit.limit_reached).toBe(true); + expect(entry?.cachedQuota?.plan_type).toBe("plus"); + }); + + it("prefers existing reset_at when it is further in the future (don't shrink lock)", () => { + const id = pool.addAccount(createValidJwt({ accountId: "a3", planType: "plus" })); + const farFuture = Math.floor(Date.now() / 1000) + 36000; + pool.updateCachedQuota(id, makeQuota({ + rate_limit: { + allowed: false, + limit_reached: true, + used_percent: 100, + reset_at: farFuture, + limit_window_seconds: 18000, + }, + })); + + pool.applyRateLimit429(id, { retryAfterSec: 60 }); + + const entry = pool.getEntry(id); + expect(entry?.cachedQuota?.rate_limit.reset_at).toBe(farFuture); + expect(entry?.cachedQuota?.rate_limit.limit_reached).toBe(true); + }); + + it("uses resetsAtSec when provided in preference to retryAfterSec", () => { + const id = pool.addAccount(createValidJwt({ accountId: "a4", planType: "plus" })); + const explicit = Math.floor(Date.now() / 1000) + 1234; + + pool.applyRateLimit429(id, { retryAfterSec: 60, resetsAtSec: explicit }); + + const entry = pool.getEntry(id); + expect(entry?.cachedQuota?.rate_limit.reset_at).toBe(explicit); + }); + + it("excludes the account from hasAvailableAccounts via hasReachedCachedQuota", () => { + const id = pool.addAccount(createValidJwt({ accountId: "a5", planType: "plus" })); + expect(pool.hasAvailableAccounts()).toBe(true); + + pool.applyRateLimit429(id, { retryAfterSec: 600 }); + + const entry = pool.getEntry(id); + 
expect(hasReachedCachedQuota(entry!)).toBe(true); + expect(pool.hasAvailableAccounts()).toBe(false); + }); + + it("does not mutate status (kept as 'active' so the field stays a pure rotation marker)", () => { + const id = pool.addAccount(createValidJwt({ accountId: "a6", planType: "plus" })); + expect(pool.getEntry(id)?.status).toBe("active"); + + pool.applyRateLimit429(id, { retryAfterSec: 600 }); + + expect(pool.getEntry(id)?.status).toBe("active"); + }); + + it("no-ops for unknown entry", () => { + expect(() => pool.applyRateLimit429("nonexistent", { retryAfterSec: 60 })).not.toThrow(); + }); + + it("counts the request when countRequest=true", () => { + const id = pool.addAccount(createValidJwt({ accountId: "a7", planType: "plus" })); + const before = pool.getEntry(id)?.usage.request_count ?? 0; + + pool.applyRateLimit429(id, { retryAfterSec: 60, countRequest: true }); + + expect(pool.getEntry(id)?.usage.request_count).toBe(before + 1); + expect(pool.getEntry(id)?.usage.window_request_count).toBe(1); + }); + + it("after reset_at passes, refreshStatus auto-clears limit_reached via resetExpiredQuotaWindow", () => { + const id = pool.addAccount(createValidJwt({ accountId: "a8", planType: "plus" })); + // Apply a 429 with very short retry-after, then advance the wallclock past it. + const pastResetAt = Math.floor(Date.now() / 1000) - 10; + pool.applyRateLimit429(id, { resetsAtSec: pastResetAt }); + // Existing entry should currently look limited even though reset_at is in the past + // until refreshStatus runs (which is called inside hasAvailableAccounts). 
+ + const available = pool.hasAvailableAccounts(); + + // After refreshStatus → resetExpiredQuotaWindow ran, limit_reached should be false again + const entry = pool.getEntry(id); + expect(entry?.cachedQuota?.rate_limit.limit_reached).toBe(false); + expect(available).toBe(true); + }); +}); diff --git a/tests/unit/auth/account-pool-sticky.test.ts b/tests/unit/auth/account-pool-sticky.test.ts index dd6637bb..2252e7cd 100644 --- a/tests/unit/auth/account-pool-sticky.test.ts +++ b/tests/unit/auth/account-pool-sticky.test.ts @@ -80,7 +80,7 @@ describe("account-pool sticky strategy", () => { pool.getEntry(idA)!.usage.request_count = 5; // Rate-limit A - pool.markRateLimited(idA, { retryAfterSec: 300 }); + pool.applyRateLimit429(idA, { retryAfterSec: 300 }); // Should fall back to B const acquired = pool.acquire(); diff --git a/tests/unit/auth/account-pool.test.ts b/tests/unit/auth/account-pool.test.ts index 0b35a606..863c94cb 100644 --- a/tests/unit/auth/account-pool.test.ts +++ b/tests/unit/auth/account-pool.test.ts @@ -27,6 +27,9 @@ vi.mock("@src/config.js", () => ({ rotation_strategy: "least_used", rate_limit_backoff_seconds: 60, }, + quota: { + skip_exhausted: true, + }, })), })); @@ -69,6 +72,7 @@ describe("AccountPool", () => { rate_limit_backoff_seconds: 60, max_concurrent_per_account: 1, }, + quota: { skip_exhausted: true }, // eslint-disable-next-line @typescript-eslint/no-explicit-any } as ReturnType); vi.mocked(getModelPlanTypes).mockReturnValue([]); @@ -269,32 +273,32 @@ describe("AccountPool", () => { // ── Rate limiting ───────────────────────────────────────────────── describe("rate limiting", () => { - it("marks account as rate_limited", () => { + it("marks account as cachedQuota-exhausted (primary bucket)", () => { pool.addAccount("token-aaa"); const acquired = pool.acquire()!; - pool.markRateLimited(acquired.entryId, { retryAfterSeconds: 120 }); + pool.applyRateLimit429(acquired.entryId, { retryAfterSec: 120 }); - const accounts = pool.getAccounts(); - 
expect(accounts[0].status).toBe("rate_limited"); + const entry = pool.getEntry(acquired.entryId); + expect(entry?.cachedQuota?.rate_limit.limit_reached).toBe(true); expect(pool.acquire()).toBeNull(); }); - it("uses configured backoff when retryAfterSeconds not provided", () => { + it("uses configured backoff when retry-after not provided", () => { pool.addAccount("token-aaa"); const acquired = pool.acquire()!; - pool.markRateLimited(acquired.entryId); + pool.applyRateLimit429(acquired.entryId); - const accounts = pool.getAccounts(); - expect(accounts[0].status).toBe("rate_limited"); + const entry = pool.getEntry(acquired.entryId); + expect(entry?.cachedQuota?.rate_limit.limit_reached).toBe(true); }); it("does not count rate limit release as a request", () => { pool.addAccount("token-aaa"); const acquired = pool.acquire()!; - pool.markRateLimited(acquired.entryId); // no options + pool.applyRateLimit429(acquired.entryId); const accounts = pool.getAccounts(); expect(accounts[0].usage.request_count).toBe(0); @@ -397,7 +401,7 @@ describe("AccountPool", () => { // Rate limit the team account const team = pool.acquire({ model: "gpt-5.4" })!; - pool.markRateLimited(team.entryId, { retryAfterSeconds: 3600 }); + pool.applyRateLimit429(team.entryId, { retryAfterSec: 3600 }); // No team accounts available, should return null const acquired = pool.acquire({ model: "gpt-5.4" }); diff --git a/tests/unit/routes/shared/proxy-error-handler.test.ts b/tests/unit/routes/shared/proxy-error-handler.test.ts index 934d5f83..26a8ee0c 100644 --- a/tests/unit/routes/shared/proxy-error-handler.test.ts +++ b/tests/unit/routes/shared/proxy-error-handler.test.ts @@ -5,6 +5,7 @@ import { CodexApiError } from "@src/proxy/codex-types.js"; /* ── Minimal mock matching AccountPool subset used by error handler ── */ interface MockPool { markRateLimited: ReturnType<typeof vi.fn>; + applyRateLimit429: ReturnType<typeof vi.fn>; markStatus: ReturnType<typeof vi.fn>; getEntry: ReturnType<typeof vi.fn>; acquire: ReturnType<typeof vi.fn>; @@ -13,6 +14,7 @@
function createMockPool(): MockPool { return { markRateLimited: vi.fn(), + applyRateLimit429: vi.fn(), markStatus: vi.fn(), getEntry: vi.fn().mockReturnValue({ email: "test@example.com" }), acquire: vi.fn(), @@ -55,6 +57,7 @@ describe("handleCodexApiError", () => { it("does not mark account status", () => { handleCodexApiError(err, pool as never, entryId, model, tag, false); + expect(pool.applyRateLimit429).not.toHaveBeenCalled(); expect(pool.markRateLimited).not.toHaveBeenCalled(); expect(pool.markStatus).not.toHaveBeenCalled(); }); @@ -63,33 +66,33 @@ describe("handleCodexApiError", () => { // ── 429 rate-limited ── describe("429 rate-limited", () => { - it("marks account rate-limited and returns retry", () => { + it("applies 429 retry-after to cachedQuota and returns retry", () => { const body = JSON.stringify({ error: { resets_in_seconds: 30 } }); const err = new CodexApiError(429, body); const result = handleCodexApiError(err, pool as never, entryId, model, tag, false); expect(result.action).toBe("retry"); - expect(pool.markRateLimited).toHaveBeenCalledWith(entryId, { + expect(pool.applyRateLimit429).toHaveBeenCalledWith(entryId, { retryAfterSec: 30, countRequest: true, }); }); - it("returns respond with 429 when no retry-after info", () => { + it("forwards undefined retryAfterSec when 429 body has no hint (registry uses default backoff)", () => { const err = new CodexApiError(429, "rate limited"); const result = handleCodexApiError(err, pool as never, entryId, model, tag, false); expect(result.action).toBe("retry"); - expect(pool.markRateLimited).toHaveBeenCalledWith(entryId, { + expect(pool.applyRateLimit429).toHaveBeenCalledWith(entryId, { retryAfterSec: undefined, countRequest: true, }); }); - it("uses cached quota reset time when account is exhausted", () => { - const resetAt = Math.floor(Date.now() / 1000) + 86400; // 1 day from now + it("does not combine with cached quota in handler (don't-shrink-existing-reset_at lives inside applyRateLimit429)", () => 
{ + const resetAt = Math.floor(Date.now() / 1000) + 86400; pool.getEntry.mockReturnValue({ email: "test@example.com", cachedQuota: { @@ -100,25 +103,9 @@ describe("handleCodexApiError", () => { handleCodexApiError(err, pool as never, entryId, model, tag, false); - const call = pool.markRateLimited.mock.calls[0]; - expect(call[0]).toBe(entryId); - // Should use the longer cached reset time instead of 30s - expect(call[1].retryAfterSec).toBeGreaterThan(86000); - expect(call[1].countRequest).toBe(true); - }); - - it("uses short backoff when account is not exhausted", () => { - pool.getEntry.mockReturnValue({ - email: "test@example.com", - cachedQuota: { - rate_limit: { limit_reached: false, used_percent: 50, reset_at: null }, - }, - }); - const err = new CodexApiError(429, JSON.stringify({ error: { resets_in_seconds: 30 } })); - - handleCodexApiError(err, pool as never, entryId, model, tag, false); - - expect(pool.markRateLimited).toHaveBeenCalledWith(entryId, { + // Handler passes through the raw retry-after; registry-level + // applyRateLimit429 preserves the longer existing reset_at. + expect(pool.applyRateLimit429).toHaveBeenCalledWith(entryId, { retryAfterSec: 30, countRequest: true, }); diff --git a/tests/unit/translation/codex-to-openai.test.ts b/tests/unit/translation/codex-to-openai.test.ts index dd78ff7b..37e360f8 100644 --- a/tests/unit/translation/codex-to-openai.test.ts +++ b/tests/unit/translation/codex-to-openai.test.ts @@ -2,8 +2,14 @@ * Tests for Codex → OpenAI Chat Completions translation. 
*/ -import { describe, it, expect, vi } from "vitest"; +import { describe, it, expect, vi, beforeEach } from "vitest"; import type { ExtractedEvent } from "@src/translation/codex-event-extractor.js"; + +vi.mock("@src/utils/debug-dump.js", () => ({ + debugDumpEnabled: vi.fn(() => false), + debugDump: vi.fn(), + debugDumpPath: vi.fn(() => null), +})); import { simpleTextStream, toolCallStream, @@ -13,6 +19,7 @@ import { multiToolCallStream, usageStream, toolCallNoDeltaStream, + prematureCloseAfterReasoningStream, } from "@fixtures/sse-streams.js"; import { createCreated, @@ -83,10 +90,10 @@ describe("streamCodexToOpenAI", () => { expect(reasoningChunks.length).toBeGreaterThan(0); }); - it("throws CodexApiError on upstream error events", async () => { - await expect(collectStreamOutput(errorStream())) - .rejects.toMatchObject({ status: 429 }); - }); + it("throws CodexApiError on upstream error events", async () => { + await expect(collectStreamOutput(errorStream())) + .rejects.toMatchObject({ status: 429 }); + }); it("injects error text for empty response", async () => { const chunks = await collectStreamOutput(emptyStream()); @@ -138,6 +145,58 @@ describe("collectCodexResponse", () => { await expect(collectCodexResponse(fakeCodexApi, fakeResponse, "gpt-5.4")) .rejects.toThrow("empty response"); }); + + it("throws UpstreamPrematureCloseError when stream ends after reasoning without response.completed", async () => { + const { UpstreamPrematureCloseError } = await import( + "@src/translation/codex-event-extractor.js" + ); + mockEvents = prematureCloseAfterReasoningStream(); + const promise = collectCodexResponse(fakeCodexApi, fakeResponse, "gpt-5.5"); + await expect(promise).rejects.toBeInstanceOf(UpstreamPrematureCloseError); + await expect(promise).rejects.toMatchObject({ + hadReasoning: true, + responseId: "resp_pc", + }); + }); + + it("dumps collected events when EmptyResponseError fires and debug dump is enabled", async () => { + const debugDumpMod = await 
import("@src/utils/debug-dump.js"); + vi.mocked(debugDumpMod.debugDumpEnabled).mockReturnValue(true); + const dumpSpy = vi.mocked(debugDumpMod.debugDump); + dumpSpy.mockClear(); + + mockEvents = emptyStream(); + await expect(collectCodexResponse(fakeCodexApi, fakeResponse, "gpt-5.5-xhigh-fast")) + .rejects.toThrow("empty response"); + + expect(dumpSpy).toHaveBeenCalledWith( + "empty-response-openai", + expect.objectContaining({ + model: "gpt-5.5-xhigh-fast", + responseId: "resp_5", + events: expect.any(Array), + }), + ); + const dumpedPayload = dumpSpy.mock.calls.find( + ([kind]) => kind === "empty-response-openai", + )![1] as { events: unknown[] }; + expect(dumpedPayload.events.length).toBeGreaterThan(0); + + vi.mocked(debugDumpMod.debugDumpEnabled).mockReturnValue(false); + }); + + it("does not dump when debug dump is disabled", async () => { + const debugDumpMod = await import("@src/utils/debug-dump.js"); + vi.mocked(debugDumpMod.debugDumpEnabled).mockReturnValue(false); + const dumpSpy = vi.mocked(debugDumpMod.debugDump); + dumpSpy.mockClear(); + + mockEvents = emptyStream(); + await expect(collectCodexResponse(fakeCodexApi, fakeResponse, "gpt-5.4")) + .rejects.toThrow("empty response"); + + expect(dumpSpy).not.toHaveBeenCalled(); + }); }); // ── Usage details (streaming) ───────────────────────────────────────── diff --git a/web/src/components/AccountCard.tsx b/web/src/components/AccountCard.tsx index be040c20..e81a9c6a 100644 --- a/web/src/components/AccountCard.tsx +++ b/web/src/components/AccountCard.tsx @@ -3,6 +3,7 @@ import { useT, useI18n } from "../../../shared/i18n/context"; import type { TranslationKey } from "../../../shared/i18n/translations"; import { formatNumber, formatResetTime, formatWindowDuration } from "../../../shared/utils/format"; import type { Account, AccountQuotaWindow, ProxyEntry } from "../../../shared/types"; +import { derivedStatus } from "../lib/accountStatus"; const avatarColors = [ ["bg-purple-100 dark:bg-[#2a1a3f]", 
"text-purple-600 dark:text-purple-400"], @@ -105,7 +106,8 @@ export function AccountCard({ account, index, onDelete, proxies, onProxyChange, const windowSec = account.quota?.rate_limit?.limit_window_seconds; const windowDur = windowSec ? formatWindowDuration(windowSec, lang === "zh") : null; - const [statusCls, statusKey] = statusStyles[account.status] || statusStyles.disabled; + const effectiveStatus = derivedStatus(account); + const [statusCls, statusKey] = statusStyles[effectiveStatus] || statusStyles.disabled; const handleDelete = useCallback(async () => { if (!confirm(t("removeConfirm"))) return; @@ -194,7 +196,10 @@ export function AccountCard({ account, index, onDelete, proxies, onProxyChange, const [statusToggling, setStatusToggling] = useState(false); const isEnabled = account.status !== "disabled"; - const canToggle = account.status === "active" || account.status === "disabled" || account.status === "rate_limited" || account.status === "refreshing" || account.status === "quota_exhausted"; + // `rate_limited` is no longer a backend status; toggling is allowed for the + // remaining backend states. Cards rendered with derived "rate_limited" badge + // have backend status "active" and therefore satisfy this check. 
+ const canToggle = account.status === "active" || account.status === "disabled" || account.status === "refreshing" || account.status === "quota_exhausted"; const handleStatusToggle = useCallback(async () => { if (!onToggleStatus || !canToggle) return; diff --git a/web/src/components/AccountList.tsx b/web/src/components/AccountList.tsx index 24482f6b..5da1cba6 100644 --- a/web/src/components/AccountList.tsx +++ b/web/src/components/AccountList.tsx @@ -3,6 +3,7 @@ import { useI18n, useT } from "../../../shared/i18n/context"; import { AccountCard } from "./AccountCard"; import { AccountImportExport } from "./AccountImportExport"; import type { Account, ProxyEntry, QuotaWarning } from "../../../shared/types"; +import { derivedStatus } from "../lib/accountStatus"; const STATUS_FILTER_STORAGE_KEY = "codex-proxy-account-list-status-filter"; const EXPAND_ALL_STORAGE_KEY = "codex-proxy-account-list-expand-all"; @@ -128,12 +129,18 @@ export function AccountList({ accounts, loading, onDelete, onRefresh, refreshing localStorage.setItem(EXPAND_ALL_STORAGE_KEY, String(visibleCount > PAGE_SIZE)); }, [visibleCount]); + // Counts are bucketed by derivedStatus so the "rate_limited" filter still + // works on cachedQuota-exhausted accounts even though the backend status + // is "active". Keep filter semantics consistent with the badge. const statusCounts: Record<string, number> = {}; - for (const a of accounts) statusCounts[a.status] = (statusCounts[a.status] ?? 0) + 1; + for (const a of accounts) { + const key = derivedStatus(a); + statusCounts[key] = (statusCounts[key] ?? 0) + 1; + } const displayAccounts = statusFilter === "all" ? 
accounts - : accounts.filter((a) => a.status === statusFilter); + : accounts.filter((a) => derivedStatus(a) === statusFilter); useEffect(() => { if (statusFilter !== "all" && !statusCounts[statusFilter]) { diff --git a/web/src/components/AccountTable.tsx b/web/src/components/AccountTable.tsx index c7579c2c..bc063908 100644 --- a/web/src/components/AccountTable.tsx +++ b/web/src/components/AccountTable.tsx @@ -4,6 +4,12 @@ import type { TranslationKey } from "../../../shared/i18n/translations"; import type { AssignmentAccount } from "../../../shared/hooks/use-proxy-assignments"; import type { ProxyEntry } from "../../../shared/types"; +// Note: AssignmentAccount has no `quota` field, so we cannot derive +// "rate_limited" from cachedQuota here — the table only knows the backend +// status string. The filter dropdown and badge therefore reflect raw +// status. AccountList / AccountCard do derive via `derivedStatus` from +// `web/src/lib/accountStatus.ts` because they receive full `Account`. + const PAGE_SIZE = 50; const statusStyles: Record = { @@ -170,7 +176,6 @@ export function AccountTable({ - @@ -235,7 +240,9 @@ export function AccountTable({ {onToggleStatus && (() => { const isEnabled = acct.status !== "disabled"; - const canToggle = acct.status === "active" || acct.status === "disabled" || acct.status === "rate_limited" || acct.status === "refreshing" || acct.status === "quota_exhausted"; + // "rate_limited" retired as a backend status (now derived from + // cachedQuota); the remaining states still gate the toggle. + const canToggle = acct.status === "active" || acct.status === "disabled" || acct.status === "refreshing" || acct.status === "quota_exhausted"; return (