diff --git a/crates/flight-ipc/src/connection_pool.rs b/crates/flight-ipc/src/connection_pool.rs index 526df784..aa1a0926 100644 --- a/crates/flight-ipc/src/connection_pool.rs +++ b/crates/flight-ipc/src/connection_pool.rs @@ -5,8 +5,14 @@ //! //! [`ConnectionPool`] tracks connected clients, enforces a maximum connection //! limit, and prunes idle connections that have exceeded a configurable timeout. +//! +//! [`ClientConnectionPool`] provides a client-side pool of gRPC connections +//! with health checking, round-robin selection, and connection lifecycle +//! management. [`KeepaliveConfig`] and [`PoolMetrics`] support observability. use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; use thiserror::Error; // --------------------------------------------------------------------------- @@ -164,6 +170,364 @@ impl ConnectionPool { } } +// --------------------------------------------------------------------------- +// KeepaliveConfig +// --------------------------------------------------------------------------- + +/// Configures keepalive behaviour for pooled connections. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KeepaliveConfig { + /// Interval between keepalive pings. + pub interval: Duration, + /// How long to wait for a pong before declaring the connection dead. + pub timeout: Duration, + /// Maximum consecutive missed pings before eviction. + pub max_missed_pings: u32, +} + +impl Default for KeepaliveConfig { + fn default() -> Self { + Self { + interval: Duration::from_secs(10), + timeout: Duration::from_secs(5), + max_missed_pings: 3, + } + } +} + +impl KeepaliveConfig { + /// Returns `true` when all fields have sensible positive values. + pub fn is_valid(&self) -> bool { + !self.interval.is_zero() && !self.timeout.is_zero() && self.max_missed_pings > 0 + } +} + +// --------------------------------------------------------------------------- +// PoolMetrics +// --------------------------------------------------------------------------- + +/// Observable metrics for a [`ClientConnectionPool`]. +#[derive(Debug, Default)] +pub struct PoolMetrics { + /// Total connections currently alive in the pool. + pub active_connections: AtomicU64, + /// Connections sitting idle (not checked-out). + pub idle_connections: AtomicU64, + /// Number of failed health checks since pool creation. + pub failed_health_checks: AtomicU64, + /// Total checkout requests served. + pub total_requests: AtomicU64, +} + +impl PoolMetrics { + /// Snapshot the counters into a plain struct for serialisation or logging. + pub fn snapshot(&self) -> PoolMetricsSnapshot { + PoolMetricsSnapshot { + active_connections: self.active_connections.load(Ordering::Relaxed), + idle_connections: self.idle_connections.load(Ordering::Relaxed), + failed_health_checks: self.failed_health_checks.load(Ordering::Relaxed), + total_requests: self.total_requests.load(Ordering::Relaxed), + } + } +} + +/// A plain-data snapshot of [`PoolMetrics`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PoolMetricsSnapshot { + /// Total connections currently alive in the pool. + pub active_connections: u64, + /// Connections sitting idle (not checked-out). + pub idle_connections: u64, + /// Number of failed health checks since pool creation. + pub failed_health_checks: u64, + /// Total checkout requests served. + pub total_requests: u64, +} + +// --------------------------------------------------------------------------- +// PooledConnection — individual entry in the client pool +// --------------------------------------------------------------------------- + +/// State of a connection inside the pool. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnState { + /// Available for checkout. + Idle, + /// Currently in use by a caller. + InUse, + /// Failed a health check and is awaiting eviction. + Unhealthy, +} + +/// A single connection tracked by [`ClientConnectionPool`]. +#[derive(Debug)] +pub struct PooledConnection { + /// Unique connection identifier. + pub id: u64, + /// Target endpoint address. + pub endpoint: String, + /// Current state. + pub state: ConnState, + /// Epoch-second timestamp of last successful health check. + pub last_health_check: u64, + /// Consecutive missed keepalive pings. + pub missed_pings: u32, + /// Total requests served by this connection. + pub request_count: u64, +} + +// --------------------------------------------------------------------------- +// SelectionStrategy +// --------------------------------------------------------------------------- + +/// Strategy used to pick the next connection on checkout. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SelectionStrategy { + /// Cycle through connections in order. + RoundRobin, + /// Pick the connection with the fewest in-flight requests. + LeastLoaded, +} + +// --------------------------------------------------------------------------- +// ClientConnectionPool +// --------------------------------------------------------------------------- + +/// Client-side pool of gRPC connections with health checking, configurable +/// size limits, and round-robin or least-loaded selection. +pub struct ClientConnectionPool { + connections: Vec, + min_connections: usize, + max_connections: usize, + keepalive: KeepaliveConfig, + strategy: SelectionStrategy, + metrics: PoolMetrics, + next_id: u64, + rr_index: usize, +} + +impl ClientConnectionPool { + /// Create a new pool targeting `endpoint`. + /// + /// `min` idle connections are pre-created; the pool will grow up to `max`. + pub fn new( + endpoint: &str, + min: usize, + max: usize, + keepalive: KeepaliveConfig, + strategy: SelectionStrategy, + ) -> Self { + assert!(min <= max, "min must be <= max"); + assert!(max > 0, "max must be > 0"); + + let mut pool = Self { + connections: Vec::with_capacity(max), + min_connections: min, + max_connections: max, + keepalive, + strategy, + metrics: PoolMetrics::default(), + next_id: 0, + rr_index: 0, + }; + + // Pre-populate with `min` idle connections. + for _ in 0..min { + pool.create_connection(endpoint); + } + + pool + } + + // -- connection lifecycle ----------------------------------------------- + + fn create_connection(&mut self, endpoint: &str) -> u64 { + let id = self.next_id; + self.next_id += 1; + self.connections.push(PooledConnection { + id, + endpoint: endpoint.to_owned(), + state: ConnState::Idle, + last_health_check: 0, + missed_pings: 0, + request_count: 0, + }); + self.metrics + .active_connections + .fetch_add(1, Ordering::Relaxed); + self.metrics + .idle_connections + .fetch_add(1, Ordering::Relaxed); + id + } + + /// Check out a connection using the configured [`SelectionStrategy`]. + /// + /// Returns the connection ID, or `None` if the pool is exhausted. + pub fn checkout(&mut self) -> Option { + self.metrics.total_requests.fetch_add(1, Ordering::Relaxed); + + let id = match self.strategy { + SelectionStrategy::RoundRobin => self.checkout_round_robin(), + SelectionStrategy::LeastLoaded => self.checkout_least_loaded(), + }; + + if let Some(conn_id) = id + && let Some(conn) = self.connections.iter_mut().find(|c| c.id == conn_id) + { + conn.state = ConnState::InUse; + conn.request_count += 1; + self.metrics + .idle_connections + .fetch_sub(1, Ordering::Relaxed); + } + + id + } + + fn checkout_round_robin(&mut self) -> Option { + let idle: Vec = self + .connections + .iter() + .enumerate() + .filter(|(_, c)| c.state == ConnState::Idle) + .map(|(i, _)| i) + .collect(); + if idle.is_empty() { + return None; + } + // Find the next index >= rr_index, wrapping around. + let pick = idle + .iter() + .find(|&&i| i >= self.rr_index) + .or(idle.first()) + .copied() + .unwrap(); + self.rr_index = pick + 1; + Some(self.connections[pick].id) + } + + fn checkout_least_loaded(&self) -> Option { + self.connections + .iter() + .filter(|c| c.state == ConnState::Idle) + .min_by_key(|c| c.request_count) + .map(|c| c.id) + } + + /// Return a connection to the pool after use. + pub fn checkin(&mut self, id: u64) { + if let Some(conn) = self.connections.iter_mut().find(|c| c.id == id) + && conn.state == ConnState::InUse + { + conn.state = ConnState::Idle; + self.metrics + .idle_connections + .fetch_add(1, Ordering::Relaxed); + } + } + + /// Simulate a health-check pass at time `now`. + /// + /// Connections with more missed pings than [`KeepaliveConfig::max_missed_pings`] + /// are marked [`ConnState::Unhealthy`]. + pub fn health_check(&mut self, now: u64, healthy_ids: &[u64]) { + for conn in &mut self.connections { + if conn.state == ConnState::Unhealthy { + continue; + } + if healthy_ids.contains(&conn.id) { + conn.last_health_check = now; + conn.missed_pings = 0; + } else { + conn.missed_pings += 1; + self.metrics + .failed_health_checks + .fetch_add(1, Ordering::Relaxed); + if conn.missed_pings >= self.keepalive.max_missed_pings { + let was_idle = conn.state == ConnState::Idle; + conn.state = ConnState::Unhealthy; + if was_idle { + self.metrics + .idle_connections + .fetch_sub(1, Ordering::Relaxed); + } + } + } + } + } + + /// Evict all unhealthy connections. Returns the IDs removed. + pub fn evict_unhealthy(&mut self) -> Vec { + let evicted: Vec = self + .connections + .iter() + .filter(|c| c.state == ConnState::Unhealthy) + .map(|c| c.id) + .collect(); + let count = evicted.len() as u64; + self.connections.retain(|c| c.state != ConnState::Unhealthy); + self.metrics + .active_connections + .fetch_sub(count, Ordering::Relaxed); + evicted + } + + // -- accessors ---------------------------------------------------------- + + /// Current number of connections (all states). + pub fn size(&self) -> usize { + self.connections.len() + } + + /// Number of idle connections. + pub fn idle_count(&self) -> usize { + self.connections + .iter() + .filter(|c| c.state == ConnState::Idle) + .count() + } + + /// Min pool size. + pub fn min_connections(&self) -> usize { + self.min_connections + } + + /// Max pool size. + pub fn max_connections(&self) -> usize { + self.max_connections + } + + /// Reference to the keepalive config. + pub fn keepalive(&self) -> &KeepaliveConfig { + &self.keepalive + } + + /// Reference to the live metrics. + pub fn metrics(&self) -> &PoolMetrics { + &self.metrics + } + + /// Look up a connection by ID. + pub fn get_connection(&self, id: u64) -> Option<&PooledConnection> { + self.connections.iter().find(|c| c.id == id) + } + + /// Returns `true` when the pool cannot grow further. + pub fn is_full(&self) -> bool { + self.connections.len() >= self.max_connections + } + + /// Attempt to grow the pool by one connection. Returns the new ID, or + /// `None` if already at capacity. + pub fn grow(&mut self, endpoint: &str) -> Option { + if self.is_full() { + return None; + } + Some(self.create_connection(endpoint)) + } +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -172,6 +536,8 @@ impl ConnectionPool { mod tests { use super::*; + // ===== Existing ConnectionPool tests =================================== + // 1. New pool is empty #[test] fn new_pool_is_empty() { @@ -326,4 +692,279 @@ mod tests { let pruned = pool.prune_idle(0); assert!(pruned.is_empty()); } + + // ===== KeepaliveConfig tests =========================================== + + // 15. Default keepalive config is valid + #[test] + fn default_keepalive_is_valid() { + let cfg = KeepaliveConfig::default(); + assert!(cfg.is_valid()); + assert_eq!(cfg.interval, Duration::from_secs(10)); + assert_eq!(cfg.timeout, Duration::from_secs(5)); + assert_eq!(cfg.max_missed_pings, 3); + } + + // 16. Zero interval is invalid + #[test] + fn zero_interval_keepalive_invalid() { + let cfg = KeepaliveConfig { + interval: Duration::ZERO, + ..KeepaliveConfig::default() + }; + assert!(!cfg.is_valid()); + } + + // 17. Zero timeout is invalid + #[test] + fn zero_timeout_keepalive_invalid() { + let cfg = KeepaliveConfig { + timeout: Duration::ZERO, + ..KeepaliveConfig::default() + }; + assert!(!cfg.is_valid()); + } + + // 18. Zero max_missed_pings is invalid + #[test] + fn zero_max_missed_pings_invalid() { + let cfg = KeepaliveConfig { + max_missed_pings: 0, + ..KeepaliveConfig::default() + }; + assert!(!cfg.is_valid()); + } + + // ===== PoolMetrics tests =============================================== + + // 19. Default metrics are zeroed + #[test] + fn default_pool_metrics_zeroed() { + let m = PoolMetrics::default(); + let snap = m.snapshot(); + assert_eq!(snap.active_connections, 0); + assert_eq!(snap.idle_connections, 0); + assert_eq!(snap.failed_health_checks, 0); + assert_eq!(snap.total_requests, 0); + } + + // 20. Metrics snapshot reflects atomic updates + #[test] + fn metrics_snapshot_reflects_updates() { + let m = PoolMetrics::default(); + m.active_connections.store(5, Ordering::Relaxed); + m.idle_connections.store(3, Ordering::Relaxed); + m.failed_health_checks.store(1, Ordering::Relaxed); + m.total_requests.store(42, Ordering::Relaxed); + let snap = m.snapshot(); + assert_eq!(snap.active_connections, 5); + assert_eq!(snap.idle_connections, 3); + assert_eq!(snap.failed_health_checks, 1); + assert_eq!(snap.total_requests, 42); + } + + // ===== ClientConnectionPool tests ====================================== + + // 21. Pool creation pre-populates min connections + #[test] + fn client_pool_creation_with_min_max() { + let pool = ClientConnectionPool::new( + "http://localhost:50051", + 2, + 5, + KeepaliveConfig::default(), + SelectionStrategy::RoundRobin, + ); + assert_eq!(pool.size(), 2); + assert_eq!(pool.idle_count(), 2); + assert_eq!(pool.min_connections(), 2); + assert_eq!(pool.max_connections(), 5); + assert!(!pool.is_full()); + } + + // 22. Checkout and checkin cycle + #[test] + fn checkout_and_checkin() { + let mut pool = ClientConnectionPool::new( + "http://localhost:50051", + 2, + 4, + KeepaliveConfig::default(), + SelectionStrategy::RoundRobin, + ); + let id = pool.checkout().unwrap(); + assert_eq!(pool.idle_count(), 1); + + let conn = pool.get_connection(id).unwrap(); + assert_eq!(conn.state, ConnState::InUse); + assert_eq!(conn.request_count, 1); + + pool.checkin(id); + assert_eq!(pool.idle_count(), 2); + let conn = pool.get_connection(id).unwrap(); + assert_eq!(conn.state, ConnState::Idle); + } + + // 23. Checkout returns None when all busy + #[test] + fn checkout_returns_none_when_all_busy() { + let mut pool = ClientConnectionPool::new( + "http://localhost:50051", + 1, + 1, + KeepaliveConfig::default(), + SelectionStrategy::RoundRobin, + ); + let _id = pool.checkout().unwrap(); + assert!(pool.checkout().is_none()); + } + + // 24. Health check marks connections unhealthy after max_missed_pings + #[test] + fn health_check_evicts_stale() { + let keepalive = KeepaliveConfig { + max_missed_pings: 2, + ..KeepaliveConfig::default() + }; + let mut pool = ClientConnectionPool::new( + "http://localhost:50051", + 2, + 4, + keepalive, + SelectionStrategy::RoundRobin, + ); + let id0 = pool.connections[0].id; + let id1 = pool.connections[1].id; + + // First check: only id1 responds → id0 gets 1 miss + pool.health_check(100, &[id1]); + assert_eq!(pool.get_connection(id0).unwrap().missed_pings, 1); + assert_eq!(pool.get_connection(id0).unwrap().state, ConnState::Idle); + + // Second check: still only id1 → id0 gets 2 misses → Unhealthy + pool.health_check(200, &[id1]); + assert_eq!( + pool.get_connection(id0).unwrap().state, + ConnState::Unhealthy + ); + assert_eq!(pool.get_connection(id1).unwrap().state, ConnState::Idle); + + // Evict + let evicted = pool.evict_unhealthy(); + assert_eq!(evicted, vec![id0]); + assert_eq!(pool.size(), 1); + } + + // 25. Round-robin selection distributes across connections + #[test] + fn round_robin_distributes() { + let mut pool = ClientConnectionPool::new( + "http://localhost:50051", + 3, + 5, + KeepaliveConfig::default(), + SelectionStrategy::RoundRobin, + ); + let a = pool.checkout().unwrap(); + pool.checkin(a); + let b = pool.checkout().unwrap(); + pool.checkin(b); + let c = pool.checkout().unwrap(); + pool.checkin(c); + + // All three should have been used (IDs 0, 1, 2) + let mut used = vec![a, b, c]; + used.sort(); + used.dedup(); + assert_eq!(used.len(), 3); + } + + // 26. Least-loaded selection picks connection with fewest requests + #[test] + fn least_loaded_selects_lowest() { + let mut pool = ClientConnectionPool::new( + "http://localhost:50051", + 2, + 4, + KeepaliveConfig::default(), + SelectionStrategy::LeastLoaded, + ); + + // Checkout and checkin first connection 3 times + for _ in 0..3 { + let id = pool.checkout().unwrap(); + pool.checkin(id); + } + // Now conn 0 has 3 requests, conn 1 has 0 → least-loaded picks conn 1 + let next = pool.checkout().unwrap(); + assert_eq!(next, pool.connections[1].id); + } + + // 27. Pool growth + #[test] + fn pool_grow() { + let mut pool = ClientConnectionPool::new( + "http://localhost:50051", + 1, + 3, + KeepaliveConfig::default(), + SelectionStrategy::RoundRobin, + ); + assert_eq!(pool.size(), 1); + + let id = pool.grow("http://localhost:50051").unwrap(); + assert_eq!(pool.size(), 2); + assert_eq!(pool.get_connection(id).unwrap().state, ConnState::Idle); + + pool.grow("http://localhost:50051").unwrap(); + assert_eq!(pool.size(), 3); + assert!(pool.is_full()); + assert!(pool.grow("http://localhost:50051").is_none()); + } + + // 28. Metrics track checkout and health-check failures + #[test] + fn pool_metrics_tracking() { + let keepalive = KeepaliveConfig { + max_missed_pings: 1, + ..KeepaliveConfig::default() + }; + let mut pool = ClientConnectionPool::new( + "http://localhost:50051", + 2, + 4, + keepalive, + SelectionStrategy::RoundRobin, + ); + + // 3 checkouts + for _ in 0..3 { + if let Some(id) = pool.checkout() { + pool.checkin(id); + } + } + + // 1 failed health check + pool.health_check(100, &[]); + + let snap = pool.metrics().snapshot(); + assert_eq!(snap.total_requests, 3); + assert!(snap.failed_health_checks >= 2); // two connections failed + assert_eq!(snap.active_connections, 2); + } + + // 29. Evict unhealthy returns empty when all healthy + #[test] + fn evict_unhealthy_noop_when_healthy() { + let mut pool = ClientConnectionPool::new( + "http://localhost:50051", + 2, + 4, + KeepaliveConfig::default(), + SelectionStrategy::RoundRobin, + ); + let evicted = pool.evict_unhealthy(); + assert!(evicted.is_empty()); + assert_eq!(pool.size(), 2); + } } diff --git a/crates/flight-ipc/src/subscriptions.rs b/crates/flight-ipc/src/subscriptions.rs index d9dc5c74..d691fc86 100644 --- a/crates/flight-ipc/src/subscriptions.rs +++ b/crates/flight-ipc/src/subscriptions.rs @@ -184,6 +184,26 @@ struct SubscriptionRecord { last_delivery: Option, /// Last delivered payload hash – used for `changed_only`. last_payload_hash: Option, + /// Maximum pending deliveries before the subscriber is considered slow. + capacity: usize, + /// Number of pending (unacknowledged) deliveries. + pending: usize, + /// Cumulative number of events dropped due to backpressure. + dropped: u64, +} + +// --------------------------------------------------------------------------- +// BackpressureStats +// --------------------------------------------------------------------------- + +/// Per-broadcast statistics returned by +/// [`SubscriptionManager::broadcast_with_backpressure`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BackpressureStats { + /// Subscription IDs that received the event. + pub delivered: Vec, + /// Subscription IDs that were skipped because their queue was full. + pub dropped: Vec, } // --------------------------------------------------------------------------- @@ -213,6 +233,19 @@ impl SubscriptionManager { /// Returns a [`SubscriptionHandle`] the caller can use to check liveness /// or cancel the subscription. pub fn subscribe(&mut self, topic: Topic, filter: SubscriptionFilter) -> SubscriptionHandle { + self.subscribe_with_capacity(topic, filter, usize::MAX) + } + + /// Subscribe with an explicit backpressure capacity. + /// + /// When the subscriber accumulates more than `capacity` pending deliveries, + /// new events are dropped and counted via [`dropped_count`](Self::dropped_count). + pub fn subscribe_with_capacity( + &mut self, + topic: Topic, + filter: SubscriptionFilter, + capacity: usize, + ) -> SubscriptionHandle { let id = self.next_id.fetch_add(1, Ordering::Relaxed); let active = Arc::new(AtomicBool::new(true)); let now = Instant::now(); @@ -227,6 +260,9 @@ impl SubscriptionManager { active: Arc::clone(&active), last_delivery: None, last_payload_hash: None, + capacity, + pending: 0, + dropped: 0, }, ); @@ -306,6 +342,71 @@ impl SubscriptionManager { delivered } + /// Broadcast with backpressure: slow subscribers whose pending count has + /// reached their capacity will have the event dropped (and counted). + pub fn broadcast_with_backpressure(&mut self, message: &BroadcastMessage) -> BackpressureStats { + self.gc(); + + let now = Instant::now(); + let payload_hash = simple_hash(&message.payload); + let mut delivered = Vec::new(); + let mut dropped = Vec::new(); + + for rec in self.subscriptions.values_mut() { + if rec.topic != message.topic { + continue; + } + + if !rec + .filter + .matches(message.device_id.as_deref(), message.axis_id.as_deref()) + { + continue; + } + + if let Some(min_ms) = rec.filter.min_interval_ms + && let Some(last) = rec.last_delivery + && now.duration_since(last).as_millis() < u128::from(min_ms) + { + continue; + } + + if rec.filter.changed_only && rec.last_payload_hash == Some(payload_hash) { + continue; + } + + // Backpressure check + if rec.pending >= rec.capacity { + rec.dropped += 1; + dropped.push(rec.id); + continue; + } + + rec.last_delivery = Some(now); + rec.last_payload_hash = Some(payload_hash); + rec.pending += 1; + delivered.push(rec.id); + } + + BackpressureStats { delivered, dropped } + } + + /// Acknowledge that a subscriber has consumed a pending delivery. + /// + /// This decrements the subscriber's pending counter, allowing future + /// deliveries when backpressure is in effect. + pub fn acknowledge(&mut self, id: SubscriptionId) { + if let Some(rec) = self.subscriptions.get_mut(&id) { + rec.pending = rec.pending.saturating_sub(1); + } + } + + /// Return the cumulative number of events dropped for `id` due to + /// backpressure. + pub fn dropped_count(&self, id: SubscriptionId) -> u64 { + self.subscriptions.get(&id).map_or(0, |rec| rec.dropped) + } + /// Number of active subscriptions (after garbage collection). pub fn active_count(&mut self) -> usize { self.gc(); @@ -807,4 +908,97 @@ mod tests { assert_eq!(ids.len(), 2); assert!(!ids.contains(&h1.id)); } + + // ===== Backpressure tests ============================================== + + // ----------------------------------------------------------------------- + // 28. Backpressure drops events when subscriber is slow + // ----------------------------------------------------------------------- + #[test] + fn backpressure_drops_slow_subscriber() { + let mut mgr = SubscriptionManager::new(); + let h = mgr.subscribe_with_capacity(Topic::AxisData, no_filter(), 2); + + // First two deliveries should succeed + let stats = mgr.broadcast_with_backpressure(&msg(Topic::AxisData, "v1")); + assert_eq!(stats.delivered, vec![h.id]); + assert!(stats.dropped.is_empty()); + + let stats = mgr.broadcast_with_backpressure(&msg(Topic::AxisData, "v2")); + assert_eq!(stats.delivered, vec![h.id]); + + // Third delivery exceeds capacity → dropped + let stats = mgr.broadcast_with_backpressure(&msg(Topic::AxisData, "v3")); + assert!(stats.delivered.is_empty()); + assert_eq!(stats.dropped, vec![h.id]); + + assert_eq!(mgr.dropped_count(h.id), 1); + } + + // ----------------------------------------------------------------------- + // 29. Acknowledge frees capacity + // ----------------------------------------------------------------------- + #[test] + fn acknowledge_frees_capacity() { + let mut mgr = SubscriptionManager::new(); + let h = mgr.subscribe_with_capacity(Topic::DeviceEvents, no_filter(), 1); + + let stats = mgr.broadcast_with_backpressure(&msg(Topic::DeviceEvents, "e1")); + assert_eq!(stats.delivered, vec![h.id]); + + // At capacity — next would be dropped + let stats = mgr.broadcast_with_backpressure(&msg(Topic::DeviceEvents, "e2")); + assert!(stats.delivered.is_empty()); + + // Acknowledge → free a slot + mgr.acknowledge(h.id); + + let stats = mgr.broadcast_with_backpressure(&msg(Topic::DeviceEvents, "e3")); + assert_eq!(stats.delivered, vec![h.id]); + } + + // ----------------------------------------------------------------------- + // 30. Dropped count accumulates + // ----------------------------------------------------------------------- + #[test] + fn dropped_count_accumulates() { + let mut mgr = SubscriptionManager::new(); + let h = mgr.subscribe_with_capacity(Topic::HealthStatus, no_filter(), 0); + + for i in 0..5 { + mgr.broadcast_with_backpressure(&msg(Topic::HealthStatus, &format!("d{i}"))); + } + assert_eq!(mgr.dropped_count(h.id), 5); + } + + // ----------------------------------------------------------------------- + // 31. Mixed fast and slow subscribers + // ----------------------------------------------------------------------- + #[test] + fn backpressure_mixed_fast_slow() { + let mut mgr = SubscriptionManager::new(); + let fast = mgr.subscribe(Topic::AxisData, no_filter()); // unlimited + let slow = mgr.subscribe_with_capacity(Topic::AxisData, no_filter(), 1); + + // First event reaches both + let stats = mgr.broadcast_with_backpressure(&msg(Topic::AxisData, "a")); + assert!(stats.delivered.contains(&fast.id)); + assert!(stats.delivered.contains(&slow.id)); + + // Second event: slow is at capacity + let stats = mgr.broadcast_with_backpressure(&msg(Topic::AxisData, "b")); + assert!(stats.delivered.contains(&fast.id)); + assert!(stats.dropped.contains(&slow.id)); + assert_eq!(mgr.dropped_count(slow.id), 1); + assert_eq!(mgr.dropped_count(fast.id), 0); + } + + // ----------------------------------------------------------------------- + // 32. Dropped count for unknown subscription returns 0 + // ----------------------------------------------------------------------- + #[test] + fn dropped_count_unknown_id() { + let mgr = SubscriptionManager::new(); + assert_eq!(mgr.dropped_count(9999), 0); + } }