Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.client.write.WriterClient;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.protocol.AggMode;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;
Expand All @@ -32,22 +33,24 @@ public class TableUpsert implements Upsert {
private final TablePath tablePath;
private final TableInfo tableInfo;
private final WriterClient writerClient;

private final @Nullable int[] targetColumns;
private final AggMode aggMode;

public TableUpsert(TablePath tablePath, TableInfo tableInfo, WriterClient writerClient) {
this(tablePath, tableInfo, writerClient, null);
this(tablePath, tableInfo, writerClient, null, AggMode.AGGREGATE);
}

private TableUpsert(
TablePath tablePath,
TableInfo tableInfo,
WriterClient writerClient,
@Nullable int[] targetColumns) {
@Nullable int[] targetColumns,
AggMode aggMode) {
this.tablePath = tablePath;
this.tableInfo = tableInfo;
this.writerClient = writerClient;
this.targetColumns = targetColumns;
this.aggMode = aggMode;
}

@Override
Expand All @@ -68,7 +71,7 @@ public Upsert partialUpdate(@Nullable int[] targetColumns) {
}
}
}
return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns);
return new TableUpsert(tablePath, tableInfo, writerClient, targetColumns, this.aggMode);
}

@Override
Expand All @@ -91,9 +94,15 @@ public Upsert partialUpdate(String... targetColumnNames) {
return partialUpdate(targetColumns);
}

@Override
public Upsert aggregationMode(AggMode mode) {
checkNotNull(mode, "aggregation mode");
return new TableUpsert(tablePath, tableInfo, writerClient, this.targetColumns, mode);
}

@Override
public UpsertWriter createWriter() {
return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient);
return new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient, aggMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.client.table.writer;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.rpc.protocol.AggMode;

import javax.annotation.Nullable;

Expand All @@ -26,7 +27,8 @@
* Table.
*
* <p>{@link Upsert} objects are immutable and can be shared between threads. Refinement methods,
* like {@link #partialUpdate}, create new Upsert instances.
* like {@link #partialUpdate(int[])} and {@link #aggregationMode(AggMode)}, create new Upsert
* instances.
*
* @since 0.6
*/
Expand Down Expand Up @@ -56,9 +58,44 @@ public interface Upsert {
*/
Upsert partialUpdate(String... targetColumnNames);

/**
* Specify aggregation mode for the UpsertWriter and returns a new Upsert instance.
*
* <p>This method controls how the created UpsertWriter handles data aggregation:
*
* <ul>
* <li>{@link AggMode#AGGREGATE} (default): Data is aggregated through server-side merge
* engine
* <li>{@link AggMode#OVERWRITE}: Data directly overwrites values, bypassing merge engine (for
* undo recovery)
* <li>{@link AggMode#LOCAL_AGGREGATE}: Client-side pre-aggregation before server-side
* aggregation (reserved for future implementation, not yet supported)
* </ul>
*
* <p>Example usage:
*
* <pre>{@code
* // Normal aggregation mode (default)
* UpsertWriter normalWriter = table.newUpsert()
* .aggregationMode(AggMode.AGGREGATE)
* .createWriter();
*
* // Overwrite mode for undo recovery
* UpsertWriter undoWriter = table.newUpsert()
* .aggregationMode(AggMode.OVERWRITE)
* .createWriter();
* }</pre>
*
* @param mode the aggregation mode
* @return a new Upsert instance with the specified aggregation mode
* @since 0.9
*/
Upsert aggregationMode(AggMode mode);

/**
* Create a new {@link UpsertWriter} using {@code InternalRow} with the optional {@link
* #partialUpdate(String...)} information to upsert and delete data to a Primary Key Table.
* #partialUpdate(String...)} and {@link #aggregationMode(AggMode)} information to upsert and
* delete data to a Primary Key Table.
*/
UpsertWriter createWriter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.fluss.row.encode.KeyEncoder;
import org.apache.fluss.row.encode.RowEncoder;
import org.apache.fluss.row.indexed.IndexedRow;
import org.apache.fluss.rpc.protocol.AggMode;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;
Expand All @@ -56,11 +57,25 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
private final FieldGetter[] fieldGetters;
private final TableInfo tableInfo;

/**
* The aggregation mode for this writer. This controls how the server handles data aggregation.
*/
private final AggMode aggMode;

UpsertWriterImpl(
TablePath tablePath,
TableInfo tableInfo,
@Nullable int[] partialUpdateColumns,
WriterClient writerClient) {
this(tablePath, tableInfo, partialUpdateColumns, writerClient, AggMode.AGGREGATE);
}

UpsertWriterImpl(
TablePath tablePath,
TableInfo tableInfo,
@Nullable int[] partialUpdateColumns,
WriterClient writerClient,
AggMode aggMode) {
super(tablePath, tableInfo, writerClient);
RowType rowType = tableInfo.getRowType();
sanityCheck(
Expand All @@ -83,7 +98,16 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
this.rowEncoder = RowEncoder.create(kvFormat, rowType);
this.fieldGetters = InternalRow.createFieldGetters(rowType);

this.tableInfo = tableInfo;

// LOCAL_AGGREGATE is reserved for future implementation
if (aggMode == AggMode.LOCAL_AGGREGATE) {
throw new UnsupportedOperationException(
"LOCAL_AGGREGATE mode is not yet supported. "
+ "Please use AGGREGATE or OVERWRITE mode.");
}
this.aggMode = aggMode;
}

private static void sanityCheck(
Expand Down Expand Up @@ -168,7 +192,8 @@ public CompletableFuture<UpsertResult> upsert(InternalRow row) {
key,
bucketKey,
writeFormat,
targetColumns);
targetColumns,
aggMode);
return send(record).thenApply(ignored -> UPSERT_SUCCESS);
}

Expand All @@ -191,7 +216,8 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
key,
bucketKey,
writeFormat,
targetColumns);
targetColumns,
aggMode);
return send(record).thenApply(ignored -> DELETE_SUCCESS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.fluss.rpc.messages.ProduceLogRequest;
import org.apache.fluss.rpc.messages.PutKvRequest;
import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
import org.apache.fluss.rpc.protocol.AggMode;
import org.apache.fluss.utils.json.DataTypeJsonSerde;
import org.apache.fluss.utils.json.JsonSerdeUtils;

Expand Down Expand Up @@ -138,11 +139,12 @@ public static PutKvRequest makePutKvRequest(
.setTimeoutMs(maxRequestTimeoutMs);
// check the target columns in the batch list should be the same. If not same,
// we throw exception directly currently.
int[] targetColumns =
((KvWriteBatch) readyWriteBatches.get(0).writeBatch()).getTargetColumns();
KvWriteBatch firstBatch = (KvWriteBatch) readyWriteBatches.get(0).writeBatch();
int[] targetColumns = firstBatch.getTargetColumns();
AggMode aggMode = firstBatch.getAggMode();
for (int i = 1; i < readyWriteBatches.size(); i++) {
int[] currentBatchTargetColumns =
((KvWriteBatch) readyWriteBatches.get(i).writeBatch()).getTargetColumns();
KvWriteBatch currentBatch = (KvWriteBatch) readyWriteBatches.get(i).writeBatch();
int[] currentBatchTargetColumns = currentBatch.getTargetColumns();
if (!Arrays.equals(targetColumns, currentBatchTargetColumns)) {
throw new IllegalStateException(
String.format(
Expand All @@ -151,10 +153,21 @@ public static PutKvRequest makePutKvRequest(
Arrays.toString(targetColumns),
Arrays.toString(currentBatchTargetColumns)));
}
// Validate aggMode consistency across batches
if (currentBatch.getAggMode() != aggMode) {
throw new IllegalStateException(
String.format(
"All the write batches to make put kv request should have the same aggMode, "
+ "but got %s and %s.",
aggMode, currentBatch.getAggMode()));
}
}
if (targetColumns != null) {
request.setTargetColumns(targetColumns);
}
// Set aggMode in the request - this is the proper way to pass aggMode to server
request.setAggMode(aggMode.getProtoValue());

readyWriteBatches.forEach(
readyBatch -> {
TableBucket tableBucket = readyBatch.tableBucket();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.rpc.messages.PutKvRequest;
import org.apache.fluss.rpc.protocol.AggMode;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -51,6 +52,7 @@ public class KvWriteBatch extends WriteBatch {
private final KvRecordBatchBuilder recordsBuilder;
private final @Nullable int[] targetColumns;
private final int schemaId;
private final AggMode aggMode;

public KvWriteBatch(
int bucketId,
Expand All @@ -60,13 +62,15 @@ public KvWriteBatch(
int writeLimit,
AbstractPagedOutputView outputView,
@Nullable int[] targetColumns,
AggMode aggMode,
long createdMs) {
super(bucketId, physicalTablePath, createdMs);
this.outputView = outputView;
this.recordsBuilder =
KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat);
this.targetColumns = targetColumns;
this.schemaId = schemaId;
this.aggMode = aggMode;
}

@Override
Expand Down Expand Up @@ -94,6 +98,15 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
Arrays.toString(targetColumns)));
}

// Validate aggMode consistency - records with different aggMode cannot be batched together
if (writeRecord.getAggMode() != this.aggMode) {
throw new IllegalStateException(
String.format(
"Cannot mix records with different aggMode in the same batch. "
+ "Batch aggMode: %s, Record aggMode: %s",
this.aggMode, writeRecord.getAggMode()));
}

byte[] key = writeRecord.getKey();
checkNotNull(key, "key must be not null for kv record");
checkNotNull(callback, "write callback must be not null");
Expand All @@ -113,6 +126,11 @@ public int[] getTargetColumns() {
return targetColumns;
}

@Override
public AggMode getAggMode() {
return aggMode;
}

@Override
public BytesView build() {
try {
Expand Down Expand Up @@ -163,6 +181,7 @@ public void abortRecordAppends() {
recordsBuilder.abort();
}

@Override
public void resetWriterState(long writerId, int batchSequence) {
super.resetWriterState(writerId, batchSequence);
recordsBuilder.resetWriterState(writerId, batchSequence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ private WriteBatch createWriteBatch(
outputView.getPreAllocatedSize(),
outputView,
writeRecord.getTargetColumns(),
writeRecord.getAggMode(),
clock.milliseconds());

case ARROW_LOG:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.memory.MemorySegmentPool;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.record.bytesview.BytesView;
import org.apache.fluss.rpc.protocol.AggMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -120,6 +121,15 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac

public abstract void abortRecordAppends();

/**
* Get the aggregation mode for this batch (only applicable to KV batches).
*
* @return the aggregation mode, defaults to AGGREGATE for log batches
*/
public AggMode getAggMode() {
return AggMode.AGGREGATE;
}

public boolean hasBatchSequence() {
return batchSequence() != NO_BATCH_SEQUENCE;
}
Expand Down
Loading
Loading