Feature Request: Time-Series Histogram Aggregation with Sub-Aggregations for tantivy4java
Related Issue: IndexTables4Spark #13 - Time-Series Histogram Aggregation Support
Executive Summary
Enable advanced time-series analytics in tantivy4java by fully implementing date histogram aggregations with metric sub-aggregations. This feature leverages existing Tantivy/Quickwit aggregation infrastructure to provide high-performance temporal bucketing with nested metric calculations (AVG, MIN, MAX, SUM, PERCENTILE, etc.).
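Conceptually, the feature computes per-interval time buckets and then metrics over each bucket's documents. A plain-JDK sketch of that semantics (illustrative only, independent of the tantivy4java API) looks like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: what a date histogram with an AVG sub-aggregation
// computes, expressed with plain JDK collections.
public class HistogramSketch {
    // Floor an epoch-millis timestamp to its bucket start for a fixed interval.
    static long bucketKey(long timestampMs, long intervalMs) {
        return Math.floorDiv(timestampMs, intervalMs) * intervalMs;
    }

    // Group (timestamp, value) pairs into buckets and compute AVG per bucket.
    static Map<Long, Double> avgPerBucket(long[][] events, long intervalMs) {
        Map<Long, List<Long>> buckets = new TreeMap<>();
        for (long[] e : events) {
            buckets.computeIfAbsent(bucketKey(e[0], intervalMs), k -> new ArrayList<>())
                   .add(e[1]);
        }
        Map<Long, Double> avgs = new TreeMap<>();
        buckets.forEach((k, vals) ->
            avgs.put(k, vals.stream().mapToLong(Long::longValue).average().orElse(0)));
        return avgs;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long[][] events = { {100, 10}, {200, 30}, {hour + 1, 50} };
        // Two buckets: hour 0 averages 20.0, hour 1 averages 50.0
        System.out.println(avgPerBucket(events, hour));
    }
}
```

The point of the feature is to push exactly this bucketing and metric work down into Tantivy instead of doing it in application code.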
Current State Analysis
✅ What Already Works
- DateHistogramAggregation Java API - Fully implemented class with interval, timezone, offset, and bounds support
- Bucket aggregations - Terms, Range, Histogram aggregations working for SplitSearcher
- Metric aggregations - Count, Sum, Average, Min, Max, Stats all implemented as separate aggregations
- Sub-aggregation infrastructure - TermsAggregation already supports an addSubAggregation() method
- Tantivy native support - Underlying Tantivy library has complete date histogram + sub-aggregation support
- JSON serialization - DateHistogramAggregation generates proper Quickwit-compatible JSON
❌ What's Missing
- Sub-aggregation support for DateHistogramAggregation - No addSubAggregation() method
- DateHistogramResult parsing - Result class doesn't handle sub-aggregation results in buckets
- Native JNI bridge - Rust/JNI layer doesn't expose date histogram results with nested metrics
- SplitSearcher integration - DateHistogramAggregation not fully functional in split-based searches
- Result deserialization - Complex nested results from Tantivy not properly mapped to Java objects
Technical Analysis
Tantivy/Quickwit Foundation
Based on analysis of quickwit/quickwit-query/src/aggregations.rs and quickwit/quickwit-search/src/leaf.rs:
Tantivy Aggregation Architecture:
// Tantivy supports nested aggregations natively
pub enum BucketResult {
Histogram {
buckets: BucketEntries<BucketEntry>, // Each bucket can contain sub-aggregations
},
// ... other bucket types
}
pub struct BucketEntry {
pub key: Key,
pub doc_count: u64,
pub sub_aggregation: AggregationResults, // Nested metric results
}
pub enum MetricResult {
Average(SingleMetricResult),
Count(SingleMetricResult),
Max(SingleMetricResult),
Min(SingleMetricResult),
Sum(SingleMetricResult),
Percentiles(PercentilesMetricResult),
Stats(Stats),
ExtendedStats(Box<ExtendedStats>),
TopHits(TopHitsMetricResult),
Cardinality(SingleMetricResult),
}Key Insight: Tantivy already returns date histogram buckets with nested metric aggregation results. tantivy4java just needs to properly deserialize and expose them to Java.
Current tantivy4java Implementation Gap
Java Side (Partially Complete):
// DateHistogramAggregation.java - HAS basic structure
public class DateHistogramAggregation extends SplitAggregation {
private String fixedInterval;
private String timeZone;
// ... BUT MISSING:
// private Map<String, SplitAggregation> subAggregations; ❌
// public DateHistogramAggregation addSubAggregation(...) ❌
}
// DateHistogramResult.java - EXISTS but incomplete
public class DateHistogramResult extends AggregationResult {
private List<DateHistogramBucket> buckets;
public static class DateHistogramBucket {
private final String key;
private final long docCount;
// ... BUT MISSING:
// private Map<String, AggregationResult> subAggregations; ❌
}
}
Native Side (Needs Implementation):
// searcher.rs - Has aggregation infrastructure but needs enhancement
// Currently handles simple aggregations, needs to:
// 1. Parse date histogram results from Tantivy
// 2. Extract nested metric results from each bucket
// 3. Serialize to Java-compatible format
Proposed Implementation
Phase 1: Extend Java API (Estimated: 3-5 days)
1.1 Add Sub-Aggregation Support to DateHistogramAggregation
File: src/main/java/io/indextables/tantivy4java/aggregation/DateHistogramAggregation.java
public class DateHistogramAggregation extends SplitAggregation {
// Existing fields...
private final Map<String, SplitAggregation> subAggregations;
// NEW: Constructor initialization
public DateHistogramAggregation(String name, String fieldName) {
super(name);
// ... existing code
this.subAggregations = new HashMap<>();
}
// NEW: Add sub-aggregation support
/**
* Adds a metric sub-aggregation to be computed for each time bucket.
*
* Example:
* <pre>{@code
* DateHistogramAggregation hourly = new DateHistogramAggregation("hourly", "timestamp")
* .setFixedInterval("1h")
* .addSubAggregation("avg_latency", new AverageAggregation("latency"))
* .addSubAggregation("p95_latency", new PercentileAggregation("latency", 95.0))
* .addSubAggregation("total_requests", new CountAggregation());
* }</pre>
*/
public DateHistogramAggregation addSubAggregation(String name, SplitAggregation aggregation) {
if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Sub-aggregation name cannot be null or empty");
}
if (aggregation == null) {
throw new IllegalArgumentException("Sub-aggregation cannot be null");
}
subAggregations.put(name.trim(), aggregation);
return this;
}
// NEW: Update JSON generation to include sub-aggregations.
// Note: in Quickwit/Elasticsearch syntax, "aggs" is a SIBLING of the
// "date_histogram" object, not nested inside it:
// {"date_histogram": {...}, "aggs": {...}}
@Override
public String toAggregationJson() {
// ... existing date_histogram JSON generation
json.append("}"); // close the "date_histogram" object
// Add sub-aggregations as a sibling key
if (!subAggregations.isEmpty()) {
json.append(", \"aggs\": {");
boolean first = true;
for (Map.Entry<String, SplitAggregation> entry : subAggregations.entrySet()) {
if (!first) json.append(", ");
json.append("\"").append(entry.getKey()).append("\": ");
json.append(entry.getValue().toAggregationJson());
first = false;
}
json.append("}");
}
json.append("}"); // close the outer object
return json.toString();
}
public Map<String, SplitAggregation> getSubAggregations() {
return subAggregations;
}
}
Generated JSON Example (note that "aggs" sits alongside "date_histogram", not inside it):
{
  "date_histogram": {
    "field": "timestamp",
    "fixed_interval": "1h",
    "time_zone": "UTC"
  },
  "aggs": {
    "avg_latency": {"avg": {"field": "latency"}},
    "p95_latency": {"percentiles": {"field": "latency", "percents": [95.0]}},
    "total_requests": {"value_count": {"field": "_id"}}
  }
}
1.2 Enhance DateHistogramResult to Handle Sub-Aggregations
File: src/main/java/io/indextables/tantivy4java/aggregation/DateHistogramResult.java
public class DateHistogramResult extends AggregationResult {
private final List<DateHistogramBucket> buckets;
// NEW: Enhanced bucket class with sub-aggregation support
public static class DateHistogramBucket {
private final String key; // ISO timestamp or formatted string
private final long keyAsLong; // Unix timestamp in milliseconds
private final long docCount;
private final Map<String, AggregationResult> subAggregations; // NEW
public DateHistogramBucket(String key, long keyAsLong, long docCount) {
this(key, keyAsLong, docCount, new HashMap<>());
}
// NEW: Constructor with sub-aggregations
public DateHistogramBucket(String key, long keyAsLong, long docCount,
Map<String, AggregationResult> subAggregations) {
this.key = key;
this.keyAsLong = keyAsLong;
this.docCount = docCount;
this.subAggregations = subAggregations;
}
// Existing getters...
// NEW: Sub-aggregation accessors
public Map<String, AggregationResult> getSubAggregations() {
return subAggregations;
}
public AggregationResult getSubAggregation(String name) {
return subAggregations.get(name);
}
public boolean hasSubAggregations() {
return !subAggregations.isEmpty();
}
// NEW: Typed accessor helper. Returns the metric's numeric value,
// or null when the named sub-aggregation is absent or not a
// single-value metric.
public Double getMetric(String name) {
AggregationResult result = subAggregations.get(name);
if (result instanceof AverageResult) {
return ((AverageResult) result).getValue();
} else if (result instanceof SumResult) {
return ((SumResult) result).getValue();
} else if (result instanceof MinResult) {
return ((MinResult) result).getValue();
} else if (result instanceof MaxResult) {
return ((MaxResult) result).getValue();
}
return null;
}
}
// ... existing methods
}
1.3 Create Missing Metric Aggregation Classes
Files to create/enhance:
- PercentileAggregation.java - Support for percentile calculations
- PercentilesResult.java - Multi-percentile results
- CardinalityAggregation.java - COUNT(DISTINCT) support
- ExtendedStatsAggregation.java - Variance, stddev, etc.
Example: PercentileAggregation.java
package io.indextables.tantivy4java.aggregation;
public class PercentileAggregation extends SplitAggregation {
private final String fieldName;
private final double[] percentiles;
/**
* Creates a percentile aggregation for a single percentile.
*/
public PercentileAggregation(String fieldName, double percentile) {
this("percentile_" + fieldName, fieldName, new double[]{percentile});
}
/**
* Creates a percentile aggregation for multiple percentiles.
*/
public PercentileAggregation(String name, String fieldName, double[] percentiles) {
super(name);
this.fieldName = fieldName;
this.percentiles = percentiles;
// Validate percentiles
for (double p : percentiles) {
if (p < 0.0 || p > 100.0) {
throw new IllegalArgumentException("Percentile must be between 0 and 100");
}
}
}
@Override
public String getFieldName() {
return fieldName;
}
@Override
public String getAggregationType() {
return "percentiles";
}
@Override
public String toAggregationJson() {
StringBuilder json = new StringBuilder();
json.append("{\"percentiles\": {\"field\": \"").append(fieldName).append("\"");
json.append(", \"percents\": [");
for (int i = 0; i < percentiles.length; i++) {
if (i > 0) json.append(", ");
json.append(percentiles[i]);
}
json.append("]}}");
return json.toString();
}
}
Phase 2: Enhance Native JNI Layer (Estimated: 5-7 days)
2.1 Update Rust Aggregation Deserialization
File: native/src/searcher.rs
// NEW: Enhanced date histogram result parsing
fn parse_date_histogram_result(
env: &mut JNIEnv,
bucket_result: &tantivy::aggregation::agg_result::BucketResult,
) -> Result<jobject, String> {
use tantivy::aggregation::agg_result::BucketResult;
match bucket_result {
BucketResult::Histogram { buckets } => {
// Create ArrayList<DateHistogramBucket>
let bucket_list = create_java_arraylist(env)?;
for (key, bucket_entry) in buckets.entries.iter() {
// Extract bucket key (timestamp)
let key_as_long = match key {
Key::F64(f) => *f as i64,
Key::I64(i) => *i,
_ => return Err("Invalid date histogram key type".to_string()),
};
// Format key as ISO8601 string
let key_as_string = format_timestamp_iso8601(key_as_long)?;
// Extract document count
let doc_count = bucket_entry.doc_count;
// NEW: Parse sub-aggregations for this bucket
let sub_aggs_map = if !bucket_entry.sub_aggregation.0.is_empty() {
parse_sub_aggregations(env, &bucket_entry.sub_aggregation)?
} else {
create_empty_java_hashmap(env)?
};
// Create DateHistogramBucket Java object
let bucket_class = env.find_class(
"io/indextables/tantivy4java/aggregation/DateHistogramResult$DateHistogramBucket"
)?;
let bucket_obj = env.new_object(
bucket_class,
"(Ljava/lang/String;JJLjava/util/Map;)V",
&[
JValue::Object(&env.new_string(&key_as_string)?),
JValue::Long(key_as_long),
JValue::Long(doc_count as i64),
JValue::Object(&sub_aggs_map),
],
)?;
// Add to list
env.call_method(&bucket_list, "add", "(Ljava/lang/Object;)Z", &[JValue::Object(&bucket_obj)])?;
}
// Create DateHistogramResult object
let result_class = env.find_class("io/indextables/tantivy4java/aggregation/DateHistogramResult")?;
let result_obj = env.new_object(
result_class,
"(Ljava/util/List;)V",
&[JValue::Object(&bucket_list)],
)?;
Ok(result_obj.into_raw())
}
_ => Err("Expected histogram bucket result".to_string()),
}
}
// NEW: Parse nested sub-aggregation results
fn parse_sub_aggregations(
env: &mut JNIEnv,
agg_results: &tantivy::aggregation::agg_result::AggregationResults,
) -> Result<jobject, String> {
let hashmap = create_java_hashmap(env)?;
for (name, result) in agg_results.0.iter() {
let java_result = match result {
AggregationResult::MetricResult(metric) => {
parse_metric_result(env, metric)?
}
AggregationResult::BucketResult(bucket) => {
// Nested buckets (rare but possible)
parse_bucket_result(env, bucket)?
}
};
let name_jstring = env.new_string(name)?;
env.call_method(
&hashmap,
"put",
"(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
&[JValue::Object(&name_jstring), JValue::Object(&java_result)],
)?;
}
Ok(hashmap.into_raw())
}
// NEW: Parse metric aggregation results
fn parse_metric_result(
env: &mut JNIEnv,
metric: &tantivy::aggregation::agg_result::MetricResult,
) -> Result<jobject, String> {
use tantivy::aggregation::agg_result::MetricResult;
match metric {
MetricResult::Average(single) => {
create_java_average_result(env, single.value)
}
MetricResult::Sum(single) => {
create_java_sum_result(env, single.value)
}
MetricResult::Min(single) => {
create_java_min_result(env, single.value)
}
MetricResult::Max(single) => {
create_java_max_result(env, single.value)
}
MetricResult::Count(single) => {
create_java_count_result(env, single.value as u64)
}
MetricResult::Stats(stats) => {
create_java_stats_result(env, stats)
}
MetricResult::Percentiles(percentiles) => {
create_java_percentiles_result(env, percentiles)
}
MetricResult::Cardinality(single) => {
create_java_cardinality_result(env, single.value as u64)
}
_ => Err("Unsupported metric type".to_string()),
}
}
// Helper functions for creating Java result objects
fn create_java_average_result(env: &mut JNIEnv, value: f64) -> Result<jobject, String> {
let class = env.find_class("io/indextables/tantivy4java/aggregation/AverageResult")?;
let obj = env.new_object(class, "(D)V", &[JValue::Double(value)])?;
Ok(obj.into_raw())
}
// ... similar helpers for Sum, Min, Max, Count, Stats, Percentiles, Cardinality
2.2 Add Timestamp Formatting Utilities
// NEW: Format Unix timestamp as ISO8601 string
fn format_timestamp_iso8601(timestamp_ms: i64) -> Result<String, String> {
use chrono::{DateTime, Utc};
// div_euclid/rem_euclid keep pre-1970 (negative) timestamps correct
let seconds = timestamp_ms.div_euclid(1000);
let nanos = (timestamp_ms.rem_euclid(1000) * 1_000_000) as u32;
let datetime: DateTime<Utc> = DateTime::from_timestamp(seconds, nanos)
.ok_or_else(|| "Invalid timestamp".to_string())?;
Ok(datetime.to_rfc3339())
}
// NEW: Format with custom timezone
fn format_timestamp_with_tz(timestamp_ms: i64, timezone: &str) -> Result<String, String> {
use chrono_tz::Tz;
let tz: Tz = timezone.parse()
.map_err(|_| format!("Invalid timezone: {}", timezone))?;
let seconds = timestamp_ms.div_euclid(1000);
let nanos = (timestamp_ms.rem_euclid(1000) * 1_000_000) as u32;
let utc_time = chrono::DateTime::<chrono::Utc>::from_timestamp(seconds, nanos)
.ok_or_else(|| "Invalid timestamp".to_string())?;
let local_time = utc_time.with_timezone(&tz);
Ok(local_time.to_rfc3339())
}
2.3 Update Cargo.toml Dependencies
[dependencies]
# Existing dependencies...
chrono = "0.4.31"   # DateTime::from_timestamp requires 0.4.31+
chrono-tz = "0.8"
Phase 3: Integration & Testing (Estimated: 3-5 days)
3.1 Comprehensive Test Suite
File: src/test/java/io/indextables/tantivy4java/aggregation/DateHistogramSubAggregationTest.java
public class DateHistogramSubAggregationTest {
@Test
public void testDateHistogramWithSingleMetric() throws Exception {
// Create test data with timestamps and latency values
// ...
// Create date histogram with avg sub-aggregation
DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "timestamp")
.setFixedInterval("1h")
.setTimeZone("UTC")
.addSubAggregation("avg_latency", new AverageAggregation("latency"));
SearchResult result = searcher.search(query, 10, Map.of("hourly", histogram));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");
assertNotNull(histResult);
assertFalse(histResult.getBuckets().isEmpty());
for (DateHistogramBucket bucket : histResult.getBuckets()) {
assertTrue(bucket.hasSubAggregations());
AggregationResult avgResult = bucket.getSubAggregation("avg_latency");
assertNotNull(avgResult);
assertTrue(avgResult instanceof AverageResult);
double avgLatency = ((AverageResult) avgResult).getValue();
assertTrue(avgLatency > 0);
}
}
@Test
public void testDateHistogramWithMultipleMetrics() throws Exception {
// Create date histogram with multiple sub-aggregations
DateHistogramAggregation histogram = new DateHistogramAggregation("daily", "timestamp")
.setFixedInterval("1d")
.setTimeZone("America/New_York")
.addSubAggregation("avg_latency", new AverageAggregation("latency"))
.addSubAggregation("min_latency", new MinAggregation("latency"))
.addSubAggregation("max_latency", new MaxAggregation("latency"))
.addSubAggregation("p95_latency", new PercentileAggregation("latency", 95.0))
.addSubAggregation("total_requests", new CountAggregation());
SearchResult result = searcher.search(query, 10, Map.of("daily", histogram));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("daily");
for (DateHistogramBucket bucket : histResult.getBuckets()) {
assertEquals(5, bucket.getSubAggregations().size());
// Validate each metric
assertNotNull(bucket.getSubAggregation("avg_latency"));
assertNotNull(bucket.getSubAggregation("min_latency"));
assertNotNull(bucket.getSubAggregation("max_latency"));
assertNotNull(bucket.getSubAggregation("p95_latency"));
assertNotNull(bucket.getSubAggregation("total_requests"));
}
}
@Test
public void testDateHistogramWithTimezoneConversion() throws Exception {
// Test that timezone parameter correctly shifts bucket boundaries
DateHistogramAggregation utcHistogram = new DateHistogramAggregation("utc", "timestamp")
.setFixedInterval("1d")
.setTimeZone("UTC");
DateHistogramAggregation etHistogram = new DateHistogramAggregation("et", "timestamp")
.setFixedInterval("1d")
.setTimeZone("America/New_York");
Map<String, SplitAggregation> aggs = Map.of(
"utc", utcHistogram,
"et", etHistogram
);
SearchResult result = searcher.search(query, 10, aggs);
DateHistogramResult utcResult = (DateHistogramResult) result.getAggregation("utc");
DateHistogramResult etResult = (DateHistogramResult) result.getAggregation("et");
// Verify bucket keys are different due to timezone shift
assertNotEquals(
utcResult.getBuckets().get(0).getKey(),
etResult.getBuckets().get(0).getKey()
);
}
@Test
public void testDateHistogramWithExtendedBounds() throws Exception {
long startTime = System.currentTimeMillis();
long endTime = startTime + (30L * 24 * 60 * 60 * 1000); // 30 days later
DateHistogramAggregation histogram = new DateHistogramAggregation("daily", "timestamp")
.setFixedInterval("1d")
.setExtendedBounds(startTime, endTime);
SearchResult result = searcher.search(query, 10, Map.of("daily", histogram));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("daily");
// Should cover the full 30-day range even if some buckets have 0 documents
// (one extra bucket appears when the start time is not day-aligned)
assertTrue(histResult.getBuckets().size() >= 30);
}
@Test
public void testDateHistogramJsonSerialization() {
DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "timestamp")
.setFixedInterval("1h")
.setTimeZone("UTC")
.setOffset("-4h")
.addSubAggregation("avg_value", new AverageAggregation("value"))
.addSubAggregation("count", new CountAggregation());
String json = histogram.toAggregationJson();
// Verify JSON structure
assertTrue(json.contains("\"date_histogram\""));
assertTrue(json.contains("\"field\": \"timestamp\""));
assertTrue(json.contains("\"fixed_interval\": \"1h\""));
assertTrue(json.contains("\"time_zone\": \"UTC\""));
assertTrue(json.contains("\"offset\": \"-4h\""));
assertTrue(json.contains("\"aggs\""));
assertTrue(json.contains("\"avg_value\""));
assertTrue(json.contains("\"count\""));
}
}
3.2 Integration Test with Real S3 Splits
@Test
public void testDateHistogramOnS3Split() throws Exception {
// Use SplitSearcher with real split file
SplitCacheManager.CacheConfig config = new SplitCacheManager.CacheConfig("test-cache")
.withMaxCacheSize(100_000_000)
.withAwsCredentials(accessKey, secretKey)
.withAwsRegion("us-east-1");
try (SplitCacheManager cacheManager = SplitCacheManager.getInstance(config)) {
try (SplitSearcher searcher = cacheManager.createSplitSearcher("s3://bucket/test.split")) {
DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "event_timestamp")
.setFixedInterval("1h")
.addSubAggregation("avg_latency", new AverageAggregation("latency"))
.addSubAggregation("p95_latency", new PercentileAggregation("latency", 95.0));
SplitQuery query = new SplitMatchAllQuery();
SearchResult result = searcher.search(query, 10, Map.of("hourly", histogram));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");
assertNotNull(histResult);
assertTrue(histResult.getBuckets().size() > 0);
// Verify sub-aggregations work correctly
for (DateHistogramBucket bucket : histResult.getBuckets()) {
if (bucket.getDocCount() > 0) {
assertTrue(bucket.hasSubAggregations());
assertNotNull(bucket.getSubAggregation("avg_latency"));
assertNotNull(bucket.getSubAggregation("p95_latency"));
}
}
}
}
}
3.3 Performance Benchmark Tests
@Test
public void benchmarkDateHistogramPerformance() throws Exception {
// Compare performance: date histogram vs Spark-side aggregation
// 1. Date histogram aggregation (pushed down)
long startPushdown = System.currentTimeMillis();
DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "timestamp")
.setFixedInterval("1h")
.addSubAggregation("avg_latency", new AverageAggregation("latency"))
.addSubAggregation("max_latency", new MaxAggregation("latency"))
.addSubAggregation("count", new CountAggregation());
SearchResult result = searcher.search(query, 10, Map.of("hourly", histogram));
long endPushdown = System.currentTimeMillis();
System.out.println("Pushdown time: " + (endPushdown - startPushdown) + "ms");
// 2. Manual aggregation (retrieve all docs, aggregate in Java)
long startManual = System.currentTimeMillis();
SearchResult allDocs = searcher.search(new SplitMatchAllQuery(), 1_000_000);
Map<Long, List<Double>> hourlyBuckets = new HashMap<>();
// ... manual bucketing and metric calculation
long endManual = System.currentTimeMillis();
System.out.println("Manual time: " + (endManual - startManual) + "ms");
System.out.println("Speedup: " + ((double)(endManual - startManual) / (endPushdown - startPushdown)) + "x");
// Assert significant speedup
assertTrue((endManual - startManual) > (endPushdown - startPushdown) * 5);
}
Phase 4: Documentation & Examples (Estimated: 2-3 days)
4.1 Update README.md
### Time-Series Histogram Aggregations
Analyze time-series data with date histogram aggregations and nested metric calculations:
```java
// Hourly performance metrics
DateHistogramAggregation hourlyMetrics = new DateHistogramAggregation("hourly", "timestamp")
.setFixedInterval("1h")
.setTimeZone("America/New_York")
.addSubAggregation("avg_response_time", new AverageAggregation("response_time"))
.addSubAggregation("p95_response_time", new PercentileAggregation("response_time", 95.0))
.addSubAggregation("p99_response_time", new PercentileAggregation("response_time", 99.0))
.addSubAggregation("total_requests", new CountAggregation())
.addSubAggregation("unique_users", new CardinalityAggregation("user_id"));
SearchResult result = searcher.search(query, 10, Map.of("hourly", hourlyMetrics));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");
for (DateHistogramBucket bucket : histResult.getBuckets()) {
System.out.println("Hour: " + bucket.getKey());
System.out.println(" Requests: " + bucket.getDocCount());
AverageResult avg = (AverageResult) bucket.getSubAggregation("avg_response_time");
System.out.println(" Avg Response: " + avg.getValue() + "ms");
PercentilesResult p95 = (PercentilesResult) bucket.getSubAggregation("p95_response_time");
System.out.println(" P95 Response: " + p95.getValue(95.0) + "ms");
}
```
Supported Intervals:
- "1s", "30s" - Seconds
- "1m", "5m", "15m" - Minutes
- "1h", "6h", "12h" - Hours
- "1d", "7d" - Days
- "1w" - Week
- "1M" - Month, "1y" - Year (calendar units are not fixed-length, so these may require calendar-interval support rather than fixed_interval)
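Each fixed interval string maps to a bucket width in milliseconds. A minimal parser sketch (hypothetical helper, not part of the library; calendar units like "1M"/"1y" are deliberately excluded because they are not fixed-length):

```java
// Illustrative only: map fixed-interval strings such as "30s", "5m", "1h",
// "7d" to a bucket width in milliseconds.
public class IntervalSketch {
    static long toMillis(String interval) {
        long n = Long.parseLong(interval.substring(0, interval.length() - 1));
        char unit = interval.charAt(interval.length() - 1);
        switch (unit) {
            case 's': return n * 1_000L;
            case 'm': return n * 60_000L;
            case 'h': return n * 3_600_000L;
            case 'd': return n * 86_400_000L;
            case 'w': return n * 7 * 86_400_000L;
            default: throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(toMillis("5m")); // 300000
        System.out.println(toMillis("1h")); // 3600000
    }
}
```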
Supported Sub-Aggregations:
- AverageAggregation - Average value
- SumAggregation - Sum of values
- MinAggregation - Minimum value
- MaxAggregation - Maximum value
- CountAggregation - Document count
- PercentileAggregation - Percentile values (p50, p95, p99, etc.)
- StatsAggregation - Complete statistics (min, max, avg, sum, count)
- CardinalityAggregation - Unique value count (approximate)
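To make the percentile semantics concrete: production engines typically interpolate between values, but a simple nearest-rank version shows the idea (illustrative sketch only; Tantivy's implementation differs):

```java
import java.util.Arrays;

// Illustrative only: nearest-rank percentile over a sorted copy of the values.
public class PercentileSketch {
    static double percentile(double[] values, double p) {
        if (values.length == 0 || p < 0.0 || p > 100.0) {
            throw new IllegalArgumentException("need values and 0 <= p <= 100");
        }
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        // Nearest-rank: smallest value with at least p% of the data at or below it.
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank - 1, 0)];
    }

    public static void main(String[] args) {
        double[] latencies = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100};
        System.out.println(percentile(latencies, 95.0)); // 100.0
        System.out.println(percentile(latencies, 50.0)); // 50.0
    }
}
```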
#### 4.2 Create Comprehensive Guide
**File:** `docs/TIME_SERIES_AGGREGATION_GUIDE.md`
# Time-Series Aggregation Guide
## Overview
Date histogram aggregations enable efficient time-series analytics by bucketing documents into time intervals and computing metrics for each bucket.
## Basic Usage
### Simple Hourly Histogram
```java
DateHistogramAggregation hourly = new DateHistogramAggregation("hourly", "timestamp")
.setFixedInterval("1h");
SearchResult result = searcher.search(query, 10, Map.of("hourly", hourly));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");
for (DateHistogramBucket bucket : histResult.getBuckets()) {
System.out.println(bucket.getKey() + ": " + bucket.getDocCount() + " events");
}
```
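Bucket keys come back as epoch milliseconds alongside a formatted string. In java.time terms the mapping looks like this (illustrative only, independent of the library):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative only: convert an epoch-millis bucket key to an ISO-8601
// string, as the key/keyAsLong pair on DateHistogramBucket suggests.
public class KeyFormatSketch {
    static String toIso8601(long epochMs) {
        return DateTimeFormatter.ISO_OFFSET_DATE_TIME
                .format(Instant.ofEpochMilli(epochMs).atOffset(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        // Prints the epoch start as an ISO-8601 UTC timestamp
        System.out.println(toIso8601(0L));
    }
}
```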
## Advanced Features
### Timezone Support
// Business hours in Eastern Time
DateHistogramAggregation businessHours = new DateHistogramAggregation("hours", "timestamp")
.setFixedInterval("1h")
    .setTimeZone("America/New_York");
### Time Offset
// Shift bucket boundaries by 30 minutes
DateHistogramAggregation shifted = new DateHistogramAggregation("shifted", "timestamp")
.setFixedInterval("1h")
    .setOffset("30m");
### Extended Bounds (Fill Empty Buckets)
long startTime = /* ... */;
long endTime = /* ... */;
DateHistogramAggregation complete = new DateHistogramAggregation("complete", "timestamp")
.setFixedInterval("1d")
.setExtendedBounds(startTime, endTime);
// Returns buckets for the entire range, even with 0 documents
## Sub-Aggregations
### Multiple Metrics Per Bucket
DateHistogramAggregation comprehensive = new DateHistogramAggregation("analysis", "timestamp")
.setFixedInterval("1h")
.addSubAggregation("avg_latency", new AverageAggregation("latency"))
.addSubAggregation("min_latency", new MinAggregation("latency"))
.addSubAggregation("max_latency", new MaxAggregation("latency"))
.addSubAggregation("sum_bytes", new SumAggregation("bytes_transferred"))
.addSubAggregation("unique_users", new CardinalityAggregation("user_id"));
SearchResult result = searcher.search(query, 10, Map.of("analysis", comprehensive));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("analysis");
for (DateHistogramBucket bucket : histResult.getBuckets()) {
System.out.println("Time: " + bucket.getKey());
System.out.println("Events: " + bucket.getDocCount());
// Access metrics
AverageResult avg = (AverageResult) bucket.getSubAggregation("avg_latency");
System.out.println("Avg Latency: " + avg.getValue());
MinResult min = (MinResult) bucket.getSubAggregation("min_latency");
System.out.println("Min Latency: " + min.getValue());
MaxResult max = (MaxResult) bucket.getSubAggregation("max_latency");
System.out.println("Max Latency: " + max.getValue());
SumResult sum = (SumResult) bucket.getSubAggregation("sum_bytes");
System.out.println("Total Bytes: " + sum.getValue());
CardinalityResult cardinality = (CardinalityResult) bucket.getSubAggregation("unique_users");
System.out.println("Unique Users: " + cardinality.getValue());
}
### Percentiles for SLA Monitoring
DateHistogramAggregation slaMonitoring = new DateHistogramAggregation("sla", "timestamp")
.setFixedInterval("5m")
.addSubAggregation("p50", new PercentileAggregation("response_time", 50.0))
.addSubAggregation("p95", new PercentileAggregation("response_time", 95.0))
.addSubAggregation("p99", new PercentileAggregation("response_time", 99.0))
.addSubAggregation("p999", new PercentileAggregation("response_time", 99.9));
SearchResult result = searcher.search(query, 10, Map.of("sla", slaMonitoring));
DateHistogramResult slaResult = (DateHistogramResult) result.getAggregation("sla");
for (DateHistogramBucket bucket : slaResult.getBuckets()) {
PercentilesResult p95 = (PercentilesResult) bucket.getSubAggregation("p95");
PercentilesResult p99 = (PercentilesResult) bucket.getSubAggregation("p99");
if (p95.getValue(95.0) > 200 || p99.getValue(99.0) > 500) {
System.out.println("SLA VIOLATION at " + bucket.getKey());
System.out.println(" P95: " + p95.getValue(95.0) + "ms");
System.out.println(" P99: " + p99.getValue(99.0) + "ms");
}
}
## Real-World Use Cases
### 1. Application Performance Monitoring
// Monitor application latency by hour
DateHistogramAggregation apmHourly = new DateHistogramAggregation("apm_hourly", "request_timestamp")
.setFixedInterval("1h")
.setTimeZone("UTC")
.addSubAggregation("avg_latency", new AverageAggregation("latency_ms"))
.addSubAggregation("p95_latency", new PercentileAggregation("latency_ms", 95.0))
.addSubAggregation("p99_latency", new PercentileAggregation("latency_ms", 99.0))
.addSubAggregation("error_count", new CountAggregation())
    .addSubAggregation("unique_users", new CardinalityAggregation("user_id"));
### 2. Log Analytics Dashboard
// Analyze log volume and error rates by 5-minute intervals
DateHistogramAggregation logAnalytics = new DateHistogramAggregation("logs_5m", "log_timestamp")
.setFixedInterval("5m")
.addSubAggregation("log_count", new CountAggregation())
.addSubAggregation("avg_severity", new AverageAggregation("severity_level"))
    .addSubAggregation("unique_services", new CardinalityAggregation("service_name"));
### 3. Business Metrics Tracking
// Track sales metrics by day
DateHistogramAggregation dailySales = new DateHistogramAggregation("daily_sales", "order_timestamp")
.setFixedInterval("1d")
.setTimeZone("America/Los_Angeles")
.addSubAggregation("total_revenue", new SumAggregation("order_amount"))
.addSubAggregation("avg_order_value", new AverageAggregation("order_amount"))
.addSubAggregation("order_count", new CountAggregation())
    .addSubAggregation("unique_customers", new CardinalityAggregation("customer_id"));
## Performance Considerations
### Field Configuration
Date histogram requires fields to be configured as fast fields:
// During indexing, mark timestamp fields as fast
schema.addDateField("timestamp", true, true, true); // stored, indexed, fast
schema.addIntegerField("latency", true, true, true); // for metrics
### Query Optimization
- Use filters to reduce time range before aggregation
- Limit bucket count with appropriate intervals
- Cache frequently used queries
### Expected Performance
- 10-100x faster than fetching all documents and aggregating in application code
- Minimal memory usage - results are bucketed aggregates, not raw documents
- Scalable - performance independent of document count for aggregation operations
## Migration from Manual Aggregation
Before (Manual):
// Fetch all documents
SearchResult all = searcher.search(query, 1_000_000);
Map<Long, List<Double>> buckets = new HashMap<>();
// Manual bucketing
for (Hit hit : all.getHits()) {
Document doc = searcher.doc(hit.getDocAddress());
long timestamp = (long) doc.getFirst("timestamp");
double latency = (double) doc.getFirst("latency");
long hourBucket = timestamp / (60 * 60 * 1000);
buckets.computeIfAbsent(hourBucket, k -> new ArrayList<>()).add(latency);
}
// Manual metric calculation
for (Map.Entry<Long, List<Double>> entry : buckets.entrySet()) {
double avg = entry.getValue().stream().mapToDouble(Double::doubleValue).average().orElse(0);
System.out.println("Hour: " + entry.getKey() + ", Avg: " + avg);
}
After (Optimized):
// Single pushdown query
DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "timestamp")
.setFixedInterval("1h")
.addSubAggregation("avg_latency", new AverageAggregation("latency"));
SearchResult result = searcher.search(query, 10, Map.of("hourly", histogram));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");
for (DateHistogramBucket bucket : histResult.getBuckets()) {
AverageResult avg = (AverageResult) bucket.getSubAggregation("avg_latency");
System.out.println("Hour: " + bucket.getKey() + ", Avg: " + avg.getValue());
}
Benefits:
- 10-100x performance improvement
- Reduced memory usage
- Cleaner, more maintainable code
- Native Tantivy performance
## Implementation Recommendation
Based on analysis of Quickwit and tantivy4java architecture:
### ✅ **RECOMMENDED APPROACH: Full Sub-Aggregation Support**
**Rationale:**
1. **Tantivy already supports it** - The underlying Tantivy library has complete date histogram + sub-aggregation support
2. **Pattern already exists** - TermsAggregation already implements sub-aggregation pattern that can be replicated
3. **Critical for use case** - Time-series analytics without metrics (AVG, MIN, MAX, P95, etc.) has limited value
4. **Minimal additional complexity** - Once date histogram buckets work, adding sub-aggregations is straightforward
5. **Future-proof** - Matches Elasticsearch/Quickwit patterns, making future enhancements easier
### 📊 **Implementation Feasibility Assessment**
| Component | Complexity | Estimated Effort | Risk |
|-----------|-----------|------------------|------|
| Java API Enhancement | Low | 3-5 days | Low |
| Metric Aggregation Classes | Low | 2-3 days | Low |
| Native JNI Date Histogram Parsing | Medium | 4-5 days | Medium |
| Native Sub-Aggregation Parsing | Medium | 3-4 days | Medium |
| Result Deserialization | Medium-High | 3-4 days | Medium |
| Testing & Validation | Medium | 3-5 days | Low |
| Documentation | Low | 2-3 days | Low |
| **TOTAL** | **Medium** | **20-29 days (4-6 weeks)** | **Medium** |
### 🎯 **Phased Rollout Strategy**
**Phase 1 (MVP): Date Histogram with Basic Metrics (2-3 weeks)**
- Date histogram buckets (count only)
- Single metric sub-aggregations (AVG, MIN, MAX, SUM, COUNT)
- No percentiles or cardinality yet
- Basic timezone support
**Phase 2 (Full Feature): Advanced Metrics (1-2 weeks)**
- Percentile aggregations (P50, P95, P99)
- Cardinality aggregations (COUNT DISTINCT)
- Stats aggregation (comprehensive statistics)
- Extended bounds support
- Full timezone and offset support
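To make "full timezone support" concrete: a calendar-day bucket must be computed in the requested zone, not in UTC. The semantics can be sketched with stdlib `java.time` (this illustrates the bucketing rule only; it is not tantivy4java code):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;

// Sketch: truncating a timestamp to a 1d calendar bucket in a given timezone.
// 2024-01-02T03:00Z is 2024-01-01 22:00 in America/New_York (UTC-5), so with
// that timezone it must land in the local 2024-01-01 bucket, not 2024-01-02.
public class TimezoneBucketSketch {
    static Instant dayBucket(Instant ts, ZoneId zone) {
        // Truncate in local time, then convert the bucket start back to an instant.
        return ts.atZone(zone).truncatedTo(ChronoUnit.DAYS).toInstant();
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-01-02T03:00:00Z");
        System.out.println(dayBucket(ts, ZoneId.of("UTC")));              // 2024-01-02T00:00:00Z
        System.out.println(dayBucket(ts, ZoneId.of("America/New_York"))); // 2024-01-01T05:00:00Z
    }
}
```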
**Phase 3 (Optimization): Performance & Polish (1 week)**
- Performance optimization
- Comprehensive test suite
- Documentation and examples
- Production hardening
## Expected Benefits
### Performance
- **10-100x speedup** compared to application-side aggregation
- **Minimal data transfer** - Only aggregated results returned, not raw documents
- **Memory efficient** - Aggregation happens in Tantivy, not JVM heap
- **Scalable** - Performance independent of document count
### Use Cases Enabled
- Real-time application performance monitoring dashboards
- Log analytics with time-series metrics
- Business intelligence reporting
- SLA monitoring and alerting
- Anomaly detection in time-series data
### IndexTables4Spark Integration
Once implemented in tantivy4java, IndexTables4Spark can leverage this for:
- Spark SQL time-series queries with aggregate pushdown
- 10-100x performance improvement for temporal analytics
- Support for standard SQL date functions (date_trunc, date_format, etc.)
- Multi-dimensional time-series analysis
## Acceptance Criteria
- [ ] DateHistogramAggregation supports `addSubAggregation()` method
- [ ] DateHistogramResult.Bucket contains Map of sub-aggregation results
- [ ] Native JNI layer properly deserializes date histogram buckets with metrics
- [ ] Support for all basic metric aggregations (AVG, MIN, MAX, SUM, COUNT)
- [ ] Support for percentile aggregations (P50, P95, P99)
- [ ] Support for cardinality aggregation (COUNT DISTINCT)
- [ ] Timezone parameter works correctly (UTC, America/New_York, etc.)
- [ ] Interval support: fixed intervals (1s, 1m, 1h, 1d) and calendar intervals (1w, 1M, 1y), which vary in length and require calendar arithmetic
- [ ] Offset parameter works correctly
- [ ] Extended bounds parameter works correctly
- [ ] Comprehensive test suite (unit + integration tests)
- [ ] Documentation updated with examples
- [ ] Performance benchmarks show 10x+ improvement vs manual aggregation
- [ ] Works with SplitSearcher for distributed split-based searches
- [ ] Works with S3-stored splits
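As a reference point for the interval criteria above, fixed intervals reduce to a constant-width bucketing rule: floor-divide the timestamp by the interval width. The helper below is a hypothetical illustration, not the tantivy4java parser; calendar units (1w, 1M, 1y) are deliberately rejected because they have no fixed width:

```java
import java.time.Duration;

// Sketch: fixed-interval strings map to constant durations; calendar units
// (1w, 1M, 1y) vary in length and cannot be handled by a fixed divisor.
public class IntervalSketch {
    static Duration fixedInterval(String s) {
        long n = Long.parseLong(s.substring(0, s.length() - 1));
        switch (s.charAt(s.length() - 1)) {
            case 's': return Duration.ofSeconds(n);
            case 'm': return Duration.ofMinutes(n);
            case 'h': return Duration.ofHours(n);
            case 'd': return Duration.ofDays(n);
            default:  throw new IllegalArgumentException("calendar unit, not fixed: " + s);
        }
    }

    /** Bucket start in epoch millis: floor the timestamp to the interval width. */
    static long bucketKey(long epochMillis, Duration interval) {
        long w = interval.toMillis();
        return Math.floorDiv(epochMillis, w) * w;
    }

    public static void main(String[] args) {
        // 3,900,000 ms = 01:05:00; with a 1h interval it falls in the 01:00:00 bucket.
        System.out.println(bucketKey(3_900_000L, fixedInterval("1h"))); // 3600000
    }
}
```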
## Dependencies
### External Dependencies
- **Tantivy** (via Quickwit fork) - Already has full date histogram support ✅
- **chrono** - Rust datetime library for timestamp formatting
- **chrono-tz** - Rust timezone library
### Internal Dependencies
- Existing aggregation infrastructure in tantivy4java
- SplitSearcher and aggregation result parsing
- JNI bridge for complex nested results
## References
- **IndexTables4Spark Issue:** https://github.com/indextables/indextables_spark/issues/13
- **Quickwit Aggregations:** `quickwit/quickwit-query/src/aggregations.rs`
- **Tantivy Aggregations:** https://github.com/quickwit-oss/tantivy (aggregation module)
- **Existing TermsAggregation Pattern:** `tantivy4java/src/main/java/io/indextables/tantivy4java/aggregation/TermsAggregation.java`
## Priority
**HIGH** - This feature is a critical prerequisite for IndexTables4Spark time-series analytics capabilities and provides significant performance improvements for common use cases.
## Estimated Total Effort
**4-6 weeks** (20-29 working days) for complete implementation including:
- Java API enhancements
- Native JNI layer updates
- Comprehensive testing
- Documentation and examples
- Performance validation
With phased rollout, MVP can be delivered in **2-3 weeks**.