Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 72 additions & 19 deletions services/api/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ pub struct BlockchainClient {
/// regardless of their finalization status to bound memory growth.
const WATCHED_TX_TTL: Duration = Duration::from_secs(30 * 60); // 30 minutes

/// Maximum number of entries in `watched_txs`. When the cap is reached the
/// oldest entry (by insertion time) is evicted to make room for the new one.
pub const WATCHED_TX_MAX_SIZE: usize = 10_000;

#[derive(Default)]
struct MonitoringState {
/// Maps tx hash → time it was first watched. Evicted after `WATCHED_TX_TTL`.
Expand Down Expand Up @@ -946,7 +950,6 @@ impl BlockchainClient {
}

pub async fn watch_transaction(&self, hash: &str) {
const MAX_WATCHED: usize = 1000;
let mut set = self.monitor.watched_txs.write().await;

// Evict TTL-expired entries first.
Expand All @@ -959,15 +962,25 @@ impl BlockchainClient {
tracing::info!(evicted, "watched_txs: TTL eviction");
}

if set.len() < MAX_WATCHED {
set.entry(hash.to_string()).or_insert(now);
} else {
tracing::warn!(
cap = MAX_WATCHED,
hash,
"watched_txs cap reached, dropping tx"
);
// If still at cap, evict the single oldest entry to make room.
if set.len() >= WATCHED_TX_MAX_SIZE {
if let Some(oldest_key) = set
.iter()
.min_by_key(|(_, inserted_at)| *inserted_at)
.map(|(k, _)| k.clone())
{
set.remove(&oldest_key);
self.metrics.observe_tx_eviction(1);
tracing::warn!(
cap = WATCHED_TX_MAX_SIZE,
evicted_hash = %oldest_key,
new_hash = hash,
"watched_txs cap reached, evicting oldest entry"
);
}
}

set.entry(hash.to_string()).or_insert(now);
}

/// Replay missed events from `from_ledger` up to the current confirmed tip.
Expand Down Expand Up @@ -1085,30 +1098,31 @@ mod tests {
}
}

/// Inserting more than MAX_WATCHED hashes must not grow the set beyond the cap.
/// Inserting more than WATCHED_TX_MAX_SIZE hashes must not grow the set beyond the cap.
#[tokio::test]
async fn watched_txs_cap_prevents_unbounded_growth() {
use super::{MonitoringState, WATCHED_TX_TTL};
use super::{MonitoringState, WATCHED_TX_MAX_SIZE, WATCHED_TX_TTL};
use std::sync::Arc;
use tokio::sync::RwLock;

// Build a minimal MonitoringState directly.
let state = Arc::new(MonitoringState::default());

// Insert MAX_WATCHED + 50 unique hashes.
const MAX_WATCHED: usize = 1000;
for i in 0..MAX_WATCHED + 50 {
// Insert WATCHED_TX_MAX_SIZE + 50 unique hashes, simulating the eviction logic.
for i in 0..WATCHED_TX_MAX_SIZE + 50 {
let hash = format!("hash-{i}");
let mut set = state.watched_txs.write().await;
let now = std::time::Instant::now();
set.retain(|_, inserted_at| now.duration_since(*inserted_at) < WATCHED_TX_TTL);
if set.len() < MAX_WATCHED {
set.entry(hash).or_insert(now);
// Evict oldest if at cap.
if set.len() >= WATCHED_TX_MAX_SIZE {
if let Some(oldest) = set.iter().min_by_key(|(_, t)| *t).map(|(k, _)| k.clone()) {
set.remove(&oldest);
}
}
set.entry(hash).or_insert(now);
}

let len = state.watched_txs.read().await.len();
assert_eq!(len, MAX_WATCHED, "set must not exceed cap");
assert_eq!(len, WATCHED_TX_MAX_SIZE, "set must not exceed cap");
}

/// Entries older than WATCHED_TX_TTL are evicted on the next insert.
Expand Down Expand Up @@ -1138,4 +1152,43 @@ mod tests {
assert!(!set.contains_key("old-hash"), "stale entry must be evicted");
assert!(set.contains_key("new-hash"), "fresh entry must be present");
}

/// When the cap is reached, the oldest entry is evicted (not the new one dropped).
#[tokio::test]
async fn watched_txs_cap_evicts_oldest_not_newest() {
use super::{MonitoringState, WATCHED_TX_MAX_SIZE, WATCHED_TX_TTL};
use std::sync::Arc;
use std::time::{Duration, Instant};

let state = Arc::new(MonitoringState::default());

// Fill to cap, with "oldest-hash" inserted first (oldest timestamp).
{
let mut set = state.watched_txs.write().await;
let old_time = Instant::now() - Duration::from_secs(60);
set.insert("oldest-hash".to_string(), old_time);
let now = Instant::now();
for i in 1..WATCHED_TX_MAX_SIZE {
set.insert(format!("hash-{i}"), now);
}
}

// Insert one more — should evict "oldest-hash".
{
let mut set = state.watched_txs.write().await;
let now = Instant::now();
set.retain(|_, inserted_at| now.duration_since(*inserted_at) < WATCHED_TX_TTL);
if set.len() >= WATCHED_TX_MAX_SIZE {
if let Some(oldest) = set.iter().min_by_key(|(_, t)| *t).map(|(k, _)| k.clone()) {
set.remove(&oldest);
}
}
set.entry("newest-hash".to_string()).or_insert(now);
}

let set = state.watched_txs.read().await;
assert!(!set.contains_key("oldest-hash"), "oldest entry must be evicted");
assert!(set.contains_key("newest-hash"), "newest entry must be present");
assert_eq!(set.len(), WATCHED_TX_MAX_SIZE, "size must remain at cap");
}
}