Labels: enhancement (New feature or request)
Summary
Implement a memory pool in tantivy4java that coordinates native Rust memory allocations with the JVM, following the pattern established by DataFusion Comet's CometUnifiedMemoryPool. This allows Spark's TaskMemoryManager to have visibility into and control over native memory usage, preventing OOM from untracked off-heap allocations.
Priority: P0 (prerequisite for safe production use of P0/P1 FFI features)
Background: How Comet Does It
DataFusion Comet implements a layered memory management system:
JVM Side (CometTaskMemoryManager.java)

```java
public class CometTaskMemoryManager {
    private final TaskMemoryManager internal;      // Spark's unified memory manager
    private final NativeMemoryConsumer consumer;   // MemoryConsumer for Spark accounting
    private final AtomicLong used = new AtomicLong();

    // Called by native code via JNI
    public long acquireMemory(long size) {
        long acquired = internal.acquireExecutionMemory(size, consumer);
        used.addAndGet(acquired);
        return acquired; // May return less than requested
    }

    public void releaseMemory(long size) {
        used.addAndGet(-size);
        internal.releaseExecutionMemory(size, consumer);
    }
}
```

Rust Side (CometUnifiedMemoryPool)
```rust
impl MemoryPool for CometUnifiedMemoryPool {
    fn try_grow(&self, additional: usize) -> Result<()> {
        let acquired = self.acquire_from_spark(additional)?; // JNI callback
        if acquired < additional as i64 {
            self.release_to_spark(acquired as usize)?;
            return Err(resources_datafusion_err!("insufficient memory"));
        }
        self.used.fetch_add(acquired as usize, Relaxed);
        Ok(())
    }

    fn shrink(&self, size: usize) {
        self.release_to_spark(size);
        self.used.fetch_sub(size, Relaxed);
    }
}
```

Key Design Decisions in Comet
- Off-heap mode only — all native allocations tracked through Spark's off-heap pool
- Synchronous JNI callbacks — Rust calls `acquireMemory`/`releaseMemory` on every allocation event
- Error-driven backpressure — when Spark can't grant memory, Rust receives an error and callers must handle it (spill, fail, etc.)
- `AtomicUsize` tracking — lock-free local counters for fast accounting
- `NativeMemoryConsumer.spill()` returns 0 — Spark can't directly trigger native spills; the native side handles its own spilling when `try_grow` fails
- jemalloc as global allocator — better performance and optional stats integration
- Task-shared pools — multiple native plans in one Spark task share a single memory pool via refcounting
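The last point, task-shared pools, amounts to a refcounted registry keyed by the Spark task attempt id. A minimal sketch of that idea follows; the class and method names are illustrative, not Comet's actual API, and the accountant interface is the one proposed in Phase 1 below, repeated here so the sketch is self-contained:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Phase 1 interface, repeated for self-containment.
interface NativeMemoryAccountant {
    long acquireMemory(long bytes);
    void releaseMemory(long bytes);
}

// Hypothetical sketch: every native plan in one Spark task shares the same
// accountant, kept alive by a reference count and dropped with the last holder.
final class TaskPoolRegistry {
    private static final Map<Long, Entry> POOLS = new HashMap<>();

    private static final class Entry {
        final NativeMemoryAccountant accountant;
        int refs;
        Entry(NativeMemoryAccountant accountant) { this.accountant = accountant; }
    }

    /** Get (or create) the shared accountant for a task, bumping its refcount. */
    static synchronized NativeMemoryAccountant acquire(
            long taskAttemptId, Supplier<NativeMemoryAccountant> factory) {
        Entry e = POOLS.computeIfAbsent(taskAttemptId, id -> new Entry(factory.get()));
        e.refs++;
        return e.accountant;
    }

    /** Drop one reference; the entry is removed when the last holder releases. */
    static synchronized void release(long taskAttemptId) {
        Entry e = POOLS.get(taskAttemptId);
        if (e != null && --e.refs == 0) {
            POOLS.remove(taskAttemptId);
        }
    }
}
```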
Proposed Implementation for tantivy4java
Phase 1: Memory Accounting Interface
Add a Java-side `NativeMemoryAccountant` interface that tantivy4java native code calls via JNI:
```java
public interface NativeMemoryAccountant {
    /**
     * Request memory from the JVM memory manager.
     * @param bytes requested allocation size
     * @return bytes actually granted (may be less than requested)
     */
    long acquireMemory(long bytes);

    /**
     * Release previously acquired memory back to the JVM memory manager.
     * @param bytes amount to release
     */
    void releaseMemory(long bytes);
}
```

Phase 2: Rust-Side Memory Pool
Implement a `JvmCoordinatedMemoryPool` in Rust that:
- Stores a JNI `GlobalRef` to the Java `NativeMemoryAccountant` instance
- Caches JNI method IDs for `acquireMemory` and `releaseMemory` (like Comet's `comet_task_memory_manager.rs`)
- Tracks local usage with `AtomicUsize`
- Exposes `acquire(bytes) -> Result<(), MemoryError>` and `release(bytes)` methods
- Is passed to tantivy operations that perform significant allocations (search result buffering, merge operations, Arrow batch construction)
```rust
pub struct JvmCoordinatedMemoryPool {
    accountant: Arc<GlobalRef>, // Java NativeMemoryAccountant
    acquire_method: JMethodID,
    release_method: JMethodID,
    used: AtomicUsize,
}

impl JvmCoordinatedMemoryPool {
    pub fn acquire(&self, bytes: usize) -> Result<(), MemoryError> {
        let env = get_jni_env();
        // Cached JMethodIDs require the `_unchecked` JNI call variant.
        let acquired = env
            .call_method_unchecked(
                self.accountant.as_obj(),
                self.acquire_method,
                ReturnType::Primitive(Primitive::Long),
                &[JValue::Long(bytes as i64)],
            )?
            .j()?; // extract the returned jlong
        if acquired < bytes as i64 {
            // Partial grant: hand it back and report failure.
            self.release_to_jvm(acquired as usize);
            return Err(MemoryError::InsufficientMemory {
                requested: bytes,
                available: acquired as usize,
            });
        }
        self.used.fetch_add(bytes, Relaxed);
        Ok(())
    }

    pub fn release(&self, bytes: usize) {
        let env = get_jni_env();
        let _ = env.call_method_unchecked(
            self.accountant.as_obj(),
            self.release_method,
            ReturnType::Primitive(Primitive::Void),
            &[JValue::Long(bytes as i64)],
        );
        self.used.fetch_sub(bytes, Relaxed);
    }
}
```

Phase 3: Integration Points
Thread the memory pool into operations that perform significant allocations:
- Search result buffering — `docBatchArrowFfi` allocates Arrow buffers
- Transaction log reading (issue #100, "Native transaction log state reader with Arrow FFI export") — manifest parsing allocates state
- Merge operations — split merging requires significant memory
- Index building — document ingestion buffers
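At each of these call sites the acquire/release pair has to survive exceptions. One way to keep that honest on the Java side is a small `AutoCloseable` reservation guard built on the Phase 1 interface. The class name is hypothetical, and the interface is repeated so the sketch is self-contained:

```java
// Phase 1 interface, repeated for self-containment.
interface NativeMemoryAccountant {
    long acquireMemory(long bytes);
    void releaseMemory(long bytes);
}

// Hypothetical try-with-resources guard around a large native operation:
// acquire up front, release on close, even if the operation throws.
final class MemoryReservation implements AutoCloseable {
    private final NativeMemoryAccountant accountant;
    private final long bytes;

    MemoryReservation(NativeMemoryAccountant accountant, long bytes) {
        long granted = accountant.acquireMemory(bytes);
        if (granted < bytes) {
            // Partial grant: hand it back and fail fast instead of overcommitting.
            accountant.releaseMemory(granted);
            throw new IllegalStateException(
                "native memory: requested " + bytes + ", granted " + granted);
        }
        this.accountant = accountant;
        this.bytes = bytes;
    }

    @Override
    public void close() {
        accountant.releaseMemory(bytes);
    }
}
```

A call site such as the one wrapping `docBatchArrowFfi` could then do its buffer construction inside `try (MemoryReservation r = new MemoryReservation(acc, estimatedBytes)) { ... }`, where `estimatedBytes` is a size estimate supplied by the caller.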
Phase 4: Optional Enhancements
- jemalloc integration — use jemalloc as the global allocator with stats reporting
- Memory pool modes — greedy vs. fair allocation (like Comet's pool types)
- Logging wrapper — debug-mode memory logging (like Comet's `LoggingMemoryPool`)
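For the logging wrapper, a plain decorator over the Phase 1 interface would be enough. A minimal sketch, with a hypothetical class name and the interface repeated for self-containment:

```java
// Phase 1 interface, repeated for self-containment.
interface NativeMemoryAccountant {
    long acquireMemory(long bytes);
    void releaseMemory(long bytes);
}

// Hypothetical debug decorator in the spirit of Comet's LoggingMemoryPool:
// prints every acquire/release, then delegates to the wrapped accountant.
final class LoggingAccountant implements NativeMemoryAccountant {
    private final NativeMemoryAccountant inner;

    LoggingAccountant(NativeMemoryAccountant inner) { this.inner = inner; }

    @Override
    public long acquireMemory(long bytes) {
        long granted = inner.acquireMemory(bytes);
        System.err.printf("acquireMemory: requested=%d granted=%d%n", bytes, granted);
        return granted;
    }

    @Override
    public void releaseMemory(long bytes) {
        System.err.printf("releaseMemory: %d%n", bytes);
        inner.releaseMemory(bytes);
    }
}
```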
API Design Considerations
- Backward compatibility: the memory pool should be optional. The existing API (`SplitSearcher`, etc.) should continue working without a memory accountant (defaulting to unlimited/untracked).
- Granularity: don't track every small allocation — track significant buffers (Arrow batches, merge buffers, search result collections). Use a threshold (e.g., >1KB).
- Thread safety: the pool must be `Send + Sync`, since tantivy operations may run on multiple threads.
- JNI overhead: consider batching small allocations and reporting periodically rather than making a JNI call per allocation. Comet makes per-allocation calls, but its allocations are large (DataFusion operator buffers).
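The JNI-overhead point could be addressed with a chunked wrapper that reserves from the JVM in large units and serves small requests locally, so a hundred small native allocations cost one JNI round trip instead of a hundred. A hedged sketch (class name and chunk policy are illustrative, interface repeated for self-containment):

```java
// Phase 1 interface, repeated for self-containment.
interface NativeMemoryAccountant {
    long acquireMemory(long bytes);
    void releaseMemory(long bytes);
}

// Hypothetical batching decorator: amortizes JNI calls by reserving
// chunk-sized blocks from the underlying (JVM-backed) accountant.
final class BatchingAccountant implements NativeMemoryAccountant {
    private final NativeMemoryAccountant inner;
    private final long chunk;   // reservation granularity, e.g. 4 MiB
    private long reserved;      // bytes granted by `inner` but not yet handed out

    BatchingAccountant(NativeMemoryAccountant inner, long chunk) {
        this.inner = inner;
        this.chunk = chunk;
    }

    @Override
    public synchronized long acquireMemory(long bytes) {
        if (reserved < bytes) {
            // Top up from the JVM in chunk-sized units (one JNI round trip).
            long need = Math.max(chunk, bytes - reserved);
            reserved += inner.acquireMemory(need); // may grant less than asked
        }
        long granted = Math.min(bytes, reserved);
        reserved -= granted;
        return granted; // partial grants propagate the JVM's backpressure
    }

    @Override
    public synchronized void releaseMemory(long bytes) {
        reserved += bytes;
        if (reserved >= 2 * chunk) {
            // Hand surplus back so other consumers can use it.
            inner.releaseMemory(reserved - chunk);
            reserved = chunk;
        }
    }
}
```

Keeping one chunk resident on release trades a little retained memory for fewer JNI calls on the next acquire; the right chunk size depends on the allocation profile of the integration points above.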
References
- DataFusion Comet source: `CometTaskMemoryManager.java`, `unified_pool.rs`, `fair_pool.rs`
- Spark `TaskMemoryManager` API: `acquireExecutionMemory(size, consumer)`, `releaseExecutionMemory(size, consumer)`
- indextables/indextables_spark integration issue (to be linked)