Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions native/src/delta_reader/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,18 @@ pub fn read_checkpoint_part_arrow_ffi(
}
}

// 5. Build FFI structs in a temporary vec first, then write all at once.
// 5. Memory pool reservation for Arrow FFI data
let estimated_size: usize = arrays.iter().map(|(a, _)| a.get_buffer_memory_size()).sum();
let _reservation = crate::memory_pool::MemoryReservation::try_new(
&crate::memory_pool::global_pool(),
estimated_size,
"arrow_ffi",
)
.unwrap_or_else(|_| {
crate::memory_pool::MemoryReservation::empty(&crate::memory_pool::global_pool(), "arrow_ffi")
});

// 6. Build FFI structs in a temporary vec first, then write all at once.
// If any schema conversion fails, nothing is written.
let mut ffi_pairs: Vec<(FFI_ArrowArray, FFI_ArrowSchema)> = Vec::with_capacity(NUM_COLS);
for (i, (array, field)) in arrays.iter().enumerate() {
Expand All @@ -1026,7 +1037,7 @@ pub fn read_checkpoint_part_arrow_ffi(
ffi_pairs.push((ffi_array, ffi_schema));
}

// 6. All FFI structs built successfully — write them all out.
// 7. All FFI structs built successfully — write them all out.
for (i, (ffi_array, ffi_schema)) in ffi_pairs.into_iter().enumerate() {
let array_ptr = array_addrs[i] as *mut FFI_ArrowArray;
let schema_ptr = schema_addrs[i] as *mut FFI_ArrowSchema;
Expand Down
9 changes: 8 additions & 1 deletion native/src/disk_cache/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,16 @@ impl L2DiskCache {

// For size-based mode: drain bytes and notify waiting senders
if let Some((queued_bytes, backpressure)) = sb_state {
queued_bytes.fetch_sub(data_len, Ordering::Release);
let remaining = queued_bytes.fetch_sub(data_len, Ordering::Release) - data_len;
let (_lock, cvar) = &*backpressure;
cvar.notify_all();

// When queue fully drains, release overflow memory back to pool
if remaining == 0 {
if let Some(budget) = &cache.memory_budget {
budget.on_queue_drained();
}
}
}

drop(permit); // Release permit when write completes
Expand Down
80 changes: 77 additions & 3 deletions native/src/disk_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::Duration;

use crate::debug_println;
use crate::memory_pool::{self, DiskCacheMemoryBudget};
use tantivy::directory::OwnedBytes;

use lru::SplitLruTable;
Expand Down Expand Up @@ -183,6 +184,8 @@ pub struct L2DiskCache {
thread_handles: Mutex<Vec<std::thread::JoinHandle<()>>>,
/// Dirty flag - set when manifest has uncommitted changes
manifest_dirty: Arc<std::sync::atomic::AtomicBool>,
/// Memory budget for write queue (staircase-up/cliff-down pattern)
memory_budget: Option<DiskCacheMemoryBudget>,
}

#[allow(dead_code)]
Expand Down Expand Up @@ -240,6 +243,39 @@ impl L2DiskCache {

let manifest_dirty = Arc::new(std::sync::atomic::AtomicBool::new(false));

// Create memory budget for the write queue — only when a JVM pool is
// explicitly configured. With UnlimitedMemoryPool (default), the write
// queue operates with its static max size and no expand/contract behavior.
let memory_budget = if memory_pool::is_pool_configured() {
let max_budget = config.max_write_queue_budget as usize; // 0 = default (8x)
match &config.write_queue_mode {
WriteQueueMode::SizeBased { max_bytes } => {
Some(DiskCacheMemoryBudget::with_config(
&memory_pool::global_pool(),
*max_bytes as usize,
500 * 1024 * 1024, // 500MB grow increment
max_budget,
))
}
WriteQueueMode::Fragment { capacity } => {
// Estimate: each fragment slot can hold ~1MB of data
let estimated_bytes = (*capacity as usize) * 1024 * 1024;
if estimated_bytes > 0 {
Some(DiskCacheMemoryBudget::with_config(
&memory_pool::global_pool(),
estimated_bytes,
500 * 1024 * 1024,
max_budget,
))
} else {
None
}
}
}
} else {
None // UnlimitedMemoryPool — no budget tracking, static queue size
};

let cache = Arc::new(Self {
config: config.clone(),
manifest: RwLock::new(manifest),
Expand All @@ -252,6 +288,7 @@ impl L2DiskCache {
shutdown_flag: Arc::clone(&shutdown_flag),
thread_handles: Mutex::new(Vec::new()),
manifest_dirty: Arc::clone(&manifest_dirty),
memory_budget,
});

// Start background writer (uses Weak reference - doesn't prevent Drop)
Expand Down Expand Up @@ -411,9 +448,22 @@ impl L2DiskCache {
)
}

/// Try to expand the memory budget to accommodate new data.
/// Returns true if the budget has room (or was successfully expanded), false if denied.
fn try_expand_budget(&self, needed: usize) -> bool {
    // No budget configured (UnlimitedMemoryPool) — writes are always allowed;
    // otherwise defer to the budget's staircase-up capacity check.
    self.memory_budget
        .as_ref()
        .map_or(true, |budget| budget.ensure_capacity(needed))
}

/// Cache data (async write via background thread).
/// Blocks if the write queue is full (backpressure).
/// Use this for prewarm operations where data must be written.
///
/// Tries to expand the memory budget first. If the pool denies expansion,
/// proceeds with a blocking write anyway — the background writer will drain
/// the queue naturally, and the user explicitly requested this data be cached.
pub fn put(
&self,
storage_loc: &str,
Expand All @@ -431,7 +481,16 @@ impl L2DiskCache {
self.trigger_eviction((self.max_bytes * 90) / 100);
}

// Send to background writer with backpressure.
// Try to expand budget. If denied, proceed anyway — prewarm blocks on
// the channel send until the background writer drains and frees queue space.
if !self.try_expand_budget(data.len()) {
debug_println!(
"⚠️ L2DiskCache::put (prewarm): Memory budget denied expansion, \
will block on queue backpressure until background writer drains"
);
}

// Send to background writer with backpressure (blocks if queue is full).
let _ = self.write_tx.send(WriteRequest::Put {
storage_loc: storage_loc.to_string(),
split_id: split_id.to_string(),
Expand All @@ -444,6 +503,9 @@ impl L2DiskCache {
/// Cache data if the write queue has capacity, otherwise drop silently.
/// Returns `true` if the write was enqueued, `false` if dropped.
/// Use this for query-path opportunistic caching where dropping is acceptable.
///
/// Tries to expand the memory budget first. If the pool denies expansion,
/// drops the entry — a cache miss is preferable to over-allocating.
pub fn put_if_ready(
&self,
storage_loc: &str,
Expand All @@ -459,6 +521,12 @@ impl L2DiskCache {
self.trigger_eviction((self.max_bytes * 90) / 100);
}

// Try to expand budget. If denied, drop — query path should not over-allocate.
if !self.try_expand_budget(data.len()) {
debug_println!("⚠️ L2DiskCache::put_if_ready: Memory budget denied expansion, dropping cache entry");
return false;
}

self.write_tx.send_or_drop(WriteRequest::Put {
storage_loc: storage_loc.to_string(),
split_id: split_id.to_string(),
Expand All @@ -474,8 +542,8 @@ impl L2DiskCache {
}

/// Cache data for the query path — blocks or drops depending on config.
/// When `drop_writes_when_full` is enabled, silently drops writes if the queue is full.
/// When disabled, behaves identically to `put()` (blocks until enqueued).
/// Tries to expand the memory budget first. If the pool denies expansion,
/// drops the entry — a cache miss is preferable to over-allocating.
pub fn put_query_path(
&self,
storage_loc: &str,
Expand All @@ -484,6 +552,12 @@ impl L2DiskCache {
byte_range: Option<Range<u64>>,
data: &[u8],
) {
// Try to expand budget. If denied, drop — query path should not over-allocate.
if !self.try_expand_budget(data.len()) {
debug_println!("⚠️ L2DiskCache::put_query_path: Memory budget denied expansion, dropping cache entry");
return;
}

if self.config.drop_writes_when_full {
self.put_if_ready(storage_loc, split_id, component, byte_range, data);
} else {
Expand Down
5 changes: 5 additions & 0 deletions native/src/disk_cache/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ pub struct DiskCacheConfig {
/// When true, non-prewarm (query-path) writes are dropped if the write queue is full
/// instead of blocking. Prewarm writes always block. Default: false (all writes block).
pub drop_writes_when_full: bool,
/// Maximum memory budget for the write queue (bytes). Controls the hard cap for
/// staircase-up growth. 0 = default (8x the initial write queue size).
/// Only effective when a JVM memory pool is configured.
pub max_write_queue_budget: u64,
}

impl Default for DiskCacheConfig {
Expand All @@ -86,6 +90,7 @@ impl Default for DiskCacheConfig {
mmap_cache_size: DEFAULT_MMAP_CACHE_SIZE,
write_queue_mode: WriteQueueMode::default(),
drop_writes_when_full: false,
max_write_queue_budget: 0, // Default: 8x initial
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions native/src/global_cache/l1_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::OnceLock;
use quickwit_storage::ByteRangeCache;

use crate::debug_println;
use crate::memory_pool::{self, MemoryReservation};

/// Global L1 ByteRangeCache shared across all SplitSearcher instances
/// This provides memory-efficient caching - one bounded cache instead of per-split caches
Expand All @@ -19,6 +20,13 @@ static CONFIGURED_L1_CACHE_CAPACITY: OnceLock<std::sync::RwLock<Option<u64>>> =
/// When true, all storage requests bypass L1 memory cache and go to L2 disk cache / L3 storage
static DISABLE_L1_CACHE: OnceLock<std::sync::RwLock<bool>> = OnceLock::new();

/// Memory reservation for the L1 cache capacity. Released when cache is reset or process exits.
static L1_CACHE_RESERVATION: OnceLock<std::sync::Mutex<Option<MemoryReservation>>> = OnceLock::new();

/// Lazily initializes and returns the global holder for the L1 cache's
/// memory reservation (`None` until a reservation has been taken out).
fn get_l1_reservation_holder() -> &'static std::sync::Mutex<Option<MemoryReservation>> {
    // Mutex<Option<_>>::default() == Mutex::new(None)
    L1_CACHE_RESERVATION.get_or_init(std::sync::Mutex::default)
}

/// Lazily initializes and returns the global holder for the shared L1
/// ByteRangeCache (`None` until the cache is first created).
fn get_l1_cache_holder() -> &'static std::sync::RwLock<Option<ByteRangeCache>> {
    // RwLock<Option<_>>::default() == RwLock::new(None)
    GLOBAL_L1_CACHE.get_or_init(std::sync::RwLock::default)
}
Expand Down Expand Up @@ -53,6 +61,11 @@ pub fn reset_global_l1_cache() {
*guard = None;
}

// Release the memory reservation
{
*get_l1_reservation_holder().lock().unwrap() = None;
}

// Then clear the cache itself (will be recreated on next access)
{
let holder = get_l1_cache_holder();
Expand Down Expand Up @@ -93,6 +106,24 @@ pub fn get_or_create_global_l1_cache() -> Option<ByteRangeCache> {

// Create bounded L1 cache with configurable capacity
let capacity = get_l1_cache_capacity_bytes();

// Reserve memory from the global pool for L1 cache — fail-fast if denied
let reservation = match MemoryReservation::try_new(
&memory_pool::global_pool(),
capacity as usize,
"l1_cache",
) {
Ok(r) => r,
Err(e) => {
debug_println!(
"❌ GLOBAL_L1_CACHE: Memory pool denied L1 reservation of {} MB: {}. L1 cache will not be created.",
capacity / 1024 / 1024, e
);
return None;
}
};
*get_l1_reservation_holder().lock().unwrap() = Some(reservation);

let cache = ByteRangeCache::with_capacity(
capacity,
&quickwit_storage::STORAGE_METRICS.shortlived_cache,
Expand Down
15 changes: 13 additions & 2 deletions native/src/iceberg_reader/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,18 @@ pub fn read_iceberg_manifest_arrow_ffi(
}
}

// 5. Build FFI structs in a temporary vec first, then write all at once.
// 5. Memory pool reservation for Arrow FFI data
let estimated_size: usize = arrays.iter().map(|(a, _)| a.get_buffer_memory_size()).sum();
let _reservation = crate::memory_pool::MemoryReservation::try_new(
&crate::memory_pool::global_pool(),
estimated_size,
"arrow_ffi",
)
.unwrap_or_else(|_| {
crate::memory_pool::MemoryReservation::empty(&crate::memory_pool::global_pool(), "arrow_ffi")
});

// 6. Build FFI structs in a temporary vec first, then write all at once.
// If any schema conversion fails, nothing is written.
let mut ffi_pairs: Vec<(FFI_ArrowArray, FFI_ArrowSchema)> = Vec::with_capacity(NUM_COLS);
for (i, (array, field)) in arrays.iter().enumerate() {
Expand All @@ -371,7 +382,7 @@ pub fn read_iceberg_manifest_arrow_ffi(
ffi_pairs.push((ffi_array, ffi_schema));
}

// 6. All FFI structs built successfully — write them all out.
// 7. All FFI structs built successfully — write them all out.
for (i, (ffi_array, ffi_schema)) in ffi_pairs.into_iter().enumerate() {
let array_ptr = array_addrs[i] as *mut FFI_ArrowArray;
let schema_ptr = schema_addrs[i] as *mut FFI_ArrowSchema;
Expand Down
Loading
Loading