diff --git a/.gitignore b/.gitignore index 6e5f8cc59c..ec28ef9944 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ apps/web/src/components/__screenshots__ .vitest-* __screenshots__/ .tanstack +.codex diff --git a/apps/server/src/codexAppServerManager.ts b/apps/server/src/codexAppServerManager.ts index 3145038647..4270816c0a 100644 --- a/apps/server/src/codexAppServerManager.ts +++ b/apps/server/src/codexAppServerManager.ts @@ -519,6 +519,25 @@ export class CodexAppServerManager extends EventEmitter { }); }); + it("projects account metadata updates into normalized thread activities", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "account.updated", + eventId: asEventId("evt-account-updated"), + provider: "codex", + createdAt: now, + threadId: asThreadId("thread-1"), + payload: { + account: { + type: "chatgpt", + planType: "pro", + sparkEnabled: true, + }, + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.activities.some( + (activity: ProviderRuntimeTestActivity) => activity.kind === "account.updated", + ), + ); + + const accountActivity = thread.activities.find( + (activity: ProviderRuntimeTestActivity) => activity.kind === "account.updated", + ); + expect(accountActivity?.payload).toMatchObject({ + type: "chatgpt", + planType: "pro", + sparkEnabled: true, + }); + }); + + it("projects account rate-limit updates into normalized thread activities", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "account.rate-limits.updated", + eventId: asEventId("evt-account-rate-limits-updated"), + provider: "claudeAgent", + createdAt: now, + threadId: asThreadId("thread-1"), + payload: { + rateLimits: { + tokensPerMinute: 600000, + requestsPerMinute: 60, + remainingTokens: 512000, + }, + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.activities.some( + (activity: ProviderRuntimeTestActivity) => activity.kind === "account.rate-limits.updated", + ), + ); + + const rateLimitsActivity = thread.activities.find( + (activity: ProviderRuntimeTestActivity) => activity.kind === "account.rate-limits.updated", + ); + expect(rateLimitsActivity?.payload).toMatchObject({ + tokensPerMinute: 600000, + requestsPerMinute: 60, + remainingTokens: 512000, + }); + }); + it("projects compacted thread state into context compaction activities", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 8d3fb5d752..4651dd79a4 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -107,6 +107,31 @@ function buildContextWindowActivityPayload( return event.payload.usage; } +function buildAccountActivityPayload( + event: ProviderRuntimeEvent, +): Record | undefined { + switch (event.type) { + case "account.updated": { + const account = event.payload.account; + if (!account || typeof account !== "object" || Array.isArray(account)) { + return undefined; + } + return account as Record; + } + + case "account.rate-limits.updated": { + const rateLimits = event.payload.rateLimits; + if (!rateLimits || typeof rateLimits !== "object" || Array.isArray(rateLimits)) { + return undefined; + } + return rateLimits as Record; + } + + default: + return undefined; + } +} + function normalizeRuntimeTurnState( value: string | undefined, ): "completed" | "failed" | "interrupted" | "cancelled" { @@ -428,6 +453,27 @@ function runtimeEventToActivities( ]; } + case "account.updated": + case "account.rate-limits.updated": { + const payload = buildAccountActivityPayload(event); + if (!payload) { + return []; + } + + return [ + { + id: event.eventId, + createdAt: event.createdAt, + tone: "info", + kind: event.type, + summary: event.type === "account.updated" ? "Account updated" : "Account quota updated", + payload, + turnId: toTurnId(event.turnId) ?? null, + ...maybeSequence, + }, + ]; + } + case "item.updated": { if (!isToolLifecycleItemType(event.payload.itemType)) { return []; diff --git a/apps/server/src/provider/Layers/ClaudeProvider.ts b/apps/server/src/provider/Layers/ClaudeProvider.ts index 761b795fe5..58bcdf7a41 100644 --- a/apps/server/src/provider/Layers/ClaudeProvider.ts +++ b/apps/server/src/provider/Layers/ClaudeProvider.ts @@ -22,12 +22,21 @@ import { spawnAndCollect, type CommandResult, } from "../providerSnapshot"; +import { normalizeProviderUsageFromRateLimits } from "../providerUsage"; import { makeManagedServerProvider } from "../makeManagedServerProvider"; import { ClaudeProvider } from "../Services/ClaudeProvider"; import { ServerSettingsService } from "../../serverSettings"; import { ServerSettingsError } from "@t3tools/contracts"; const PROVIDER = "claudeAgent" as const; +interface ClaudeProbeState { + readonly subscriptionType: string | undefined; + readonly account: Record | null; + readonly rateLimits: ReadonlyArray> | Record | null; +} + +type MaybeClaudeProbeState = ClaudeProbeState | string | undefined; + const BUILT_IN_MODELS: ReadonlyArray = [ { slug: "claude-opus-4-6", @@ -382,7 +391,8 @@ export function adjustModelsForSubscription( // ── SDK capability probe ──────────────────────────────────────────── -const CAPABILITIES_PROBE_TIMEOUT_MS = 8_000; +const CAPABILITIES_PROBE_TIMEOUT_MS = 2_000; +const CLAUDE_RATE_LIMIT_PROBE_TIMEOUT_MS = 1_000; /** * Probe account information by spawning a lightweight Claude Agent SDK @@ -395,6 +405,20 @@ const CAPABILITIES_PROBE_TIMEOUT_MS = 8_000; * This is used as a fallback when `claude auth status` does not include * subscription type information. */ +function toClaudeProbeState(value: MaybeClaudeProbeState): ClaudeProbeState | undefined { + if (value === undefined) { + return undefined; + } + if (typeof value === "string") { + return { + subscriptionType: value, + account: null, + rateLimits: null, + }; + } + return value; +} + const probeClaudeCapabilities = (binaryPath: string) => { const abort = new AbortController(); return Effect.tryPromise(async () => { @@ -410,8 +434,60 @@ const probeClaudeCapabilities = (binaryPath: string) => { stderr: () => {}, }, }); + const rateLimitInfos: Array> = []; + const rateLimitsPromise = new Promise> | null>( + (resolve) => { + let settled = false; + let debounceTimer: ReturnType | null = null; + + const settle = () => { + if (settled) { + return; + } + settled = true; + if (debounceTimer) { + clearTimeout(debounceTimer); + } + resolve(rateLimitInfos.length > 0 ? rateLimitInfos : null); + }; + + const scheduleSettle = () => { + if (debounceTimer) { + clearTimeout(debounceTimer); + } + debounceTimer = setTimeout(settle, CLAUDE_RATE_LIMIT_PROBE_TIMEOUT_MS); + }; + + void (async () => { + try { + for await (const message of q) { + if (message.type !== "rate_limit_event" || !message.rate_limit_info) { + continue; + } + + rateLimitInfos.push({ ...message.rate_limit_info } as Record); + scheduleSettle(); + } + } catch { + settle(); + } + + settle(); + })(); + }, + ); const init = await q.initializationResult(); - return { subscriptionType: init.account?.subscriptionType }; + const rateLimits = await Promise.race([ + rateLimitsPromise, + new Promise((resolve) => + setTimeout(() => resolve(null), CLAUDE_RATE_LIMIT_PROBE_TIMEOUT_MS), + ), + ]); + return { + subscriptionType: init.account?.subscriptionType, + account: init.account ? ({ ...init.account } as Record) : null, + rateLimits, + } satisfies ClaudeProbeState; }).pipe( Effect.ensuring( Effect.sync(() => { @@ -439,7 +515,7 @@ const runClaudeCommand = Effect.fn("runClaudeCommand")(function* (args: Readonly }); export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")(function* ( - resolveSubscriptionType?: (binaryPath: string) => Effect.Effect, + resolveSubscriptionType?: (binaryPath: string) => Effect.Effect, ): Effect.fn.Return< ServerProvider, ServerSettingsError, @@ -532,9 +608,17 @@ export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")( // ── Auth check + subscription detection ──────────────────────────── - const authProbe = yield* runClaudeCommand(["auth", "status"]).pipe( - Effect.timeoutOption(DEFAULT_TIMEOUT_MS), - Effect.result, + const [authProbe, probeState] = yield* Effect.all( + [ + runClaudeCommand(["auth", "status"]).pipe( + Effect.timeoutOption(DEFAULT_TIMEOUT_MS), + Effect.result, + ), + resolveSubscriptionType + ? resolveSubscriptionType(claudeSettings.binaryPath).pipe(Effect.map(toClaudeProbeState)) + : Effect.void.pipe(Effect.as(undefined)), + ], + { concurrency: "unbounded" }, ); // Determine subscription type from multiple sources (cheapest first): @@ -551,11 +635,16 @@ export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")( authMethod = extractClaudeAuthMethodFromOutput(authProbe.success.value); } - if (!subscriptionType && resolveSubscriptionType) { - subscriptionType = yield* resolveSubscriptionType(claudeSettings.binaryPath); + if (!subscriptionType) { + subscriptionType = probeState?.subscriptionType; } const resolvedModels = adjustModelsForSubscription(models, subscriptionType); + const usage = normalizeProviderUsageFromRateLimits({ + provider: PROVIDER, + rateLimits: probeState?.rateLimits, + updatedAt: checkedAt, + }); // ── Handle auth results (same logic as before, adjusted models) ── @@ -566,11 +655,14 @@ export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")( enabled: claudeSettings.enabled, checkedAt, models: resolvedModels, + ...(usage ? { usage } : {}), probe: { installed: true, version: parsedVersion, status: "warning", auth: { status: "unknown" }, + ...(probeState?.account ? { account: probeState.account } : {}), + ...(probeState?.rateLimits ? { rateLimits: probeState.rateLimits } : {}), message: error instanceof Error ? `Could not verify Claude authentication status: ${error.message}.` @@ -585,11 +677,14 @@ export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")( enabled: claudeSettings.enabled, checkedAt, models: resolvedModels, + ...(usage ? { usage } : {}), probe: { installed: true, version: parsedVersion, status: "warning", auth: { status: "unknown" }, + ...(probeState?.account ? { account: probeState.account } : {}), + ...(probeState?.rateLimits ? { rateLimits: probeState.rateLimits } : {}), message: "Could not verify Claude authentication status. Timed out while running command.", }, }); @@ -602,6 +697,7 @@ export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")( enabled: claudeSettings.enabled, checkedAt, models: resolvedModels, + ...(usage ? { usage } : {}), probe: { installed: true, version: parsedVersion, @@ -610,6 +706,8 @@ export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")( ...parsed.auth, ...(authMetadata ? authMetadata : {}), }, + ...(probeState?.account ? { account: probeState.account } : {}), + ...(probeState?.rateLimits ? { rateLimits: probeState.rateLimits } : {}), ...(parsed.message ? { message: parsed.message } : {}), }, }); @@ -624,8 +722,7 @@ export const ClaudeProviderLive = Layer.effect( const subscriptionProbeCache = yield* Cache.make({ capacity: 1, timeToLive: Duration.minutes(5), - lookup: (binaryPath: string) => - probeClaudeCapabilities(binaryPath).pipe(Effect.map((r) => r?.subscriptionType)), + lookup: (binaryPath: string) => probeClaudeCapabilities(binaryPath), }); const checkProvider = checkClaudeProviderStatus((binaryPath) => diff --git a/apps/server/src/provider/Layers/CodexProvider.ts b/apps/server/src/provider/Layers/CodexProvider.ts index 667bdf048b..bec465009e 100644 --- a/apps/server/src/provider/Layers/CodexProvider.ts +++ b/apps/server/src/provider/Layers/CodexProvider.ts @@ -7,18 +7,7 @@ import type { ServerProviderAuth, ServerProviderState, } from "@t3tools/contracts"; -import { - Cache, - Duration, - Effect, - Equal, - FileSystem, - Layer, - Option, - Path, - Result, - Stream, -} from "effect"; +import { Effect, Equal, FileSystem, Layer, Option, Path, Result, Stream } from "effect"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; import { @@ -44,13 +33,15 @@ import { codexAuthSubType, type CodexAccountSnapshot, } from "../codexAccount"; -import { probeCodexAccount } from "../codexAppServer"; +import { probeCodexAccountState, type CodexAccountState } from "../codexAppServer"; +import { normalizeProviderUsageFromRateLimits } from "../providerUsage"; import { CodexProvider } from "../Services/CodexProvider"; import { ServerSettingsService } from "../../serverSettings"; import { ServerSettingsError } from "@t3tools/contracts"; const PROVIDER = "codex" as const; const OPENAI_AUTH_PROVIDERS = new Set(["openai"]); +type MaybeCodexAccountState = CodexAccountState | CodexAccountSnapshot | undefined; const BUILT_IN_MODELS: ReadonlyArray = [ { slug: "gpt-5.4", @@ -291,13 +282,13 @@ export const hasCustomModelProvider = readCodexConfigModelProvider().pipe( Effect.orElseSucceed(() => false), ); -const CAPABILITIES_PROBE_TIMEOUT_MS = 8_000; +const CAPABILITIES_PROBE_TIMEOUT_MS = 2_000; const probeCodexCapabilities = (input: { readonly binaryPath: string; readonly homePath?: string; }) => - Effect.tryPromise((signal) => probeCodexAccount({ ...input, signal })).pipe( + Effect.tryPromise((signal) => probeCodexAccountState({ ...input, signal })).pipe( Effect.timeoutOption(CAPABILITIES_PROBE_TIMEOUT_MS), Effect.result, Effect.map((result) => { @@ -306,6 +297,20 @@ const probeCodexCapabilities = (input: { }), ); +function toCodexAccountState(account: MaybeCodexAccountState): CodexAccountState | undefined { + if (!account) { + return undefined; + } + + return "snapshot" in account + ? account + : { + snapshot: account, + account: null, + rateLimits: null, + }; +} + const runCodexCommand = Effect.fn("runCodexCommand")(function* (args: ReadonlyArray) { const settingsService = yield* ServerSettingsService; const codexSettings = yield* settingsService.getSettings.pipe( @@ -325,7 +330,7 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu resolveAccount?: (input: { readonly binaryPath: string; readonly homePath?: string; - }) => Effect.Effect, + }) => Effect.Effect, ): Effect.fn.Return< ServerProvider, ServerSettingsError, @@ -452,17 +457,28 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu }); } - const authProbe = yield* runCodexCommand(["login", "status"]).pipe( - Effect.timeoutOption(DEFAULT_TIMEOUT_MS), - Effect.result, + const [authProbe, accountState] = yield* Effect.all( + [ + runCodexCommand(["login", "status"]).pipe( + Effect.timeoutOption(DEFAULT_TIMEOUT_MS), + Effect.result, + ), + resolveAccount + ? resolveAccount({ + binaryPath: codexSettings.binaryPath, + homePath: codexSettings.homePath, + }).pipe(Effect.map(toCodexAccountState)) + : Effect.void.pipe(Effect.as(undefined)), + ], + { concurrency: "unbounded" }, ); - const account = resolveAccount - ? yield* resolveAccount({ - binaryPath: codexSettings.binaryPath, - homePath: codexSettings.homePath, - }) - : undefined; - const resolvedModels = adjustCodexModelsForAccount(models, account); + const accountSnapshot = accountState?.snapshot; + const resolvedModels = adjustCodexModelsForAccount(models, accountSnapshot); + const usage = normalizeProviderUsageFromRateLimits({ + provider: PROVIDER, + rateLimits: accountState?.rateLimits, + updatedAt: checkedAt, + }); if (Result.isFailure(authProbe)) { const error = authProbe.failure; @@ -471,11 +487,14 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu enabled: codexSettings.enabled, checkedAt, models: resolvedModels, + ...(usage ? { usage } : {}), probe: { installed: true, version: parsedVersion, status: "warning", auth: { status: "unknown" }, + ...(accountState?.account ? { account: accountState.account } : {}), + ...(accountState?.rateLimits ? { rateLimits: accountState.rateLimits } : {}), message: error instanceof Error ? `Could not verify Codex authentication status: ${error.message}.` @@ -490,24 +509,28 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu enabled: codexSettings.enabled, checkedAt, models: resolvedModels, + ...(usage ? { usage } : {}), probe: { installed: true, version: parsedVersion, status: "warning", auth: { status: "unknown" }, + ...(accountState?.account ? { account: accountState.account } : {}), + ...(accountState?.rateLimits ? { rateLimits: accountState.rateLimits } : {}), message: "Could not verify Codex authentication status. Timed out while running command.", }, }); } const parsed = parseAuthStatusFromOutput(authProbe.success.value); - const authType = codexAuthSubType(account); - const authLabel = codexAuthSubLabel(account); + const authType = codexAuthSubType(accountSnapshot); + const authLabel = codexAuthSubLabel(accountSnapshot); return buildServerProvider({ provider: PROVIDER, enabled: codexSettings.enabled, checkedAt, models: resolvedModels, + ...(usage ? { usage } : {}), probe: { installed: true, version: parsedVersion, @@ -517,6 +540,8 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu ...(authType ? { type: authType } : {}), ...(authLabel ? { label: authLabel } : {}), }, + ...(accountState?.account ? { account: accountState.account } : {}), + ...(accountState?.rateLimits ? { rateLimits: accountState.rateLimits } : {}), ...(parsed.message ? { message: parsed.message } : {}), }, }); @@ -529,20 +554,11 @@ export const CodexProviderLive = Layer.effect( const fileSystem = yield* FileSystem.FileSystem; const path = yield* Path.Path; const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; - const accountProbeCache = yield* Cache.make({ - capacity: 4, - timeToLive: Duration.minutes(5), - lookup: (key: string) => { - const [binaryPath, homePath] = JSON.parse(key) as [string, string | undefined]; - return probeCodexCapabilities({ - binaryPath, - ...(homePath ? { homePath } : {}), - }); - }, - }); - const checkProvider = checkCodexProviderStatus((input) => - Cache.get(accountProbeCache, JSON.stringify([input.binaryPath, input.homePath])), + probeCodexCapabilities({ + binaryPath: input.binaryPath, + ...(input.homePath ? { homePath: input.homePath } : {}), + }), ).pipe( Effect.provideService(ServerSettingsService, serverSettings), Effect.provideService(FileSystem.FileSystem, fileSystem), diff --git a/apps/server/src/provider/Layers/ProviderRegistry.test.ts b/apps/server/src/provider/Layers/ProviderRegistry.test.ts index 116c008d67..4d518cfdea 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.test.ts @@ -884,6 +884,56 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest()))( ), ); + it.effect( + "includes provider-level Claude account and weekly quota data from the SDK probe", + () => + Effect.gen(function* () { + const status = yield* checkClaudeProviderStatus(() => + Effect.succeed({ + subscriptionType: "maxplan", + account: { + email: "adi@example.com", + subscriptionType: "maxplan", + }, + rateLimits: { + status: "allowed_warning", + rateLimitType: "seven_day", + utilization: 0.42, + resetsAt: 1_775_463_116, + }, + }), + ); + + assert.strictEqual(status.provider, "claudeAgent"); + assert.strictEqual(status.status, "ready"); + assert.strictEqual(status.auth.status, "authenticated"); + assert.deepStrictEqual(status.account, { + email: "adi@example.com", + subscriptionType: "maxplan", + }); + assert.deepStrictEqual(status.rateLimits, { + status: "allowed_warning", + rateLimitType: "seven_day", + utilization: 0.42, + resetsAt: 1_775_463_116, + }); + }).pipe( + Effect.provide( + mockSpawnerLayer((args) => { + const joined = args.join(" "); + if (joined === "--version") return { stdout: "1.0.0\n", stderr: "", code: 0 }; + if (joined === "auth status") + return { + stdout: '{"loggedIn":true,"authMethod":"claude.ai"}\n', + stderr: "", + code: 0, + }; + throw new Error(`Unexpected args: ${joined}`); + }), + ), + ), + ); + it.effect("returns an api key label for claude api key auth", () => Effect.gen(function* () { const status = yield* checkClaudeProviderStatus(); diff --git a/apps/server/src/provider/codexAccount.ts b/apps/server/src/provider/codexAccount.ts index 1db00250f6..7b97dee9b1 100644 --- a/apps/server/src/provider/codexAccount.ts +++ b/apps/server/src/provider/codexAccount.ts @@ -32,9 +32,18 @@ function asString(value: unknown): string | undefined { return typeof value === "string" ? value : undefined; } -export function readCodexAccountSnapshot(response: unknown): CodexAccountSnapshot { +export function readCodexAccountPayload(response: unknown): Record | undefined { + const record = asObject(response); + return asObject(record?.account) ?? record; +} + +export function readCodexRateLimitsPayload(response: unknown): Record | undefined { const record = asObject(response); - const account = asObject(record?.account) ?? record; + return asObject(record?.rateLimits) ?? record; +} + +export function readCodexAccountSnapshot(response: unknown): CodexAccountSnapshot { + const account = readCodexAccountPayload(response); const accountType = asString(account?.type); if (accountType === "apiKey") { diff --git a/apps/server/src/provider/codexAppServer.test.ts b/apps/server/src/provider/codexAppServer.test.ts new file mode 100644 index 0000000000..64cc49a24e --- /dev/null +++ b/apps/server/src/provider/codexAppServer.test.ts @@ -0,0 +1,144 @@ +import { EventEmitter } from "node:events"; +import { PassThrough } from "node:stream"; +import { afterEach, describe, expect, it, vi } from "vitest"; + +const { spawnMock, spawnSyncMock } = vi.hoisted(() => ({ + spawnMock: vi.fn(), + spawnSyncMock: vi.fn(), +})); + +vi.mock("node:child_process", () => ({ + spawn: spawnMock, + spawnSync: spawnSyncMock, +})); + +import { probeCodexAccountState } from "./codexAppServer"; + +interface StubOptions { + readonly rateLimitsBehavior: "ignore" | "respond"; +} + +function installCodexProbeChild(options: StubOptions) { + spawnMock.mockImplementation(() => { + const stdin = new PassThrough(); + const stdout = new PassThrough(); + const stderr = new PassThrough(); + const child = new EventEmitter() as EventEmitter & { + stdin: PassThrough; + stdout: PassThrough; + stderr: PassThrough; + killed: boolean; + pid: number; + kill: () => void; + }; + + let inputBuffer = ""; + child.stdin = stdin; + child.stdout = stdout; + child.stderr = stderr; + child.killed = false; + child.pid = 1234; + child.kill = () => { + child.killed = true; + child.emit("exit", null, null); + }; + + const writeJson = (value: unknown) => { + stdout.write(`${JSON.stringify(value)}\n`); + }; + + stdin.on("data", (chunk: Buffer | string) => { + inputBuffer += chunk.toString(); + let newlineIndex = inputBuffer.indexOf("\n"); + while (newlineIndex >= 0) { + const line = inputBuffer.slice(0, newlineIndex).trim(); + inputBuffer = inputBuffer.slice(newlineIndex + 1); + if (line.length === 0) { + newlineIndex = inputBuffer.indexOf("\n"); + continue; + } + + const message = JSON.parse(line) as { id?: number; method?: string }; + if (message.method === "initialize") { + writeJson({ id: 1, result: {} }); + } else if (message.id === 2 && message.method === "account/read") { + writeJson({ + id: 2, + result: { + account: { + type: "chatgpt", + planType: "pro", + }, + }, + }); + } else if (message.id === 3 && message.method === "account/rateLimits/read") { + if (options.rateLimitsBehavior === "respond") { + writeJson({ + id: 3, + result: { + rateLimits: { + primary: { + remaining: 7, + used: 3, + }, + }, + }, + }); + } + } + + newlineIndex = inputBuffer.indexOf("\n"); + } + }); + + return child; + }); +} + +afterEach(() => { + spawnMock.mockReset(); + spawnSyncMock.mockReset(); +}); + +describe("probeCodexAccountState", () => { + it("resolves when account/rateLimits/read is ignored", async () => { + installCodexProbeChild({ rateLimitsBehavior: "ignore" }); + + const state = await probeCodexAccountState({ + binaryPath: "codex", + signal: AbortSignal.timeout(3_000), + }); + + expect(state.snapshot).toEqual({ + type: "chatgpt", + planType: "pro", + sparkEnabled: true, + }); + expect(state.account).toEqual({ + type: "chatgpt", + planType: "pro", + }); + expect(state.rateLimits).toBeNull(); + }); + + it("includes rate limits when account/rateLimits/read responds", async () => { + installCodexProbeChild({ rateLimitsBehavior: "respond" }); + + const state = await probeCodexAccountState({ + binaryPath: "codex", + signal: AbortSignal.timeout(3_000), + }); + + expect(state.snapshot).toEqual({ + type: "chatgpt", + planType: "pro", + sparkEnabled: true, + }); + expect(state.rateLimits).toEqual({ + primary: { + remaining: 7, + used: 3, + }, + }); + }); +}); diff --git a/apps/server/src/provider/codexAppServer.ts b/apps/server/src/provider/codexAppServer.ts index d25fc3533e..a682bb76ab 100644 --- a/apps/server/src/provider/codexAppServer.ts +++ b/apps/server/src/provider/codexAppServer.ts @@ -1,15 +1,31 @@ import { spawn, spawnSync, type ChildProcessWithoutNullStreams } from "node:child_process"; import readline from "node:readline"; -import { readCodexAccountSnapshot, type CodexAccountSnapshot } from "./codexAccount"; +import { + readCodexAccountPayload, + readCodexAccountSnapshot, + readCodexRateLimitsPayload, + type CodexAccountSnapshot, +} from "./codexAccount"; + +export interface CodexAccountState { + readonly snapshot: CodexAccountSnapshot; + readonly account: Record | null; + readonly rateLimits: Record | null; +} interface JsonRpcProbeResponse { readonly id?: unknown; + readonly method?: unknown; + readonly params?: unknown; readonly result?: unknown; readonly error?: { readonly message?: unknown; }; } +const RATE_LIMITS_PROBE_GRACE_MS = 300; +const EXIT_GRACE_MS = 50; + function readErrorMessage(response: JsonRpcProbeResponse): string | undefined { return typeof response.error?.message === "string" ? response.error.message : undefined; } @@ -45,6 +61,15 @@ export async function probeCodexAccount(input: { readonly homePath?: string; readonly signal?: AbortSignal; }): Promise { + const state = await probeCodexAccountState(input); + return state.snapshot; +} + +export async function probeCodexAccountState(input: { + readonly binaryPath: string; + readonly homePath?: string; + readonly signal?: AbortSignal; +}): Promise { return await new Promise((resolve, reject) => { const child = spawn(input.binaryPath, ["app-server"], { env: { @@ -57,8 +82,17 @@ export async function probeCodexAccount(input: { const output = readline.createInterface({ input: child.stdout }); let completed = false; + let accountSnapshot: CodexAccountSnapshot | null = null; + let accountPayload: Record | null = null; + let rateLimitsPayload: Record | null | undefined; + let rateLimitsFallbackTimer: NodeJS.Timeout | undefined; + let rateLimitsRequestedAt: number | null = null; const cleanup = () => { + if (rateLimitsFallbackTimer) { + clearTimeout(rateLimitsFallbackTimer); + rateLimitsFallbackTimer = undefined; + } output.removeAllListeners(); output.close(); child.removeAllListeners(); @@ -83,6 +117,42 @@ export async function probeCodexAccount(input: { ), ); + const maybeFinish = () => { + if (!accountSnapshot) { + return; + } + + if (rateLimitsPayload === undefined) { + if (rateLimitsRequestedAt === null) { + return; + } + + if (!rateLimitsFallbackTimer) { + const remainingGraceMs = Math.max( + 0, + RATE_LIMITS_PROBE_GRACE_MS - (Date.now() - rateLimitsRequestedAt), + ); + rateLimitsFallbackTimer = setTimeout(() => { + rateLimitsPayload = null; + maybeFinish(); + }, remainingGraceMs); + } + return; + } + + const snapshot = accountSnapshot; + const account = accountPayload ?? null; + const rateLimits = rateLimitsPayload; + + finish(() => + resolve({ + snapshot, + account, + rateLimits, + }), + ); + }; + if (input.signal?.aborted) { fail(new Error("Codex account probe aborted.")); return; @@ -112,6 +182,15 @@ export async function probeCodexAccount(input: { } const response = parsed as JsonRpcProbeResponse; + if (response.method === "account/rateLimits/updated") { + const payload = + readCodexRateLimitsPayload(response.params) ?? + readCodexRateLimitsPayload(response.result); + rateLimitsPayload = payload ?? null; + maybeFinish(); + return; + } + if (response.id === 1) { const errorMessage = readErrorMessage(response); if (errorMessage) { @@ -121,6 +200,8 @@ export async function probeCodexAccount(input: { writeMessage({ method: "initialized" }); writeMessage({ id: 2, method: "account/read", params: {} }); + rateLimitsRequestedAt = Date.now(); + writeMessage({ id: 3, method: "account/rateLimits/read", params: {} }); return; } @@ -131,18 +212,38 @@ export async function probeCodexAccount(input: { return; } - finish(() => resolve(readCodexAccountSnapshot(response.result))); + accountSnapshot = readCodexAccountSnapshot(response.result); + accountPayload = readCodexAccountPayload(response.result) ?? null; + maybeFinish(); + return; + } + + if (response.id === 3) { + const payload = readCodexRateLimitsPayload(response.result); + if (payload) { + rateLimitsPayload = payload; + } + maybeFinish(); } }); child.once("error", fail); child.once("exit", (code, signal) => { - if (completed) return; - fail( - new Error( - `codex app-server exited before probe completed (code=${code ?? "null"}, signal=${signal ?? "null"}).`, - ), - ); + setTimeout(() => { + if (completed) return; + if (accountSnapshot) { + rateLimitsPayload ??= null; + maybeFinish(); + if (completed) { + return; + } + } + fail( + new Error( + `codex app-server exited before probe completed (code=${code ?? "null"}, signal=${signal ?? "null"}).`, + ), + ); + }, EXIT_GRACE_MS); }); writeMessage({ diff --git a/apps/server/src/provider/providerSnapshot.ts b/apps/server/src/provider/providerSnapshot.ts index e1243c4bd0..af4ada77f4 100644 --- a/apps/server/src/provider/providerSnapshot.ts +++ b/apps/server/src/provider/providerSnapshot.ts @@ -3,6 +3,7 @@ import type { ServerProviderAuth, ServerProviderModel, ServerProviderState, + ServerProviderUsage, } from "@t3tools/contracts"; import { Effect, Stream } from "effect"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; @@ -23,6 +24,8 @@ export interface ProviderProbeResult { readonly status: Exclude; readonly auth: ServerProviderAuth; readonly message?: string; + readonly account?: Record; + readonly rateLimits?: Record | ReadonlyArray>; } export function nonEmptyTrimmed(value: string | undefined): string | undefined { @@ -130,6 +133,7 @@ export function buildServerProvider(input: { checkedAt: string; models: ReadonlyArray; probe: ProviderProbeResult; + usage?: ServerProviderUsage; }): ServerProvider { return { provider: input.provider, @@ -140,6 +144,9 @@ export function buildServerProvider(input: { auth: input.probe.auth, checkedAt: input.checkedAt, ...(input.probe.message ? { message: input.probe.message } : {}), + ...(input.probe.account ? { account: input.probe.account } : {}), + ...(input.probe.rateLimits ? { rateLimits: input.probe.rateLimits } : {}), + ...(input.usage ? { usage: input.usage } : {}), models: input.models, }; } diff --git a/apps/server/src/provider/providerUsage.test.ts b/apps/server/src/provider/providerUsage.test.ts new file mode 100644 index 0000000000..dc16b4e4ba --- /dev/null +++ b/apps/server/src/provider/providerUsage.test.ts @@ -0,0 +1,433 @@ +import { describe, expect, it } from "vitest"; + +import { + deriveUsedPercentFromRemaining, + mergeProviderUsage, + normalizeProviderUsageFromRateLimits, +} from "./providerUsage"; + +describe("providerUsage", () => { + it("derives used percent from remaining values across edge cases", () => { + expect(deriveUsedPercentFromRemaining(null, 100)).toBeNull(); + expect(deriveUsedPercentFromRemaining(0, 0)).toBe(100); + expect(deriveUsedPercentFromRemaining(100, 0)).toBe(0); + expect(deriveUsedPercentFromRemaining(100, null)).toBe(0); + expect(deriveUsedPercentFromRemaining(100, 100)).toBe(0); + expect(deriveUsedPercentFromRemaining(25, 100)).toBe(75); + }); + + it("normalizes Codex five-hour and weekly buckets from rate-limit payloads", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "codex", + updatedAt: "2026-03-31T10:00:00.000Z", + rateLimits: { + rateLimitsByLimitId: { + weekly: { + limitId: "weekly", + usage: 29, + limit: 100, + window_seconds: 604_800, + reset_at: 1_775_555_565, + }, + session: { + limitId: "session", + usage: 41, + limit: 100, + window_seconds: 18_000, + reset_at: 1_775_123_456, + }, + }, + }, + }); + + expect(usage).toEqual({ + updatedAt: "2026-03-31T10:00:00.000Z", + buckets: [ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 59, + usedPercent: 41, + resetsAt: new Date(1_775_123_456_000).toISOString(), + }, + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 71, + usedPercent: 29, + resetsAt: new Date(1_775_555_565_000).toISOString(), + }, + ], + }); + }); + + it("derives usedPercent from usage and limit when usage is an absolute count", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "codex", + updatedAt: "2026-03-31T10:00:00.000Z", + rateLimits: { + rateLimitsByLimitId: { + session: { + limitId: "session", + usage: 15, + limit: 60, + window_seconds: 18_000, + reset_at: 1_775_123_456, + }, + }, + }, + }); + + expect(usage?.buckets).toEqual([ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 75, + usedPercent: 25, + resetsAt: new Date(1_775_123_456_000).toISOString(), + }, + ]); + }); + + it("normalizes Codex buckets when the payload reports used and remaining percentages", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "codex", + updatedAt: "2026-03-31T10:00:00.000Z", + rateLimits: { + primary: { + used: 3, + remaining: 97, + window_seconds: 18_000, + reset_at: 1_775_123_456, + }, + secondary: { + used: 29, + remaining: 71, + window_seconds: 604_800, + reset_at: 1_775_555_565, + }, + }, + }); + + expect(usage).toEqual({ + updatedAt: "2026-03-31T10:00:00.000Z", + buckets: [ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 97, + usedPercent: 3, + resetsAt: new Date(1_775_123_456_000).toISOString(), + }, + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 71, + usedPercent: 29, + resetsAt: new Date(1_775_555_565_000).toISOString(), + }, + ], + }); + }); + + it("derives usedPercent from remaining when the payload only reports remaining percentages", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "codex", + updatedAt: "2026-03-31T10:00:00.000Z", + rateLimits: { + primary: { + remaining: 97, + window_seconds: 18_000, + reset_at: 1_775_123_456, + }, + }, + }); + + expect(usage?.buckets).toEqual([ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 97, + usedPercent: 3, + resetsAt: new Date(1_775_123_456_000).toISOString(), + }, + ]); + }); + + it("derives usedPercent from absolute remaining counts without clamping them as percentages", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "codex", + updatedAt: "2026-03-31T10:00:00.000Z", + rateLimits: { + rateLimitsByLimitId: { + session: { + limitId: "session", + remaining: 500, + limit: 1_000, + window_seconds: 18_000, + reset_at: 1_775_123_456, + }, + }, + }, + }); + + expect(usage?.buckets).toEqual([ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 50, + usedPercent: 50, + resetsAt: new Date(1_775_123_456_000).toISOString(), + }, + ]); + }); + + it("prefers remaining percentages over remaining counts when both are present", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "codex", + updatedAt: "2026-03-31T10:00:00.000Z", + rateLimits: { + rateLimitsByLimitId: { + session: { + limitId: "session", + remainingPercent: 80, + remaining: 500, + limit: 1_000, + window_seconds: 18_000, + reset_at: 1_775_123_456, + }, + }, + }, + }); + + expect(usage?.buckets).toEqual([ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 80, + usedPercent: 20, + resetsAt: new Date(1_775_123_456_000).toISOString(), + }, + ]); + }); + + it("does not double-scale low percentages derived from usage and limit", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "codex", + updatedAt: "2026-03-31T10:00:00.000Z", + rateLimits: { + rateLimitsByLimitId: { + session: { + limitId: "session", + usage: 1, + limit: 1_000, + window_seconds: 18_000, + reset_at: 1_775_123_456, + }, + }, + }, + }); + + const bucket = usage?.buckets[0]; + expect(bucket?.usedPercent).toBeCloseTo(0.1, 6); + expect(bucket?.remainingPercent).toBeCloseTo(99.9, 6); + }); + + it("normalizes Claude weekly rate-limit events", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "claudeAgent", + updatedAt: "2026-03-31T10:05:00.000Z", + rateLimits: { + rate_limit_info: { + rateLimitType: "seven_day", + utilization: 0.42, + resetsAt: 1_775_463_116, + }, + }, + }); + + expect(usage).toEqual({ + updatedAt: "2026-03-31T10:05:00.000Z", + buckets: [ + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 58, + usedPercent: 42, + resetsAt: new Date(1_775_463_116_000).toISOString(), + }, + ], + }); + }); + + it("clamps over-limit utilization values to 100 percent", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "claudeAgent", + updatedAt: "2026-03-31T10:06:00.000Z", + rateLimits: { + rate_limit_info: { + rateLimitType: "seven_day", + utilization: 1.5, + resetsAt: 1_775_463_116, + }, + }, + }); + + expect(usage?.buckets).toEqual([ + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 0, + usedPercent: 100, + resetsAt: new Date(1_775_463_116_000).toISOString(), + }, + ]); + }); + + it("normalizes Claude five-hour rate-limit events", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "claudeAgent", + updatedAt: "2026-03-31T10:10:00.000Z", + rateLimits: { + rate_limit_info: { + rate_limit_type: "five_hour", + utilization: 0.15, + reset_at: 1_775_123_456, + }, + }, + }); + + expect(usage).toEqual({ + updatedAt: "2026-03-31T10:10:00.000Z", + buckets: [ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 85, + usedPercent: 15, + resetsAt: new Date(1_775_123_456_000).toISOString(), + }, + ], + }); + }); + + it("merges multiple Claude rate-limit snapshots into both buckets", () => { + const usage = normalizeProviderUsageFromRateLimits({ + provider: "claudeAgent", + updatedAt: "2026-03-31T10:15:00.000Z", + rateLimits: [ + { + rate_limit_info: { + rateLimitType: "five_hour", + utilization: 0.15, + resetsAt: 1_775_123_456, + }, + }, + { + rate_limit_info: { + rateLimitType: "seven_day", + utilization: 0.42, + resetsAt: 1_775_463_116, + }, + }, + ], + }); + + expect(usage).toEqual({ + updatedAt: "2026-03-31T10:15:00.000Z", + buckets: [ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 85, + usedPercent: 15, + resetsAt: new Date(1_775_123_456_000).toISOString(), + }, + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 58, + usedPercent: 42, + resetsAt: new Date(1_775_463_116_000).toISOString(), + }, + ], + }); + }); + + it("ignores malformed payloads", () => { + expect( + normalizeProviderUsageFromRateLimits({ + provider: "codex", + updatedAt: "2026-03-31T10:05:00.000Z", + rateLimits: { + primary: { + windowDurationMins: 10_080, + }, + }, + }), + ).toBeUndefined(); + }); + + it("ignores partial payloads without enough data to derive a bucket", () => { + expect( + normalizeProviderUsageFromRateLimits({ + provider: "claudeAgent", + updatedAt: "2026-03-31T10:05:00.000Z", + rateLimits: { + rate_limit_info: { + rateLimitType: "seven_day", + }, + }, + }), + ).toBeUndefined(); + }); + + it("merges incremental usage updates without dropping existing buckets", () => { + const merged = mergeProviderUsage( + { + updatedAt: "2026-03-31T10:00:00.000Z", + buckets: [ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 80, + usedPercent: 20, + resetsAt: "2026-03-31T15:00:00.000Z", + }, + ], + }, + { + updatedAt: "2026-03-31T10:05:00.000Z", + buckets: [ + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 60, + usedPercent: 40, + resetsAt: "2026-04-06T10:00:00.000Z", + }, + ], + }, + ); + + expect(merged).toEqual({ + updatedAt: "2026-03-31T10:05:00.000Z", + buckets: [ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 80, + usedPercent: 20, + resetsAt: "2026-03-31T15:00:00.000Z", + }, + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 60, + usedPercent: 40, + resetsAt: "2026-04-06T10:00:00.000Z", + }, + ], + }); + }); +}); diff --git a/apps/server/src/provider/providerUsage.ts b/apps/server/src/provider/providerUsage.ts new file mode 100644 index 0000000000..88e9a4c0e4 --- /dev/null +++ b/apps/server/src/provider/providerUsage.ts @@ -0,0 +1,431 @@ +import type { + ProviderKind, + ServerProvider, + ServerProviderUsage, + ServerProviderUsageBucket, + ServerProviderUsageBucketId, +} from "@t3tools/contracts"; + +function asRecord(value: unknown): Record | null { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : null; +} + +function asRecordArray(value: unknown): ReadonlyArray> | null { + if (!Array.isArray(value)) { + return null; + } + + const records = value.map(asRecord); + if (records.some((record) => record === null)) { + return null; + } + + return records as ReadonlyArray>; +} + +function asNumber(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? value : null; +} + +function asString(value: unknown): string | null { + return typeof value === "string" && value.trim().length > 0 ? value : null; +} + +function toBucketId(value: string | null): ServerProviderUsageBucketId | null { + if (!value) { + return null; + } + + switch (value.trim().toLowerCase()) { + case "five_hour": + case "fivehour": + case "five-hour": + case "5h": + case "session": + case "session_limit": + case "session-limit": + return "fiveHour"; + case "seven_day": + case "sevenday": + case "seven-day": + case "weekly": + case "weekly_limit": + case "weekly-limit": + case "7d": + return "weekly"; + default: + return null; + } +} + +function normalizePercent(value: number | null): number | null { + if (value === null) { + return null; + } + + const normalized = Math.max(0, Math.min(100, value)); + return Math.round(normalized * 1_000_000) / 1_000_000; +} + +function normalizeUtilization(value: unknown): number | null { + const utilization = asNumber(value); + if (utilization === null) { + return null; + } + + const percent = Math.max(0, utilization * 100); + return normalizePercent(percent); +} + +function derivePercentFromUsage(usage: number | null, limit: number | null): number | null { + if (usage === null) { + return null; + } + + if (limit !== null && limit > 0) { + return normalizePercent((usage / limit) * 100); + } + + return normalizePercent(usage); +} + +/** @internal - Exported for testing */ +export function deriveUsedPercentFromRemaining( + remaining: number | null, + limit: number | null, +): number | null { + if (remaining === null) { + return null; + } + + if (limit !== null && limit > 0) { + return normalizePercent(((limit - remaining) / limit) * 100); + } + + return normalizePercent(100 - remaining); +} + +function toIsoDateTime(value: unknown): string | null { + if (typeof value === "string") { + const parsed = Date.parse(value); + return Number.isNaN(parsed) ? null : new Date(parsed).toISOString(); + } + + if (typeof value !== "number" || !Number.isFinite(value)) { + return null; + } + + const epochMs = value > 1_000_000_000_000 ? value : value * 1_000; + return new Date(epochMs).toISOString(); +} + +function usageBucketOrder(id: ServerProviderUsageBucketId): number { + return id === "fiveHour" ? 0 : 1; +} + +function sortBuckets( + buckets: Iterable, +): ReadonlyArray { + return [...buckets].toSorted( + (left, right) => usageBucketOrder(left.id) - usageBucketOrder(right.id), + ); +} + +function deriveRemainingPercent( + usedPercent: number | null, + remainingPercent: number | null, +): number | null { + if (remainingPercent !== null) { + return remainingPercent; + } + if (usedPercent === null) { + return null; + } + return Math.max(0, 100 - usedPercent); +} + +function toUsageBucketLabel(id: ServerProviderUsageBucketId): string { + return id === "fiveHour" ? "Session limit" : "Weekly limit"; +} + +function getRecordValue(record: Record, keys: ReadonlyArray): unknown { + for (const key of keys) { + if (key in record) { + return record[key]; + } + } + return undefined; +} + +function makeUsageBucket(input: { + id: ServerProviderUsageBucketId; + label: string; + resetsAt: unknown; + usedPercent?: unknown; + utilization?: unknown; + remainingPercent?: unknown; + remaining?: unknown; + used?: unknown; + usage?: unknown; + limit?: unknown; +}): ServerProviderUsageBucket | null { + const usedPercent = + normalizePercent(asNumber(input.usedPercent)) ?? + normalizeUtilization(input.utilization) ?? + derivePercentFromUsage(asNumber(input.used), asNumber(input.limit)) ?? + derivePercentFromUsage(asNumber(input.usage), asNumber(input.limit)); + const remainingPercent = normalizePercent(asNumber(input.remainingPercent)); + const remainingCount = asNumber(input.remaining); + const derivedUsedPercentFromRemaining = + remainingPercent === null + ? deriveUsedPercentFromRemaining(remainingCount, asNumber(input.limit)) + : null; + const resolvedUsedPercent = usedPercent ?? derivedUsedPercentFromRemaining; + const resolvedRemainingPercent = deriveRemainingPercent(resolvedUsedPercent, remainingPercent); + const resetsAt = toIsoDateTime(input.resetsAt); + + if (resolvedUsedPercent === null || resolvedRemainingPercent === null || !resetsAt) { + return null; + } + + return { + id: input.id, + label: input.label, + usedPercent: resolvedUsedPercent, + remainingPercent: resolvedRemainingPercent, + resetsAt, + }; +} + +function codexBucketIdFromWindow(value: unknown): ServerProviderUsageBucketId | null { + const windowDurationMins = asNumber(value); + if (windowDurationMins === 300) { + return "fiveHour"; + } + if (windowDurationMins === 10_080) { + return "weekly"; + } + return null; +} + +function codexBucketIdFromWindowSeconds(value: unknown): ServerProviderUsageBucketId | null { + const windowSeconds = asNumber(value); + if (windowSeconds === 18_000) { + return "fiveHour"; + } + if (windowSeconds === 604_800) { + return "weekly"; + } + return null; +} + +function codexBucketIdFromBucket( + bucket: Record, + bucketKey?: string, +): ServerProviderUsageBucketId | null { + return ( + toBucketId( + asString( + getRecordValue(bucket, ["id", "bucketId", "limitId", "rateLimitType", "rate_limit_type"]), + ), + ) ?? + toBucketId(bucketKey ? asString(bucketKey) : null) ?? + codexBucketIdFromWindowSeconds( + getRecordValue(bucket, ["window_seconds", "windowSeconds", "windowDurationSecs"]), + ) ?? + codexBucketIdFromWindow( + getRecordValue(bucket, ["windowDurationMins", "windowDurationMinutes"]), + ) ?? + toBucketId(asString(getRecordValue(bucket, ["label"]))) + ); +} + +function normalizeCodexUsageBuckets(rateLimits: Record) { + const buckets = new Map(); + const rateLimitsByLimitId = asRecord( + getRecordValue(rateLimits, ["rateLimitsByLimitId", "rate_limits_by_limit_id"]), + ); + const candidateBuckets: ReadonlyArray<{ + bucket: Record | null; + key?: string; + }> = [ + { bucket: asRecord(getRecordValue(rateLimits, ["primary"])), key: "primary" }, + { bucket: asRecord(getRecordValue(rateLimits, ["secondary"])), key: "secondary" }, + ...Object.entries(rateLimitsByLimitId ?? {}).flatMap(([key, entry]) => { + const bucket = asRecord(entry); + return bucket ? [{ bucket, key }] : []; + }), + ]; + + const resolvedCandidateBuckets = candidateBuckets.filter( + (candidate): candidate is { bucket: Record; key?: string } => + candidate.bucket !== null, + ); + + for (const { bucket, key } of resolvedCandidateBuckets) { + const bucketId = codexBucketIdFromBucket(bucket, key); + if (!bucketId) { + continue; + } + const normalizedBucket = makeUsageBucket({ + id: bucketId, + label: toUsageBucketLabel(bucketId), + usedPercent: getRecordValue(bucket, ["usedPercent"]), + utilization: getRecordValue(bucket, ["utilization"]), + remainingPercent: getRecordValue(bucket, ["remainingPercent", "remaining"]), + remaining: getRecordValue(bucket, ["remaining"]), + used: getRecordValue(bucket, ["used"]), + usage: getRecordValue(bucket, ["usage"]), + limit: getRecordValue(bucket, ["limit"]), + resetsAt: getRecordValue(bucket, ["resetsAt", "resetAt", "reset_at", "reset"]), + }); + if (normalizedBucket) { + buckets.set(bucketId, normalizedBucket); + } + } + + return sortBuckets(buckets.values()); +} + +function claudeBucketId(rateLimitType: string | null): ServerProviderUsageBucketId | null { + switch (rateLimitType) { + case "five_hour": + case "session": + case "session_limit": + return "fiveHour"; + case "seven_day": + case "seven_day_opus": + case "seven_day_sonnet": + case "weekly": + return "weekly"; + default: + return null; + } +} + +function normalizeClaudeUsageBuckets( + rateLimits: Record | ReadonlyArray>, +) { + let rateLimitInfos: ReadonlyArray>; + if (Array.isArray(rateLimits)) { + rateLimitInfos = rateLimits; + } else { + const resolvedRateLimits = rateLimits as Record; + const resolvedRateLimitInfo = + asRecord(getRecordValue(resolvedRateLimits, ["rate_limit_info", "rateLimitInfo"])) ?? + resolvedRateLimits; + rateLimitInfos = [resolvedRateLimitInfo]; + } + const buckets = new Map(); + + for (const rateLimitInfo of rateLimitInfos) { + const resolvedRateLimitInfo = + asRecord(getRecordValue(rateLimitInfo, ["rate_limit_info", "rateLimitInfo"])) ?? + rateLimitInfo; + const rateLimitType = asString( + getRecordValue(resolvedRateLimitInfo, ["rateLimitType", "rate_limit_type"]), + ); + const bucketId = claudeBucketId(rateLimitType); + if (!bucketId) { + continue; + } + + const bucket = makeUsageBucket({ + id: bucketId, + label: toUsageBucketLabel(bucketId), + utilization: getRecordValue(resolvedRateLimitInfo, ["utilization"]), + remainingPercent: getRecordValue(resolvedRateLimitInfo, ["remainingPercent", "remaining"]), + remaining: getRecordValue(resolvedRateLimitInfo, ["remaining"]), + used: getRecordValue(resolvedRateLimitInfo, ["used"]), + usage: getRecordValue(resolvedRateLimitInfo, ["usage"]), + limit: getRecordValue(resolvedRateLimitInfo, ["limit"]), + resetsAt: getRecordValue(resolvedRateLimitInfo, ["resetsAt", "resetAt", "reset_at"]), + }); + + if (bucket) { + buckets.set(bucketId, bucket); + } + } + + return sortBuckets(buckets.values()); +} + +export function normalizeProviderUsageFromRateLimits(input: { + provider: ProviderKind; + rateLimits: unknown; + updatedAt: string; +}): ServerProviderUsage | undefined { + const rateLimits = asRecord(input.rateLimits); + const rateLimitRecords = asRecordArray(input.rateLimits); + if (!rateLimits && !rateLimitRecords) { + return undefined; + } + + const buckets = + input.provider === "codex" + ? (() => { + if (!rateLimits) { + return []; + } + + const resolvedRateLimits = + asRecord(getRecordValue(rateLimits, ["rateLimits", "rate_limits"])) ?? rateLimits; + return normalizeCodexUsageBuckets(resolvedRateLimits); + })() + : normalizeClaudeUsageBuckets(rateLimitRecords ?? (rateLimits ? [rateLimits] : [])); + + if (buckets.length === 0) { + return undefined; + } + + return { + buckets, + updatedAt: input.updatedAt, + }; +} + +export function mergeProviderUsage( + previous: ServerProviderUsage | undefined, + next: ServerProviderUsage | undefined, +): ServerProviderUsage | undefined { + if (!previous) { + return next; + } + if (!next) { + return previous; + } + + const buckets = new Map(); + for (const bucket of previous.buckets) { + buckets.set(bucket.id, bucket); + } + for (const bucket of next.buckets) { + buckets.set(bucket.id, bucket); + } + + const updatedAtCandidates = [previous.updatedAt, next.updatedAt].filter( + (value): value is string => typeof value === "string", + ); + + return { + buckets: sortBuckets(buckets.values()), + ...(updatedAtCandidates.length > 0 ? { updatedAt: updatedAtCandidates.toSorted().at(-1) } : {}), + }; +} + +export function applyUsageToProviderSnapshot( + provider: ServerProvider, + usage: ServerProviderUsage | undefined, +): ServerProvider { + const mergedUsage = mergeProviderUsage(provider.usage, usage); + if (!mergedUsage) { + return provider; + } + + return { + ...provider, + usage: mergedUsage, + }; +} diff --git a/apps/web/src/components/settings/SettingsPanels.tsx b/apps/web/src/components/settings/SettingsPanels.tsx index d534eefaa4..2363a54257 100644 --- a/apps/web/src/components/settings/SettingsPanels.tsx +++ b/apps/web/src/components/settings/SettingsPanels.tsx @@ -16,6 +16,7 @@ import { type ProviderKind, type ServerProvider, type ServerProviderModel, + type ServerProviderUsageBucket, ThreadId, } from "@t3tools/contracts"; import { DEFAULT_UNIFIED_SETTINGS } from "@t3tools/contracts/settings"; @@ -45,6 +46,11 @@ import { getCustomModelOptionsByProvider, resolveAppModelSelectionState, } from "../../modelSelection"; +import { + formatUsageRemainingPercent, + formatUsageResetAt, + getProviderUsageBuckets, +} from "../../lib/accountQuota"; import { ensureNativeApi, readNativeApi } from "../../nativeApi"; import { useStore } from "../../store"; import { formatRelativeTime, formatRelativeTimeLabel } from "../../timestampFormat"; @@ -65,6 +71,51 @@ import { useServerProviders, } from "../../rpc/serverState"; +function ProviderUsageRows({ + buckets, + timestampFormat, +}: { + buckets: ReadonlyArray; + timestampFormat: "locale" | "12-hour" | "24-hour"; +}) { + if (buckets.length === 0) { + return null; + } + + return ( +
+
+ {buckets.map((bucket) => ( +
+
+ {bucket.label} + + {formatUsageRemainingPercent(bucket)} + +
+
+
+
+
+ {formatUsageResetAt(bucket.resetsAt, timestampFormat)} +
+
+ ))} +
+
+ ); +} + const THEME_OPTIONS = [ { value: "system", @@ -1104,6 +1155,7 @@ export function GeneralSettingsPanel() { const customModelError = customModelErrorByProvider[providerCard.provider] ?? null; const providerDisplayName = PROVIDER_DISPLAY_NAMES[providerCard.provider] ?? providerCard.title; + const usageBuckets = getProviderUsageBuckets(providerCard.liveProvider); return (
@@ -1194,6 +1246,11 @@ export function GeneralSettingsPanel() {
+ + diff --git a/apps/web/src/lib/accountQuota.test.ts b/apps/web/src/lib/accountQuota.test.ts new file mode 100644 index 0000000000..41f29ec396 --- /dev/null +++ b/apps/web/src/lib/accountQuota.test.ts @@ -0,0 +1,77 @@ +import { describe, expect, it } from "vitest"; +import type { ServerProvider } from "@t3tools/contracts"; + +import { + formatUsageRemainingPercent, + formatUsageResetAt, + getProviderUsageBuckets, +} from "./accountQuota"; + +function makeProvider( + input: Partial & Pick, +): ServerProvider { + return { + enabled: true, + installed: true, + version: "0.117.0", + status: "ready", + auth: { status: "authenticated" }, + checkedAt: "2026-03-30T00:00:00.000Z", + models: [], + ...input, + }; +} + +describe("accountQuota", () => { + it("returns provider usage buckets in normalized order", () => { + const buckets = getProviderUsageBuckets( + makeProvider({ + provider: "codex", + usage: { + buckets: [ + { + id: "fiveHour", + label: "Session limit", + remainingPercent: 67, + usedPercent: 33, + resetsAt: "2026-04-01T05:30:00.000Z", + }, + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 71, + usedPercent: 29, + resetsAt: "2026-04-06T05:12:45.000Z", + }, + ], + updatedAt: "2026-03-31T10:00:00.000Z", + }, + }), + ); + + expect(buckets).toHaveLength(2); + expect(buckets[0]?.id).toBe("fiveHour"); + expect(buckets[1]?.id).toBe("weekly"); + }); + + it("formats compact remaining percentages", () => { + expect( + formatUsageRemainingPercent({ + id: "weekly", + label: "Weekly limit", + remainingPercent: 58, + usedPercent: 42, + resetsAt: "2026-04-06T05:12:45.000Z", + }), + ).toBe("58% remaining"); + }); + + it("formats reset timestamps using the selected time format", () => { + expect(formatUsageResetAt("2026-04-06T17:05:00.000Z", "24-hour")).not.toMatch(/[AP]M/); + expect(formatUsageResetAt("2026-04-06T17:05:00.000Z", "12-hour")).toMatch(/[AP]M/); + }); + + it("returns an empty list when usage is unavailable", () => { + expect(getProviderUsageBuckets(makeProvider({ provider: "claudeAgent" }))).toEqual([]); + }); +}); diff --git a/apps/web/src/lib/accountQuota.ts b/apps/web/src/lib/accountQuota.ts new file mode 100644 index 0000000000..4984ae8d3e --- /dev/null +++ b/apps/web/src/lib/accountQuota.ts @@ -0,0 +1,40 @@ +import type { + ServerProvider, + ServerProviderUsageBucket, + TimestampFormat, +} from "@t3tools/contracts"; +import { getTimestampFormatOptions } from "../timestampFormat"; + +export function getProviderUsageBuckets( + provider: ServerProvider | null | undefined, +): ReadonlyArray { + return provider?.usage?.buckets ?? []; +} + +export function formatUsageRemainingPercent(bucket: ServerProviderUsageBucket): string { + const rounded = Number.isInteger(bucket.remainingPercent) + ? bucket.remainingPercent + : Number(bucket.remainingPercent.toFixed(1)); + return `${rounded}% remaining`; +} + +const usageResetFormatterCache = new Map(); + +function getUsageResetFormatter(timestampFormat: TimestampFormat): Intl.DateTimeFormat { + const cached = usageResetFormatterCache.get(timestampFormat); + if (cached) { + return cached; + } + + const formatter = new Intl.DateTimeFormat(undefined, { + month: "short", + day: "numeric", + ...getTimestampFormatOptions(timestampFormat, false), + }); + usageResetFormatterCache.set(timestampFormat, formatter); + return formatter; +} + +export function formatUsageResetAt(isoDate: string, timestampFormat: TimestampFormat): string { + return `Resets ${getUsageResetFormatter(timestampFormat).format(new Date(isoDate))}`; +} diff --git a/apps/web/src/rpc/serverState.test.ts b/apps/web/src/rpc/serverState.test.ts index 721ce25fb5..9357951855 100644 --- a/apps/web/src/rpc/serverState.test.ts +++ b/apps/web/src/rpc/serverState.test.ts @@ -68,6 +68,11 @@ const baseServerConfig: ServerConfig = { const serverApi = { getConfig: vi.fn<() => Promise>(), + refreshProviders: vi.fn< + () => Promise<{ + providers: ReadonlyArray; + }> + >(), subscribeConfig: vi.fn((listener: (event: ServerConfigStreamEvent) => void) => registerListener(configListeners, listener), ), @@ -107,6 +112,7 @@ beforeEach(() => { vi.clearAllMocks(); lifecycleListeners.clear(); configListeners.clear(); + serverApi.refreshProviders.mockResolvedValue({ providers: defaultProviders }); resetServerStateForTests(); }); @@ -129,6 +135,7 @@ describe("serverState", () => { expect(serverApi.subscribeConfig).toHaveBeenCalledOnce(); expect(serverApi.subscribeLifecycle).toHaveBeenCalledOnce(); expect(serverApi.getConfig).toHaveBeenCalledOnce(); + expect(serverApi.refreshProviders).toHaveBeenCalledOnce(); expect(configListener).toHaveBeenCalledWith( { issues: [], @@ -146,7 +153,7 @@ describe("serverState", () => { providers: defaultProviders, settings: DEFAULT_SERVER_SETTINGS, }, - "snapshot", + "providerStatuses", ); unsubscribeLate(); @@ -174,10 +181,15 @@ describe("serverState", () => { expect(getServerConfig()).toEqual(streamedConfig); }); + await waitFor(() => { + expect(serverApi.refreshProviders).toHaveBeenCalledOnce(); + }); + deferred.resolve(baseServerConfig); await new Promise((resolve) => setTimeout(resolve, 0)); expect(getServerConfig()).toEqual(streamedConfig); + expect(serverApi.refreshProviders).toHaveBeenCalledOnce(); stop(); }); @@ -280,24 +292,22 @@ describe("serverState", () => { }); expect(providersListener).toHaveBeenLastCalledWith({ providers: nextProviders }); - expect(configListener).toHaveBeenNthCalledWith( - 2, + expect(configListener.mock.calls).toContainEqual([ { issues: [{ kind: "keybindings.malformed-config", message: "bad json" }], providers: defaultProviders, settings: DEFAULT_SERVER_SETTINGS, }, "keybindingsUpdated", - ); - expect(configListener).toHaveBeenNthCalledWith( - 3, + ]); + expect(configListener.mock.calls).toContainEqual([ { issues: [{ kind: "keybindings.malformed-config", message: "bad json" }], providers: nextProviders, settings: DEFAULT_SERVER_SETTINGS, }, "providerStatuses", - ); + ]); expect(configListener).toHaveBeenLastCalledWith( { issues: [{ kind: "keybindings.malformed-config", message: "bad json" }], @@ -314,4 +324,39 @@ describe("serverState", () => { unsubscribeConfig(); stop(); }); + + it("applies eager refreshProviders results without waiting for stream events", async () => { + const refreshedProviders: ReadonlyArray = [ + { + ...defaultProviders[0]!, + checkedAt: "2026-01-03T00:00:00.000Z", + usage: { + updatedAt: "2026-01-03T00:00:00.000Z", + buckets: [ + { + id: "weekly", + label: "Weekly limit", + usedPercent: 12, + remainingPercent: 88, + resetsAt: "2026-01-08T16:19:00.000Z", + }, + ], + }, + }, + ]; + serverApi.getConfig.mockResolvedValueOnce(baseServerConfig); + serverApi.refreshProviders.mockResolvedValueOnce({ providers: refreshedProviders }); + + const stop = startServerStateSync(serverApi); + + await waitFor(() => { + expect(getServerConfig()).toEqual({ + ...baseServerConfig, + providers: refreshedProviders, + }); + }); + + expect(serverApi.refreshProviders).toHaveBeenCalledOnce(); + stop(); + }); }); diff --git a/apps/web/src/rpc/serverState.ts b/apps/web/src/rpc/serverState.ts index 3e8dbab28c..610885a434 100644 --- a/apps/web/src/rpc/serverState.ts +++ b/apps/web/src/rpc/serverState.ts @@ -26,7 +26,7 @@ export interface ServerConfigUpdatedNotification { type ServerStateClient = Pick< WsRpcClient["server"], - "getConfig" | "subscribeConfig" | "subscribeLifecycle" + "getConfig" | "refreshProviders" | "subscribeConfig" | "subscribeLifecycle" >; function makeStateAtom(label: string, initialValue: A) { @@ -168,6 +168,24 @@ export function onProvidersUpdated( export function startServerStateSync(client: ServerStateClient): () => void { let disposed = false; + let providerRefreshRequested = false; + + const requestProviderRefresh = () => { + if (disposed || providerRefreshRequested) { + return; + } + providerRefreshRequested = true; + void client + .refreshProviders() + .then((payload) => { + if (disposed) { + return; + } + applyProvidersUpdated(payload); + }) + .catch(() => undefined); + }; + const cleanups = [ client.subscribeLifecycle((event) => { if (event.type === "welcome") { @@ -176,6 +194,9 @@ export function startServerStateSync(client: ServerStateClient): () => void { }), client.subscribeConfig((event) => { applyServerConfigEvent(event); + if (event.type === "snapshot") { + requestProviderRefresh(); + } }), ]; @@ -187,8 +208,11 @@ export function startServerStateSync(client: ServerStateClient): () => void { return; } setServerConfigSnapshot(config); + requestProviderRefresh(); }) .catch(() => undefined); + } else { + requestProviderRefresh(); } return () => { diff --git a/packages/contracts/src/server.test.ts b/packages/contracts/src/server.test.ts new file mode 100644 index 0000000000..6bb9f2ebff --- /dev/null +++ b/packages/contracts/src/server.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, it } from "vitest"; +import { Schema } from "effect"; + +import { ServerProvider, ServerProviderUsage, ServerProviderUsageBucket } from "./server"; + +describe("server contracts", () => { + it("decodes normalized provider usage buckets", () => { + const bucket = Schema.decodeUnknownSync(ServerProviderUsageBucket)({ + id: "fiveHour", + label: "Session limit", + remainingPercent: 63, + usedPercent: 37, + resetsAt: "2026-03-31T12:00:00.000Z", + }); + + expect(bucket.id).toBe("fiveHour"); + expect(bucket.remainingPercent).toBe(63); + }); + + it("decodes provider usage payloads", () => { + const usage = Schema.decodeUnknownSync(ServerProviderUsage)({ + updatedAt: "2026-03-31T10:00:00.000Z", + buckets: [ + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 58, + usedPercent: 42, + resetsAt: "2026-04-06T05:12:45.000Z", + }, + ], + }); + + expect(usage.buckets).toHaveLength(1); + expect(usage.buckets[0]?.id).toBe("weekly"); + }); + + it("allows usage on server providers", () => { + const provider = Schema.decodeUnknownSync(ServerProvider)({ + provider: "codex", + enabled: true, + installed: true, + version: "0.117.0", + status: "ready", + auth: { status: "authenticated" }, + checkedAt: "2026-03-31T10:00:00.000Z", + usage: { + updatedAt: "2026-03-31T10:00:00.000Z", + buckets: [ + { + id: "weekly", + label: "Weekly limit", + remainingPercent: 58, + usedPercent: 42, + resetsAt: "2026-04-06T05:12:45.000Z", + }, + ], + }, + models: [], + }); + + expect(provider.usage?.buckets[0]?.label).toBe("Weekly limit"); + }); +}); diff --git a/packages/contracts/src/server.ts b/packages/contracts/src/server.ts index 776a0a89e9..80720c29c6 100644 --- a/packages/contracts/src/server.ts +++ b/packages/contracts/src/server.ts @@ -56,6 +56,32 @@ export const ServerProviderModel = Schema.Struct({ }); export type ServerProviderModel = typeof ServerProviderModel.Type; +export const ServerProviderUsageBucketId = Schema.Literals(["fiveHour", "weekly"]); +export type ServerProviderUsageBucketId = typeof ServerProviderUsageBucketId.Type; + +const PercentageSchema = Schema.Number.check(Schema.isGreaterThanOrEqualTo(0)).check( + Schema.isLessThanOrEqualTo(100), +); + +export const ServerProviderUsageBucket = Schema.Struct({ + id: ServerProviderUsageBucketId, + label: TrimmedNonEmptyString, + remainingPercent: PercentageSchema.annotate({ + description: "Remaining usage percentage in the provider-defined bucket window.", + }), + usedPercent: PercentageSchema.annotate({ + description: "Used usage percentage in the provider-defined bucket window.", + }), + resetsAt: IsoDateTime, +}); +export type ServerProviderUsageBucket = typeof ServerProviderUsageBucket.Type; + +export const ServerProviderUsage = Schema.Struct({ + buckets: Schema.Array(ServerProviderUsageBucket), + updatedAt: Schema.optional(IsoDateTime), +}); +export type ServerProviderUsage = typeof ServerProviderUsage.Type; + export const ServerProvider = Schema.Struct({ provider: ProviderKind, enabled: Schema.Boolean, @@ -65,6 +91,9 @@ export const ServerProvider = Schema.Struct({ auth: ServerProviderAuth, checkedAt: IsoDateTime, message: Schema.optional(TrimmedNonEmptyString), + account: Schema.optional(Schema.Unknown), + rateLimits: Schema.optional(Schema.Unknown), + usage: Schema.optional(ServerProviderUsage), models: Schema.Array(ServerProviderModel), }); export type ServerProvider = typeof ServerProvider.Type;