Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 57 additions & 11 deletions src/relay_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ fn at_capacity(len: usize, max: usize) -> bool {
len >= max
}

/// Drop subscriber senders whose receiver (the SSE client) has disconnected.
/// `is_closed()` becomes true once the `UnboundedReceiverStream` is dropped on
/// client disconnect. `post_event` only prunes lazily on its next broadcast, so
/// a slot that goes silent would otherwise leak dead senders forever; pruning at
/// stream admission bounds it. Pure → unit-tested.
fn retain_live_subscribers(subs: &mut Vec<tokio::sync::mpsc::UnboundedSender<Value>>) {
subs.retain(|tx| !tx.is_closed());
}

/// Wall-clock unix seconds (best-effort; 0 on a pre-epoch clock).
fn unix_now() -> u64 {
SystemTime::now()
Expand Down Expand Up @@ -977,6 +986,11 @@ async fn post_event(
let mut inner = relay.inner.lock().await;
if let Some(subs) = inner.streams.get_mut(&slot_id) {
subs.retain(|tx| tx.send(req.event.clone()).is_ok());
if subs.is_empty() {
// Don't leave an empty per-slot Vec behind — `streams` would
// otherwise accumulate one key per ever-streamed slot.
inner.streams.remove(&slot_id);
}
}
}
(
Expand Down Expand Up @@ -1013,6 +1027,15 @@ async fn stream_events(
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
{
let mut inner = relay.inner.lock().await;
// Prune already-disconnected subscribers before counting toward the cap
// (post_event only prunes lazily on a broadcast → a silent slot leaks
// dead senders forever and over-counts against MAX_STREAMS_PER_SLOT).
if let Some(subs) = inner.streams.get_mut(&slot_id) {
retain_live_subscribers(subs);
if subs.is_empty() {
inner.streams.remove(&slot_id);
}
}
// Audit: cap subscribers per slot so an authed token-holder can't open
// unbounded streams (each fans out on every post_event → memory + O(n)
// latency DoS). Over the ceiling → 503.
Expand Down Expand Up @@ -2168,26 +2191,37 @@ async fn list_events(
return resp;
}
let limit = q.limit.unwrap_or(100).min(1000);
let mut inner = relay.inner.lock().await;
// R4: record this pull as proof that the slot owner is still polling.
// Anyone holding the slot_token (i.e., a paired peer) can later read
// last_pull_at_unix via /v1/slot/:slot_id/state to gauge attentiveness.
let now_unix = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let mut inner = relay.inner.lock().await;
inner.last_pull_at_unix.insert(slot_id.clone(), now_unix);
let events = inner.slots.get(&slot_id).cloned().unwrap_or_default();
let start = match q.since {
Some(eid) => events
.iter()
.position(|e| e.get("event_id").and_then(Value::as_str) == Some(&eid))
.map(|i| i + 1)
.unwrap_or(0),
None => 0,
// Borrow the slot and clone ONLY the requested window. Pre-fix this
// `.cloned()`'d the WHOLE slot Vec (bounded only by MAX_SLOT_BYTES = 64 MiB)
// under the global mutex on every pull — a multi-MB memcpy serialized under
// the lock that post_event and every other handler contend on, which was the
// #342 saturation hot path. The `since` position scan stays under the lock
// but only walks refs (no allocation); the sole clone is the <=limit slice.
let slice: Vec<Value> = match inner.slots.get(&slot_id) {
Some(events) => {
let start = match q.since {
Some(ref eid) => events
.iter()
.position(|e| e.get("event_id").and_then(Value::as_str) == Some(eid.as_str()))
.map(|i| i + 1)
.unwrap_or(0),
None => 0,
};
let end = (start + limit).min(events.len());
events[start..end].to_vec()
}
None => Vec::new(),
};
let end = (start + limit).min(events.len());
let slice = events[start..end].to_vec();
drop(inner); // release before serializing the response
(StatusCode::OK, Json(slice)).into_response()
}

Expand Down Expand Up @@ -2718,6 +2752,18 @@ mod tests {
assert!(at_capacity(MAX_STREAMS_PER_SLOT + 1, MAX_STREAMS_PER_SLOT));
}

#[test]
fn retain_live_subscribers_drops_disconnected() {
// A disconnected SSE client = a dropped receiver → sender is_closed().
let (live_tx, _live_rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
let (dead_tx, dead_rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
drop(dead_rx); // client disconnected
let mut subs = vec![live_tx, dead_tx];
retain_live_subscribers(&mut subs);
assert_eq!(subs.len(), 1, "the disconnected subscriber is pruned");
assert!(!subs[0].is_closed(), "the survivor is the live sender");
}

#[test]
fn evict_stale_intro_nicks_drops_only_fully_aged_entries() {
let now = 10_000u64;
Expand Down