
Add large result set retrieval (fused + streaming)#128

Open
schenksj wants to merge 2 commits into main from feature/large-result-set-retrieval

Conversation

@schenksj
Collaborator

Summary

  • Fused path: Single-call searchAndRetrieveArrowFfi() for result sets < 50K rows — no BM25 scoring, no PartialHit protobufs, no JNI round-trip of doc addresses
  • Streaming path: Session-based StreamingSession (AutoCloseable) with producer/consumer pipeline for result sets ≥ 50K rows — bounded ~24MB memory regardless of total rows
  • Memory safety: ARC_REGISTRY-based session handles (no raw pointer use-after-free), Arrow FFI buffer leak prevention, RwLock panic elimination, bounds-checked array access
  • Selective fast field warmup: Only warms fast fields for range/exists queries, not all query fields
  • Adaptive I/O: Per-file read strategy (PageLevel → FullRowGroup) based on selectivity to minimize S3 GET requests
  • Developer guide: Complete docs/LARGE_RESULT_SET_DEVELOPER_GUIDE.md documenting architecture, APIs, memory budget, and error handling
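The adaptive I/O bullet above can be sketched as a selectivity threshold: sparse matches favor many small page-level reads, dense matches favor one larger row-group read. The names (`ReadStrategy`, `choose_strategy`) and the 10% cutoff below are illustrative assumptions, not the actual contents of `read_strategy.rs`:

```rust
// Hypothetical sketch of per-file read-strategy selection.
// The enum name, function name, and 10% threshold are assumptions.

#[derive(Debug, PartialEq)]
enum ReadStrategy {
    /// Fetch only the pages containing matching rows (many small GETs).
    PageLevel,
    /// Fetch whole row groups (fewer, larger GETs).
    FullRowGroup,
}

/// Pick a strategy from match density: sparse hits favor page-level
/// reads; dense hits favor a single full-row-group read.
fn choose_strategy(matching_rows: usize, total_rows: usize) -> ReadStrategy {
    if total_rows == 0 {
        return ReadStrategy::PageLevel;
    }
    let selectivity = matching_rows as f64 / total_rows as f64;
    if selectivity < 0.10 {
        ReadStrategy::PageLevel
    } else {
        ReadStrategy::FullRowGroup
    }
}

fn main() {
    assert_eq!(choose_strategy(5, 1_000), ReadStrategy::PageLevel);
    assert_eq!(choose_strategy(900, 1_000), ReadStrategy::FullRowGroup);
    println!("strategy selection ok");
}
```

The point of keeping this per-file (rather than per-query) is that selectivity can differ wildly between files, so each file gets the cheapest S3 access pattern for its own hit density.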

New files

  • native/src/split_searcher/docid_collector.rs — No-score bulk document ID collector
  • native/src/split_searcher/bulk_retrieval.rs — Shared search + fast-field resolution
  • native/src/split_searcher/fused_retrieval.rs — Single-call search+retrieve+FFI
  • native/src/parquet_companion/streaming_ffi.rs — Session-based streaming pipeline
  • native/src/parquet_companion/read_strategy.rs — Adaptive I/O strategy selection
  • src/test/java/.../LargeResultSetRetrievalTest.java — ~20 integration tests

Key API changes

  • SplitSearcher.startStreamingRetrieval() now returns StreamingSession (AutoCloseable) instead of raw long
  • StreamingSession.nextBatch() / .getColumnCount() / .close() — synchronized, double-close safe
  • Old raw-handle methods marked @Deprecated

Test plan

  • All 44 existing tests pass (unit + integration)
  • LargeResultSetRetrievalTest covers: error handling, term/boolean/range/exists/wildcard queries, empty results, single row, concurrency, field projection, cross-path consistency, scale
  • Double-close safety verified (no crash, no exception)
  • cargo test --lib docid_collector passes
  • cargo test --lib streaming_ffi passes
  • cargo test --lib read_strategy passes

🤖 Generated with Claude Code

…e APIs

Implements three-tier companion-mode bulk retrieval for returning thousands
to millions of rows from parquet-backed splits via Arrow FFI with bounded
memory. Includes DocIdCollector (no-score search), fused single-call path,
streaming session pipeline with adaptive I/O strategy, ARC_REGISTRY-based
session handles for memory safety, and StreamingSession AutoCloseable Java
wrapper with double-close protection.
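The ARC_REGISTRY-based session handles mentioned above boil down to handing the Java side an opaque `u64` instead of a raw pointer. A minimal sketch of that pattern, with illustrative names rather than the crate's actual API:

```rust
// Sketch of an ARC_REGISTRY-style session table: opaque u64 handles map
// to Arc'd sessions, so JNI callers never hold raw pointers and stale
// handles fail safely instead of causing use-after-free. All names here
// are illustrative assumptions.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};

struct Session {
    rows_remaining: usize,
}

static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);

fn registry() -> &'static Mutex<HashMap<u64, Arc<Session>>> {
    static REG: OnceLock<Mutex<HashMap<u64, Arc<Session>>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Register a session and return an opaque handle for the Java side.
fn register(session: Session) -> u64 {
    let handle = NEXT_HANDLE.fetch_add(1, Ordering::Relaxed);
    registry().lock().unwrap().insert(handle, Arc::new(session));
    handle
}

/// Look up a live session; a stale handle yields None, never UB.
fn lookup(handle: u64) -> Option<Arc<Session>> {
    registry().lock().unwrap().get(&handle).cloned()
}

/// Close is idempotent: the second call is a harmless no-op.
fn close(handle: u64) -> bool {
    registry().lock().unwrap().remove(&handle).is_some()
}

fn main() {
    let h = register(Session { rows_remaining: 10 });
    assert!(lookup(h).is_some());
    assert!(close(h));
    assert!(!close(h)); // double-close safe
    assert!(lookup(h).is_none()); // stale handle: no use-after-free
}
```

Because the registry owns the `Arc`, an in-flight `nextBatch()` that cloned the `Arc` keeps the session alive even if `close()` races with it.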

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@claude

claude bot commented Mar 13, 2026

Claude finished @schenksj's task


Code Review Complete

Reviewing PR #128 for:

  1. Correctness bugs (logic errors, off-by-one, null safety)
  2. Security issues (injection, unsafe operations, credential handling)
  3. Merge conflict markers or build-breaking issues
  4. Missing error propagation (especially JNI error swallowing)
  5. API consistency with existing patterns

Todo List

  • Review new Rust modules for memory safety and correctness
  • Check JNI error handling and propagation
  • Validate Java API changes for consistency
  • Look for potential security vulnerabilities
  • Check session management and resource cleanup
  • Review streaming pipeline implementation
  • Validate Arrow FFI buffer management

Summary: Found 3 Open Issues (2 Moderate, 1 Minor); 1 Critical was initially flagged, then retracted

Good Architecture Design

  • Proper use of ARC_REGISTRY for session management (avoids use-after-free)
  • Sound streaming pipeline with bounded memory (~24MB max)
  • Appropriate separation of fused vs streaming paths
  • Good error handling in most areas

🔴 CRITICAL (retracted): Session Handle Memory Leak

File: SplitSearcher.java - StreamingSession class

Resolution: false alarm. On closer inspection, StreamingSession correctly implements AutoCloseable and releases the native session via nativeCloseStreamingSession() in its close() method, so no leak exists.


🟡 MODERATE: Integer Overflow Risk

File: bulk_retrieval.rs:50-51

let total: usize = segment_fruits.iter().map(|v| v.len()).sum();
let mut result = Vec::with_capacity(total);

Issue: total is computed with an unchecked sum; on 32-bit targets, or if segment counts are corrupted, the addition can overflow usize.

Risk: Panic on allocation or incorrect capacity calculation.

Recommendation:

let total = segment_fruits.iter()
    .map(|v| v.len())
    .try_fold(0usize, |acc, len| acc.checked_add(len))
    .ok_or_else(|| anyhow!("Result set too large: document count overflow"))?;

🟡 MODERATE: Silent Error Swallowing in Stream

File: streaming_ffi.rs:215-217

if tx.send(Ok(normalized)).await.is_err() {
    perf_println!("⏱️ STREAMING: consumer dropped — stopping producer");
    return Ok(()); // ← Silent success despite potential error
}

Issue: Producer treats channel closure as success, potentially masking critical errors.

Risk: Silent data loss if consumer fails early.

Recommendation: Distinguish early failure from normal completion:

if tx.send(Ok(normalized)).await.is_err() {
    if rows_emitted == 0 {
        return Err(anyhow!("Consumer dropped before any data sent - possible error"));
    }
    perf_println!("⏱️ STREAMING: consumer dropped after {} rows", rows_emitted);
    return Ok(());
}

🟢 MINOR: Missing Bounds Check

File: streaming_ffi.rs:195

let batches = read_parquet_batches_for_file(/* ... */);

Issue: No validation that file_idx is within manifest.parquet_files bounds.

Risk: Potential panic on malformed input.

Recommendation: Add bounds check:

if file_idx >= manifest.parquet_files.len() {
    return Err(anyhow!("Invalid file_idx {} >= {}", file_idx, manifest.parquet_files.len()));
}
let file_entry = &manifest.parquet_files[file_idx];

No Issues Found In:

  • Memory safety (proper Arc management)
  • JNI error propagation (well handled)
  • API consistency (matches existing patterns)
  • Security vulnerabilities (no unsafe operations)
  • Merge conflicts (clean)

Overall Assessment: Solid implementation with good architecture. The identified issues are edge cases that should be addressed for production robustness.

…check

1. merge_fruits: use checked_add to prevent usize overflow on very large
   result sets instead of unchecked sum
2. Streaming producer: distinguish early consumer drop (rows_emitted==0,
   returns error) from normal mid-stream drop (returns Ok)
3. Streaming producer: validate file_idx bounds before indexing into
   manifest.parquet_files to prevent panic on malformed input
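Fix #2 above can be demonstrated in a self-contained form. The sketch below uses `std::sync::mpsc` as a stand-in for the async channel in `streaming_ffi.rs`, and the names (`produce`, `rows_emitted`) are illustrative, not the actual code:

```rust
// Sketch of fix #2: a producer over a bounded channel treats a consumer
// drop before any rows were sent as an error, but a mid-stream drop as
// normal early termination. std::sync::mpsc stands in for the tokio
// channel used in streaming_ffi.rs; all names are illustrative.
use std::sync::mpsc::{sync_channel, SyncSender};

fn produce(tx: SyncSender<Vec<u64>>, batches: Vec<Vec<u64>>) -> Result<usize, String> {
    let mut rows_emitted = 0usize;
    for batch in batches {
        let batch_len = batch.len();
        if tx.send(batch).is_err() {
            if rows_emitted == 0 {
                // Consumer vanished before receiving anything: surface it.
                return Err("consumer dropped before any data was sent".into());
            }
            // Consumer stopped mid-stream (e.g. early close): normal exit.
            return Ok(rows_emitted);
        }
        rows_emitted += batch_len;
    }
    Ok(rows_emitted)
}

fn main() {
    // Consumer dropped immediately: the producer reports an error.
    let (tx, rx) = sync_channel(1);
    drop(rx);
    assert!(produce(tx, vec![vec![1, 2, 3]]).is_err());

    // Consumer stays alive: the producer reports the total row count.
    let (tx, rx) = sync_channel(8);
    let n = produce(tx, vec![vec![1, 2], vec![3]]).unwrap();
    assert_eq!(n, 3);
    drop(rx);
}
```

The `rows_emitted == 0` test is what distinguishes "the consumer failed before consuming anything" from the benign case where the Java side simply closed the session after reading enough rows.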

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@claude
Copy link

claude bot commented Mar 13, 2026

Claude finished @schenksj's task




Code Review Complete

Summary: Found 1 Minor Issue

Good Architecture Design

  • Proper use of ARC_REGISTRY for session management (avoids use-after-free)
  • Sound streaming pipeline with bounded memory (~24MB max)
  • Appropriate separation of fused vs streaming paths
  • Good error handling in most areas
  • Integer overflow protection already implemented in docid_collector.rs with checked_add()
  • Proper bounds checking for file indices in streaming_ffi.rs:171-176

🟢 MINOR: Missing Bounds Check in Arrow-to-Tantivy Module

File: streaming_ffi.rs:197

let batches = read_parquet_batches_for_file(
    file_idx,
    &rows,
    projected_fields.as_deref().map(|s| s as &[String]),
    manifest,
    storage,
    metadata_cache,
    byte_cache,
    effective_coalesce,
).await?;

Issue: Bounds checking exists at lines 171-176 in the caller, but read_parquet_batches_for_file receives file_idx and may index into per-file arrays without re-validating it.

Risk: Potential panic if the downstream function doesn't validate the file index.

Recommendation: Verify that read_parquet_batches_for_file includes its own bounds validation, or add a comment confirming that validation occurs at line 171-176.


No Issues Found In:

  • Memory safety (proper Arc management with ARC_REGISTRY)
  • JNI error propagation (comprehensive error handling with to_java_exception)
  • API consistency (matches existing patterns with AutoCloseable, synchronized methods)
  • Security vulnerabilities (no unsafe operations, proper credential handling)
  • Merge conflicts (clean)
  • Session lifecycle management (proper StreamingSession.close() implementation)
  • Integer overflow (already protected with checked_add())
  • Resource cleanup (proper native session cleanup via nativeCloseStreamingSession)

Overall Assessment: Excellent implementation with solid architecture and comprehensive safety measures. The single minor issue is a best practice recommendation rather than a functional bug.
