{@link Upsert} objects are immutable and can be shared between threads. Refinement methods,
- * like {@link #partialUpdate}, create new Upsert instances.
+ * like {@link #partialUpdate(int[])} and {@link #aggregationMode(AggMode)}, create new Upsert
+ * instances.
*
* @since 0.6
*/
@@ -56,9 +58,44 @@ public interface Upsert {
*/
Upsert partialUpdate(String... targetColumnNames);
+ /**
+ * Specify aggregation mode for the UpsertWriter and returns a new Upsert instance.
+ *
+ * <p>This method controls how the created UpsertWriter handles data aggregation:
+ *
+ *
+ *
{@link AggMode#AGGREGATE} (default): Data is aggregated through server-side merge
+ * engine
+ *
+ */
+ private final AggMode aggMode;
+
private WriteRecord(
TableInfo tableInfo,
PhysicalTablePath physicalTablePath,
@@ -174,7 +230,8 @@ private WriteRecord(
@Nullable InternalRow row,
WriteFormat writeFormat,
@Nullable int[] targetColumns,
- int estimatedSizeInBytes) {
+ int estimatedSizeInBytes,
+ AggMode aggMode) {
this.tableInfo = tableInfo;
this.physicalTablePath = physicalTablePath;
this.key = key;
@@ -183,6 +240,7 @@ private WriteRecord(
this.writeFormat = writeFormat;
this.targetColumns = targetColumns;
this.estimatedSizeInBytes = estimatedSizeInBytes;
+ this.aggMode = aggMode;
}
public PhysicalTablePath getPhysicalTablePath() {
@@ -214,6 +272,15 @@ public WriteFormat getWriteFormat() {
return writeFormat;
}
+ /**
+  * Returns the {@link AggMode} this record was constructed with.
+  *
+  * @return the aggregation mode stored in this record
+  */
+ public AggMode getAggMode() {
+     return this.aggMode;
+ }
+
/**
* Get the estimated size in bytes of the record with batch header.
*
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
new file mode 100644
index 0000000000..1c743fa737
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.utils;
+
+import org.apache.fluss.client.write.KvWriteBatch;
+import org.apache.fluss.client.write.ReadyWriteBatch;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.memory.PreAllocatedPagedOutputView;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.rpc.messages.PutKvRequest;
+import org.apache.fluss.rpc.protocol.AggMode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link ClientRpcMessageUtils}.
+ *
+ * <p>Focuses on AggMode consistency validation in makePutKvRequest.
+ */
+class ClientRpcMessageUtilsTest {
+
+    private static final long TABLE_ID = DATA1_TABLE_ID_PK;
+    private static final int ACKS = 1;
+    private static final int TIMEOUT_MS = 30000;
+
+    /** Two batches that both use AGGREGATE must yield a request carrying AGGREGATE. */
+    @Test
+    void testMakePutKvRequestWithConsistentAggMode() throws Exception {
+        // Create two batches with same aggMode (AGGREGATE)
+        KvWriteBatch batch1 = createKvWriteBatch(0, AggMode.AGGREGATE);
+        KvWriteBatch batch2 = createKvWriteBatch(1, AggMode.AGGREGATE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Arrays.asList(
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch1),
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), batch2));
+
+        // Should succeed without exception
+        PutKvRequest request =
+                ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches);
+
+        // Verify aggMode is set correctly in request
+        assertThat(request.hasAggMode()).isTrue();
+        assertThat(request.getAggMode()).isEqualTo(AggMode.AGGREGATE.getProtoValue());
+    }
+
+    /** Two batches that both use OVERWRITE must yield a request carrying OVERWRITE. */
+    @Test
+    void testMakePutKvRequestWithOverwriteMode() throws Exception {
+        // Create batches with OVERWRITE mode
+        KvWriteBatch batch1 = createKvWriteBatch(0, AggMode.OVERWRITE);
+        KvWriteBatch batch2 = createKvWriteBatch(1, AggMode.OVERWRITE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Arrays.asList(
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch1),
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), batch2));
+
+        PutKvRequest request =
+                ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches);
+
+        // Verify OVERWRITE mode is set correctly
+        assertThat(request.hasAggMode()).isTrue();
+        assertThat(request.getAggMode()).isEqualTo(AggMode.OVERWRITE.getProtoValue());
+    }
+
+    /** Batches with different aggModes must be rejected when building a single request. */
+    @Test
+    void testMakePutKvRequestWithInconsistentAggMode() throws Exception {
+        // Create batches with different aggModes
+        KvWriteBatch aggregateBatch = createKvWriteBatch(0, AggMode.AGGREGATE);
+        KvWriteBatch overwriteBatch = createKvWriteBatch(1, AggMode.OVERWRITE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Arrays.asList(
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), aggregateBatch),
+                        new ReadyWriteBatch(new TableBucket(TABLE_ID, 1), overwriteBatch));
+
+        // Should throw exception due to inconsistent aggMode
+        assertThatThrownBy(
+                        () ->
+                                ClientRpcMessageUtils.makePutKvRequest(
+                                        TABLE_ID, ACKS, TIMEOUT_MS, readyBatches))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "All the write batches to make put kv request should have the same aggMode")
+                .hasMessageContaining("AGGREGATE")
+                .hasMessageContaining("OVERWRITE");
+    }
+
+    /** A single batch has nothing to conflict with; its aggMode is propagated as-is. */
+    @Test
+    void testMakePutKvRequestWithSingleBatch() throws Exception {
+        // Single batch should always succeed
+        KvWriteBatch batch = createKvWriteBatch(0, AggMode.OVERWRITE);
+
+        List<ReadyWriteBatch> readyBatches =
+                Collections.singletonList(new ReadyWriteBatch(new TableBucket(TABLE_ID, 0), batch));
+
+        PutKvRequest request =
+                ClientRpcMessageUtils.makePutKvRequest(TABLE_ID, ACKS, TIMEOUT_MS, readyBatches);
+
+        // Same checks as the multi-batch tests: the field must be present and correct.
+        assertThat(request.hasAggMode()).isTrue();
+        assertThat(request.getAggMode()).isEqualTo(AggMode.OVERWRITE.getProtoValue());
+    }
+
+    /**
+     * Creates a {@link KvWriteBatch} for the given bucket and aggregation mode, backed by a single
+     * 1024-byte heap memory segment.
+     *
+     * @param bucketId the bucket id passed to the batch
+     * @param aggMode the aggregation mode passed to the batch
+     * @return the newly created batch
+     */
+    private KvWriteBatch createKvWriteBatch(int bucketId, AggMode aggMode) throws Exception {
+        MemorySegment segment = MemorySegment.allocateHeapMemory(1024);
+        PreAllocatedPagedOutputView outputView =
+                new PreAllocatedPagedOutputView(Collections.singletonList(segment));
+        return new KvWriteBatch(
+                bucketId,
+                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                DATA1_TABLE_INFO_PK.getSchemaId(),
+                KvFormat.COMPACTED,
+                Integer.MAX_VALUE,
+                outputView,
+                null,
+                aggMode,
+                System.currentTimeMillis());
+    }
+}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index c62c7b0297..f58fc0c979 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -33,6 +33,7 @@
import org.apache.fluss.record.TestingSchemaGetter;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.encode.CompactedKeyEncoder;
+import org.apache.fluss.rpc.protocol.AggMode;
import org.apache.fluss.types.DataType;
import org.junit.jupiter.api.BeforeEach;
@@ -227,6 +228,7 @@ private KvWriteBatch createKvWriteBatch(
writeLimit,
outputView,
null,
+ AggMode.AGGREGATE,
System.currentTimeMillis());
}
@@ -254,4 +256,84 @@ private void assertDefaultKvRecordBatchEquals(DefaultKvRecordBatch recordBatch)
assertThat(toArray(kvRecord.getKey())).isEqualTo(key);
assertThat(kvRecord.getRow()).isEqualTo(row);
}
+
+ // ==================== AggMode Tests ====================
+
+ @Test
+ void testAggModeConsistencyValidation() throws Exception {
+     // Batch bound to AGGREGATE mode.
+     TableBucket bucket = new TableBucket(DATA1_TABLE_ID_PK, 0);
+     KvWriteBatch batch = createKvWriteBatchWithAggMode(bucket, AggMode.AGGREGATE);
+
+     // A record carrying the same mode is accepted.
+     WriteRecord matchingRecord = createWriteRecordWithAggMode(AggMode.AGGREGATE);
+     boolean appended = batch.tryAppend(matchingRecord, newWriteCallback());
+     assertThat(appended).isTrue();
+
+     // A record carrying a different mode is rejected with a descriptive error.
+     WriteRecord mismatchingRecord = createWriteRecordWithAggMode(AggMode.OVERWRITE);
+     assertThatThrownBy(() -> batch.tryAppend(mismatchingRecord, newWriteCallback()))
+             .isInstanceOf(IllegalStateException.class)
+             .hasMessageContaining("Cannot mix records with different aggMode in the same batch")
+             .hasMessageContaining("Batch aggMode: AGGREGATE")
+             .hasMessageContaining("Record aggMode: OVERWRITE");
+ }
+
+ @Test
+ void testOverwriteModeBatch() throws Exception {
+     // Batch bound to OVERWRITE mode.
+     TableBucket bucket = new TableBucket(DATA1_TABLE_ID_PK, 0);
+     KvWriteBatch batch = createKvWriteBatchWithAggMode(bucket, AggMode.OVERWRITE);
+
+     // The batch reports the mode it was created with.
+     assertThat(batch.getAggMode()).isEqualTo(AggMode.OVERWRITE);
+
+     // A record carrying the same mode is accepted.
+     WriteRecord matchingRecord = createWriteRecordWithAggMode(AggMode.OVERWRITE);
+     boolean appended = batch.tryAppend(matchingRecord, newWriteCallback());
+     assertThat(appended).isTrue();
+
+     // A record carrying a different mode is rejected with a descriptive error.
+     WriteRecord mismatchingRecord = createWriteRecordWithAggMode(AggMode.AGGREGATE);
+     assertThatThrownBy(() -> batch.tryAppend(mismatchingRecord, newWriteCallback()))
+             .isInstanceOf(IllegalStateException.class)
+             .hasMessageContaining("Cannot mix records with different aggMode in the same batch")
+             .hasMessageContaining("Batch aggMode: OVERWRITE")
+             .hasMessageContaining("Record aggMode: AGGREGATE");
+ }
+
+ @Test
+ void testDefaultAggModeIsAggregate() throws Exception {
+     // The shared createKvWriteBatch helper builds its batch with AggMode.AGGREGATE.
+     TableBucket bucket = new TableBucket(DATA1_TABLE_ID_PK, 0);
+     AggMode actualMode = createKvWriteBatch(bucket).getAggMode();
+     assertThat(actualMode).isEqualTo(AggMode.AGGREGATE);
+ }
+
+ /**
+  * Builds a {@link KvWriteBatch} for the bucket id of {@code tb} with the given aggregation
+  * mode, writing into one segment taken from the test's memory pool.
+  */
+ private KvWriteBatch createKvWriteBatchWithAggMode(TableBucket tb, AggMode aggMode)
+         throws Exception {
+     PreAllocatedPagedOutputView singleSegmentView =
+             new PreAllocatedPagedOutputView(
+                     Collections.singletonList(memoryPool.nextSegment()));
+     PhysicalTablePath tablePath = PhysicalTablePath.of(DATA1_TABLE_PATH_PK);
+     return new KvWriteBatch(
+             tb.getBucket(),
+             tablePath,
+             DATA1_TABLE_INFO_PK.getSchemaId(),
+             KvFormat.COMPACTED,
+             Integer.MAX_VALUE,
+             singleSegmentView,
+             null,
+             aggMode,
+             System.currentTimeMillis());
+ }
+
+ /**
+  * Builds an upsert {@link WriteRecord} from the test's {@code row} and {@code key} fixtures
+  * with the given aggregation mode; {@code key} is passed for both key arguments.
+  */
+ private WriteRecord createWriteRecordWithAggMode(AggMode aggMode) {
+     PhysicalTablePath tablePath = PhysicalTablePath.of(DATA1_TABLE_PATH_PK);
+     return WriteRecord.forUpsert(
+             DATA1_TABLE_INFO_PK,
+             tablePath,
+             row,
+             key,
+             key,
+             WriteFormat.COMPACTED_KV,
+             null,
+             aggMode);
+ }
}
diff --git a/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java
new file mode 100644
index 0000000000..4b18765eea
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/rpc/protocol/AggMode.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.protocol;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Aggregation mode for write operations.
+ *
+ * <p>This enum controls how the server handles data aggregation when writing to tables with
+ * aggregation merge engine.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public enum AggMode {
+
+    /**
+     * Aggregate mode (default): Data is aggregated through server-side merge engine.
+     *
+     * <p>This is the normal mode for aggregation tables. When writing to a table with aggregation
+     * merge engine, the server will apply the configured aggregation functions (e.g., SUM, MAX,
+     * MIN) to merge the new values with existing values.
+     */
+    AGGREGATE(0),
+
+    /**
+     * Overwrite mode: Data directly overwrites target values, bypassing merge engine.
+     *
+     * <p>This mode is used for undo recovery operations to restore exact historical values. When in
+     * overwrite mode, the server will not apply any aggregation functions and will directly replace
+     * the existing values with the new values.
+     *
+     * <p>This is typically used internally by the Flink connector during failover recovery to
+     * restore the state to a previous checkpoint.
+     */
+    OVERWRITE(1),
+
+    /**
+     * Client-side local aggregation mode (future): Data is pre-aggregated on client side before
+     * being sent to server for final aggregation.
+     *
+     * <p>This mode reduces write amplification and network traffic by aggregating multiple updates
+     * to the same key on the client side before sending to the server.
+     *
+     * <p>Note: This mode is reserved for future implementation.
+     */
+    LOCAL_AGGREGATE(2);
+
+    /** Integer code of this mode; documented as matching the proto {@code agg_mode} values. */
+    private final int value;
+
+    AggMode(int value) {
+        this.value = value;
+    }
+
+    /**
+     * Returns the integer value for this aggregation mode.
+     *
+     * <p>This value matches the agg_mode field values in the proto definition.
+     *
+     * @return the integer value
+     */
+    public int getValue() {
+        return value;
+    }
+
+    /**
+     * Returns the proto value for this aggregation mode.
+     *
+     * <p>This is an alias for {@link #getValue()} for clarity when working with proto messages.
+     *
+     * @return the proto value
+     */
+    public int getProtoValue() {
+        return getValue();
+    }
+
+    /**
+     * Converts an integer value to an AggMode enum.
+     *
+     * <p>Values that match no constant are not rejected; they map to {@link #AGGREGATE}.
+     *
+     * @param value the integer value
+     * @return the corresponding AggMode, or AGGREGATE if the value is invalid
+     */
+    public static AggMode fromValue(int value) {
+        // Each constant has a distinct code, so at most one constant can match.
+        for (AggMode mode : values()) {
+            if (mode.value == value) {
+                return mode;
+            }
+        }
+        return AGGREGATE;
+    }
+
+    /**
+     * Converts a proto value to an AggMode enum.
+     *
+     * <p>This is an alias for {@link #fromValue(int)} for clarity when working with proto messages.
+     *
+     * @param protoValue the proto value
+     * @return the corresponding AggMode, or AGGREGATE if the value is invalid
+     */
+    public static AggMode fromProtoValue(int protoValue) {
+        return fromValue(protoValue);
+    }
+}
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index d3b1edb968..d800549d86 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -219,6 +219,11 @@ message FetchLogResponse {
repeated PbFetchLogRespForTable tables_resp = 1;
}
+// Aggregation mode constants for write operations (using int32 instead of enum for proto3 compatibility)
+// AGG_MODE_AGGREGATE = 0: Data is aggregated through server-side merge engine (default)
+// AGG_MODE_OVERWRITE = 1: Bypass merge engine, directly replace values (for undo recovery)
+// AGG_MODE_LOCAL_AGGREGATE = 2: Client-side local aggregation (reserved for future implementation)
+
// put kv request and response
message PutKvRequest {
required int32 acks = 1;
@@ -228,6 +233,9 @@ message PutKvRequest {
// if empty, means write all columns
repeated int32 target_columns = 4 [packed = true];
repeated PbPutKvReqForBucket buckets_req = 5;
+ // Aggregation mode for this request (see AGG_MODE_* constants above)
+ // 0 = AGGREGATE (default), 1 = OVERWRITE, 2 = LOCAL_AGGREGATE (not yet supported)
+ optional int32 agg_mode = 6;
}
message PutKvResponse {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 79ab8eb211..137c2e1ab7 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -45,6 +45,7 @@
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.row.arrow.ArrowWriterProvider;
import org.apache.fluss.row.encode.ValueDecoder;
+import org.apache.fluss.rpc.protocol.AggMode;
import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
import org.apache.fluss.server.kv.autoinc.AutoIncrementUpdater;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer;
@@ -115,6 +116,9 @@ public final class KvTablet {
private final KvFormat kvFormat;
// defines how to merge rows on the same primary key
private final RowMerger rowMerger;
+ // Pre-created DefaultRowMerger for OVERWRITE mode (undo recovery scenarios)
+ // This avoids creating a new instance on every putAsLeader call
+ private final RowMerger overwriteRowMerger;
private final ArrowCompressionInfo arrowCompressionInfo;
private final AutoIncrementManager autoIncrementManager;
@@ -166,6 +170,9 @@ private KvTablet(
this.memorySegmentPool = memorySegmentPool;
this.kvFormat = kvFormat;
this.rowMerger = rowMerger;
+ // Pre-create DefaultRowMerger for OVERWRITE mode to avoid creating new instances
+ // on every putAsLeader call. Used for undo recovery scenarios.
+ this.overwriteRowMerger = new DefaultRowMerger(kvFormat, DeleteBehavior.ALLOW);
this.arrowCompressionInfo = arrowCompressionInfo;
this.schemaGetter = schemaGetter;
this.changelogImage = changelogImage;
@@ -273,6 +280,20 @@ public long getFlushedLogOffset() {
return flushedLogOffset;
}
+ /**
+  * Puts the given {@link KvRecordBatch} into the kv storage using {@link AggMode#AGGREGATE}.
+  *
+  * <p>Convenience overload that delegates to {@link #putAsLeader(KvRecordBatch, int[],
+  * AggMode)}.
+  *
+  * @param kvRecords the kv records to put into
+  * @param targetColumns the target columns to put, null if put all columns
+  * @return the {@link LogAppendInfo} returned by the delegated call
+  */
+ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
+         throws Exception {
+     final AggMode defaultMode = AggMode.AGGREGATE;
+     return putAsLeader(kvRecords, targetColumns, defaultMode);
+ }
+
/**
* Put the KvRecordBatch into the kv storage, and return the appended wal log info.
*
@@ -292,8 +313,10 @@ public long getFlushedLogOffset() {
*
* @param kvRecords the kv records to put into
* @param targetColumns the target columns to put, null if put all columns
+ * @param aggMode the aggregation mode (AGGREGATE or OVERWRITE)
*/
- public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
+ public LogAppendInfo putAsLeader(
+ KvRecordBatch kvRecords, @Nullable int[] targetColumns, AggMode aggMode)
throws Exception {
return inWriteLock(
kvLock,
@@ -305,10 +328,17 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
short latestSchemaId = (short) schemaInfo.getSchemaId();
validateSchemaId(kvRecords.schemaId(), latestSchemaId);
- // we only support ADD COLUMN, so targetColumns is fine to be used directly
+ // Determine the row merger based on aggMode:
+ // - AGGREGATE: Use the configured merge engine (rowMerger)
+ // - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger
+ // to directly replace values (for undo recovery scenarios)
+ // We only support ADD COLUMN, so targetColumns is fine to be used directly.
RowMerger currentMerger =
- rowMerger.configureTargetColumns(
- targetColumns, latestSchemaId, latestSchema);
+ (aggMode == AggMode.OVERWRITE)
+ ? overwriteRowMerger.configureTargetColumns(
+ targetColumns, latestSchemaId, latestSchema)
+ : rowMerger.configureTargetColumns(
+ targetColumns, latestSchemaId, latestSchema);
AutoIncrementUpdater currentAutoIncrementUpdater =
autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index ee158b7a4b..7079481cc1 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -44,6 +44,7 @@
import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.LogRecords;
import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.rpc.protocol.AggMode;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.server.SequenceIDCounter;
import org.apache.fluss.server.coordinator.CoordinatorContext;
@@ -940,7 +941,10 @@ public LogAppendInfo appendRecordsToFollower(MemoryLogRecords memoryLogRecords)
}
public LogAppendInfo putRecordsToLeader(
- KvRecordBatch kvRecords, @Nullable int[] targetColumns, int requiredAcks)
+ KvRecordBatch kvRecords,
+ @Nullable int[] targetColumns,
+ AggMode aggMode,
+ int requiredAcks)
throws Exception {
return inReadLock(
leaderIsrUpdateLock,
@@ -958,7 +962,7 @@ public LogAppendInfo putRecordsToLeader(
kv, "KvTablet for the replica to put kv records shouldn't be null.");
LogAppendInfo logAppendInfo;
try {
- logAppendInfo = kv.putAsLeader(kvRecords, targetColumns);
+ logAppendInfo = kv.putAsLeader(kvRecords, targetColumns, aggMode);
} catch (IOException e) {
LOG.error("Error while putting records to {}", tableBucket, e);
fatalErrorHandler.onFatalError(e);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 76e3ea1867..8faec3d090 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -55,6 +55,7 @@
import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetResponse;
import org.apache.fluss.rpc.messages.NotifyLakeTableOffsetResponse;
import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse;
+import org.apache.fluss.rpc.protocol.AggMode;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.server.coordinator.CoordinatorContext;
@@ -544,6 +545,27 @@ public void fetchLogRecords(
params, bucketFetchInfo, logReadResults, userContext, responseCallback);
}
+ /**
+  * Put kv records to leader replicas of the buckets with default AGGREGATE mode.
+  *
+  * <p>This is a convenience method that calls {@link #putRecordsToKv(int, int, Map, int[],
+  * AggMode, Consumer)} with {@link AggMode#AGGREGATE}.
+  *
+  * @param timeoutMs forwarded unchanged to the delegated overload
+  * @param requiredAcks forwarded unchanged to the delegated overload
+  * @param entriesPerBucket the kv record batch to put for each bucket
+  * @param targetColumns the target columns to put, may be null
+  * @param responseCallback forwarded unchanged to the delegated overload
+  */
+ public void putRecordsToKv(
+         int timeoutMs,
+         int requiredAcks,
+         Map<TableBucket, KvRecordBatch> entriesPerBucket,
+         @Nullable int[] targetColumns,
+         Consumer<List<PutKvResultForBucket>> responseCallback) {
+     // NOTE(review): the generic arguments were lost in this patch text and are reconstructed;
+     // confirm they match the six-argument overload's signature.
+     putRecordsToKv(
+             timeoutMs,
+             requiredAcks,
+             entriesPerBucket,
+             targetColumns,
+             AggMode.AGGREGATE,
+             responseCallback);
+ }
+
/**
* Put kv records to leader replicas of the buckets, the kv data will write to kv tablet and the
* response callback need to wait for the cdc log to be replicated to other replicas if needed.
@@ -553,6 +575,7 @@ public void putRecordsToKv(
int requiredAcks,
Map entriesPerBucket,
@Nullable int[] targetColumns,
+ AggMode aggMode,
Consumer> responseCallback) {
if (isRequiredAcksInvalid(requiredAcks)) {
throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks);
@@ -560,7 +583,7 @@ public void putRecordsToKv(
long startTime = System.currentTimeMillis();
Map kvPutResult =
- putToLocalKv(entriesPerBucket, targetColumns, requiredAcks);
+ putToLocalKv(entriesPerBucket, targetColumns, aggMode, requiredAcks);
LOG.debug(
"Put records to local kv storage and wait generate cdc log in {} ms",
System.currentTimeMillis() - startTime);
@@ -1027,6 +1050,7 @@ private Map appendToLocalLog(
private Map putToLocalKv(
Map entriesPerBucket,
@Nullable int[] targetColumns,
+ AggMode aggMode,
int requiredAcks) {
Map putResultForBucketMap = new HashMap<>();
for (Map.Entry entry : entriesPerBucket.entrySet()) {
@@ -1038,7 +1062,8 @@ private Map putToLocalKv(
tableMetrics = replica.tableMetrics();
tableMetrics.totalPutKvRequests().inc();
LogAppendInfo appendInfo =
- replica.putRecordsToLeader(entry.getValue(), targetColumns, requiredAcks);
+ replica.putRecordsToLeader(
+ entry.getValue(), targetColumns, aggMode, requiredAcks);
LOG.trace(
"Written to local kv for {}, and the cdc log beginning at offset {} and ending at offset {}",
tb,
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index 992b963334..4c48ac1317 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -60,6 +60,7 @@
import org.apache.fluss.rpc.messages.StopReplicaResponse;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
import org.apache.fluss.rpc.messages.UpdateMetadataResponse;
+import org.apache.fluss.rpc.protocol.AggMode;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.security.acl.OperationType;
@@ -224,12 +225,16 @@ public CompletableFuture putKv(PutKvRequest request) {
authorizeTable(WRITE, request.getTableId());
Map putKvData = getPutKvData(request);
+ // Get aggMode from request, default to AGGREGATE if not set
+ AggMode aggMode =
+ request.hasAggMode() ? AggMode.fromValue(request.getAggMode()) : AggMode.AGGREGATE;
CompletableFuture response = new CompletableFuture<>();
replicaManager.putRecordsToKv(
request.getTimeoutMs(),
request.getAcks(),
putKvData,
getTargetColumns(request),
+ aggMode,
bucketResponse -> response.complete(makePutKvResponse(bucketResponse)));
return response;
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletAggModeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletAggModeTest.java
new file mode 100644
index 0000000000..7e2be2802c
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletAggModeTest.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.memory.TestingMemorySegmentPool;
+import org.apache.fluss.metadata.AggFunctions;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.KvRecord;
+import org.apache.fluss.record.KvRecordBatch;
+import org.apache.fluss.record.KvRecordTestUtils;
+import org.apache.fluss.record.LogRecords;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.TestingSchemaGetter;
+import org.apache.fluss.rpc.protocol.AggMode;
+import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
+import org.apache.fluss.server.kv.autoinc.TestingSequenceGeneratorFactory;
+import org.apache.fluss.server.kv.rowmerger.RowMerger;
+import org.apache.fluss.server.log.FetchIsolation;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.log.LogTestUtils;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;
+
+/**
+ * Tests for {@link KvTablet} with {@link AggMode} support.
+ *
+ *