
Unified memory management: JVM-coordinated memory pool for native allocations #105

@schenksj

Description


Summary

Implement a memory pool in tantivy4java that coordinates native Rust memory allocations with the JVM, following the pattern established by DataFusion Comet's CometUnifiedMemoryPool. This allows Spark's TaskMemoryManager to have visibility into and control over native memory usage, preventing OOM from untracked off-heap allocations.

Priority: P0 (prerequisite for safe production use of P0/P1 FFI features)

Background: How Comet Does It

DataFusion Comet implements a layered memory management system:

JVM Side (CometTaskMemoryManager.java)

public class CometTaskMemoryManager {
    private final TaskMemoryManager internal;        // Spark's unified memory manager
    private final NativeMemoryConsumer consumer;     // MemoryConsumer for Spark accounting
    private final AtomicLong used = new AtomicLong();

    // Called by native code via JNI
    public long acquireMemory(long size) {
        long acquired = internal.acquireExecutionMemory(size, consumer);
        used.addAndGet(acquired);
        return acquired;  // May return less than requested
    }

    public void releaseMemory(long size) {
        used.addAndGet(-size);
        internal.releaseExecutionMemory(size, consumer);
    }
}

Rust Side (CometUnifiedMemoryPool)

impl MemoryPool for CometUnifiedMemoryPool {
    fn try_grow(&self, additional: usize) -> Result<()> {
        let acquired = self.acquire_from_spark(additional)?;  // JNI callback
        if acquired < additional as i64 {
            self.release_to_spark(acquired as usize)?;
            return Err(resources_datafusion_err!("insufficient memory"));
        }
        self.used.fetch_add(acquired as usize, Relaxed);
        Ok(())
    }

    fn shrink(&self, size: usize) {
        self.release_to_spark(size);
        self.used.fetch_sub(size, Relaxed);
    }
}

Key Design Decisions in Comet

  1. Off-heap mode only — all native allocations tracked through Spark's off-heap pool
  2. Synchronous JNI callbacks — Rust calls acquireMemory/releaseMemory on every allocation event
  3. Error-driven backpressure — when Spark can't grant memory, Rust receives an error and callers must handle it (spill, fail, etc.)
  4. AtomicUsize tracking — lock-free local counters for fast accounting
  5. NativeMemoryConsumer.spill() returns 0 — Spark can't directly trigger native spills; the native side handles its own spilling when try_grow fails
  6. jemalloc as global allocator — better performance and optional stats integration
  7. Task-shared pools — multiple native plans in one Spark task share a single memory pool via refcounting
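Decision 3 is the crux of the design: Spark may grant less than requested, and the native side must hand the partial grant back and react. A minimal, Spark-free sketch of that control flow (class names are hypothetical; `ToyTaskMemoryManager` stands in for Spark's `TaskMemoryManager`):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Spark's TaskMemoryManager: grants at most
// what remains of a fixed budget, mirroring acquireExecutionMemory's
// "may return less than requested" contract.
class ToyTaskMemoryManager {
    private final AtomicLong remaining;

    ToyTaskMemoryManager(long budgetBytes) { remaining = new AtomicLong(budgetBytes); }

    long acquire(long requested) {
        while (true) {
            long cur = remaining.get();
            long granted = Math.min(cur, requested);
            if (remaining.compareAndSet(cur, cur - granted)) return granted;
        }
    }

    void release(long bytes) { remaining.addAndGet(bytes); }
}

// Caller-side backpressure, as in Comet's try_grow: a partial grant is
// returned to the pool, and the caller must spill, retry, or fail.
class BackpressureCaller {
    static boolean tryReserve(ToyTaskMemoryManager mgr, long bytes) {
        long granted = mgr.acquire(bytes);
        if (granted < bytes) {
            mgr.release(granted); // give back the partial grant
            return false;         // caller now spills or errors out
        }
        return true;
    }
}
```

Note the invariant: a failed reservation leaves the pool exactly as it found it, so accounting never drifts on the error path.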

Proposed Implementation for tantivy4java

Phase 1: Memory Accounting Interface

Add a Java-side NativeMemoryAccountant interface that tantivy4java native code calls via JNI:

public interface NativeMemoryAccountant {
    /**
     * Request memory from the JVM memory manager.
     * @param bytes requested allocation size
     * @return bytes actually granted (may be less than requested)
     */
    long acquireMemory(long bytes);

    /**
     * Release previously acquired memory back to the JVM memory manager.
     * @param bytes amount to release
     */
    void releaseMemory(long bytes);
}
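To make the contract concrete, here is a minimal, Spark-free implementation of the interface. `CappedMemoryAccountant` and its fixed cap are hypothetical; a production implementation would instead delegate both calls to Spark's TaskMemoryManager through a registered MemoryConsumer:

```java
import java.util.concurrent.atomic.AtomicLong;

// Interface from Phase 1, repeated so this sketch is self-contained.
interface NativeMemoryAccountant {
    long acquireMemory(long bytes);
    void releaseMemory(long bytes);
}

// Minimal reference implementation with a hard cap standing in for
// Spark's unified memory pool. Lock-free: a CAS loop grants as much of
// the request as the remaining budget allows (possibly less, possibly 0).
class CappedMemoryAccountant implements NativeMemoryAccountant {
    private final long capBytes;
    private final AtomicLong used = new AtomicLong();

    CappedMemoryAccountant(long capBytes) { this.capBytes = capBytes; }

    @Override
    public long acquireMemory(long bytes) {
        while (true) {
            long cur = used.get();
            long granted = Math.min(bytes, capBytes - cur); // may be < bytes
            if (granted <= 0) return 0;
            if (used.compareAndSet(cur, cur + granted)) return granted;
        }
    }

    @Override
    public void releaseMemory(long bytes) {
        used.addAndGet(-bytes);
    }
}
```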

Phase 2: Rust-Side Memory Pool

Implement a JvmCoordinatedMemoryPool in Rust that:

  1. Stores a JNI GlobalRef to the Java NativeMemoryAccountant instance
  2. Caches JNI method IDs for acquireMemory and releaseMemory (like Comet's comet_task_memory_manager.rs)
  3. Tracks local usage with AtomicUsize
  4. Exposes acquire(bytes) -> Result<(), MemoryError> and release(bytes) methods
  5. Is passed to tantivy operations that perform significant allocations (search result buffering, merge operations, Arrow batch construction)

pub struct JvmCoordinatedMemoryPool {
    accountant: Arc<GlobalRef>,           // Java NativeMemoryAccountant
    acquire_method: JMethodID,
    release_method: JMethodID,
    used: AtomicUsize,
}

impl JvmCoordinatedMemoryPool {
    pub fn acquire(&self, bytes: usize) -> Result<(), MemoryError> {
        let env = get_jni_env();
        // call_method returns a JValue; extract the jlong before comparing
        let acquired = env.call_method(
            self.accountant.as_obj(),
            self.acquire_method,
            ReturnType::Primitive(Primitive::Long),
            &[JValue::Long(bytes as i64)],
        )?
        .j()?;

        if acquired < bytes as i64 {
            // Partial grant: hand it back and surface backpressure to the caller
            self.release_to_jvm(acquired as usize);
            return Err(MemoryError::InsufficientMemory {
                requested: bytes,
                available: acquired as usize,
            });
        }
        self.used.fetch_add(bytes, Relaxed);
        Ok(())
    }

    pub fn release(&self, bytes: usize) {
        let env = get_jni_env();
        // Never panic on the release path; log and keep local accounting consistent
        if let Err(e) = env.call_method(
            self.accountant.as_obj(),
            self.release_method,
            ReturnType::Primitive(Primitive::Void),
            &[JValue::Long(bytes as i64)],
        ) {
            log::warn!("NativeMemoryAccountant.releaseMemory failed: {e}");
        }
        self.used.fetch_sub(bytes, Relaxed);
    }
}

Phase 3: Integration Points

Thread the memory pool into operations that perform significant allocations: search result buffering, segment merge buffers, and Arrow batch construction. Each call site acquires before allocating its buffer and releases when the buffer is dropped.
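One way to keep acquire/release paired at every call site is a scoped reservation, so accounting is released even when the guarded operation throws. A hypothetical JVM-side sketch using try-with-resources (on the Rust side the natural analogue is a Drop guard); `SimpleAccountant` is a stand-in for the real accountant:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical accountant; production code would call into Spark's
// TaskMemoryManager instead of a local counter.
class SimpleAccountant {
    final AtomicLong used = new AtomicLong();
    long acquireMemory(long bytes) { used.addAndGet(bytes); return bytes; }
    void releaseMemory(long bytes) { used.addAndGet(-bytes); }
}

// Scoped reservation: memory is released when the try block exits,
// even if the guarded operation throws.
class MemoryReservation implements AutoCloseable {
    private final SimpleAccountant accountant;
    private final long bytes;

    MemoryReservation(SimpleAccountant accountant, long bytes) {
        this.accountant = accountant;
        this.bytes = bytes;
        accountant.acquireMemory(bytes);
    }

    @Override
    public void close() { accountant.releaseMemory(bytes); }
}
```

Usage: `try (var r = new MemoryReservation(acct, 1 << 20)) { /* buffer search results */ }` leaves `acct` back at its prior usage on scope exit.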

Phase 4: Optional Enhancements

  • jemalloc integration — use jemalloc as global allocator with stats reporting
  • Memory pool modes — greedy vs fair allocation (like Comet's pool types)
  • Logging wrapper — debug-mode memory logging (like Comet's LoggingMemoryPool)
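The logging wrapper is a straightforward decorator over the accountant. A hypothetical sketch in the spirit of Comet's LoggingMemoryPool (all class names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for NativeMemoryAccountant, kept local for a self-contained example.
interface Accountant {
    long acquireMemory(long bytes);
    void releaseMemory(long bytes);
}

// Trivial inner accountant that grants everything (untracked mode).
class UnlimitedAccountant implements Accountant {
    public long acquireMemory(long bytes) { return bytes; }
    public void releaseMemory(long bytes) {}
}

// Decorator that records every memory event before delegating, for
// debug-mode diagnosis of leaks and partial grants.
class LoggingAccountant implements Accountant {
    private final Accountant inner;
    final List<String> log = new ArrayList<>();

    LoggingAccountant(Accountant inner) { this.inner = inner; }

    @Override
    public long acquireMemory(long bytes) {
        long granted = inner.acquireMemory(bytes);
        log.add("acquire " + bytes + " -> " + granted);
        return granted;
    }

    @Override
    public void releaseMemory(long bytes) {
        inner.releaseMemory(bytes);
        log.add("release " + bytes);
    }
}
```

Because the decorator shares the accountant interface, it can wrap either the real Spark-backed accountant or the untracked default without changing call sites.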

API Design Considerations

  1. Backward compatibility: The memory pool should be optional. Existing API (SplitSearcher, etc.) should continue working without a memory accountant (defaulting to unlimited/untracked).
  2. Granularity: Don't track every small allocation — track significant buffers (Arrow batches, merge buffers, search result collections). Use a threshold (e.g., >1KB).
  3. Thread safety: The pool must be Send + Sync since tantivy operations may run on multiple threads.
  4. JNI overhead: Consider batching small allocations and reporting periodically rather than per-allocation JNI calls. Comet does per-allocation calls but their allocations are large (DataFusion operator buffers).
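Considerations 2 and 4 combine naturally: reserve memory from the JVM in large slabs and serve small allocations from local headroom, so most allocations never cross JNI. A hypothetical sketch (the slab size and all class names are illustrative; `CountingAccountant` models the JNI-backed accountant so the crossing count is observable):

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for the JNI-backed accountant; each call models one JVM crossing.
class CountingAccountant {
    final AtomicLong jniCalls = new AtomicLong();
    final AtomicLong used = new AtomicLong();
    long acquireMemory(long bytes) { jniCalls.incrementAndGet(); used.addAndGet(bytes); return bytes; }
    void releaseMemory(long bytes) { jniCalls.incrementAndGet(); used.addAndGet(-bytes); }
}

// Hypothetical batching wrapper: reserves CHUNK-sized slabs from the JVM
// and serves small allocations from the reserved headroom.
class BatchingReservation {
    static final long CHUNK = 1 << 20; // 1 MiB slabs (tunable)
    private final CountingAccountant jvm;
    private long reserved;   // granted by the JVM, not yet handed out
    private long handedOut;

    BatchingReservation(CountingAccountant jvm) { this.jvm = jvm; }

    synchronized boolean allocate(long bytes) {
        if (handedOut + bytes > reserved) {
            long need = Math.max(CHUNK, handedOut + bytes - reserved);
            long granted = jvm.acquireMemory(need); // one JNI call per slab
            if (granted < need) { jvm.releaseMemory(granted); return false; }
            reserved += granted;
        }
        handedOut += bytes;
        return true;
    }

    synchronized void free(long bytes) { handedOut -= bytes; }

    synchronized void close() { jvm.releaseMemory(reserved); reserved = 0; }
}
```

The trade-off versus Comet's per-allocation calls: slabs over-reserve by up to one CHUNK per pool, in exchange for amortizing the JNI cost across many small allocations.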

References

  • DataFusion Comet source: CometTaskMemoryManager.java, unified_pool.rs, fair_pool.rs
  • Spark TaskMemoryManager API: acquireExecutionMemory(size, consumer), releaseExecutionMemory(size, consumer)
  • indextables/indextables_spark integration issue (to be linked)

Labels

enhancement (New feature or request)