Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public class MetricNames {
public static final String FAILED_LOOKUP_REQUESTS_RATE = "failedLookupRequestsPerSecond";
public static final String TOTAL_PUT_KV_REQUESTS_RATE = "totalPutKvRequestsPerSecond";
public static final String FAILED_PUT_KV_REQUESTS_RATE = "failedPutKvRequestsPerSecond";
public static final String TOTAL_LOOKUP_OR_PUT_KV_REQUESTS_RATE =
"totalLookupOrPutKvRequestsPerSecond";
public static final String FAILED_LOOKUP_OR_PUT_KV_REQUESTS_RATE =
"failedLookupOrPutKvRequestsPerSecond";
public static final String TOTAL_LIMIT_SCAN_REQUESTS_RATE = "totalLimitScanRequestsPerSecond";
public static final String FAILED_LIMIT_SCAN_REQUESTS_RATE = "failedLimitScanRequestsPerSecond";
public static final String TOTAL_PREFIX_LOOKUP_REQUESTS_RATE =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.rpc.entity;

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;

import java.util.List;

/** Result of {@link org.apache.fluss.rpc.messages.LookupOrPutKvRequest} for each table bucket. */
@Internal
public class LookupOrPutKvResultForBucket extends WriteResultForBucket {

private final List<byte[]> values;

public LookupOrPutKvResultForBucket(
TableBucket tableBucket, long changeLogEndOffset, List<byte[]> values) {
super(tableBucket, changeLogEndOffset, ApiError.NONE);
this.values = values;
}

public LookupOrPutKvResultForBucket(TableBucket tableBucket, ApiError error) {
super(tableBucket, -1L, error);
this.values = null;
}

public List<byte[]> lookupValues() {
return values;
}

@Override
public <T extends WriteResultForBucket> T copy(Errors newError) {
//noinspection unchecked
return (T) new LookupOrPutKvResultForBucket(tableBucket, newError.toApiError());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.fluss.rpc.messages.LimitScanResponse;
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
import org.apache.fluss.rpc.messages.ListOffsetsResponse;
import org.apache.fluss.rpc.messages.LookupOrPutKvRequest;
import org.apache.fluss.rpc.messages.LookupOrPutKvResponse;
import org.apache.fluss.rpc.messages.LookupRequest;
import org.apache.fluss.rpc.messages.LookupResponse;
import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest;
Expand Down Expand Up @@ -105,6 +107,14 @@ CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
@RPC(api = ApiKeys.PUT_KV)
CompletableFuture<PutKvResponse> putKv(PutKvRequest request);

/**
* Put kv data to the specified table bucket.
*
* @return the produce response.
*/
@RPC(api = ApiKeys.LOOKUP_OR_PUT_KV)
CompletableFuture<LookupOrPutKvResponse> lookupOrPutKv(LookupOrPutKvRequest request);

/**
* Lookup value from the specified table bucket by key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public enum ApiKeys {
REBALANCE(1049, 0, 0, PUBLIC),
LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC),
CANCEL_REBALANCE(1051, 0, 0, PUBLIC),
PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE);
PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE),
LOOKUP_OR_PUT_KV(1053, 0, 0, PUBLIC);

private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
Expand Down
29 changes: 29 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,21 @@ message PutKvResponse {
repeated PbPutKvRespForBucket buckets_resp = 1;
}

// lookup or put kv request and response
message LookupOrPutKvRequest {
required int32 acks = 1;
required int64 table_id = 2;
required int32 timeout_ms = 3;
// the indexes for the columns to write,
// if empty, means write all columns
repeated int32 target_columns = 4 [packed = true];
repeated PbLookupOrPutKvRequest buckets_req = 5;
}

message LookupOrPutKvResponse {
repeated PbLookupOrPutKvRespForBucket buckets_resp = 1;
}

// lookup request and response
message LookupRequest {
required int64 table_id = 1;
Expand Down Expand Up @@ -740,6 +755,20 @@ message PbPutKvRespForBucket {
optional string error_message = 4;
}

message PbLookupOrPutKvRequest {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
required bytes records = 3;
}

message PbLookupOrPutKvRespForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
optional int32 error_code = 3;
optional string error_message = 4;
repeated PbValue values = 5;
}

message PbLookupReqForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
import org.apache.fluss.rpc.messages.ListTablesRequest;
import org.apache.fluss.rpc.messages.ListTablesResponse;
import org.apache.fluss.rpc.messages.LookupOrPutKvRequest;
import org.apache.fluss.rpc.messages.LookupOrPutKvResponse;
import org.apache.fluss.rpc.messages.LookupRequest;
import org.apache.fluss.rpc.messages.LookupResponse;
import org.apache.fluss.rpc.messages.MetadataRequest;
Expand Down Expand Up @@ -120,6 +122,11 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
return null;
}

@Override
public CompletableFuture<LookupOrPutKvResponse> lookupOrPutKv(LookupOrPutKvRequest request) {
return null;
}

@Override
public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
return CompletableFuture.completedFuture(new LookupResponse());
Expand Down
105 changes: 105 additions & 0 deletions fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.DeletionDisabledException;
import org.apache.fluss.exception.InvalidRecordException;
import org.apache.fluss.exception.KvStorageException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.memory.MemorySegmentPool;
Expand Down Expand Up @@ -81,6 +82,7 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -367,6 +369,64 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
});
}

/**
* Lookup or put the KvRecordBatch into the kv storage, if the key exists, return the value;
* otherwise, put the record and return the put value.
*
* @param kvRecords the kv records to put into
* @param targetColumns the target columns to put, null if put all columns
*/
public LookupOrPutResult lookupOrPutAsLeader(
KvRecordBatch kvRecords, @Nullable int[] targetColumns) throws Exception {
return inWriteLock(
kvLock,
() -> {
rocksDBKv.checkIfRocksDBClosed();

SchemaInfo schemaInfo = schemaGetter.getLatestSchemaInfo();
Schema latestSchema = schemaInfo.getSchema();
short latestSchemaId = (short) schemaInfo.getSchemaId();
validateSchemaId(kvRecords.schemaId(), latestSchemaId);

RowMerger currentMerger =
rowMerger.configureTargetColumns(
targetColumns, latestSchemaId, latestSchema);
AutoIncrementUpdater currentAutoIncrementUpdater =
autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId);

RowType latestRowType = latestSchema.getRowType();
WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType);
walBuilder.setWriterState(kvRecords.writerId(), kvRecords.batchSequence());
PaddingRow latestSchemaRow = new PaddingRow(latestRowType.getFieldCount());
long logEndOffsetOfPrevBatch = logTablet.localLogEndOffset();

try {
List<byte[]> values =
processLookupOrPutRecords(
kvRecords,
kvRecords.schemaId(),
currentMerger,
currentAutoIncrementUpdater,
walBuilder,
latestSchemaRow,
logEndOffsetOfPrevBatch);

LogAppendInfo logAppendInfo = logTablet.appendAsLeader(walBuilder.build());

if (logAppendInfo.duplicated()) {
kvPreWriteBuffer.truncateTo(
logEndOffsetOfPrevBatch, TruncateReason.DUPLICATED);
}
return new LookupOrPutResult(logAppendInfo, values);
} catch (Throwable t) {
kvPreWriteBuffer.truncateTo(logEndOffsetOfPrevBatch, TruncateReason.ERROR);
throw t;
} finally {
walBuilder.deallocate();
}
});
}

private void validateSchemaId(short schemaIdOfNewData, short latestSchemaId) {
if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) {
throw new SchemaNotExistException(
Expand Down Expand Up @@ -423,6 +483,51 @@ private void processKvRecords(
}
}

private List<byte[]> processLookupOrPutRecords(
KvRecordBatch kvRecords,
short schemaIdOfNewData,
RowMerger currentMerger,
AutoIncrementUpdater autoIncrementUpdater,
WalBuilder walBuilder,
PaddingRow latestSchemaRow,
long startLogOffset)
throws Exception {
long logOffset = startLogOffset;
List<byte[]> values = new ArrayList<>();

KvRecordBatch.ReadContext readContext =
KvRecordReadContext.createReadContext(kvFormat, schemaGetter);
for (KvRecord kvRecord : kvRecords.records(readContext)) {
byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
BinaryRow row = kvRecord.getRow();
if (row == null) {
throw new InvalidRecordException("LookupOrPut not support deletion");
}

BinaryValue currentValue = row == null ? null : new BinaryValue(schemaIdOfNewData, row);

byte[] oldValueBytes = getFromBufferOrKv(key);
if (oldValueBytes != null) {
values.add(oldValueBytes);
} else if (currentValue == null) {
// lookupOrPut not support deletion
throw new InvalidRecordException("LookupOrPut not support deletion");
} else {
// If not found, we insert the new record.
BinaryValue newValue =
autoIncrementUpdater.updateAutoIncrementColumns(currentValue);
byte[] newValueBytes = newValue.encodeValue();
values.add(newValueBytes);

walBuilder.append(ChangeType.INSERT, latestSchemaRow.replaceRow(newValue.row));
kvPreWriteBuffer.put(key, newValueBytes, logOffset);
logOffset++;
}
}
return values;
}

private long processDeletion(
KvPreWriteBuffer.Key key,
RowMerger currentMerger,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.kv;

import org.apache.fluss.server.log.LogAppendInfo;

import java.util.List;

/** Result of lookup or put. */
public final class LookupOrPutResult {
private final LogAppendInfo logAppendInfo;
private final List<byte[]> values;

public LookupOrPutResult(LogAppendInfo logAppendInfo, List<byte[]> values) {
this.logAppendInfo = logAppendInfo;
this.values = values;
}

public LogAppendInfo getLogAppendInfo() {
return logAppendInfo;
}

public List<byte[]> getValues() {
return values;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,22 @@ public Counter failedPutKvRequests() {
}
}

public Counter totalLookupOrPutKvRequests() {
if (kvMetrics == null) {
return NoOpCounter.INSTANCE;
} else {
return kvMetrics.totalLookupOrPutKvRequests;
}
}

public Counter failedLookupOrPutKvRequests() {
if (kvMetrics == null) {
return NoOpCounter.INSTANCE;
} else {
return kvMetrics.failedLookupOrPutKvRequests;
}
}

public Counter totalLimitScanRequests() {
if (kvMetrics == null) {
return NoOpCounter.INSTANCE;
Expand Down Expand Up @@ -462,6 +478,8 @@ private static class KvMetricGroup extends TabletMetricGroup {
private final Counter failedLookupRequests;
private final Counter totalPutKvRequests;
private final Counter failedPutKvRequests;
private final Counter totalLookupOrPutKvRequests;
private final Counter failedLookupOrPutKvRequests;
private final Counter totalLimitScanRequests;
private final Counter failedLimitScanRequests;
private final Counter totalPrefixLookupRequests;
Expand All @@ -480,6 +498,15 @@ public KvMetricGroup(TableMetricGroup tableMetricGroup) {
meter(MetricNames.TOTAL_PUT_KV_REQUESTS_RATE, new MeterView(totalPutKvRequests));
failedPutKvRequests = new ThreadSafeSimpleCounter();
meter(MetricNames.FAILED_PUT_KV_REQUESTS_RATE, new MeterView(failedPutKvRequests));
// for lookup or put kv request
totalLookupOrPutKvRequests = new ThreadSafeSimpleCounter();
meter(
MetricNames.TOTAL_LOOKUP_OR_PUT_KV_REQUESTS_RATE,
new MeterView(totalLookupOrPutKvRequests));
failedLookupOrPutKvRequests = new ThreadSafeSimpleCounter();
meter(
MetricNames.FAILED_LOOKUP_OR_PUT_KV_REQUESTS_RATE,
new MeterView(failedLookupOrPutKvRequests));
// for limit scan request
totalLimitScanRequests = new ThreadSafeSimpleCounter();
meter(
Expand Down
Loading
Loading