diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java index fe865eac43..9eea33e5c2 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableUpsert.java @@ -20,6 +20,7 @@ import org.apache.fluss.client.write.WriterClient; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.types.RowType; import javax.annotation.Nullable; @@ -32,22 +33,24 @@ public class TableUpsert implements Upsert { private final TablePath tablePath; private final TableInfo tableInfo; private final WriterClient writerClient; - private final @Nullable int[] targetColumns; + private final AggMode aggMode; public TableUpsert(TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) { - this(tablePath, tableInfo, writerClient, null); + this(tablePath, tableInfo, writerClient, null, AggMode.AGGREGATE); } private TableUpsert( TablePath tablePath, TableInfo tableInfo, WriterClient writerClient, - @Nullable int[] targetColumns) { + @Nullable int[] targetColumns, + AggMode aggMode) { this.tablePath = tablePath; this.tableInfo = tableInfo; this.writerClient = writerClient; this.targetColumns = targetColumns; + this.aggMode = aggMode; } @Override @@ -68,7 +71,7 @@ public Upsert partialUpdate(@Nullable int[] targetColumns) { } } } - return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns); + return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns, this.aggMode); } @Override @@ -91,9 +94,15 @@ public Upsert partialUpdate(String... targetColumnNames) { return partialUpdate(targetColumns); } + @Override + public Upsert aggregationMode(AggMode mode) { + checkNotNull(mode, "aggregation mode"); + return new TableUpsert(tablePath, tableInfo, writerClient, this.targetColumns, mode); + } + @Override public UpsertWriter createWriter() { - return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient); + return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient, aggMode); } @Override diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java index 0843437fee..becf87b3d6 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java @@ -18,6 +18,7 @@ package org.apache.fluss.client.table.writer; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.rpc.protocol.AggMode; import javax.annotation.Nullable; @@ -26,7 +27,8 @@ * Table. * *
<p>
{@link Upsert} objects are immutable and can be shared between threads. Refinement methods, - * like {@link #partialUpdate}, create new Upsert instances. + * like {@link #partialUpdate(int[])} and {@link #aggregationMode(AggMode)}, create new Upsert + * instances. * * @since 0.6 */ @@ -56,9 +58,44 @@ public interface Upsert { */ Upsert partialUpdate(String... targetColumnNames); + /** + * Specifies the aggregation mode for the UpsertWriter and returns a new Upsert instance. + * + *
<p>
This method controls how the created UpsertWriter handles data aggregation: + * + *

<ul>
+     *   <li>{@code AGGREGATE} (default): data is aggregated through the server-side merge engine
+     *   <li>{@code OVERWRITE}: data directly overwrites stored values, bypassing the merge engine (used for undo recovery)
+     *   <li>{@code LOCAL_AGGREGATE}: client-side pre-aggregation, reserved for future implementation
+     * </ul>
+     *
<p>
Example usage: + * + *

<pre>{@code
+     * // Normal aggregation mode (default)
+     * UpsertWriter normalWriter = table.newUpsert()
+     *     .aggregationMode(AggMode.AGGREGATE)
+     *     .createWriter();
+     *
+     * // Overwrite mode for undo recovery
+     * UpsertWriter undoWriter = table.newUpsert()
+     *     .aggregationMode(AggMode.OVERWRITE)
+     *     .createWriter();
+     * }</pre>
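Since each refinement method returns a new immutable Upsert, the aggregation mode can be combined with a partial update; the TableUpsert implementation above carries both settings into every new instance. A minimal sketch of the chaining (the table variable and the column names are assumed for illustration, not part of this patch):

    UpsertWriter writer = table.newUpsert()
        .partialUpdate("id", "count")          // write only these columns
        .aggregationMode(AggMode.OVERWRITE)    // bypass the merge engine
        .createWriter();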
+ * + * @param mode the aggregation mode + * @return a new Upsert instance with the specified aggregation mode + * @since 0.9 + */ + Upsert aggregationMode(AggMode mode); + /** * Create a new {@link UpsertWriter} using {@code InternalRow} with the optional {@link - * #partialUpdate(String...)} information to upsert and delete data to a Primary Key Table. + * #partialUpdate(String...)} and {@link #aggregationMode(AggMode)} information to upsert and + * delete data to a Primary Key Table. */ UpsertWriter createWriter(); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java index 20e5bcc5e5..b7f3e20d20 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java @@ -31,6 +31,7 @@ import org.apache.fluss.row.encode.KeyEncoder; import org.apache.fluss.row.encode.RowEncoder; import org.apache.fluss.row.indexed.IndexedRow; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.types.RowType; import javax.annotation.Nullable; @@ -56,11 +57,25 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter { private final FieldGetter[] fieldGetters; private final TableInfo tableInfo; + /** + * The aggregation mode for this writer. This controls how the server handles data aggregation. + */ + private final AggMode aggMode; + UpsertWriterImpl( TablePath tablePath, TableInfo tableInfo, @Nullable int[] partialUpdateColumns, WriterClient writerClient) { + this(tablePath, tableInfo, partialUpdateColumns, writerClient, AggMode.AGGREGATE); + } + + UpsertWriterImpl( + TablePath tablePath, + TableInfo tableInfo, + @Nullable int[] partialUpdateColumns, + WriterClient writerClient, + AggMode aggMode) { super(tablePath, tableInfo, writerClient); RowType rowType = tableInfo.getRowType(); sanityCheck( @@ -83,7 +98,16 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter { this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat); this.rowEncoder = RowEncoder.create(kvFormat, rowType); this.fieldGetters = InternalRow.createFieldGetters(rowType); + this.tableInfo = tableInfo; + + // LOCAL_AGGREGATE is reserved for future implementation + if (aggMode == AggMode.LOCAL_AGGREGATE) { + throw new UnsupportedOperationException( + "LOCAL_AGGREGATE mode is not yet supported. 
" + + "Please use AGGREGATE or OVERWRITE mode."); + } + this.aggMode = aggMode; } private static void sanityCheck( @@ -168,7 +192,8 @@ public CompletableFuture upsert(InternalRow row) { key, bucketKey, writeFormat, - targetColumns); + targetColumns, + aggMode); return send(record).thenApply(ignored -> UPSERT_SUCCESS); } @@ -191,7 +216,8 @@ public CompletableFuture delete(InternalRow row) { key, bucketKey, writeFormat, - targetColumns); + targetColumns, + aggMode); return send(record).thenApply(ignored -> DELETE_SUCCESS); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index f6b37f5d12..b82eb6ca15 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -79,6 +79,7 @@ import org.apache.fluss.rpc.messages.ProduceLogRequest; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.utils.json.DataTypeJsonSerde; import org.apache.fluss.utils.json.JsonSerdeUtils; @@ -138,11 +139,12 @@ public static PutKvRequest makePutKvRequest( .setTimeoutMs(maxRequestTimeoutMs); // check the target columns in the batch list should be the same. If not same, // we throw exception directly currently. - int[] targetColumns = - ((KvWriteBatch) readyWriteBatches.get(0).writeBatch()).getTargetColumns(); + KvWriteBatch firstBatch = (KvWriteBatch) readyWriteBatches.get(0).writeBatch(); + int[] targetColumns = firstBatch.getTargetColumns(); + AggMode aggMode = firstBatch.getAggMode(); for (int i = 1; i < readyWriteBatches.size(); i++) { - int[] currentBatchTargetColumns = - ((KvWriteBatch) readyWriteBatches.get(i).writeBatch()).getTargetColumns(); + KvWriteBatch currentBatch = (KvWriteBatch) readyWriteBatches.get(i).writeBatch(); + int[] currentBatchTargetColumns = currentBatch.getTargetColumns(); if (!Arrays.equals(targetColumns, currentBatchTargetColumns)) { throw new IllegalStateException( String.format( @@ -151,10 +153,21 @@ public static PutKvRequest makePutKvRequest( Arrays.toString(targetColumns), Arrays.toString(currentBatchTargetColumns))); } + // Validate aggMode consistency across batches + if (currentBatch.getAggMode() != aggMode) { + throw new IllegalStateException( + String.format( + "All the write batches to make put kv request should have the same aggMode, " + + "but got %s and %s.", + aggMode, currentBatch.getAggMode())); + } } if (targetColumns != null) { request.setTargetColumns(targetColumns); } + // Set aggMode in the request - this is the proper way to pass aggMode to server + request.setAggMode(aggMode.getProtoValue()); + readyWriteBatches.forEach( readyBatch -> { TableBucket tableBucket = readyBatch.tableBucket(); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java index 72f596be9d..4557c572cc 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java @@ -28,6 +28,7 @@ import org.apache.fluss.row.BinaryRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.rpc.messages.PutKvRequest; +import org.apache.fluss.rpc.protocol.AggMode; import javax.annotation.Nullable; 
import javax.annotation.concurrent.NotThreadSafe; @@ -51,6 +52,7 @@ public class KvWriteBatch extends WriteBatch { private final KvRecordBatchBuilder recordsBuilder; private final @Nullable int[] targetColumns; private final int schemaId; + private final AggMode aggMode; public KvWriteBatch( int bucketId, @@ -60,6 +62,7 @@ public KvWriteBatch( int writeLimit, AbstractPagedOutputView outputView, @Nullable int[] targetColumns, + AggMode aggMode, long createdMs) { super(bucketId, physicalTablePath, createdMs); this.outputView = outputView; @@ -67,6 +70,7 @@ public KvWriteBatch( KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat); this.targetColumns = targetColumns; this.schemaId = schemaId; + this.aggMode = aggMode; } @Override @@ -94,6 +98,15 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Arrays.toString(targetColumns))); } + // Validate aggMode consistency - records with different aggMode cannot be batched together + if (writeRecord.getAggMode() != this.aggMode) { + throw new IllegalStateException( + String.format( + "Cannot mix records with different aggMode in the same batch. " + + "Batch aggMode: %s, Record aggMode: %s", + this.aggMode, writeRecord.getAggMode())); + } + byte[] key = writeRecord.getKey(); checkNotNull(key, "key must be not null for kv record"); checkNotNull(callback, "write callback must be not null"); @@ -113,6 +126,11 @@ public int[] getTargetColumns() { return targetColumns; } + @Override + public AggMode getAggMode() { + return aggMode; + } + @Override public BytesView build() { try { @@ -163,6 +181,7 @@ public void abortRecordAppends() { recordsBuilder.abort(); } + @Override public void resetWriterState(long writerId, int batchSequence) { super.resetWriterState(writerId, batchSequence); recordsBuilder.resetWriterState(writerId, batchSequence); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 0c834a49f1..8ee5074edc 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -617,6 +617,7 @@ private WriteBatch createWriteBatch( outputView.getPreAllocatedSize(), outputView, writeRecord.getTargetColumns(), + writeRecord.getAggMode(), clock.milliseconds()); case ARROW_LOG: diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java index a35a9f58b7..74f237c41c 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java @@ -22,6 +22,7 @@ import org.apache.fluss.memory.MemorySegmentPool; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.record.bytesview.BytesView; +import org.apache.fluss.rpc.protocol.AggMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,6 +121,15 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac public abstract void abortRecordAppends(); + /** + * Get the aggregation mode for this batch (only applicable to KV batches). 
+ * + * @return the aggregation mode, defaults to AGGREGATE for log batches + */ + public AggMode getAggMode() { + return AggMode.AGGREGATE; + } + public boolean hasBatchSequence() { return batchSequence() != NO_BATCH_SEQUENCE; } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java index 9265c5f779..155e01999c 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java @@ -27,6 +27,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.compacted.CompactedRow; import org.apache.fluss.row.indexed.IndexedRow; +import org.apache.fluss.rpc.protocol.AggMode; import javax.annotation.Nullable; @@ -52,6 +53,27 @@ public static WriteRecord forUpsert( byte[] bucketKey, WriteFormat writeFormat, @Nullable int[] targetColumns) { + return forUpsert( + tableInfo, + tablePath, + row, + key, + bucketKey, + writeFormat, + targetColumns, + AggMode.AGGREGATE); + } + + /** Create a write record for upsert operation with aggregation mode control. */ + public static WriteRecord forUpsert( + TableInfo tableInfo, + PhysicalTablePath tablePath, + BinaryRow row, + byte[] key, + byte[] bucketKey, + WriteFormat writeFormat, + @Nullable int[] targetColumns, + AggMode aggMode) { checkNotNull(row, "row must not be null"); checkNotNull(key, "key must not be null"); checkNotNull(bucketKey, "bucketKey must not be null"); @@ -65,7 +87,8 @@ public static WriteRecord forUpsert( row, writeFormat, targetColumns, - estimatedSizeInBytes); + estimatedSizeInBytes, + aggMode); } /** Create a write record for delete operation and partial-delete update. */ @@ -76,6 +99,25 @@ public static WriteRecord forDelete( byte[] bucketKey, WriteFormat writeFormat, @Nullable int[] targetColumns) { + return forDelete( + tableInfo, + tablePath, + key, + bucketKey, + writeFormat, + targetColumns, + AggMode.AGGREGATE); + } + + /** Create a write record for delete operation with aggregation mode control. */ + public static WriteRecord forDelete( + TableInfo tableInfo, + PhysicalTablePath tablePath, + byte[] key, + byte[] bucketKey, + WriteFormat writeFormat, + @Nullable int[] targetColumns, + AggMode aggMode) { checkNotNull(key, "key must not be null"); checkNotNull(bucketKey, "key must not be null"); checkArgument(writeFormat.isKv(), "writeFormat must be a KV format"); @@ -88,7 +130,8 @@ public static WriteRecord forDelete( null, writeFormat, targetColumns, - estimatedSizeInBytes); + estimatedSizeInBytes, + aggMode); } /** Create a write record for append operation for indexed format. */ @@ -108,7 +151,8 @@ public static WriteRecord forIndexedAppend( row, WriteFormat.INDEXED_LOG, null, - estimatedSizeInBytes); + estimatedSizeInBytes, + AggMode.AGGREGATE); } /** Creates a write record for append operation for Arrow format. */ @@ -129,7 +173,8 @@ public static WriteRecord forArrowAppend( row, WriteFormat.ARROW_LOG, null, - estimatedSizeInBytes); + estimatedSizeInBytes, + AggMode.AGGREGATE); } /** Creates a write record for append operation for Compacted format. 
*/ @@ -149,7 +194,8 @@ public static WriteRecord forCompactedAppend( row, WriteFormat.COMPACTED_LOG, null, - estimatedSizeInBytes); + estimatedSizeInBytes, + AggMode.AGGREGATE); } // ------------------------------------------------------------------------------------------ @@ -166,6 +212,16 @@ public static WriteRecord forCompactedAppend( private final int estimatedSizeInBytes; private final TableInfo tableInfo; + /** + * The aggregation mode for this record. This controls how the server handles data aggregation. + * + * + */ + private final AggMode aggMode; + private WriteRecord( TableInfo tableInfo, PhysicalTablePath physicalTablePath, @@ -174,7 +230,8 @@ private WriteRecord( @Nullable InternalRow row, WriteFormat writeFormat, @Nullable int[] targetColumns, - int estimatedSizeInBytes) { + int estimatedSizeInBytes, + AggMode aggMode) { this.tableInfo = tableInfo; this.physicalTablePath = physicalTablePath; this.key = key; @@ -183,6 +240,7 @@ private WriteRecord( this.writeFormat = writeFormat; this.targetColumns = targetColumns; this.estimatedSizeInBytes = estimatedSizeInBytes; + this.aggMode = aggMode; } public PhysicalTablePath getPhysicalTablePath() { @@ -214,6 +272,15 @@ public WriteFormat getWriteFormat() { return writeFormat; } + /** + * Get the aggregation mode for this record. + * + * @return the aggregation mode + */ + public AggMode getAggMode() { + return aggMode; + } + /** * Get the estimated size in bytes of the record with batch header. * diff --git a/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java new file mode 100644 index 0000000000..1c743fa737 --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.utils; + +import org.apache.fluss.client.write.KvWriteBatch; +import org.apache.fluss.client.write.ReadyWriteBatch; +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.memory.PreAllocatedPagedOutputView; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.rpc.messages.PutKvRequest; +import org.apache.fluss.rpc.protocol.AggMode; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK; +import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK; +import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link ClientRpcMessageUtils}. + * + *
<p>
Focuses on AggMode consistency validation in makePutKvRequest. + */ +class ClientRpcMessageUtilsTest { + + private static final long TABLE_ID = DATA1_TABLE_ID_PK; + private static final int ACKS = 1; + private static final int TIMEOUT_MS = 30000; + + @Test + void testMakePutKvRequestWithConsistentAggMode() throws Exception { + // Create two batches with same aggMode (AGGREGATE) + KvWriteBatch batch1 = createKvWriteBatch(0, AggMode.AGGREGATE); + KvWriteBatch batch2 = createKvWriteBatch(1, AggMode.AGGREGATE); + + List readyBatches = + Arrays.asList( + new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch1), + new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), batch2)); + + // Should succeed without exception + PutKvRequest request = + ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches); + + // Verify aggMode is set correctly in request + assertThat(request.hasAggMode()).isTrue(); + assertThat(request.getAggMode()).isEqualTo(AggMode.AGGREGATE.getProtoValue()); + } + + @Test + void testMakePutKvRequestWithOverwriteMode() throws Exception { + // Create batches with OVERWRITE mode + KvWriteBatch batch1 = createKvWriteBatch(0, AggMode.OVERWRITE); + KvWriteBatch batch2 = createKvWriteBatch(1, AggMode.OVERWRITE); + + List readyBatches = + Arrays.asList( + new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch1), + new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), batch2)); + + PutKvRequest request = + ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches); + + // Verify OVERWRITE mode is set correctly + assertThat(request.hasAggMode()).isTrue(); + assertThat(request.getAggMode()).isEqualTo(AggMode.OVERWRITE.getProtoValue()); + } + + @Test + void testMakePutKvRequestWithInconsistentAggMode() throws Exception { + // Create batches with different aggModes + KvWriteBatch aggregateBatch = createKvWriteBatch(0, AggMode.AGGREGATE); + KvWriteBatch overwriteBatch = createKvWriteBatch(1, AggMode.OVERWRITE); + + List readyBatches = + Arrays.asList( + new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), aggregateBatch), + new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), overwriteBatch)); + + // Should throw exception due to inconsistent aggMode + assertThatThrownBy( + () -> + ClientRpcMessageUtils.makePutKvRequest( + TABLE_ID, ACKS, TIMEOUT_MS, readyBatches)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "All the write batches to make put kv request should have the same aggMode") + .hasMessageContaining("AGGREGATE") + .hasMessageContaining("OVERWRITE"); + } + + @Test + void testMakePutKvRequestWithSingleBatch() throws Exception { + // Single batch should always succeed + KvWriteBatch batch = createKvWriteBatch(0, AggMode.OVERWRITE); + + List readyBatches = + Collections.singletonList(new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch)); + + PutKvRequest request = + ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches); + + assertThat(request.getAggMode()).isEqualTo(AggMode.OVERWRITE.getProtoValue()); + } + + private KvWriteBatch createKvWriteBatch(int bucketId, AggMode aggMode) throws Exception { + MemorySegment segment = MemorySegment.allocateHeapMemory(1024); + PreAllocatedPagedOutputView outputView = + new PreAllocatedPagedOutputView(Collections.singletonList(segment)); + return new KvWriteBatch( + bucketId, + PhysicalTablePath.of(DATA1_TABLE_PATH_PK), + DATA1_TABLE_INFO_PK.getSchemaId(), + KvFormat.COMPACTED, + Integer.MAX_VALUE, + outputView, + null, + aggMode, + System.currentTimeMillis()); + 
} +} diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java index c62c7b0297..f58fc0c979 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java @@ -33,6 +33,7 @@ import org.apache.fluss.record.TestingSchemaGetter; import org.apache.fluss.row.BinaryRow; import org.apache.fluss.row.encode.CompactedKeyEncoder; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.types.DataType; import org.junit.jupiter.api.BeforeEach; @@ -227,6 +228,7 @@ private KvWriteBatch createKvWriteBatch( writeLimit, outputView, null, + AggMode.AGGREGATE, System.currentTimeMillis()); } @@ -254,4 +256,84 @@ private void assertDefaultKvRecordBatchEquals(DefaultKvRecordBatch recordBatch) assertThat(toArray(kvRecord.getKey())).isEqualTo(key); assertThat(kvRecord.getRow()).isEqualTo(row); } + + // ==================== AggMode Tests ==================== + + @Test + void testAggModeConsistencyValidation() throws Exception { + // Create batch with AGGREGATE mode + KvWriteBatch aggregateBatch = + createKvWriteBatchWithAggMode( + new TableBucket(DATA1_TABLE_ID_PK, 0), AggMode.AGGREGATE); + + // Append record with AGGREGATE mode should succeed + WriteRecord aggregateRecord = createWriteRecordWithAggMode(AggMode.AGGREGATE); + assertThat(aggregateBatch.tryAppend(aggregateRecord, newWriteCallback())).isTrue(); + + // Append record with OVERWRITE mode should fail + WriteRecord overwriteRecord = createWriteRecordWithAggMode(AggMode.OVERWRITE); + assertThatThrownBy(() -> aggregateBatch.tryAppend(overwriteRecord, newWriteCallback())) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Cannot mix records with different aggMode in the same batch") + .hasMessageContaining("Batch aggMode: AGGREGATE") + .hasMessageContaining("Record aggMode: OVERWRITE"); + } + + @Test + void testOverwriteModeBatch() throws Exception { + // Create batch with OVERWRITE mode + KvWriteBatch overwriteBatch = + createKvWriteBatchWithAggMode( + new TableBucket(DATA1_TABLE_ID_PK, 0), AggMode.OVERWRITE); + + // Verify batch has correct aggMode + assertThat(overwriteBatch.getAggMode()).isEqualTo(AggMode.OVERWRITE); + + // Append record with OVERWRITE mode should succeed + WriteRecord overwriteRecord = createWriteRecordWithAggMode(AggMode.OVERWRITE); + assertThat(overwriteBatch.tryAppend(overwriteRecord, newWriteCallback())).isTrue(); + + // Append record with AGGREGATE mode should fail + WriteRecord aggregateRecord = createWriteRecordWithAggMode(AggMode.AGGREGATE); + assertThatThrownBy(() -> overwriteBatch.tryAppend(aggregateRecord, newWriteCallback())) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Cannot mix records with different aggMode in the same batch") + .hasMessageContaining("Batch aggMode: OVERWRITE") + .hasMessageContaining("Record aggMode: AGGREGATE"); + } + + @Test + void testDefaultAggModeIsAggregate() throws Exception { + KvWriteBatch batch = createKvWriteBatch(new TableBucket(DATA1_TABLE_ID_PK, 0)); + assertThat(batch.getAggMode()).isEqualTo(AggMode.AGGREGATE); + } + + private KvWriteBatch createKvWriteBatchWithAggMode(TableBucket tb, AggMode aggMode) + throws Exception { + PreAllocatedPagedOutputView outputView = + new PreAllocatedPagedOutputView( + Collections.singletonList(memoryPool.nextSegment())); + return new KvWriteBatch( + tb.getBucket(), + 
PhysicalTablePath.of(DATA1_TABLE_PATH_PK), + DATA1_TABLE_INFO_PK.getSchemaId(), + KvFormat.COMPACTED, + Integer.MAX_VALUE, + outputView, + null, + aggMode, + System.currentTimeMillis()); + } + + private WriteRecord createWriteRecordWithAggMode(AggMode aggMode) { + return WriteRecord.forUpsert( + DATA1_TABLE_INFO_PK, + PhysicalTablePath.of(DATA1_TABLE_PATH_PK), + row, + key, + key, + WriteFormat.COMPACTED_KV, + null, + aggMode); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java new file mode 100644 index 0000000000..4b18765eea --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.rpc.protocol; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Aggregation mode for write operations. + * + *
<p>
This enum controls how the server handles data aggregation when writing to tables with + * an aggregation merge engine. + * + * @since 0.9 + */ +@PublicEvolving +public enum AggMode { + + /** + * Aggregate mode (default): Data is aggregated through the server-side merge engine. + * + *
<p>
This is the normal mode for aggregation tables. When writing to a table with an aggregation + * merge engine, the server will apply the configured aggregation functions (e.g., SUM, MAX, + * MIN) to merge the new values with existing values. + */ + AGGREGATE(0), + + /** + * Overwrite mode: Data directly overwrites target values, bypassing the merge engine. + * + *
<p>
This mode is used for undo recovery operations to restore exact historical values. When in + * overwrite mode, the server will not apply any aggregation functions and will directly replace + * the existing values with the new values. + * + *
<p>
This is typically used internally by the Flink connector during failover recovery to + * restore the state to a previous checkpoint. + */ + OVERWRITE(1), + + /** + * Client-side local aggregation mode (future): Data is pre-aggregated on the client side before + * being sent to the server for final aggregation. + * + *
<p>
This mode reduces write amplification and network traffic by aggregating multiple updates + * to the same key on the client side before sending to the server. + * + *
<p>
Note: This mode is reserved for future implementation. + */ + LOCAL_AGGREGATE(2); + + private final int value; + + AggMode(int value) { + this.value = value; + } + + /** + * Returns the integer value for this aggregation mode. + * + *
<p>
This value matches the agg_mode field values in the proto definition. + * + * @return the integer value + */ + public int getValue() { + return value; + } + + /** + * Returns the proto value for this aggregation mode. + * + *
<p>
This is an alias for {@link #getValue()} for clarity when working with proto messages. + * + * @return the proto value + */ + public int getProtoValue() { + return value; + } + + /** + * Converts an integer value to an AggMode enum. + * + * @param value the integer value + * @return the corresponding AggMode, or AGGREGATE if the value is invalid + */ + public static AggMode fromValue(int value) { + switch (value) { + case 0: + return AGGREGATE; + case 1: + return OVERWRITE; + case 2: + return LOCAL_AGGREGATE; + default: + return AGGREGATE; + } + } + + /** + * Converts a proto value to an AggMode enum. + * + *
<p>
This is an alias for {@link #fromValue(int)} for clarity when working with proto messages. + * + * @param protoValue the proto value + * @return the corresponding AggMode, or AGGREGATE if the value is invalid + */ + public static AggMode fromProtoValue(int protoValue) { + return fromValue(protoValue); + } +} diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index d3b1edb968..d800549d86 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -219,6 +219,11 @@ message FetchLogResponse { repeated PbFetchLogRespForTable tables_resp = 1; } +// Aggregation mode constants for write operations (using int32 instead of enum for proto3 compatibility) +// AGG_MODE_AGGREGATE = 0: Data is aggregated through server-side merge engine (default) +// AGG_MODE_OVERWRITE = 1: Bypass merge engine, directly replace values (for undo recovery) +// AGG_MODE_LOCAL_AGGREGATE = 2: Client-side local aggregation (reserved for future implementation) + // put kv request and response message PutKvRequest { required int32 acks = 1; @@ -228,6 +233,9 @@ message PutKvRequest { // if empty, means write all columns repeated int32 target_columns = 4 [packed = true]; repeated PbPutKvReqForBucket buckets_req = 5; + // Aggregation mode for this request (see AGG_MODE_* constants above) + // 0 = AGGREGATE (default), 1 = OVERWRITE, 2 = LOCAL_AGGREGATE (not yet supported) + optional int32 agg_mode = 6; } message PutKvResponse { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 79ab8eb211..137c2e1ab7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -45,6 +45,7 @@ import org.apache.fluss.row.arrow.ArrowWriterPool; import org.apache.fluss.row.arrow.ArrowWriterProvider; import org.apache.fluss.row.encode.ValueDecoder; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater; import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer; @@ -115,6 +116,9 @@ public final class KvTablet { private final KvFormat kvFormat; // defines how to merge rows on the same primary key private final RowMerger rowMerger; + // Pre-created DefaultRowMerger for OVERWRITE mode (undo recovery scenarios) + // This avoids creating a new instance on every putAsLeader call + private final RowMerger overwriteRowMerger; private final ArrowCompressionInfo arrowCompressionInfo; private final AutoIncrementManager autoIncrementManager; @@ -166,6 +170,9 @@ private KvTablet( this.memorySegmentPool = memorySegmentPool; this.kvFormat = kvFormat; this.rowMerger = rowMerger; + // Pre-create DefaultRowMerger for OVERWRITE mode to avoid creating new instances + // on every putAsLeader call. Used for undo recovery scenarios. + this.overwriteRowMerger = new DefaultRowMerger(kvFormat, DeleteBehavior.ALLOW); this.arrowCompressionInfo = arrowCompressionInfo; this.schemaGetter = schemaGetter; this.changelogImage = changelogImage; @@ -273,6 +280,20 @@ public long getFlushedLogOffset() { return flushedLogOffset; } + /** + * Put the KvRecordBatch into the kv storage with default AGGREGATE mode. + * + *
<p>
This is a convenience method that calls {@link #putAsLeader(KvRecordBatch, int[], + * AggMode)} with {@link AggMode#AGGREGATE}. + * + * @param kvRecords the kv records to put into + * @param targetColumns the target columns to put, null if put all columns + */ + public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns) + throws Exception { + return putAsLeader(kvRecords, targetColumns, AggMode.AGGREGATE); + } + /** * Put the KvRecordBatch into the kv storage, and return the appended wal log info. * @@ -292,8 +313,10 @@ public long getFlushedLogOffset() { * * @param kvRecords the kv records to put into * @param targetColumns the target columns to put, null if put all columns + * @param aggMode the aggregation mode (AGGREGATE or OVERWRITE) */ - public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns) + public LogAppendInfo putAsLeader( + KvRecordBatch kvRecords, @Nullable int[] targetColumns, AggMode aggMode) throws Exception { return inWriteLock( kvLock, @@ -305,10 +328,17 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target short latestSchemaId = (short) schemaInfo.getSchemaId(); validateSchemaId(kvRecords.schemaId(), latestSchemaId); - // we only support ADD COLUMN, so targetColumns is fine to be used directly + // Determine the row merger based on aggMode: + // - AGGREGATE: Use the configured merge engine (rowMerger) + // - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger + // to directly replace values (for undo recovery scenarios) + // We only support ADD COLUMN, so targetColumns is fine to be used directly. RowMerger currentMerger = - rowMerger.configureTargetColumns( - targetColumns, latestSchemaId, latestSchema); + (aggMode == AggMode.OVERWRITE) + ? 
overwriteRowMerger.configureTargetColumns( + targetColumns, latestSchemaId, latestSchema) + : rowMerger.configureTargetColumns( + targetColumns, latestSchemaId, latestSchema); AutoIncrementUpdater currentAutoIncrementUpdater = autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index ee158b7a4b..7079481cc1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -44,6 +44,7 @@ import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.LogRecords; import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.SequenceIDCounter; import org.apache.fluss.server.coordinator.CoordinatorContext; @@ -940,7 +941,10 @@ public LogAppendInfo appendRecordsToFollower(MemoryLogRecords memoryLogRecords) } public LogAppendInfo putRecordsToLeader( - KvRecordBatch kvRecords, @Nullable int[] targetColumns, int requiredAcks) + KvRecordBatch kvRecords, + @Nullable int[] targetColumns, + AggMode aggMode, + int requiredAcks) throws Exception { return inReadLock( leaderIsrUpdateLock, @@ -958,7 +962,7 @@ public LogAppendInfo putRecordsToLeader( kv, "KvTablet for the replica to put kv records shouldn't be null."); LogAppendInfo logAppendInfo; try { - logAppendInfo = kv.putAsLeader(kvRecords, targetColumns); + logAppendInfo = kv.putAsLeader(kvRecords, targetColumns, aggMode); } catch (IOException e) { LOG.error("Error while putting records to {}", tableBucket, e); fatalErrorHandler.onFatalError(e); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 76e3ea1867..8faec3d090 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -55,6 +55,7 @@ import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetResponse; import org.apache.fluss.rpc.messages.NotifyLakeTableOffsetResponse; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.coordinator.CoordinatorContext; @@ -544,6 +545,27 @@ public void fetchLogRecords( params, bucketFetchInfo, logReadResults, userContext, responseCallback); } + /** + * Put kv records to leader replicas of the buckets with default AGGREGATE mode. + * + *
<p>
This is a convenience method that calls {@link #putRecordsToKv(int, int, Map, int[], + * AggMode, Consumer)} with {@link AggMode#AGGREGATE}. + */ + public void putRecordsToKv( + int timeoutMs, + int requiredAcks, + Map entriesPerBucket, + @Nullable int[] targetColumns, + Consumer> responseCallback) { + putRecordsToKv( + timeoutMs, + requiredAcks, + entriesPerBucket, + targetColumns, + AggMode.AGGREGATE, + responseCallback); + } + /** * Put kv records to leader replicas of the buckets, the kv data will write to kv tablet and the * response callback need to wait for the cdc log to be replicated to other replicas if needed. @@ -553,6 +575,7 @@ public void putRecordsToKv( int requiredAcks, Map entriesPerBucket, @Nullable int[] targetColumns, + AggMode aggMode, Consumer> responseCallback) { if (isRequiredAcksInvalid(requiredAcks)) { throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks); @@ -560,7 +583,7 @@ public void putRecordsToKv( long startTime = System.currentTimeMillis(); Map kvPutResult = - putToLocalKv(entriesPerBucket, targetColumns, requiredAcks); + putToLocalKv(entriesPerBucket, targetColumns, aggMode, requiredAcks); LOG.debug( "Put records to local kv storage and wait generate cdc log in {} ms", System.currentTimeMillis() - startTime); @@ -1027,6 +1050,7 @@ private Map appendToLocalLog( private Map putToLocalKv( Map entriesPerBucket, @Nullable int[] targetColumns, + AggMode aggMode, int requiredAcks) { Map putResultForBucketMap = new HashMap<>(); for (Map.Entry entry : entriesPerBucket.entrySet()) { @@ -1038,7 +1062,8 @@ private Map putToLocalKv( tableMetrics = replica.tableMetrics(); tableMetrics.totalPutKvRequests().inc(); LogAppendInfo appendInfo = - replica.putRecordsToLeader(entry.getValue(), targetColumns, requiredAcks); + replica.putRecordsToLeader( + entry.getValue(), targetColumns, aggMode, requiredAcks); LOG.trace( "Written to local kv for {}, and the cdc log beginning at offset {} and ending at offset {}", tb, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 992b963334..4c48ac1317 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -60,6 +60,7 @@ import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; import org.apache.fluss.rpc.messages.UpdateMetadataResponse; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.security.acl.OperationType; @@ -224,12 +225,16 @@ public CompletableFuture putKv(PutKvRequest request) { authorizeTable(WRITE, request.getTableId()); Map putKvData = getPutKvData(request); + // Get aggMode from request, default to AGGREGATE if not set + AggMode aggMode = + request.hasAggMode() ? 
AggMode.fromValue(request.getAggMode()) : AggMode.AGGREGATE; CompletableFuture response = new CompletableFuture<>(); replicaManager.putRecordsToKv( request.getTimeoutMs(), request.getAcks(), putKvData, getTargetColumns(request), + aggMode, bucketResponse -> response.complete(makePutKvResponse(bucketResponse))); return response; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletAggModeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletAggModeTest.java new file mode 100644 index 0000000000..7e2be2802c --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletAggModeTest.java @@ -0,0 +1,557 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.TableConfig; +import org.apache.fluss.memory.TestingMemorySegmentPool; +import org.apache.fluss.metadata.AggFunctions; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.LogFormat; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.SchemaInfo; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.record.KvRecord; +import org.apache.fluss.record.KvRecordBatch; +import org.apache.fluss.record.KvRecordTestUtils; +import org.apache.fluss.record.LogRecords; +import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.record.TestingSchemaGetter; +import org.apache.fluss.rpc.protocol.AggMode; +import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; +import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory; +import org.apache.fluss.server.kv.rowmerger.RowMerger; +import org.apache.fluss.server.log.FetchIsolation; +import org.apache.fluss.server.log.LogTablet; +import org.apache.fluss.server.log.LogTestUtils; +import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; +import org.apache.fluss.utils.clock.SystemClock; +import org.apache.fluss.utils.concurrent.FlussScheduler; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; +import static 
org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; +import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords; +import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords; + +/** + * Tests for {@link KvTablet} with {@link AggMode} support. + * + *
<p>
These tests verify that OVERWRITE mode correctly bypasses the merge engine and directly + * replaces values, which is essential for undo recovery scenarios. + */ +class KvTabletAggModeTest { + + private static final short SCHEMA_ID = 1; + + private final Configuration conf = new Configuration(); + private final KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory = + KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID); + + private @TempDir File tempLogDir; + private @TempDir File tmpKvDir; + + private TestingSchemaGetter schemaGetter; + private LogTablet logTablet; + private KvTablet kvTablet; + + // Schema with aggregation functions for testing + private static final Schema AGG_SCHEMA = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("count", DataTypes.BIGINT(), AggFunctions.SUM()) + .column("max_val", DataTypes.INT(), AggFunctions.MAX()) + .column("name", DataTypes.STRING(), AggFunctions.LAST_VALUE()) + .primaryKey("id") + .build(); + + private static final RowType AGG_ROW_TYPE = AGG_SCHEMA.getRowType(); + + private final KvRecordTestUtils.KvRecordFactory kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(AGG_ROW_TYPE); + + @BeforeEach + void setUp() throws Exception { + Map config = new HashMap<>(); + config.put("table.merge-engine", "aggregation"); + + TablePath tablePath = TablePath.of("testDb", "test_agg_mode"); + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath); + schemaGetter = new TestingSchemaGetter(new SchemaInfo(AGG_SCHEMA, SCHEMA_ID)); + + File logTabletDir = + LogTestUtils.makeRandomLogTabletDir( + tempLogDir, + physicalTablePath.getDatabaseName(), + 0L, + physicalTablePath.getTableName()); + logTablet = + LogTablet.create( + physicalTablePath, + logTabletDir, + conf, + TestingMetricGroups.TABLET_SERVER_METRICS, + 0, + new FlussScheduler(1), + LogFormat.ARROW, + 1, + true, + SystemClock.getInstance(), + true); + + TableBucket tableBucket = logTablet.getTableBucket(); + TableConfig tableConf = new TableConfig(Configuration.fromMap(config)); + RowMerger rowMerger = RowMerger.create(tableConf, KvFormat.COMPACTED, schemaGetter); + AutoIncrementManager autoIncrementManager = + new AutoIncrementManager( + schemaGetter, + tablePath, + new TableConfig(new Configuration()), + new TestingSequenceGeneratorFactory()); + + kvTablet = + KvTablet.create( + physicalTablePath, + tableBucket, + logTablet, + tmpKvDir, + conf, + TestingMetricGroups.TABLET_SERVER_METRICS, + new RootAllocator(Long.MAX_VALUE), + new TestingMemorySegmentPool(10 * 1024), + KvFormat.COMPACTED, + rowMerger, + DEFAULT_COMPRESSION, + schemaGetter, + tableConf.getChangelogImage(), + KvManager.getDefaultRateLimiter(), + autoIncrementManager); + } + + @AfterEach + void tearDown() throws Exception { + if (kvTablet != null) { + kvTablet.close(); + } + if (logTablet != null) { + logTablet.close(); + } + } + + // ==================== AGGREGATE Mode Tests ==================== + + @Test + void testAggregateModeAppliesMergeEngine() throws Exception { + // Insert initial record: id=1, count=10, max_val=100, name="Alice" + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + long endOffset = logTablet.localLogEndOffset(); + + // Update with AGGREGATE mode: count should be summed, max_val should take max + // id=1, count=5, max_val=150, name="Bob" + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + 
"k1".getBytes(), new Object[] {1, 5L, 150, "Bob"})); + kvTablet.putAsLeader(batch2, null, AggMode.AGGREGATE); + + // Verify CDC log shows aggregated values + LogRecords actualLogRecords = readLogRecords(endOffset); + MemoryLogRecords expectedLogs = + logRecords( + endOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, 10L, 100, "Alice"}, // before + new Object[] { + 1, 15L, 150, "Bob" + } // after: count=10+5, max=max(100,150) + )); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + // ==================== OVERWRITE Mode Tests ==================== + + @Test + void testOverwriteModeBypassesMergeEngine() throws Exception { + // Insert initial record with AGGREGATE mode + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + long endOffset = logTablet.localLogEndOffset(); + + // Update with OVERWRITE mode: values should be directly replaced, not aggregated + // id=1, count=5, max_val=50, name="Bob" + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 5L, 50, "Bob"})); + kvTablet.putAsLeader(batch2, null, AggMode.OVERWRITE); + + // Verify CDC log shows directly replaced values (not aggregated) + LogRecords actualLogRecords = readLogRecords(endOffset); + MemoryLogRecords expectedLogs = + logRecords( + endOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, 10L, 100, "Alice"}, // before + new Object[] { + 1, 5L, 50, "Bob" + } // after: directly replaced, NOT aggregated + )); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + + // Key assertion: count=5 (not 15), max_val=50 (not 100) + // This proves OVERWRITE bypassed the merge engine + } + + @Test + void testOverwriteModeForUndoRecoveryScenario() throws Exception { + // Simulate a typical undo recovery scenario: + // 1. Initial state: id=1, count=100, max_val=500, name="Original" + // 2. After some operations: id=1, count=150, max_val=600, name="Updated" + // 3. 
Undo recovery needs to restore to: id=1, count=100, max_val=500, name="Original" + + // Step 1: Insert initial record + KvRecordBatch initialBatch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 100L, 500, "Original"})); + kvTablet.putAsLeader(initialBatch, null, AggMode.AGGREGATE); + + // Step 2: Simulate some aggregation operations + KvRecordBatch updateBatch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 50L, 600, "Updated"})); + kvTablet.putAsLeader(updateBatch, null, AggMode.AGGREGATE); + + long beforeUndoOffset = logTablet.localLogEndOffset(); + + // Step 3: Undo recovery - restore to original state using OVERWRITE mode + KvRecordBatch undoBatch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 100L, 500, "Original"})); + kvTablet.putAsLeader(undoBatch, null, AggMode.OVERWRITE); + + // Verify the undo operation produced correct CDC log + LogRecords actualLogRecords = readLogRecords(beforeUndoOffset); + MemoryLogRecords expectedLogs = + logRecords( + beforeUndoOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + // Before: the aggregated state (count=150, max_val=600) + new Object[] {1, 150L, 600, "Updated"}, + // After: restored to original (count=100, max_val=500) + new Object[] {1, 100L, 500, "Original"})); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + @Test + void testOverwriteModeWithNewKey() throws Exception { + // OVERWRITE mode with a new key should behave like INSERT + KvRecordBatch batch = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch, null, AggMode.OVERWRITE); + + LogRecords actualLogRecords = readLogRecords(0); + MemoryLogRecords expectedLogs = + logRecords( + 0L, + Collections.singletonList(ChangeType.INSERT), + Collections.singletonList(new Object[] {1, 10L, 100, "Alice"})); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + @Test + void testOverwriteModeWithDelete() throws Exception { + // Insert initial record + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + long endOffset = logTablet.localLogEndOffset(); + + // Delete with OVERWRITE mode + KvRecordBatch deleteBatch = + kvRecordBatchFactory.ofRecords(kvRecordFactory.ofRecord("k1".getBytes(), null)); + kvTablet.putAsLeader(deleteBatch, null, AggMode.OVERWRITE); + + // Verify DELETE is produced + LogRecords actualLogRecords = readLogRecords(endOffset); + MemoryLogRecords expectedLogs = + logRecords( + endOffset, + Collections.singletonList(ChangeType.DELETE), + Collections.singletonList(new Object[] {1, 10L, 100, "Alice"})); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + @Test + void testMixedAggModeOperations() throws Exception { + // Test interleaved AGGREGATE and OVERWRITE operations + + // 1. Insert with AGGREGATE + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "v1"})); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + // 2. 
Update with AGGREGATE (should aggregate) + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 5L, 150, "v2"})); + kvTablet.putAsLeader(batch2, null, AggMode.AGGREGATE); + + long afterAggOffset = logTablet.localLogEndOffset(); + + // 3. Overwrite with specific value + KvRecordBatch batch3 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 20L, 80, "v3"})); + kvTablet.putAsLeader(batch3, null, AggMode.OVERWRITE); + + long afterOverwriteOffset = logTablet.localLogEndOffset(); + + // 4. Continue with AGGREGATE (should aggregate from overwritten value) + KvRecordBatch batch4 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 200, "v4"})); + kvTablet.putAsLeader(batch4, null, AggMode.AGGREGATE); + + // Verify the final aggregation is based on overwritten value + LogRecords actualLogRecords = readLogRecords(afterOverwriteOffset); + MemoryLogRecords expectedLogs = + logRecords( + afterOverwriteOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + // Before: overwritten value + new Object[] {1, 20L, 80, "v3"}, + // After: aggregated from overwritten value + // count=20+10=30, max_val=max(80,200)=200 + new Object[] {1, 30L, 200, "v4"})); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + @Test + void testOverwriteModeWithPartialUpdate() throws Exception { + // Insert initial record + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + long endOffset = logTablet.localLogEndOffset(); + + // Partial update with OVERWRITE mode (only update id and count columns) + int[] targetColumns = new int[] {0, 1}; // id and count + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 5L, null, null})); + kvTablet.putAsLeader(batch2, targetColumns, AggMode.OVERWRITE); + + // Verify partial update with OVERWRITE: count should be replaced (not aggregated) + LogRecords actualLogRecords = readLogRecords(endOffset); + MemoryLogRecords expectedLogs = + logRecords( + endOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, 10L, 100, "Alice"}, // before + new Object[] { + 1, 5L, 100, "Alice" + } // after: count replaced, others unchanged + )); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + @Test + void testOverwriteModeWithMultipleKeys() throws Exception { + // Insert multiple records + List initialRecords = + Arrays.asList( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"}), + kvRecordFactory.ofRecord( + "k2".getBytes(), new Object[] {2, 20L, 200, "Bob"})); + KvRecordBatch batch1 = kvRecordBatchFactory.ofRecords(initialRecords); + kvTablet.putAsLeader(batch1, null, AggMode.AGGREGATE); + + long endOffset = logTablet.localLogEndOffset(); + + // Overwrite multiple keys in single batch + List overwriteRecords = + Arrays.asList( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 5L, 50, "Alice2"}), + kvRecordFactory.ofRecord( + "k2".getBytes(), new Object[] {2, 8L, 80, "Bob2"})); + KvRecordBatch batch2 = 
kvRecordBatchFactory.ofRecords(overwriteRecords); + kvTablet.putAsLeader(batch2, null, AggMode.OVERWRITE); + + // Verify both keys are overwritten (not aggregated) + LogRecords actualLogRecords = readLogRecords(endOffset); + MemoryLogRecords expectedLogs = + logRecords( + endOffset, + Arrays.asList( + ChangeType.UPDATE_BEFORE, + ChangeType.UPDATE_AFTER, + ChangeType.UPDATE_BEFORE, + ChangeType.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, 10L, 100, "Alice"}, + new Object[] {1, 5L, 50, "Alice2"}, // k1 overwritten + new Object[] {2, 20L, 200, "Bob"}, + new Object[] {2, 8L, 80, "Bob2"} // k2 overwritten + )); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + // ==================== Default AggMode Tests ==================== + + @Test + void testDefaultAggModeIsAggregate() throws Exception { + // Insert initial record using default (no aggMode parameter) + KvRecordBatch batch1 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 10L, 100, "Alice"})); + kvTablet.putAsLeader(batch1, null); // Using overload without aggMode + + long endOffset = logTablet.localLogEndOffset(); + + // Update using default (should aggregate) + KvRecordBatch batch2 = + kvRecordBatchFactory.ofRecords( + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 5L, 150, "Bob"})); + kvTablet.putAsLeader(batch2, null); // Using overload without aggMode + + // Verify aggregation happened (proving default is AGGREGATE) + LogRecords actualLogRecords = readLogRecords(endOffset); + MemoryLogRecords expectedLogs = + logRecords( + endOffset, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, 10L, 100, "Alice"}, + new Object[] { + 1, 15L, 150, "Bob" + } // count=10+5=15, max=max(100,150)=150 + )); + + assertThatLogRecords(actualLogRecords) + .withSchema(AGG_ROW_TYPE) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + + // ==================== Helper Methods ==================== + + private LogRecords readLogRecords(long startOffset) throws Exception { + return logTablet + .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END, false, null) + .getRecords(); + } + + private MemoryLogRecords logRecords( + long baseOffset, List changeTypes, List rows) throws Exception { + return createBasicMemoryLogRecords( + AGG_ROW_TYPE, + SCHEMA_ID, + baseOffset, + -1L, + CURRENT_LOG_MAGIC_VALUE, + NO_WRITER_ID, + NO_BATCH_SEQUENCE, + changeTypes, + rows, + LogFormat.ARROW, + DEFAULT_COMPRESSION); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index 8bdd46db66..276dfc927e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java @@ -32,6 +32,7 @@ import org.apache.fluss.record.LogRecords; import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.record.ProjectionPushdownCache; +import org.apache.fluss.rpc.protocol.AggMode; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.kv.KvTablet; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; @@ -868,7 +869,9 @@ private static MemoryLogRecords logRecords( private LogAppendInfo putRecordsToLeader( Replica replica, KvRecordBatch kvRecords, int[] targetColumns) throws Exception { - LogAppendInfo 
logAppendInfo = replica.putRecordsToLeader(kvRecords, targetColumns, 0); + // Use AGGREGATE mode as default for tests + LogAppendInfo logAppendInfo = + replica.putRecordsToLeader(kvRecords, targetColumns, AggMode.AGGREGATE, 0); KvTablet kvTablet = checkNotNull(replica.getKvTablet()); // flush to make data visible kvTablet.flush(replica.getLocalLogEndOffset(), NOPErrorHandler.INSTANCE);
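Taken end to end, the patch threads the mode from the client API into the server: Upsert -> WriteRecord -> KvWriteBatch (which rejects mixed modes) -> PutKvRequest.agg_mode -> TabletService -> ReplicaManager -> KvTablet, where OVERWRITE swaps in the pre-created DefaultRowMerger. A minimal sketch of the observable semantics exercised by KvTabletAggModeTest, assuming a table whose count column aggregates with SUM and whose max_val column aggregates with MAX; the table variable and the row(...) helper are hypothetical, not part of this patch:

    UpsertWriter aggWriter =
        table.newUpsert().aggregationMode(AggMode.AGGREGATE).createWriter();
    aggWriter.upsert(row(1, 10L, 100, "Alice")); // stored: count=10, max_val=100
    aggWriter.upsert(row(1, 5L, 150, "Bob"));    // stored: count=15 (10+5), max_val=150

    // OVERWRITE bypasses the merge engine and restores exact values (undo recovery).
    UpsertWriter undoWriter =
        table.newUpsert().aggregationMode(AggMode.OVERWRITE).createWriter();
    undoWriter.upsert(row(1, 10L, 100, "Alice")); // stored again: count=10, max_val=100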