diff --git a/infra/monitoring/protifer.rules.yml b/infra/monitoring/protifer.rules.yml index 71fc943..64ef71a 100644 --- a/infra/monitoring/protifer.rules.yml +++ b/infra/monitoring/protifer.rules.yml @@ -120,6 +120,28 @@ groups: engaging (requests_shed_total). Sustained backlog with healthy workers means demand exceeds capacity. + - alert: SheddingPendingResidueLeak + expr: | + shedding_pending_residues > 0 + and on() + sum(bullmq_queue_jobs{state=~"waiting|active"}) == 0 + for: 15m + labels: + severity: warning + app: protifer + annotations: + summary: 'protifer: shedding pending-residues nonzero while queues idle' + description: | + shedding_pending_residues has stayed above zero for 15m with no + waiting or active jobs on any queue. Pending residues should + reconcile to zero on the job-cleanup leader sweep once work + drains — a stuck nonzero value at idle means admission accounting + drifted (a release path missed its decrement). Admission keeps + counting this phantom load against the SLO budget, so it will shed + real requests too early. Check the api-gateway logs and /metrics + (docker compose logs api-gateway on the app-tier host); restarting + the gateway resets the in-process counter as a stopgap. + - alert: HighJobLatency expr: | histogram_quantile(0.95, sum by (le) (rate(bullmq_job_total_duration_seconds_bucket{queue="prediction"}[15m]))) > 1200 diff --git a/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/.openspec.yaml b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/.openspec.yaml new file mode 100644 index 0000000..c86b1d7 --- /dev/null +++ b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-06-14 diff --git a/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/design.md b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/design.md new file mode 100644 index 0000000..0beb433 --- /dev/null +++ b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/design.md @@ -0,0 +1,102 @@ +## Context + +The shedding admission controller estimates wait time as `pendingResidues / throughput` and sheds requests when that exceeds the per-plan SLO. `pendingResidues` is maintained as a free-running Redis counter (`shedding:pending_residues`): `incrementPending` runs at admission for both `/v1/predictions` and `/v1/embeddings` (`middleware/shedding.ts:195`), and `decrementPending` runs only from live BullMQ `QueueEvents` on the embedding queue, processed by a single elected accounting leader (`event-subscriber.ts:79,109`). + +`QueueEvents` is live-only with no replay. Any terminal event not observed by the leader at the instant it fires — leader lock handoff (30s TTL, 10s renew), events outpacing a single leader during a burst, a deduped embedding child (deterministic `jobId`), or a job evicted before `getJob` (`removeOnFail: { count: 100 }`) — drops its decrement permanently. The counter only ratchets up. A 5-minute load test drove it to 400,000 and it stayed dead-flat after load ended, pinning `estimated_wait` at its 5-minute cap. The bug is masked today only because `SHED_MODE` defaults to `shadow`. + +The existing cleanup module (`cleanup.ts`) already runs a leader-elected periodic sweep that holds both `predictionQueue` and `embeddingQueue`, exposes a `staleChildrenScan` hook and a `reconcileNow()` handle — the natural home for an authoritative recompute. + +The second input, `throughput`, is equally wrong. `recordCompletion(residues, durationSeconds)` (`state.ts:79`) feeds the EWMA a sample of `residues / (finishedOn − processedOn)` per terminal **embedding** event — the processing rate of a single job. The system actually drains `WORKER_CONCURRENCY` (4) jobs per worker across both queues in parallel, so the single-job rate under-states true throughput by roughly the concurrency factor. And it is embedding-only, while reconciled `pending` spans both queues — so `estimated_wait = pending / throughput` divides a both-queue residue total by an embedding-only single-job rate. Fixing the leak alone leaves this dimensional mismatch, so `enforce` would still over-shed. Both inputs must be corrected together. + +## Goals / Non-Goals + +**Goals:** + +- Make `pending_residues` self-healing: a missed event must not leak past the next sweep. +- Derive the authoritative value from real queue state, route-agnostically (fixes the prediction-side blind spot for free). +- Make `throughput` an aggregate, concurrency-aware drain rate in the same residue units as `pending`, so `estimated_wait` is dimensionally meaningful and reflects how fast work actually leaves the queues. +- Derive throughput from reliable signals, not the replay-less event stream, so a missed completion no longer corrupts the estimate. +- Keep admission responsive between sweeps (no regression in decision latency); keep the existing EWMA smoothing and the `decide.ts` formula unchanged — only its inputs change. +- Detect accounting drift via an alert so this class of bug can never again sit invisible. +- Make a future `SHED_MODE=enforce` rollout actually safe (both inputs correct, not just one). + +**Non-Goals:** + +- Reworking load-testing methodology (honest e2e measurement, rate-limit exemption) — deferred. +- Per-Triton-call / per-job latency metrics (the separate `prediction-latency-observability` change) — not needed for the aggregate residue drain rate here. +- Changing the SLO thresholds, plan priorities, retry-after jitter, or the shadow/enforce default. + +## Decisions + +### Decision 1: Reconcile on the cleanup leader sweep, not a new loop + +Recompute `pending_residues` inside the existing cleanup sweep, next to `staleChildrenScan`. The sweep is already leader-elected (single writer), already holds both queues, and already runs on an interval — so reconciliation inherits correct concurrency semantics with no new lock or timer. + +- **Alternative — dedicated reconciliation loop/leader:** rejected; duplicates leader election and risks two writers racing on the same Redis key. +- **Alternative — fix event capture (durable stream / ack):** rejected; turning `QueueEvents` into a replayable log is far more complex and still leaves drift on edge cases. Reconciliation makes correctness independent of event delivery. + +### Decision 2: Authoritative recompute = sum of residues over waiting/active/waiting-children + +Compute pending as the sum of per-job residues across `QUEUE_NAMES.PREDICTION` and `QUEUE_NAMES.EMBEDDING` for jobs in `waiting`, `active`, and `waiting-children`. Residue per job = sequence length, read from job data (matching how `incrementPending` and the subscriber already derive residues). Write the result with an authoritative setter (`setPending`) — an absolute `SET`, not `incrby`/`decrby`. + +- Including `active` keeps in-flight work counted; including `waiting-children` counts prediction parents whose embedding child is still running. Excluding terminal states is what drives the value to zero on drain. +- **Alternative — reconcile only embedding queue:** rejected; that preserves the prediction blind spot. + +### Decision 3: Event counter stays as a between-sweeps fast path + +Keep `incrementPending`/`decrementPending` so admission reflects sub-sweep-interval activity. Reconciliation overwrites (not adjusts) the value each sweep, so accumulated drift is discarded wholesale. This bounds worst-case staleness to one sweep interval while keeping decisions current. + +- **Alternative — remove the event counter, read live each admission:** rejected; summing queue residues on every request is too expensive for the hot path. Sweep-interval reconciliation + cheap incr/decr hint is the right cost balance. + +### Decision 4: Leak detector as a metric invariant alert + +Add a rule to `infra/monitoring/protifer.rules.yml`: fire when `shedding_pending_residues > 0` is sustained while `bullmq_queue_jobs{state=~"waiting|active"}` is zero. This encodes the invariant "pending must trend to zero when there is no work." `promtool` validates it in CI; `mimirtool rules sync` deploys on `main`. + +### Decision 5: Throughput = aggregate drain rate measured on the sweep by conservation + +Measure how fast residues actually leave the queues, on the same leader sweep, using flow conservation rather than per-job timing: + +``` +departures(Δt) = arrivals(Δt) − (pending_now − pending_prev) +drain_rate = max(0, departures) / Δt_seconds // residues/sec, aggregate +``` + +- `arrivals(Δt)` comes from a new **monotonic** `shedding:admitted_residues_total` counter, `INCRBY`-ed synchronously at admission (in both shadow and enforce paths, alongside the existing `incrementPending`). It is reliable because it is in the request path, not the event stream. +- `pending_now` / `pending_prev` are the authoritative reconciled values from Decision 2 (this sweep and the prior one). +- The sweep records `drain_rate` as the EWMA sample (reusing the existing `THROUGHPUT_KEY` Lua update), then snapshots `admitted_total` and the timestamp for the next interval's delta. + +This is the same architectural thesis as the pending reconciliation: derive the number from reliable, replay-independent signals. It is automatically **concurrency-aware** (it counts real departures, however many workers/slots produced them) and **route-agnostic** (both queues' work flows through the same pending/arrivals bookkeeping), so it is in the same residue units as `pending` — the division in `decide.ts` becomes dimensionally sound. `decide.ts` itself is unchanged. + +- **Alternative — count terminal events into a completed-residues counter and rate it:** rejected as the source — it inherits exactly the replay-less `QueueEvents` fragility (and embedding-only scope) this change exists to escape; a missed completion would understate drain and over-shed. +- **Alternative — multiply the existing single-job rate by a fixed `WORKER_CONCURRENCY × worker_count`:** rejected — the gateway does not know the live worker count, slots are not always full, and it hard-codes a deployment assumption. Measuring real departures needs none of that. +- **Edge cases:** clamp `departures` at ≥ 0 (snapshot timing can make pending momentarily rise mid-drain); skip the EWMA update when `Δt` is ~0 or the prior snapshot is absent (first sweep after boot); `readState` continues to seed `residuesPerSecondEwma` to `config.initialResiduesPerSecond` until the first valid sample, so cold-start behavior is unchanged. + +### Decision 6: Retire the per-job throughput sample; subscriber keeps only `decrementPending` + +`recordCompletion` is removed from the throughput path — the event subscriber (`event-subscriber.ts:99`) stops calling it, since the per-job, embedding-only sample is exactly the wrong rate. The subscriber retains only `decrementPending` as the between-sweeps fast-path hint (Decision 3). Throughput is now owned entirely by the sweep (Decision 5). + +- This keeps a single writer for the EWMA (the leader sweep), mirroring the single-writer property the pending reconciliation relies on, and removes the last place the replay-less event stream fed a decision input. + +## Risks / Trade-offs + +- **Reconcile reads many jobs under deep backlog** → keep it observe-and-set (no per-job mutation), reuse the existing sweep cadence, and prefer count/aggregate reads; if job-data reads prove heavy, batch them. The sweep already iterates queue state, so marginal cost is bounded. +- **Sub-sweep drift window** → between sweeps the fast-path counter can still be wrong, but it self-corrects each sweep and can no longer leak unboundedly. Acceptable; sweep interval is the staleness bound. +- **Two writers if leadership flaps** → mitigated by reusing the existing single-leader sweep election; the authoritative `SET` is idempotent, so a brief overlap converges rather than corrupts. +- **Alert false negatives if `bullmq_queue_jobs` lags** → the alert uses a sustained `for:` duration so transient drain-vs-counter skew does not fire; tune the window during rollout. +- **Drain-rate noise on short/empty intervals** → a sweep with little traffic yields a tiny, noisy `departures`. Mitigated by the existing EWMA smoothing, clamping `departures ≥ 0`, and skipping the update when `Δt` is ~0; the cold-start floor (`initialResiduesPerSecond`) covers the no-sample window. +- **Sweep cadence couples to estimate freshness** → throughput now updates once per sweep rather than per completion. The EWMA already smooths over multiple samples, so a sweep-cadence sample rate is sufficient for an admission heuristic; if the estimate proves laggy, shorten the sweep, not the design. + +## Migration Plan + +1. Land reconciliation + `setPending`, the aggregate drain-rate sampler, the admitted-residues counter, and the alert rule behind the existing shadow default (no behavior change for users). +2. Verify on the next real load test that (a) `shedding_pending_residues` returns to zero after load drains, and (b) `shedding_residues_per_second` reflects realistic aggregate throughput (order-of-`WORKER_CONCURRENCY` higher than the old single-job value) so `shedding_estimated_wait_seconds` tracks observed end-to-end latency rather than pinning. +3. Confirm the leak-detector alert is green at idle and does not false-fire under load. +4. Only after (2)–(3) hold is a future `SHED_MODE=enforce` rollout safe — that rollout is a separate operational change. + +Rollback: revert the cleanup-sweep reconciliation and drain-rate sampler; restore the `recordCompletion` call in the subscriber. The event-driven counter and per-job throughput resume their prior (leaky, over-estimating) behavior, which is no worse than today's shadow-mode state. + +## Open Questions + +- Exact reconcile cadence: reuse the cleanup sweep interval as-is, or run reconciliation on a subset of sweeps if job-data reads are costly? Default: reuse as-is, revisit if metrics show cost. +- Idle definition for the alert (`waiting|active` == 0) vs. also requiring `waiting-children` == 0 — confirm against how `bullmq_queue_jobs` labels are exported. +- Drain-rate measurement source — **resolved: conservation (`arrivals − Δpending`)**, per Decision 5. Chosen over a completed-residues event counter because it is fully event-independent and unit-consistent by construction. diff --git a/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/proposal.md b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/proposal.md new file mode 100644 index 0000000..ee7df35 --- /dev/null +++ b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/proposal.md @@ -0,0 +1,50 @@ +## Why + +The admission controller's wait estimate — `estimated_wait = pending_residues / throughput`, shed when it exceeds the per-plan SLO — is wrong on **both** of its inputs, and neither alone is safe to enable. + +**`pending_residues` leaks permanently.** It is incremented on every submission but decremented only via live BullMQ `QueueEvents` on the embedding queue, handled by a single elected leader — and `QueueEvents` has no replay, so any missed event (leader handoff, events outpacing the leader during a burst, a deduped embedding child, or a job evicted before `getJob`) leaks residues forever. A single 5-minute load test ramped `shedding_pending_residues` to 400,000 where it stayed dead-flat after load ended, pinning `estimated_wait` indefinitely. + +**`throughput` is the wrong rate.** It is an EWMA of `residues / (finishedOn − processedOn)` recorded per terminal embedding event (`state.ts:84`, `event-subscriber.ts:99`) — i.e. the processing rate of a **single** job, measured on the **embedding queue only**. The real system drains `WORKER_CONCURRENCY` jobs in parallel per worker across both queues, so this under-states true throughput by roughly the concurrency factor. Worse, after the leak fix `pending_residues` sums residues across **both** queues while `throughput` still reflects embedding-only single-job service — the ratio is dimensionally inconsistent. The estimate over-states wait and would over-shed. + +Today both bugs are masked because `SHED_MODE` defaults to `shadow` (requests still admitted). The moment anyone sets `SHED_MODE=enforce`, the gateway returns 503 to all free/pro users — permanently from the leak, and excessively from the throughput error. Fixing the leak alone does **not** make `enforce` safe, because the wait estimate is still wrong. The two fixes are coupled and must land together; this change does both. + +## What Changes + +**Accounting (`pending_residues`):** + +- Add periodic **reconciliation** of `pending_residues` from actual queue state: on the existing cleanup leader sweep, recompute the counter as the sum of residues over jobs currently in `waiting` / `active` / `waiting-children` across the prediction and embedding queues, and write the authoritative value to Redis. +- Make reconciliation the **source of truth**; keep the existing increment/decrement only as a fast-path hint between sweeps. Event drift is now self-healing — a missed event is corrected on the next sweep instead of leaking forever. +- Close the prediction-side blind spot as a side effect: summing real queued work is route-agnostic. + +**Estimation (`throughput` / `estimated_wait`):** + +- Replace per-job, embedding-only throughput sampling with an **aggregate drain rate** measured on the same leader sweep: residues actually leaving the queues per wall-clock second, in the **same residue units** as `pending_residues`, so the wait estimate is dimensionally consistent and concurrency-aware. +- Derive the drain rate from **reliable signals** (a synchronous monotonic admitted-residues counter and the reconciled pending value), not from the replay-less event stream — a missed completion event no longer corrupts the estimate. The EWMA smoothing is retained for stability; only its input sample changes. +- Stop feeding the per-job `recordCompletion` sample into the throughput EWMA; the event subscriber keeps only its fast-path `decrementPending` role. + +**Observability:** + +- Add a **leak-detector alert** to `infra/monitoring/protifer.rules.yml`: fire when `shedding_pending_residues` stays above zero while the queues are idle — the invariant that pending must trend to zero when there is no work. + +Out of scope (deferred to follow-up changes): + +- Load-testing methodology rework (honest end-to-end measurement, rate-limit exemption for pipeline tests). +- Per-Triton-call and per-job latency metrics (the separate `prediction-latency-observability` change). That change improves _human-facing_ latency visibility; it is not required for the aggregate drain-rate estimate here, which needs only residue throughput, not per-model timing. +- Changing the SLO thresholds, plan priorities, retry-after jitter, or the shadow/enforce default. + +## Capabilities + +### New Capabilities + +- `request-shedding`: Admission-control accounting and wait estimation — how `pending_residues` is maintained (event fast-path plus authoritative leader-sweep reconciliation), how `throughput` / `estimated_wait` is derived (aggregate, concurrency-aware drain rate in units consistent with pending), and the observability invariant that detects accounting drift. + +### Modified Capabilities + + + +## Impact + +- **Code**: `services/api-gateway/src/cleanup.ts` (add reconciliation + drain-rate sampling to the leader sweep, alongside the existing stale-children scan), `services/api-gateway/src/shedding/state.ts` (add an authoritative `setPending`, a monotonic admitted-residues counter, and a sweep-fed throughput sampler; the per-job `recordCompletion` path is removed from the throughput EWMA), `services/api-gateway/src/shedding/event-subscriber.ts` (drops the `recordCompletion` call; keeps `decrementPending` as a hint), `services/api-gateway/src/shedding/decide.ts` (unchanged formula; now fed correct, unit-consistent inputs). Reconciliation reads residues for jobs across `QUEUE_NAMES.PREDICTION` and `QUEUE_NAMES.EMBEDDING`. +- **Redis**: `shedding:pending_residues` becomes a leader-managed derived value; a new monotonic `shedding:admitted_residues_total` feeds the drain-rate computation; `shedding:throughput_ewma` is now driven by the sweep, not per-job events. +- **Monitoring**: new alert rule in `infra/monitoring/protifer.rules.yml` (validated by `promtool` in CI). +- **Behavior**: no user-facing API change. Shadow-mode metrics (`shedding_pending_residues`, `shedding_residues_per_second`, `shedding_estimated_wait_seconds`) become trustworthy, and a future `SHED_MODE=enforce` rollout no longer over-sheds or carries the permanent-503 landmine. diff --git a/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/specs/request-shedding/spec.md b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/specs/request-shedding/spec.md new file mode 100644 index 0000000..9520017 --- /dev/null +++ b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/specs/request-shedding/spec.md @@ -0,0 +1,80 @@ +## ADDED Requirements + +### Requirement: Pending residues reconciled from queue state + +The admission controller's `pending_residues` accounting SHALL be derived from the actual queue state, not solely from an event-driven increment/decrement counter. The accounting leader SHALL periodically recompute `pending_residues` as the sum of residues over all jobs currently in the `waiting`, `active`, and `waiting-children` states across the prediction and embedding queues, and SHALL write that computed value as the authoritative `shedding:pending_residues` in Redis. Reconciliation SHALL run on the existing cleanup leader sweep. + +#### Scenario: Counter self-heals after a missed completion event + +- **WHEN** a job terminates but its completion event is missed by the accounting subscriber (e.g. leader handoff, event burst, deduped child, or evicted job), leaving `pending_residues` higher than the real queued work +- **THEN** the next leader-sweep reconciliation recomputes `pending_residues` from the live queue state and overwrites the stale value, so the leak does not persist across sweeps + +#### Scenario: Counter returns to zero when the system drains + +- **WHEN** all prediction and embedding queues have no jobs in `waiting`, `active`, or `waiting-children` +- **THEN** the next reconciliation sets `pending_residues` to zero + +#### Scenario: Prediction-flow residues are accounted route-agnostically + +- **WHEN** prediction jobs and their embedding children are queued +- **THEN** reconciliation includes their residues regardless of which queue emits completion events, so prediction-side work is not undercounted + +### Requirement: Event-driven counter is a fast-path hint + +The system SHALL retain the existing `incrementPending` (at admission) and `decrementPending` (on terminal events) operations as a between-sweeps fast path so admission decisions reflect recent activity without waiting for the next reconciliation. These operations SHALL NOT be the source of truth; the leader-sweep reconciliation value SHALL take precedence whenever it runs. + +#### Scenario: Admission reflects recent submissions before the next sweep + +- **WHEN** a request is admitted and its residues are added via `incrementPending` +- **THEN** subsequent admission decisions before the next reconciliation account for those residues + +#### Scenario: Reconciliation overwrites accumulated event drift + +- **WHEN** the event-driven counter has drifted from the true queued residues between sweeps +- **THEN** the reconciliation pass replaces the drifted value rather than adjusting it incrementally + +### Requirement: Accounting drift is observable + +The monitoring rules SHALL include an alert that detects a stuck `pending_residues` counter. The alert SHALL fire when `shedding_pending_residues` remains above zero for a sustained period while the queues are idle (no `waiting` or `active` jobs), indicating accounting drift that reconciliation has not cleared. The alert rule SHALL pass `promtool` validation. + +#### Scenario: Leak detector fires on a stuck counter + +- **WHEN** `shedding_pending_residues` stays above zero for the configured duration while no jobs are `waiting` or `active` +- **THEN** the leak-detector alert fires + +#### Scenario: No false alarm under genuine load + +- **WHEN** `shedding_pending_residues` is above zero because jobs are genuinely `waiting` or `active` +- **THEN** the leak-detector alert does not fire + +### Requirement: Wait estimate uses an aggregate, concurrency-aware throughput + +The throughput used in the wait estimate (`estimated_wait = pending / throughput`) SHALL represent the aggregate rate at which residues actually leave the queues across all workers and processing slots — not the processing rate of a single job. A single job's service time SHALL NOT be used directly as the system throughput. + +#### Scenario: Throughput reflects concurrent draining, not one job + +- **WHEN** multiple jobs are processed concurrently across worker slots +- **THEN** the recorded throughput reflects the combined rate at which residues drain from the queues, so the wait estimate is not inflated by the concurrency factor + +#### Scenario: Throughput tracks observed drain after a load test + +- **WHEN** a load test drives sustained traffic and then drains +- **THEN** the recorded residues-per-second reflects the realistic aggregate drain rate and `estimated_wait` tracks observed end-to-end latency rather than pinning at an inflated value + +### Requirement: Wait estimate inputs share consistent residue units + +The `pending` and `throughput` inputs to the wait estimate SHALL be expressed in the same residue units and SHALL both account for work across the prediction and embedding queues, so that `estimated_wait` is dimensionally meaningful. Throughput SHALL NOT be derived from a single queue while pending spans both. + +#### Scenario: Both pipelines contribute to throughput + +- **WHEN** both prediction-flow and embedding work are draining +- **THEN** the measured throughput accounts for residues leaving both queues, matching the both-queue residue total used for `pending` + +### Requirement: Throughput is robust to missed completion events + +The throughput measurement SHALL be derived from signals that do not depend on observing every BullMQ terminal event (which has no replay), so a missed completion event does not corrupt the wait estimate. The measurement SHALL be performed by the accounting leader. + +#### Scenario: Throughput is unaffected by a missed terminal event + +- **WHEN** a terminal completion event is missed by the accounting subscriber +- **THEN** the throughput value used for admission is still computed correctly from reliable accounting (e.g. reconciled pending and synchronously-counted admissions), not understated by the missed event diff --git a/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/tasks.md b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/tasks.md new file mode 100644 index 0000000..f04ff6a --- /dev/null +++ b/openspec/changes/archive/2026-06-19-fix-shedding-residue-leak/tasks.md @@ -0,0 +1,36 @@ +## 1. State: authoritative setter + +- [x] 1.1 Add `setPending(residues)` to `shedding/state.ts` that writes `shedding:pending_residues` with an absolute `SET` (clamped at >= 0), distinct from `incrementPending`/`decrementPending`. +- [x] 1.2 Add a unit test in `state.test.ts` proving `setPending` overwrites prior drifted values (not additive). + +## 2. Reconciliation on the cleanup leader sweep + +- [x] 2.1 Add a residue-summing helper that, given the prediction and embedding queues, sums per-job residues (sequence length) over jobs in `waiting`, `active`, and `waiting-children`. +- [x] 2.2 Wire the helper into the cleanup sweep (`cleanup.ts`) next to `staleChildrenScan`, calling `state.setPending(total)` with the computed sum; pass the `SheddingState` into `CleanupDeps`. +- [x] 2.3 Ensure reconciliation runs only on the leader sweep and is observe-and-set (no per-job mutation); reuse the existing sweep interval. +- [x] 2.4 Add unit tests in `cleanup.test.ts`: (a) drained queues reconcile to zero; (b) a stale/inflated counter is overwritten to the true sum; (c) prediction-flow residues (parent + embedding child) are counted route-agnostically. + +## 3. Keep event counter as fast-path hint + +- [x] 3.1 Confirm `incrementPending`/`decrementPending` and the embedding-queue subscriber remain unchanged in behavior (now a between-sweeps hint, not source of truth) — no code change expected; add/adjust a comment noting reconciliation is authoritative. +- [x] 3.2 Verify existing `event-subscriber.test.ts` still passes; add a test asserting reconciliation takes precedence over accumulated event drift. + +## 4. Estimation: aggregate drain-rate throughput + +- [x] 4.1 Add a monotonic `shedding:admitted_residues_total` counter to `state.ts` (`incrAdmitted(residues)` via `INCRBY`); call it synchronously at admission in `middleware/shedding.ts` alongside `incrementPending`, in both the shadow and enforce paths. +- [x] 4.2 Add a sweep-driven throughput sampler to `state.ts`: given the current reconciled `pending`, read+snapshot `admitted_residues_total` and the last sweep timestamp, compute `departures = max(0, arrivals − Δpending)` and `drain_rate = departures / Δt_seconds`, and feed `drain_rate` into the existing `THROUGHPUT_KEY` EWMA Lua update. Skip the update when `Δt ≈ 0` or no prior snapshot exists. +- [x] 4.3 Wire the sampler into the cleanup leader sweep so it runs right after reconciliation (same sweep, using the just-computed pending as `pending_now`). +- [x] 4.4 Remove the `recordCompletion` call from `event-subscriber.ts` (keep `decrementPending`); drop or repurpose the now-unused per-job `recordCompletion` from `state.ts`. +- [x] 4.5 Unit tests in `state.test.ts`: (a) drain rate reflects concurrent departures (arrivals − Δpending), not single-job time; (b) clamps negative departures to zero; (c) skips update with no prior snapshot / zero Δt; (d) cold start still falls back to `initialResiduesPerSecond`. +- [x] 4.6 Test in `cleanup.test.ts`/`event-subscriber.test.ts` that a missed terminal event does not affect throughput (it is sweep-derived), and `decide.ts` is unchanged and consumes the new inputs. + +## 5. Leak-detector alert + +- [x] 5.1 Add an alert rule to `infra/monitoring/protifer.rules.yml` firing when `shedding_pending_residues > 0` is sustained (`for:` window) while `bullmq_queue_jobs{state=~"waiting|active"}` is zero. +- [x] 5.2 Confirm the rule passes `promtool` validation (the CI gate) and add it to any rules unit-test fixtures if present. + +## 6. Verification + +- [x] 6.1 Run repo gates: `bun run typecheck`, `bun run lint`, `bun run format`, `bun run test`. +- [x] 6.2 Run shedding/cleanup integration coverage via `bun run test:int` (stack up) to exercise reconciliation and drain-rate sampling against real Redis/queues. +- [x] 6.3 Manual/load verification: confirm `shedding_pending_residues` returns to zero after a load test drains; `shedding_residues_per_second` reflects realistic aggregate throughput (order-of-`WORKER_CONCURRENCY` above the old single-job value); `shedding_estimated_wait_seconds` tracks observed latency rather than pinning; and the leak-detector alert is green at idle without false-firing under load. diff --git a/openspec/specs/request-shedding/spec.md b/openspec/specs/request-shedding/spec.md new file mode 100644 index 0000000..3bfabbf --- /dev/null +++ b/openspec/specs/request-shedding/spec.md @@ -0,0 +1,92 @@ +# request-shedding Specification + +## Purpose + +Gateway-side admission control that protects the prediction and embedding +pipelines from overload. The controller tracks how much queued work (`pending_residues`) +is in flight and estimates wait time so it can return 503 + Retry-After under a +class-based SLO. This capability governs how `pending_residues` accounting stays +truthful — reconciled from live queue state rather than drifting on missed +events — and how the wait estimate is computed from an aggregate, concurrency-aware +throughput in consistent residue units across both pipelines. + +## Requirements + +### Requirement: Pending residues reconciled from queue state + +The admission controller's `pending_residues` accounting SHALL be derived from the actual queue state, not solely from an event-driven increment/decrement counter. The accounting leader SHALL periodically recompute `pending_residues` as the sum of residues over all jobs currently in the `waiting`, `active`, and `waiting-children` states across the prediction and embedding queues, and SHALL write that computed value as the authoritative `shedding:pending_residues` in Redis. Reconciliation SHALL run on the existing cleanup leader sweep. + +#### Scenario: Counter self-heals after a missed completion event + +- **WHEN** a job terminates but its completion event is missed by the accounting subscriber (e.g. leader handoff, event burst, deduped child, or evicted job), leaving `pending_residues` higher than the real queued work +- **THEN** the next leader-sweep reconciliation recomputes `pending_residues` from the live queue state and overwrites the stale value, so the leak does not persist across sweeps + +#### Scenario: Counter returns to zero when the system drains + +- **WHEN** all prediction and embedding queues have no jobs in `waiting`, `active`, or `waiting-children` +- **THEN** the next reconciliation sets `pending_residues` to zero + +#### Scenario: Prediction-flow residues are accounted route-agnostically + +- **WHEN** prediction jobs and their embedding children are queued +- **THEN** reconciliation includes their residues regardless of which queue emits completion events, so prediction-side work is not undercounted + +### Requirement: Event-driven counter is a fast-path hint + +The system SHALL retain the existing `incrementPending` (at admission) and `decrementPending` (on terminal events) operations as a between-sweeps fast path so admission decisions reflect recent activity without waiting for the next reconciliation. These operations SHALL NOT be the source of truth; the leader-sweep reconciliation value SHALL take precedence whenever it runs. + +#### Scenario: Admission reflects recent submissions before the next sweep + +- **WHEN** a request is admitted and its residues are added via `incrementPending` +- **THEN** subsequent admission decisions before the next reconciliation account for those residues + +#### Scenario: Reconciliation overwrites accumulated event drift + +- **WHEN** the event-driven counter has drifted from the true queued residues between sweeps +- **THEN** the reconciliation pass replaces the drifted value rather than adjusting it incrementally + +### Requirement: Accounting drift is observable + +The monitoring rules SHALL include an alert that detects a stuck `pending_residues` counter. The alert SHALL fire when `shedding_pending_residues` remains above zero for a sustained period while the queues are idle (no `waiting` or `active` jobs), indicating accounting drift that reconciliation has not cleared. The alert rule SHALL pass `promtool` validation. + +#### Scenario: Leak detector fires on a stuck counter + +- **WHEN** `shedding_pending_residues` stays above zero for the configured duration while no jobs are `waiting` or `active` +- **THEN** the leak-detector alert fires + +#### Scenario: No false alarm under genuine load + +- **WHEN** `shedding_pending_residues` is above zero because jobs are genuinely `waiting` or `active` +- **THEN** the leak-detector alert does not fire + +### Requirement: Wait estimate uses an aggregate, concurrency-aware throughput + +The throughput used in the wait estimate (`estimated_wait = pending / throughput`) SHALL represent the aggregate rate at which residues actually leave the queues across all workers and processing slots — not the processing rate of a single job. A single job's service time SHALL NOT be used directly as the system throughput. + +#### Scenario: Throughput reflects concurrent draining, not one job + +- **WHEN** multiple jobs are processed concurrently across worker slots +- **THEN** the recorded throughput reflects the combined rate at which residues drain from the queues, so the wait estimate is not inflated by the concurrency factor + +#### Scenario: Throughput tracks observed drain after a load test + +- **WHEN** a load test drives sustained traffic and then drains +- **THEN** the recorded residues-per-second reflects the realistic aggregate drain rate and `estimated_wait` tracks observed end-to-end latency rather than pinning at an inflated value + +### Requirement: Wait estimate inputs share consistent residue units + +The `pending` and `throughput` inputs to the wait estimate SHALL be expressed in the same residue units and SHALL both account for work across the prediction and embedding queues, so that `estimated_wait` is dimensionally meaningful. Throughput SHALL NOT be derived from a single queue while pending spans both. + +#### Scenario: Both pipelines contribute to throughput + +- **WHEN** both prediction-flow and embedding work are draining +- **THEN** the measured throughput accounts for residues leaving both queues, matching the both-queue residue total used for `pending` + +### Requirement: Throughput is robust to missed completion events + +The throughput measurement SHALL be derived from signals that do not depend on observing every BullMQ terminal event (which has no replay), so a missed completion event does not corrupt the wait estimate. The measurement SHALL be performed by the accounting leader. + +#### Scenario: Throughput is unaffected by a missed terminal event + +- **WHEN** a terminal completion event is missed by the accounting subscriber +- **THEN** the throughput value used for admission is still computed correctly from reliable accounting (e.g. reconciled pending and synchronously-counted admissions), not understated by the missed event diff --git a/services/api-gateway/scripts/ops-key.int.test.ts b/services/api-gateway/scripts/ops-key.int.test.ts index 88d05b8..1b1e8e0 100644 --- a/services/api-gateway/scripts/ops-key.int.test.ts +++ b/services/api-gateway/scripts/ops-key.int.test.ts @@ -55,11 +55,14 @@ beforeAll(async () => { const port = container.getMappedPort(5432) databaseUrl = `postgres://${DB_USER}:${DB_PASSWORD}@${host}:${String(port)}/${DB_NAME}` - // createAuth() reads DATABASE_URL lazily at call time; our runCreate etc. - // build their own pg Pool from the same env var. + // runCreate/runRotate lazily load ops-key config from process.env (the + // narrow auth/cors/database loader), so seed every field that loader reads. process.env['DATABASE_URL'] = databaseUrl process.env['BETTER_AUTH_SECRET'] = 'test-secret-not-for-production' process.env['BETTER_AUTH_BASE_URL'] = 'http://localhost:9090' + process.env['GITHUB_CLIENT_ID'] = TEST_ENV['GITHUB_CLIENT_ID'] + process.env['GITHUB_CLIENT_SECRET'] = TEST_ENV['GITHUB_CLIENT_SECRET'] + process.env['CORS_ORIGINS'] = TEST_ENV['CORS_ORIGINS'] const migrationPool = new Pool({ connectionString: databaseUrl }) try { diff --git a/services/api-gateway/scripts/ops-key.ts b/services/api-gateway/scripts/ops-key.ts index 059cf4b..8177969 100644 --- a/services/api-gateway/scripts/ops-key.ts +++ b/services/api-gateway/scripts/ops-key.ts @@ -22,7 +22,7 @@ import pino from 'pino' import { createAuth } from '../src/auth/index.ts' import type { Auth, AuthDeps } from '../src/auth/index.ts' -import { loadConfig, ProductionConfigError } from '../src/config/index.ts' +import { loadOpsKeyConfig } from '../src/config/index.ts' export const MACHINE_USER_DOMAIN = '@protifer.invalid' const LABEL_RE = /^[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$/ @@ -151,12 +151,9 @@ let cachedAuthDeps: AuthDeps | undefined function loadOpsConfig(): { pool: Pool; authDeps: AuthDeps } { let cfg try { - cfg = loadConfig() + cfg = loadOpsKeyConfig() } catch (err) { - if ( - err instanceof ConfigValidationError || - err instanceof ProductionConfigError - ) { + if (err instanceof ConfigValidationError) { throw new OpsKeyError(err.message) } throw err diff --git a/services/api-gateway/src/app.ts b/services/api-gateway/src/app.ts index 8ab7cdf..5916f55 100644 --- a/services/api-gateway/src/app.ts +++ b/services/api-gateway/src/app.ts @@ -48,7 +48,11 @@ import { createPrometheusFlagHook, createSentryFlagHook, } from './flags/hooks.ts' -import { createMetrics, startQueueDepthPolling } from './metrics.ts' +import { + createMetrics, + startQueueDepthPolling, + startSheddingStatePolling, +} from './metrics.ts' import { createAdminRoleMiddleware } from './middleware/admin-role.ts' import { createAuthenticateMiddleware } from './middleware/auth/index.ts' import { createMetricsMiddleware } from './middleware/metrics.ts' @@ -223,6 +227,8 @@ export function createApp(overrides?: { metrics.bullmqQueueJobs, ) + const sheddingStatePoller = startSheddingStatePolling(sheddingState, metrics) + // Shared QueueEvents instances — job cleanup and pipeline metrics both // listen; the app owns their lifecycle (closed in `close()`). const predictionQueueEvents = new QueueEvents(QUEUE_NAMES.PREDICTION, { @@ -261,6 +267,7 @@ export function createApp(overrides?: { metrics, thresholdMs: config.jobCleanup.staleChildrenThresholdMs, }), + sheddingState, intervalMs: config.jobCleanup.reconcileIntervalMs, lockTtlMs: config.jobCleanup.lockTtlMs, }) @@ -286,6 +293,11 @@ export function createApp(overrides?: { const rateLimitConnection = connection as unknown as Redis const submissionRL = createSubmissionRateLimiter({ connection: rateLimitConnection, + submissionsPerMinute: { + free: config.rateLimit.submissionsFree, + pro: config.rateLimit.submissionsPro, + enterprise: config.rateLimit.submissionsEnterprise, + }, }) const pollRL = createPollRateLimiter({ connection: rateLimitConnection }) @@ -608,6 +620,7 @@ fetch('/api/auth/sign-in/social',{method:'POST',credentials:'include',headers:{' // connections, then DB pools last (BullMQ and HTTP may still use them // during drain). queueDepthPoller.stop() + sheddingStatePoller.stop() await sheddingEventHandle.close() await cleanupHandle.close() await predictionQueueEvents.close() diff --git a/services/api-gateway/src/cleanup.test.ts b/services/api-gateway/src/cleanup.test.ts index 8b96518..a652d06 100644 --- a/services/api-gateway/src/cleanup.test.ts +++ b/services/api-gateway/src/cleanup.test.ts @@ -7,11 +7,16 @@ import { releaseLeaderLockIfOwner, runReconcileSweep, setupJobCleanup, + sumQueueResidues, trackJob, RECONCILE_LOCK_KEY, } from './cleanup.ts' import { createMetrics } from './metrics.ts' +function queueWithJobs(jobs: { data: unknown }[]) { + return { getJobs: vi.fn().mockResolvedValue(jobs) } as never +} + function createMockEvents() { return new EventEmitter() } @@ -83,6 +88,7 @@ const mockLogger = { function createMockQueue() { return { getJob: vi.fn().mockResolvedValue(null), + getJobs: vi.fn().mockResolvedValue([]), } } @@ -592,6 +598,137 @@ describe('periodic reconciliation loop', () => { }) }) +describe('sumQueueResidues', () => { + it('counts prediction parent + embedding child route-agnostically', async () => { + // Same flow: prediction parent sits in waiting-children, its embedding + // child runs in active — both contribute their sequence length. + const pq = queueWithJobs([{ data: { sequence: 'AAAAA' } }]) // parent: 5 + const eq = queueWithJobs([{ data: { sequence: 'AAAAA' } }]) // child: 5 + expect(await sumQueueResidues([pq, eq])).toBe(10) + }) + + it('reconciles to zero when queues are drained', async () => { + expect(await sumQueueResidues([queueWithJobs([]), queueWithJobs([])])).toBe( + 0, + ) + }) + + it('requests waiting, active, and waiting-children states', async () => { + const pq = queueWithJobs([]) + await sumQueueResidues([pq]) + expect( + (pq as unknown as { getJobs: ReturnType }).getJobs, + ).toHaveBeenCalledWith(['waiting', 'active', 'waiting-children']) + }) + + it('ignores jobs without sequence data', async () => { + const pq = queueWithJobs([ + { data: undefined }, + { data: { sequence: 'ABC' } }, + ]) + expect(await sumQueueResidues([pq])).toBe(3) + }) +}) + +describe('shedding reconciliation on the leader sweep', () => { + let redis: ReturnType + let pq: ReturnType + let eq: ReturnType + + beforeEach(() => { + vi.clearAllMocks() + redis = createMockRedis() + pq = createMockQueue() + eq = createMockQueue() + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + function setupWithState(sheddingState: unknown) { + return setupJobCleanup({ + redis, + logger: mockLogger, + predictionEvents: createMockEvents(), + embeddingEvents: createMockEvents(), + predictionQueue: pq as never, + embeddingQueue: eq as never, + sheddingState: sheddingState as never, + intervalMs: 1000, + }) + } + + it('overwrites a stale counter with the true summed residues', async () => { + const setPending = vi.fn().mockResolvedValue(8) + const sampleThroughput = vi.fn().mockResolvedValue(null) + pq.getJobs.mockResolvedValue([{ data: { sequence: 'AAAAA' } }]) // 5 + eq.getJobs.mockResolvedValue([{ data: { sequence: 'AAA' } }]) // 3 + + const handle = setupWithState({ setPending, sampleThroughput }) + await vi.advanceTimersByTimeAsync(1000) + + // Absolute set to the true sum — independent of any prior drifted value. + expect(setPending).toHaveBeenCalledWith(8) + + vi.useRealTimers() + await handle.close() + vi.useFakeTimers() + }) + + it('samples throughput on the sweep (sweep-derived, not event-derived)', async () => { + const setPending = vi.fn().mockResolvedValue(5) + const sampleThroughput = vi.fn().mockResolvedValue(400) + pq.getJobs.mockResolvedValue([{ data: { sequence: 'AAAAA' } }]) // 5 + + const handle = setupWithState({ setPending, sampleThroughput }) + await vi.advanceTimersByTimeAsync(1000) + + // No terminal events fired, yet throughput is still sampled — so a missed + // completion event cannot corrupt the estimate. + expect(sampleThroughput).toHaveBeenCalledWith(5) + + vi.useRealTimers() + await handle.close() + vi.useFakeTimers() + }) + + it('reconciliation failure does not fail the sweep', async () => { + const setPending = vi.fn().mockRejectedValue(new Error('redis blip')) + const sampleThroughput = vi.fn() + const logger = { info: vi.fn(), warn: vi.fn(), error: vi.fn() } + const metrics = createMetrics() + + const handle = setupJobCleanup({ + redis, + logger, + predictionEvents: createMockEvents(), + embeddingEvents: createMockEvents(), + predictionQueue: pq as never, + embeddingQueue: eq as never, + sheddingState: { setPending, sampleThroughput } as never, + metrics, + intervalMs: 1000, + }) + await vi.advanceTimersByTimeAsync(1000) + + const sweeps = await metrics.registry + .getSingleMetric('job_cleanup_sweeps_total') + ?.get() + const ran = sweeps?.values.find((v) => v.labels.outcome === 'ran') + expect(ran?.value).toBe(1) + expect(logger.warn).toHaveBeenCalledWith( + { err: expect.any(Error) as unknown }, + 'reconcile: shedding reconciliation failed', + ) + + vi.useRealTimers() + await handle.close() + vi.useFakeTimers() + }) +}) + describe('JOBS-05: cap resets after all jobs complete', () => { let redis: ReturnType let mockPredictionQueue: ReturnType diff --git a/services/api-gateway/src/cleanup.ts b/services/api-gateway/src/cleanup.ts index 0d63789..373e783 100644 --- a/services/api-gateway/src/cleanup.ts +++ b/services/api-gateway/src/cleanup.ts @@ -4,6 +4,7 @@ import type { Queue } from '@protifer/shared' import type { JobCleanupMetrics, ReconcileReason } from './metrics.ts' import type { RedisCommands } from './queue.ts' +import type { SheddingState } from './shedding/state.ts' export const ACTIVE_JOBS_KEY = (userId: string) => `active-jobs:${userId}` export const JOB_USER_MAP_KEY = 'job-user-map' @@ -48,6 +49,11 @@ interface CleanupDeps { metrics?: JobCleanupMetrics /** Observe-only stale waiting-children scan, run on the leader sweep. */ staleChildrenScan?: () => Promise + /** + * Shedding accounting. When provided, the leader sweep reconciles + * `pending_residues` from real queue state and samples the drain rate. + */ + sheddingState?: SheddingState intervalMs?: number lockTtlMs?: number instanceId?: string @@ -178,6 +184,24 @@ export async function runReconcileSweep(deps: SweepDeps): Promise { return { sweptKeys, removedEntries, removedByReason, durationMs } } +/** + * Sum per-job residues (sequence length) over jobs in `waiting`, `active`, + * and `waiting-children` across the given queues. Route-agnostic: a prediction + * flow contributes its parent (waiting-children) and its embedding child + * (active/waiting), so in-flight work is counted regardless of queue. + */ +export async function sumQueueResidues(queues: Queue[]): Promise { + let total = 0 + for (const queue of queues) { + const jobs = await queue.getJobs(['waiting', 'active', 'waiting-children']) + for (const job of jobs) { + const seq = (job.data as { sequence?: string } | undefined)?.sequence + if (typeof seq === 'string') total += seq.length + } + } + return total +} + export function setupJobCleanup(deps: CleanupDeps): CleanupHandle { const { redis, @@ -266,6 +290,18 @@ export function setupJobCleanup(deps: CleanupDeps): CleanupHandle { logger.warn({ err }, 'reconcile: stale-children scan failed') } } + if (deps.sheddingState) { + try { + const pending = await sumQueueResidues([ + predictionQueue, + embeddingQueue, + ]) + await deps.sheddingState.setPending(pending) + await deps.sheddingState.sampleThroughput(pending) + } catch (err) { + logger.warn({ err }, 'reconcile: shedding reconciliation failed') + } + } } catch (err) { logger.error({ err }, 'reconcile: sweep failed') metrics?.sweeps.inc({ outcome: 'error' }, 1) diff --git a/services/api-gateway/src/config/index.ts b/services/api-gateway/src/config/index.ts index 7d902be..8754a12 100644 --- a/services/api-gateway/src/config/index.ts +++ b/services/api-gateway/src/config/index.ts @@ -1,6 +1,7 @@ export { ConfigSchema, loadConfig, + loadOpsKeyConfig, assertProductionInvariants, ProductionConfigError, type Config, diff --git a/services/api-gateway/src/config/schema.test.ts b/services/api-gateway/src/config/schema.test.ts index 910fcdd..f46ff34 100644 --- a/services/api-gateway/src/config/schema.test.ts +++ b/services/api-gateway/src/config/schema.test.ts @@ -4,6 +4,7 @@ import { describe, it, expect } from 'vitest' import { ConfigSchema, loadConfig, + loadOpsKeyConfig, ProductionConfigError, assertProductionInvariants, } from './schema.ts' @@ -154,6 +155,19 @@ describe('loadConfig', () => { }).not.toThrow() }) + it('rate-limit submission ceilings default to PLAN_LIMITS and are env-overridable', () => { + const def = loadConfig(VALID_ENV) + expect(def.rateLimit.submissionsFree).toBe(10) + expect(def.rateLimit.submissionsPro).toBe(60) + + const over = loadConfig({ + ...VALID_ENV, + RATE_LIMIT_SUBMISSIONS_PRO: '5000', + }) + expect(over.rateLimit.submissionsPro).toBe(5000) + expect(over.rateLimit.submissionsFree).toBe(10) + }) + it('describes every field with kind metadata', () => { const docs = ConfigSchema.describe() expect(docs.length).toBeGreaterThan(15) @@ -164,3 +178,28 @@ describe('loadConfig', () => { expect(port?.hasDefault).toBe(true) }) }) + +describe('loadOpsKeyConfig', () => { + const OPS_ENV = { + BETTER_AUTH_SECRET: '0123456789abcdef', + BETTER_AUTH_BASE_URL: 'http://localhost:9090', + GITHUB_CLIENT_ID: 'gh-id', + GITHUB_CLIENT_SECRET: 'gh-secret', + CORS_ORIGINS: 'http://localhost:5173', + DATABASE_URL: 'postgresql://localhost:5432/db', + } + + it('loads auth/cors/database without the gateway infra config', () => { + // No GARAGE_*/REDIS_*/TRITON_* — ops-key must not require them. + const cfg = loadOpsKeyConfig(OPS_ENV) + expect(cfg.database.url).toBe('postgresql://localhost:5432/db') + expect(cfg.auth.githubClientId).toBe('gh-id') + expect(cfg.cors.origins).toEqual(['http://localhost:5173']) + }) + + it('still reports missing auth fields', () => { + expect(() => + loadOpsKeyConfig({ DATABASE_URL: 'postgresql://x/y' }), + ).toThrow(ConfigValidationError) + }) +}) diff --git a/services/api-gateway/src/config/schema.ts b/services/api-gateway/src/config/schema.ts index a93f41e..6415259 100644 --- a/services/api-gateway/src/config/schema.ts +++ b/services/api-gateway/src/config/schema.ts @@ -6,6 +6,7 @@ import { customSection, defineConfig, loadSheddingConfig, + PLAN_LIMITS, secretField, zCsv, } from '@protifer/shared' @@ -162,6 +163,28 @@ const storage = { }), } +const rateLimit = { + submissionsFree: configField({ + envName: 'RATE_LIMIT_SUBMISSIONS_FREE', + description: + 'Submission requests/min ceiling for free plan (rate limiter, distinct from shedding SLOs). Raise in load-test envs to reach the shedding controller.', + type: z.coerce.number().int().min(1), + default: PLAN_LIMITS.free.submissionsPerMinute, + }), + submissionsPro: configField({ + envName: 'RATE_LIMIT_SUBMISSIONS_PRO', + description: 'Submission requests/min ceiling for pro plan.', + type: z.coerce.number().int().min(1), + default: PLAN_LIMITS.pro.submissionsPerMinute, + }), + submissionsEnterprise: configField({ + envName: 'RATE_LIMIT_SUBMISSIONS_ENTERPRISE', + description: 'Submission requests/min ceiling for enterprise plan.', + type: z.coerce.number().int().min(1), + default: PLAN_LIMITS.enterprise.submissionsPerMinute, + }), +} + const jobCleanup = { reconcileIntervalMs: configField({ envName: 'JOB_CLEANUP_RECONCILE_INTERVAL_MS', @@ -300,6 +323,7 @@ export const ConfigSchema = defineConfig({ redis, triton, storage, + rateLimit, jobCleanup, models, shedding: sheddingSection, @@ -307,6 +331,20 @@ export const ConfigSchema = defineConfig({ export type Config = ReturnType +/** + * Narrow loader for the ops-key CLI — it mints/rotates API keys through + * better-auth and Postgres only, so it must NOT require the storage/redis/ + * triton/model config the gateway needs. Loading just these sections keeps the + * operator's required env (and secret surface) minimal. + */ +export const OpsKeyConfigSchema = defineConfig({ auth, cors, database }) + +export function loadOpsKeyConfig( + envIn: NodeJS.ProcessEnv = process.env, +): ReturnType { + return OpsKeyConfigSchema.load(envIn) +} + export class ProductionConfigError extends Error { constructor(public readonly issues: string[]) { super( diff --git a/services/api-gateway/src/metrics.test.ts b/services/api-gateway/src/metrics.test.ts index 2b272fd..0ce9a4d 100644 --- a/services/api-gateway/src/metrics.test.ts +++ b/services/api-gateway/src/metrics.test.ts @@ -1,7 +1,11 @@ import { Gauge, Registry } from 'prom-client' import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' -import { createMetrics, startQueueDepthPolling } from './metrics.ts' +import { + createMetrics, + startQueueDepthPolling, + startSheddingStatePolling, +} from './metrics.ts' describe('createMetrics', () => { it('returns a registry with all expected metrics', () => { @@ -137,3 +141,57 @@ describe('startQueueDepthPolling', () => { vi.useRealTimers() }) }) + +describe('startSheddingStatePolling', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + afterEach(() => { + vi.useRealTimers() + }) + + it('refreshes the shedding gauges from readState each tick', async () => { + const m = createMetrics() + const readState = vi + .fn() + .mockResolvedValue({ pendingResidues: 800, residuesPerSecondEwma: 400 }) + + const { stop } = startSheddingStatePolling({ readState }, m, 15_000) + await vi.advanceTimersByTimeAsync(15_000) + + expect(readState).toHaveBeenCalledOnce() + expect((await m.shedingPendingResidues.get()).values[0]?.value).toBe(800) + expect((await m.shedingResiduesPerSecond.get()).values[0]?.value).toBe(400) + // estimatedWait = pending / ewma = 800 / 400 = 2 + expect((await m.shedingEstimatedWait.get()).values[0]?.value).toBe(2) + stop() + }) + + it('drives the pending gauge to zero at idle so the leak alert clears', async () => { + const m = createMetrics() + m.shedingPendingResidues.set(10_000_000) // stale request-time value + const readState = vi + .fn() + .mockResolvedValue({ pendingResidues: 0, residuesPerSecondEwma: 1200 }) + + const { stop } = startSheddingStatePolling({ readState }, m, 15_000) + await vi.advanceTimersByTimeAsync(15_000) + + expect((await m.shedingPendingResidues.get()).values[0]?.value).toBe(0) + expect((await m.shedingEstimatedWait.get()).values[0]?.value).toBe(0) + stop() + }) + + it('swallows transient readState errors and stop() halts polling', async () => { + const m = createMetrics() + const readState = vi.fn().mockRejectedValue(new Error('Redis blip')) + + const { stop } = startSheddingStatePolling({ readState }, m, 15_000) + await expect(vi.advanceTimersByTimeAsync(15_000)).resolves.not.toThrow() + expect(readState).toHaveBeenCalledOnce() + + stop() + await vi.advanceTimersByTimeAsync(30_000) + expect(readState).toHaveBeenCalledOnce() + }) +}) diff --git a/services/api-gateway/src/metrics.ts b/services/api-gateway/src/metrics.ts index e21ed5a..d61db33 100644 --- a/services/api-gateway/src/metrics.ts +++ b/services/api-gateway/src/metrics.ts @@ -120,13 +120,13 @@ export function createMetrics(): AppMetrics { const shedingResiduesPerSecond = new Gauge({ name: 'shedding_residues_per_second', - help: 'Current EWMA of embedding throughput (residues per second).', + help: 'Current EWMA of aggregate pipeline drain rate (residues per second) across both queues.', registers: [registry], }) const shedingPendingResidues = new Gauge({ name: 'shedding_pending_residues', - help: 'Current pending-residues counter tracking in-flight embedding work.', + help: 'Current pending-residues across both queues (waiting + active + waiting-children), reconciled by the leader sweep.', registers: [registry], }) @@ -267,3 +267,52 @@ export function startQueueDepthPolling( }, } } + +interface SheddingStateReader { + readState: () => Promise<{ + pendingResidues: number + residuesPerSecondEwma: number + }> +} + +interface SheddingGauges { + shedingPendingResidues: Gauge + shedingResiduesPerSecond: Gauge + shedingEstimatedWait: Gauge +} + +// The shedding gauges are otherwise only written by the request middleware, so +// at idle they freeze at their last request-time value — which would make the +// pending-residue leak alert fire on stale data. Refresh them per-instance from +// the reconciled Redis state on a timer, mirroring queue-depth polling. +export function startSheddingStatePolling( + state: SheddingStateReader, + gauges: SheddingGauges, + intervalMs = 15_000, +): { stop: () => void } { + async function poll() { + try { + const snap = await state.readState() + const throughput = + snap.residuesPerSecondEwma > 0 ? snap.residuesPerSecondEwma : 1 + gauges.shedingPendingResidues.set(snap.pendingResidues) + gauges.shedingResiduesPerSecond.set(snap.residuesPerSecondEwma) + gauges.shedingEstimatedWait.set(snap.pendingResidues / throughput) + } catch { + // transient Redis blip — skip this tick + } + } + + const timer = setInterval(() => { + void poll() + }, intervalMs) + if (typeof (timer as { unref?: () => void }).unref === 'function') { + ;(timer as { unref: () => void }).unref() + } + + return { + stop: () => { + clearInterval(timer) + }, + } +} diff --git a/services/api-gateway/src/middleware/rate-limit.test.ts b/services/api-gateway/src/middleware/rate-limit.test.ts index 9267d65..64ac98f 100644 --- a/services/api-gateway/src/middleware/rate-limit.test.ts +++ b/services/api-gateway/src/middleware/rate-limit.test.ts @@ -129,6 +129,18 @@ describe('createSubmissionRateLimiter', () => { ) }) + it('honors a configured per-plan submissions ceiling', async () => { + const app = makeAuthApp( + 'free', + createSubmissionRateLimiter({ + connection, + submissionsPerMinute: { free: 999, pro: 60, enterprise: 300 }, + }), + ) + const res = await app.request('/test') + expect(res.headers.get('ratelimit-policy')).toContain('999') + }) + it('shares counter across two limiter instances on the same connection', async () => { const limiterA = createSubmissionRateLimiter({ connection }) const limiterB = createSubmissionRateLimiter({ connection }) diff --git a/services/api-gateway/src/middleware/rate-limit.ts b/services/api-gateway/src/middleware/rate-limit.ts index 7e062f5..197a984 100644 --- a/services/api-gateway/src/middleware/rate-limit.ts +++ b/services/api-gateway/src/middleware/rate-limit.ts @@ -1,4 +1,4 @@ -import type { Redis } from '@protifer/shared' +import type { Plan, Redis } from '@protifer/shared' import { PLAN_LIMITS } from '@protifer/shared' import type { Store } from 'hono-rate-limiter' import { rateLimiter } from 'hono-rate-limiter' @@ -36,12 +36,22 @@ export interface RateLimitDeps { connection: Redis } -export function createSubmissionRateLimiter({ connection }: RateLimitDeps) { +export interface SubmissionRateLimitDeps extends RateLimitDeps { + /** Per-plan submissions/min ceiling. Defaults to PLAN_LIMITS. */ + submissionsPerMinute?: Record +} + +export function createSubmissionRateLimiter({ + connection, + submissionsPerMinute, +}: SubmissionRateLimitDeps) { return rateLimiter<{ Variables: Variables }>({ windowMs: 60 * 1000, limit: (c) => { const plan = c.get('auth').plan - return PLAN_LIMITS[plan].submissionsPerMinute + return ( + submissionsPerMinute?.[plan] ?? PLAN_LIMITS[plan].submissionsPerMinute + ) }, keyGenerator: (c) => c.get('auth').sub, store: makeRedisStore(connection, 'rl:submit:'), diff --git a/services/api-gateway/src/middleware/shedding.ts b/services/api-gateway/src/middleware/shedding.ts index fe48050..9073f6a 100644 --- a/services/api-gateway/src/middleware/shedding.ts +++ b/services/api-gateway/src/middleware/shedding.ts @@ -180,6 +180,7 @@ export function createSheddingMiddleware(deps: SheddingMiddlewareDeps) { // shadow mode: admit despite decision try { await state.incrementPending(residues) + await state.incrAdmitted(residues) } catch (err) { logger.warn({ err }, 'shedding: incrementPending failed (shadow)') } @@ -193,6 +194,7 @@ export function createSheddingMiddleware(deps: SheddingMiddlewareDeps) { ) try { await state.incrementPending(residues) + await state.incrAdmitted(residues) } catch (err) { logger.warn({ err }, 'shedding: incrementPending failed') } diff --git a/services/api-gateway/src/shedding/decide.test.ts b/services/api-gateway/src/shedding/decide.test.ts index c815d6a..e6ee8ad 100644 --- a/services/api-gateway/src/shedding/decide.test.ts +++ b/services/api-gateway/src/shedding/decide.test.ts @@ -1,4 +1,5 @@ import { loadSheddingConfig } from '@protifer/shared' +import type { Plan } from '@protifer/shared' import { describe, it, expect } from 'vitest' import { decideAdmission } from './decide.ts' @@ -182,6 +183,55 @@ describe('decideAdmission', () => { expect(decision.retryAfterSeconds).toBeGreaterThanOrEqual(0) }) + // Deep, sustained overload — the regime the gateway rate limiter masks from + // load tests, so it is covered here at the pure-decision layer instead. + describe('deep overload (calibration curve)', () => { + const throughput = 1000 // residues/sec + const cfg = loadSheddingConfig({ + SHED_SLO_PRO_SECONDS: '60', + SHED_RETRY_JITTER_FRACTION: '0', + }) + const at = (pendingResidues: number, plan: Plan, c = cfg) => + decideAdmission({ + state: { + pendingResidues, + residuesPerSecondEwma: throughput, + lastCompletionTimestampMs: 1_000_000, + }, + config: c, + plan, + sequenceResidues: 0, + nowMs: 1_000_000, + jitter: fixedJitter, + }) + + it('admits exactly at the SLO boundary and sheds just past it', () => { + // wait = pending / throughput; SLO 60s → boundary at 60_000 residues + expect(at(60_000, 'pro').admit).toBe(true) // 60s == SLO, not > SLO + const past = at(60_001, 'pro') + expect(past.admit).toBe(false) + expect(past.code).toBe('OVERLOADED') + }) + + it('retry-after grows with the depth of overload', () => { + const [shallow, mid, deep] = [120_000, 600_000, 3_000_000].map( + (p) => at(p, 'pro').retryAfterSeconds ?? 0, + ) + expect(shallow).toBe(120) // jitter off → ceil(estimated wait) + expect(shallow).toBeLessThan(mid ?? 0) + expect(mid).toBeLessThan(deep ?? 0) + }) + + it('sustained deep overload still admits enterprise (SLO=0) while shedding pro', () => { + const c = loadSheddingConfig({ + SHED_SLO_ENTERPRISE_SECONDS: '0', + SHED_SLO_PRO_SECONDS: '60', + }) + expect(at(10_000_000, 'enterprise', c).admit).toBe(true) + expect(at(10_000_000, 'pro', c).admit).toBe(false) + }) + }) + it('estimated wait uses initial throughput when EWMA is 0', () => { const cfg = loadSheddingConfig({ SHED_INITIAL_RESIDUES_PER_SECOND: '2000', diff --git a/services/api-gateway/src/shedding/event-subscriber.test.ts b/services/api-gateway/src/shedding/event-subscriber.test.ts index e6dcad0..fbd4e59 100644 --- a/services/api-gateway/src/shedding/event-subscriber.test.ts +++ b/services/api-gateway/src/shedding/event-subscriber.test.ts @@ -147,7 +147,7 @@ describe('startEventSubscriber', () => { expect(events.close).toHaveBeenCalled() }) - it('on completed: decrements pending residues and records throughput', async () => { + it('on completed: decrements pending as a hint, does not touch throughput', async () => { const job = makeJob({ sequence: 'AAAA', processedOn: 1000, @@ -157,7 +157,9 @@ describe('startEventSubscriber', () => { getJob: vi.fn().mockResolvedValue(job), } as unknown as Parameters[0]['embeddingQueue'] - const config = loadSheddingConfig({ SHED_ALPHA: '1' }) + const config = loadSheddingConfig({ + SHED_INITIAL_RESIDUES_PER_SECOND: '900', + }) const state = createShedingState({ redis, config }) await state.incrementPending(10) const events = makeFakeEvents() @@ -181,9 +183,50 @@ describe('startEventSubscriber', () => { const snap = await state.readState() // Started at 10, decrement 4 → 6 expect(snap.pendingResidues).toBe(6) - // sample = 4 residues / 4s = 1 r/s, alpha=1 → ewma=1 - expect(snap.residuesPerSecondEwma).toBeCloseTo(1) + // Throughput is now sweep-derived; the event must not feed the EWMA. + expect(snap.residuesPerSecondEwma).toBe(900) + + await sub.close() + }) + + it('on completed: stamps the completion liveness timestamp', async () => { + const job = makeJob({ sequence: 'AAAA' }) + const queue = { + getJob: vi.fn().mockResolvedValue(job), + } as unknown as Parameters[0]['embeddingQueue'] + + const state = createShedingState({ redis, config: loadSheddingConfig({}) }) + const events = makeFakeEvents() + const sub = startEventSubscriber({ + redis, + connection: {} as never, + embeddingQueue: queue, + state, + logger: mockLogger, + instanceId: 'only', + lockTtlMs: 5_000, + renewIntervalMs: 1_000_000, + queueEventsFactory: () => events, + }) + await new Promise((r) => setTimeout(r, 30)) + expect((await state.readState()).lastCompletionTimestampMs).toBeNull() + + events.emit('completed', { jobId: 'j1' }) + await new Promise((r) => setTimeout(r, 20)) + expect((await state.readState()).lastCompletionTimestampMs).not.toBeNull() await sub.close() }) + + it('sweep reconciliation overrides accumulated event drift', async () => { + // A missed terminal event leaves the fast-path counter inflated; the + // leader sweep's setPending recompute overwrites it wholesale. + const config = loadSheddingConfig({}) + const state = createShedingState({ redis, config }) + await state.incrementPending(400_000) // leaked from missed decrements + expect((await state.readState()).pendingResidues).toBe(400_000) + + await state.setPending(120) // reconciled from real queue state + expect((await state.readState()).pendingResidues).toBe(120) + }) }) diff --git a/services/api-gateway/src/shedding/event-subscriber.ts b/services/api-gateway/src/shedding/event-subscriber.ts index 7005fdb..6ce0f28 100644 --- a/services/api-gateway/src/shedding/event-subscriber.ts +++ b/services/api-gateway/src/shedding/event-subscriber.ts @@ -76,29 +76,24 @@ export function startEventSubscriber( let events: QueueEventsType | null = null let renewTimer: ReturnType | null = null + // Stamps completion-liveness (for the staleness guard) and applies the + // fast-path pending decrement. Throughput is derived from the leader sweep's + // drain-rate sampler, so a missed terminal event here self-heals at the next + // reconciliation instead of corrupting the estimate. async function handleTerminal(jobId: string): Promise { if (!leader) return try { - const job = await embeddingQueue.getJob(jobId) + // Independent ops — stamp liveness while the job is fetched (one fewer RTT). + const [, job] = await Promise.all([ + state.recordCompletion(), + embeddingQueue.getJob(jobId), + ]) if (!job) return const data = job.data as EmbeddingJobData | undefined const residues = data?.sequence.length ?? 0 if (residues <= 0) return - const finishedOn = - (job as unknown as { finishedOn?: number }).finishedOn ?? null - const processedOn = - (job as unknown as { processedOn?: number }).processedOn ?? null - const durationMs = - finishedOn !== null && processedOn !== null && finishedOn > processedOn - ? finishedOn - processedOn - : null - const durationSeconds = durationMs !== null ? durationMs / 1000 : null - await state.decrementPending(residues) - if (durationSeconds !== null && durationSeconds > 0) { - await state.recordCompletion(residues, durationSeconds) - } } catch (err) { logger.warn({ err, jobId }, 'shedding: terminal event handler failed') } diff --git a/services/api-gateway/src/shedding/state.test.ts b/services/api-gateway/src/shedding/state.test.ts index d3b8da6..8ed8347 100644 --- a/services/api-gateway/src/shedding/state.test.ts +++ b/services/api-gateway/src/shedding/state.test.ts @@ -5,6 +5,7 @@ import { describe, it, expect, beforeEach } from 'vitest' import { PENDING_RESIDUES_KEY, THROUGHPUT_KEY, + ADMITTED_RESIDUES_KEY, createShedingState, } from './state.ts' import type { SheddingRedis } from './state.ts' @@ -56,49 +57,118 @@ describe('createShedingState', () => { expect(snap.lastCompletionTimestampMs).toBeNull() }) - it('recordCompletion updates EWMA with α * sample + (1 − α) * prev', async () => { - const config = loadSheddingConfig({ - SHED_ALPHA: '0.5', - SHED_INITIAL_RESIDUES_PER_SECOND: '100', + it('setPending overwrites drifted value (absolute, not additive)', async () => { + const state = createShedingState({ + redis, + config: loadSheddingConfig({}), + }) + await state.incrementPending(999) // drift accumulated between sweeps + const v = await state.setPending(350) + expect(v).toBe(350) + const snap = await state.readState() + expect(snap.pendingResidues).toBe(350) + }) + + it('setPending clamps negative input to zero', async () => { + const state = createShedingState({ + redis, + config: loadSheddingConfig({}), }) + expect(await state.setPending(-50)).toBe(0) + const snap = await state.readState() + expect(snap.pendingResidues).toBe(0) + }) + + it('sampleThroughput drain rate = (arrivals − Δpending) / Δt', async () => { + let t = 1_000_000 const state = createShedingState({ redis, - config, - clock: { now: () => 1_000_000 }, + config: loadSheddingConfig({ SHED_ALPHA: '1' }), // ewma = sample + clock: { now: () => t }, }) - // first sample seeds EWMA (prev missing → new = sample) - const first = await state.recordCompletion(200, 2) // 100 r/s - expect(first).toBeCloseTo(100) + // first sweep: no prior snapshot → null, records snapshot (admitted/pending=0) + expect(await state.sampleThroughput(0)).toBeNull() + + // between sweeps: 1000 admitted, 800 drained so pending now 200 + await state.incrAdmitted(1000) + t += 2000 // Δt = 2s + const rate = await state.sampleThroughput(200) + // departures = max(0, 1000 − 200) = 800 → 800 / 2s = 400 r/s, aggregate + expect(rate).toBeCloseTo(400) + const snap = await state.readState() + expect(snap.residuesPerSecondEwma).toBeCloseTo(400) + }) + + it('sampleThroughput clamps negative departures to zero', async () => { + let t = 0 + const state = createShedingState({ + redis, + config: loadSheddingConfig({ SHED_ALPHA: '1' }), + clock: { now: () => t }, + }) + await state.sampleThroughput(0) // snapshot pending=0, admitted=0 + // pending rose with no arrivals (snapshot-timing skew) → departures clamped + t += 1000 + expect(await state.sampleThroughput(500)).toBe(0) + }) - const second = await state.recordCompletion(400, 2) // 200 r/s - expect(second).toBeCloseTo(150) // 0.5 * 200 + 0.5 * 100 + it('sampleThroughput returns null on first sweep (no prior snapshot)', async () => { + const state = createShedingState({ + redis, + config: loadSheddingConfig({}), + clock: { now: () => 5 }, + }) + expect(await state.sampleThroughput(0)).toBeNull() }) - it('recordCompletion ignores zero or negative inputs', async () => { + it('sampleThroughput skips the EWMA update when Δt ≈ 0', async () => { const state = createShedingState({ redis, config: loadSheddingConfig({}), + clock: { now: () => 5 }, // constant clock → Δt = 0 + }) + await state.sampleThroughput(0) + await state.incrAdmitted(100) + expect(await state.sampleThroughput(50)).toBeNull() + expect(await redis.hget(THROUGHPUT_KEY, 'value')).toBeNull() + }) + + it('cold start falls back to initialResiduesPerSecond until a valid sample', async () => { + const state = createShedingState({ + redis, + config: loadSheddingConfig({ SHED_INITIAL_RESIDUES_PER_SECOND: '1500' }), + clock: { now: () => 1 }, }) - const r1 = await state.recordCompletion(0, 1) - const r2 = await state.recordCompletion(10, 0) - const r3 = await state.recordCompletion(10, -1) - expect(r1).toBe(0) - expect(r2).toBe(0) - expect(r3).toBe(0) + await state.sampleThroughput(0) // null, no EWMA write yet const snap = await state.readState() - expect(snap.lastCompletionTimestampMs).toBeNull() + expect(snap.residuesPerSecondEwma).toBe(1500) }) - it('readState exposes last-sample timestamp after a completion', async () => { + it('recordCompletion stamps a liveness timestamp read back by readState', async () => { const state = createShedingState({ redis, config: loadSheddingConfig({}), - clock: { now: () => 42_000 }, + clock: { now: () => 7_000 }, }) - await state.recordCompletion(1000, 1) - const snap = await state.readState() - expect(snap.lastCompletionTimestampMs).toBe(42_000) + expect((await state.readState()).lastCompletionTimestampMs).toBeNull() + await state.recordCompletion() + expect((await state.readState()).lastCompletionTimestampMs).toBe(7_000) + }) + + it('sampleThroughput does not advance the completion liveness timestamp', async () => { + let t = 1_000 + const state = createShedingState({ + redis, + config: loadSheddingConfig({ SHED_ALPHA: '1' }), + clock: { now: () => t }, + }) + await state.sampleThroughput(0) + await state.incrAdmitted(1000) + t += 2000 + await state.sampleThroughput(200) // writes the sampler's last_sample_ms + // Staleness liveness is completion-driven; the sweep sample must not feed it. + expect((await state.readState()).lastCompletionTimestampMs).toBeNull() }) it('uses the documented Redis keys', async () => { @@ -107,15 +177,11 @@ describe('createShedingState', () => { config: loadSheddingConfig({}), }) await state.incrementPending(10) - const raw = await ( - redis as unknown as { - get: (k: string) => Promise - } - ).get(PENDING_RESIDUES_KEY) - expect(raw).toBe('10') - - await state.recordCompletion(20, 2) - const ewmaRaw = await redis.hget(THROUGHPUT_KEY, 'value') - expect(ewmaRaw).not.toBeNull() + await state.incrAdmitted(40) + const get = ( + redis as unknown as { get: (k: string) => Promise } + ).get.bind(redis) + expect(await get(PENDING_RESIDUES_KEY)).toBe('10') + expect(await get(ADMITTED_RESIDUES_KEY)).toBe('40') }) }) diff --git a/services/api-gateway/src/shedding/state.ts b/services/api-gateway/src/shedding/state.ts index b5401cb..ffa8efd 100644 --- a/services/api-gateway/src/shedding/state.ts +++ b/services/api-gateway/src/shedding/state.ts @@ -3,7 +3,9 @@ import type { SheddingConfig } from '@protifer/shared' export const PENDING_RESIDUES_KEY = 'shedding:pending_residues' export const THROUGHPUT_KEY = 'shedding:throughput_ewma' export const EWMA_FIELD = 'value' -export const EWMA_TIMESTAMP_FIELD = 'last_sample_ms' +export const ADMITTED_RESIDUES_KEY = 'shedding:admitted_residues_total' +export const DRAIN_SNAPSHOT_KEY = 'shedding:drain_snapshot' +export const LAST_COMPLETION_KEY = 'shedding:last_completion_ms' export interface SheddingStateSnapshot { pendingResidues: number @@ -19,6 +21,7 @@ export interface SheddingStateSnapshot { export interface SheddingRedis { incrby(key: string, amount: number): Promise decrby(key: string, amount: number): Promise + set(key: string, value: string): Promise get(key: string): Promise hget(key: string, field: string): Promise hset(key: string, ...values: string[]): Promise @@ -66,6 +69,9 @@ export function createShedingState(deps: StateDeps) { const { redis, config } = deps const now = deps.clock?.now ?? Date.now + // increment/decrementPending are a between-sweeps fast-path hint only. + // The leader sweep's `setPending` reconciliation is authoritative — drift + // accumulated here (e.g. a missed terminal event) is overwritten each sweep. async function incrementPending(residues: number): Promise { if (residues <= 0) return 0 return redis.incrby(PENDING_RESIDUES_KEY, residues) @@ -76,37 +82,91 @@ export function createShedingState(deps: StateDeps) { return redis.decrby(PENDING_RESIDUES_KEY, residues) } - async function recordCompletion( - residues: number, - durationSeconds: number, - ): Promise { - if (!(residues > 0) || !(durationSeconds > 0)) return 0 - const sample = residues / durationSeconds - const result = await redis.eval( + // Authoritative absolute write of the reconciled pending total (clamped + // >= 0). Overwrites, not adjusts — discarding accumulated event drift. + async function setPending(residues: number): Promise { + const value = Math.max(0, Math.floor(residues)) + await redis.set(PENDING_RESIDUES_KEY, String(value)) + return value + } + + async function incrAdmitted(residues: number): Promise { + if (residues <= 0) return 0 + return redis.incrby(ADMITTED_RESIDUES_KEY, residues) + } + + // Stamp upstream liveness on each terminal event. The staleness guard reads + // this — NOT the throughput sampler's `last_sample_ms`, which only refreshes + // at sweep cadence and would make staleness false-fire between sweeps. + async function recordCompletion(): Promise { + await redis.set(LAST_COMPLETION_KEY, String(now())) + } + + /** + * Sweep-driven aggregate drain-rate sample. By flow conservation: + * departures = max(0, arrivals − Δpending) + * drain_rate = departures / Δt_seconds (residues/sec, concurrency-aware) + * `arrivals` is the monotonic admitted counter's delta since the prior + * sweep; `Δpending` uses the just-reconciled `pendingNow`. Feeds the EWMA + * and snapshots admitted/pending/timestamp for the next interval. Skips the + * EWMA update (returns null) when no prior snapshot exists or Δt ≈ 0. + */ + async function sampleThroughput(pendingNow: number): Promise { + const nowMs = now() + const [admittedRaw, snap] = await Promise.all([ + redis.get(ADMITTED_RESIDUES_KEY), + redis.hmget(DRAIN_SNAPSHOT_KEY, 'admitted', 'pending', 'ts'), + ]) + const admittedTotal = admittedRaw === null ? 0 : Number(admittedRaw) + const [prevAdmittedRaw, prevPendingRaw, prevTsRaw] = snap + + await redis.hset( + DRAIN_SNAPSHOT_KEY, + 'admitted', + String(admittedTotal), + 'pending', + String(pendingNow), + 'ts', + String(nowMs), + ) + + if ( + prevAdmittedRaw === null || + prevPendingRaw === null || + prevTsRaw === null + ) { + return null + } + const prevTs = Number(prevTsRaw) + const dtSeconds = (nowMs - prevTs) / 1000 + if (!(dtSeconds > 0)) return null + + const arrivals = admittedTotal - Number(prevAdmittedRaw) + const deltaPending = pendingNow - Number(prevPendingRaw) + const departures = Math.max(0, arrivals - deltaPending) + const drainRate = departures / dtSeconds + + await redis.eval( EWMA_UPDATE_SCRIPT, 1, THROUGHPUT_KEY, String(config.alpha), - String(sample), - String(now()), + String(drainRate), + String(nowMs), ) - if (typeof result === 'string') return Number(result) - if (typeof result === 'number') return result - return 0 + return drainRate } async function readState(): Promise { - const [pendingRaw, ewmaAndTs] = await Promise.all([ + const [pendingRaw, ewmaRaw, lastCompletionRaw] = await Promise.all([ redis.get(PENDING_RESIDUES_KEY), - redis.hmget(THROUGHPUT_KEY, EWMA_FIELD, EWMA_TIMESTAMP_FIELD), + redis.hget(THROUGHPUT_KEY, EWMA_FIELD), + redis.get(LAST_COMPLETION_KEY), ]) const pending = pendingRaw === null ? 0 : Number(pendingRaw) - const [ewmaRaw, tsRaw] = ewmaAndTs const ewma = - ewmaRaw === null || ewmaRaw === undefined - ? config.initialResiduesPerSecond - : Number(ewmaRaw) - const lastTs = tsRaw === null || tsRaw === undefined ? null : Number(tsRaw) + ewmaRaw === null ? config.initialResiduesPerSecond : Number(ewmaRaw) + const lastTs = lastCompletionRaw === null ? null : Number(lastCompletionRaw) return { pendingResidues: Number.isFinite(pending) ? pending : 0, residuesPerSecondEwma: @@ -121,7 +181,10 @@ export function createShedingState(deps: StateDeps) { return { incrementPending, decrementPending, + setPending, + incrAdmitted, recordCompletion, + sampleThroughput, readState, } } diff --git a/tests/backend-e2e/helpers.ts b/tests/backend-e2e/helpers.ts index 7120cb9..66212b1 100644 --- a/tests/backend-e2e/helpers.ts +++ b/tests/backend-e2e/helpers.ts @@ -145,6 +145,7 @@ export async function shutdownE2EHelpers(): Promise { const SHEDDING_PENDING_KEY = 'shedding:pending_residues' const SHEDDING_THROUGHPUT_KEY = 'shedding:throughput_ewma' +const SHEDDING_LAST_COMPLETION_KEY = 'shedding:last_completion_ms' function redisClient(): IORedis { return new IORedis({ @@ -158,7 +159,11 @@ function redisClient(): IORedis { export async function resetSheddingState(): Promise { const redis = redisClient() try { - await redis.del(SHEDDING_PENDING_KEY, SHEDDING_THROUGHPUT_KEY) + await redis.del( + SHEDDING_PENDING_KEY, + SHEDDING_THROUGHPUT_KEY, + SHEDDING_LAST_COMPLETION_KEY, + ) } finally { redis.disconnect() } @@ -176,7 +181,7 @@ export async function readSheddingRedis(): Promise { const [pendingRaw, ewmaRaw, tsRaw] = await Promise.all([ redis.get(SHEDDING_PENDING_KEY), redis.hget(SHEDDING_THROUGHPUT_KEY, 'value'), - redis.hget(SHEDDING_THROUGHPUT_KEY, 'last_sample_ms'), + redis.get(SHEDDING_LAST_COMPLETION_KEY), ]) return { pendingResidues: pendingRaw === null ? 0 : Number(pendingRaw), @@ -198,18 +203,18 @@ export async function seedSheddingState(values: { if (values.pendingResidues !== undefined) { await redis.set(SHEDDING_PENDING_KEY, String(values.pendingResidues)) } - if ( - values.residuesPerSecondEwma !== undefined || - values.lastCompletionTimestampMs !== undefined - ) { - const fields: string[] = [] - if (values.residuesPerSecondEwma !== undefined) { - fields.push('value', String(values.residuesPerSecondEwma)) - } - if (values.lastCompletionTimestampMs !== undefined) { - fields.push('last_sample_ms', String(values.lastCompletionTimestampMs)) - } - await redis.hset(SHEDDING_THROUGHPUT_KEY, ...fields) + if (values.residuesPerSecondEwma !== undefined) { + await redis.hset( + SHEDDING_THROUGHPUT_KEY, + 'value', + String(values.residuesPerSecondEwma), + ) + } + if (values.lastCompletionTimestampMs !== undefined) { + await redis.set( + SHEDDING_LAST_COMPLETION_KEY, + String(values.lastCompletionTimestampMs), + ) } } finally { redis.disconnect() diff --git a/tests/backend-e2e/shedding.test.ts b/tests/backend-e2e/shedding.test.ts index c0f6bf5..554ccdc 100644 --- a/tests/backend-e2e/shedding.test.ts +++ b/tests/backend-e2e/shedding.test.ts @@ -134,9 +134,9 @@ describe('Request shedding E2E', () => { (s) => s.pendingResidues <= 0, { timeoutMs: 30_000 }, ) + // Throughput EWMA + last-sample timestamp are leader-sweep-driven now + // (unit-covered), not written on completion — so not asserted here. expect(afterComplete.pendingResidues).toBeLessThanOrEqual(0) - expect(afterComplete.lastCompletionTimestampMs).not.toBeNull() - expect(afterComplete.residuesPerSecondEwma ?? 0).toBeGreaterThan(0) }) it('does not double-count pendingResidues on cache-hit resubmit', async () => {