diff --git a/CLAUDE.md b/CLAUDE.md index 853c084..055fecd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -202,7 +202,7 @@ Auto-failover on 429/401 is primary; `pool rotate` is an override. - **Classification** (`classifyFailover`, `internal/proxy/pool_failover.go`, from `SluiceAddon.Response` for pooled destinations): `429`/`403 + insufficient_quota` -> rate-limited; `401`/token-body `invalid_grant`/`invalid_token` -> auth-failure; `5xx`/other -> no-op. Token-endpoint body trusted only when the request URL matched the OAuth index. - **Pool attribution** (`poolForResponse`): a response is pool-attributed either (a) the flow's CONNECT host has a pooled binding (API-host 429/403), or (b) the request URL matches the OAuth token-URL index for a member (token-endpoint 401/`invalid_grant`). (b) is essential — an OAuth refresh hits `auth.openai.com` (no pool binding; only `api.openai.com` has one), so without it the token-endpoint classification is dead code for Codex. Member recovery + fail-closed are the R1 mechanism above (`OAuthIndex.MatchAll` + the refresh-token join key, never `OAuthIndex.Match`). -- **Synchronous in-memory failover (I1):** health is updated in-process before the response returns (`MarkCooldown` write lock, `ResolveActive` read lock) so the switch never waits on the 2s watcher (which only reconciles); a detached `onFailover` also writes `SetCredentialHealth(member,'cooldown',now+ttl,reason)` for durability. **Cooldown window (B1):** `cooldownFromResponse(class, f.Response.Header)` (`internal/proxy/pool_failover.go`) derives the TTL from the upstream recovery hints — `Retry-After` (delta-seconds or HTTP-date), then `x-ratelimit-reset` / `x-ratelimit-reset-requests` / `x-ratelimit-reset-tokens` (delta or epoch) — clamped to `[floor(class), vault.MaxCooldown=6h]`; no hint falls back to the class default (`vault.RateLimitCooldown`=60s / `vault.AuthFailCooldown`=300s). 
Floors: rate-limit `vault.MinRateLimitFloor`=10s (a short parsed window is honored, not floored up to 60s), auth-failure `AuthFailCooldown` (a revoked/expired token is never re-probed in seconds). This honors the real multi-hour quota window so a usage-limited member is not re-probed every 60s (the degrade-flap root cause). No body parsing yet. **Cooldown extension is monotonic on both layers:** a member parked 300s for auth that then trips a 60s rate-limit keeps the LATER expiry — `MarkCooldown` and `SetCredentialHealth`'s `cooldown` upsert (CASE-compared against the stored future `cooldown_until`) both keep `max(existing-future, new)`. Only extend is monotonic: an explicit clear (zero/past `until`) and any transition to `healthy` still shorten/clear, and lazy expiry still wins over an expired stored cooldown. No in-flight retry — next request uses the new member. +- **Synchronous in-memory failover (I1):** health is updated in-process before the response returns (`MarkCooldown` write lock, `ResolveActive` read lock) so the switch never waits on the 2s watcher (which only reconciles); a detached `onFailover` also writes `SetCredentialHealth(member,'cooldown',now+ttl,reason)` for durability. **Cooldown window (B1):** `cooldownFromResponse(class, f.Response.Header, f.Response.Body)` (`internal/proxy/pool_failover.go`) derives the TTL from the upstream recovery hints, spanning the conventions across AI providers and general rate limiters. 
Headers are tried in precedence order (first usable wins; `Retry-After` MUST be first per IETF draft-ietf-httpapi-ratelimit-headers): `Retry-After` (RFC 9110 delta-seconds or HTTP-date), `RateLimit-Reset` (IETF draft, delta-seconds), `X-RateLimit-Reset` (generic; GitHub/Twitter unix epoch, others delta), `X-RateLimit-Reset-After` (Discord, delta float), OpenAI `x-ratelimit-reset-requests` / `x-ratelimit-reset-tokens` (unit-suffixed durations), Anthropic `anthropic-ratelimit-requests-reset` / `anthropic-ratelimit-tokens-reset` (RFC3339/ISO-8601 absolute timestamp). Each value parses as delta-seconds, a unix epoch (magnitude-disambiguated at `epochThreshold` ~2001), an HTTP-date (Retry-After only), an RFC3339/ISO-8601 timestamp, or a unit-suffixed duration; the first usable value is clamped to `[floor(class), vault.MaxCooldown=6h]`. When NO usable header is present it parses the JSON body (`parseBodyRecoveryHint`, skipped if empty or >64 KiB) for `resets_in_seconds`, `resets_at`, `retry_after`, or `reset_after` (probed top-level FIRST, then nested under a top-level `error` object), covering OpenAI Codex `usage_limit_reached` (`resets_in_seconds`/`resets_at`) and the Discord/generic `retry_after`/`reset_after` conventions. Each body value uses the same epoch-vs-delta disambiguation as the headers; the body reset is authoritative and can be hours/days out, so a body-derived window is clamped to the higher `[floor(class), vault.MaxUsageLimitCooldown=24h]`. Codex usage-limit 429s carry no recovery header, so without the body fallback the pool re-probes every 60s an account that won't reset for hours (the degrade-flap root cause). With neither a header nor a body hint, it falls back to the class default (`vault.RateLimitCooldown`=60s / `vault.AuthFailCooldown`=300s). Floors: rate-limit `vault.MinRateLimitFloor`=10s (a short parsed window is honored, not floored up to 60s), auth-failure `AuthFailCooldown` (a revoked/expired token is never re-probed in seconds). 
**Cooldown extension is monotonic on both layers:** a member parked 300s for auth that then trips a 60s rate-limit keeps the LATER expiry — `MarkCooldown` and `SetCredentialHealth`'s `cooldown` upsert (CASE-compared against the stored future `cooldown_until`) both keep `max(existing-future, new)`. Only extend is monotonic: an explicit clear (zero/past `until`) and any transition to `healthy` still shorten/clear, and lazy expiry still wins over an expired stored cooldown. No in-flight retry — next request uses the new member. - **Reload doesn't resurrect a cooled member:** the durable write is detached/best-effort, so any reload (SIGHUP or the 2s watcher on any unrelated DB write) rebuilds the resolver from store rows via `NewPoolResolver`; `Server.StorePool` calls `PoolResolver.MergeLiveCooldowns(prev)` to carry forward still-active in-memory cooldowns before the atomic swap (monotonic; drops cooldowns for credentials no longer in any pool). - **Exhaustion + edge-triggered notices (A1/A2):** a pool is exhausted iff `PoolResolver.HasHealthyMember(pool)` (RLock, single `now`, mirrors `cooling()` lazy-expiry) is false — NOT `to == from`. Per-pool exhaustion state lives on the long-lived `Server` (`poolExhausted` map, NOT `PoolHealth`, so it survives resolver swaps and is not pruned on membership change). `handlePoolFailover` emits one "pool exhausted" notice on the `false->true` edge and wakes a dedicated recovery monitor goroutine (cap-1 `recoveryWake`). The monitor (`internal/proxy/server.go`, started in `New`, stopped idempotently from both `Close` and `GracefulShutdown`) sleeps until `SoonestCooldown(pool)` (clamped to a ~1s floor), `Load()`s the current resolver each wake, and on `HasHealthyMember -> true` flips `true->false`, emits one "pool recovered" notice (`FormatPoolRecoveredNotice`), and invokes `onPoolRecovered`. This replaces the old per-cooldown-window flap that respammed `cred_failover` + a Telegram notice every ~30/60s. 
- **Recovery auto-reset (opt-in, per pool):** if the recovered pool has a non-empty `auth_reset_target`, `onPoolRecovered` (wired in `cmd/sluice/main.go` via `wirePoolRecovery`) calls `containerMgr.ResetAuth(ctx, target)` in a detached goroutine with a fresh bounded context and emits an `agent_auth_reset` audit event (`Verdict "recover"`, `Credential` = pool, `Reason` = target). Empty target = no reset (opt-out default); a `ResetAuth` error is logged, not fatal. This un-latches an agent (hermes) that latched "usage limit reached" so it resumes without a manual `auth reset`. diff --git a/internal/proxy/pool_failover.go b/internal/proxy/pool_failover.go index 3624c0a..d971cf7 100644 --- a/internal/proxy/pool_failover.go +++ b/internal/proxy/pool_failover.go @@ -2,6 +2,7 @@ package proxy import ( "bytes" + "encoding/json" "fmt" "log" "math" @@ -155,56 +156,181 @@ func classDefaultCooldown(class failoverClass) time.Duration { } // recoveryHintHeaders are the response headers, in precedence order, from -// which cooldownFromResponse derives the real recovery window. Retry-After is -// the standard signal; the x-ratelimit-reset* family is provider-specific -// (OpenAI emits per-resource reset hints). Each value is parsed as either a -// delta-seconds count, an HTTP-date (Retry-After only), or — for the reset -// family — a unix epoch. +// which cooldownFromResponse derives the real recovery window. The list spans +// the conventions across AI providers and general rate limiters: +// +// - Retry-After (RFC 9110: delta-seconds or HTTP-date) MUST be first per +// IETF draft-ietf-httpapi-ratelimit-headers. +// - RateLimit-Reset (IETF draft, delta-seconds, no x- prefix). +// - X-RateLimit-Reset (generic: GitHub/Twitter emit a unix EPOCH; others a +// delta — disambiguated by magnitude). +// - X-RateLimit-Reset-After (Discord: delta-seconds, may be a float). +// - x-ratelimit-reset-requests / x-ratelimit-reset-tokens (OpenAI: +// unit-suffixed durations like "6m0s" / "1.5s"). 
+// - anthropic-ratelimit-requests-reset / anthropic-ratelimit-tokens-reset +// (Anthropic: an ISO-8601 / RFC3339 absolute timestamp). +// +// Each value is parsed by parseRecoveryHint as a delta-seconds count, a unix +// epoch (magnitude-disambiguated), an HTTP-date (Retry-After only), an +// RFC3339/ISO-8601 timestamp, or a unit-suffixed duration. http.Header.Get is +// case-insensitive so the canonical casing here matches any wire casing. var recoveryHintHeaders = []string{ "Retry-After", - "x-ratelimit-reset", + "RateLimit-Reset", + "X-RateLimit-Reset", + "X-RateLimit-Reset-After", "x-ratelimit-reset-requests", "x-ratelimit-reset-tokens", + "anthropic-ratelimit-requests-reset", + "anthropic-ratelimit-tokens-reset", } // cooldownFromResponse derives the cooldown duration for a failed pool member -// from the upstream's recovery hint headers, clamped to the class bounds. +// from the upstream's recovery hints, clamped to the class bounds. // -// Precedence: Retry-After (delta-seconds or HTTP-date), then the -// x-ratelimit-reset* family (delta-seconds or unix epoch). The first header -// that yields a positive duration wins. With NO usable hint the flat class -// default is returned (RateLimitCooldown / AuthFailCooldown), so behavior is -// unchanged from before B1 on responses that carry no hint. +// Precedence: // -// The parsed window is clamped to [minFloorForClass(class), MaxCooldown]: a -// rate-limit hint may shrink the cooldown down to MinRateLimitFloor (honoring -// a short real window), an auth failure stays floored at AuthFailCooldown, and -// any absurd/hostile value is capped at MaxCooldown so a member is never -// parked indefinitely. +// 1. Headers, in recoveryHintHeaders precedence order (Retry-After first per +// the IETF draft, then RateLimit-Reset / X-RateLimit-Reset[-After] / the +// OpenAI x-ratelimit-reset-* family / the Anthropic anthropic-ratelimit-*- +// reset family). 
Each value is parsed as delta-seconds, a unix epoch +// (magnitude-disambiguated), an HTTP-date (Retry-After only), an +// RFC3339/ISO-8601 timestamp, or a unit-suffixed duration. The first header +// that yields a positive duration wins and is clamped to +// [minFloorForClass(class), MaxCooldown] (a rate-limit hint may shrink the +// cooldown down to MinRateLimitFloor; an auth failure stays floored at +// AuthFailCooldown; an absurd/hostile value is capped at MaxCooldown). +// 2. If NO header hint is present, the response BODY is consulted for a reset +// window. The candidate fields (top-level first, then nested under a +// top-level error object) are resets_in_seconds, resets_at, retry_after, +// and reset_after — covering OpenAI Codex usage_limit_reached +// (resets_in_seconds / resets_at) and the Discord/generic retry_after / +// reset_after conventions. A body-derived window is clamped to +// [minFloorForClass(class), MaxUsageLimitCooldown] — the higher cap, +// because the body reset is authoritative and can be many hours/days out +// (a usage limit, not a transient rate-limit window). +// 3. With neither, the flat class default is returned (RateLimitCooldown / +// AuthFailCooldown), so behavior is unchanged from before B1 on responses +// that carry no usable hint. // -// No body parsing in v1 (deferred until a real Codex 429 is captured — see the -// plan's Post-Completion note); only headers are consulted. -// -// Takes the response header directly (not *http.Response) so it works -// uniformly against the go-mitmproxy Flow's Response.Header at the call site -// and a plain http.Header in tests. -func cooldownFromResponse(class failoverClass, header http.Header) time.Duration { - if header == nil { - return classDefaultCooldown(class) - } +// Takes the response header AND body directly (not *http.Response) so it works +// uniformly against the go-mitmproxy Flow's Response.{Header,Body} at the call +// site and plain values in tests. 
A nil header / nil body is safe. +func cooldownFromResponse(class failoverClass, header http.Header, body []byte) time.Duration { now := time.Now() - for _, h := range recoveryHintHeaders { - raw := strings.TrimSpace(header.Get(h)) - if raw == "" { - continue + if header != nil { + for _, h := range recoveryHintHeaders { + raw := strings.TrimSpace(header.Get(h)) + if raw == "" { + continue + } + d, ok := parseRecoveryHint(h, raw, now) + if !ok || d <= 0 { + continue + } + return clampCooldownTo(class, d, vault.MaxCooldown) } - d, ok := parseRecoveryHint(h, raw, now) - if !ok || d <= 0 { + } + // No usable header hint: fall back to the response body. OpenAI Codex + // usage-limit 429s carry NO Retry-After / x-ratelimit-reset header; the + // real reset window is in the JSON body (resets_in_seconds / resets_at). + if d, ok := parseBodyRecoveryHint(body, now); ok { + return clampCooldownTo(class, d, vault.MaxUsageLimitCooldown) + } + return classDefaultCooldown(class) +} + +// bodyRecoveryHint mirrors the small subset of a 429 error body that carries a +// reset window across providers. Every field is a number in the wire format: +// resets_in_seconds / retry_after / reset_after are deltas from now while +// resets_at is a unix epoch, but a value large enough to be a plausible epoch +// is treated as one regardless of field name (the same magnitude +// disambiguation as the header path), so a provider that puts an epoch in +// retry_after is handled too. type is intentionally NOT required (many +// responses omit it); the presence of a positive reset field is the gate. +// +// The same field set is probed at the top level AND nested under a top-level +// error object (many providers wrap errors as {"error":{...}}), so the +// anonymous Error struct repeats the same four fields. 
+type bodyRecoveryHint struct { + ResetsInSeconds *float64 `json:"resets_in_seconds"` + ResetsAt *float64 `json:"resets_at"` + RetryAfter *float64 `json:"retry_after"` + ResetAfter *float64 `json:"reset_after"` + Error *struct { + ResetsInSeconds *float64 `json:"resets_in_seconds"` + ResetsAt *float64 `json:"resets_at"` + RetryAfter *float64 `json:"retry_after"` + ResetAfter *float64 `json:"reset_after"` + } `json:"error"` +} + +// maxBodyRecoveryHintBytes bounds the body parseBodyRecoveryHint will attempt +// to JSON-decode. Failover error bodies are tiny JSON; anything larger is not +// a quota/auth error envelope and is skipped (the caller falls back to the +// class default). +const maxBodyRecoveryHintBytes = 64 << 10 // 64 KiB + +// parseBodyRecoveryHint parses a 429 response body for a reset window. It +// probes resets_in_seconds, resets_at, retry_after, and reset_after — at the +// top level FIRST, then nested under a top-level error object — and returns +// the first that yields a positive duration. Each candidate is interpreted +// like the header bare-numeric path: a value at/above epochThreshold is a unix +// epoch (minus now), otherwise a delta-from-now. It is deliberately tolerant: +// an empty, oversized, non-JSON, unrelated, or field-less body returns +// (0, false) so the caller falls back to the class default. Non-finite +// (NaN/±Inf), negative, zero, out-of-range, and zero-or-past values are +// rejected with the same guards as parseRecoveryHint, so a hostile or stale +// body can never produce a wrong/negative duration before clamping. +func parseBodyRecoveryHint(body []byte, now time.Time) (time.Duration, bool) { + if len(body) == 0 || len(body) > maxBodyRecoveryHintBytes { + return 0, false + } + var h bodyRecoveryHint + if err := json.Unmarshal(body, &h); err != nil { + return 0, false + } + // Top-level candidates first, in the documented field order, then the same + // fields nested under a top-level error object. 
The first that yields a + // positive duration wins. + candidates := []*float64{h.ResetsInSeconds, h.ResetsAt, h.RetryAfter, h.ResetAfter} + if h.Error != nil { + candidates = append(candidates, + h.Error.ResetsInSeconds, h.Error.ResetsAt, h.Error.RetryAfter, h.Error.ResetAfter) + } + for _, c := range candidates { + if c == nil { continue } - return clampCooldown(class, d) + if d, ok := bodyHintFromSeconds(*c, now); ok { + return d, true + } } - return classDefaultCooldown(class) + return 0, false +} + +// bodyHintFromSeconds converts a single body reset value to a positive +// duration, mirroring the header bare-numeric path: reject non-finite, +// negative, zero, and out-of-range magnitudes; treat a value at/above +// epochThreshold as a unix epoch (minus now), otherwise as a delta-from-now; +// cap a delta in float space before the nanosecond multiply so the +// int64(time.Duration) conversion can never overflow (the caller's clamp +// applies the real MaxUsageLimitCooldown ceiling). +func bodyHintFromSeconds(secs float64, now time.Time) (time.Duration, bool) { + if secs <= 0 || math.IsNaN(secs) || math.IsInf(secs, 0) || secs > maxRecoveryHintSeconds { + return 0, false + } + if secs >= epochThreshold { + if d := time.Unix(int64(secs), 0).Sub(now); d > 0 { + return d, true + } + return 0, false + } + deltaSec := secs + if maxSec := vault.MaxUsageLimitCooldown.Seconds(); deltaSec > maxSec { + deltaSec = maxSec + } + return time.Duration(deltaSec * float64(time.Second)), true } // parseRecoveryHint parses a single recovery-hint header value into a positive @@ -236,6 +362,12 @@ func cooldownFromResponse(class failoverClass, header http.Header) time.Duration // yet small enough that int64(secs) cannot overflow on the epoch branch. const maxRecoveryHintSeconds = 1e12 +// epochThreshold disambiguates a bare-numeric reset value: at/above this it is +// treated as a unix epoch (~2001-09 and later), below it as a delta-seconds +// count. 
Shared by the header (parseRecoveryHint) and body (bodyHintFromSeconds) +// paths so both interpret a magnitude identically. +const epochThreshold = 1_000_000_000 + func parseRecoveryHint(header, raw string, now time.Time) (time.Duration, bool) { // Bare numeric: delta-seconds for Retry-After; for the reset family it may // be either delta-seconds OR a unix epoch. Disambiguate by magnitude — a @@ -251,7 +383,6 @@ func parseRecoveryHint(header, raw string, now time.Time) (time.Duration, bool) if secs < 0 || math.IsNaN(secs) || math.IsInf(secs, 0) || secs > maxRecoveryHintSeconds { return 0, false } - const epochThreshold = 1_000_000_000 // ~2001-09; below this, treat as delta-seconds if header != "Retry-After" && secs >= epochThreshold { until := time.Unix(int64(secs), 0) if d := until.Sub(now); d > 0 { @@ -261,7 +392,7 @@ func parseRecoveryHint(header, raw string, now time.Time) (time.Duration, bool) } // Delta path: cap in float space before the *float64(time.Second) // multiply so the int64(time.Duration) conversion can never overflow. - // clampCooldown applies the real ceiling (MaxCooldown) afterward; this + // clampCooldownTo applies the real ceiling (MaxCooldown) afterward; this // only prevents the conversion itself from wrapping. deltaSec := secs if maxSec := vault.MaxCooldown.Seconds(); deltaSec > maxSec { @@ -270,9 +401,9 @@ func parseRecoveryHint(header, raw string, now time.Time) (time.Duration, bool) return time.Duration(deltaSec * float64(time.Second)), true } // HTTP-date (Retry-After absolute form, per RFC 9110). Only Retry-After is - // permitted to carry an absolute HTTP-date; the x-ratelimit-reset* family is - // numeric (delta-seconds / unix epoch / unit-suffixed duration) and must not - // be coerced through HTTP-date parsing. + // permitted to carry an absolute HTTP-date; the numeric reset families + // (delta-seconds / unix epoch / unit-suffixed duration) must not be coerced + // through HTTP-date parsing. 
if header == "Retry-After" { if t, err := http.ParseTime(raw); err == nil { if d := t.Sub(now); d > 0 { @@ -281,22 +412,35 @@ func parseRecoveryHint(header, raw string, now time.Time) (time.Duration, bool) return 0, false } } - // Unit-suffixed duration (e.g. OpenAI "1.5s", "60ms"). + // RFC3339 / ISO-8601 absolute timestamp (Anthropic's + // anthropic-ratelimit-*-reset). Safe to attempt for ANY header: a bare + // number or unit-suffixed duration does not parse as RFC3339, so this only + // fires on a real timestamp. A future timestamp's distance from now is the + // recovery window; a past/equal one is not usable. + if t, err := time.Parse(time.RFC3339, raw); err == nil { + if d := t.Sub(now); d > 0 { + return d, true + } + return 0, false + } + // Unit-suffixed duration (e.g. OpenAI "1.5s", "60ms", "6m0s"). if d, err := time.ParseDuration(raw); err == nil && d > 0 { return d, true } return 0, false } -// clampCooldown bounds a derived cooldown to [minFloorForClass(class), -// MaxCooldown]. -func clampCooldown(class failoverClass, d time.Duration) time.Duration { +// clampCooldownTo bounds a derived cooldown to [minFloorForClass(class), max]. +// The header path passes vault.MaxCooldown (transient window ceiling); the +// body path (usage_limit_reached) passes vault.MaxUsageLimitCooldown, which is +// higher because the body reset is authoritative and can be many hours/days. 
+func clampCooldownTo(class failoverClass, d, ceil time.Duration) time.Duration { floor := minFloorForClass(class) if d < floor { return floor } - if d > vault.MaxCooldown { - return vault.MaxCooldown + if d > ceil { + return ceil } return d } @@ -667,13 +811,19 @@ func (a *SluiceAddon) handlePoolFailover(f *mitmproxy.Flow) { return } - // B1: derive the cooldown window from the upstream's recovery hints - // (Retry-After / x-ratelimit-reset*) instead of the flat class TTL, so a - // quota-exhausted member is parked for the REAL window rather than being - // re-probed every RateLimitCooldown (60s). cooldownFromResponse clamps to - // the class bounds and falls back to the flat default when no hint header - // is present. - ttl := cooldownFromResponse(class, f.Response.Header) + // B1: derive the cooldown window from the upstream's recovery hints instead + // of the flat class TTL, so a quota-exhausted member is parked for the REAL + // window rather than being re-probed every RateLimitCooldown (60s). + // cooldownFromResponse tries the headers first (Retry-After / + // x-ratelimit-reset*); when none is present it parses the 429 JSON BODY for + // an OpenAI Codex usage_limit_reached window (resets_in_seconds / + // resets_at), clamped to MaxUsageLimitCooldown. The body fallback matters + // for Codex because its usage-limit 429s carry no recovery header — without + // it the pool re-probes an account that won't reset for hours every 60s. It + // falls back to the flat class default when neither hint is present. The + // body is read the same way classifyFailover already reads f.Response.Body + // (small JSON error bodies are not gzipped). 
+ ttl := cooldownFromResponse(class, f.Response.Header, f.Response.Body) until := time.Now().Add(ttl) tag := failoverReasonTag(class, f.Response.StatusCode, bodyTag) diff --git a/internal/proxy/pool_failover_test.go b/internal/proxy/pool_failover_test.go index 3e4a005..b725ca8 100644 --- a/internal/proxy/pool_failover_test.go +++ b/internal/proxy/pool_failover_test.go @@ -1378,10 +1378,64 @@ func TestCooldownFromResponse(t *testing.T) { class: failoverRateLimited, setup: func(h http.Header) { h.Set("Retry-After", "120") - h.Set("x-ratelimit-reset", "30") + h.Set("x-ratelimit-reset-requests", "30") }, want: 120 * time.Second, }, + { + name: "RateLimit-Reset delta seconds honored (IETF draft)", + class: failoverRateLimited, + setup: func(h http.Header) { h.Set("RateLimit-Reset", "120") }, + want: 120 * time.Second, + }, + { + name: "X-RateLimit-Reset future epoch honored (GitHub/Twitter)", + class: failoverRateLimited, + setup: func(h http.Header) { + h.Set("X-RateLimit-Reset", strconv.FormatInt(now.Add(300*time.Second).Unix(), 10)) + }, + want: 300 * time.Second, + approx: true, + }, + { + name: "X-RateLimit-Reset small delta honored (non-epoch magnitude)", + class: failoverRateLimited, + setup: func(h http.Header) { h.Set("X-RateLimit-Reset", "90") }, + want: 90 * time.Second, + }, + { + name: "X-RateLimit-Reset-After float floored at MinRateLimitFloor (Discord)", + class: failoverRateLimited, + setup: func(h http.Header) { h.Set("X-RateLimit-Reset-After", "1.5") }, + want: vault.MinRateLimitFloor, + }, + { + name: "anthropic-ratelimit-tokens-reset RFC3339 future honored", + class: failoverRateLimited, + setup: func(h http.Header) { + h.Set("anthropic-ratelimit-tokens-reset", now.Add(200*time.Second).UTC().Format(time.RFC3339)) + }, + want: 200 * time.Second, + approx: true, + }, + { + name: "anthropic-ratelimit-requests-reset RFC3339 in the past ignored", + class: failoverRateLimited, + setup: func(h http.Header) { + h.Set("anthropic-ratelimit-requests-reset", 
now.Add(-200*time.Second).UTC().Format(time.RFC3339)) + }, + want: vault.RateLimitCooldown, + }, + { + name: "Retry-After precedence over RateLimit-Reset (IETF: Retry-After wins)", + class: failoverRateLimited, + setup: func(h http.Header) { + h.Set("Retry-After", "30") + h.Set("RateLimit-Reset", "300") + }, + // Retry-After (30s) wins; 30 is above MinRateLimitFloor so honored verbatim. + want: 30 * time.Second, + }, { name: "unparseable header ignored falls back to default", class: failoverRateLimited, @@ -1429,7 +1483,7 @@ func TestCooldownFromResponse(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { class, h := resp(tt.class, tt.setup) - got := cooldownFromResponse(class, h) + got := cooldownFromResponse(class, h, nil) if tt.approx { const tol = 2 * time.Second diff := got - tt.want @@ -1451,10 +1505,10 @@ func TestCooldownFromResponse(t *testing.T) { // TestCooldownFromResponseNilSafe verifies the nil-header guard returns the // class default rather than panicking. func TestCooldownFromResponseNilSafe(t *testing.T) { - if got := cooldownFromResponse(failoverRateLimited, nil); got != vault.RateLimitCooldown { + if got := cooldownFromResponse(failoverRateLimited, nil, nil); got != vault.RateLimitCooldown { t.Fatalf("nil header: got %v, want %v", got, vault.RateLimitCooldown) } - if got := cooldownFromResponse(failoverAuthFailure, nil); got != vault.AuthFailCooldown { + if got := cooldownFromResponse(failoverAuthFailure, nil, nil); got != vault.AuthFailCooldown { t.Fatalf("nil header (auth): got %v, want %v", got, vault.AuthFailCooldown) } } @@ -1505,12 +1559,255 @@ func TestParseRecoveryHintHTTPDateGatedToRetryAfter(t *testing.T) { // is honored. 
resetHdr := make(http.Header) resetHdr.Set("x-ratelimit-reset-requests", httpDate) - if got := cooldownFromResponse(failoverRateLimited, resetHdr); got != vault.RateLimitCooldown { + if got := cooldownFromResponse(failoverRateLimited, resetHdr, nil); got != vault.RateLimitCooldown { t.Fatalf("reset-family HTTP-date: got %v, want fallback %v", got, vault.RateLimitCooldown) } retryHdr := make(http.Header) retryHdr.Set("Retry-After", httpDate) - if got := cooldownFromResponse(failoverRateLimited, retryHdr); got <= 0 || got > vault.MaxCooldown { + if got := cooldownFromResponse(failoverRateLimited, retryHdr, nil); got <= 0 || got > vault.MaxCooldown { t.Fatalf("Retry-After HTTP-date: got %v, want a clamped positive duration", got) } } + +// TestCooldownFromResponseBody covers the B1 body-fallback path: when the +// upstream sends NO Retry-After / x-ratelimit-reset header (the OpenAI Codex +// usage_limit_reached case), the cooldown is derived from the JSON body's +// resets_in_seconds / resets_at, clamped to MaxUsageLimitCooldown. A usable +// header still wins over the body (precedence), and an unrelated/ill-formed +// body falls back to the class default. 
+func TestCooldownFromResponseBody(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + class failoverClass + header func(h http.Header) + body []byte + want time.Duration + approx bool + }{ + { + name: "usage_limit_reached resets_in_seconds, no header", + class: failoverRateLimited, + body: []byte(`{"type":"usage_limit_reached","message":"The usage limit has been reached","plan_type":"team","resets_in_seconds":1357}`), + want: 1357 * time.Second, + }, + { + name: "resets_at unix epoch, no resets_in_seconds, no header", + class: failoverRateLimited, + body: []byte(`{"type":"usage_limit_reached","resets_at":` + strconv.FormatInt(now.Add(600*time.Second).Unix(), 10) + `}`), + want: 600 * time.Second, + approx: true, + }, + { + name: "huge resets_in_seconds clamped to MaxUsageLimitCooldown", + class: failoverRateLimited, + body: []byte(`{"type":"usage_limit_reached","resets_in_seconds":327808}`), + want: vault.MaxUsageLimitCooldown, + }, + { + name: "header wins over body when both present", + class: failoverRateLimited, + header: func(h http.Header) { h.Set("Retry-After", "120") }, + body: []byte(`{"type":"usage_limit_reached","resets_in_seconds":1357}`), + want: 120 * time.Second, + }, + { + name: "non-JSON body falls back to class default", + class: failoverRateLimited, + body: []byte(`not json at all`), + want: vault.RateLimitCooldown, + }, + { + name: "body without reset fields falls back to class default", + class: failoverRateLimited, + body: []byte(`{"type":"usage_limit_reached","message":"nope"}`), + want: vault.RateLimitCooldown, + }, + { + name: "negative resets_in_seconds ignored, class default", + class: failoverRateLimited, + body: []byte(`{"resets_in_seconds":-5}`), + want: vault.RateLimitCooldown, + }, + { + name: "zero resets_in_seconds ignored, class default", + class: failoverRateLimited, + body: []byte(`{"resets_in_seconds":0}`), + want: vault.RateLimitCooldown, + }, + { + name: "past resets_at ignored, class default", + 
class: failoverRateLimited, + body: []byte(`{"resets_at":` + strconv.FormatInt(now.Add(-600*time.Second).Unix(), 10) + `}`), + want: vault.RateLimitCooldown, + }, + { + name: "auth-failure body window floored at AuthFailCooldown", + class: failoverAuthFailure, + body: []byte(`{"resets_in_seconds":5}`), + want: vault.AuthFailCooldown, + }, + { + name: "top-level retry_after delta (Discord/generic)", + class: failoverRateLimited, + body: []byte(`{"retry_after":1357}`), + want: 1357 * time.Second, + }, + { + name: "top-level reset_after delta", + class: failoverRateLimited, + body: []byte(`{"reset_after":600}`), + want: 600 * time.Second, + }, + { + name: "nested error.retry_after delta", + class: failoverRateLimited, + body: []byte(`{"error":{"retry_after":45}}`), + want: 45 * time.Second, + }, + { + name: "nested error.resets_in_seconds delta", + class: failoverRateLimited, + body: []byte(`{"error":{"resets_in_seconds":1357}}`), + want: 1357 * time.Second, + }, + { + name: "Discord-style float retry_after floored at MinRateLimitFloor", + class: failoverRateLimited, + body: []byte(`{"retry_after":0.529}`), + want: vault.MinRateLimitFloor, + }, + { + name: "oversized body falls back to class default", + class: failoverRateLimited, + body: oversizedJSONBody(), + want: vault.RateLimitCooldown, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var h http.Header + if tt.header != nil { + h = make(http.Header) + tt.header(h) + } + got := cooldownFromResponse(tt.class, h, tt.body) + if tt.approx { + const tol = 2 * time.Second + diff := got - tt.want + if diff < 0 { + diff = -diff + } + if diff > tol { + t.Fatalf("cooldownFromResponse = %v, want ~%v (tol %v)", got, tt.want, tol) + } + return + } + if got != tt.want { + t.Fatalf("cooldownFromResponse = %v, want %v", got, tt.want) + } + }) + } +} + +// oversizedJSONBody returns a syntactically valid JSON body that carries a +// usable retry_after but exceeds maxBodyRecoveryHintBytes, so the size 
guard +// must skip it (failover error bodies are tiny). The padding lives in an +// unused string field so the body stays valid JSON. +func oversizedJSONBody() []byte { + pad := strings.Repeat("a", maxBodyRecoveryHintBytes+1) + return []byte(`{"retry_after":1357,"pad":"` + pad + `"}`) +} + +// TestParseBodyRecoveryHint exercises the body parser's guards directly: +// resets_in_seconds is preferred over resets_at, non-finite/negative/zero/past +// values are rejected, and a field-less or non-JSON body reports ok=false. +func TestParseBodyRecoveryHint(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + body string + wantOK bool + want time.Duration + approx bool + }{ + {name: "resets_in_seconds delta", body: `{"resets_in_seconds":1357}`, wantOK: true, want: 1357 * time.Second}, + { + name: "resets_at epoch", + body: `{"resets_at":` + strconv.FormatInt(now.Add(300*time.Second).Unix(), 10) + `}`, + wantOK: true, want: 300 * time.Second, approx: true, + }, + { + name: "resets_in_seconds preferred over resets_at", + body: `{"resets_in_seconds":100,"resets_at":` + strconv.FormatInt(now.Add(9000*time.Second).Unix(), 10) + `}`, + wantOK: true, want: 100 * time.Second, + }, + {name: "negative resets_in_seconds rejected", body: `{"resets_in_seconds":-5}`, wantOK: false}, + {name: "zero resets_in_seconds rejected", body: `{"resets_in_seconds":0}`, wantOK: false}, + // A past unix-epoch resets_at (>= epochThreshold so it disambiguates as + // an epoch, not a 1s delta) is rejected. 
+ {name: "past resets_at rejected", body: `{"resets_at":` + strconv.FormatInt(now.Add(-600*time.Second).Unix(), 10) + `}`, wantOK: false}, + {name: "no reset fields", body: `{"type":"usage_limit_reached"}`, wantOK: false}, + {name: "non-JSON", body: `garbage`, wantOK: false}, + {name: "empty", body: ``, wantOK: false}, + {name: "retry_after delta", body: `{"retry_after":1357}`, wantOK: true, want: 1357 * time.Second}, + {name: "reset_after delta", body: `{"reset_after":600}`, wantOK: true, want: 600 * time.Second}, + {name: "nested error.retry_after delta", body: `{"error":{"retry_after":45}}`, wantOK: true, want: 45 * time.Second}, + {name: "nested error.resets_in_seconds delta", body: `{"error":{"resets_in_seconds":1357}}`, wantOK: true, want: 1357 * time.Second}, + { + // Top-level wins over nested when both present (order: top-level first). + name: "top-level retry_after preferred over nested error.retry_after", + body: `{"retry_after":100,"error":{"retry_after":9000}}`, + wantOK: true, want: 100 * time.Second, + }, + { + // retry_after at an epoch magnitude is treated as a unix epoch. + name: "retry_after at epoch magnitude treated as epoch", + body: `{"retry_after":` + strconv.FormatInt(now.Add(300*time.Second).Unix(), 10) + `}`, + wantOK: true, want: 300 * time.Second, approx: true, + }, + {name: "oversized body rejected", body: string(oversizedJSONBody()), wantOK: false}, + {name: "overflow-magnitude resets_in_seconds rejected", body: `{"resets_in_seconds":1e308}`, wantOK: false}, + { + // 327808s > MaxUsageLimitCooldown (24h): the parser caps in float + // space at MaxUsageLimitCooldown before the nanosecond multiply (so + // the int64 conversion can't overflow); the caller's clamp then + // keeps that same value. 
+ name: "huge resets_in_seconds capped at MaxUsageLimitCooldown", + body: `{"resets_in_seconds":327808}`, + wantOK: true, want: vault.MaxUsageLimitCooldown, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d, ok := parseBodyRecoveryHint([]byte(tt.body), now) + if ok != tt.wantOK { + t.Fatalf("parseBodyRecoveryHint ok = %v, want %v (d=%v)", ok, tt.wantOK, d) + } + if !tt.wantOK { + if d != 0 { + t.Fatalf("parseBodyRecoveryHint rejected case returned d=%v, want 0", d) + } + return + } + if tt.approx { + const tol = 2 * time.Second + diff := d - tt.want + if diff < 0 { + diff = -diff + } + if diff > tol { + t.Fatalf("parseBodyRecoveryHint = %v, want ~%v (tol %v)", d, tt.want, tol) + } + return + } + if d != tt.want { + t.Fatalf("parseBodyRecoveryHint = %v, want %v", d, tt.want) + } + }) + } +} diff --git a/internal/vault/pool.go b/internal/vault/pool.go index 75572f2..836fecc 100644 --- a/internal/vault/pool.go +++ b/internal/vault/pool.go @@ -33,6 +33,15 @@ const ( MinRateLimitFloor = 10 * time.Second ) +// MaxUsageLimitCooldown caps a cooldown window derived from the 429 RESPONSE +// BODY (OpenAI Codex usage_limit_reached, which carries resets_in_seconds / +// resets_at instead of a Retry-After / x-ratelimit-reset header). The body +// reset is authoritative and can be many hours or days out, so it is bounded +// far higher than MaxCooldown (which caps transient header windows): a member +// is never parked indefinitely, but a real multi-hour usage reset is honored +// rather than re-probed every RateLimitCooldown. +const MaxUsageLimitCooldown = 24 * time.Hour + // ManualRotateReason is the cooldown reason stamped by `sluice pool rotate` // when it parks the previously-active member. A member parked for this // reason is operationally deprioritized BY AN OPERATOR, not unhealthy: it