From 34a1972e457a0bb404fa98696eca3c510fc04b32 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Mon, 26 Jan 2026 20:54:01 +0800 Subject: [PATCH] [kv] Implement Atomic Lookup-or-Put for KV Tables --- .../org/apache/fluss/metrics/MetricNames.java | 4 + .../entity/LookupOrPutKvResultForBucket.java | 53 +++++ .../rpc/gateway/TabletServerGateway.java | 10 + .../apache/fluss/rpc/protocol/ApiKeys.java | 3 +- fluss-rpc/src/main/proto/FlussApi.proto | 29 +++ .../rpc/TestingTabletGatewayService.java | 7 + .../org/apache/fluss/server/kv/KvTablet.java | 105 ++++++++++ .../fluss/server/kv/LookupOrPutResult.java | 41 ++++ .../metrics/group/TableMetricGroup.java | 27 +++ .../apache/fluss/server/replica/Replica.java | 33 ++++ .../fluss/server/replica/ReplicaManager.java | 83 ++++++++ .../fluss/server/tablet/TabletService.java | 18 ++ .../server/utils/ServerRpcMessageUtils.java | 59 ++++++ .../apache/fluss/server/kv/KvTabletTest.java | 183 ++++++++++++++++++ .../server/replica/ReplicaManagerTest.java | 60 ++++++ .../server/tablet/TabletServiceITCase.java | 72 +++++++ .../tablet/TestTabletServerGateway.java | 7 + .../fluss/server/testutils/KvTestUtils.java | 20 ++ .../server/testutils/RpcMessageTestUtils.java | 20 ++ 19 files changed, 833 insertions(+), 1 deletion(-) create mode 100644 fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/LookupOrPutKvResultForBucket.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/kv/LookupOrPutResult.java diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index 60eb942e8b..024438703b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -104,6 +104,10 @@ public class MetricNames { public static final String FAILED_LOOKUP_REQUESTS_RATE = "failedLookupRequestsPerSecond"; public static final String TOTAL_PUT_KV_REQUESTS_RATE 
= "totalPutKvRequestsPerSecond"; public static final String FAILED_PUT_KV_REQUESTS_RATE = "failedPutKvRequestsPerSecond"; + public static final String TOTAL_LOOKUP_OR_PUT_KV_REQUESTS_RATE = + "totalLookupOrPutKvRequestsPerSecond"; + public static final String FAILED_LOOKUP_OR_PUT_KV_REQUESTS_RATE = + "failedLookupOrPutKvRequestsPerSecond"; public static final String TOTAL_LIMIT_SCAN_REQUESTS_RATE = "totalLimitScanRequestsPerSecond"; public static final String FAILED_LIMIT_SCAN_REQUESTS_RATE = "failedLimitScanRequestsPerSecond"; public static final String TOTAL_PREFIX_LOOKUP_REQUESTS_RATE = diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/LookupOrPutKvResultForBucket.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/LookupOrPutKvResultForBucket.java new file mode 100644 index 0000000000..ea0db7b36c --- /dev/null +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/LookupOrPutKvResultForBucket.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.rpc.entity; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.rpc.protocol.ApiError; +import org.apache.fluss.rpc.protocol.Errors; + +import java.util.List; + +/** Result of {@link org.apache.fluss.rpc.messages.LookupOrPutKvRequest} for each table bucket. */ +@Internal +public class LookupOrPutKvResultForBucket extends WriteResultForBucket { + + private final List values; + + public LookupOrPutKvResultForBucket( + TableBucket tableBucket, long changeLogEndOffset, List values) { + super(tableBucket, changeLogEndOffset, ApiError.NONE); + this.values = values; + } + + public LookupOrPutKvResultForBucket(TableBucket tableBucket, ApiError error) { + super(tableBucket, -1L, error); + this.values = null; + } + + public List lookupValues() { + return values; + } + + @Override + public T copy(Errors newError) { + //noinspection unchecked + return (T) new LookupOrPutKvResultForBucket(tableBucket, newError.toApiError()); + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java index 578b74e5e2..11f4e78ad5 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java @@ -26,6 +26,8 @@ import org.apache.fluss.rpc.messages.LimitScanResponse; import org.apache.fluss.rpc.messages.ListOffsetsRequest; import org.apache.fluss.rpc.messages.ListOffsetsResponse; +import org.apache.fluss.rpc.messages.LookupOrPutKvRequest; +import org.apache.fluss.rpc.messages.LookupOrPutKvResponse; import org.apache.fluss.rpc.messages.LookupRequest; import org.apache.fluss.rpc.messages.LookupResponse; import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest; @@ -105,6 +107,14 @@ CompletableFuture notifyLeaderAndIsr( @RPC(api = ApiKeys.PUT_KV) CompletableFuture 
putKv(PutKvRequest request); + /** + * Lookup values by key in the specified table bucket, or put the kv data if the key is absent. + * + * @return the lookup or put response. + */ + @RPC(api = ApiKeys.LOOKUP_OR_PUT_KV) + CompletableFuture lookupOrPutKv(LookupOrPutKvRequest request); + /** * Lookup value from the specified table bucket by key. * diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index cc033ba8a9..a0aeb3054c 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -80,7 +80,8 @@ public enum ApiKeys { REBALANCE(1049, 0, 0, PUBLIC), LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC), CANCEL_REBALANCE(1051, 0, 0, PUBLIC), - PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE); + PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE), + LOOKUP_OR_PUT_KV(1053, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index db9d614354..5101bdc551 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -234,6 +234,21 @@ message PutKvResponse { repeated PbPutKvRespForBucket buckets_resp = 1; } +// lookup or put kv request and response +message LookupOrPutKvRequest { + required int32 acks = 1; + required int64 table_id = 2; + required int32 timeout_ms = 3; + // the indexes for the columns to write, + // if empty, means write all columns + repeated int32 target_columns = 4 [packed = true]; + repeated PbLookupOrPutKvRequest buckets_req = 5; +} + +message LookupOrPutKvResponse { + repeated PbLookupOrPutKvRespForBucket buckets_resp = 1; +} + // lookup request and response message LookupRequest { required int64 table_id = 1; @@ -740,6 +755,20 @@ message PbPutKvRespForBucket { optional string error_message = 4; } +message PbLookupOrPutKvRequest { + optional int64 partition_id =
1; + required int32 bucket_id = 2; + required bytes records = 3; +} + +message PbLookupOrPutKvRespForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + optional int32 error_code = 3; + optional string error_message = 4; + repeated PbValue values = 5; +} + message PbLookupReqForBucket { optional int64 partition_id = 1; required int32 bucket_id = 2; diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 7db3654383..a38073b45c 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -53,6 +53,8 @@ import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; import org.apache.fluss.rpc.messages.ListTablesRequest; import org.apache.fluss.rpc.messages.ListTablesResponse; +import org.apache.fluss.rpc.messages.LookupOrPutKvRequest; +import org.apache.fluss.rpc.messages.LookupOrPutKvResponse; import org.apache.fluss.rpc.messages.LookupRequest; import org.apache.fluss.rpc.messages.LookupResponse; import org.apache.fluss.rpc.messages.MetadataRequest; @@ -120,6 +122,11 @@ public CompletableFuture putKv(PutKvRequest request) { return null; } + @Override + public CompletableFuture lookupOrPutKv(LookupOrPutKvRequest request) { + return null; + } + @Override public CompletableFuture lookup(LookupRequest request) { return CompletableFuture.completedFuture(new LookupResponse()); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 79ab8eb211..59afeb02e3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -22,6 +22,7 @@ import org.apache.fluss.config.ConfigOptions; import 
org.apache.fluss.config.Configuration; import org.apache.fluss.exception.DeletionDisabledException; +import org.apache.fluss.exception.InvalidRecordException; import org.apache.fluss.exception.KvStorageException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.memory.MemorySegmentPool; @@ -81,6 +82,7 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -367,6 +369,64 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target }); } + /** + * Lookup or put the KvRecordBatch into the kv storage, if the key exists, return the value; + * otherwise, put the record and return the put value. + * + * @param kvRecords the kv records to put into + * @param targetColumns the target columns to put, null if put all columns + */ + public LookupOrPutResult lookupOrPutAsLeader( + KvRecordBatch kvRecords, @Nullable int[] targetColumns) throws Exception { + return inWriteLock( + kvLock, + () -> { + rocksDBKv.checkIfRocksDBClosed(); + + SchemaInfo schemaInfo = schemaGetter.getLatestSchemaInfo(); + Schema latestSchema = schemaInfo.getSchema(); + short latestSchemaId = (short) schemaInfo.getSchemaId(); + validateSchemaId(kvRecords.schemaId(), latestSchemaId); + + RowMerger currentMerger = + rowMerger.configureTargetColumns( + targetColumns, latestSchemaId, latestSchema); + AutoIncrementUpdater currentAutoIncrementUpdater = + autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId); + + RowType latestRowType = latestSchema.getRowType(); + WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType); + walBuilder.setWriterState(kvRecords.writerId(), kvRecords.batchSequence()); + PaddingRow latestSchemaRow = new PaddingRow(latestRowType.getFieldCount()); + long logEndOffsetOfPrevBatch = logTablet.localLogEndOffset(); + + try { + List values = + processLookupOrPutRecords( + kvRecords, + kvRecords.schemaId(), 
+ currentMerger, + currentAutoIncrementUpdater, + walBuilder, + latestSchemaRow, + logEndOffsetOfPrevBatch); + + LogAppendInfo logAppendInfo = logTablet.appendAsLeader(walBuilder.build()); + + if (logAppendInfo.duplicated()) { + kvPreWriteBuffer.truncateTo( + logEndOffsetOfPrevBatch, TruncateReason.DUPLICATED); + } + return new LookupOrPutResult(logAppendInfo, values); + } catch (Throwable t) { + kvPreWriteBuffer.truncateTo(logEndOffsetOfPrevBatch, TruncateReason.ERROR); + throw t; + } finally { + walBuilder.deallocate(); + } + }); + } + private void validateSchemaId(short schemaIdOfNewData, short latestSchemaId) { if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) { throw new SchemaNotExistException( @@ -423,6 +483,51 @@ private void processKvRecords( } } + private List processLookupOrPutRecords( + KvRecordBatch kvRecords, + short schemaIdOfNewData, + RowMerger currentMerger, + AutoIncrementUpdater autoIncrementUpdater, + WalBuilder walBuilder, + PaddingRow latestSchemaRow, + long startLogOffset) + throws Exception { + long logOffset = startLogOffset; + List values = new ArrayList<>(); + + KvRecordBatch.ReadContext readContext = + KvRecordReadContext.createReadContext(kvFormat, schemaGetter); + for (KvRecord kvRecord : kvRecords.records(readContext)) { + byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); + KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes); + BinaryRow row = kvRecord.getRow(); + if (row == null) { + throw new InvalidRecordException("LookupOrPut not support deletion"); + } + + BinaryValue currentValue = row == null ? null : new BinaryValue(schemaIdOfNewData, row); + + byte[] oldValueBytes = getFromBufferOrKv(key); + if (oldValueBytes != null) { + values.add(oldValueBytes); + } else if (currentValue == null) { + // lookupOrPut not support deletion + throw new InvalidRecordException("LookupOrPut not support deletion"); + } else { + // If not found, we insert the new record. 
+ BinaryValue newValue = + autoIncrementUpdater.updateAutoIncrementColumns(currentValue); + byte[] newValueBytes = newValue.encodeValue(); + values.add(newValueBytes); + + walBuilder.append(ChangeType.INSERT, latestSchemaRow.replaceRow(newValue.row)); + kvPreWriteBuffer.put(key, newValueBytes, logOffset); + logOffset++; + } + } + return values; + } + private long processDeletion( KvPreWriteBuffer.Key key, RowMerger currentMerger, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/LookupOrPutResult.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/LookupOrPutResult.java new file mode 100644 index 0000000000..a8951512d6 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/LookupOrPutResult.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv; + +import org.apache.fluss.server.log.LogAppendInfo; + +import java.util.List; + +/** Result of lookup or put. 
*/ +public final class LookupOrPutResult { + private final LogAppendInfo logAppendInfo; + private final List values; + + public LookupOrPutResult(LogAppendInfo logAppendInfo, List values) { + this.logAppendInfo = logAppendInfo; + this.values = values; + } + + public LogAppendInfo getLogAppendInfo() { + return logAppendInfo; + } + + public List getValues() { + return values; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java index fc40463f2d..211bbaecfd 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java @@ -194,6 +194,22 @@ public Counter failedPutKvRequests() { } } + public Counter totalLookupOrPutKvRequests() { + if (kvMetrics == null) { + return NoOpCounter.INSTANCE; + } else { + return kvMetrics.totalLookupOrPutKvRequests; + } + } + + public Counter failedLookupOrPutKvRequests() { + if (kvMetrics == null) { + return NoOpCounter.INSTANCE; + } else { + return kvMetrics.failedLookupOrPutKvRequests; + } + } + public Counter totalLimitScanRequests() { if (kvMetrics == null) { return NoOpCounter.INSTANCE; @@ -462,6 +478,8 @@ private static class KvMetricGroup extends TabletMetricGroup { private final Counter failedLookupRequests; private final Counter totalPutKvRequests; private final Counter failedPutKvRequests; + private final Counter totalLookupOrPutKvRequests; + private final Counter failedLookupOrPutKvRequests; private final Counter totalLimitScanRequests; private final Counter failedLimitScanRequests; private final Counter totalPrefixLookupRequests; @@ -480,6 +498,15 @@ public KvMetricGroup(TableMetricGroup tableMetricGroup) { meter(MetricNames.TOTAL_PUT_KV_REQUESTS_RATE, new MeterView(totalPutKvRequests)); failedPutKvRequests = new ThreadSafeSimpleCounter(); 
meter(MetricNames.FAILED_PUT_KV_REQUESTS_RATE, new MeterView(failedPutKvRequests)); + // for lookup or put kv request + totalLookupOrPutKvRequests = new ThreadSafeSimpleCounter(); + meter( + MetricNames.TOTAL_LOOKUP_OR_PUT_KV_REQUESTS_RATE, + new MeterView(totalLookupOrPutKvRequests)); + failedLookupOrPutKvRequests = new ThreadSafeSimpleCounter(); + meter( + MetricNames.FAILED_LOOKUP_OR_PUT_KV_REQUESTS_RATE, + new MeterView(failedLookupOrPutKvRequests)); // for limit scan request totalLimitScanRequests = new ThreadSafeSimpleCounter(); meter( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index ee158b7a4b..409373c94a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -51,6 +51,7 @@ import org.apache.fluss.server.kv.KvManager; import org.apache.fluss.server.kv.KvRecoverHelper; import org.apache.fluss.server.kv.KvTablet; +import org.apache.fluss.server.kv.LookupOrPutResult; import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder; import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; @@ -971,6 +972,38 @@ public LogAppendInfo putRecordsToLeader( }); } + public LookupOrPutResult lookupOrPutRecordsToLeader( + KvRecordBatch kvRecords, @Nullable int[] targetColumns, int requiredAcks) + throws Exception { + return inReadLock( + leaderIsrUpdateLock, + () -> { + if (!isLeader()) { + throw new NotLeaderOrFollowerException( + String.format( + "Leader not local for bucket %s on tabletServer %d", + tableBucket, localTabletServerId)); + } + + validateInSyncReplicaSize(requiredAcks); + KvTablet kv = this.kvTablet; + checkNotNull( + kv, "KvTablet for the replica to put kv records shouldn't be null."); + LookupOrPutResult result; + try { + result = 
kv.lookupOrPutAsLeader(kvRecords, targetColumns); + } catch (IOException e) { + LOG.error("Error while lookup or putting records to {}", tableBucket, e); + fatalErrorHandler.onFatalError(e); + throw new KvStorageException( + "Error while lookup or putting records to " + tableBucket, e); + } + // we may need to increment high watermark. + maybeIncrementLeaderHW(logTablet, clock.milliseconds()); + return result; + }); + } + public LogReadInfo fetchRecords(FetchParams fetchParams) throws IOException { if (fetchParams.projection() != null && logFormat != LogFormat.ARROW) { throw new InvalidColumnProjectionException( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 76e3ea1867..ccb07a3116 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -46,6 +46,7 @@ import org.apache.fluss.rpc.entity.FetchLogResultForBucket; import org.apache.fluss.rpc.entity.LimitScanResultForBucket; import org.apache.fluss.rpc.entity.ListOffsetsResultForBucket; +import org.apache.fluss.rpc.entity.LookupOrPutKvResultForBucket; import org.apache.fluss.rpc.entity.LookupResultForBucket; import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket; import org.apache.fluss.rpc.entity.ProduceLogResultForBucket; @@ -70,6 +71,7 @@ import org.apache.fluss.server.entity.UserContext; import org.apache.fluss.server.kv.KvManager; import org.apache.fluss.server.kv.KvSnapshotResource; +import org.apache.fluss.server.kv.LookupOrPutResult; import org.apache.fluss.server.kv.snapshot.CompletedKvSnapshotCommitter; import org.apache.fluss.server.kv.snapshot.DefaultSnapshotContext; import org.apache.fluss.server.kv.snapshot.SnapshotContext; @@ -571,6 +573,87 @@ public void putRecordsToKv( timeoutMs, requiredAcks, entriesPerBucket.size(), kvPutResult, 
responseCallback); } + /** + * Lookup or put kv records to leader replicas of the buckets. The kv data will be written to + * the kv tablet and the response callback needs to wait for the cdc log to be replicated to + * other replicas if needed. + */ + public void lookupOrPutRecordsToKv( + int timeoutMs, + int requiredAcks, + Map entriesPerBucket, + @Nullable int[] targetColumns, + Consumer> responseCallback) { + if (isRequiredAcksInvalid(requiredAcks)) { + throw new InvalidRequiredAcksException("Invalid required acks: " + requiredAcks); + } + + long startTime = System.currentTimeMillis(); + Map kvLookupOrPutResult = + lookupOrPutToLocalKv(entriesPerBucket, targetColumns, requiredAcks); + LOG.debug( + "Lookup or put records to local kv storage and wait generate cdc log in {} ms", + System.currentTimeMillis() - startTime); + + // maybe do delay write operation to write cdc log to be replicated to other follower + // replicas. + maybeAddDelayedWrite( + timeoutMs, + requiredAcks, + entriesPerBucket.size(), + kvLookupOrPutResult, + responseCallback); + } + + private Map lookupOrPutToLocalKv( + Map entriesPerBucket, + @Nullable int[] targetColumns, + int requiredAcks) { + Map lookupOrPutResultForBucketMap = + new HashMap<>(); + for (Map.Entry entry : entriesPerBucket.entrySet()) { + TableBucket tb = entry.getKey(); + TableMetricGroup tableMetrics = null; + try { + LOG.trace("Lookup or put records to local kv tablet for table bucket {}", tb); + Replica replica = getReplicaOrException(tb); + tableMetrics = replica.tableMetrics(); + tableMetrics.totalLookupOrPutKvRequests().inc(); + LookupOrPutResult result = + replica.lookupOrPutRecordsToLeader( + entry.getValue(), targetColumns, requiredAcks); + LogAppendInfo appendInfo = result.getLogAppendInfo(); + LOG.trace( + "Lookup or written to local kv for {}, and the cdc log beginning at offset {} and ending at offset {}", + tb, + appendInfo.firstOffset(), + appendInfo.lastOffset()); + lookupOrPutResultForBucketMap.put( + tb, + new
LookupOrPutKvResultForBucket( + tb, appendInfo.lastOffset() + 1, result.getValues())); + + // metric for kv + tableMetrics.incKvMessageIn(entry.getValue().getRecordCount()); + tableMetrics.incKvBytesIn(entry.getValue().sizeInBytes()); + // metric for cdc log of kv + tableMetrics.incLogBytesIn(appendInfo.validBytes()); + tableMetrics.incLogMessageIn(appendInfo.numMessages()); + } catch (Exception e) { + if (isUnexpectedException(e)) { + LOG.error("Error lookup or put records to local kv on replica {}", tb, e); + if (tableMetrics != null) { + tableMetrics.failedLookupOrPutKvRequests().inc(); + } + } + lookupOrPutResultForBucketMap.put( + tb, new LookupOrPutKvResultForBucket(tb, ApiError.fromThrowable(e))); + } + } + + return lookupOrPutResultForBucketMap; + } + /** Lookup a single key value. */ @VisibleForTesting protected void lookup(TableBucket tableBucket, byte[] key, Consumer responseCallback) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 992b963334..4630d58b15 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -38,6 +38,8 @@ import org.apache.fluss.rpc.messages.LimitScanResponse; import org.apache.fluss.rpc.messages.ListOffsetsRequest; import org.apache.fluss.rpc.messages.ListOffsetsResponse; +import org.apache.fluss.rpc.messages.LookupOrPutKvRequest; +import org.apache.fluss.rpc.messages.LookupOrPutKvResponse; import org.apache.fluss.rpc.messages.LookupRequest; import org.apache.fluss.rpc.messages.LookupResponse; import org.apache.fluss.rpc.messages.MetadataRequest; @@ -98,6 +100,7 @@ import static org.apache.fluss.server.log.FetchParams.DEFAULT_MAX_WAIT_MS_WHEN_MIN_BYTES_ENABLE; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData; import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getListOffsetsData; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getLookupOrPutKvData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getNotifyLakeTableOffset; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getNotifyLeaderAndIsrRequestData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getNotifyRemoteLogOffsetsData; @@ -111,6 +114,7 @@ import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeInitWriterResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLimitScanResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListOffsetsResponse; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLookupOrPutKvResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLookupResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeNotifyLeaderAndIsrResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makePrefixLookupResponse; @@ -234,6 +238,20 @@ public CompletableFuture putKv(PutKvRequest request) { return response; } + @Override + public CompletableFuture lookupOrPutKv(LookupOrPutKvRequest request) { + authorizeTable(WRITE, request.getTableId()); + Map putKvData = getLookupOrPutKvData(request); + CompletableFuture response = new CompletableFuture<>(); + replicaManager.lookupOrPutRecordsToKv( + request.getTimeoutMs(), + request.getAcks(), + putKvData, + getTargetColumns(request), + bucketResponse -> response.complete(makeLookupOrPutKvResponse(bucketResponse))); + return response; + } + @Override public CompletableFuture lookup(LookupRequest request) { Map> lookupData = toLookupData(request); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index 
f174f727a4..34ff1f42e9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -50,6 +50,7 @@ import org.apache.fluss.rpc.entity.FetchLogResultForBucket; import org.apache.fluss.rpc.entity.LimitScanResultForBucket; import org.apache.fluss.rpc.entity.ListOffsetsResultForBucket; +import org.apache.fluss.rpc.entity.LookupOrPutKvResultForBucket; import org.apache.fluss.rpc.entity.LookupResultForBucket; import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket; import org.apache.fluss.rpc.entity.ProduceLogResultForBucket; @@ -76,6 +77,8 @@ import org.apache.fluss.rpc.messages.ListOffsetsResponse; import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse; +import org.apache.fluss.rpc.messages.LookupOrPutKvRequest; +import org.apache.fluss.rpc.messages.LookupOrPutKvResponse; import org.apache.fluss.rpc.messages.LookupRequest; import org.apache.fluss.rpc.messages.LookupResponse; import org.apache.fluss.rpc.messages.MetadataResponse; @@ -109,6 +112,8 @@ import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo; import org.apache.fluss.rpc.messages.PbLakeTableSnapshotMetadata; import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket; +import org.apache.fluss.rpc.messages.PbLookupOrPutKvRequest; +import org.apache.fluss.rpc.messages.PbLookupOrPutKvRespForBucket; import org.apache.fluss.rpc.messages.PbLookupReqForBucket; import org.apache.fluss.rpc.messages.PbLookupRespForBucket; import org.apache.fluss.rpc.messages.PbModifyColumn; @@ -964,6 +969,26 @@ public static Map getPutKvData(PutKvRequest putKvReq return produceEntryData; } + public static Map getLookupOrPutKvData( + LookupOrPutKvRequest lookupOrPutKvRequest) { + long tableId = lookupOrPutKvRequest.getTableId(); + Map produceEntryData = new HashMap<>(); + for (PbLookupOrPutKvRequest 
lookupOrPutKvReqForBucket : + lookupOrPutKvRequest.getBucketsReqsList()) { + ByteBuffer recordsBuffer = toByteBuffer(lookupOrPutKvReqForBucket.getRecordsSlice()); + DefaultKvRecordBatch kvRecords = DefaultKvRecordBatch.pointToByteBuffer(recordsBuffer); + TableBucket tb = + new TableBucket( + tableId, + lookupOrPutKvReqForBucket.hasPartitionId() + ? lookupOrPutKvReqForBucket.getPartitionId() + : null, + lookupOrPutKvReqForBucket.getBucketId()); + produceEntryData.put(tb, kvRecords); + } + return produceEntryData; + } + public static Map> toLookupData(LookupRequest lookupRequest) { long tableId = lookupRequest.getTableId(); Map> lookupEntryData = new HashMap<>(); @@ -1011,6 +1036,11 @@ public static Map> toPrefixLookupData( return targetColumns.length == 0 ? null : targetColumns; } + public static @Nullable int[] getTargetColumns(LookupOrPutKvRequest lookupOrPutKvRequest) { + int[] targetColumns = lookupOrPutKvRequest.getTargetColumns(); + return targetColumns.length == 0 ? null : targetColumns; + } + public static PutKvResponse makePutKvResponse(Collection kvPutResult) { PutKvResponse putKvResponse = new PutKvResponse(); List putKvRespForBucketList = new ArrayList<>(); @@ -1031,6 +1061,35 @@ public static PutKvResponse makePutKvResponse(Collection k return putKvResponse; } + public static LookupOrPutKvResponse makeLookupOrPutKvResponse( + Collection kvLookupOrPutResult) { + LookupOrPutKvResponse lookupOrPutKvResponse = new LookupOrPutKvResponse(); + List lookupOrPutKvRespForBucketList = new ArrayList<>(); + for (LookupOrPutKvResultForBucket bucketResult : kvLookupOrPutResult) { + PbLookupOrPutKvRespForBucket lookupOrPutKvBucket = + new PbLookupOrPutKvRespForBucket().setBucketId(bucketResult.getBucketId()); + TableBucket tableBucket = bucketResult.getTableBucket(); + if (tableBucket.getPartitionId() != null) { + lookupOrPutKvBucket.setPartitionId(tableBucket.getPartitionId()); + } + + if (bucketResult.failed()) { + lookupOrPutKvBucket.setError( + 
bucketResult.getErrorCode(), bucketResult.getErrorMessage()); + } else { + for (byte[] value : bucketResult.lookupValues()) { + PbValue pbValue = lookupOrPutKvBucket.addValue(); + if (value != null) { + pbValue.setValues(value); + } + } + } + lookupOrPutKvRespForBucketList.add(lookupOrPutKvBucket); + } + lookupOrPutKvResponse.addAllBucketsResps(lookupOrPutKvRespForBucketList); + return lookupOrPutKvResponse; + } + public static LimitScanResponse makeLimitScanResponse(LimitScanResultForBucket bucketResult) { LimitScanResponse limitScanResponse = new LimitScanResponse(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 1f0a8ec06e..56a3bc4e53 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -662,6 +662,172 @@ void testAutoIncrementWithMultiThread() throws Exception { assertThat(actualUids).isEqualTo(expectedUids); } + @Test + void testLookupOrPut() throws Exception { + Schema schema = TestData.DATA1_SCHEMA_PK; + RowType rowType = schema.getRowType(); + initLogTabletAndKvTablet(schema, Collections.emptyMap()); + + KvRecordTestUtils.KvRecordFactory data1kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(rowType); + + // 1. Key does not exist: first call should insert and return the value. 
        // build the first batch: both keys are new, so both must be inserted
        KvRecordBatch kvRecordBatch =
                kvRecordBatchFactory.ofRecords(
                        data1kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, "v1"}),
                        data1kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v2"}));

        LookupOrPutResult result = kvTablet.lookupOrPutAsLeader(kvRecordBatch, null);
        // one returned value per input record, in record order
        assertThat(result.getValues()).hasSize(2);

        byte[] v1 = result.getValues().get(0);
        byte[] v2 = result.getValues().get(1);

        // for fresh keys the returned value is the newly written, encoded row
        assertThat(v1)
                .isEqualTo(
                        ValueEncoder.encodeValue(
                                schemaId, compactedRow(rowType, new Object[] {1, "v1"})));
        assertThat(v2)
                .isEqualTo(
                        ValueEncoder.encodeValue(
                                schemaId, compactedRow(rowType, new Object[] {2, "v2"})));

        // verify cdc log: both inserts appear in one log batch starting at offset 0
        LogRecords actualLogRecords = readLogRecords();
        // NOTE(review): generic argument reconstructed (lost in patch transport) — confirm
        // the element type of the logRecords(...) helper.
        List<MemoryLogRecords> expectedLogs =
                Collections.singletonList(
                        logRecords(
                                rowType,
                                0,
                                Arrays.asList(ChangeType.INSERT, ChangeType.INSERT),
                                Arrays.asList(new Object[] {1, "v1"}, new Object[] {2, "v2"})));
        checkEqual(actualLogRecords, expectedLogs, rowType);

        long endOffset = logTablet.localLogEndOffset();

        // 2. Key exists in the pre-write buffer: the second call must return the existing
        // value. Even though this batch carries a different value for k1 ("v1_new"), the
        // original "v1" must come back and no CDC entry may be produced for k1.
        KvRecordBatch kvRecordBatch2 =
                kvRecordBatchFactory.ofRecords(
                        data1kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, "v1_new"}),
                        data1kvRecordFactory.ofRecord("k3".getBytes(), new Object[] {3, "v3"}));

        result = kvTablet.lookupOrPutAsLeader(kvRecordBatch2, null);
        assertThat(result.getValues()).hasSize(2);
        assertThat(result.getValues().get(0)).isEqualTo(v1); // old v1, not "v1_new"

        byte[] v3 = result.getValues().get(1);
        assertThat(v3)
                .isEqualTo(
                        ValueEncoder.encodeValue(
                                schemaId, compactedRow(rowType, new Object[] {3, "v3"}))); // new v3

        // verify cdc log: only the fresh key k3 produced an INSERT
        actualLogRecords = readLogRecords(endOffset);
        expectedLogs =
                Collections.singletonList(
                        logRecords(
                                rowType,
                                endOffset,
                                Collections.singletonList(ChangeType.INSERT),
                                Collections.singletonList(new Object[] {3, "v3"})));
        checkEqual(actualLogRecords, expectedLogs, rowType);

        endOffset = logTablet.localLogEndOffset();

        // 3. Key exists in RocksDB: after flushing the pre-write buffer, the lookup must
        // find the existing value in RocksDB rather than in the buffer.
        // flush moves every pending write from the pre-write buffer into RocksDB
        kvTablet.flush(
                Long.MAX_VALUE, NOPErrorHandler.INSTANCE); // Flush Pre-write Buffer to RocksDB

        KvRecordBatch kvRecordBatch3 =
                kvRecordBatchFactory.ofRecords(
                        data1kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v2_new"}),
                        data1kvRecordFactory.ofRecord("k4".getBytes(), new Object[] {4, "v4"}));

        result = kvTablet.lookupOrPutAsLeader(kvRecordBatch3, null);
        assertThat(result.getValues()).hasSize(2);
        assertThat(result.getValues().get(0)).isEqualTo(v2); // old v2, served from RocksDB

        byte[] v4 = result.getValues().get(1);
        assertThat(v4)
                .isEqualTo(
                        ValueEncoder.encodeValue(
                                schemaId, compactedRow(rowType, new Object[] {4, "v4"}))); // new v4

        // verify cdc log: only the fresh key k4 produced an INSERT
        actualLogRecords = readLogRecords(endOffset);
        expectedLogs =
                Collections.singletonList(
                        logRecords(
                                rowType,
                                endOffset,
                                Collections.singletonList(ChangeType.INSERT),
                                Collections.singletonList(new Object[] {4, "v4"})));
        checkEqual(actualLogRecords, expectedLogs, rowType);
    }

    /**
     * lookupOrPut on a table with an auto-increment column: a fresh key is assigned the next
     * sequence value, while an existing key returns the previously assigned row unchanged
     * (the repeated lookup must not consume a sequence value).
     */
    @Test
    void testLookupOrPutWithAutoIncrement() throws Exception {
        Schema schema =
                Schema.newBuilder()
                        .column("user_name", DataTypes.STRING())
                        .column("uid", DataTypes.INT())
                        .primaryKey("user_name")
                        .enableAutoIncrement("uid")
                        .build();
        initLogTabletAndKvTablet(schema, new HashMap<>());
        RowType rowType = schema.getRowType();
        KvRecordTestUtils.KvRecordFactory recordFactory =
                KvRecordTestUtils.KvRecordFactory.of(rowType);

        // insert k1 with a null uid: auto-increment should assign uid = 1
        KvRecordBatch kvRecordBatch1 =
                kvRecordBatchFactory.ofRecords(
                        recordFactory.ofRecord("k1".getBytes(), new Object[] {"k1", null}));

        LookupOrPutResult result = kvTablet.lookupOrPutAsLeader(kvRecordBatch1, null);

        assertThat(result.getValues()).hasSize(1);
        byte[] v1 = result.getValues().get(0);
        assertThat(v1)
                .isEqualTo(
                        ValueEncoder.encodeValue(
                                schemaId, compactedRow(rowType, new Object[] {"k1", 1})));

        // lookup k1 again: it must still be uid = 1 (no overwrite, no new sequence value)
        result = kvTablet.lookupOrPutAsLeader(kvRecordBatch1, null);
        assertThat(result.getValues()).hasSize(1);
        assertThat(result.getValues().get(0)).isEqualTo(v1);

        // insert k2: it should get uid = 2, proving the repeated k1 call burned no sequence
        KvRecordBatch kvRecordBatch2 =
                kvRecordBatchFactory.ofRecords(
                        recordFactory.ofRecord("k2".getBytes(), new Object[] {"k2", null}));
        result = kvTablet.lookupOrPutAsLeader(kvRecordBatch2, null);
        assertThat(result.getValues()).hasSize(1);
        assertThat(result.getValues().get(0))
                .isEqualTo(
                        ValueEncoder.encodeValue(
                                schemaId, compactedRow(rowType, new Object[] {"k2", 2})));

        // verify cdc log: insert at offset 0, an empty batch at offset 1 (the pure-lookup
        // call still appends an empty batch, advancing the offset), insert at offset 2
        // NOTE(review): generic argument reconstructed (lost in patch transport) — confirm
        // the element type of the logRecords(...) helper.
        List<MemoryLogRecords> expectedLogs =
                Arrays.asList(
                        logRecords(
                                rowType,
                                0,
                                Collections.singletonList(ChangeType.INSERT),
                                Collections.singletonList(new Object[] {"k1", 1})),
                        logRecords(rowType, 1, Collections.emptyList(), Collections.emptyList()),
                        logRecords(
                                rowType,
                                2,
                                Collections.singletonList(ChangeType.INSERT),
                                Collections.singletonList(new Object[] {"k2", 2})));

        checkEqual(readLogRecords(), expectedLogs, rowType);
    }

    // NOTE(review): the three helpers below are not referenced by the tests visible in this
    // change — confirm they are used elsewhere, or wire the assertions above through them.

    /** Asserts that {@code actual} equals the encoded compacted row built from {@code expected}. */
    private void assertValueEquals(byte[] actual, RowType rowType, Object[] expected) {
        assertThat(actual).isEqualTo(encodeValue(rowType, expected));
    }

    /** Encodes {@code values} as a compacted row prefixed with the current schema id. */
    private byte[] encodeValue(RowType rowType, Object[] values) {
        return ValueEncoder.encodeValue(schemaId, compactedRow(rowType, values));
    }

    /** Asserts that the log from {@code startOffset} contains exactly one batch with the given changes. */
    private void assertLogEquals(
            long startOffset, RowType rowType, List<ChangeType> changeTypes, List<Object[]> values)
            throws Exception {
        checkEqual(
                readLogRecords(startOffset),
                Collections.singletonList(logRecords(rowType, startOffset, changeTypes, values)),
                rowType);
    }
    /**
     * lookupOrPut through the {@link ReplicaManager}: fresh keys are written and their encoded
     * rows returned; repeated keys return the previously stored rows; an unknown bucket yields
     * an {@code UNKNOWN_TABLE_OR_BUCKET_EXCEPTION} result instead of a thrown error.
     */
    @Test
    void testLookupOrPutKv() throws Exception {
        TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1);
        makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket());

        // 1. lookup-or-put kv records into the kv store.
        // DATA_1_WITH_KEY_AND_VALUE has 6 records, but only the first 3 carry new keys.
        CompletableFuture<List<LookupOrPutKvResultForBucket>> future = new CompletableFuture<>();
        replicaManager.lookupOrPutRecordsToKv(
                20000,
                1,
                Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                null,
                future::complete);
        List<LookupOrPutKvResultForBucket> results = future.get();
        assertThat(results).hasSize(1);
        LookupOrPutKvResultForBucket result = results.get(0);
        assertThat(result.getTableBucket()).isEqualTo(tb);
        // 3 distinct keys -> 3 CDC inserts -> log end offset 3
        assertThat(result.getWriteLogEndOffset()).isEqualTo(3L);
        // still one returned value per input record
        assertThat(result.lookupValues()).hasSize(6);

        // 2. lookup again with the same records: same values come back; no new data log is
        // produced, only an empty batch is appended (advancing the log end offset to 4).
        future = new CompletableFuture<>();
        replicaManager.lookupOrPutRecordsToKv(
                20000,
                1,
                Collections.singletonMap(tb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                null,
                future::complete);
        results = future.get();
        assertThat(results).hasSize(1);
        result = results.get(0);
        assertThat(result.getWriteLogEndOffset()).isEqualTo(4L); // Empty batch appended
        assertThat(result.lookupValues()).hasSize(6);
        List<byte[]> lookupValues = result.lookupValues();
        // first 3 are the originally inserted values;
        // next 3 (duplicate keys) must echo the same stored values
        assertThat(lookupValues.get(3)).isEqualTo(lookupValues.get(0));
        assertThat(lookupValues.get(4)).isEqualTo(lookupValues.get(1));
        assertThat(lookupValues.get(5)).isEqualTo(lookupValues.get(2));

        // 3. an unknown table bucket surfaces as a per-bucket error result
        TableBucket unknownTb = new TableBucket(10001, 0);
        future = new CompletableFuture<>();
        replicaManager.lookupOrPutRecordsToKv(
                20000,
                1,
                Collections.singletonMap(unknownTb, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)),
                null,
                future::complete);
        assertThat(future.get())
                .containsOnly(
                        new LookupOrPutKvResultForBucket(
                                unknownTb,
                                new ApiError(
                                        Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION,
                                        "Unknown table or bucket: TableBucket{tableId=10001, bucket=0}")));
    }
    /**
     * End-to-end lookupOrPut over the RPC gateway: first call inserts fresh keys and echoes
     * stored values for duplicates; a second identical call returns the same values; invalid
     * acks are rejected.
     */
    @Test
    void testLookupOrPutKv() throws Exception {
        long tableId =
                createTable(
                        FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK);
        TableBucket tb = new TableBucket(tableId, 0);

        FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);

        int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
        TabletServerGateway leaderGateWay =
                FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);

        // 1. Key does not exist: first call should insert and return the values.
        // The first 3 records insert keys 1, 2, 3.
        // The next 3 records are also for keys 1, 2, 3, but since those keys then exist,
        // the values inserted by the first 3 records come back.
        // lookupOrPutAsLeader processes records one by one, so for the first batch:
        // 1. k1 -> insert (1, "a"),  return (1, "a")
        // 2. k2 -> insert (2, "b"),  return (2, "b")
        // 3. k3 -> insert (3, "c"),  return (3, "c")
        // 4. k1 -> exists (1, "a1"), return (1, "a")
        // 5. k2 -> exists (2, "b1"), return (2, "b")
        // 6. k3 -> exists (3, null), return (3, "c")
        List<byte[]> expectedValues = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            expectedValues.add(
                    ValueEncoder.encodeValue(
                            DEFAULT_SCHEMA_ID,
                            compactedRow(DATA1_ROW_TYPE, DATA_1_WITH_KEY_AND_VALUE.get(i).f1)));
        }
        // duplicate-key records must echo the first three stored values
        for (int i = 0; i < 3; i++) {
            expectedValues.add(expectedValues.get(i));
        }

        assertLookupOrPutKvResponse(
                leaderGateWay
                        .lookupOrPutKv(
                                newLookupOrPutKvRequest(
                                        tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)))
                        .get(),
                expectedValues);

        // 2. Keys exist: the second, identical call must return the same existing values.
        assertLookupOrPutKvResponse(
                leaderGateWay
                        .lookupOrPutKv(
                                newLookupOrPutKvRequest(
                                        tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE)))
                        .get(),
                expectedValues);

        // 3. test lookupOrPut with an invalid acks value.
    @Override
    public CompletableFuture<LookupOrPutKvResponse> lookupOrPutKv(LookupOrPutKvRequest request) {
        // Not implemented by this test gateway; returns null like the sibling read paths
        // (e.g. lookup) in this stub rather than throwing.
        return null;
    }
import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.rpc.messages.LookupOrPutKvResponse; import org.apache.fluss.rpc.messages.LookupResponse; +import org.apache.fluss.rpc.messages.PbLookupOrPutKvRespForBucket; import org.apache.fluss.rpc.messages.PbLookupRespForBucket; import org.apache.fluss.rpc.messages.PbPrefixLookupRespForBucket; import org.apache.fluss.rpc.messages.PbValue; @@ -176,6 +178,24 @@ public static void assertLookupResponse( assertThat(lookupValue).isEqualTo(expectedValue); } + public static void assertLookupOrPutKvResponse( + LookupOrPutKvResponse response, @Nullable List expectedValues) { + assertThat(response.getBucketsRespsCount()).isEqualTo(1); + PbLookupOrPutKvRespForBucket pbLookupOrPutKvRespForBucket = response.getBucketsRespAt(0); + if (expectedValues != null) { + assertThat(pbLookupOrPutKvRespForBucket.getValuesCount()) + .isEqualTo(expectedValues.size()); + for (int i = 0; i < expectedValues.size(); i++) { + byte[] expected = expectedValues.get(i); + byte[] actual = + pbLookupOrPutKvRespForBucket.getValueAt(i).hasValues() + ? 
pbLookupOrPutKvRespForBucket.getValueAt(i).getValues() + : null; + assertThat(actual).isEqualTo(expected); + } + } + } + public static void assertPrefixLookupResponse( PrefixLookupResponse prefixLookupResponse, List> expectedValues) { assertThat(prefixLookupResponse.getBucketsRespsCount()).isEqualTo(1); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java index 5055105e95..4f2c6c1619 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java @@ -47,6 +47,7 @@ import org.apache.fluss.rpc.messages.ListOffsetsRequest; import org.apache.fluss.rpc.messages.ListPartitionInfosRequest; import org.apache.fluss.rpc.messages.ListTablesRequest; +import org.apache.fluss.rpc.messages.LookupOrPutKvRequest; import org.apache.fluss.rpc.messages.LookupRequest; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.PbAddColumn; @@ -56,6 +57,7 @@ import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket; import org.apache.fluss.rpc.messages.PbFetchLogRespForTable; import org.apache.fluss.rpc.messages.PbKeyValue; +import org.apache.fluss.rpc.messages.PbLookupOrPutKvRequest; import org.apache.fluss.rpc.messages.PbLookupReqForBucket; import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket; import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket; @@ -225,6 +227,24 @@ public static PutKvRequest newPutKvRequest( return putKvRequest; } + public static LookupOrPutKvRequest newLookupOrPutKvRequest( + long tableId, int bucketId, int acks, KvRecordBatch kvRecordBatch) { + LookupOrPutKvRequest lookupOrPutKvRequest = new LookupOrPutKvRequest(); + lookupOrPutKvRequest.setTableId(tableId).setAcks(acks).setTimeoutMs(10000); + PbLookupOrPutKvRequest pbLookupOrPutKvRequest = new 
PbLookupOrPutKvRequest(); + pbLookupOrPutKvRequest.setBucketId(bucketId); + if (kvRecordBatch instanceof DefaultKvRecordBatch) { + DefaultKvRecordBatch batch = (DefaultKvRecordBatch) kvRecordBatch; + pbLookupOrPutKvRequest.setRecords( + batch.getMemorySegment(), batch.getPosition(), batch.sizeInBytes()); + } else { + throw new IllegalArgumentException( + "Unsupported KvRecordBatch type: " + kvRecordBatch.getClass().getName()); + } + lookupOrPutKvRequest.addAllBucketsReqs(Collections.singletonList(pbLookupOrPutKvRequest)); + return lookupOrPutKvRequest; + } + public static FetchLogRequest newFetchLogRequest( int followerId, long tableId, int bucketId, long fetchOffset) { return newFetchLogRequest(followerId, tableId, bucketId, fetchOffset, null);