From 2c26d3661bf478ac8d58001beb70a19ef6ba82a8 Mon Sep 17 00:00:00 2001 From: vansin Date: Sun, 28 Jun 2026 08:57:58 +0800 Subject: [PATCH] =?UTF-8?q?fix(commhub):=20SSE=20backpressure=20+=20half-o?= =?UTF-8?q?pen=20detection=20(round-2/4=20=E2=91=A0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issue #1: the public hub has no per-client cap on the SSE ReadableStream queue. A half-open consumer (TCP drop, client crash without FIN) keeps its slot in `clients` AND keeps receiving `controller.enqueue()` calls from both the keepalive timer and every `pushEvent`. The bytes pile up in the stream's internal queue forever — clear OOM vector for any commhub-server exposed to the public internet (see #270 round-2/4). Issue #2 (CHANGE_REQ in v1, fixed in v2): default web-streams queuing strategy counts CHUNKS, not bytes — so a `desiredSize < -1_000_000` threshold meant "one million chunks queued", not "1 MB queued". A single 1 MiB enqueue bumps desiredSize by -1 under the default strategy. v2 explicitly constructs the SSE stream with a byte-counting strategy so the byte-cap actually fires: new ReadableStream(src, { highWaterMark: 0, size: (chunk) => chunk.byteLength, }) server/src/push.ts - Byte-counting queuing strategy on the SSE ReadableStream (the critical fix for v1's defeated byte-cap). - `tryEnqueueBytes(client, bytes)` — every enqueue path through one guard. Two thresholds: a. HARD CEILING: `desiredSize < -MAX_QUEUE_BACKPRESSURE_BYTES` (1 MB default) → close immediately. One huge event can't blow up a stuck client. b. STUCK TIMEOUT: `desiredSize < 0` continuously for `STUCK_CLOSE_MS` (60s default) → close. - `controller.desiredSize === null` treated as no headroom → close. - `sweepLiveness()` runs every `LIVENESS_SWEEP_MS` (15s default) so half-open detection doesn't require event traffic. - `closeClient(client, reason)` idempotent, clears keepalive timer, best-effort close. All log lines include reason. 
- All thresholds env-overridable: ANET_SSE_MAX_QUEUE_BYTES, ANET_SSE_STUCK_CLOSE_MS, ANET_SSE_KEEPALIVE_MS, ANET_SSE_LIVENESS_SWEEP_MS. - Liveness timer uses `.unref()`. server/src/push.test.ts (26 tests total — 21 fake-controller + 3 real-ReadableStream + 2 pre-existing rekey) - Fake-controller tests cover tryEnqueueBytes/sweepLiveness/pushEvent branches with mockable desiredSize. - Real-ReadableStream tests (new in v2): * byte-strategy unit-check: enqueue 1 KB → desiredSize drops by ~1024 (not by 1). The exact assertion that catches the v1 unit-mismatch bug. * non-reading consumer past MAX bytes → hard-ceiling close fires. * reading consumer drains promptly → no spurious close even after many pushes. Test plan - `bun test src/push.test.ts` → 26/26 pass - `COMMHUB_DB=/tmp/test bun test` → 142/143 pass (1 fail is the pre-existing missing @modelcontextprotocol/sdk dev dep, unrelated) - No production deploy. COMMHUB_DB overridden to /tmp/* for tests. - Per 通信龙 dispatch: PR + 通信牛 review gate, no self-merge. - v1 caught by 通信牛 review (Bun probe: default-strategy desiredSize went 1→0 after 1 MiB chunk; byte-strategy went to -1048576). Thanks 通信牛 for the close call. 
--- server/src/push.test.ts | 385 +++++++++++++++++++++++++++++++++++++++- server/src/push.ts | 346 ++++++++++++++++++++++++++++++------ 2 files changed, 674 insertions(+), 57 deletions(-) diff --git a/server/src/push.test.ts b/server/src/push.test.ts index a04fccf0..142f0580 100644 --- a/server/src/push.test.ts +++ b/server/src/push.test.ts @@ -1,6 +1,12 @@ -import { afterEach, expect, test } from "bun:test"; +import { afterEach, expect, test, describe } from "bun:test"; import { eventBus } from "./event_bus"; -import { __resetSSEClientsForTest, createSSEStream, getSSEStats } from "./push"; +import { + __resetSSEClientsForTest, + createSSEStream, + getSSEStats, + pushEvent, + __SSE_INTERNALS_FOR_TEST, +} from "./push"; afterEach(() => { __resetSSEClientsForTest(); @@ -45,3 +51,378 @@ test("rename-committed merges old and new SSE client buckets without dropping cl await oldReader.cancel(); await newReader.cancel(); }); + +// ───────────────────────────────────────────────────────────────────── +// Round-2/4 fix: SSE backpressure + half-open detection +// +// These tests exercise tryEnqueueBytes / sweepLiveness directly with a +// hand-rolled fake ReadableStreamDefaultController so we can model the +// failure modes that matter — a controller whose `desiredSize` goes +// negative because the consumer stopped reading (half-open TCP), and +// one whose enqueue throws because the runtime already errored it. +// ───────────────────────────────────────────────────────────────────── + +const enc = new TextEncoder(); + +type FakeCtrlOpts = { + desiredSize?: number | null; + enqueueThrows?: Error; + onClose?: () => void; +}; + +/** Hand-rolled controller stand-in. `desiredSize` is mutable so a test + * can flip a "client" from healthy → backpressured by tweaking one + * number; `enqueue` records the bytes (for assertion) and optionally + * throws. 
*/ +function fakeController(opts: FakeCtrlOpts = {}) { + const enqueued: Uint8Array[] = []; + let closed = false; + const hasDesired = "desiredSize" in opts; + const ctrl = { + get desiredSize(): number | null { + // Explicit null must NOT coalesce to the default — the whole point + // of these tests is to exercise the half-open path where + // desiredSize comes back null. + return hasDesired ? (opts.desiredSize as number | null) : 1024; + }, + enqueue(bytes: Uint8Array) { + if (opts.enqueueThrows) throw opts.enqueueThrows; + enqueued.push(bytes); + }, + close() { + if (!closed) { + closed = true; + opts.onClose?.(); + } + }, + _enqueued: enqueued, + _closed: () => closed, + _setDesired(n: number | null) { + opts.desiredSize = n; + }, + }; + return ctrl; +} + +function makeClient(ctrl: ReturnType) { + return { + controller: ctrl as unknown as ReadableStreamDefaultController, + encoder: enc, + stuckSince: null as number | null, + closed: false, + key: "test:fake", + }; +} + +describe("tryEnqueueBytes — backpressure ceiling", () => { + test("desiredSize healthy → enqueue OK, no close", () => { + const ctrl = fakeController({ desiredSize: 500_000 }); + const c = makeClient(ctrl); + const r = __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("hello")); + expect(r).toBe("ok"); + expect(c.closed).toBe(false); + expect(ctrl._enqueued).toHaveLength(1); + }); + + test("desiredSize < -MAX_BACKPRESSURE → close, return dead, no enqueue", () => { + const ceiling = __SSE_INTERNALS_FOR_TEST.MAX_QUEUE_BACKPRESSURE_BYTES; + const ctrl = fakeController({ desiredSize: -(ceiling + 1) }); + const c = makeClient(ctrl); + const r = __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("x")); + expect(r).toBe("dead"); + expect(c.closed).toBe(true); + expect(ctrl._closed()).toBe(true); + expect(ctrl._enqueued).toHaveLength(0); + }); + + test("desiredSize negative but within ceiling → enqueue still OK, stuckSince set", () => { + const ctrl = fakeController({ desiredSize: -100 }); + 
const c = makeClient(ctrl); + expect(c.stuckSince).toBeNull(); + const r = __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("x")); + expect(r).toBe("ok"); + expect(c.closed).toBe(false); + expect(c.stuckSince).not.toBeNull(); + expect(ctrl._enqueued).toHaveLength(1); + }); + + test("desiredSize recovers to positive → stuckSince cleared", () => { + const ctrl = fakeController({ desiredSize: -100 }); + const c = makeClient(ctrl); + __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("a")); + expect(c.stuckSince).not.toBeNull(); + ctrl._setDesired(500); + __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("b")); + expect(c.stuckSince).toBeNull(); + }); + + test("desiredSize null (closed/errored controller) → close, return dead", () => { + const ctrl = fakeController({ desiredSize: null }); + const c = makeClient(ctrl); + const r = __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("x")); + expect(r).toBe("dead"); + expect(c.closed).toBe(true); + }); + + test("enqueue throws → close, return dead", () => { + const ctrl = fakeController({ + desiredSize: 100, + enqueueThrows: new Error("controller errored"), + }); + const c = makeClient(ctrl); + const r = __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("x")); + expect(r).toBe("dead"); + expect(c.closed).toBe(true); + }); + + test("already-closed client → return dead immediately, no enqueue/close attempt", () => { + const ctrl = fakeController({ desiredSize: 100 }); + const c = makeClient(ctrl); + c.closed = true; + const r = __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("x")); + expect(r).toBe("dead"); + expect(ctrl._enqueued).toHaveLength(0); + expect(ctrl._closed()).toBe(false); // never re-closed + }); +}); + +describe("tryEnqueueBytes — stuck timeout", () => { + test("stuck longer than STUCK_CLOSE_MS → close, return dead", () => { + const ctrl = fakeController({ desiredSize: -500 }); + const c = makeClient(ctrl); + // Backdate stuckSince so the threshold has elapsed. 
+ c.stuckSince = Date.now() - (__SSE_INTERNALS_FOR_TEST.STUCK_CLOSE_MS + 1000); + const r = __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("x")); + expect(r).toBe("dead"); + expect(c.closed).toBe(true); + }); + + test("stuck but below STUCK_CLOSE_MS → enqueue still OK", () => { + const ctrl = fakeController({ desiredSize: -500 }); + const c = makeClient(ctrl); + c.stuckSince = Date.now() - 1000; // far less than 60s default + const r = __SSE_INTERNALS_FOR_TEST.tryEnqueueBytes(c, enc.encode("x")); + expect(r).toBe("ok"); + expect(c.closed).toBe(false); + }); +}); + +describe("sweepLiveness — proactive half-open detection", () => { + test("sweep closes a client whose desiredSize is null", () => { + const ctrl = fakeController({ desiredSize: null }); + const c = makeClient(ctrl); + __SSE_INTERNALS_FOR_TEST.clientsMap.set("test:sweep-null", [c as any]); + __SSE_INTERNALS_FOR_TEST.sweepLiveness(); + expect(c.closed).toBe(true); + expect(__SSE_INTERNALS_FOR_TEST.clientsMap.has("test:sweep-null")).toBe(false); + }); + + test("sweep closes a client over the backpressure ceiling without any push", () => { + const ceiling = __SSE_INTERNALS_FOR_TEST.MAX_QUEUE_BACKPRESSURE_BYTES; + const ctrl = fakeController({ desiredSize: -(ceiling + 5_000_000) }); + const c = makeClient(ctrl); + __SSE_INTERNALS_FOR_TEST.clientsMap.set("test:sweep-over", [c as any]); + __SSE_INTERNALS_FOR_TEST.sweepLiveness(); + expect(c.closed).toBe(true); + expect(__SSE_INTERNALS_FOR_TEST.clientsMap.has("test:sweep-over")).toBe(false); + }); + + test("sweep closes a client stuck longer than STUCK_CLOSE_MS", () => { + const ctrl = fakeController({ desiredSize: -500 }); + const c = makeClient(ctrl); + c.stuckSince = Date.now() - (__SSE_INTERNALS_FOR_TEST.STUCK_CLOSE_MS + 5_000); + __SSE_INTERNALS_FOR_TEST.clientsMap.set("test:sweep-stuck", [c as any]); + __SSE_INTERNALS_FOR_TEST.sweepLiveness(); + expect(c.closed).toBe(true); + 
expect(__SSE_INTERNALS_FOR_TEST.clientsMap.has("test:sweep-stuck")).toBe(false); + }); + + test("sweep first sees stuck → records stuckSince, does not close yet", () => { + const ctrl = fakeController({ desiredSize: -500 }); + const c = makeClient(ctrl); + expect(c.stuckSince).toBeNull(); + __SSE_INTERNALS_FOR_TEST.clientsMap.set("test:sweep-firststuck", [c as any]); + __SSE_INTERNALS_FOR_TEST.sweepLiveness(); + expect(c.closed).toBe(false); + expect(c.stuckSince).not.toBeNull(); + }); + + test("sweep leaves healthy clients alone", () => { + const ctrl = fakeController({ desiredSize: 500 }); + const c = makeClient(ctrl); + __SSE_INTERNALS_FOR_TEST.clientsMap.set("test:sweep-healthy", [c as any]); + __SSE_INTERNALS_FOR_TEST.sweepLiveness(); + expect(c.closed).toBe(false); + expect(__SSE_INTERNALS_FOR_TEST.clientsMap.get("test:sweep-healthy")).toHaveLength(1); + }); + + test("sweep prunes only dead entries when multiple clients share a key", () => { + const dead = fakeController({ desiredSize: null }); + const alive = fakeController({ desiredSize: 500 }); + const cDead = makeClient(dead); + const cAlive = makeClient(alive); + __SSE_INTERNALS_FOR_TEST.clientsMap.set("test:sweep-mixed", [cDead, cAlive] as any); + __SSE_INTERNALS_FOR_TEST.sweepLiveness(); + const remaining = __SSE_INTERNALS_FOR_TEST.clientsMap.get("test:sweep-mixed"); + expect(remaining).toHaveLength(1); + expect(cDead.closed).toBe(true); + expect(cAlive.closed).toBe(false); + }); +}); + +describe("pushEvent integration with backpressure", () => { + test("push to half-open client → client evicted from map", () => { + const ctrl = fakeController({ desiredSize: null }); + const c = makeClient(ctrl); + c.key = "global:half-open"; + __SSE_INTERNALS_FOR_TEST.clientsMap.set("global:half-open", [c as any]); + pushEvent("half-open", { type: "test", payload: 1 }); + expect(c.closed).toBe(true); + expect(__SSE_INTERNALS_FOR_TEST.clientsMap.has("global:half-open")).toBe(false); + }); + + test("push to mixed (one 
alive, one dead) → alive receives, dead pruned", () => { + const dead = fakeController({ desiredSize: null }); + const alive = fakeController({ desiredSize: 500 }); + const cDead = makeClient(dead); + cDead.key = "global:mixed"; + const cAlive = makeClient(alive); + cAlive.key = "global:mixed"; + __SSE_INTERNALS_FOR_TEST.clientsMap.set("global:mixed", [cDead, cAlive] as any); + pushEvent("mixed", { type: "msg" }); + expect(alive._enqueued).toHaveLength(1); + expect(__SSE_INTERNALS_FOR_TEST.clientsMap.get("global:mixed")).toHaveLength(1); + }); + + test("push to over-ceiling client → client closed, no enqueue, map evicted", () => { + const ceiling = __SSE_INTERNALS_FOR_TEST.MAX_QUEUE_BACKPRESSURE_BYTES; + const ctrl = fakeController({ desiredSize: -(ceiling + 1) }); + const c = makeClient(ctrl); + c.key = "global:over"; + __SSE_INTERNALS_FOR_TEST.clientsMap.set("global:over", [c as any]); + pushEvent("over", { type: "msg" }); + expect(ctrl._enqueued).toHaveLength(0); + expect(c.closed).toBe(true); + expect(__SSE_INTERNALS_FOR_TEST.clientsMap.has("global:over")).toBe(false); + }); + + test("push to unknown session is a no-op (no throw)", () => { + expect(() => pushEvent("never-connected", { type: "x" })).not.toThrow(); + }); +}); + +// ───────────────────────────────────────────────────────────────────── +// Round-2 review fix: byte-level backpressure on a REAL ReadableStream +// +// The fake-controller tests above can't catch the unit-mismatch bug +// (treating desiredSize as bytes when the runtime treats it as chunk +// count). These tests exercise the actual `createSSEStream` Response +// against a non-reading consumer and verify that bytes — not chunks — +// trip the backpressure cap. +// +// Probe baseline (per 通信牛, also verified locally in this test): +// - Default ReadableStream (no strategy) → desiredSize is the +// CHUNK count. 1MB chunk → desiredSize goes 1 → 0; second chunk +// → -1. Million 1-byte chunks needed to hit our -1MB ceiling. 
+// - Byte-counting strategy `{highWaterMark: 0, size: c.byteLength}` +// → desiredSize is the BYTE count. 1MB chunk → -1_000_000. +// ───────────────────────────────────────────────────────────────────── + +describe("real ReadableStream — byte-level backpressure (round-2 fix)", () => { + test("createSSEStream uses byte-counting strategy: desiredSize tracks bytes not chunks", async () => { + const res = createSSEStream("byte-strategy-probe", "net-bsp"); + // Don't read from res.body. The stream sits with whatever was + // enqueued in `start()` (initial 'connected' frame) until we push. + const key = "net-bsp:byte-strategy-probe"; + const arr = __SSE_INTERNALS_FOR_TEST.clientsMap.get(key); + expect(arr).toBeDefined(); + expect(arr).toHaveLength(1); + const ctrl = arr![0].controller; + + // The unit check: enqueue a 1 KB chunk and verify desiredSize + // drops by ~1024, not by 1. With chunk-count strategy desiredSize + // would drop by exactly 1; with byte strategy by 1024. + const before = ctrl.desiredSize!; + ctrl.enqueue(new Uint8Array(1024)); + const after = ctrl.desiredSize!; + expect(before - after).toBeGreaterThanOrEqual(1024); + + // Cleanup so the afterEach reset doesn't leak. + await res.body!.cancel(); + }); + + test("non-reading consumer past MAX bytes triggers hard ceiling close", async () => { + const ceiling = __SSE_INTERNALS_FOR_TEST.MAX_QUEUE_BACKPRESSURE_BYTES; + const res = createSSEStream("byte-cap-probe", "net-bcp"); + const key = "net-bcp:byte-cap-probe"; + const c = __SSE_INTERNALS_FOR_TEST.clientsMap.get(key)![0]; + + // Push events whose total byte size exceeds the hard ceiling. + // With chunk-count strategy this would NOT close (each event is 1 + // chunk; desiredSize would be at -N for N events, never -1M). + // With byte strategy this SHOULD close once accumulated bytes + // exceed ceiling. 
+ const chunkSize = 100_000; // 100KB per push + const needed = Math.ceil(ceiling / chunkSize) + 2; // overshoot a bit + const big = "x".repeat(chunkSize); + for (let i = 0; i < needed; i++) { + if (c.closed) break; + pushEvent("byte-cap-probe", { type: "big", payload: big }, "net-bcp"); + } + expect(c.closed).toBe(true); + expect(__SSE_INTERNALS_FOR_TEST.clientsMap.has(key)).toBe(false); + + try { await res.body!.cancel(); } catch { /* already closed */ } + }); + + test("reading consumer drains queue → no spurious close even after many pushes", async () => { + const res = createSSEStream("drain-probe", "net-drain"); + const reader = res.body!.getReader(); + const key = "net-drain:drain-probe"; + const c = __SSE_INTERNALS_FOR_TEST.clientsMap.get(key)![0]; + + // Drain in the background while pushes happen. + let drained = 0; + const drainPromise = (async () => { + while (drained < 50) { + const { done, value } = await reader.read(); + if (done) break; + if (value) drained += value.byteLength; + } + })(); + + for (let i = 0; i < 20; i++) { + pushEvent("drain-probe", { type: "tick", n: i }, "net-drain"); + } + await drainPromise; + expect(c.closed).toBe(false); + expect(__SSE_INTERNALS_FOR_TEST.clientsMap.has(key)).toBe(true); + + await reader.cancel(); + }); +}); + +describe("getSSEStats — closed clients excluded", () => { + test("manually-closed client is not counted in stats", () => { + const ctrl = fakeController({ desiredSize: 500 }); + const c = makeClient(ctrl); + c.key = "global:stats-test"; + c.closed = true; + __SSE_INTERNALS_FOR_TEST.clientsMap.set("global:stats-test", [c as any]); + const stats = getSSEStats(); + expect(stats.sessions["global:stats-test"]).toBeUndefined(); + expect(stats.total).toBe(0); + }); + + test("mixed alive + closed clients → only alive count", () => { + const dead = fakeController({ desiredSize: 500 }); + const alive = fakeController({ desiredSize: 500 }); + const cDead = makeClient(dead); + cDead.closed = true; + const cAlive = 
makeClient(alive); + __SSE_INTERNALS_FOR_TEST.clientsMap.set("global:stats-mixed", [cDead, cAlive] as any); + const stats = getSSEStats(); + expect(stats.sessions["global:stats-mixed"]).toBe(1); + expect(stats.total).toBe(1); + }); +}); diff --git a/server/src/push.ts b/server/src/push.ts index 9600d670..caba3f56 100644 --- a/server/src/push.ts +++ b/server/src/push.ts @@ -1,16 +1,89 @@ // ── SSE Push: 实时推送事件给 Agent ────────────────── // Agent 连 GET /events/:session → 保持 SSE 长连接 // send_task 写 inbox 后 → pushEvent() → 秒达 +// +// Backpressure & half-open contract (Round-2/4 fix per 通信龙): +// +// 0. **CRITICAL**: `desiredSize` units are determined by the stream's +// queuing strategy. The web-streams default strategy is COUNT-based +// (each chunk = 1 unit), so a 1 MB chunk only nudges desiredSize by +// -1. That defeats the byte-cap entirely — pushing a million 1-byte +// chunks is what trips the threshold, not 1 MB of real bytes. +// Bun probe verified by 通信牛: default stream after 1 MiB chunk → +// desiredSize 0, after 2 MiB → -1; byte strategy after 1 MiB → 0, +// after 2 MiB → -1048576. We MUST construct the stream with an +// explicit byte-counting strategy: +// new ReadableStream(src, { highWaterMark: 0, size: c => c.byteLength }) +// With highWaterMark=0, every enqueued byte counts against the cap +// directly. The HARD CEILING below then closes the stream at +// -MAX_QUEUE_BACKPRESSURE_BYTES bytes of queued data, which is what +// we actually want. +// +// 1. `ReadableStreamDefaultController.desiredSize` is the SSE queue +// headroom in BYTES (given the strategy above). Positive = room, +// 0 = at high-water-mark, negative = over by that many bytes. +// Without a bound, half-open TCP consumers (network drop, client +// crash without FIN) silently accumulate bytes in the queue forever +// — a public-hub OOM vector. +// +// 2. We bound it two ways: +// +// a. HARD CEILING — `desiredSize < -MAX_QUEUE_BACKPRESSURE_BYTES` +// → close the stream immediately. 
Single huge event can't blow +// up a stuck client. +// +// b. STUCK TIMEOUT — `desiredSize < 0` continuously for +// `STUCK_CLOSE_MS` → close. Catches slow-trickle leaks where +// the consumer accepts a byte every few seconds but can't keep +// up. +// +// 3. Liveness: every `LIVENESS_SWEEP_MS` we re-check every registered +// client even when nobody calls `pushEvent`, so a session that goes +// half-open during a quiet period still gets reaped — not waiting +// for the next inbox push. import { eventBus, type RenameCommittedEvent } from "./event_bus"; +// ───────────────────────────────────────────────────────────────────── +// Tunables (env-overridable for tests and ops) +// ───────────────────────────────────────────────────────────────────── +function envNum(k: string, fallback: number): number { + const v = process.env[k]; + if (!v) return fallback; + const n = Number(v); + return Number.isFinite(n) && n > 0 ? n : fallback; +} + +/** Hard ceiling on per-client SSE queue backpressure before we drop the + * client. 1 MB by default — generous for normal bursts, lethal for the + * half-open consumer that has stopped reading entirely. */ +const MAX_QUEUE_BACKPRESSURE_BYTES = envNum("ANET_SSE_MAX_QUEUE_BYTES", 1_000_000); + +/** How long `desiredSize` can stay negative (i.e. consumer behind HWM) + * before we declare the client dead. 60s by default — past 2× the + * keepalive interval, so a live-but-slow consumer wouldn't trip it. */ +const STUCK_CLOSE_MS = envNum("ANET_SSE_STUCK_CLOSE_MS", 60_000); + +/** Keepalive cadence. Doubles as half-open liveness probe — if enqueue + * silently piles up bytes here, the stuck-timeout kicks in. */ +const KEEPALIVE_MS = envNum("ANET_SSE_KEEPALIVE_MS", 30_000); + +/** Liveness sweep cadence — proactive scan even when no events flow. 
*/ +const LIVENESS_SWEEP_MS = envNum("ANET_SSE_LIVENESS_SWEEP_MS", 15_000); + type SSEClient = { controller: ReadableStreamDefaultController; encoder: TextEncoder; - // 30s keepalive interval — stored so cancel() can clear it without - // relying on stringly-typed any-cast. Set in start(), cleared in cancel() - // or by the interval itself on enqueue failure. - keepalive?: ReturnType; + /** When desiredSize first went negative; cleared on successful drain. */ + stuckSince: number | null; + /** Once true, the stream has been closed and the client should be + * evicted from the map on the next sweep / push. Guards against + * double-close. */ + closed: boolean; + /** keepalive timer handle so it can be cleared on close. */ + keepaliveTimer?: ReturnType; + /** Stable key for log/diagnostic context. */ + key: string; }; // 一个 session 可能有多个 SSE 连接(重连时短暂并存) @@ -20,49 +93,189 @@ function ts(): string { return new Date().toTimeString().slice(0, 8); } +function clientKey(sessionName: string, networkId?: string | null): string { + return `${networkId || "global"}:${sessionName}`; +} + +// ───────────────────────────────────────────────────────────────────── +// Close + dispatch helpers +// ───────────────────────────────────────────────────────────────────── + +/** Close a client's stream (idempotent) and clear its keepalive timer. + * Does NOT remove from the map — leave that to the caller so it can + * prune the right index without re-finding it. */ +function closeClient(client: SSEClient, reason: string): void { + if (client.closed) return; + client.closed = true; + if (client.keepaliveTimer) clearInterval(client.keepaliveTimer); + try { + client.controller.close(); + } catch { + // Already closed by the runtime, or controller errored. Either way + // we're done with it. + } + console.log(`[${ts()}] SSE ✕ ${client.key} closed (reason=${reason})`); +} + +/** Best-effort enqueue with backpressure guard. 
Returns: + * - "ok" → bytes accepted, queue healthy + * - "dead" → client closed; caller must remove from map */ +function tryEnqueueBytes(client: SSEClient, bytes: Uint8Array): "ok" | "dead" { + if (client.closed) return "dead"; + + const ctrl = client.controller; + // desiredSize may be null on a closed/errored stream — defensive + // null-check, then treat null as "no headroom" since we can't enqueue + // safely anyway. + const desired = ctrl.desiredSize; + if (desired === null) { + closeClient(client, "controller-null-desired"); + return "dead"; + } + + // Hard ceiling: too far behind HWM, single push could OOM us. Close. + if (desired < -MAX_QUEUE_BACKPRESSURE_BYTES) { + closeClient(client, `queue-overflow-${Math.abs(desired)}b`); + return "dead"; + } + + // Track stuck-since-when for the soft timeout below. + if (desired < 0) { + if (client.stuckSince === null) { + client.stuckSince = Date.now(); + } else if (Date.now() - client.stuckSince > STUCK_CLOSE_MS) { + closeClient(client, `stuck-${Date.now() - client.stuckSince}ms`); + return "dead"; + } + } else { + client.stuckSince = null; + } + + try { + ctrl.enqueue(bytes); + return "ok"; + } catch (e: any) { + closeClient(client, `enqueue-throw:${e?.message || e}`); + return "dead"; + } +} + +/** Walk every client and close any that are stuck/half-open. Run on a + * timer so half-open detection doesn't depend on event traffic. 
*/ +function sweepLiveness(): void { + const now = Date.now(); + for (const [key, arr] of clients) { + const alive: SSEClient[] = []; + for (const c of arr) { + if (c.closed) continue; + const desired = c.controller.desiredSize; + if (desired === null) { + closeClient(c, "sweep-null-desired"); + continue; + } + if (desired < -MAX_QUEUE_BACKPRESSURE_BYTES) { + closeClient(c, `sweep-overflow-${Math.abs(desired)}b`); + continue; + } + if (desired < 0) { + if (c.stuckSince === null) c.stuckSince = now; + else if (now - c.stuckSince > STUCK_CLOSE_MS) { + closeClient(c, `sweep-stuck-${now - c.stuckSince}ms`); + continue; + } + } else { + c.stuckSince = null; + } + alive.push(c); + } + if (alive.length === 0) clients.delete(key); + else if (alive.length !== arr.length) clients.set(key, alive); + } +} + +// Liveness sweep timer. Started lazily on first stream creation; never +// stopped in production (one timer for the lifetime of the process). +let livenessSweepTimer: ReturnType | null = null; +function ensureLivenessSweep(): void { + if (livenessSweepTimer) return; + livenessSweepTimer = setInterval(() => { + try { + sweepLiveness(); + } catch (e: any) { + console.log(`[${ts()}] SSE sweep failed: ${e?.message || e}`); + } + }, LIVENESS_SWEEP_MS); + // Don't keep the event loop alive just for the sweep. 
+ (livenessSweepTimer as any)?.unref?.(); +} + +// ───────────────────────────────────────────────────────────────────── +// Public API +// ───────────────────────────────────────────────────────────────────── + /** 创建 SSE Response 并注册到 clients map */ export function createSSEStream(sessionName: string, networkId?: string | null): Response { const encoder = new TextEncoder(); - let ctrl: ReadableStreamDefaultController; const key = clientKey(sessionName, networkId); + let client: SSEClient; - const stream = new ReadableStream({ + const stream = new ReadableStream({ start(controller) { - ctrl = controller; - const client: SSEClient = { controller, encoder }; + client = { + controller, + encoder, + stuckSince: null, + closed: false, + key, + }; - if (!clients.has(key)) { - clients.set(key, []); - } + if (!clients.has(key)) clients.set(key, []); clients.get(key)!.push(client); console.log(`[${ts()}] SSE ← ${key} connected (${clients.get(key)!.length} clients)`); - // 发送初始心跳 - controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: "connected", session: sessionName, network_id: networkId ?? null })}\n\n`)); + // Send initial connected frame through the backpressure guard so + // even the first byte respects the contract. + tryEnqueueBytes( + client, + encoder.encode(`data: ${JSON.stringify({ type: "connected", session: sessionName, network_id: networkId ?? null })}\n\n`), + ); - // Periodic keepalive every 30s to prevent proxy/LB idle timeout - const keepalive = setInterval(() => { - try { - controller.enqueue(encoder.encode(`: keepalive\n\n`)); - } catch { - clearInterval(keepalive); + // Periodic keepalive doubles as a half-open probe — if the + // consumer is gone, this enqueue piles bytes in the queue until + // tryEnqueueBytes catches the ceiling / stuck timeout. 
+ client.keepaliveTimer = setInterval(() => { + if (client.closed) { + if (client.keepaliveTimer) clearInterval(client.keepaliveTimer); + return; } - }, 30_000); - client.keepalive = keepalive; + const result = tryEnqueueBytes(client, encoder.encode(`: keepalive\n\n`)); + if (result === "dead") { + // closeClient already cleared the timer; just make sure we + // also prune from the map. + pruneClosed(key); + } + }, KEEPALIVE_MS); + (client.keepaliveTimer as any)?.unref?.(); + + ensureLivenessSweep(); }, cancel() { - // 断线清理 - const arr = clients.get(key); - if (arr) { - const idx = arr.findIndex(c => c.controller === ctrl); - if (idx !== -1) { - if (arr[idx].keepalive) clearInterval(arr[idx].keepalive); - arr.splice(idx, 1); - } - if (arr.length === 0) clients.delete(key); - console.log(`[${ts()}] SSE ✕ ${key} disconnected (${arr.length} remaining)`); + // Consumer side hangup — straightforward path, mark closed and + // prune. The half-open path goes through closeClient instead. + if (client) { + closeClient(client, "cancel"); + pruneClosed(key); } }, + }, { + // BYTE-COUNTING strategy. Without this, desiredSize is a chunk + // counter and the byte-cap below is meaningless (see file header + // comment #0). highWaterMark=0 means: after enqueuing N bytes, the + // queue is N bytes over the mark, so `desiredSize === -N`. Pair + // this with the hard ceiling in tryEnqueueBytes to actually close + // a half-open consumer at MAX_QUEUE_BACKPRESSURE_BYTES. + highWaterMark: 0, + size: (chunk: Uint8Array) => chunk.byteLength, }); return new Response(stream, { @@ -74,8 +287,17 @@ export function createSSEStream(sessionName: string, networkId?: string | null): }); } -function clientKey(sessionName: string, networkId?: string | null): string { - return `${networkId || "global"}:${sessionName}`; +/** Remove closed entries for one key, drop the map slot if empty. 
*/ +function pruneClosed(key: string): void { + const arr = clients.get(key); + if (!arr) return; + const alive = arr.filter((c) => !c.closed); + if (alive.length === 0) { + clients.delete(key); + console.log(`[${ts()}] SSE ✕ ${key} disconnected (0 remaining)`); + } else if (alive.length !== arr.length) { + clients.set(key, alive); + } } function rekeyClient(oldSessionName: string, newSessionName: string, networkId?: string | null): number { @@ -85,6 +307,10 @@ function rekeyClient(oldSessionName: string, newSessionName: string, networkId?: const oldClients = clients.get(oldKey); if (!oldClients || oldClients.length === 0) return 0; + // Update the cached key on each moved client so later closeClient + // logs report the right session. + for (const c of oldClients) c.key = newKey; + const existing = clients.get(newKey) || []; clients.set(newKey, existing.concat(oldClients)); clients.delete(oldKey); @@ -102,39 +328,34 @@ eventBus.on("rename-committed", (event: RenameCommittedEvent) => { /** 推送事件给指定 session 的所有 SSE 连接 */ export function pushEvent(sessionName: string, event: Record, networkId?: string | null): void { - // Resolve the key once — both lookup and post-cleanup `clients.delete` MUST - // use the same key, otherwise an emptied bucket would leak (the explicit - // arg wins over event.network_id, falling back so legacy callers that pass - // network_id inside `event` still hit the right bucket). const key = clientKey(sessionName, networkId ?? 
(event.network_id as string | null | undefined)); const arr = clients.get(key); if (!arr || arr.length === 0) return; const data = `data: ${JSON.stringify(event)}\n\n`; - const dead: number[] = []; + let needPrune = false; - for (let i = 0; i < arr.length; i++) { - try { - arr[i].controller.enqueue(arr[i].encoder.encode(data)); - } catch { - dead.push(i); + for (const c of arr) { + if (c.closed) { + needPrune = true; + continue; } + const result = tryEnqueueBytes(c, c.encoder.encode(data)); + if (result === "dead") needPrune = true; } - // 清理死连接 — also clear their keepalive timers so the dead client doesn't - // keep an interval handle alive past its controller (small leak previously - // bounded by the next failed enqueue catching itself, but cleanup-on-detect - // is cheaper than waiting 30s). - for (let i = dead.length - 1; i >= 0; i--) { - const c = arr[dead[i]]; - if (c.keepalive) clearInterval(c.keepalive); - arr.splice(dead[i], 1); - } - if (arr.length === 0) clients.delete(key); + if (needPrune) pruneClosed(key); } export function __resetSSEClientsForTest(): void { + for (const arr of clients.values()) { + for (const c of arr) closeClient(c, "reset-for-test"); + } clients.clear(); + if (livenessSweepTimer) { + clearInterval(livenessSweepTimer); + livenessSweepTimer = null; + } } /** 获取当前 SSE 连接统计 */ @@ -142,8 +363,23 @@ export function getSSEStats(): { total: number; sessions: Record let total = 0; const sessions: Record = {}; for (const [name, arr] of clients) { - sessions[name] = arr.length; - total += arr.length; + const alive = arr.filter((c) => !c.closed); + if (alive.length === 0) continue; + sessions[name] = alive.length; + total += alive.length; } return { total, sessions }; } + +// Test hooks — exported so the test file can simulate half-open / stuck +// consumers without spinning up a real TCP socket. 
+export const __SSE_INTERNALS_FOR_TEST = { + tryEnqueueBytes, + sweepLiveness, + closeClient, + MAX_QUEUE_BACKPRESSURE_BYTES, + STUCK_CLOSE_MS, + KEEPALIVE_MS, + LIVENESS_SWEEP_MS, + clientsMap: clients, +};