Add large result set retrieval (fused + streaming) #128
Conversation
…e APIs

Implements three-tier companion-mode bulk retrieval for returning thousands to millions of rows from parquet-backed splits via Arrow FFI with bounded memory. Includes DocIdCollector (no-score search), a fused single-call path, a streaming session pipeline with adaptive I/O strategy, ARC_REGISTRY-based session handles for memory safety, and a StreamingSession AutoCloseable Java wrapper with double-close protection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
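The ARC_REGISTRY handle scheme and double-close protection mentioned above can be sketched minimally. The PR's actual registry is not shown here; `Registry`, its `String` stand-in for the session type, and every name below are illustrative only. The idea: the Java side holds an opaque `u64` handle, never a raw pointer, and closing is idempotent because the second `remove` simply finds nothing.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical sketch of an ARC_REGISTRY-style handle table.
struct Registry {
    next_handle: u64,
    sessions: HashMap<u64, Arc<String>>, // String stands in for a real session type
}

impl Registry {
    fn new() -> Self {
        Registry { next_handle: 1, sessions: HashMap::new() }
    }

    /// Register a session and hand back an opaque handle for the Java side.
    fn register(&mut self, session: Arc<String>) -> u64 {
        let h = self.next_handle;
        self.next_handle += 1;
        self.sessions.insert(h, session);
        h
    }

    /// Close is idempotent: the second call finds nothing and is a no-op,
    /// which is what makes a double-close from the Java wrapper safe.
    fn close(&mut self, handle: u64) -> bool {
        self.sessions.remove(&handle).is_some()
    }
}

fn main() {
    let registry = Mutex::new(Registry::new());
    let h = registry.lock().unwrap().register(Arc::new("session".into()));
    assert!(registry.lock().unwrap().close(h));  // first close drops the Arc
    assert!(!registry.lock().unwrap().close(h)); // second close: harmless no-op
    println!("double-close safe");
}
```

Because the handle indirects through the map, a stale or repeated handle can never dereference freed memory, only miss a lookup.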
Claude finished @schenksj's task

Code Review Complete

Summary: Found 4 Issues (1 Critical, 2 Moderate, 1 Minor)

✅ Good Architecture Design
🔴 CRITICAL: Session Handle Memory Leak
File: ✅ RESOLVED

🟡 MODERATE: Integer Overflow Risk
File:

```rust
let total: usize = segment_fruits.iter().map(|v| v.len()).sum();
let mut result = Vec::with_capacity(total);
```

Issue: On very large result sets, the unchecked sum can overflow `usize`.
Risk: Panic on allocation or incorrect capacity calculation.
Recommendation:

```rust
let total = segment_fruits.iter()
    .map(|v| v.len())
    .try_fold(0usize, |acc, len| acc.checked_add(len))
    .ok_or_else(|| anyhow!("Result set too large: document count overflow"))?;
```

🟡 MODERATE: Silent Error Swallowing in Stream
File:

```rust
if tx.send(Ok(normalized)).await.is_err() {
    perf_println!("⏱️ STREAMING: consumer dropped — stopping producer");
    return Ok(()); // ← Silent success despite potential error
}
```

Issue: Producer treats channel closure as success, potentially masking critical errors.
Risk: Silent data loss if consumer fails early.
Recommendation: Distinguish early failure from normal completion:

```rust
if tx.send(Ok(normalized)).await.is_err() {
    if rows_emitted == 0 {
        return Err(anyhow!("Consumer dropped before any data sent - possible error"));
    }
    perf_println!("⏱️ STREAMING: consumer dropped after {} rows", rows_emitted);
    return Ok(());
}
```

🟢 MINOR: Missing Bounds Check
File:

```rust
let batches = read_parquet_batches_for_file(/* ... */);
```

Issue: No validation that `file_idx` is within bounds of `manifest.parquet_files`.
Risk: Potential panic on malformed input.
Recommendation: Add bounds check:

```rust
if file_idx >= manifest.parquet_files.len() {
    return Err(anyhow!("Invalid file_idx {} >= {}", file_idx, manifest.parquet_files.len()));
}
let file_entry = &manifest.parquet_files[file_idx];
```

✅ No Issues Found In:

Overall Assessment: Solid implementation with good architecture. The identified issues are edge cases that should be addressed for production robustness.
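The overflow fix recommended in the review is self-contained enough to run on its own. This sketch swaps the review's `anyhow!` error for a plain `String` so it compiles without dependencies, and uses `Vec<u32>` as a stand-in for the per-segment fruit vectors.

```rust
/// Sum per-segment result counts without risking usize overflow: try_fold
/// threads checked_add through the iterator and bails out on the first
/// overflow instead of wrapping or panicking.
fn total_len(segment_fruits: &[Vec<u32>]) -> Result<usize, String> {
    segment_fruits
        .iter()
        .map(|v| v.len())
        .try_fold(0usize, |acc, len| acc.checked_add(len))
        .ok_or_else(|| "Result set too large: document count overflow".to_string())
}

fn main() {
    let fruits = vec![vec![1, 2, 3], vec![4, 5]];
    assert_eq!(total_len(&fruits), Ok(5));
    println!("total = {}", total_len(&fruits).unwrap());
}
```

`checked_add` returns `None` on overflow, and `try_fold` short-circuits on that `None`, so the cost on the happy path is a single branch per segment.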
…check

1. merge_fruits: use checked_add to prevent usize overflow on very large result sets instead of unchecked sum
2. Streaming producer: distinguish early consumer drop (rows_emitted == 0, returns error) from normal mid-stream drop (returns Ok)
3. Streaming producer: validate file_idx bounds before indexing into manifest.parquet_files to prevent panic on malformed input

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
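Fix 2's early-drop rule can be illustrated with a synchronous `std::sync::mpsc` channel standing in for the PR's async channel; the `produce` function and its signature are hypothetical, but the decision it encodes is the one described above: a send failure before any rows were emitted is surfaced as an error, while a drop mid-stream is normal completion.

```rust
use std::sync::mpsc;

/// Sketch of the producer-side rule: only a drop before the first row
/// is treated as a consumer-side failure.
fn produce(tx: mpsc::Sender<u64>, rows: &[u64]) -> Result<usize, String> {
    let mut rows_emitted = 0usize;
    for &row in rows {
        if tx.send(row).is_err() {
            if rows_emitted == 0 {
                return Err("consumer dropped before any data sent".into());
            }
            return Ok(rows_emitted); // mid-stream drop: normal stop
        }
        rows_emitted += 1;
    }
    Ok(rows_emitted)
}

fn main() {
    // Consumer gone before the first send: surfaced as an error.
    let (tx, rx) = mpsc::channel();
    drop(rx);
    assert!(produce(tx, &[1, 2, 3]).is_err());

    // Consumer alive: all rows delivered.
    let (tx, rx) = mpsc::channel();
    assert_eq!(produce(tx, &[1, 2, 3]), Ok(3));
    assert_eq!(rx.iter().count(), 3);
    println!("ok");
}
```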
Claude finished @schenksj's task

Code Review Complete

Summary: Found 1 Minor Issue

✅ Good Architecture Design
🟢 MINOR: Missing Bounds Check in Arrow-to-Tantivy Module
File:

```rust
let batches = read_parquet_batches_for_file(
    file_idx,
    &rows,
    projected_fields.as_deref().map(|s| s as &[String]),
    manifest,
    storage,
    metadata_cache,
    byte_cache,
    effective_coalesce,
).await?;
```

Issue: While bounds checking exists at line 171-176, the call site itself does not validate `file_idx` before passing it on.
Risk: Potential panic if the downstream function doesn't validate the file index.
Recommendation: Verify that `read_parquet_batches_for_file` validates `file_idx` internally.

✅ No Issues Found In:

Overall Assessment: Excellent implementation with solid architecture and comprehensive safety measures. The single minor issue is a best practice recommendation rather than a functional bug.
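As an alternative shape for the bounds check discussed in these reviews, `slice::get` folds the length comparison and the lookup into one call, so the check cannot drift out of sync with the index. This standalone sketch uses a plain `Vec<String>` in place of `manifest.parquet_files`.

```rust
/// Look up a parquet file entry by index, returning an error instead of
/// panicking on an out-of-range file_idx from malformed input.
fn file_entry(parquet_files: &[String], file_idx: usize) -> Result<&String, String> {
    parquet_files
        .get(file_idx)
        .ok_or_else(|| format!("Invalid file_idx {} >= {}", file_idx, parquet_files.len()))
}

fn main() {
    let files = vec!["part-0.parquet".to_string(), "part-1.parquet".to_string()];
    assert_eq!(file_entry(&files, 1).unwrap(), "part-1.parquet");
    assert!(file_entry(&files, 2).is_err());
    println!("bounds ok");
}
```

`get` returns `Option<&T>`, so the explicit `if file_idx >= len` guard and the subsequent indexing collapse into a single non-panicking operation.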

Summary
- `searchAndRetrieveArrowFfi()` for result sets < 50K rows — no BM25 scoring, no PartialHit protobufs, no JNI round-trip of doc addresses
- `StreamingSession` (AutoCloseable) with producer/consumer pipeline for result sets > 50K rows — bounded ~24MB memory regardless of total rows
- `docs/LARGE_RESULT_SET_DEVELOPER_GUIDE.md` documenting architecture, APIs, memory budget, and error handling

New files
- `native/src/split_searcher/docid_collector.rs` — No-score bulk document ID collector
- `native/src/split_searcher/bulk_retrieval.rs` — Shared search + fast-field resolution
- `native/src/split_searcher/fused_retrieval.rs` — Single-call search+retrieve+FFI
- `native/src/parquet_companion/streaming_ffi.rs` — Session-based streaming pipeline
- `native/src/parquet_companion/read_strategy.rs` — Adaptive I/O strategy selection
- `src/test/java/.../LargeResultSetRetrievalTest.java` — ~20 integration tests

Key API changes
- `SplitSearcher.startStreamingRetrieval()` now returns `StreamingSession` (AutoCloseable) instead of a raw `long`
- `StreamingSession.nextBatch()` / `.getColumnCount()` / `.close()` — synchronized, double-close safe
- `@Deprecated`

Test plan
- `LargeResultSetRetrievalTest` covers: error handling, term/boolean/range/exists/wildcard queries, empty results, single row, concurrency, field projection, cross-path consistency, scale
- `cargo test --lib docid_collector` passes
- `cargo test --lib streaming_ffi` passes
- `cargo test --lib read_strategy` passes

🤖 Generated with Claude Code
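The adaptive I/O strategy selection attributed to `read_strategy.rs` above could look roughly like this. The enum names and the 50% density threshold are invented for illustration and are not taken from the PR; the sketch only shows the general shape of picking between scattered range reads and one coalesced read based on how dense the row selection is.

```rust
/// Hypothetical read-strategy choice: dense selections favor one large
/// coalesced read, sparse ones favor individual byte-range fetches.
#[derive(Debug, PartialEq)]
enum ReadStrategy {
    /// Few, scattered rows: fetch individual row-group byte ranges.
    SparseRanges,
    /// Dense selection: one coalesced read of the region is cheaper.
    Coalesced,
}

fn choose(selected_rows: usize, total_rows: usize) -> ReadStrategy {
    // Illustrative threshold: coalesce when at least half the rows are wanted.
    if total_rows == 0 || selected_rows * 2 >= total_rows {
        ReadStrategy::Coalesced
    } else {
        ReadStrategy::SparseRanges
    }
}

fn main() {
    assert_eq!(choose(900, 1_000), ReadStrategy::Coalesced);
    assert_eq!(choose(10, 1_000), ReadStrategy::SparseRanges);
    println!("{:?}", choose(10, 1_000));
}
```

A real implementation would likely weigh request latency against wasted bytes rather than a fixed ratio, but the decision surface is the same: density of the selection drives the read plan.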