From cda06974187239bb82af21d6af40abd415db5604 Mon Sep 17 00:00:00 2001
From: "ocean.wy"
Date: Thu, 22 Jan 2026 16:35:02 +0800
Subject: [PATCH] [FLUSS] Add AggMode support for undo recovery in aggregation
 tables

This commit introduces AggMode (Aggregation Mode) to control how the
server handles data aggregation when writing to tables with the
aggregation merge engine.

Key changes:

1. New AggMode enum with three modes:
   - AGGREGATE (default): Data is aggregated through the server-side merge engine
   - OVERWRITE: Bypass the merge engine and directly replace values (for undo recovery)
   - LOCAL_AGGREGATE: Reserved for future client-side pre-aggregation

2. Client-side changes:
   - Upsert interface: Added aggregationMode(AggMode) method to the fluent API
   - UpsertWriterImpl: Propagates aggMode through WriteRecord
   - KvWriteBatch: Validates aggMode consistency within a batch
   - ClientRpcMessageUtils: Validates aggMode consistency across batches
   - WriteRecord: Added aggMode field for upsert/delete operations

3. Server-side changes:
   - KvTablet: Pre-creates overwriteRowMerger for OVERWRITE mode
   - putAsLeader(): Selects the appropriate RowMerger based on aggMode
   - Replica/ReplicaManager: Propagate aggMode through the call chain
   - TabletService: Extracts aggMode from PutKvRequest

4. Protocol changes:
   - FlussApi.proto: Added optional agg_mode field to PutKvRequest

5. Test coverage:
   - KvTabletAggModeTest: 9 tests covering OVERWRITE mode scenarios
   - KvWriteBatchTest: 3 tests for aggMode consistency validation
   - ClientRpcMessageUtilsTest: 4 tests for multi-batch aggMode validation

This feature enables the Flink connector to perform undo recovery by
restoring exact historical values during checkpoint failover, bypassing
the merge engine.
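Example usage (a minimal sketch against the client API added by this
patch; the connection/table setup and the rows are assumed context, not
part of this change):

    // Obtain a Table handle for a primary-key table that uses the
    // aggregation merge engine (connection setup elided).
    Table table = connection.getTable(TablePath.of("my_db", "agg_table"));

    // Normal writes: values are aggregated by the server-side merge engine.
    UpsertWriter writer = table.newUpsert().createWriter();
    writer.upsert(row);

    // Undo recovery: restore exact historical values, bypassing the merge engine.
    UpsertWriter undoWriter =
            table.newUpsert().aggregationMode(AggMode.OVERWRITE).createWriter();
    undoWriter.upsert(checkpointedRow);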
---
 .../client/table/writer/TableUpsert.java      |  19 +-
 .../fluss/client/table/writer/Upsert.java     |  41 +-
 .../client/table/writer/UpsertWriterImpl.java |  30 +-
 .../client/utils/ClientRpcMessageUtils.java   |  21 +-
 .../fluss/client/write/KvWriteBatch.java      |  19 +
 .../fluss/client/write/RecordAccumulator.java |   1 +
 .../apache/fluss/client/write/WriteBatch.java |  10 +
 .../fluss/client/write/WriteRecord.java       |  79 ++-
 .../utils/ClientRpcMessageUtilsTest.java      | 144 +++++
 .../fluss/client/write/KvWriteBatchTest.java  |  82 +++
 .../apache/fluss/rpc/protocol/AggMode.java    | 123 ++++
 fluss-rpc/src/main/proto/FlussApi.proto       |   8 +
 .../org/apache/fluss/server/kv/KvTablet.java  |  38 +-
 .../apache/fluss/server/replica/Replica.java  |   8 +-
 .../fluss/server/replica/ReplicaManager.java  |  29 +-
 .../fluss/server/tablet/TabletService.java    |   5 +
 .../fluss/server/kv/KvTabletAggModeTest.java  | 557 ++++++++++++++++++
 .../fluss/server/replica/ReplicaTest.java     |   5 +-
 18 files changed, 1191 insertions(+), 28 deletions(-)
 create mode 100644 fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
 create mode 100644 fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java
 create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletAggModeTest.java

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
index fe865eac43..9eea33e5c2 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java
@@ -20,6 +20,7 @@
 import org.apache.fluss.client.write.WriterClient;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.protocol.AggMode;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
 
@@ -32,22 +33,24 @@ public class TableUpsert implements Upsert {
     private final TablePath tablePath;
     private final TableInfo tableInfo;
     private final WriterClient writerClient;
     private final @Nullable int[] targetColumns;
+    private final AggMode aggMode;
 
     public TableUpsert(TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) {
-        this(tablePath, tableInfo, writerClient, null);
+        this(tablePath, tableInfo, writerClient, null, AggMode.AGGREGATE);
     }
 
     private TableUpsert(
             TablePath tablePath,
             TableInfo tableInfo,
            WriterClient writerClient,
-            @Nullable int[] targetColumns) {
+            @Nullable int[] targetColumns,
+            AggMode aggMode) {
         this.tablePath = tablePath;
         this.tableInfo = tableInfo;
         this.writerClient = writerClient;
         this.targetColumns = targetColumns;
+        this.aggMode = aggMode;
     }
 
     @Override
@@ -68,7 +71,7 @@ public Upsert partialUpdate(@Nullable int[] targetColumns) {
                 }
             }
         }
-        return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns);
+        return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns, this.aggMode);
     }
 
     @Override
@@ -91,9 +94,15 @@ public Upsert partialUpdate(String... targetColumnNames) {
         return partialUpdate(targetColumns);
     }
 
+    @Override
+    public Upsert aggregationMode(AggMode mode) {
+        checkNotNull(mode, "aggregation mode");
+        return new TableUpsert(tablePath, tableInfo, writerClient, this.targetColumns, mode);
+    }
+
     @Override
     public UpsertWriter createWriter() {
-        return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient);
+        return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient, aggMode);
     }
 
     @Override
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
index 0843437fee..becf87b3d6 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.client.table.writer;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.rpc.protocol.AggMode;
 
 import javax.annotation.Nullable;
 
@@ -26,7 +27,8 @@
  * Table.
  *
  * <p>{@link Upsert} objects are immutable and can be shared between threads. Refinement methods,
- * like {@link #partialUpdate}, create new Upsert instances.
+ * like {@link #partialUpdate(int[])} and {@link #aggregationMode(AggMode)}, create new Upsert
+ * instances.
  *
  * @since 0.6
  */
@@ -56,9 +58,44 @@ public interface Upsert {
      */
     Upsert partialUpdate(String... targetColumnNames);
 
+    /**
+     * Specifies the aggregation mode for the UpsertWriter and returns a new Upsert instance.
+     *
+     * <p>This method controls how the created UpsertWriter handles data aggregation:
+     *
+     * <ul>
+     *   <li>{@link AggMode#AGGREGATE} (default): data is aggregated through the server-side
+     *       merge engine
+     *   <li>{@link AggMode#OVERWRITE}: bypasses the merge engine and directly replaces values
+     *       (for undo recovery)
+     *   <li>{@link AggMode#LOCAL_AGGREGATE}: reserved for future client-side pre-aggregation
+     * </ul>
+     *
+     * <p>Example usage:
+     *
+     * <pre>{@code
+     * // Normal aggregation mode (default)
+     * UpsertWriter normalWriter = table.newUpsert()
+     *     .aggregationMode(AggMode.AGGREGATE)
+     *     .createWriter();
+     *
+     * // Overwrite mode for undo recovery
+     * UpsertWriter undoWriter = table.newUpsert()
+     *     .aggregationMode(AggMode.OVERWRITE)
+     *     .createWriter();
+     * }</pre>
+     *
+     * @param mode the aggregation mode
+     * @return a new Upsert instance with the specified aggregation mode
+     * @since 0.9
+     */
+    Upsert aggregationMode(AggMode mode);
+
     /**
      * Create a new {@link UpsertWriter} using {@code InternalRow} with the optional {@link
-     * #partialUpdate(String...)} information to upsert and delete data to a Primary Key Table.
+     * #partialUpdate(String...)} and {@link #aggregationMode(AggMode)} information to upsert and
+     * delete data to a Primary Key Table.
      */
     UpsertWriter createWriter();
 
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index 20e5bcc5e5..b7f3e20d20 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -31,6 +31,7 @@
 import org.apache.fluss.row.encode.KeyEncoder;
 import org.apache.fluss.row.encode.RowEncoder;
 import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.rpc.protocol.AggMode;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
 
@@ -56,11 +57,25 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;
 
+    /**
+     * The aggregation mode for this writer. This controls how the server handles data
+     * aggregation.
+     */
+    private final AggMode aggMode;
+
     UpsertWriterImpl(
             TablePath tablePath,
             TableInfo tableInfo,
             @Nullable int[] partialUpdateColumns,
             WriterClient writerClient) {
+        this(tablePath, tableInfo, partialUpdateColumns, writerClient, AggMode.AGGREGATE);
+    }
+
+    UpsertWriterImpl(
+            TablePath tablePath,
+            TableInfo tableInfo,
+            @Nullable int[] partialUpdateColumns,
+            WriterClient writerClient,
+            AggMode aggMode) {
         super(tablePath, tableInfo, writerClient);
         RowType rowType = tableInfo.getRowType();
         sanityCheck(
@@ -83,7 +98,16 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
         this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
         this.rowEncoder = RowEncoder.create(kvFormat, rowType);
         this.fieldGetters = InternalRow.createFieldGetters(rowType);
+        this.tableInfo = tableInfo;
+
+        // LOCAL_AGGREGATE is reserved for future implementation
+        if (aggMode == AggMode.LOCAL_AGGREGATE) {
+            throw new UnsupportedOperationException(
+                    "LOCAL_AGGREGATE mode is not yet supported. "
+                            + "Please use AGGREGATE or OVERWRITE mode.");
+        }
+        this.aggMode = aggMode;
     }
 
     private static void sanityCheck(
@@ -168,7 +192,8 @@ public CompletableFuture<UpsertResult> upsert(InternalRow row) {
                         key,
                         bucketKey,
                         writeFormat,
-                        targetColumns);
+                        targetColumns,
+                        aggMode);
         return send(record).thenApply(ignored -> UPSERT_SUCCESS);
     }
 
@@ -191,7 +216,8 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
                         key,
                         bucketKey,
                         writeFormat,
-                        targetColumns);
+                        targetColumns,
+                        aggMode);
         return send(record).thenApply(ignored -> DELETE_SUCCESS);
     }
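Note on the LOCAL_AGGREGATE guard above: a writer is bound to one AggMode for its
lifetime, and (as the KvWriteBatch and ClientRpcMessageUtils hunks below enforce)
records with different modes can share neither a batch nor a PutKvRequest. So when
switching a table from normal writes to undo recovery, drain the in-flight AGGREGATE
data first. A sketch (hedged: `table` and the rows are assumed context; flush() is
used here on the assumption that it forces pending batches out):

    UpsertWriter normalWriter = table.newUpsert().createWriter();
    normalWriter.upsert(liveRow);
    normalWriter.flush(); // drain AGGREGATE batches before switching modes

    UpsertWriter undoWriter =
            table.newUpsert().aggregationMode(AggMode.OVERWRITE).createWriter();
    undoWriter.upsert(restoredRow);
    undoWriter.flush();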
" + + "Please use AGGREGATE or OVERWRITE mode."); + } + this.aggMode = aggMode; } private static void sanityCheck( @@ -168,7 +192,8 @@ public CompletableFuture upsert(InternalRow row) { key, bucketKey, writeFormat, - targetColumns); + targetColumns, + aggMode); return send(record).thenApply(ignored -> UPSERT_SUCCESS); } @@ -191,7 +216,8 @@ public CompletableFuture delete(InternalRow row) { key, bucketKey, writeFormat, - targetColumns); + targetColumns, + aggMode); return send(record).thenApply(ignored -> DELETE_SUCCESS); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index f6b37f5d12..b82eb6ca15 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -79,6 +79,7 @@ import org.apache.fluss.rpc.messages.ProduceLogRequest; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.utils.json.DataTypeJsonSerde; import org.apache.fluss.utils.json.JsonSerdeUtils; @@ -138,11 +139,12 @@ public static PutKvRequest makePutKvRequest( .setTimeoutMs(maxRequestTimeoutMs); // check the target columns in the batch list should be the same. If not same, // we throw exception directly currently. - int[] targetColumns = - ((KvWriteBatch) readyWriteBatches.get(0).writeBatch()).getTargetColumns(); + KvWriteBatch firstBatch = (KvWriteBatch) readyWriteBatches.get(0).writeBatch(); + int[] targetColumns = firstBatch.getTargetColumns(); + AggMode aggMode = firstBatch.getAggMode(); for (int i = 1; i < readyWriteBatches.size(); i++) { - int[] currentBatchTargetColumns = - ((KvWriteBatch) readyWriteBatches.get(i).writeBatch()).getTargetColumns(); + KvWriteBatch currentBatch = (KvWriteBatch) readyWriteBatches.get(i).writeBatch(); + int[] currentBatchTargetColumns = currentBatch.getTargetColumns(); if (!Arrays.equals(targetColumns, currentBatchTargetColumns)) { throw new IllegalStateException( String.format( @@ -151,10 +153,21 @@ public static PutKvRequest makePutKvRequest( Arrays.toString(targetColumns), Arrays.toString(currentBatchTargetColumns))); } + // Validate aggMode consistency across batches + if (currentBatch.getAggMode() != aggMode) { + throw new IllegalStateException( + String.format( + "All the write batches to make put kv request should have the same aggMode, " + + "but got %s and %s.", + aggMode, currentBatch.getAggMode())); + } } if (targetColumns != null) { request.setTargetColumns(targetColumns); } + // Set aggMode in the request - this is the proper way to pass aggMode to server + request.setAggMode(aggMode.getProtoValue()); + readyWriteBatches.forEach( readyBatch -> { TableBucket tableBucket = readyBatch.tableBucket(); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java index 72f596be9d..4557c572cc 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java @@ -28,6 +28,7 @@ import org.apache.fluss.row.BinaryRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.rpc.messages.PutKvRequest; +import org.apache.fluss.rpc.protocol.AggMode; import javax.annotation.Nullable; 
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 0c834a49f1..8ee5074edc 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -617,6 +617,7 @@ private WriteBatch createWriteBatch(
                             outputView.getPreAllocatedSize(),
                             outputView,
                             writeRecord.getTargetColumns(),
+                            writeRecord.getAggMode(),
                             clock.milliseconds());
 
             case ARROW_LOG:
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
index a35a9f58b7..74f237c41c 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
@@ -22,6 +22,7 @@
 import org.apache.fluss.memory.MemorySegmentPool;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.record.bytesview.BytesView;
+import org.apache.fluss.rpc.protocol.AggMode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,6 +121,15 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac
 
     public abstract void abortRecordAppends();
 
+    /**
+     * Get the aggregation mode for this batch (only applicable to KV batches).
+     *
+     * @return the aggregation mode, defaults to AGGREGATE for log batches
+     */
+    public AggMode getAggMode() {
+        return AggMode.AGGREGATE;
+    }
+
     public boolean hasBatchSequence() {
         return batchSequence() != NO_BATCH_SEQUENCE;
     }
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index 9265c5f779..155e01999c 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -27,6 +27,7 @@
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.rpc.protocol.AggMode;
 
 import javax.annotation.Nullable;
 
@@ -52,6 +53,27 @@ public static WriteRecord forUpsert(
             byte[] bucketKey,
             WriteFormat writeFormat,
             @Nullable int[] targetColumns) {
+        return forUpsert(
+                tableInfo,
+                tablePath,
+                row,
+                key,
+                bucketKey,
+                writeFormat,
+                targetColumns,
+                AggMode.AGGREGATE);
+    }
+
+    /** Create a write record for upsert operation with aggregation mode control. */
+    public static WriteRecord forUpsert(
+            TableInfo tableInfo,
+            PhysicalTablePath tablePath,
+            BinaryRow row,
+            byte[] key,
+            byte[] bucketKey,
+            WriteFormat writeFormat,
+            @Nullable int[] targetColumns,
+            AggMode aggMode) {
         checkNotNull(row, "row must not be null");
         checkNotNull(key, "key must not be null");
         checkNotNull(bucketKey, "bucketKey must not be null");
@@ -65,7 +87,8 @@ public static WriteRecord forUpsert(
                 row,
                 writeFormat,
                 targetColumns,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                aggMode);
     }
 
     /** Create a write record for delete operation and partial-delete update. */
@@ -76,6 +99,25 @@ public static WriteRecord forDelete(
             byte[] bucketKey,
             WriteFormat writeFormat,
             @Nullable int[] targetColumns) {
+        return forDelete(
+                tableInfo,
+                tablePath,
+                key,
+                bucketKey,
+                writeFormat,
+                targetColumns,
+                AggMode.AGGREGATE);
+    }
+
+    /** Create a write record for delete operation with aggregation mode control. */
+    public static WriteRecord forDelete(
+            TableInfo tableInfo,
+            PhysicalTablePath tablePath,
+            byte[] key,
+            byte[] bucketKey,
+            WriteFormat writeFormat,
+            @Nullable int[] targetColumns,
+            AggMode aggMode) {
         checkNotNull(key, "key must not be null");
         checkNotNull(bucketKey, "key must not be null");
         checkArgument(writeFormat.isKv(), "writeFormat must be a KV format");
@@ -88,7 +130,8 @@ public static WriteRecord forDelete(
                 null,
                 writeFormat,
                 targetColumns,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                aggMode);
     }
 
     /** Create a write record for append operation for indexed format. */
@@ -108,7 +151,8 @@ public static WriteRecord forIndexedAppend(
                 row,
                 WriteFormat.INDEXED_LOG,
                 null,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                AggMode.AGGREGATE);
     }
 
     /** Creates a write record for append operation for Arrow format. */
@@ -129,7 +173,8 @@ public static WriteRecord forArrowAppend(
                 row,
                 WriteFormat.ARROW_LOG,
                 null,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                AggMode.AGGREGATE);
     }
 
     /** Creates a write record for append operation for Compacted format. */
@@ -149,7 +194,8 @@ public static WriteRecord forCompactedAppend(
                 row,
                 WriteFormat.COMPACTED_LOG,
                 null,
-                estimatedSizeInBytes);
+                estimatedSizeInBytes,
+                AggMode.AGGREGATE);
     }
 
     // ------------------------------------------------------------------------------------------
 
@@ -166,6 +212,16 @@ public static WriteRecord forCompactedAppend(
     private final int estimatedSizeInBytes;
     private final TableInfo tableInfo;
 
+    /**
+     * The aggregation mode for this record. This controls how the server handles data
+     * aggregation.
+     *
+     * <ul>
+     *   <li>AGGREGATE: Normal aggregation through server-side merge engine
+     *   <li>OVERWRITE: Bypass merge engine, directly replace values (for undo recovery)
+     * </ul>
+     */
+    private final AggMode aggMode;
+
     private WriteRecord(
             TableInfo tableInfo,
             PhysicalTablePath physicalTablePath,
@@ -174,7 +230,8 @@ private WriteRecord(
             @Nullable InternalRow row,
             WriteFormat writeFormat,
             @Nullable int[] targetColumns,
-            int estimatedSizeInBytes) {
+            int estimatedSizeInBytes,
+            AggMode aggMode) {
         this.tableInfo = tableInfo;
         this.physicalTablePath = physicalTablePath;
         this.key = key;
@@ -183,6 +240,7 @@ private WriteRecord(
         this.writeFormat = writeFormat;
         this.targetColumns = targetColumns;
         this.estimatedSizeInBytes = estimatedSizeInBytes;
+        this.aggMode = aggMode;
     }
 
     public PhysicalTablePath getPhysicalTablePath() {
@@ -214,6 +272,15 @@ public WriteFormat getWriteFormat() {
         return writeFormat;
     }
 
+    /**
+     * Get the aggregation mode for this record.
+     *
+     * @return the aggregation mode
+     */
+    public AggMode getAggMode() {
+        return aggMode;
+    }
+
     /**
      * Get the estimated size in bytes of the record with batch header.
      *
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
new file mode 100644
index 0000000000..1c743fa737
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.utils;
+
+import org.apache.fluss.client.write.KvWriteBatch;
+import org.apache.fluss.client.write.ReadyWriteBatch;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.memory.PreAllocatedPagedOutputView;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.rpc.messages.PutKvRequest;
+import org.apache.fluss.rpc.protocol.AggMode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link ClientRpcMessageUtils}.
+ *
+ * <p>Focuses on AggMode consistency validation in makePutKvRequest.
+ */
+class ClientRpcMessageUtilsTest {
+
+    private static final long TABLE_ID = DATA1_TABLE_ID_PK;
+    private static final int ACKS = 1;
+    private static final int TIMEOUT_MS = 30000;
+
+    @Test
+    void testMakePutKvRequestWithConsistentAggMode() throws Exception {
+        // Create two batches with the same aggMode (AGGREGATE)
+        KvWriteBatch batch1 = createKvWriteBatch(0, AggMode.AGGREGATE);
+        KvWriteBatch batch2 = createKvWriteBatch(1, AggMode.AGGREGATE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Arrays.asList(
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch1),
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), batch2));
+
+        // Should succeed without exception
+        PutKvRequest request =
+                ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches);
+
+        // Verify aggMode is set correctly in the request
+        assertThat(request.hasAggMode()).isTrue();
+        assertThat(request.getAggMode()).isEqualTo(AggMode.AGGREGATE.getProtoValue());
+    }
+
+    @Test
+    void testMakePutKvRequestWithOverwriteMode() throws Exception {
+        // Create batches with OVERWRITE mode
+        KvWriteBatch batch1 = createKvWriteBatch(0, AggMode.OVERWRITE);
+        KvWriteBatch batch2 = createKvWriteBatch(1, AggMode.OVERWRITE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Arrays.asList(
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch1),
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), batch2));
+
+        PutKvRequest request =
+                ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches);
+
+        // Verify OVERWRITE mode is set correctly
+        assertThat(request.hasAggMode()).isTrue();
+        assertThat(request.getAggMode()).isEqualTo(AggMode.OVERWRITE.getProtoValue());
+    }
+
+    @Test
+    void testMakePutKvRequestWithInconsistentAggMode() throws Exception {
+        // Create batches with different aggModes
+        KvWriteBatch aggregateBatch = createKvWriteBatch(0, AggMode.AGGREGATE);
+        KvWriteBatch overwriteBatch = createKvWriteBatch(1, AggMode.OVERWRITE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Arrays.asList(
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), aggregateBatch),
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), overwriteBatch));
+
+        // Should throw exception due to inconsistent aggMode
+        assertThatThrownBy(
+                        () ->
+                                ClientRpcMessageUtils.makePutKvRequest(
+                                        TABLE_ID, ACKS, TIMEOUT_MS, readyBatches))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "All the write batches to make put kv request should have the same aggMode")
+                .hasMessageContaining("AGGREGATE")
+                .hasMessageContaining("OVERWRITE");
+    }
+
+    @Test
+    void testMakePutKvRequestWithSingleBatch() throws Exception {
+        // A single batch should always succeed
+        KvWriteBatch batch = createKvWriteBatch(0, AggMode.OVERWRITE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Collections.singletonList(new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch));
+
+        PutKvRequest request =
+                ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches);
+
+        assertThat(request.getAggMode()).isEqualTo(AggMode.OVERWRITE.getProtoValue());
+    }
+
+    private KvWriteBatch createKvWriteBatch(int bucketId, AggMode aggMode) throws Exception {
+        MemorySegment segment = MemorySegment.allocateHeapMemory(1024);
+        PreAllocatedPagedOutputView outputView =
+                new PreAllocatedPagedOutputView(Collections.singletonList(segment));
+        return new KvWriteBatch(
+                bucketId,
+                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                DATA1_TABLE_INFO_PK.getSchemaId(),
+                KvFormat.COMPACTED,
+                Integer.MAX_VALUE,
+                outputView,
+                null,
+                aggMode,
+                System.currentTimeMillis());
+    }
+}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index c62c7b0297..f58fc0c979 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -33,6 +33,7 @@
 import org.apache.fluss.record.TestingSchemaGetter;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.rpc.protocol.AggMode;
 import org.apache.fluss.types.DataType;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -227,6 +228,7 @@ private KvWriteBatch createKvWriteBatch(
                 writeLimit,
                 outputView,
                 null,
+                AggMode.AGGREGATE,
                 System.currentTimeMillis());
     }
 
@@ -254,4 +256,84 @@ private void assertDefaultKvRecordBatchEquals(DefaultKvRecordBatch recordBatch)
         assertThat(toArray(kvRecord.getKey())).isEqualTo(key);
         assertThat(kvRecord.getRow()).isEqualTo(row);
     }
+
+    // ==================== AggMode Tests ====================
+
+    @Test
+    void testAggModeConsistencyValidation() throws Exception {
+        // Create batch with AGGREGATE mode
+        KvWriteBatch aggregateBatch =
+                createKvWriteBatchWithAggMode(
+                        new TableBucket(DATA1_TABLE_ID_PK, 0), AggMode.AGGREGATE);
+
+        // Append record with AGGREGATE mode should succeed
+        WriteRecord aggregateRecord = createWriteRecordWithAggMode(AggMode.AGGREGATE);
+        assertThat(aggregateBatch.tryAppend(aggregateRecord, newWriteCallback())).isTrue();
+
+        // Append record with OVERWRITE mode should fail
+        WriteRecord overwriteRecord = createWriteRecordWithAggMode(AggMode.OVERWRITE);
+        assertThatThrownBy(() -> aggregateBatch.tryAppend(overwriteRecord, newWriteCallback()))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Cannot mix records with different aggMode in the same batch")
+                .hasMessageContaining("Batch aggMode: AGGREGATE")
+                .hasMessageContaining("Record aggMode: OVERWRITE");
+    }
+
+    @Test
+    void testOverwriteModeBatch() throws Exception {
+        // Create batch with OVERWRITE mode
+        KvWriteBatch overwriteBatch =
+                createKvWriteBatchWithAggMode(
+                        new TableBucket(DATA1_TABLE_ID_PK, 0), AggMode.OVERWRITE);
+
+        // Verify batch has correct aggMode
+        assertThat(overwriteBatch.getAggMode()).isEqualTo(AggMode.OVERWRITE);
+
+        // Append record with OVERWRITE mode should succeed
+        WriteRecord overwriteRecord = createWriteRecordWithAggMode(AggMode.OVERWRITE);
+        assertThat(overwriteBatch.tryAppend(overwriteRecord, newWriteCallback())).isTrue();
+
+        // Append record with AGGREGATE mode should fail
+        WriteRecord aggregateRecord = createWriteRecordWithAggMode(AggMode.AGGREGATE);
+        assertThatThrownBy(() -> overwriteBatch.tryAppend(aggregateRecord, newWriteCallback()))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Cannot mix records with different aggMode in the same batch")
+                .hasMessageContaining("Batch aggMode: OVERWRITE")
+                .hasMessageContaining("Record aggMode: AGGREGATE");
+    }
+
+    @Test
+    void testDefaultAggModeIsAggregate() throws Exception {
+        KvWriteBatch batch = createKvWriteBatch(new TableBucket(DATA1_TABLE_ID_PK, 0));
+        assertThat(batch.getAggMode()).isEqualTo(AggMode.AGGREGATE);
+    }
+
+    private KvWriteBatch createKvWriteBatchWithAggMode(TableBucket tb, AggMode aggMode)
+            throws Exception {
+        PreAllocatedPagedOutputView outputView =
+                new PreAllocatedPagedOutputView(
+                        Collections.singletonList(memoryPool.nextSegment()));
+        return new KvWriteBatch(
+                tb.getBucket(),
+                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                DATA1_TABLE_INFO_PK.getSchemaId(),
+                KvFormat.COMPACTED,
+                Integer.MAX_VALUE,
+                outputView,
+                null,
+                aggMode,
+                System.currentTimeMillis());
+    }
+
+    private WriteRecord createWriteRecordWithAggMode(AggMode aggMode) {
+        return WriteRecord.forUpsert(
+                DATA1_TABLE_INFO_PK,
+                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                row,
+                key,
+                key,
+                WriteFormat.COMPACTED_KV,
+                null,
+                aggMode);
+    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java
new file mode 100644
index 0000000000..4b18765eea
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.protocol;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Aggregation mode for write operations.
+ *
+ * <p>This enum controls how the server handles data aggregation when writing to tables with the
+ * aggregation merge engine.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public enum AggMode {
+
+    /**
+     * Aggregate mode (default): Data is aggregated through the server-side merge engine.
+     *
+     * <p>This is the normal mode for aggregation tables. When writing to a table with the
+     * aggregation merge engine, the server will apply the configured aggregation functions (e.g.,
+     * SUM, MAX, MIN) to merge the new values with existing values.
+     */
+    AGGREGATE(0),
+
+    /**
+     * Overwrite mode: Data directly overwrites target values, bypassing the merge engine.
+     *
+     * <p>This mode is used for undo recovery operations to restore exact historical values. In
+     * overwrite mode, the server will not apply any aggregation functions and will directly
+     * replace the existing values with the new values.
+     *
+     * <p>This is typically used internally by the Flink connector during failover recovery to
+     * restore the state to a previous checkpoint.
+     */
+    OVERWRITE(1),
+
+    /**
+     * Client-side local aggregation mode (future): Data is pre-aggregated on the client side
+     * before being sent to the server for final aggregation.
+     *
+     * <p>This mode reduces write amplification and network traffic by aggregating multiple
+     * updates to the same key on the client side before sending them to the server.
+     *
+     * <p>Note: This mode is reserved for future implementation.
+     */
+    LOCAL_AGGREGATE(2);
+
+    private final int value;
+
+    AggMode(int value) {
+        this.value = value;
+    }
+
+    /**
+     * Returns the integer value for this aggregation mode.
+     *
+     * <p>This value matches the agg_mode field values in the proto definition.
+     *
+     * @return the integer value
+     */
+    public int getValue() {
+        return value;
+    }
+
+    /**
+     * Returns the proto value for this aggregation mode.
+     *
+     * <p>This is an alias for {@link #getValue()} for clarity when working with proto messages.
+     *
+     * @return the proto value
+     */
+    public int getProtoValue() {
+        return value;
+    }
+
+    /**
+     * Converts an integer value to an AggMode enum.
+     *
+     * @param value the integer value
+     * @return the corresponding AggMode, or AGGREGATE if the value is invalid
+     */
+    public static AggMode fromValue(int value) {
+        switch (value) {
+            case 0:
+                return AGGREGATE;
+            case 1:
+                return OVERWRITE;
+            case 2:
+                return LOCAL_AGGREGATE;
+            default:
+                return AGGREGATE;
+        }
+    }
+
+    /**
+     * Converts a proto value to an AggMode enum.
+     *
+     * <p>This is an alias for {@link #fromValue(int)} for clarity when working with proto
+     * messages.
+     *
+     * @param protoValue the proto value
+     * @return the corresponding AggMode, or AGGREGATE if the value is invalid
+     */
+    public static AggMode fromProtoValue(int protoValue) {
+        return fromValue(protoValue);
+    }
+}
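A quick round-trip sketch of the enum above (plain Java against the AggMode class
introduced in this patch). Note the deliberately lenient decoding: unknown wire
values fall back to AGGREGATE rather than failing, so an unrecognized agg_mode
behaves like a normal aggregated write.

    int wire = AggMode.OVERWRITE.getProtoValue();    // 1
    AggMode decoded = AggMode.fromProtoValue(wire);  // AggMode.OVERWRITE

    // Unknown values degrade to the default instead of throwing:
    AggMode unknown = AggMode.fromValue(42);         // AggMode.AGGREGATE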
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index d3b1edb968..d800549d86 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -219,6 +219,11 @@ message FetchLogResponse {
   repeated PbFetchLogRespForTable tables_resp = 1;
 }
 
+// Aggregation mode constants for write operations (using int32 instead of enum for proto3 compatibility)
+// AGG_MODE_AGGREGATE = 0: Data is aggregated through server-side merge engine (default)
+// AGG_MODE_OVERWRITE = 1: Bypass merge engine, directly replace values (for undo recovery)
+// AGG_MODE_LOCAL_AGGREGATE = 2: Client-side local aggregation (reserved for future implementation)
+
 // put kv request and response
 message PutKvRequest {
   required int32 acks = 1;
@@ -228,6 +233,9 @@ message PutKvRequest {
   // if empty, means write all columns
   repeated int32 target_columns = 4 [packed = true];
   repeated PbPutKvReqForBucket buckets_req = 5;
+  // Aggregation mode for this request (see AGG_MODE_* constants above)
+  // 0 = AGGREGATE (default), 1 = OVERWRITE, 2 = LOCAL_AGGREGATE (not yet supported)
+  optional int32 agg_mode = 6;
 }
 
 message PutKvResponse {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 79ab8eb211..137c2e1ab7 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -45,6 +45,7 @@
 import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.row.arrow.ArrowWriterProvider;
 import org.apache.fluss.row.encode.ValueDecoder;
+import org.apache.fluss.rpc.protocol.AggMode;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
@@ -115,6 +116,9 @@ public final class KvTablet {
     private final KvFormat kvFormat;
     // defines how to merge rows on the same primary key
     private final RowMerger rowMerger;
+    // Pre-created DefaultRowMerger for OVERWRITE mode (undo recovery scenarios).
+    // This avoids creating a new instance on every putAsLeader call.
+    private final RowMerger overwriteRowMerger;
     private final ArrowCompressionInfo arrowCompressionInfo;
     private final AutoIncrementManager autoIncrementManager;
 
@@ -166,6 +170,9 @@ private KvTablet(
         this.memorySegmentPool = memorySegmentPool;
         this.kvFormat = kvFormat;
         this.rowMerger = rowMerger;
+        // Pre-create DefaultRowMerger for OVERWRITE mode to avoid creating new instances
+        // on every putAsLeader call. Used for undo recovery scenarios.
+        this.overwriteRowMerger = new DefaultRowMerger(kvFormat, DeleteBehavior.ALLOW);
         this.arrowCompressionInfo = arrowCompressionInfo;
         this.schemaGetter = schemaGetter;
         this.changelogImage = changelogImage;
@@ -273,6 +280,20 @@ public long getFlushedLogOffset() {
         return flushedLogOffset;
     }
 
+    /**
+     * Put the KvRecordBatch into the kv storage with default AGGREGATE mode.
+     *
+     * <p>This is a convenience method that calls {@link #putAsLeader(KvRecordBatch, int[],
+     * AggMode)} with {@link AggMode#AGGREGATE}.
+     *
+     * @param kvRecords the kv records to put into
+     * @param targetColumns the target columns to put, null if put all columns
+     */
+    public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
+            throws Exception {
+        return putAsLeader(kvRecords, targetColumns, AggMode.AGGREGATE);
+    }
+
     /**
      * Put the KvRecordBatch into the kv storage, and return the appended wal log info.
      *
@@ -292,8 +313,10 @@ public long getFlushedLogOffset() {
      *
      * @param kvRecords the kv records to put into
      * @param targetColumns the target columns to put, null if put all columns
+     * @param aggMode the aggregation mode (AGGREGATE or OVERWRITE)
      */
-    public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
+    public LogAppendInfo putAsLeader(
+            KvRecordBatch kvRecords, @Nullable int[] targetColumns, AggMode aggMode)
             throws Exception {
         return inWriteLock(
                 kvLock,
@@ -305,10 +328,17 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
                     short latestSchemaId = (short) schemaInfo.getSchemaId();
                     validateSchemaId(kvRecords.schemaId(), latestSchemaId);
 
-                    // we only support ADD COLUMN, so targetColumns is fine to be used directly
+                    // Determine the row merger based on aggMode:
+                    // - AGGREGATE: Use the configured merge engine (rowMerger)
+                    // - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger
+                    //   to directly replace values (for undo recovery scenarios)
+                    // We only support ADD COLUMN, so targetColumns is fine to be used directly.
                     RowMerger currentMerger =
-                            rowMerger.configureTargetColumns(
-                                    targetColumns, latestSchemaId, latestSchema);
+                            (aggMode == AggMode.OVERWRITE)
+                                    ? overwriteRowMerger.configureTargetColumns(
+                                            targetColumns, latestSchemaId, latestSchema)
+                                    : rowMerger.configureTargetColumns(
+                                            targetColumns, latestSchemaId, latestSchema);
 
                     AutoIncrementUpdater currentAutoIncrementUpdater =
                             autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index ee158b7a4b..7079481cc1 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -44,6 +44,7 @@
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.rpc.protocol.AggMode;
 import org.apache.fluss.rpc.protocol.Errors;
 import org.apache.fluss.server.SequenceIDCounter;
 import org.apache.fluss.server.coordinator.CoordinatorContext;
@@ -940,7 +941,10 @@ public LogAppendInfo appendRecordsToFollower(MemoryLogRecords memoryLogRecords)
     }
 
     public LogAppendInfo putRecordsToLeader(
-            KvRecordBatch kvRecords, @Nullable int[] targetColumns, int requiredAcks)
+            KvRecordBatch kvRecords,
+            @Nullable int[] targetColumns,
+            AggMode aggMode,
+            int requiredAcks)
             throws Exception {
         return inReadLock(
                 leaderIsrUpdateLock,
@@ -958,7 +962,7 @@ public LogAppendInfo putRecordsToLeader(
                             kv, "KvTablet for the replica to put kv records shouldn't be null.");
                     LogAppendInfo logAppendInfo;
                     try {
-                        logAppendInfo = kv.putAsLeader(kvRecords, targetColumns);
+                        logAppendInfo = kv.putAsLeader(kvRecords, targetColumns, aggMode);
                     } catch (IOException e) {
                         LOG.error("Error while putting records to {}", tableBucket, e);
                         fatalErrorHandler.onFatalError(e);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 76e3ea1867..8faec3d090 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -55,6 +55,7 @@
 import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetResponse;
 import org.apache.fluss.rpc.messages.NotifyLakeTableOffsetResponse;
 import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse;
+import org.apache.fluss.rpc.protocol.AggMode;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.Errors;
 import org.apache.fluss.server.coordinator.CoordinatorContext;
@@ -544,6 +545,27 @@ public void fetchLogRecords(
                 params, bucketFetchInfo, logReadResults, userContext, responseCallback);
     }
 
+    /**
+     * Put kv records to leader replicas of the buckets with default AGGREGATE mode.
+     *
+     * <p>This is a convenience method that calls {@link #putRecordsToKv(int, int, Map, int[],
+     * AggMode, Consumer)} with {@link AggMode#AGGREGATE}.
+     */
+    public void putRecordsToKv(
+            int timeoutMs,
+            int requiredAcks,
+            Map<TableBucket, KvRecordBatch> entriesPerBucket,
+            @Nullable int[] targetColumns,
+            Consumer<List<PutKvResultForBucket>> responseCallback) {
+        putRecordsToKv(
+                timeoutMs,
+                requiredAcks,
+                entriesPerBucket,
+                targetColumns,
+                AggMode.AGGREGATE,
+                responseCallback);
+    }
+
     /**
      * Put kv records to leader replicas of the buckets, the kv data will write to kv tablet and the
      * response callback need to wait for the cdc log to be replicated to other replicas if needed.
@@ -553,6 +575,7 @@ public void putRecordsToKv(
             int requiredAcks,
             Map<TableBucket, KvRecordBatch> entriesPerBucket,
             @Nullable int[] targetColumns,
+            AggMode aggMode,
             Consumer<List<PutKvResultForBucket>> responseCallback) {
         if (isRequiredAcksInvalid(requiredAcks)) {
             throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
@@ -560,7 +583,7 @@ public void putRecordsToKv(
 
         long startTime = System.currentTimeMillis();
         Map<TableBucket, PutKvResultForBucket> kvPutResult =
-                putToLocalKv(entriesPerBucket, targetColumns, requiredAcks);
+                putToLocalKv(entriesPerBucket, targetColumns, aggMode, requiredAcks);
         LOG.debug(
                 "Put records to local kv storage and wait generate cdc log in {} ms",
                 System.currentTimeMillis() - startTime);
@@ -1027,6 +1050,7 @@ private Map appendToLocalLog(
 
     private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
             Map<TableBucket, KvRecordBatch> entriesPerBucket,
             @Nullable int[] targetColumns,
+            AggMode aggMode,
             int requiredAcks) {
         Map<TableBucket, PutKvResultForBucket> putResultForBucketMap = new HashMap<>();
         for (Map.Entry<TableBucket, KvRecordBatch> entry : entriesPerBucket.entrySet()) {
@@ -1038,7 +1062,8 @@ private Map putToLocalKv(
                 tableMetrics = replica.tableMetrics();
                 tableMetrics.totalPutKvRequests().inc();
                 LogAppendInfo appendInfo =
-                        replica.putRecordsToLeader(entry.getValue(), targetColumns, requiredAcks);
+                        replica.putRecordsToLeader(
+                                entry.getValue(), targetColumns, aggMode, requiredAcks);
                 LOG.trace(
                         "Written to local kv for {}, and the cdc log beginning at offset {} and ending at offset {}",
                         tb,
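To make the two server-side merge paths concrete, here is a standalone sketch
(illustration only, not Fluss code) of what the merger selection in
KvTablet.putAsLeader amounts to for the SUM/MAX columns used by the tests later
in this patch:

    // Field 0 aggregates with SUM, field 1 with MAX.
    static long[] merge(long[] oldRow, long[] newRow, boolean overwrite) {
        if (overwrite) {
            // OVERWRITE: the new row replaces the old one wholesale.
            return newRow.clone();
        }
        // AGGREGATE: apply the configured aggregation function per column.
        return new long[] {oldRow[0] + newRow[0], Math.max(oldRow[1], newRow[1])};
    }

    // old = {10, 100}, new = {5, 150}
    // AGGREGATE -> {15, 150}    OVERWRITE -> {5, 150}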
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index 992b963334..4c48ac1317 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -60,6 +60,7 @@
 import org.apache.fluss.rpc.messages.StopReplicaResponse;
 import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
 import org.apache.fluss.rpc.messages.UpdateMetadataResponse;
+import org.apache.fluss.rpc.protocol.AggMode;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.Errors;
 import org.apache.fluss.security.acl.OperationType;
@@ -224,12 +225,16 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
         authorizeTable(WRITE, request.getTableId());
 
         Map<TableBucket, KvRecordBatch> putKvData = getPutKvData(request);
+        // Get aggMode from request, default to AGGREGATE if not set
+        AggMode aggMode =
+                request.hasAggMode() ? AggMode.fromValue(request.getAggMode()) : AggMode.AGGREGATE;
         CompletableFuture<PutKvResponse> response = new CompletableFuture<>();
         replicaManager.putRecordsToKv(
                 request.getTimeoutMs(),
                 request.getAcks(),
                 putKvData,
                 getTargetColumns(request),
+                aggMode,
                 bucketResponse -> response.complete(makePutKvResponse(bucketResponse)));
         return response;
     }
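End to end, the intended undo-recovery flow looks roughly like the sketch below.
This is hedged: the failover hook and the undoLog structure are illustrative, not
part of this patch; only the writer API and the wire behavior are. OVERWRITE
travels as agg_mode = 1 on PutKvRequest and selects the pre-created
overwriteRowMerger on the leader, so the pre-image rows are restored verbatim.

    // Illustrative connector-side failover hook. `undoLog` (key -> pre-image
    // row captured before the failed epoch) is a hypothetical structure.
    void undoFailedEpoch(Table table, Map<byte[], InternalRow> undoLog) throws Exception {
        UpsertWriter undoWriter =
                table.newUpsert().aggregationMode(AggMode.OVERWRITE).createWriter();
        for (InternalRow preImage : undoLog.values()) {
            undoWriter.upsert(preImage); // restores the exact historical value
        }
        undoWriter.flush(); // all batches in the resulting request carry OVERWRITE
    }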
org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; +import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords; +import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords; + +/** + * Tests for {@link KvTablet} with {@link AggMode} support. + * + *

These tests verify that OVERWRITE mode correctly bypasses the merge engine and directly + * replaces values, which is essential for undo recovery scenarios. + */ +class KvTabletAggModeTest { + + private static final short SCHEMA_ID = 1; + + private final Configuration conf = new Configuration(); + private final KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory = + KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID); + + private @TempDir File tempLogDir; + private @TempDir File tmpKvDir; + + private TestingSchemaGetter schemaGetter; + private LogTablet logTablet; + private KvTablet kvTablet; + + // Schema with aggregation functions for testing + private static final Schema AGG_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("count", DataTypes.BIGINT(), AggFunctions.SUM()) + .column("max_val", DataTypes.INT(), AggFunctions.MAX()) + .column("name", DataTypes.STRING(), AggFunctions.LAST_VALUE()) + .primaryKey("id") + .build(); + + private static final RowType AGG_ROW_TYPE = AGG_SCHEMA.getRowType(); + + private final KvRecordTestUtils.KvRecordFactory kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(AGG_ROW_TYPE); + + @BeforeEach + void setUp() throws Exception { + Map config = new HashMap<>(); + config.put("table.merge-engine", "aggregation"); + + TablePath tablePath = TablePath.of("testDb", "test_agg_mode"); + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath); + schemaGetter = new TestingSchemaGetter(new SchemaInfo(AGG_SCHEMA, SCHEMA_ID)); + + File logTabletDir = + LogTestUtils.makeRandomLogTabletDir( + tempLogDir, + physicalTablePath.getDatabaseName(), + 0L, + physicalTablePath.getTableName()); + logTablet = + LogTablet.create( + physicalTablePath, + logTabletDir, + conf, + TestingMetricGroups.TABLET_SERVER_METRICS, + 0, + new FlussScheduler(1), + LogFormat.ARROW, + 1, + true, + SystemClock.getInstance(), + true); + + TableBucket tableBucket = logTablet.getTableBucket(); + TableConfig tableConf = new TableConfig(Configuration.fromMap(config)); + RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, schemaGetter); + AutoIncrementManager autoIncrementManager = + new AutoIncrementManager( + schemaGetter, + tablePath, + new TableConfig(new Configuration()), + new TestingSequenceGeneratorFactory()); + + kvTablet = + KvTablet.create( + physicalTablePath, + tableBucket, + logTablet, + tmpKvDir, + conf, + TestingMetricGroups.TABLET_SERVER_METRICS, + new RootAllocator(Long.MAX_VALUE), + new TestingMemorySegmentPool(10 * 1024), + KvFormat.COMPACTED, + rowMerger, + DEFAULT_COMPRESSION, + schemaGetter, + tableConf.getChangelogImage(), + KvManager.getDefaultRateLimiter(), + autoIncrementManager); + } + + @AfterEach + void tearDown() throws Exception { + if (kvTablet != null) { + kvTablet.close(); + } + if (logTablet != null) { + logTablet.close(); + } + } + + // ==================== AGGREGATE Mode Tests ==================== + + @Test + void testAggregateModeAppliesMergeEngine() throws Exception { + // Insert initial record: id=1, count=10, max_val=100, name="Alice" + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + long endOffset = logTablet.localLogEndOffset(); + + // Update with AGGREGATE mode: count should be summed, max_val should take max + // id=1, count=5, max_val=150, name="Bob" + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + 
"k1".getBytes(), new Object[] {1, 5L, 150, "Bob"})); + kvTablet.putAsLeader(batch2, null, AggMode.AGGREGATE); + + // Verify CDC log shows aggregated values + LogRecords actualLogRecords = readLogRecords(endOffset); + MemoryLogRecords expectedLogs = + logRecords( + endOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, 10L, 100, "Alice"}, // before + new Object[] { + 1, 15L, 150, "Bob" + } // after: count=10+5, max=max(100,150) + )); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + // ==================== OVERWRITE Mode Tests ==================== + + @Test + void testOverwriteModeBypassesMergeEngine() throws Exception { + // Insert initial record with AGGREGATE mode + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + long endOffset = logTablet.localLogEndOffset(); + + // Update with OVERWRITE mode: values should be directly replaced, not aggregated + // id=1, count=5, max_val=50, name="Bob" + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 5L, 50, "Bob"})); + kvTablet.putAsLeader(batch2, null, AggMode.OVERWRITE); + + // Verify CDC log shows directly replaced values (not aggregated) + LogRecords actualLogRecords = readLogRecords(endOffset); + MemoryLogRecords expectedLogs = + logRecords( + endOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, 10L, 100, "Alice"}, // before + new Object[] { + 1, 5L, 50, "Bob" + } // after: directly replaced, NOT aggregated + )); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + + // Key assertion: count=5 (not 15), max_val=50 (not 100) + // This proves OVERWRITE bypassed the merge engine + } + + @Test + void testOverwriteModeForUndoRecoveryScenario() throws Exception { + // Simulate a typical undo recovery scenario: + // 1. Initial state: id=1, count=100, max_val=500, name="Original" + // 2. After some operations: id=1, count=150, max_val=600, name="Updated" + // 3. 
Undo recovery needs to restore to: id=1, count=100, max_val=500, name="Original" + + // Step 1: Insert initial record + KvRecordBatch initialBatch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 100L, 500, "Original"})); + kvTablet.putAsLeader(initialBatch, null, AggMode.AGGREGATE); + + // Step 2: Simulate some aggregation operations + KvRecordBatch updateBatch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 50L, 600, "Updated"})); + kvTablet.putAsLeader(updateBatch, null, AggMode.AGGREGATE); + + long beforeUndoOffset = logTablet.localLogEndOffset(); + + // Step 3: Undo recovery - restore to original state using OVERWRITE mode + KvRecordBatch undoBatch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 100L, 500, "Original"})); + kvTablet.putAsLeader(undoBatch, null, AggMode.OVERWRITE); + + // Verify the undo operation produced correct CDC log + LogRecords actualLogRecords = readLogRecords(beforeUndoOffset); + MemoryLogRecords expectedLogs = + logRecords( + beforeUndoOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + // Before: the aggregated state (count=150, max_val=600) + new Object[] {1, 150L, 600, "Updated"}, + // After: restored to original (count=100, max_val=500) + new Object[] {1, 100L, 500, "Original"})); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + @Test + void testOverwriteModeWithNewKey() throws Exception { + // OVERWRITE mode with a new key should behave like INSERT + KvRecordBatch batch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch, null, AggMode.OVERWRITE); + + LogRecords actualLogRecords = readLogRecords(0); + MemoryLogRecords expectedLogs = + logRecords( + 0L, + Collections.singletonList(ChangeType.INSERT), + Collections.singletonList(new Object[] {1, 10L, 100, "Alice"})); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + @Test + void testOverwriteModeWithDelete() throws Exception { + // Insert initial record + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + long endOffset = logTablet.localLogEndOffset(); + + // Delete with OVERWRITE mode + KvRecordBatch deleteBatch = + kvRecordBatchFactory.ofRecords(kvRecordFactory.ofRecord("k1".getBytes(), null)); + kvTablet.putAsLeader(deleteBatch, null, AggMode.OVERWRITE); + + // Verify DELETE is produced + LogRecords actualLogRecords = readLogRecords(endOffset); + MemoryLogRecords expectedLogs = + logRecords( + endOffset, + Collections.singletonList(ChangeType.DELETE), + Collections.singletonList(new Object[] {1, 10L, 100, "Alice"})); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + @Test + void testMixedAggModeOperations() throws Exception { + // Test interleaved AGGREGATE and OVERWRITE operations + + // 1. Insert with AGGREGATE + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "v1"})); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + // 2. 
+
+    @Test
+    void testMixedAggModeOperations() throws Exception {
+        // Test interleaved AGGREGATE and OVERWRITE operations
+
+        // 1. Insert with AGGREGATE
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "v1"}));
+        kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE);
+
+        // 2. Update with AGGREGATE (should aggregate)
+        KvRecordBatch batch2 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 5L, 150, "v2"}));
+        kvTablet.putAsLeader(batch2, null, AggMode.AGGREGATE);
+
+        long afterAggOffset = logTablet.localLogEndOffset();
+
+        // 3. Overwrite with a specific value
+        KvRecordBatch batch3 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 20L, 80, "v3"}));
+        kvTablet.putAsLeader(batch3, null, AggMode.OVERWRITE);
+
+        long afterOverwriteOffset = logTablet.localLogEndOffset();
+
+        // 4. Continue with AGGREGATE (should aggregate from the overwritten value)
+        KvRecordBatch batch4 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 200, "v4"}));
+        kvTablet.putAsLeader(batch4, null, AggMode.AGGREGATE);
+
+        // Verify the final aggregation is based on the overwritten value
+        LogRecords actualLogRecords = readLogRecords(afterOverwriteOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        afterOverwriteOffset,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                // Before: overwritten value
+                                new Object[] {1, 20L, 80, "v3"},
+                                // After: aggregated from the overwritten value
+                                // count=20+10=30, max_val=max(80,200)=200
+                                new Object[] {1, 30L, 200, "v4"}));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    @Test
+    void testOverwriteModeWithPartialUpdate() throws Exception {
+        // Insert initial record
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}));
+        kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE);
+
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Partial update with OVERWRITE mode (only update the id and count columns)
+        int[] targetColumns = new int[] {0, 1}; // id and count
+        KvRecordBatch batch2 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 5L, null, null}));
+        kvTablet.putAsLeader(batch2, targetColumns, AggMode.OVERWRITE);
+
+        // Verify partial update with OVERWRITE: count should be replaced (not aggregated)
+        LogRecords actualLogRecords = readLogRecords(endOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        endOffset,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                new Object[] {1, 10L, 100, "Alice"}, // before
+                                new Object[] {
+                                    1, 5L, 100, "Alice"
+                                } // after: count replaced, others unchanged
+                                ));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
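+
+    // As the test above shows, partial updates compose with OVERWRITE: columns listed in
+    // targetColumns take the new values verbatim, untouched columns keep their stored
+    // values, and no aggregate function runs. A sketch of the assumed per-row behavior
+    // (setField()/getField() here are illustrative, not the real row API):
+    //
+    //     for (int col : targetColumns) {
+    //         mergedRow.setField(col, newRow.getField(col)); // replace, never aggregate
+    //     }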
+
+    @Test
+    void testOverwriteModeWithMultipleKeys() throws Exception {
+        // Insert multiple records
+        List<KvRecord> initialRecords =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}),
+                        kvRecordFactory.ofRecord(
+                                "k2".getBytes(), new Object[] {2, 20L, 200, "Bob"}));
+        KvRecordBatch batch1 = kvRecordBatchFactory.ofRecords(initialRecords);
+        kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE);
+
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Overwrite multiple keys in a single batch
+        List<KvRecord> overwriteRecords =
+                Arrays.asList(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 5L, 50, "Alice2"}),
+                        kvRecordFactory.ofRecord(
+                                "k2".getBytes(), new Object[] {2, 8L, 80, "Bob2"}));
+        KvRecordBatch batch2 = kvRecordBatchFactory.ofRecords(overwriteRecords);
+        kvTablet.putAsLeader(batch2, null, AggMode.OVERWRITE);
+
+        // Verify both keys are overwritten (not aggregated)
+        LogRecords actualLogRecords = readLogRecords(endOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        endOffset,
+                        Arrays.asList(
+                                ChangeType.UPDATE_BEFORE,
+                                ChangeType.UPDATE_AFTER,
+                                ChangeType.UPDATE_BEFORE,
+                                ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                new Object[] {1, 10L, 100, "Alice"},
+                                new Object[] {1, 5L, 50, "Alice2"}, // k1 overwritten
+                                new Object[] {2, 20L, 200, "Bob"},
+                                new Object[] {2, 8L, 80, "Bob2"} // k2 overwritten
+                                ));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    // ==================== Default AggMode Tests ====================
+
+    @Test
+    void testDefaultAggModeIsAggregate() throws Exception {
+        // Insert initial record using the default (no aggMode parameter)
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}));
+        kvTablet.putAsLeader(batch1, null); // Using the overload without aggMode
+
+        long endOffset = logTablet.localLogEndOffset();
+
+        // Update using the default (should aggregate)
+        KvRecordBatch batch2 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord(
+                                "k1".getBytes(), new Object[] {1, 5L, 150, "Bob"}));
+        kvTablet.putAsLeader(batch2, null); // Using the overload without aggMode
+
+        // Verify aggregation happened (proving the default is AGGREGATE)
+        LogRecords actualLogRecords = readLogRecords(endOffset);
+        MemoryLogRecords expectedLogs =
+                logRecords(
+                        endOffset,
+                        Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
+                        Arrays.asList(
+                                new Object[] {1, 10L, 100, "Alice"},
+                                new Object[] {
+                                    1, 15L, 150, "Bob"
+                                } // count=10+5=15, max=max(100,150)=150
+                                ));
+
+        assertThatLogRecords(actualLogRecords)
+                .withSchema(AGG_ROW_TYPE)
+                .assertCheckSum(true)
+                .isEqualTo(expectedLogs);
+    }
+
+    // ==================== Helper Methods ====================
+
+    private LogRecords readLogRecords(long startOffset) throws Exception {
+        return logTablet
+                .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END, false, null)
+                .getRecords();
+    }
+
+    private MemoryLogRecords logRecords(
+            long baseOffset, List<ChangeType> changeTypes, List<Object[]> rows) throws Exception {
+        return createBasicMemoryLogRecords(
+                AGG_ROW_TYPE,
+                SCHEMA_ID,
+                baseOffset,
+                -1L,
+                CURRENT_LOG_MAGIC_VALUE,
+                NO_WRITER_ID,
+                NO_BATCH_SEQUENCE,
+                changeTypes,
+                rows,
+                LogFormat.ARROW,
+                DEFAULT_COMPRESSION);
+    }
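+
+    // These server-side tests rely on a client-side guarantee: a PutKvRequest carries a
+    // single agg_mode, so the writer validates that all records in a batch, and all
+    // batches in a request, use the same mode. A sketch of the assumed check (the field
+    // and accessor names are illustrative):
+    //
+    //     if (batchAggMode != null && batchAggMode != record.getAggMode()) {
+    //         throw new IllegalStateException(
+    //                 "Records with different AggMode cannot be added to the same batch");
+    //     }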
+}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index 8bdd46db66..276dfc927e 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -32,6 +32,7 @@
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.record.ProjectionPushdownCache;
+import org.apache.fluss.rpc.protocol.AggMode;
 import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
 import org.apache.fluss.server.kv.KvTablet;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -868,7 +869,9 @@ private static MemoryLogRecords logRecords(
 
     private LogAppendInfo putRecordsToLeader(
             Replica replica, KvRecordBatch kvRecords, int[] targetColumns) throws Exception {
-        LogAppendInfo logAppendInfo = replica.putRecordsToLeader(kvRecords, targetColumns, 0);
+        // Use AGGREGATE mode as the default for tests
+        LogAppendInfo logAppendInfo =
+                replica.putRecordsToLeader(kvRecords, targetColumns, AggMode.AGGREGATE, 0);
         KvTablet kvTablet = checkNotNull(replica.getKvTablet());
         // flush to make data visible
         kvTablet.flush(replica.getLocalLogEndOffset(), NOPErrorHandler.INSTANCE);