From 118c5faaef4b64ea8ade7f669790de3ab13396db Mon Sep 17 00:00:00 2001 From: Paul Logan Date: Fri, 19 Jun 2026 19:06:31 -0700 Subject: [PATCH] fix(relay): stop cloning whole slots under the lock + leaking dead SSE subs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The actual root cause of the read-path saturation #342 mitigated by raising the fly concurrency ceiling. Three relay_server.rs contention/leak fixes from the bug-hunt's relay-locks + sse-stream dimensions. #2 (HIGH) list_events `.cloned()`'d the ENTIRE slot Vec (bounded only by MAX_SLOT_BYTES = 64 MiB) under the single global mutex on every pull, then sliced. Under the concurrent `?limit=1000` pulls that caused the outage, each pull serialized a multi-MB memcpy under the lock that post_event and every other handler contend on. Now borrows the slot, clones ONLY the `[start..end]` window (<= limit <= 1000 events), and drops the lock before serializing. unix-now is also computed before taking the lock. #4 disconnected SSE subscribers leaked: post_event prunes dead senders only lazily on its next broadcast, so a slot that goes silent kept dead senders forever (and over-counted them against MAX_STREAMS_PER_SLOT). stream_events now prunes at admission via `retain_live_subscribers` (tx.is_closed()). #13 empty per-slot Vecs in `streams` were never removed → one map key per ever-streamed slot. post_event + stream_events now drop the key when it empties. (#8 — broadcast cloning per-subscriber under the lock — deferred: the safe fix needs an Arc channel-type change / lock dance with event-loss edges, and it's dominated by #2.) Test `retain_live_subscribers_drops_disconnected`; the 21 relay integration tests (list_events round-trips + SSE) confirm #2 is behaviour-preserving. 607 lib tests; fmt + clippy clean. 
Co-Authored-By: Claude Opus 4.8 --- src/relay_server.rs | 68 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/src/relay_server.rs b/src/relay_server.rs index 155a4f3..5974d18 100644 --- a/src/relay_server.rs +++ b/src/relay_server.rs @@ -88,6 +88,15 @@ fn at_capacity(len: usize, max: usize) -> bool { len >= max } +/// Drop subscriber senders whose receiver (the SSE client) has disconnected. +/// `is_closed()` becomes true once the `UnboundedReceiverStream` is dropped on +/// client disconnect. `post_event` only prunes lazily on its next broadcast, so +/// a slot that goes silent would otherwise leak dead senders forever; pruning at +/// stream admission bounds it. Pure → unit-tested. +fn retain_live_subscribers(subs: &mut Vec<tokio::sync::mpsc::UnboundedSender<Value>>) { + subs.retain(|tx| !tx.is_closed()); +} + /// Wall-clock unix seconds (best-effort; 0 on a pre-epoch clock). fn unix_now() -> u64 { SystemTime::now() @@ -977,6 +986,11 @@ async fn post_event( let mut inner = relay.inner.lock().await; if let Some(subs) = inner.streams.get_mut(&slot_id) { subs.retain(|tx| tx.send(req.event.clone()).is_ok()); + if subs.is_empty() { + // Don't leave an empty per-slot Vec behind — `streams` would + // otherwise accumulate one key per ever-streamed slot. + inner.streams.remove(&slot_id); + } } } ( @@ -1013,6 +1027,15 @@ async fn stream_events( let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>(); { let mut inner = relay.inner.lock().await; + // Prune already-disconnected subscribers before counting toward the cap + // (post_event only prunes lazily on a broadcast → a silent slot leaks + // dead senders forever and over-counts against MAX_STREAMS_PER_SLOT). 
+ if let Some(subs) = inner.streams.get_mut(&slot_id) { + retain_live_subscribers(subs); + if subs.is_empty() { + inner.streams.remove(&slot_id); + } + } // Audit: cap subscribers per slot so an authed token-holder can't open // unbounded streams (each fans out on every post_event → memory + O(n) // latency DoS). Over the ceiling → 503. @@ -2168,7 +2191,6 @@ async fn list_events( return resp; } let limit = q.limit.unwrap_or(100).min(1000); - let mut inner = relay.inner.lock().await; // R4: record this pull as proof that the slot owner is still polling. // Anyone holding the slot_token (i.e., a paired peer) can later read // last_pull_at_unix via /v1/slot/:slot_id/state to gauge attentiveness. @@ -2176,18 +2198,30 @@ async fn list_events( .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_secs()) .unwrap_or(0); + let mut inner = relay.inner.lock().await; inner.last_pull_at_unix.insert(slot_id.clone(), now_unix); - let events = inner.slots.get(&slot_id).cloned().unwrap_or_default(); - let start = match q.since { - Some(eid) => events - .iter() - .position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid)) - .map(|i| i + 1) - .unwrap_or(0), - None => 0, + // Borrow the slot and clone ONLY the requested window. Pre-fix this + // `.cloned()`'d the WHOLE slot Vec (bounded only by MAX_SLOT_BYTES = 64 MiB) + // under the global mutex on every pull — a multi-MB memcpy serialized under + // the lock that post_event and every other handler contend on, which was the + // #342 saturation hot path. The `since` position scan stays under the lock + // but only walks refs (no allocation); the sole clone is the <=limit slice. 
+ let slice: Vec<Value> = match inner.slots.get(&slot_id) { + Some(events) => { + let start = match q.since { + Some(ref eid) => events + .iter() + .position(|e| e.get("event_id").and_then(Value::as_str) == Some(eid.as_str())) + .map(|i| i + 1) + .unwrap_or(0), + None => 0, + }; + let end = (start + limit).min(events.len()); + events[start..end].to_vec() + } + None => Vec::new(), }; - let end = (start + limit).min(events.len()); - let slice = events[start..end].to_vec(); + drop(inner); // release before serializing the response (StatusCode::OK, Json(slice)).into_response() } @@ -2718,6 +2752,18 @@ mod tests { assert!(at_capacity(MAX_STREAMS_PER_SLOT + 1, MAX_STREAMS_PER_SLOT)); } + #[test] + fn retain_live_subscribers_drops_disconnected() { + // A disconnected SSE client = a dropped receiver → sender is_closed(). + let (live_tx, _live_rx) = tokio::sync::mpsc::unbounded_channel::<Value>(); + let (dead_tx, dead_rx) = tokio::sync::mpsc::unbounded_channel::<Value>(); + drop(dead_rx); // client disconnected + let mut subs = vec![live_tx, dead_tx]; + retain_live_subscribers(&mut subs); + assert_eq!(subs.len(), 1, "the disconnected subscriber is pruned"); + assert!(!subs[0].is_closed(), "the survivor is the live sender"); + } + #[test] fn evict_stale_intro_nicks_drops_only_fully_aged_entries() { let now = 10_000u64;