
Feature Request: Time-Series Histogram Aggregation with Sub-Aggregations #19

@schenksj

Description

Feature Request: Time-Series Histogram Aggregation with Sub-Aggregations for tantivy4java

Related Issue: IndexTables4Spark #13 - Time-Series Histogram Aggregation Support

Executive Summary

Enable advanced time-series analytics in tantivy4java by fully implementing date histogram aggregations with metric sub-aggregations. This feature leverages existing Tantivy/Quickwit aggregation infrastructure to provide high-performance temporal bucketing with nested metric calculations (AVG, MIN, MAX, SUM, PERCENTILE, etc.).

Current State Analysis

✅ What Already Works

  1. DateHistogramAggregation Java API - Fully implemented class with interval, timezone, offset, and bounds support
  2. Bucket aggregations - Terms, Range, Histogram aggregations working for SplitSearcher
  3. Metric aggregations - Count, Sum, Average, Min, Max, Stats all implemented as separate aggregations
  4. Sub-aggregation infrastructure - TermsAggregation already supports addSubAggregation() method
  5. Tantivy native support - Underlying Tantivy library has complete date histogram + sub-aggregation support
  6. JSON serialization - DateHistogramAggregation generates proper Quickwit-compatible JSON

❌ What's Missing

  1. Sub-aggregation support for DateHistogramAggregation - No addSubAggregation() method
  2. DateHistogramResult parsing - Result class doesn't handle sub-aggregation results in buckets
  3. Native JNI bridge - Rust/JNI layer doesn't expose date histogram results with nested metrics
  4. SplitSearcher integration - DateHistogramAggregation not fully functional in split-based searches
  5. Result deserialization - Complex nested results from Tantivy not properly mapped to Java objects

Technical Analysis

Tantivy/Quickwit Foundation

Based on analysis of quickwit/quickwit-query/src/aggregations.rs and quickwit/quickwit-search/src/leaf.rs:

Tantivy Aggregation Architecture:

// Tantivy supports nested aggregations natively
pub enum BucketResult {
    Histogram {
        buckets: BucketEntries<BucketEntry>,  // Each bucket can contain sub-aggregations
    },
    // ... other bucket types
}

pub struct BucketEntry {
    pub key: Key,
    pub doc_count: u64,
    pub sub_aggregation: AggregationResults,  // Nested metric results
}

pub enum MetricResult {
    Average(SingleMetricResult),
    Count(SingleMetricResult),
    Max(SingleMetricResult),
    Min(SingleMetricResult),
    Sum(SingleMetricResult),
    Percentiles(PercentilesMetricResult),
    Stats(Stats),
    ExtendedStats(Box<ExtendedStats>),
    TopHits(TopHitsMetricResult),
    Cardinality(SingleMetricResult),
}

Key Insight: Tantivy already returns date histogram buckets with nested metric aggregation results. tantivy4java just needs to properly deserialize and expose them to Java.
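
The shape tantivy4java has to surface can be illustrated with plain collections. This is a minimal sketch: field names such as "buckets", "key", "doc_count", and "value" follow the Elasticsearch-style response format and are assumptions here, not a confirmed wire format.

```java
import java.util.List;
import java.util.Map;

// Sketch of the nested result shape tantivy4java must surface to Java,
// modeled with plain Maps/Lists as if deserialized from Tantivy's JSON
// response. Field names follow the Elasticsearch-style format (assumed).
public class NestedResultSketch {
    static List<Map<String, Object>> sampleBuckets() {
        return List.of(
            Map.<String, Object>of("key", 1_700_000_000_000L, "doc_count", 42L,
                    "avg_latency", Map.of("value", 12.5)),
            Map.<String, Object>of("key", 1_700_003_600_000L, "doc_count", 7L,
                    "avg_latency", Map.of("value", 9.0)));
    }

    // Pull a named metric out of one bucket: this is the traversal that a
    // DateHistogramBucket.getSubAggregation() accessor would encapsulate.
    @SuppressWarnings("unchecked")
    static double metric(Map<String, Object> bucket, String name) {
        Map<String, Object> sub = (Map<String, Object>) bucket.get(name);
        return (Double) sub.get("value");
    }
}
```

The point is that every bucket carries its own metric results alongside key and doc_count; the Java API only needs to expose that nesting.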

Current tantivy4java Implementation Gap

Java Side (Partially Complete):

// DateHistogramAggregation.java - HAS basic structure
public class DateHistogramAggregation extends SplitAggregation {
    private String fixedInterval;
    private String timeZone;
    // ... BUT MISSING:
    // private Map<String, SplitAggregation> subAggregations; ❌
    // public DateHistogramAggregation addSubAggregation(...) ❌
}

// DateHistogramResult.java - EXISTS but incomplete
public class DateHistogramResult extends AggregationResult {
    private List<DateHistogramBucket> buckets;

    public static class DateHistogramBucket {
        private final String key;
        private final long docCount;
        // ... BUT MISSING:
        // private Map<String, AggregationResult> subAggregations; ❌
    }
}

Native Side (Needs Implementation):

// searcher.rs - Has aggregation infrastructure but needs enhancement
// Currently handles simple aggregations, needs to:
// 1. Parse date histogram results from Tantivy
// 2. Extract nested metric results from each bucket
// 3. Serialize to Java-compatible format

Proposed Implementation

Phase 1: Extend Java API (Estimated: 3-5 days)

1.1 Add Sub-Aggregation Support to DateHistogramAggregation

File: src/main/java/io/indextables/tantivy4java/aggregation/DateHistogramAggregation.java

public class DateHistogramAggregation extends SplitAggregation {
    // Existing fields...
    private final Map<String, SplitAggregation> subAggregations;

    // NEW: Constructor initialization
    public DateHistogramAggregation(String name, String fieldName) {
        super(name);
        // ... existing code
        this.subAggregations = new HashMap<>();
    }

    // NEW: Add sub-aggregation support
    /**
     * Adds a metric sub-aggregation to be computed for each time bucket.
     *
     * Example:
     * <pre>{@code
     * DateHistogramAggregation hourly = new DateHistogramAggregation("hourly", "timestamp")
     *     .setFixedInterval("1h")
     *     .addSubAggregation("avg_latency", new AverageAggregation("latency"))
     *     .addSubAggregation("p95_latency", new PercentileAggregation("latency", 95.0))
     *     .addSubAggregation("total_requests", new CountAggregation());
     * }</pre>
     */
    public DateHistogramAggregation addSubAggregation(String name, SplitAggregation aggregation) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("Sub-aggregation name cannot be null or empty");
        }
        if (aggregation == null) {
            throw new IllegalArgumentException("Sub-aggregation cannot be null");
        }
        subAggregations.put(name.trim(), aggregation);
        return this;
    }

    // NEW: Update JSON generation to include sub-aggregations
    @Override
    public String toAggregationJson() {
        // ... existing date_histogram JSON generation; assume it leaves the
        // inner "date_histogram" object open at this point

        // Close the date_histogram object first: "aggs" must be a sibling of
        // "date_histogram", not nested inside it
        json.append("}");

        // Add sub-aggregations
        if (!subAggregations.isEmpty()) {
            json.append(", \"aggs\": {");
            boolean first = true;
            for (Map.Entry<String, SplitAggregation> entry : subAggregations.entrySet()) {
                if (!first) json.append(", ");
                json.append("\"").append(entry.getKey()).append("\": ");
                json.append(entry.getValue().toAggregationJson());
                first = false;
            }
            json.append("}");
        }

        json.append("}");
        return json.toString();
    }

    public Map<String, SplitAggregation> getSubAggregations() {
        return subAggregations;
    }
}

Generated JSON Example:

{
  "date_histogram": {
    "field": "timestamp",
    "fixed_interval": "1h",
    "time_zone": "UTC"
  },
  "aggs": {
    "avg_latency": {"avg": {"field": "latency"}},
    "p95_latency": {"percentiles": {"field": "latency", "percents": [95.0]}},
    "total_requests": {"value_count": {"field": "_id"}}
  }
}
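
The sibling placement of "aggs" can be checked with a minimal standalone sketch, assuming the Elasticsearch/Quickwit convention. Sub-aggregations are passed as pre-serialized JSON strings purely to keep the sketch self-contained; this is not the proposed API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch of the serializer: "aggs" is emitted as a sibling of
// the "date_histogram" object, not nested inside it (assumed convention).
public class DateHistogramJsonSketch {
    static String toJson(String field, String interval, Map<String, String> subAggs) {
        StringBuilder json = new StringBuilder();
        json.append("{\"date_histogram\": {\"field\": \"").append(field)
            .append("\", \"fixed_interval\": \"").append(interval).append("\"}");
        if (!subAggs.isEmpty()) {
            json.append(", \"aggs\": {");
            boolean first = true;
            for (Map.Entry<String, String> e : subAggs.entrySet()) {
                if (!first) json.append(", ");
                json.append("\"").append(e.getKey()).append("\": ").append(e.getValue());
                first = false;
            }
            json.append("}");
        }
        json.append("}");
        return json.toString();
    }
}
```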

1.2 Enhance DateHistogramResult to Handle Sub-Aggregations

File: src/main/java/io/indextables/tantivy4java/aggregation/DateHistogramResult.java

public class DateHistogramResult extends AggregationResult {
    private final List<DateHistogramBucket> buckets;

    // NEW: Enhanced bucket class with sub-aggregation support
    public static class DateHistogramBucket {
        private final String key;               // ISO timestamp or formatted string
        private final long keyAsLong;           // Unix timestamp in milliseconds
        private final long docCount;
        private final Map<String, AggregationResult> subAggregations;  // NEW

        public DateHistogramBucket(String key, long keyAsLong, long docCount) {
            this(key, keyAsLong, docCount, new HashMap<>());
        }

        // NEW: Constructor with sub-aggregations
        public DateHistogramBucket(String key, long keyAsLong, long docCount,
                                  Map<String, AggregationResult> subAggregations) {
            this.key = key;
            this.keyAsLong = keyAsLong;
            this.docCount = docCount;
            this.subAggregations = subAggregations;
        }

        // Existing getters...

        // NEW: Sub-aggregation accessors
        public Map<String, AggregationResult> getSubAggregations() {
            return subAggregations;
        }

        public AggregationResult getSubAggregation(String name) {
            return subAggregations.get(name);
        }

        public boolean hasSubAggregations() {
            return !subAggregations.isEmpty();
        }

        // NEW: Typed accessor helper for single-valued metrics
        /**
         * Returns the value of a single-valued metric sub-aggregation,
         * or null if the name is unknown or the result is not single-valued.
         */
        public Double getMetric(String name) {
            AggregationResult result = subAggregations.get(name);
            if (result instanceof AverageResult) {
                return ((AverageResult) result).getValue();
            } else if (result instanceof SumResult) {
                return ((SumResult) result).getValue();
            } else if (result instanceof MinResult) {
                return ((MinResult) result).getValue();
            } else if (result instanceof MaxResult) {
                return ((MaxResult) result).getValue();
            }
            return null;
        }
    }

    // ... existing methods
}

1.3 Create Missing Metric Aggregation Classes

Files to create/enhance:

  • PercentileAggregation.java - Support for percentile calculations
  • PercentilesResult.java - Multi-percentile results
  • CardinalityAggregation.java - COUNT(DISTINCT) support
  • ExtendedStatsAggregation.java - Variance, stddev, etc.

Example: PercentileAggregation.java

package io.indextables.tantivy4java.aggregation;

public class PercentileAggregation extends SplitAggregation {
    private final String fieldName;
    private final double[] percentiles;

    /**
     * Creates a percentile aggregation for a single percentile.
     */
    public PercentileAggregation(String fieldName, double percentile) {
        this("percentile_" + fieldName, fieldName, new double[]{percentile});
    }

    /**
     * Creates a percentile aggregation for multiple percentiles.
     */
    public PercentileAggregation(String name, String fieldName, double[] percentiles) {
        super(name);

        // Validate before storing
        for (double p : percentiles) {
            if (p < 0.0 || p > 100.0) {
                throw new IllegalArgumentException("Percentile must be between 0 and 100");
            }
        }

        this.fieldName = fieldName;
        this.percentiles = percentiles.clone();  // defensive copy
    }

    @Override
    public String getFieldName() {
        return fieldName;
    }

    @Override
    public String getAggregationType() {
        return "percentiles";
    }

    @Override
    public String toAggregationJson() {
        StringBuilder json = new StringBuilder();
        json.append("{\"percentiles\": {\"field\": \"").append(fieldName).append("\"");
        json.append(", \"percents\": [");
        for (int i = 0; i < percentiles.length; i++) {
            if (i > 0) json.append(", ");
            json.append(percentiles[i]);
        }
        json.append("]}}");
        return json.toString();
    }
}
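
The percents serialization can be exercised in isolation; a sketch of just the JSON-building logic, detached from the SplitAggregation hierarchy:

```java
// Standalone copy of the percentiles JSON assembly, so the output shape
// can be checked without the rest of the class hierarchy.
public class PercentilesJsonSketch {
    static String percentilesJson(String field, double[] percents) {
        StringBuilder json = new StringBuilder();
        json.append("{\"percentiles\": {\"field\": \"").append(field).append("\"");
        json.append(", \"percents\": [");
        for (int i = 0; i < percents.length; i++) {
            if (i > 0) json.append(", ");
            json.append(percents[i]);  // double renders as e.g. 95.0
        }
        json.append("]}}");
        return json.toString();
    }
}
```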

Phase 2: Enhance Native JNI Layer (Estimated: 5-7 days)

2.1 Update Rust Aggregation Deserialization

File: native/src/searcher.rs

// NEW: Enhanced date histogram result parsing
fn parse_date_histogram_result(
    env: &mut JNIEnv,
    bucket_result: &tantivy::aggregation::agg_result::BucketResult,
) -> Result<jobject, String> {
    use tantivy::aggregation::agg_result::BucketResult;

    match bucket_result {
        BucketResult::Histogram { buckets } => {
            // Create ArrayList<DateHistogramBucket>
            let bucket_list = create_java_arraylist(env)?;

            for (key, bucket_entry) in buckets.entries.iter() {
                // Extract bucket key (timestamp)
                let key_as_long = match key {
                    Key::F64(f) => *f as i64,
                    Key::I64(i) => *i,
                    _ => return Err("Invalid date histogram key type".to_string()),
                };

                // Format key as ISO8601 string
                let key_as_string = format_timestamp_iso8601(key_as_long)?;

                // Extract document count
                let doc_count = bucket_entry.doc_count;

                // NEW: Parse sub-aggregations for this bucket
                let sub_aggs_map = if !bucket_entry.sub_aggregation.0.is_empty() {
                    parse_sub_aggregations(env, &bucket_entry.sub_aggregation)?
                } else {
                    create_empty_java_hashmap(env)?
                };

                // Create DateHistogramBucket Java object
                let bucket_class = env.find_class(
                    "io/indextables/tantivy4java/aggregation/DateHistogramResult$DateHistogramBucket"
                )?;

                let bucket_obj = env.new_object(
                    bucket_class,
                    "(Ljava/lang/String;JJLjava/util/Map;)V",
                    &[
                        JValue::Object(&env.new_string(&key_as_string)?),
                        JValue::Long(key_as_long),
                        JValue::Long(doc_count as i64),
                        JValue::Object(&sub_aggs_map),
                    ],
                )?;

                // Add to list
                env.call_method(&bucket_list, "add", "(Ljava/lang/Object;)Z", &[JValue::Object(&bucket_obj)])?;
            }

            // Create DateHistogramResult object
            let result_class = env.find_class("io/indextables/tantivy4java/aggregation/DateHistogramResult")?;
            let result_obj = env.new_object(
                result_class,
                "(Ljava/util/List;)V",
                &[JValue::Object(&bucket_list)],
            )?;

            Ok(result_obj.into_raw())
        }
        _ => Err("Expected histogram bucket result".to_string()),
    }
}

// NEW: Parse nested sub-aggregation results
fn parse_sub_aggregations(
    env: &mut JNIEnv,
    agg_results: &tantivy::aggregation::agg_result::AggregationResults,
) -> Result<jobject, String> {
    let hashmap = create_java_hashmap(env)?;

    for (name, result) in agg_results.0.iter() {
        let java_result = match result {
            AggregationResult::MetricResult(metric) => {
                parse_metric_result(env, metric)?
            }
            AggregationResult::BucketResult(bucket) => {
                // Nested buckets (rare but possible)
                parse_bucket_result(env, bucket)?
            }
        };

        let name_jstring = env.new_string(name)?;
        env.call_method(
            &hashmap,
            "put",
            "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
            &[JValue::Object(&name_jstring), JValue::Object(&java_result)],
        )?;
    }

    Ok(hashmap.into_raw())
}

// NEW: Parse metric aggregation results
fn parse_metric_result(
    env: &mut JNIEnv,
    metric: &tantivy::aggregation::agg_result::MetricResult,
) -> Result<jobject, String> {
    use tantivy::aggregation::agg_result::MetricResult;

    match metric {
        MetricResult::Average(single) => {
            create_java_average_result(env, single.value)
        }
        MetricResult::Sum(single) => {
            create_java_sum_result(env, single.value)
        }
        MetricResult::Min(single) => {
            create_java_min_result(env, single.value)
        }
        MetricResult::Max(single) => {
            create_java_max_result(env, single.value)
        }
        MetricResult::Count(single) => {
            create_java_count_result(env, single.value as u64)
        }
        MetricResult::Stats(stats) => {
            create_java_stats_result(env, stats)
        }
        MetricResult::Percentiles(percentiles) => {
            create_java_percentiles_result(env, percentiles)
        }
        MetricResult::Cardinality(single) => {
            create_java_cardinality_result(env, single.value as u64)
        }
        _ => Err("Unsupported metric type".to_string()),
    }
}

// Helper functions for creating Java result objects
fn create_java_average_result(env: &mut JNIEnv, value: f64) -> Result<jobject, String> {
    let class = env.find_class("io/indextables/tantivy4java/aggregation/AverageResult")?;
    let obj = env.new_object(class, "(D)V", &[JValue::Double(value)])?;
    Ok(obj.into_raw())
}

// ... similar helpers for Sum, Min, Max, Count, Stats, Percentiles, Cardinality

2.2 Add Timestamp Formatting Utilities

// NEW: Format Unix timestamp as ISO8601 string
fn format_timestamp_iso8601(timestamp_ms: i64) -> Result<String, String> {
    use chrono::{DateTime, Utc};

    let datetime: DateTime<Utc> = DateTime::from_timestamp_millis(timestamp_ms)
        .ok_or_else(|| "Invalid timestamp".to_string())?;

    Ok(datetime.to_rfc3339())
}

// NEW: Format with custom timezone
fn format_timestamp_with_tz(timestamp_ms: i64, timezone: &str) -> Result<String, String> {
    use chrono::{DateTime, Utc};
    use chrono_tz::Tz;

    let tz: Tz = timezone.parse()
        .map_err(|_| format!("Invalid timezone: {}", timezone))?;

    let utc_time: DateTime<Utc> = DateTime::from_timestamp_millis(timestamp_ms)
        .ok_or_else(|| "Invalid timestamp".to_string())?;

    Ok(utc_time.with_timezone(&tz).to_rfc3339())
}
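
For the Java-side tests that will assert on bucket keys, the equivalent formatting is available from java.time; zone IDs are the same IANA names that chrono-tz parses. A small sketch:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Java-side counterpart to the Rust helpers above, useful for asserting
// bucket keys in tests.
public class TimestampFormat {
    static String iso8601Utc(long timestampMs) {
        return Instant.ofEpochMilli(timestampMs)
                .atZone(ZoneId.of("UTC"))
                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    }

    static String iso8601InZone(long timestampMs, String zoneId) {
        return Instant.ofEpochMilli(timestampMs)
                .atZone(ZoneId.of(zoneId))  // e.g. "America/New_York"
                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    }
}
```

Note that RFC 3339 (chrono's `to_rfc3339`) and ISO_OFFSET_DATE_TIME agree on the common cases, but tests comparing the two should normalize fractional seconds.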

2.3 Update Cargo.toml Dependencies

[dependencies]
# Existing dependencies...
chrono = "0.4"
chrono-tz = "0.8"

Phase 3: Integration & Testing (Estimated: 3-5 days)

3.1 Comprehensive Test Suite

File: src/test/java/io/indextables/tantivy4java/aggregation/DateHistogramSubAggregationTest.java

public class DateHistogramSubAggregationTest {

    @Test
    public void testDateHistogramWithSingleMetric() throws Exception {
        // Create test data with timestamps and latency values
        // ...

        // Create date histogram with avg sub-aggregation
        DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "timestamp")
            .setFixedInterval("1h")
            .setTimeZone("UTC")
            .addSubAggregation("avg_latency", new AverageAggregation("latency"));

        SearchResult result = searcher.search(query, 10, Map.of("hourly", histogram));
        DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");

        assertNotNull(histResult);
        assertFalse(histResult.getBuckets().isEmpty());

        for (DateHistogramBucket bucket : histResult.getBuckets()) {
            assertTrue(bucket.hasSubAggregations());
            AggregationResult avgResult = bucket.getSubAggregation("avg_latency");
            assertNotNull(avgResult);
            assertTrue(avgResult instanceof AverageResult);

            double avgLatency = ((AverageResult) avgResult).getValue();
            assertTrue(avgLatency > 0);
        }
    }

    @Test
    public void testDateHistogramWithMultipleMetrics() throws Exception {
        // Create date histogram with multiple sub-aggregations
        DateHistogramAggregation histogram = new DateHistogramAggregation("daily", "timestamp")
            .setFixedInterval("1d")
            .setTimeZone("America/New_York")
            .addSubAggregation("avg_latency", new AverageAggregation("latency"))
            .addSubAggregation("min_latency", new MinAggregation("latency"))
            .addSubAggregation("max_latency", new MaxAggregation("latency"))
            .addSubAggregation("p95_latency", new PercentileAggregation("latency", 95.0))
            .addSubAggregation("total_requests", new CountAggregation());

        SearchResult result = searcher.search(query, 10, Map.of("daily", histogram));
        DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("daily");

        for (DateHistogramBucket bucket : histResult.getBuckets()) {
            assertEquals(5, bucket.getSubAggregations().size());

            // Validate each metric
            assertNotNull(bucket.getSubAggregation("avg_latency"));
            assertNotNull(bucket.getSubAggregation("min_latency"));
            assertNotNull(bucket.getSubAggregation("max_latency"));
            assertNotNull(bucket.getSubAggregation("p95_latency"));
            assertNotNull(bucket.getSubAggregation("total_requests"));
        }
    }

    @Test
    public void testDateHistogramWithTimezoneConversion() throws Exception {
        // Test that timezone parameter correctly shifts bucket boundaries
        DateHistogramAggregation utcHistogram = new DateHistogramAggregation("utc", "timestamp")
            .setFixedInterval("1d")
            .setTimeZone("UTC");

        DateHistogramAggregation etHistogram = new DateHistogramAggregation("et", "timestamp")
            .setFixedInterval("1d")
            .setTimeZone("America/New_York");

        Map<String, SplitAggregation> aggs = Map.of(
            "utc", utcHistogram,
            "et", etHistogram
        );

        SearchResult result = searcher.search(query, 10, aggs);

        DateHistogramResult utcResult = (DateHistogramResult) result.getAggregation("utc");
        DateHistogramResult etResult = (DateHistogramResult) result.getAggregation("et");

        // Verify bucket keys are different due to timezone shift
        assertNotEquals(
            utcResult.getBuckets().get(0).getKey(),
            etResult.getBuckets().get(0).getKey()
        );
    }

    @Test
    public void testDateHistogramWithExtendedBounds() throws Exception {
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (30L * 24 * 60 * 60 * 1000); // 30 days later

        DateHistogramAggregation histogram = new DateHistogramAggregation("daily", "timestamp")
            .setFixedInterval("1d")
            .setExtendedBounds(startTime, endTime);

        SearchResult result = searcher.search(query, 10, Map.of("daily", histogram));
        DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("daily");

        // Extended bounds force buckets for the whole range, even with 0
        // documents; alignment to day boundaries can yield 30 or 31 buckets
        assertTrue(histResult.getBuckets().size() >= 30);
    }

    @Test
    public void testDateHistogramJsonSerialization() {
        DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "timestamp")
            .setFixedInterval("1h")
            .setTimeZone("UTC")
            .setOffset("-4h")
            .addSubAggregation("avg_value", new AverageAggregation("value"))
            .addSubAggregation("count", new CountAggregation());

        String json = histogram.toAggregationJson();

        // Verify JSON structure
        assertTrue(json.contains("\"date_histogram\""));
        assertTrue(json.contains("\"field\": \"timestamp\""));
        assertTrue(json.contains("\"fixed_interval\": \"1h\""));
        assertTrue(json.contains("\"time_zone\": \"UTC\""));
        assertTrue(json.contains("\"offset\": \"-4h\""));
        assertTrue(json.contains("\"aggs\""));
        assertTrue(json.contains("\"avg_value\""));
        assertTrue(json.contains("\"count\""));
    }
}

3.2 Integration Test with Real S3 Splits

@Test
public void testDateHistogramOnS3Split() throws Exception {
    // Use SplitSearcher with real split file
    SplitCacheManager.CacheConfig config = new SplitCacheManager.CacheConfig("test-cache")
        .withMaxCacheSize(100_000_000)
        .withAwsCredentials(accessKey, secretKey)
        .withAwsRegion("us-east-1");

    try (SplitCacheManager cacheManager = SplitCacheManager.getInstance(config)) {
        try (SplitSearcher searcher = cacheManager.createSplitSearcher("s3://bucket/test.split")) {

            DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "event_timestamp")
                .setFixedInterval("1h")
                .addSubAggregation("avg_latency", new AverageAggregation("latency"))
                .addSubAggregation("p95_latency", new PercentileAggregation("latency", 95.0));

            SplitQuery query = new SplitMatchAllQuery();
            SearchResult result = searcher.search(query, 10, Map.of("hourly", histogram));

            DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");
            assertNotNull(histResult);
            assertTrue(histResult.getBuckets().size() > 0);

            // Verify sub-aggregations work correctly
            for (DateHistogramBucket bucket : histResult.getBuckets()) {
                if (bucket.getDocCount() > 0) {
                    assertTrue(bucket.hasSubAggregations());
                    assertNotNull(bucket.getSubAggregation("avg_latency"));
                    assertNotNull(bucket.getSubAggregation("p95_latency"));
                }
            }
        }
    }
}

3.3 Performance Benchmark Tests

@Test
public void benchmarkDateHistogramPerformance() throws Exception {
    // Compare performance: date histogram vs Spark-side aggregation

    // 1. Date histogram aggregation (pushed down)
    long startPushdown = System.currentTimeMillis();
    DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "timestamp")
        .setFixedInterval("1h")
        .addSubAggregation("avg_latency", new AverageAggregation("latency"))
        .addSubAggregation("max_latency", new MaxAggregation("latency"))
        .addSubAggregation("count", new CountAggregation());

    SearchResult result = searcher.search(query, 10, Map.of("hourly", histogram));
    long endPushdown = System.currentTimeMillis();

    System.out.println("Pushdown time: " + (endPushdown - startPushdown) + "ms");

    // 2. Manual aggregation (retrieve all docs, aggregate in Java)
    long startManual = System.currentTimeMillis();
    SearchResult allDocs = searcher.search(new SplitMatchAllQuery(), 1_000_000);
    Map<Long, List<Double>> hourlyBuckets = new HashMap<>();
    // ... manual bucketing and metric calculation
    long endManual = System.currentTimeMillis();

    System.out.println("Manual time: " + (endManual - startManual) + "ms");
    System.out.println("Speedup: " + ((double)(endManual - startManual) / (endPushdown - startPushdown)) + "x");

    // Assert significant speedup
    assertTrue((endManual - startManual) > (endPushdown - startPushdown) * 5);
}

Phase 4: Documentation & Examples (Estimated: 2-3 days)

4.1 Update README.md

Time-Series Histogram Aggregations

Analyze time-series data with date histogram aggregations and nested metric calculations:
// Hourly performance metrics
DateHistogramAggregation hourlyMetrics = new DateHistogramAggregation("hourly", "timestamp")
    .setFixedInterval("1h")
    .setTimeZone("America/New_York")
    .addSubAggregation("avg_response_time", new AverageAggregation("response_time"))
    .addSubAggregation("p95_response_time", new PercentileAggregation("response_time", 95.0))
    .addSubAggregation("p99_response_time", new PercentileAggregation("response_time", 99.0))
    .addSubAggregation("total_requests", new CountAggregation())
    .addSubAggregation("unique_users", new CardinalityAggregation("user_id"));

SearchResult result = searcher.search(query, 10, Map.of("hourly", hourlyMetrics));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");

for (DateHistogramBucket bucket : histResult.getBuckets()) {
    System.out.println("Hour: " + bucket.getKey());
    System.out.println("  Requests: " + bucket.getDocCount());

    AverageResult avg = (AverageResult) bucket.getSubAggregation("avg_response_time");
    System.out.println("  Avg Response: " + avg.getValue() + "ms");

    PercentilesResult p95 = (PercentilesResult) bucket.getSubAggregation("p95_response_time");
    System.out.println("  P95 Response: " + p95.getValue(95.0) + "ms");
}

Supported Intervals:

  • "1s", "30s" - Seconds
  • "1m", "5m", "15m" - Minutes
  • "1h", "6h", "12h" - Hours
  • "1d", "7d" - Days
  • "1w" - Week, "1M" - Month, "1y" - Year (calendar units; their length varies, so Elasticsearch-style DSLs treat them as calendar intervals rather than fixed intervals)
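
Fixed intervals correspond to constant millisecond widths, which is what makes them usable for uniform bucketing. A hypothetical helper (single-letter units only; not part of the proposed API):

```java
// Hypothetical helper: convert a fixed interval string ("30s", "5m", "1h",
// "1d") to milliseconds. Calendar units ("1w", "1M", "1y") are rejected
// because their length varies.
public class FixedInterval {
    static long toMillis(String interval) {
        long n = Long.parseLong(interval.substring(0, interval.length() - 1));
        switch (interval.charAt(interval.length() - 1)) {
            case 's': return n * 1_000L;
            case 'm': return n * 60_000L;
            case 'h': return n * 3_600_000L;
            case 'd': return n * 86_400_000L;
            default:
                throw new IllegalArgumentException("Not a fixed interval: " + interval);
        }
    }
}
```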

Supported Sub-Aggregations:

  • AverageAggregation - Average value
  • SumAggregation - Sum of values
  • MinAggregation - Minimum value
  • MaxAggregation - Maximum value
  • CountAggregation - Document count
  • PercentileAggregation - Percentile values (p50, p95, p99, etc.)
  • StatsAggregation - Complete statistics (min, max, avg, sum, count)
  • CardinalityAggregation - Unique value count (approximate)
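
To make the percentile semantics concrete, here is an exact nearest-rank computation. This is for intuition only: engine-side percentiles are typically computed with an approximate sketch, so results on large data may differ slightly.

```java
import java.util.Arrays;

// Exact nearest-rank percentile: the smallest value such that at least
// p% of the samples are at or below it.
public class PercentileSketch {
    static double percentile(double[] values, double p) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank, 1) - 1];
    }
}
```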

4.2 Create Comprehensive Guide

File: docs/TIME_SERIES_AGGREGATION_GUIDE.md

Time-Series Aggregation Guide

Overview

Date histogram aggregations enable efficient time-series analytics by bucketing documents into time intervals and computing metrics for each bucket.

Basic Usage

Simple Hourly Histogram
DateHistogramAggregation hourly = new DateHistogramAggregation("hourly", "timestamp")
    .setFixedInterval("1h");

SearchResult result = searcher.search(query, 10, Map.of("hourly", hourly));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");

for (DateHistogramBucket bucket : histResult.getBuckets()) {
    System.out.println(bucket.getKey() + ": " + bucket.getDocCount() + " events");
}
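
What the aggregation computes can be expressed over in-memory data. A purely illustrative sketch over hypothetical (timestampMs, latency) pairs; the real aggregation runs inside the search engine:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A date histogram with an avg sub-aggregation, done by hand: bucket each
// event into its hour, then average the latencies per bucket.
public class ManualHistogram {
    static Map<Long, Double> hourlyAvg(long[] timestampsMs, double[] latencies) {
        long hour = 3_600_000L;
        Map<Long, List<Double>> buckets = new TreeMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            long key = Math.floorDiv(timestampsMs[i], hour) * hour; // bucket start
            buckets.computeIfAbsent(key, k -> new ArrayList<>()).add(latencies[i]);
        }
        Map<Long, Double> avgs = new TreeMap<>();
        buckets.forEach((k, v) ->
            avgs.put(k, v.stream().mapToDouble(Double::doubleValue).average().orElse(0)));
        return avgs;
    }
}
```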

Advanced Features

Timezone Support

// Business hours in Eastern Time
DateHistogramAggregation businessHours = new DateHistogramAggregation("hours", "timestamp")
    .setFixedInterval("1h")
    .setTimeZone("America/New_York");

Time Offset

// Shift bucket boundaries by 30 minutes
DateHistogramAggregation shifted = new DateHistogramAggregation("shifted", "timestamp")
    .setFixedInterval("1h")
    .setOffset("30m");

Extended Bounds (Fill Empty Buckets)

long startTime = /* ... */;
long endTime = /* ... */;

DateHistogramAggregation complete = new DateHistogramAggregation("complete", "timestamp")
    .setFixedInterval("1d")
    .setExtendedBounds(startTime, endTime);
// Returns buckets for entire range, even with 0 documents

Sub-Aggregations

Multiple Metrics Per Bucket

DateHistogramAggregation comprehensive = new DateHistogramAggregation("analysis", "timestamp")
    .setFixedInterval("1h")
    .addSubAggregation("avg_latency", new AverageAggregation("latency"))
    .addSubAggregation("min_latency", new MinAggregation("latency"))
    .addSubAggregation("max_latency", new MaxAggregation("latency"))
    .addSubAggregation("sum_bytes", new SumAggregation("bytes_transferred"))
    .addSubAggregation("unique_users", new CardinalityAggregation("user_id"));

SearchResult result = searcher.search(query, 10, Map.of("analysis", comprehensive));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("analysis");

for (DateHistogramBucket bucket : histResult.getBuckets()) {
    System.out.println("Time: " + bucket.getKey());
    System.out.println("Events: " + bucket.getDocCount());

    // Access metrics
    AverageResult avg = (AverageResult) bucket.getSubAggregation("avg_latency");
    System.out.println("Avg Latency: " + avg.getValue());

    MinResult min = (MinResult) bucket.getSubAggregation("min_latency");
    System.out.println("Min Latency: " + min.getValue());

    MaxResult max = (MaxResult) bucket.getSubAggregation("max_latency");
    System.out.println("Max Latency: " + max.getValue());

    SumResult sum = (SumResult) bucket.getSubAggregation("sum_bytes");
    System.out.println("Total Bytes: " + sum.getValue());

    CardinalityResult cardinality = (CardinalityResult) bucket.getSubAggregation("unique_users");
    System.out.println("Unique Users: " + cardinality.getValue());
}

#### Percentiles for SLA Monitoring

```java
DateHistogramAggregation slaMonitoring = new DateHistogramAggregation("sla", "timestamp")
    .setFixedInterval("5m")
    .addSubAggregation("p50", new PercentileAggregation("response_time", 50.0))
    .addSubAggregation("p95", new PercentileAggregation("response_time", 95.0))
    .addSubAggregation("p99", new PercentileAggregation("response_time", 99.0))
    .addSubAggregation("p999", new PercentileAggregation("response_time", 99.9));

SearchResult result = searcher.search(query, 10, Map.of("sla", slaMonitoring));
DateHistogramResult slaResult = (DateHistogramResult) result.getAggregation("sla");

for (DateHistogramBucket bucket : slaResult.getBuckets()) {
    PercentilesResult p95 = (PercentilesResult) bucket.getSubAggregation("p95");
    PercentilesResult p99 = (PercentilesResult) bucket.getSubAggregation("p99");

    // Thresholds in milliseconds
    if (p95.getValue(95.0) > 200 || p99.getValue(99.0) > 500) {
        System.out.println("SLA VIOLATION at " + bucket.getKey());
        System.out.println("  P95: " + p95.getValue(95.0) + "ms");
        System.out.println("  P99: " + p99.getValue(99.0) + "ms");
    }
}
```
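For intuition, the per-bucket percentile values correspond to rank statistics over that bucket's samples. Tantivy computes them with approximate sketches, but a self-contained nearest-rank sketch (a hypothetical helper, not part of the tantivy4java API) illustrates what a P95 reading means:

```java
import java.util.Arrays;

public class NearestRankPercentile {
    // Nearest-rank percentile over a sorted copy of the samples:
    // index = ceil(p/100 * n) - 1, clamped to [0, n-1].
    static double percentile(double[] samples, double p) {
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        int idx = (int) Math.ceil(p / 100.0 * n) - 1;
        return sorted[Math.max(0, Math.min(idx, n - 1))];
    }

    public static void main(String[] args) {
        double[] latenciesMs = {12, 15, 18, 22, 30, 45, 80, 120, 250, 900};
        System.out.println("p50 = " + percentile(latenciesMs, 50.0)); // 30.0
        System.out.println("p95 = " + percentile(latenciesMs, 95.0)); // 900.0
    }
}
```

Note how the single 900 ms outlier dominates P95 but leaves P50 untouched; this is why SLA alerting keys on high percentiles rather than averages.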

## Real-World Use Cases

### 1. Application Performance Monitoring

```java
// Monitor application latency by hour
DateHistogramAggregation apmHourly = new DateHistogramAggregation("apm_hourly", "request_timestamp")
    .setFixedInterval("1h")
    .setTimeZone("UTC")
    .addSubAggregation("avg_latency", new AverageAggregation("latency_ms"))
    .addSubAggregation("p95_latency", new PercentileAggregation("latency_ms", 95.0))
    .addSubAggregation("p99_latency", new PercentileAggregation("latency_ms", 99.0))
    // Counts all docs in the bucket; restrict the query to errors for a true error count
    .addSubAggregation("error_count", new CountAggregation())
    .addSubAggregation("unique_users", new CardinalityAggregation("user_id"));
```

### 2. Log Analytics Dashboard

```java
// Analyze log volume and error rates by 5-minute intervals
DateHistogramAggregation logAnalytics = new DateHistogramAggregation("logs_5m", "log_timestamp")
    .setFixedInterval("5m")
    .addSubAggregation("log_count", new CountAggregation())
    .addSubAggregation("avg_severity", new AverageAggregation("severity_level"))
    .addSubAggregation("unique_services", new CardinalityAggregation("service_name"));
```

### 3. Business Metrics Tracking

```java
// Track sales metrics by day
DateHistogramAggregation dailySales = new DateHistogramAggregation("daily_sales", "order_timestamp")
    .setFixedInterval("1d")
    .setTimeZone("America/Los_Angeles")
    .addSubAggregation("total_revenue", new SumAggregation("order_amount"))
    .addSubAggregation("avg_order_value", new AverageAggregation("order_amount"))
    .addSubAggregation("order_count", new CountAggregation())
    .addSubAggregation("unique_customers", new CardinalityAggregation("customer_id"));
```

## Performance Considerations

### Field Configuration

Date histograms require the date field, and any fields used by metric sub-aggregations, to be configured as fast fields:

```java
// During indexing, mark the timestamp and metric fields as fast fields
schema.addDateField("timestamp", true, true, true);  // stored, indexed, fast
schema.addIntegerField("latency", true, true, true); // fast access required for metrics
```

### Query Optimization

- Apply query filters to narrow the time range before aggregating
- Keep bucket counts bounded by choosing an interval appropriate to the queried range
- Cache results for frequently repeated aggregation queries

### Expected Performance

- 10-100x faster than fetching all documents and aggregating in application code
- Minimal memory usage: results are bucketed aggregates, not raw documents
- Scalable: cost tracks fast-field reads over matching documents and the number of buckets, not the volume of document payloads transferred

## Migration from Manual Aggregation

**Before (Manual):**

```java
// Fetch all matching documents (expensive: transfers and deserializes every hit)
SearchResult all = searcher.search(query, 1_000_000);
Map<Long, List<Double>> buckets = new HashMap<>();

// Manual bucketing (assumes epoch-millisecond timestamps)
for (Hit hit : all.getHits()) {
    Document doc = searcher.doc(hit.getDocAddress());
    long timestamp = (long) doc.getFirst("timestamp");
    double latency = (double) doc.getFirst("latency");

    long hourBucket = timestamp / (60 * 60 * 1000);  // truncate to the hour
    buckets.computeIfAbsent(hourBucket, k -> new ArrayList<>()).add(latency);
}

// Manual metric calculation
for (Map.Entry<Long, List<Double>> entry : buckets.entrySet()) {
    double avg = entry.getValue().stream().mapToDouble(Double::doubleValue).average().orElse(0);
    System.out.println("Hour: " + entry.getKey() + ", Avg: " + avg);
}
```

**After (Optimized):**

```java
// Single pushdown query: bucketing and averaging happen inside Tantivy
DateHistogramAggregation histogram = new DateHistogramAggregation("hourly", "timestamp")
    .setFixedInterval("1h")
    .addSubAggregation("avg_latency", new AverageAggregation("latency"));

SearchResult result = searcher.search(query, 10, Map.of("hourly", histogram));
DateHistogramResult histResult = (DateHistogramResult) result.getAggregation("hourly");

for (DateHistogramBucket bucket : histResult.getBuckets()) {
    AverageResult avg = (AverageResult) bucket.getSubAggregation("avg_latency");
    System.out.println("Hour: " + bucket.getKey() + ", Avg: " + avg.getValue());
}
```

**Benefits:**

- 10-100x performance improvement
- Reduced memory usage
- Cleaner, more maintainable code
- Native Tantivy performance

## Implementation Recommendation

Based on analysis of Quickwit and tantivy4java architecture:

### ✅ **RECOMMENDED APPROACH: Full Sub-Aggregation Support**

**Rationale:**
1. **Tantivy already supports it** - The underlying Tantivy library has complete date histogram + sub-aggregation support
2. **Pattern already exists** - TermsAggregation already implements sub-aggregation pattern that can be replicated
3. **Critical for use case** - Time-series analytics without metrics (AVG, MIN, MAX, P95, etc.) has limited value
4. **Minimal additional complexity** - Once date histogram buckets work, adding sub-aggregations is straightforward
5. **Future-proof** - Matches Elasticsearch/Quickwit patterns, making future enhancements easier

### 📊 **Implementation Feasibility Assessment**

| Component | Complexity | Estimated Effort | Risk |
|-----------|-----------|------------------|------|
| Java API Enhancement | Low | 3-5 days | Low |
| Metric Aggregation Classes | Low | 2-3 days | Low |
| Native JNI Date Histogram Parsing | Medium | 4-5 days | Medium |
| Native Sub-Aggregation Parsing | Medium | 3-4 days | Medium |
| Result Deserialization | Medium-High | 3-4 days | Medium |
| Testing & Validation | Medium | 3-5 days | Low |
| Documentation | Low | 2-3 days | Low |
| **TOTAL** | **Medium** | **20-29 days (4-6 weeks)** | **Medium** |

### 🎯 **Phased Rollout Strategy**

**Phase 1 (MVP): Date Histogram with Basic Metrics (2-3 weeks)**
- Date histogram buckets (count only)
- Single metric sub-aggregations (AVG, MIN, MAX, SUM, COUNT)
- No percentiles or cardinality yet
- Basic timezone support

**Phase 2 (Full Feature): Advanced Metrics (1-2 weeks)**
- Percentile aggregations (P50, P95, P99)
- Cardinality aggregations (COUNT DISTINCT)
- Stats aggregation (comprehensive statistics)
- Extended bounds support
- Full timezone and offset support

**Phase 3 (Optimization): Performance & Polish (1 week)**
- Performance optimization
- Comprehensive test suite
- Documentation and examples
- Production hardening

## Expected Benefits

### Performance
- **10-100x speedup** compared to application-side aggregation
- **Minimal data transfer** - Only aggregated results returned, not raw documents
- **Memory efficient** - Aggregation happens in Tantivy, not JVM heap
- **Scalable** - cost tracks matching documents' fast-field reads and bucket count, not retrieved payload size

### Use Cases Enabled
- Real-time application performance monitoring dashboards
- Log analytics with time-series metrics
- Business intelligence reporting
- SLA monitoring and alerting
- Anomaly detection in time-series data

### IndexTables4Spark Integration
Once implemented in tantivy4java, IndexTables4Spark can leverage this for:
- Spark SQL time-series queries with aggregate pushdown
- 10-100x performance improvement for temporal analytics
- Support for standard SQL date functions (date_trunc, date_format, etc.)
- Multi-dimensional time-series analysis

## Acceptance Criteria

- [ ] DateHistogramAggregation supports `addSubAggregation()` method
- [ ] DateHistogramResult.Bucket contains Map of sub-aggregation results
- [ ] Native JNI layer properly deserializes date histogram buckets with metrics
- [ ] Support for all basic metric aggregations (AVG, MIN, MAX, SUM, COUNT)
- [ ] Support for percentile aggregations (P50, P95, P99)
- [ ] Support for cardinality aggregation (COUNT DISTINCT)
- [ ] Timezone parameter works correctly (UTC, America/New_York, etc.)
- [ ] Fixed interval support (1s, 1m, 1h, 1d; calendar units such as 1w, 1M, 1y require calendar-interval semantics and are not valid fixed intervals)
- [ ] Offset parameter works correctly
- [ ] Extended bounds parameter works correctly
- [ ] Comprehensive test suite (unit + integration tests)
- [ ] Documentation updated with examples
- [ ] Performance benchmarks show 10x+ improvement vs manual aggregation
- [ ] Works with SplitSearcher for distributed split-based searches
- [ ] Works with S3-stored splits
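For reference on the fixed-interval and offset criteria above: a date histogram assigns each document to the bucket whose key is its timestamp truncated to an interval boundary, shifted by the offset. A self-contained sketch of that key computation, mirroring Elasticsearch/Quickwit `date_histogram` semantics (class and method names are illustrative, not part of the proposed API):

```java
public class DateHistogramKey {
    // Truncate an epoch-millisecond timestamp to its fixed-interval bucket key,
    // honoring an optional offset (also in milliseconds).
    static long bucketKey(long epochMillis, long intervalMillis, long offsetMillis) {
        long shifted = epochMillis - offsetMillis;
        // floorDiv keeps pre-1970 (negative) timestamps in the correct bucket
        long key = Math.floorDiv(shifted, intervalMillis) * intervalMillis;
        return key + offsetMillis;
    }

    public static void main(String[] args) {
        long oneHourMs = 60L * 60 * 1000;
        // 2024-01-01T10:37:05Z -> bucket key 2024-01-01T10:00:00Z
        System.out.println(bucketKey(1704105425000L, oneHourMs, 0));
        // Same timestamp with a 30m offset -> bucket key 2024-01-01T10:30:00Z
        System.out.println(bucketKey(1704105425000L, oneHourMs, 30 * 60 * 1000));
    }
}
```

Result tests for the JNI layer can assert against exactly these truncated keys, since Tantivy reports bucket keys on interval boundaries.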

## Dependencies

### External Dependencies
- **Tantivy** (via Quickwit fork) - Already has full date histogram support ✅
- **chrono** - Rust datetime library for timestamp formatting
- **chrono-tz** - Rust timezone library

### Internal Dependencies
- Existing aggregation infrastructure in tantivy4java
- SplitSearcher and aggregation result parsing
- JNI bridge for complex nested results

## References

- **IndexTables4Spark Issue:** https://github.com/indextables/indextables_spark/issues/13
- **Quickwit Aggregations:** `quickwit/quickwit-query/src/aggregations.rs`
- **Tantivy Aggregations:** https://github.com/quickwit-oss/tantivy (aggregation module)
- **Existing TermsAggregation Pattern:** `tantivy4java/src/main/java/io/indextables/tantivy4java/aggregation/TermsAggregation.java`

## Priority

**HIGH** - This feature is a critical prerequisite for IndexTables4Spark time-series analytics capabilities and provides significant performance improvements for common use cases.

## Estimated Total Effort

**4-6 weeks** (20-29 working days) for complete implementation including:
- Java API enhancements
- Native JNI layer updates
- Comprehensive testing
- Documentation and examples
- Performance validation

With phased rollout, MVP can be delivered in **2-3 weeks**.
