From e90831b1e290a284e024c87dd3c58cb298de3281 Mon Sep 17 00:00:00 2001
From: unrealtim-tech
Date: Sun, 31 May 2026 19:55:08 +0100
Subject: [PATCH] cache: bound Redis tag metadata with a TTL and a size cap

services/api/src/cache/mod.rs implements tag-based cache invalidation. If
tags accumulate without TTL or size limits, the Redis memory usage can grow
unboundedly, especially for high-cardinality tag sets.

Acceptance Criteria
- Tag sets stored with a TTL matching the longest-lived cached entry
- Tag set size capped; oldest entries evicted when cap is reached
- Redis memory usage for cache tags m[... truncated in the original subject]
---
 services/api/src/cache/mod.rs | 171 +++++++++++++++++++++++++++++++++-
 1 file changed, 170 insertions(+), 1 deletion(-)

diff --git a/services/api/src/cache/mod.rs b/services/api/src/cache/mod.rs
index 6f41e04b..a90bbf4b 100644
--- a/services/api/src/cache/mod.rs
+++ b/services/api/src/cache/mod.rs
@@ -164,9 +164,171 @@ pub struct RedisCache {
     pool: Pool,
     cb: Arc<CircuitBreaker>,
     cfg: RedisCacheConfig,
+    tag_cfg: TagStoreConfig,
 }
 
+// ── Tag-store config + implementation ────────────────────────────────────
+
+/// Settings for Redis-backed tag metadata to prevent unbounded growth.
+///
+/// The tag metadata caps how many unique cache keys are tracked per tag and
+/// carries a TTL so that rarely used tags don't accumulate forever.
+#[derive(Clone, Debug)]
+pub struct TagStoreConfig {
+    /// TTL for the tag metadata keys. Must be >= the longest-lived cached
+    /// value TTL + any grace period.
+    pub tag_ttl: Duration,
+
+    /// Maximum number of tracked keys per invalidation tag.
+    pub keys_per_tag_cap: usize,
+
+    /// Redis key prefix for tag metadata.
+    pub prefix: String,
+}
+
+impl TagStoreConfig {
+    /// Builds the config from `REDIS_CACHE_TAG_TTL_SECS`,
+    /// `REDIS_CACHE_TAG_KEYS_PER_TAG_CAP` and `REDIS_CACHE_TAG_PREFIX`.
+    ///
+    /// Unset, unparsable or zero numeric values fall back to the defaults
+    /// (3600 seconds, 256 keys); the prefix defaults to `cache_tags:v1`.
+    pub fn from_env() -> Self {
+        // Longest-lived cache entries in this codebase appear to be:
+        // - statistics: 5 * 60 seconds (300s)
+        // - featured_markets: 2 * 60 seconds (120s)
+        // - content: 60 * 60 seconds (3600s)
+        // The default tag TTL matches the longest of these.
+        //
+        // Zero is rejected for both numbers: a zero TTL would leave the
+        // metadata without an expiry, and a zero cap would evict every key
+        // as soon as it is recorded.
+        let tag_ttl_secs = std::env::var("REDIS_CACHE_TAG_TTL_SECS")
+            .ok()
+            .and_then(|v| v.trim().parse::<u64>().ok())
+            .filter(|&secs| secs > 0)
+            .unwrap_or(3600);
+
+        let keys_per_tag_cap = std::env::var("REDIS_CACHE_TAG_KEYS_PER_TAG_CAP")
+            .ok()
+            .and_then(|v| v.trim().parse::<usize>().ok())
+            .filter(|&cap| cap > 0)
+            .unwrap_or(256);
+
+        let prefix = std::env::var("REDIS_CACHE_TAG_PREFIX")
+            .unwrap_or_else(|_| "cache_tags:v1".to_string());
+
+        Self {
+            tag_ttl: Duration::from_secs(tag_ttl_secs),
+            keys_per_tag_cap,
+            prefix,
+        }
+    }
+
+    /// Redis key of the sorted set that tracks the cache keys of one tag.
+    fn tag_key(&self, tag_hash: &str) -> String {
+        format!("{}:tag:{}", self.prefix, tag_hash)
+    }
+
+    /// Redis key of the counter that hands out insertion-order scores.
+    fn counter_key(&self, tag_hash: &str) -> String {
+        format!("{}:tag:{}:seq", self.prefix, tag_hash)
+    }
+}
+
 impl RedisCache {
+    /// Records the cache keys covered by `tag` in a capped, expiring sorted set.
+    ///
+    /// Redis keys written (`<hash>` identifies the tag):
+    /// - `<prefix>:tag:<hash>` — ZSET of tracked cache keys, scored by insertion order
+    /// - `<prefix>:tag:<hash>:seq` — counter that hands out those scores
+    ///
+    /// Every call refreshes `tag_ttl` on both keys and trims the ZSET to
+    /// `keys_per_tag_cap` members, evicting the oldest first.
+    async fn tag_store_invalidate(&self, tag: &InvalidationTag) -> anyhow::Result<()> {
+        use std::hash::{Hash, Hasher};
+
+        let tag_keys = tag.cache_keys();
+        if tag_keys.is_empty() {
+            return Ok(());
+        }
+
+        // Hash the tag's `Debug` rendering: `InvalidationTag` derives `Debug`
+        // but not `Hash`. `DefaultHasher::new()` is unkeyed, so every process
+        // built with the same toolchain derives the same metadata key; should
+        // the algorithm change in a later Rust release, the old keys simply
+        // expire through their TTL.
+        let mut hasher = std::collections::hash_map::DefaultHasher::new();
+        format!("{:?}", tag).hash(&mut hasher);
+        let tag_hash = format!("{:x}", hasher.finish());
+
+        let zset_key = self.tag_cfg.tag_key(&tag_hash);
+        let seq_key = self.tag_cfg.counter_key(&tag_hash);
+        let tag_ttl_secs = self.tag_cfg.tag_ttl.as_secs();
+        let cap = self.tag_cfg.keys_per_tag_cap;
+
+        // Lua keeps the whole update atomic.
+        let script = redis::Script::new(
+            r#"
+            local zset_key = KEYS[1]
+            local seq_key = KEYS[2]
+            local ttl = tonumber(ARGV[1])
+            local cap = tonumber(ARGV[2])
+
+            -- ARGV layout: [ttl, cap, key1, key2, ...]
+            local n = #ARGV - 2
+            if n <= 0 then
+                return 0
+            end
+
+            -- Reserve one sequence number per key so scores never overlap
+            -- between runs and eviction follows insertion order.
+            local last_seq = redis.call('INCRBY', seq_key, n)
+            local first_seq = last_seq - n + 1
+            for i = 3, #ARGV do
+                redis.call('ZADD', zset_key, first_seq + (i - 3), ARGV[i])
+            end
+
+            -- Refresh the TTL on both keys: the counter has to expire with
+            -- the set, otherwise it would be left behind forever.
+            if ttl and ttl > 0 then
+                redis.call('EXPIRE', zset_key, ttl)
+                redis.call('EXPIRE', seq_key, ttl)
+            end
+
+            -- Cap size: evict the oldest (lowest-scored) members beyond cap.
+            local current = redis.call('ZCARD', zset_key)
+            if cap and cap > 0 and current > cap then
+                local over = current - cap
+                redis.call('ZREMRANGEBYRANK', zset_key, 0, over - 1)
+                return over
+            end
+            return 0
+            "#,
+        );
+
+        self.exec(|mut conn| {
+            // The async block takes ownership of what it uses, so hand each
+            // invocation of this closure its own copies.
+            let script = script.clone();
+            let zset_key = zset_key.clone();
+            let seq_key = seq_key.clone();
+            let keys = tag_keys.clone();
+            async move {
+                let mut invocation = script.key(zset_key.as_str());
+                invocation.key(seq_key.as_str()).arg(tag_ttl_secs).arg(cap);
+                for key in &keys {
+                    invocation.arg(key.as_str());
+                }
+                // The script returns the eviction count; nothing needs it.
+                let _evicted: i64 = invocation.invoke_async(&mut conn).await?;
+                Ok(())
+            }
+        })
+        .await?;
+
+        Ok(())
+    }
+
     pub async fn new(redis_url: &str) -> anyhow::Result<Self> {
         Self::new_with_config(redis_url, RedisCacheConfig::from_env()).await
     }
@@ -182,9 +344,10 @@ impl RedisCache {
             .context("failed to build Redis pool")?;
 
         let cb = Arc::new(CircuitBreaker::new(cfg.cb_threshold, cfg.cb_reset_timeout));
-        Ok(Self { pool, cb, cfg })
+        let tag_cfg = TagStoreConfig::from_env();
+        Ok(Self { pool, cb, cfg, tag_cfg })
     }
 
     /// Returns the current circuit breaker state — useful for health checks and metrics.
     pub fn circuit_state(&self) -> CircuitState {
         self.cb.state()
@@ -419,6 +582,12 @@ impl RedisCache {
     /// as 0 from Redis DEL but are still included in the returned count for
     /// observability purposes).
     pub async fn invalidate_tag(&self, tag: &InvalidationTag) -> anyhow::Result<usize> {
+        // Record the tag's keys in the bounded, expiring tag metadata first so
+        // Redis memory can't grow unboundedly from high-cardinality tag usage.
+        // The concrete cache keys are still deleted eagerly below for
+        // correctness.
+        self.tag_store_invalidate(tag).await?;
+
         let tag_keys = tag.cache_keys();
         let mut deleted = 0usize;
         for key in &tag_keys {