Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 170 additions & 2 deletions services/api/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use std::{
time::{Duration, Instant},
};

use redis::redis_module::RedisResult;


use anyhow::Context;
use deadpool_redis::{Config as PoolConfig, Pool, Runtime};
use redis::AsyncCommands;
Expand Down Expand Up @@ -96,6 +99,7 @@ impl CircuitBreaker {
pub struct RedisCacheConfig {
pub pool_min_idle: usize,
pub pool_max_size: usize,

/// Timeout for acquiring a connection from the pool.
pub acquire_timeout: Duration,
/// Retry attempts on transient errors (0 = no retry).
Expand Down Expand Up @@ -164,14 +168,167 @@ pub struct RedisCache {
pool: Pool,
cb: Arc<CircuitBreaker>,
cfg: RedisCacheConfig,
tag_cfg: TagStoreConfig,
}


// ── Tag-store config + implementation ────────────────────────────────────

/// Limits applied to the tag metadata kept in Redis.
///
/// Each invalidation tag remembers the cache keys it covers. To stop that
/// bookkeeping from growing forever, every tag set carries a TTL and a cap
/// on how many unique cache keys it tracks.
#[derive(Debug, Clone)]
pub struct TagStoreConfig {
    /// Lifetime of a tag's metadata. Must be at least the longest-lived
    /// cached value TTL plus any grace period.
    pub tag_ttl: Duration,

    /// Upper bound on the number of cache keys tracked per invalidation tag.
    pub keys_per_tag_cap: usize,

    /// Namespace prepended to every tag-metadata key in Redis.
    pub prefix: String,
}

impl TagStoreConfig {
    /// Builds the configuration from environment variables.
    ///
    /// | Variable                           | Default         |
    /// |------------------------------------|-----------------|
    /// | `REDIS_CACHE_TAG_TTL_SECS`         | `3600`          |
    /// | `REDIS_CACHE_TAG_KEYS_PER_TAG_CAP` | `256`           |
    /// | `REDIS_CACHE_TAG_PREFIX`           | `cache_tags:v1` |
    ///
    /// A variable that is unset, or (for the numeric ones) fails to parse,
    /// silently falls back to its default.
    pub fn from_env() -> Self {
        /// Reads `name` and parses it as `T`; any failure yields `fallback`.
        fn parsed_env<T: std::str::FromStr>(name: &str, fallback: T) -> T {
            match std::env::var(name) {
                Ok(raw) => raw.parse::<T>().unwrap_or(fallback),
                Err(_) => fallback,
            }
        }

        // The default TTL mirrors what appears to be the longest-lived cache
        // entry in this codebase:
        //   - statistics:       5 * 60 seconds  (300s)
        //   - featured_markets: 2 * 60 seconds  (120s)
        //   - content:          60 * 60 seconds (3600s)
        let ttl_secs: u64 = parsed_env("REDIS_CACHE_TAG_TTL_SECS", 3600);
        let per_tag_cap: usize = parsed_env("REDIS_CACHE_TAG_KEYS_PER_TAG_CAP", 256);
        let namespace = match std::env::var("REDIS_CACHE_TAG_PREFIX") {
            Ok(value) => value,
            Err(_) => "cache_tags:v1".to_string(),
        };

        Self {
            tag_ttl: Duration::from_secs(ttl_secs),
            keys_per_tag_cap: per_tag_cap,
            prefix: namespace,
        }
    }

    /// Redis key holding the tracked cache keys for a tag:
    /// `<prefix>:tag:<tag_hash>`.
    fn tag_key(&self, tag_hash: &str) -> String {
        [self.prefix.as_str(), "tag", tag_hash].join(":")
    }

    /// Redis key holding the insertion-order counter for a tag:
    /// `<prefix>:tag:<tag_hash>:seq`.
    fn counter_key(&self, tag_hash: &str) -> String {
        let mut key = self.tag_key(tag_hash);
        key.push_str(":seq");
        key
    }
}

impl RedisCache {

/// Records the cache keys covered by `tag` in the Redis-side tag metadata,
/// refreshing its TTL and trimming it to the configured cap.
///
/// This only maintains bookkeeping; it does not delete any cache entries.
///
/// Redis keys touched (both derived from a hash of the tag):
/// - `<prefix>:tag:<hash>`     — ZSET of tracked cache keys, scored by an
///   ever-increasing sequence number so the oldest members can be evicted.
/// - `<prefix>:tag:<hash>:seq` — integer counter that hands out those
///   sequence numbers.
///
/// The whole update runs in one Lua script so it is atomic on the server.
///
/// # Errors
///
/// Returns whatever error `self.exec` surfaces for the script invocation.
async fn tag_store_invalidate(&self, tag: &InvalidationTag) -> anyhow::Result<()> {
    let tag_keys = tag.cache_keys();
    if tag_keys.is_empty() {
        return Ok(());
    }

    // Derive the metadata key from the tag's `Debug` rendering rather than a
    // `Hash` impl: `InvalidationTag` only derives `Debug` and `Clone`.
    // `DefaultHasher::new()` uses fixed keys, so the value is stable for a
    // given build, but std does not guarantee the algorithm across Rust
    // releases. That is acceptable here because the metadata is TTL'd.
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    std::hash::Hash::hash(&format!("{:?}", tag), &mut hasher);
    let tag_hash = format!("{:x}", std::hash::Hasher::finish(&hasher));

    let zset_key = self.tag_cfg.tag_key(&tag_hash);
    let seq_key = self.tag_cfg.counter_key(&tag_hash);

    let tag_ttl_secs = self.tag_cfg.tag_ttl.as_secs();
    // Passed through as `usize`; the script converts it with `tonumber`.
    let cap = self.tag_cfg.keys_per_tag_cap;

    let script = redis::Script::new(
        r#"
local zset_key = KEYS[1]
local seq_key = KEYS[2]
local ttl = tonumber(ARGV[1])
local cap = tonumber(ARGV[2])

-- ARGV layout: [ttl, cap, key1, key2, ...]
local count = #ARGV - 2
if count <= 0 then
  return 0
end

-- Reserve a contiguous block of `count` sequence numbers in one step so
-- scores handed out by different invocations never overlap.
local last_seq = redis.call('INCRBY', seq_key, count)
local first_seq = last_seq - count + 1
for i = 3, #ARGV do
  redis.call('ZADD', zset_key, first_seq + (i - 3), ARGV[i])
end

-- Apply/refresh TTL on BOTH keys; otherwise the counter would outlive the
-- set and accumulate forever.
if ttl and ttl > 0 then
  redis.call('EXPIRE', zset_key, ttl)
  redis.call('EXPIRE', seq_key, ttl)
end

-- Cap size: evict oldest (lowest scores) beyond cap.
local current = redis.call('ZCARD', zset_key)
if cap and current > cap then
  local over = current - cap
  redis.call('ZREMRANGEBYRANK', zset_key, 0, over - 1)
  return over
end
return 0
"#,
    );

    self.exec(|mut conn| {
        // Clone per call so the closure stays reusable if `exec` invokes it
        // more than once.
        let script = script.clone();
        let zset_key = zset_key.clone();
        let seq_key = seq_key.clone();
        let keys = tag_keys.clone();
        async move {
            // The script returns how many members were evicted; it is not
            // needed for correctness, so it is only decoded and dropped.
            let _evicted: i64 = script
                .key(&zset_key)
                .key(&seq_key)
                .arg(tag_ttl_secs)
                .arg(cap)
                // A collection expands to one script argument per element.
                .arg(&keys)
                .invoke_async(&mut conn)
                .await?;
            Ok(())
        }
    })
    .await?;

    Ok(())
}


/// Connects to `redis_url` using settings from
/// [`RedisCacheConfig::from_env`].
///
/// Convenience wrapper around [`RedisCache::new_with_config`].
///
/// # Errors
///
/// Propagates any error returned by `new_with_config`.
pub async fn new(redis_url: &str) -> anyhow::Result<Self> {
    // The earlier one-line form of this call was left in front of the `let`
    // below, which is not valid Rust; only the current form is kept.
    let cfg = RedisCacheConfig::from_env();
    Self::new_with_config(redis_url, cfg).await
}

pub async fn new_with_config(redis_url: &str, cfg: RedisCacheConfig) -> anyhow::Result<Self> {

let pool_cfg = PoolConfig::from_url(redis_url);
let pool = pool_cfg
.builder()
Expand All @@ -182,9 +339,11 @@ impl RedisCache {
.context("failed to build Redis pool")?;

let cb = Arc::new(CircuitBreaker::new(cfg.cb_threshold, cfg.cb_reset_timeout));
Ok(Self { pool, cb, cfg })
let tag_cfg = TagStoreConfig::from_env();
Ok(Self { pool, cb, cfg, tag_cfg })
}


/// Returns the current circuit breaker state — useful for health checks and metrics.
pub fn circuit_state(&self) -> CircuitState {
self.cb.state()
Expand Down Expand Up @@ -419,6 +578,13 @@ impl RedisCache {
/// as 0 from Redis DEL but are still included in the returned count for
/// observability purposes).
pub async fn invalidate_tag(&self, tag: &InvalidationTag) -> anyhow::Result<usize> {
// Keep tag-sets bounded + TTL'd so Redis memory usage can't grow
// unboundedly from high-cardinality tag usage.
//
// We still eagerly delete the concrete cache keys for correctness,
// but tag metadata is now stored in Redis with TTL + cap.
let _ = self.tag_store_invalidate(tag).await?;

let tag_keys = tag.cache_keys();
let mut deleted = 0usize;
for key in &tag_keys {
Expand All @@ -428,6 +594,7 @@ impl RedisCache {
Ok(deleted)
}


/// Atomically increment `key` and set its TTL on first increment.
/// Returns the new counter value. Used for Redis-backed rate limiting.
pub async fn incr_with_ttl(&self, key: &str, ttl: Duration) -> anyhow::Result<u64> {
Expand Down Expand Up @@ -793,6 +960,7 @@ mod tests {
/// Use [`RedisCache::invalidate_tag`] to apply a tag.
#[derive(Debug, Clone)]
pub enum InvalidationTag {

/// A market was resolved.
///
/// Invalidates the per-market chain entry, the oracle result, and the
Expand Down
2 changes: 2 additions & 0 deletions services/api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl Metrics {
}
}


pub fn observe_request(&self, route: &str, status_code: &str, duration: Duration) {
self.request_latency
.with_label_values(&[route, status_code])
Expand Down Expand Up @@ -186,6 +187,7 @@ impl Metrics {
}
}


/// Update connection pool utilisation gauges.
/// Call this on each pool event (connection acquired, released, opened, closed).
pub fn observe_pool_connections(&self, pool: &str, active: i64, idle: i64) {
Expand Down