diff --git a/services/api/src/blockchain.rs b/services/api/src/blockchain.rs index fa06ace..0caaffa 100644 --- a/services/api/src/blockchain.rs +++ b/services/api/src/blockchain.rs @@ -40,6 +40,10 @@ pub struct BlockchainClient { /// regardless of their finalization status to bound memory growth. const WATCHED_TX_TTL: Duration = Duration::from_secs(30 * 60); // 30 minutes +/// Maximum number of entries in `watched_txs`. When the cap is reached the +/// oldest entry (by insertion time) is evicted to make room for the new one. +pub const WATCHED_TX_MAX_SIZE: usize = 10_000; + #[derive(Default)] struct MonitoringState { /// Maps tx hash → time it was first watched. Evicted after `WATCHED_TX_TTL`. @@ -946,7 +950,6 @@ impl BlockchainClient { } pub async fn watch_transaction(&self, hash: &str) { - const MAX_WATCHED: usize = 1000; let mut set = self.monitor.watched_txs.write().await; // Evict TTL-expired entries first. @@ -959,15 +962,25 @@ impl BlockchainClient { tracing::info!(evicted, "watched_txs: TTL eviction"); } - if set.len() < MAX_WATCHED { - set.entry(hash.to_string()).or_insert(now); - } else { - tracing::warn!( - cap = MAX_WATCHED, - hash, - "watched_txs cap reached, dropping tx" - ); + // If still at cap, evict the single oldest entry to make room. + if set.len() >= WATCHED_TX_MAX_SIZE { + if let Some(oldest_key) = set + .iter() + .min_by_key(|(_, inserted_at)| *inserted_at) + .map(|(k, _)| k.clone()) + { + set.remove(&oldest_key); + self.metrics.observe_tx_eviction(1); + tracing::warn!( + cap = WATCHED_TX_MAX_SIZE, + evicted_hash = %oldest_key, + new_hash = hash, + "watched_txs cap reached, evicting oldest entry" + ); + } } + + set.entry(hash.to_string()).or_insert(now); } /// Replay missed events from `from_ledger` up to the current confirmed tip. @@ -1085,30 +1098,31 @@ mod tests { } } - /// Inserting more than MAX_WATCHED hashes must not grow the set beyond the cap. + /// Inserting more than WATCHED_TX_MAX_SIZE hashes must not grow the set beyond the cap. #[tokio::test] async fn watched_txs_cap_prevents_unbounded_growth() { - use super::{MonitoringState, WATCHED_TX_TTL}; + use super::{MonitoringState, WATCHED_TX_MAX_SIZE, WATCHED_TX_TTL}; use std::sync::Arc; - use tokio::sync::RwLock; - // Build a minimal MonitoringState directly. let state = Arc::new(MonitoringState::default()); - // Insert MAX_WATCHED + 50 unique hashes. - const MAX_WATCHED: usize = 1000; - for i in 0..MAX_WATCHED + 50 { + // Insert WATCHED_TX_MAX_SIZE + 50 unique hashes, simulating the eviction logic. + for i in 0..WATCHED_TX_MAX_SIZE + 50 { let hash = format!("hash-{i}"); let mut set = state.watched_txs.write().await; let now = std::time::Instant::now(); set.retain(|_, inserted_at| now.duration_since(*inserted_at) < WATCHED_TX_TTL); - if set.len() < MAX_WATCHED { - set.entry(hash).or_insert(now); + // Evict oldest if at cap. + if set.len() >= WATCHED_TX_MAX_SIZE { + if let Some(oldest) = set.iter().min_by_key(|(_, t)| *t).map(|(k, _)| k.clone()) { + set.remove(&oldest); + } } + set.entry(hash).or_insert(now); } let len = state.watched_txs.read().await.len(); - assert_eq!(len, MAX_WATCHED, "set must not exceed cap"); + assert_eq!(len, WATCHED_TX_MAX_SIZE, "set must not exceed cap"); } /// Entries older than WATCHED_TX_TTL are evicted on the next insert. @@ -1138,4 +1152,43 @@ mod tests { assert!(!set.contains_key("old-hash"), "stale entry must be evicted"); assert!(set.contains_key("new-hash"), "fresh entry must be present"); } + + /// When the cap is reached, the oldest entry is evicted (not the new one dropped). + #[tokio::test] + async fn watched_txs_cap_evicts_oldest_not_newest() { + use super::{MonitoringState, WATCHED_TX_MAX_SIZE, WATCHED_TX_TTL}; + use std::sync::Arc; + use std::time::{Duration, Instant}; + + let state = Arc::new(MonitoringState::default()); + + // Fill to cap, with "oldest-hash" inserted first (oldest timestamp). + { + let mut set = state.watched_txs.write().await; + let old_time = Instant::now() - Duration::from_secs(60); + set.insert("oldest-hash".to_string(), old_time); + let now = Instant::now(); + for i in 1..WATCHED_TX_MAX_SIZE { + set.insert(format!("hash-{i}"), now); + } + } + + // Insert one more — should evict "oldest-hash". + { + let mut set = state.watched_txs.write().await; + let now = Instant::now(); + set.retain(|_, inserted_at| now.duration_since(*inserted_at) < WATCHED_TX_TTL); + if set.len() >= WATCHED_TX_MAX_SIZE { + if let Some(oldest) = set.iter().min_by_key(|(_, t)| *t).map(|(k, _)| k.clone()) { + set.remove(&oldest); + } + } + set.entry("newest-hash".to_string()).or_insert(now); + } + + let set = state.watched_txs.read().await; + assert!(!set.contains_key("oldest-hash"), "oldest entry must be evicted"); + assert!(set.contains_key("newest-hash"), "newest entry must be present"); + assert_eq!(set.len(), WATCHED_TX_MAX_SIZE, "size must remain at cap"); + } }