Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion native/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tantivy4java"
version = "0.31.2"
version = "0.32.4"
edition = "2021"
license = "MIT"
authors = ["Tantivy4Java Contributors"]
Expand Down
54 changes: 48 additions & 6 deletions native/src/split_searcher/aggregation_arrow_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// via the Arrow C Data Interface (FFI). It eliminates per-result JNI overhead by
// converting tantivy's AggregationResults directly to Arrow RecordBatch format.

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{Context, Result};
Expand Down Expand Up @@ -34,11 +35,27 @@ pub fn aggregation_result_to_record_batch(
_name: &str,
result: &AggregationResult,
is_date_histogram: bool,
) -> Result<RecordBatch> {
aggregation_result_to_record_batch_with_hash_resolution(
_name,
result,
is_date_histogram,
None,
)
}

/// Convert a single named AggregationResult to an Arrow RecordBatch,
/// resolving U64 hash bucket keys back to original strings using the provided map.
///
/// Metric results go straight to a metric batch; bucket results are handed the
/// optional `hash_resolution_map` so terms buckets keyed by hashes can be
/// rendered with their original string values.
pub fn aggregation_result_to_record_batch_with_hash_resolution(
    _name: &str,
    result: &AggregationResult,
    is_date_histogram: bool,
    hash_resolution_map: Option<&HashMap<u64, String>>,
) -> Result<RecordBatch> {
    match result {
        AggregationResult::BucketResult(bucket) => {
            bucket_to_record_batch(bucket, is_date_histogram, hash_resolution_map)
        }
        AggregationResult::MetricResult(metric) => metric_to_record_batch(metric),
    }
}
Expand Down Expand Up @@ -184,9 +201,10 @@ fn metric_to_record_batch(metric: &MetricResult) -> Result<RecordBatch> {
fn bucket_to_record_batch(
bucket: &BucketResult,
is_date_histogram: bool,
hash_resolution_map: Option<&HashMap<u64, String>>,
) -> Result<RecordBatch> {
match bucket {
BucketResult::Terms { buckets, .. } => terms_to_record_batch(buckets),
BucketResult::Terms { buckets, .. } => terms_to_record_batch(buckets, hash_resolution_map),
BucketResult::Histogram { buckets } => {
if is_date_histogram {
date_histogram_to_record_batch(buckets)
Expand All @@ -198,10 +216,13 @@ fn bucket_to_record_batch(
}
}

fn terms_to_record_batch(buckets: &[BucketEntry]) -> Result<RecordBatch> {
fn terms_to_record_batch(
buckets: &[BucketEntry],
hash_resolution_map: Option<&HashMap<u64, String>>,
) -> Result<RecordBatch> {
// Check if there is a nested bucket sub-aggregation (FR-2: flattening)
if let Some(nested) = find_nested_bucket_sub_agg(buckets.first().map(|b| &b.sub_aggregation)) {
return flatten_nested_bucket_terms(buckets, &nested.0, &nested.1);
return flatten_nested_bucket_terms(buckets, &nested.0, &nested.1, hash_resolution_map);
}

let sub_agg_names = collect_metric_sub_agg_names(
Expand All @@ -217,7 +238,7 @@ fn terms_to_record_batch(buckets: &[BucketEntry]) -> Result<RecordBatch> {
}
let schema = Arc::new(Schema::new(fields));

let keys: Vec<String> = buckets.iter().map(|b| key_to_string(&b.key)).collect();
let keys: Vec<String> = buckets.iter().map(|b| key_to_string_resolved(&b.key, hash_resolution_map)).collect();
let doc_counts: Vec<i64> = buckets.iter().map(|b| b.doc_count as i64).collect();

let mut columns: Vec<Arc<dyn arrow_array::Array>> = vec![
Expand Down Expand Up @@ -442,10 +463,11 @@ fn collect_nested_rows(
key_columns: &mut Vec<Vec<String>>,
doc_count_values: &mut Vec<i64>,
metric_values: &mut Vec<Vec<Option<f64>>>,
hash_resolution_map: Option<&HashMap<u64, String>>,
) {
for bucket in buckets {
let mut current_keys = prefix_keys.to_vec();
current_keys.push(key_to_string(&bucket.key));
current_keys.push(key_to_string_resolved(&bucket.key, hash_resolution_map));

if current_keys.len() == depth {
// Leaf level: emit a row
Expand Down Expand Up @@ -477,6 +499,7 @@ fn collect_nested_rows(
key_columns,
doc_count_values,
metric_values,
hash_resolution_map,
);
}
}
Expand Down Expand Up @@ -504,6 +527,7 @@ fn flatten_nested_bucket_terms(
outer_buckets: &[BucketEntry],
_nested_name: &str,
_nested_template: &BucketResult,
hash_resolution_map: Option<&HashMap<u64, String>>,
) -> Result<RecordBatch> {
// Determine total nesting depth (number of key columns)
let depth = count_nesting_depth(outer_buckets);
Expand Down Expand Up @@ -533,6 +557,7 @@ fn flatten_nested_bucket_terms(
&mut key_columns,
&mut doc_count_values,
&mut metric_values,
hash_resolution_map,
);

// Build Arrow columns
Expand Down Expand Up @@ -634,6 +659,7 @@ fn flatten_nested_bucket_histogram(
&mut inner_keys,
&mut inner_docs,
&mut inner_metrics,
None, // histogram inner keys are not hash fields
);

let num_inner_rows = inner_docs.len();
Expand Down Expand Up @@ -684,6 +710,22 @@ fn key_to_string(key: &Key) -> String {
}
}

/// Like `key_to_string`, but resolves U64 hash keys back to original string values
/// using the provided hash resolution map (from Phase 3 hash touchup).
///
/// A U64 key that is absent from the map (or when no map was supplied)
/// falls back to its decimal representation; all other key kinds are
/// delegated to `key_to_string` unchanged.
fn key_to_string_resolved(key: &Key, hash_resolution_map: Option<&HashMap<u64, String>>) -> String {
    match key {
        Key::U64(v) => hash_resolution_map
            // Look the hash up in the map when one was provided.
            .and_then(|map| map.get(v))
            .cloned()
            // Unmapped hashes degrade gracefully to their numeric form.
            .unwrap_or_else(|| v.to_string()),
        other => key_to_string(other),
    }
}

fn key_to_f64(key: &Key) -> f64 {
match key {
Key::F64(v) => *v,
Expand Down
Loading