diff --git a/native/Cargo.lock b/native/Cargo.lock
index 3c3a61d..0b8f89c 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -4337,6 +4337,7 @@ checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e"
 [[package]]
 name = "ownedbytes"
 version = "0.7.0"
+source = "git+https://github.com/indextables/tantivy?rev=4b6d7d49#4b6d7d49cf1f55592dc29d4ca3867395df59ec7d"
 dependencies = [
  "stable_deref_trait",
 ]
@@ -4968,6 +4969,7 @@ dependencies = [
 [[package]]
 name = "quickwit-actors"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -4986,6 +4988,7 @@ dependencies = [
 [[package]]
 name = "quickwit-aws"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "aws-config",
  "aws-runtime",
@@ -5005,6 +5008,7 @@ dependencies = [
 [[package]]
 name = "quickwit-cluster"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -5031,6 +5035,7 @@ dependencies = [
 [[package]]
 name = "quickwit-codegen"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "heck 0.5.0",
@@ -5045,6 +5050,7 @@ dependencies = [
 [[package]]
 name = "quickwit-common"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "async-speed-limit",
@@ -5085,6 +5091,7 @@ dependencies = [
 [[package]]
 name = "quickwit-config"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "bytes",
@@ -5116,6 +5123,7 @@ dependencies = [
 [[package]]
 name = "quickwit-datetime"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "itertools 0.14.0",
@@ -5129,6 +5137,7 @@ dependencies = [
 [[package]]
 name = "quickwit-directories"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -5145,6 +5154,7 @@ dependencies = [
 [[package]]
 name = "quickwit-doc-mapper"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "base64 0.22.1",
@@ -5173,6 +5183,7 @@ dependencies = [
 [[package]]
 name = "quickwit-indexing"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -5217,6 +5228,7 @@ dependencies = [
 [[package]]
 name = "quickwit-ingest"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -5251,6 +5263,7 @@ dependencies = [
 [[package]]
 name = "quickwit-macros"
 version = "0.8.0"
+source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd"
 dependencies = [
  "proc-macro2",
  "quote",
"git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd" dependencies = [ "anyhow", "async-trait", @@ -5295,6 +5309,7 @@ dependencies = [ [[package]] name = "quickwit-opentelemetry" version = "0.8.0" +source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd" dependencies = [ "anyhow", "async-trait", @@ -5317,6 +5332,7 @@ dependencies = [ [[package]] name = "quickwit-proto" version = "0.8.0" +source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd" dependencies = [ "anyhow", "async-trait", @@ -5349,6 +5365,7 @@ dependencies = [ [[package]] name = "quickwit-query" version = "0.8.0" +source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd" dependencies = [ "anyhow", "base64 0.22.1", @@ -5370,6 +5387,7 @@ dependencies = [ [[package]] name = "quickwit-search" version = "0.8.0" +source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd" dependencies = [ "anyhow", "async-trait", @@ -5414,6 +5432,7 @@ dependencies = [ [[package]] name = "quickwit-storage" version = "0.8.0" +source = "git+https://github.com/indextables/quickwit?rev=7af2361e#7af2361ef0184e1572537a2a91fe5d581b453bfd" dependencies = [ "anyhow", "async-trait", @@ -6688,6 +6707,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "tantivy" version = "0.23.0" +source = "git+https://github.com/indextables/tantivy?rev=4b6d7d49#4b6d7d49cf1f55592dc29d4ca3867395df59ec7d" dependencies = [ "aho-corasick", "arc-swap", @@ -6742,6 +6762,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.6.0" +source = "git+https://github.com/indextables/tantivy?rev=4b6d7d49#4b6d7d49cf1f55592dc29d4ca3867395df59ec7d" dependencies = [ "bitpacking", ] @@ -6749,6 +6770,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.3.0" +source = "git+https://github.com/indextables/tantivy?rev=4b6d7d49#4b6d7d49cf1f55592dc29d4ca3867395df59ec7d" dependencies = [ "downcast-rs", "fastdivide", @@ -6763,6 +6785,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.7.0" +source = "git+https://github.com/indextables/tantivy?rev=4b6d7d49#4b6d7d49cf1f55592dc29d4ca3867395df59ec7d" dependencies = [ "async-trait", "byteorder", @@ -6785,6 +6808,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.22.0" +source = "git+https://github.com/indextables/tantivy?rev=4b6d7d49#4b6d7d49cf1f55592dc29d4ca3867395df59ec7d" dependencies = [ "nom", ] @@ -6792,6 +6816,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.3.0" +source = "git+https://github.com/indextables/tantivy?rev=4b6d7d49#4b6d7d49cf1f55592dc29d4ca3867395df59ec7d" dependencies = [ "futures-util", "itertools 0.14.0", @@ -6804,6 +6829,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.3.0" +source = "git+https://github.com/indextables/tantivy?rev=4b6d7d49#4b6d7d49cf1f55592dc29d4ca3867395df59ec7d" dependencies = [ "murmurhash32", "rand_distr", @@ -6813,13 +6839,14 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.3.0" +source = "git+https://github.com/indextables/tantivy?rev=4b6d7d49#4b6d7d49cf1f55592dc29d4ca3867395df59ec7d" dependencies = [ "serde", ] [[package]] name = "tantivy4java" -version = "0.31.2" +version = "0.32.4" dependencies = [ "anyhow", "arrow", diff --git a/native/Cargo.toml 
diff --git a/native/Cargo.toml b/native/Cargo.toml
index 05ddaa7..97e7ebb 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "tantivy4java"
-version = "0.31.2"
+version = "0.32.4"
 edition = "2021"
 license = "MIT"
 authors = ["Tantivy4Java Contributors"]
diff --git a/native/src/split_searcher/aggregation_arrow_ffi.rs b/native/src/split_searcher/aggregation_arrow_ffi.rs
index 3d46ae8..c1f9e70 100644
--- a/native/src/split_searcher/aggregation_arrow_ffi.rs
+++ b/native/src/split_searcher/aggregation_arrow_ffi.rs
@@ -4,6 +4,7 @@
 // via the Arrow C Data Interface (FFI). It eliminates per-result JNI overhead by
 // converting tantivy's AggregationResults directly to Arrow RecordBatch format.

+use std::collections::HashMap;
 use std::sync::Arc;

 use anyhow::{Context, Result};
@@ -34,11 +35,27 @@ pub fn aggregation_result_to_record_batch(
     _name: &str,
     result: &AggregationResult,
     is_date_histogram: bool,
+) -> Result<RecordBatch> {
+    aggregation_result_to_record_batch_with_hash_resolution(
+        _name,
+        result,
+        is_date_histogram,
+        None,
+    )
+}
+
+/// Convert a single named AggregationResult to an Arrow RecordBatch,
+/// resolving U64 hash bucket keys back to original strings using the provided map.
+pub fn aggregation_result_to_record_batch_with_hash_resolution(
+    _name: &str,
+    result: &AggregationResult,
+    is_date_histogram: bool,
+    hash_resolution_map: Option<&HashMap<u64, String>>,
 ) -> Result<RecordBatch> {
     match result {
         AggregationResult::MetricResult(metric) => metric_to_record_batch(metric),
         AggregationResult::BucketResult(bucket) => {
-            bucket_to_record_batch(bucket, is_date_histogram)
+            bucket_to_record_batch(bucket, is_date_histogram, hash_resolution_map)
         }
     }
 }
@@ -184,9 +201,10 @@ fn metric_to_record_batch(metric: &MetricResult) -> Result<RecordBatch> {
 fn bucket_to_record_batch(
     bucket: &BucketResult,
     is_date_histogram: bool,
+    hash_resolution_map: Option<&HashMap<u64, String>>,
 ) -> Result<RecordBatch> {
     match bucket {
-        BucketResult::Terms { buckets, .. } => terms_to_record_batch(buckets),
+        BucketResult::Terms { buckets, .. } => terms_to_record_batch(buckets, hash_resolution_map),
         BucketResult::Histogram { buckets } => {
             if is_date_histogram {
                 date_histogram_to_record_batch(buckets)
@@ -198,10 +216,13 @@ fn bucket_to_record_batch(
     }
 }

-fn terms_to_record_batch(buckets: &[BucketEntry]) -> Result<RecordBatch> {
+fn terms_to_record_batch(
+    buckets: &[BucketEntry],
+    hash_resolution_map: Option<&HashMap<u64, String>>,
+) -> Result<RecordBatch> {
     // Check if there is a nested bucket sub-aggregation (FR-2: flattening)
     if let Some(nested) = find_nested_bucket_sub_agg(buckets.first().map(|b| &b.sub_aggregation)) {
-        return flatten_nested_bucket_terms(buckets, &nested.0, &nested.1);
+        return flatten_nested_bucket_terms(buckets, &nested.0, &nested.1, hash_resolution_map);
     }

     let sub_agg_names = collect_metric_sub_agg_names(
@@ -217,7 +238,7 @@ fn terms_to_record_batch(buckets: &[BucketEntry]) -> Result<RecordBatch> {
     }

     let schema = Arc::new(Schema::new(fields));
-    let keys: Vec<String> = buckets.iter().map(|b| key_to_string(&b.key)).collect();
+    let keys: Vec<String> = buckets.iter().map(|b| key_to_string_resolved(&b.key, hash_resolution_map)).collect();
     let doc_counts: Vec<i64> = buckets.iter().map(|b| b.doc_count as i64).collect();

     let mut columns: Vec<Arc<dyn Array>> = vec![
@@ -442,10 +463,11 @@ fn collect_nested_rows(
     key_columns: &mut Vec<Vec<String>>,
     doc_count_values: &mut Vec<i64>,
     metric_values: &mut Vec<Vec<Option<f64>>>,
+    hash_resolution_map: Option<&HashMap<u64, String>>,
 ) {
     for bucket in buckets {
         let mut current_keys = prefix_keys.to_vec();
-        current_keys.push(key_to_string(&bucket.key));
+        current_keys.push(key_to_string_resolved(&bucket.key, hash_resolution_map));

         if current_keys.len() == depth {
             // Leaf level: emit a row
@@ -477,6 +499,7 @@ fn collect_nested_rows(
                 key_columns,
                 doc_count_values,
                 metric_values,
+                hash_resolution_map,
             );
         }
     }
@@ -504,6 +527,7 @@ fn flatten_nested_bucket_terms(
     outer_buckets: &[BucketEntry],
     _nested_name: &str,
     _nested_template: &BucketResult,
+    hash_resolution_map: Option<&HashMap<u64, String>>,
 ) -> Result<RecordBatch> {
     // Determine total nesting depth (number of key columns)
     let depth = count_nesting_depth(outer_buckets);
@@ -533,6 +557,7 @@ fn flatten_nested_bucket_terms(
         &mut key_columns,
         &mut doc_count_values,
         &mut metric_values,
+        hash_resolution_map,
     );

     // Build Arrow columns
@@ -634,6 +659,7 @@ fn flatten_nested_bucket_histogram(
             &mut inner_keys,
             &mut inner_docs,
             &mut inner_metrics,
+            None, // histogram inner keys are not hash fields
         );

         let num_inner_rows = inner_docs.len();
@@ -684,6 +710,22 @@ fn key_to_string(key: &Key) -> String {
     }
 }

+/// Like `key_to_string`, but resolves U64 hash keys back to original string values
+/// using the provided hash resolution map (from Phase 3 hash touchup).
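+/// U64 keys with no entry in the map fall back to the raw hash rendered as a
+/// decimal string; all other key types delegate to `key_to_string`.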
+fn key_to_string_resolved(key: &Key, hash_resolution_map: Option<&HashMap<u64, String>>) -> String {
+    match key {
+        Key::U64(v) => {
+            if let Some(map) = hash_resolution_map {
+                if let Some(resolved) = map.get(v) {
+                    return resolved.clone();
+                }
+            }
+            v.to_string()
+        }
+        _ => key_to_string(key),
+    }
+}
+
 fn key_to_f64(key: &Key) -> f64 {
     match key {
         Key::F64(v) => *v,
diff --git a/native/src/split_searcher/jni_agg_arrow.rs b/native/src/split_searcher/jni_agg_arrow.rs
index a860c6c..0827b26 100644
--- a/native/src/split_searcher/jni_agg_arrow.rs
+++ b/native/src/split_searcher/jni_agg_arrow.rs
@@ -23,6 +23,7 @@ use crate::runtime_manager::block_on_operation;
 use crate::searcher::aggregation::json_helpers::is_date_histogram_aggregation;
 use super::aggregation_arrow_ffi::{
     aggregation_result_arrow_schema_json, aggregation_result_to_record_batch,
+    aggregation_result_to_record_batch_with_hash_resolution,
     export_record_batch_ffi,
 };
 use super::jni_search::perform_search_async_impl_leaf_response_with_aggregations;
@@ -119,8 +120,12 @@ pub extern "system" fn Java_io_indextables_tantivy4java_split_SplitSearcher_nati
         })?;

         let is_date_hist = is_date_histogram_aggregation(&effective_agg_json, &agg_name_str);
-        let batch =
-            aggregation_result_to_record_batch(&agg_name_str, agg_result, is_date_hist)?;
+        let batch = aggregation_result_to_record_batch_with_hash_resolution(
+            &agg_name_str,
+            agg_result,
+            is_date_hist,
+            ctx.hash_resolution_map.as_ref(),
+        )?;

         let row_count = export_record_batch_ffi(&batch, arr_slice, sch_slice)?;
@@ -222,24 +227,38 @@ pub extern "system" fn Java_io_indextables_tantivy4java_split_SplitCacheManager_
     let query_json_owned = query_json.clone();
     let agg_json_owned = agg_json_str.clone();

-    // Search each split and collect intermediate aggregation bytes
-    let intermediate_bytes_vec: Vec<Vec<u8>> = block_on_operation(async move {
-        let mut results = Vec::with_capacity(ptrs_owned.len());
-        for &sptr in &ptrs_owned {
-            let ctx = perform_search_async_impl_leaf_response_with_aggregations(
-                sptr,
-                query_json_owned.clone(),
-                0,
-                Some(agg_json_owned.clone()),
+    // Search each split and collect intermediate aggregation bytes + hash resolution maps
+    let (intermediate_bytes_vec, merged_hash_map): (Vec<Vec<u8>>, std::collections::HashMap<u64, String>) =
+        block_on_operation(async move {
+            let mut results = Vec::with_capacity(ptrs_owned.len());
+            let mut all_hash_maps: std::collections::HashMap<u64, String> =
+                std::collections::HashMap::new();
+            for &sptr in &ptrs_owned {
+                let ctx = perform_search_async_impl_leaf_response_with_aggregations(
+                    sptr,
+                    query_json_owned.clone(),
+                    0,
+                    Some(agg_json_owned.clone()),
+                )
+                .await?;
+
+                if let Some(map) = ctx.hash_resolution_map {
+                    all_hash_maps.extend(map);
+                }
+                if let Some(bytes) = ctx.leaf_response.intermediate_aggregation_result {
+                    results.push(bytes);
+                }
+            }
+            Ok::<(Vec<Vec<u8>>, std::collections::HashMap<u64, String>), anyhow::Error>(
+                (results, all_hash_maps),
             )
-            .await?;
+        })?;

-            if let Some(bytes) = ctx.leaf_response.intermediate_aggregation_result {
-                results.push(bytes);
-            }
-        }
-        Ok::<Vec<Vec<u8>>, anyhow::Error>(results)
-    })?;
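+    // Only hand a resolution map to the Arrow conversion when at least one split
+    // actually produced hash mappings; `None` leaves bucket keys unresolved.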
+    let hash_resolution_map = if merged_hash_map.is_empty() {
+        None
+    } else {
+        Some(merged_hash_map)
+    };

     if intermediate_bytes_vec.is_empty() {
         anyhow::bail!("No splits returned aggregation results");
@@ -273,7 +292,8 @@ pub extern "system" fn Java_io_indextables_tantivy4java_split_SplitCacheManager_
         t_merge.elapsed().as_millis()
     );

-    // Finalize
+    // Use effective_agg_json from the first split (all splits share the same rewriting)
+    // For finalization, we need the rewritten agg JSON that matches what was actually searched
     let aggregations: Aggregations = serde_json::from_str(&agg_json_str)
         .map_err(|e| anyhow::anyhow!("Failed to parse aggregation JSON: {}", e))?;
@@ -294,8 +314,12 @@ pub extern "system" fn Java_io_indextables_tantivy4java_split_SplitCacheManager_
         })?;

         let is_date_hist = is_date_histogram_aggregation(&agg_json_str, &agg_name_str);
-        let batch =
-            aggregation_result_to_record_batch(&agg_name_str, agg_result, is_date_hist)?;
+        let batch = aggregation_result_to_record_batch_with_hash_resolution(
+            &agg_name_str,
+            agg_result,
+            is_date_hist,
+            hash_resolution_map.as_ref(),
+        )?;

         let row_count = export_record_batch_ffi(&batch, arr_slice, sch_slice)?;
@@ -503,22 +527,36 @@ pub extern "system" fn Java_io_indextables_tantivy4java_split_SplitCacheManager_
     let query_json_owned = query_json.clone();
     let agg_json_owned = agg_json_str.clone();

-    let intermediate_bytes_vec: Vec<Vec<u8>> = block_on_operation(async move {
-        let mut results = Vec::with_capacity(ptrs_owned.len());
-        for &sptr in &ptrs_owned {
-            let ctx = perform_search_async_impl_leaf_response_with_aggregations(
-                sptr,
-                query_json_owned.clone(),
-                0,
-                Some(agg_json_owned.clone()),
-            )
-            .await?;
-            if let Some(bytes) = ctx.leaf_response.intermediate_aggregation_result {
-                results.push(bytes);
+    let (intermediate_bytes_vec, merged_hash_map): (Vec<Vec<u8>>, std::collections::HashMap<u64, String>) =
+        block_on_operation(async move {
+            let mut results = Vec::with_capacity(ptrs_owned.len());
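+            // A given term always produces the same u64 fingerprint across splits,
+            // so merging the per-split maps with extend() below is lossless.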
+            let mut all_hash_maps: std::collections::HashMap<u64, String> =
+                std::collections::HashMap::new();
+            for &sptr in &ptrs_owned {
+                let ctx = perform_search_async_impl_leaf_response_with_aggregations(
+                    sptr,
+                    query_json_owned.clone(),
+                    0,
+                    Some(agg_json_owned.clone()),
+                )
+                .await?;
+                if let Some(map) = ctx.hash_resolution_map {
+                    all_hash_maps.extend(map);
+                }
+                if let Some(bytes) = ctx.leaf_response.intermediate_aggregation_result {
+                    results.push(bytes);
+                }
             }
-        }
-        Ok::<Vec<Vec<u8>>, anyhow::Error>(results)
-    })?;
+            Ok::<(Vec<Vec<u8>>, std::collections::HashMap<u64, String>), anyhow::Error>(
+                (results, all_hash_maps),
+            )
+        })?;
+
+    let hash_resolution_map = if merged_hash_map.is_empty() {
+        None
+    } else {
+        Some(merged_hash_map)
+    };

     if intermediate_bytes_vec.is_empty() {
         anyhow::bail!("No splits returned aggregation results");
@@ -563,7 +601,12 @@ pub extern "system" fn Java_io_indextables_tantivy4java_split_SplitCacheManager_
         })?;

         let is_date_hist = is_date_histogram_aggregation(&agg_json_str, agg_name);
-        let batch = aggregation_result_to_record_batch(agg_name, agg_result, is_date_hist)?;
+        let batch = aggregation_result_to_record_batch_with_hash_resolution(
+            agg_name,
+            agg_result,
+            is_date_hist,
+            hash_resolution_map.as_ref(),
+        )?;
         let row_count = export_record_batch_ffi(&batch, arr_slice, sch_slice)?;
         row_counts.insert(agg_name.clone(), serde_json::json!(row_count));
     }
diff --git a/pom.xml b/pom.xml
index fb946b0..39d1811 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
     <groupId>io.indextables</groupId>
     <artifactId>tantivy4java</artifactId>
-    <version>0.32.3</version>
+    <version>0.32.4</version>
     <packaging>jar</packaging>
     <name>Tantivy4Java</name>
     <description>Experimental</description>
diff --git a/src/test/java/io/indextables/tantivy4java/AggregationArrowFfiTest.java b/src/test/java/io/indextables/tantivy4java/AggregationArrowFfiTest.java
index fef022f..ab27782 100644
--- a/src/test/java/io/indextables/tantivy4java/AggregationArrowFfiTest.java
+++ b/src/test/java/io/indextables/tantivy4java/AggregationArrowFfiTest.java
@@ -917,4 +917,136 @@ private double getSingleDouble(Map<String, Object> colMap, String name) {
         List<?> values = getColumnValues(colMap, name);
         return ((Number) values.get(0)).doubleValue();
     }
+
+    // ---- Regression Tests: String Fingerprint Hash Resolution via Arrow FFI ----
+    //
+    // These tests verify that terms aggregations on companion splits with
+    // withStringHashOptimization(true) return the original string bucket keys
+    // (e.g. "item_0") via the Arrow FFI path, NOT raw U64 hash values.
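+    //
+    // The native layer merges each split's hash->string map and applies it while
+    // converting bucket keys to Arrow string columns (see aggregation_arrow_ffi.rs).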
+
+    /**
+     * Create a companion split with string hash optimization enabled.
+     * Returns [searcher, cacheManager] -- caller must close both.
+     */
+    private Object[] createCompanionSearcherWithHashOpt(Path dir, int numRows) throws Exception {
+        String tag = "hash_ffi_" + System.nanoTime();
+        Path parquetFile = dir.resolve(tag + ".parquet");
+        Path splitFile = dir.resolve(tag + ".split");
+
+        QuickwitSplit.nativeWriteTestParquet(parquetFile.toString(), numRows, 0);
+
+        ParquetCompanionConfig config = new ParquetCompanionConfig(dir.toString())
+            .withFastFieldMode(ParquetCompanionConfig.FastFieldMode.HYBRID)
+            .withStringHashOptimization(true);
+
+        QuickwitSplit.SplitMetadata metadata = QuickwitSplit.createFromParquet(
+            Collections.singletonList(parquetFile.toString()),
+            splitFile.toString(), config);
+
+        String cacheName = tag + "-cache";
+        SplitCacheManager.CacheConfig cacheConfig = new SplitCacheManager.CacheConfig(cacheName);
+        SplitCacheManager mgr = SplitCacheManager.getInstance(cacheConfig);
+
+        String splitUrl = "file://" + splitFile.toAbsolutePath();
+        SplitSearcher s = mgr.createSplitSearcher(splitUrl, metadata, dir.toString());
+        return new Object[] { s, mgr };
+    }
+
+    @Test
+    @DisplayName("Regression: Arrow FFI terms agg on companion split with hash opt returns string keys, not hashes")
+    public void testArrowFfiTermsAggHashResolution(@TempDir Path dir) throws Exception {
+        int numRows = 10;
+        Object[] pair = createCompanionSearcherWithHashOpt(dir, numRows);
+        SplitSearcher hashSearcher = (SplitSearcher) pair[0];
+        SplitCacheManager hashCacheMgr = (SplitCacheManager) pair[1];
+
+        try {
+            // Terms aggregation on "category" field (has _phash_category hash field)
+            // category values are "cat_0", "cat_1", "cat_2", "cat_3", "cat_4"
+            String queryAst = "{\"type\":\"match_all\"}";
+            String aggJson = "{\"cat_terms\":{\"terms\":{\"field\":\"category\",\"size\":100}}}";
+
+            int numCols = 2; // key + doc_count
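+            // One ArrowArray/ArrowSchema address pair per column for the C Data
+            // Interface handoff.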
+            long[] arrayAddrs = new long[numCols];
+            long[] schemaAddrs = new long[numCols];
+            allocateFfiBuffers(arrayAddrs, schemaAddrs, numCols);
+            try {
+                int rowCount = hashSearcher.aggregateArrowFfi(queryAst, "cat_terms", aggJson,
+                    arrayAddrs, schemaAddrs);
+                assertEquals(5, rowCount, "Should have 5 category buckets");
+
+                String json = nativeReadAggArrowColumnsAsJson(arrayAddrs, schemaAddrs, numCols, rowCount);
+                assertNotNull(json, "Read-back JSON should not be null");
+
+                Map<String, Object> result = parseJson(json);
+                List<Map<String, Object>> columns = getColumns(result);
+                assertEquals(2, columns.size());
+
+                // Key column: must be actual string values, NOT numeric hashes
+                List<?> keys = (List<?>) columns.get(0).get("values");
+                for (Object key : keys) {
+                    String keyStr = key.toString();
+                    assertTrue(keyStr.startsWith("cat_"),
+                        "Arrow FFI terms key should be original string (e.g. 'cat_0'), got: " + keyStr);
+                    // Verify it's NOT a numeric hash (would be a long decimal number)
+                    assertFalse(keyStr.matches("\\d{10,}"),
+                        "Arrow FFI terms key should NOT be a numeric hash, got: " + keyStr);
+                }
+                assertTrue(keys.contains("cat_0"), "Should contain 'cat_0'");
+                assertTrue(keys.contains("cat_4"), "Should contain 'cat_4'");
+            } finally {
+                freeFfiBuffers(arrayAddrs, schemaAddrs, numCols);
+            }
+        } finally {
+            hashSearcher.close();
+            hashCacheMgr.close();
+        }
+    }
+
+    @Test
+    @DisplayName("Regression: Arrow FFI terms+sub-agg on companion split with hash opt resolves keys")
+    public void testArrowFfiTermsSubAggHashResolution(@TempDir Path dir) throws Exception {
+        int numRows = 20;
+        Object[] pair = createCompanionSearcherWithHashOpt(dir, numRows);
+        SplitSearcher hashSearcher = (SplitSearcher) pair[0];
+        SplitCacheManager hashCacheMgr = (SplitCacheManager) pair[1];
+
+        try {
+            // Terms aggregation on "name" field with avg(score) sub-aggregation
+            // name values are "item_0" through "item_19" (all unique)
+            String queryAst = "{\"type\":\"match_all\"}";
+            String aggJson = "{\"name_terms\":{\"terms\":{\"field\":\"name\",\"size\":100},"
+                + "\"aggs\":{\"avg_score\":{\"avg\":{\"field\":\"score\"}}}}}";
+
+            int numCols = 3; // key + doc_count + avg_score
+            long[] arrayAddrs = new long[numCols];
+            long[] schemaAddrs = new long[numCols];
+            allocateFfiBuffers(arrayAddrs, schemaAddrs, numCols);
+            try {
+                int rowCount = hashSearcher.aggregateArrowFfi(queryAst, "name_terms", aggJson,
+                    arrayAddrs, schemaAddrs);
+                assertEquals(numRows, rowCount, "Should have " + numRows + " name buckets");
+
+                String json = nativeReadAggArrowColumnsAsJson(arrayAddrs, schemaAddrs, numCols, rowCount);
+                assertNotNull(json);
+
+                Map<String, Object> result = parseJson(json);
+                List<Map<String, Object>> columns = getColumns(result);
+                assertEquals(3, columns.size());
+
+                // Verify keys are "item_X" strings, not hash numbers
+                List<?> keys = (List<?>) columns.get(0).get("values");
+                for (Object key : keys) {
+                    String keyStr = key.toString();
+                    assertTrue(keyStr.startsWith("item_"),
+                        "Arrow FFI terms key should be 'item_X', got: " + keyStr);
+                }
+            } finally {
+                freeFfiBuffers(arrayAddrs, schemaAddrs, numCols);
+            }
+        } finally {
+            hashSearcher.close();
+            hashCacheMgr.close();
+        }
+    }
+}