@platinumhamburg
Contributor

This commit introduces AggMode (Aggregation Mode), which controls how the server handles data aggregation when writing to tables that use the aggregation merge engine.

Key changes:

1. New AggMode enum with three modes:
   - AGGREGATE (default): Data is aggregated through the server-side merge engine
   - OVERWRITE: Bypasses the merge engine and replaces values directly (for undo recovery)
   - LOCAL_AGGREGATE: Reserved for future client-side pre-aggregation

2. Client-side changes:
   - Upsert interface: Added aggregationMode(AggMode) method to the fluent API
   - UpsertWriterImpl: Propagates aggMode through WriteRecord
   - KvWriteBatch: Validates aggMode consistency within a batch
   - ClientRpcMessageUtils: Validates aggMode consistency across batches
   - WriteRecord: Added aggMode field for upsert/delete operations

3. Server-side changes:
   - KvTablet: Pre-creates overwriteRowMerger for OVERWRITE mode
   - putAsLeader(): Selects the appropriate RowMerger based on aggMode
   - Replica/ReplicaManager: Propagates aggMode through the call chain
   - TabletService: Extracts aggMode from PutKvRequest

4. Protocol changes:
   - FlussApi.proto: Added optional agg_mode field to PutKvRequest

5. Test coverage:
   - KvTabletAggModeTest: 9 tests covering OVERWRITE mode scenarios
   - KvWriteBatchTest: 3 tests for aggMode consistency validation
   - ClientRpcMessageUtilsTest: 4 tests for multi-batch aggMode validation
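
For readers who want a feel for the two client-side consistency checks, here is a minimal sketch. It is not the code in this PR: `WriteRecordSketch`, `KvWriteBatchSketch`, `RequestModeSketch`, `tryAppend` and `resolve` are hypothetical names, and rejecting a mismatching record (rather than, say, throwing) is an assumption about how the batch reacts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins: names echo the change list, the real Fluss classes may differ.
enum AggMode { AGGREGATE, OVERWRITE, LOCAL_AGGREGATE }

final class WriteRecordSketch {
    final String key;
    final AggMode aggMode;

    WriteRecordSketch(String key, AggMode aggMode) {
        this.key = key;
        this.aggMode = aggMode;
    }
}

final class KvWriteBatchSketch {
    private final List<WriteRecordSketch> records = new ArrayList<>();
    private AggMode batchMode; // fixed by the first appended record

    // Assumption: a record with a different mode is rejected so the caller can open a new batch.
    boolean tryAppend(WriteRecordSketch record) {
        if (batchMode == null) {
            batchMode = record.aggMode;
        } else if (batchMode != record.aggMode) {
            return false;
        }
        records.add(record);
        return true;
    }

    AggMode aggMode() { return batchMode; }

    int size() { return records.size(); }
}

final class RequestModeSketch {
    // Assumption: one request carries one mode, so batches that disagree cause an exception.
    static AggMode resolve(List<KvWriteBatchSketch> batches) {
        AggMode mode = null;
        for (KvWriteBatchSketch batch : batches) {
            if (batch.aggMode() == null) {
                continue; // empty batch has no mode yet
            }
            if (mode == null) {
                mode = batch.aggMode();
            } else if (mode != batch.aggMode()) {
                throw new IllegalStateException(
                        "Mixed aggMode in one request: " + mode + " vs " + batch.aggMode());
            }
        }
        // Fall back to the documented default when no batch has a mode.
        return mode == null ? AggMode.AGGREGATE : mode;
    }
}
```

The point of checking twice is that the request carries a single mode field, so every record that ends up in the same request has to agree on it.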

This feature enables the Flink connector to perform undo recovery by restoring exact historical values during checkpoint failover, bypassing the merge engine.
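
To illustrate why OVERWRITE matters for undo recovery, the sketch below models a tablet whose merge engine is a simple sum. It is an assumption-laden toy, not the KvTablet implementation: `KvTabletSketch`, its `put`/`get` methods, the sum merger, and rejecting LOCAL_AGGREGATE on the server are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongBinaryOperator;

// Hypothetical stand-in for the enum described in this PR.
enum AggMode { AGGREGATE, OVERWRITE, LOCAL_AGGREGATE }

final class KvTabletSketch {
    private final Map<String, Long> store = new HashMap<>();

    // Assumption: the table's merge engine is a sum aggregation.
    private final LongBinaryOperator aggregateMerger = Long::sum;
    // Created once up front, mirroring the "pre-creates overwriteRowMerger" bullet.
    private final LongBinaryOperator overwriteMerger = (oldValue, newValue) -> newValue;

    void put(String key, long value, AggMode mode) {
        LongBinaryOperator merger = selectMerger(mode);
        // Map.merge stores the value directly when the key is absent,
        // otherwise it applies the selected merger to (old, new).
        store.merge(key, value, (oldValue, newValue) -> merger.applyAsLong(oldValue, newValue));
    }

    private LongBinaryOperator selectMerger(AggMode mode) {
        switch (mode) {
            case AGGREGATE:
                return aggregateMerger;
            case OVERWRITE:
                return overwriteMerger;
            default:
                // Assumption: the reserved mode is not accepted by the server yet.
                throw new UnsupportedOperationException("Unsupported aggMode: " + mode);
        }
    }

    Long get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        KvTabletSketch tablet = new KvTabletSketch();
        tablet.put("clicks", 10L, AggMode.AGGREGATE); // state at the last checkpoint
        tablet.put("clicks", 5L, AggMode.AGGREGATE);  // written after the checkpoint
        tablet.put("clicks", 10L, AggMode.OVERWRITE); // undo: restore the checkpointed value
        System.out.println(tablet.get("clicks"));
    }
}
```

Writing the checkpointed value back in AGGREGATE mode would add it to the current total instead of restoring it, which is why recovery needs a path that skips the merge engine.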

Purpose

Linked issue: close #2491

Brief change log

Tests

API and Format

Documentation

Development

Successfully merging this pull request may close these issues.

Add AggMode support for undo recovery in aggregation tables

1 participant