fix(commhub): SSE backpressure + half-open detection (round-2/4 ①) #271

Merged: s2agi merged 1 commit into main from fix/commhub-sse-backpressure on Jun 28, 2026

Conversation

@s2agi s2agi commented Jun 28, 2026

Agent: 通信SDK马

Summary

Round-2/4 review ① per 通信龙 dispatch 2026-06-28 — highest-severity unowned item. server/src/push.ts had no per-client cap on the SSE ReadableStream queue. A half-open consumer (TCP drop, client crash without FIN) keeps its slot in clients AND keeps receiving controller.enqueue() calls from both the keepalive timer and every pushEvent. Bytes pile up in the stream's internal queue forever — clear OOM vector once exposed to the public internet (any commhub-server hosted at <hub-domain> reachable over WAN).

v1 → v2 (post-review fix)

v1 had a unit-mismatch bug caught by 通信牛: the default web-streams queuing strategy counts chunks, not bytes. So desiredSize < -1_000_000 meant "1 million chunks queued", not "1 MB queued" — a 1 MiB single enqueue only nudges desiredSize by -1 under the default strategy. The byte-cap was effectively defeated.

v2 fix: construct the SSE stream with an explicit byte-counting strategy so the byte-cap actually fires:

new ReadableStream<Uint8Array>(src, {
  highWaterMark: 0,
  size: (chunk) => chunk.byteLength,
})

通信牛's Bun probe (verified locally in the v2 test suite): under the default strategy desiredSize goes 1 → 0 after one 1 MiB chunk and to -1 after a second; under the byte strategy it goes 0 → -1048576 after the first 1 MiB chunk.
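The chunk-versus-byte difference can be reproduced with a small standalone probe. This is a sketch, not the probe 通信牛 ran: `probe`, `ControllerLike`, and `ByteStrategy` are hypothetical names, and it assumes a runtime with a global `ReadableStream` (Bun, Node 18+).

```typescript
// Minimal structural types so the sketch does not depend on DOM or Node typings.
interface ControllerLike {
  desiredSize: number | null;
  enqueue(chunk: Uint8Array): void;
}
interface ByteStrategy {
  highWaterMark: number;
  size: (chunk: Uint8Array) => number;
}

const MIB = 1024 * 1024;

// Enqueue two 1 MiB chunks with no reader attached and record desiredSize
// before the first enqueue and after each one.
function probe(strategy?: ByteStrategy): Array<number | null> {
  let controller!: ControllerLike;
  // start() runs synchronously inside the constructor, so `controller`
  // is assigned by the time the constructor returns.
  new ReadableStream<Uint8Array>({ start(c) { controller = c; } }, strategy);

  const readings: Array<number | null> = [controller.desiredSize];
  for (let i = 0; i < 2; i++) {
    controller.enqueue(new Uint8Array(MIB));
    readings.push(controller.desiredSize);
  }
  return readings;
}

// Default strategy: highWaterMark 1, every chunk has size 1.
const defaultReadings = probe(); // [1, 0, -1]

// Byte-counting strategy, as in the v2 constructor call above.
const byteReadings = probe({
  highWaterMark: 0,
  size: (chunk) => chunk.byteLength,
}); // [0, -1048576, -2097152]
```

Under the default strategy, a threshold of -1_000_000 would need about a million queued chunks before it fires, whatever their size.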

What changed

server/src/push.ts

  • Byte-counting queuing strategy on the SSE ReadableStream (the critical v2 fix).
  • tryEnqueueBytes(client, bytes) — every enqueue path goes through this guard. Two thresholds:
    • Hard ceiling: desiredSize < -MAX_QUEUE_BACKPRESSURE_BYTES (1 MB default) → close stream immediately. One huge event can't blow up a stuck client.
    • Stuck timeout: desiredSize < 0 continuously for STUCK_CLOSE_MS (60s default) → close. Catches slow-trickle leaks.
  • controller.desiredSize === null (the stream has errored) is treated as no headroom → close.
  • sweepLiveness() runs every LIVENESS_SWEEP_MS (15s default) so half-open detection doesn't require event traffic.
  • closeClient(client, reason) idempotent, clears keepalive timer, best-effort close. All log lines include reason for ops triage.
  • Initial connected frame goes through tryEnqueueBytes too.
  • All thresholds env-overridable: ANET_SSE_MAX_QUEUE_BYTES, ANET_SSE_STUCK_CLOSE_MS, ANET_SSE_KEEPALIVE_MS, ANET_SSE_LIVENESS_SWEEP_MS.
  • Liveness timer uses .unref() so it doesn't hold the event loop open.
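The guard and sweep described above can be sketched as follows. This is an illustration under assumptions, not the code in server/src/push.ts: the `SketchClient` shape, the `checkHeadroom` helper, the numeric value chosen for the "1 MB default", and every close-reason string except `queue-overflow-…b` are hypothetical. It also assumes a client that is behind but under both thresholds still receives the enqueue.

```typescript
// Hypothetical shapes for illustration; the real types in push.ts may differ.
interface ControllerLike {
  desiredSize: number | null;
  enqueue(chunk: Uint8Array): void;
  close(): void;
}
interface SketchClient {
  controller: ControllerLike;
  closed: boolean;
  stuckSince: number | null; // timestamp when desiredSize first went negative
  closeReason?: string;
}

const MAX_QUEUE_BACKPRESSURE_BYTES = 1_000_000; // assumed value of the 1 MB default
const STUCK_CLOSE_MS = 60_000;

// Idempotent: a second call on the same client is a no-op.
function closeClient(client: SketchClient, reason: string): void {
  if (client.closed) return;
  client.closed = true;
  client.closeReason = reason;
  try {
    client.controller.close(); // best effort
  } catch {
    // stream was already closed or errored
  }
}

// Shared headroom check; returns false once the client is (or becomes) closed.
function checkHeadroom(client: SketchClient, now: number): boolean {
  if (client.closed) return false;
  const desired = client.controller.desiredSize;
  if (desired === null) {
    closeClient(client, "errored");
  } else if (desired < -MAX_QUEUE_BACKPRESSURE_BYTES) {
    closeClient(client, `queue-overflow-${Math.abs(desired)}b`); // hard ceiling
  } else if (desired < 0) {
    if (client.stuckSince === null) client.stuckSince = now;
    else if (now - client.stuckSince >= STUCK_CLOSE_MS) closeClient(client, "stuck-timeout");
  } else {
    client.stuckSince = null; // consumer caught up
  }
  return !client.closed;
}

// Every enqueue path goes through this guard.
function tryEnqueueBytes(client: SketchClient, bytes: Uint8Array, now = Date.now()): boolean {
  if (!checkHeadroom(client, now)) return false;
  try {
    client.controller.enqueue(bytes);
    return true;
  } catch {
    closeClient(client, "enqueue-threw");
    return false;
  }
}

// Timer-driven variant: same check without an enqueue, so a half-open client
// is detected even when no events are flowing. Returns how many clients this
// pass closed.
function sweepLiveness(clients: Iterable<SketchClient>, now = Date.now()): number {
  let closedNow = 0;
  for (const client of clients) {
    if (!client.closed && !checkHeadroom(client, now)) closedNow++;
  }
  return closedNow;
}
```

Passing `now` explicitly keeps both functions testable with a fake controller and a fake clock.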

Tests

$ bun test src/push.test.ts
 26 pass · 0 fail · 65 expect() calls

$ COMMHUB_DB=/tmp/anet-sse-test.db NODE_ENV=test bun test
 142 pass · 1 fail · 462 expect() calls

The 1 fail is the pre-existing missing @modelcontextprotocol/sdk dev dep — unrelated to this change, tracked in #261 / round-4 dependency hygiene.

26 tests:

  • 21 fake-controller branches: tryEnqueueBytes (ceiling, stuck-set, stuck-recover, null-desired, enqueue-throw, already-closed), sweepLiveness (null/over/stuck/first-stuck/healthy/mixed), pushEvent integration (half-open/mixed/over-ceiling/unknown), getSSEStats (closed excluded).
  • 3 real-ReadableStream (NEW in v2 — the assertion class that catches v1's unit-mismatch bug):
    • createSSEStream uses byte-counting strategy: enqueue 1 KB → desiredSize drops by ≥1024 (not by 1).
    • non-reading consumer past MAX bytes triggers hard ceiling close: pushes that overshoot 1 MB total bytes close the client. Under v1's strategy this test would NOT close (each push = 1 chunk).
    • reading consumer drains queue → no spurious close: drain promptly, no false-positive close.
  • 2 pre-existing rekey tests untouched.
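For illustration, the "reading consumer drains queue" case can be checked against a real stream along these lines. This is a sketch rather than the test in push.test.ts: `drainDemo` and `ControllerLike` are hypothetical names, and the stream is built directly instead of through createSSEStream.

```typescript
interface ControllerLike {
  desiredSize: number | null;
  enqueue(chunk: Uint8Array): void;
}

// Queue 3 KiB on a byte-counting stream, then read it all back and compare
// desiredSize before and after the drain.
async function drainDemo(): Promise<{ queued: number | null; drained: number | null }> {
  let controller!: ControllerLike;
  const stream = new ReadableStream<Uint8Array>(
    { start(c) { controller = c; } },
    { highWaterMark: 0, size: (chunk: Uint8Array) => chunk.byteLength },
  );

  for (let i = 0; i < 3; i++) controller.enqueue(new Uint8Array(1024));
  const queued = controller.desiredSize; // -3072: three 1 KiB chunks waiting

  const reader = stream.getReader();
  for (let i = 0; i < 3; i++) await reader.read();
  const drained = controller.desiredSize; // 0: queue empty again
  reader.releaseLock();

  return { queued, drained };
}
```

A consumer that keeps reading brings desiredSize back to 0, so neither the hard ceiling nor the stuck timer has anything to act on.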

Constraint compliance (per 通信龙 dispatch)

  • Open PR — not pushed to main directly
  • 通信牛 review gate (no self-merge); v1 → v2 reflects review feedback
  • Docker / isolated test (COMMHUB_DB=/tmp/anet-sse-test.db, NODE_ENV=test)
  • No production deploy — code only
  • Did not touch production hub at <hub-domain> or any prod DB
  • All tunables env-overridable so ops can dial defaults without code change

Out of scope (queued — separate PRs)

Test plan

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: bfe61ca8df

Comment thread server/src/push.ts
Comment on lines +119 to +120
    if (desired < -MAX_QUEUE_BACKPRESSURE_BYTES) {
      closeClient(client, `queue-overflow-${Math.abs(desired)}b`);

P1: Configure the SSE queue to measure bytes

In createSSEStream the ReadableStream is created with the default queuing strategy, so controller.desiredSize is reduced per queued chunk rather than by Uint8Array.byteLength. Comparing that chunk-count value to MAX_QUEUE_BACKPRESSURE_BYTES means the intended 1 MB hard cap actually allows roughly 1,000,000 queued SSE chunks, and a half-open client receiving frequent events can still accumulate far more memory than the configured byte ceiling before this branch closes it. Add a byte-length queuing strategy or track queued byte sizes explicitly before enforcing this limit.
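One way to get a byte-length queuing strategy is the built-in `ByteLengthQueuingStrategy` from the Streams standard, which sizes each chunk by its `byteLength`. The sketch below assumes a runtime that exposes it as a global (Bun, Node 18+); `desiredAfter` is a hypothetical helper, and the PR itself uses an inline `size` function rather than this class.

```typescript
// Return desiredSize after enqueueing one chunk of `bytes` bytes on a stream
// that uses the built-in byte-length strategy and has no reader attached.
function desiredAfter(bytes: number): number | null {
  let controller!: { desiredSize: number | null; enqueue(chunk: Uint8Array): void };
  new ReadableStream<Uint8Array>(
    { start(c) { controller = c; } },
    new ByteLengthQueuingStrategy({ highWaterMark: 0 }),
  );
  controller.enqueue(new Uint8Array(bytes));
  return controller.desiredSize;
}

const afterOneKiB = desiredAfter(1024); // -1024, not -1
```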

Issue #1: the public hub has no per-client cap on the SSE
ReadableStream queue. A half-open consumer (TCP drop, client crash
without FIN) keeps its slot in `clients` AND keeps receiving
`controller.enqueue()` calls from both the keepalive timer and every
`pushEvent`. The bytes pile up in the stream's internal queue forever
— clear OOM vector for any commhub-server exposed to the public
internet (see #270 round-2/4).

Issue #2 (CHANGE_REQ in v1, fixed in v2): default web-streams queuing
strategy counts CHUNKS, not bytes — so a `desiredSize < -1_000_000`
threshold meant "one million chunks queued", not "1 MB queued". A
single 1 MiB enqueue bumps desiredSize by -1 under the default
strategy. v2 explicitly constructs the SSE stream with a byte-counting
strategy so the byte-cap actually fires:

    new ReadableStream<Uint8Array>(src, {
      highWaterMark: 0,
      size: (chunk) => chunk.byteLength,
    })

server/src/push.ts
- Byte-counting queuing strategy on the SSE ReadableStream (the
  critical fix for v1's defeated byte-cap).
- `tryEnqueueBytes(client, bytes)` — every enqueue path through one
  guard. Two thresholds:
    a. HARD CEILING: `desiredSize < -MAX_QUEUE_BACKPRESSURE_BYTES`
       (1 MB default) → close immediately. One huge event can't blow
       up a stuck client.
    b. STUCK TIMEOUT: `desiredSize < 0` continuously for
       `STUCK_CLOSE_MS` (60s default) → close.
- `controller.desiredSize === null` treated as no headroom → close.
- `sweepLiveness()` runs every `LIVENESS_SWEEP_MS` (15s default) so
  half-open detection doesn't require event traffic.
- `closeClient(client, reason)` idempotent, clears keepalive timer,
  best-effort close. All log lines include reason.
- All thresholds env-overridable: ANET_SSE_MAX_QUEUE_BYTES,
  ANET_SSE_STUCK_CLOSE_MS, ANET_SSE_KEEPALIVE_MS,
  ANET_SSE_LIVENESS_SWEEP_MS.
- Liveness timer uses `.unref()`.

server/src/push.test.ts (26 tests total — 21 fake-controller + 3
real-ReadableStream + 2 pre-existing rekey)
- Fake-controller tests cover tryEnqueueBytes/sweepLiveness/pushEvent
  branches with mockable desiredSize.
- Real-ReadableStream tests (new in v2):
    * byte-strategy unit-check: enqueue 1 KB → desiredSize drops by
      ~1024 (not by 1). The exact assertion that catches the v1 unit-
      mismatch bug.
    * non-reading consumer past MAX bytes → hard-ceiling close fires.
    * reading consumer drains promptly → no spurious close even after
      many pushes.

Test plan
- `bun test src/push.test.ts` → 26/26 pass
- `COMMHUB_DB=/tmp/test bun test` → 142/143 pass (1 fail is the pre-
  existing missing @modelcontextprotocol/sdk dev dep, unrelated)
- No production deploy. COMMHUB_DB overridden to /tmp/* for tests.
- Per 通信龙 dispatch: PR + 通信牛 review gate, no self-merge.
- v1 caught by 通信牛 review (Bun probe: default-strategy desiredSize
  went 1→0 after a 1 MiB chunk; byte-strategy went 0→-1048576). Thanks
  通信牛 for the close call.
@s2agi s2agi force-pushed the fix/commhub-sse-backpressure branch from bfe61ca to 2c26d36 Compare June 28, 2026 01:12
@s2agi s2agi merged commit 7b78715 into main Jun 28, 2026
1 of 3 checks passed
@s2agi s2agi deleted the fix/commhub-sse-backpressure branch June 28, 2026 01:25