Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

Expand Down Expand Up @@ -783,6 +784,14 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
}
if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY);
if ("ERROR".equalsIgnoreCase(policy)) {
this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
} else {
this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
}
}
}
LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
this.id, jobProperties, dataSourceProperties);
Expand Down Expand Up @@ -955,6 +964,6 @@ public NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserExce
return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS, partitions,
mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs, precedingFilter,
whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism, loadToSingleTablet,
isPartialUpdate, memtableOnSinkNode);
isPartialUpdate, partialUpdateNewKeyPolicy, memtableOnSinkNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
Expand Down Expand Up @@ -223,6 +224,7 @@ public boolean isFinalState() {
protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;

protected boolean isPartialUpdate = false;
protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;

protected String sequenceCol;

Expand Down Expand Up @@ -388,6 +390,9 @@ protected void setOptional(CreateRoutineLoadInfo info) throws UserException {
jobProperties.put(info.PARTIAL_COLUMNS, info.isPartialUpdate() ? "true" : "false");
if (info.isPartialUpdate()) {
this.isPartialUpdate = true;
this.partialUpdateNewKeyPolicy = info.getPartialUpdateNewKeyPolicy();
jobProperties.put(info.PARTIAL_UPDATE_NEW_KEY_POLICY,
this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
}
jobProperties.put(info.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));

Expand Down Expand Up @@ -1869,6 +1874,10 @@ public String jobPropertiesToJsonString() {

// job properties defined in CreateRoutineLoadStmt
jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
if (isPartialUpdate) {
jobProperties.put(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY,
partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
}
jobProperties.put(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum));
jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS));
jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows));
Expand Down Expand Up @@ -1921,6 +1930,12 @@ public void gsonPostProcess() throws IOException {
jobProperties.forEach((k, v) -> {
if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
isPartialUpdate = Boolean.parseBoolean(v);
} else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
if ("ERROR".equalsIgnoreCase(v)) {
partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
} else {
partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
}
}
});
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import org.apache.doris.thrift.TUniqueKeyUpdateMode;

import com.google.common.base.Strings;

Expand Down Expand Up @@ -65,7 +67,8 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
protected boolean emptyFieldAsNull;
protected int sendBatchParallelism;
protected boolean loadToSingleTablet;
protected boolean isPartialUpdate;
protected TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
protected boolean memtableOnSinkNode;
protected int timeoutSec;

Expand All @@ -77,7 +80,8 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map<String, String> jobProp
String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs,
Expression precedingFilter, Expression whereExpr, Separator columnSeparator,
Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism,
boolean loadToSingleTablet, boolean isPartialUpdate, boolean memtableOnSinkNode) {
boolean loadToSingleTablet, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
boolean memtableOnSinkNode) {
this.execMemLimit = execMemLimit;
this.jobProperties = jobProperties;
this.maxBatchIntervalS = maxBatchIntervalS;
Expand All @@ -95,7 +99,10 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map<String, String> jobProp
this.escape = escape;
this.sendBatchParallelism = sendBatchParallelism;
this.loadToSingleTablet = loadToSingleTablet;
this.isPartialUpdate = isPartialUpdate;
if (isPartialUpdate) {
this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
}
this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy;
this.memtableOnSinkNode = memtableOnSinkNode;
this.timeoutSec = calTimeoutSec();
}
Expand Down Expand Up @@ -311,7 +318,22 @@ public List<String> getHiddenColumns() {

/**
 * Reports whether this task runs as a fixed-column partial update.
 *
 * @return {@code true} when the stored update mode is
 *         {@code TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS}, {@code false} otherwise
 */
@Override
public boolean isFixedPartialUpdate() {
    // Single source of truth is the update mode; a leading return of the old boolean flag
    // would make this comparison unreachable.
    return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
}

/**
 * Exposes the unique-key update mode held by this task info.
 *
 * @return the current value of the {@code uniquekeyUpdateMode} field
 */
@Override
public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
    return this.uniquekeyUpdateMode;
}

/**
 * Reports whether this task runs as a flexible-column partial update.
 *
 * @return {@code true} only when the stored update mode is
 *         {@code TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS}
 */
@Override
public boolean isFlexiblePartialUpdate() {
    final boolean flexible = TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS == this.uniquekeyUpdateMode;
    return flexible;
}

/**
 * Exposes the policy stored in {@code partialUpdateNewKeyPolicy}.
 *
 * <p>Note the naming difference: the accessor says "new row", the backing field says "new key";
 * both refer to the same value.
 *
 * @return the stored {@code TPartialUpdateNewRowPolicy}
 */
@Override
public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() {
    return this.partialUpdateNewKeyPolicy;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class CreateRoutineLoadInfo {
public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";

public static final String PARTIAL_COLUMNS = "partial_columns";
public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior";
public static final String WORKLOAD_GROUP = "workload_group";
public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
Expand Down Expand Up @@ -122,6 +124,7 @@ public class CreateRoutineLoadInfo {
.add(SEND_BATCH_PARALLELISM)
.add(LOAD_TO_SINGLE_TABLET)
.add(PARTIAL_COLUMNS)
.add(PARTIAL_UPDATE_NEW_KEY_POLICY)
.add(WORKLOAD_GROUP)
.add(FileFormatProperties.PROP_FORMAT)
.add(JsonFileFormatProperties.PROP_JSON_PATHS)
Expand Down Expand Up @@ -166,6 +169,7 @@ public class CreateRoutineLoadInfo {
* support partial columns load(Only Unique Key Columns)
*/
private boolean isPartialUpdate = false;
private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;

private String comment = "";

Expand Down Expand Up @@ -195,6 +199,15 @@ public CreateRoutineLoadInfo(LabelNameInfo labelNameInfo, String tableName,
.createDataSource(typeName, dataSourceProperties, this.isMultiTable);
this.mergeType = mergeType;
this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
String policyStr = this.jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
if ("APPEND".equals(policyStr)) {
this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
} else if ("ERROR".equals(policyStr)) {
this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
}
// validation will be done in checkJobProperties()
}
if (comment != null) {
this.comment = comment;
}
Expand Down Expand Up @@ -276,6 +289,10 @@ public boolean isPartialUpdate() {
return isPartialUpdate;
}

/**
 * Exposes the new-key policy recorded on this routine load definition.
 *
 * @return the value of the {@code partialUpdateNewKeyPolicy} field
 */
public TPartialUpdateNewRowPolicy getPartialUpdateNewKeyPolicy() {
    return this.partialUpdateNewKeyPolicy;
}

/**
 * Exposes the comment text attached to this routine load definition.
 *
 * @return the value of the {@code comment} field
 */
public String getComment() {
    return this.comment;
}
Expand Down Expand Up @@ -515,6 +532,19 @@ public void checkJobProperties() throws UserException {
}
timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone));

// check partial_update_new_key_behavior
if (jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
if (!isPartialUpdate) {
throw new AnalysisException(
PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial_columns is true");
}
String policy = jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
if (!"APPEND".equals(policy) && !"ERROR".equals(policy)) {
throw new AnalysisException(
PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of {'APPEND', 'ERROR'}, but found " + policy);
}
}

String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
fileFormatProperties = FileFormatProperties.createFileFormatProperties(format);
fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_initial --
1 alice 100 20
2 bob 90 21
3 charlie 80 22

-- !select_after_partial_update --
1 alice 150 20
2 bob 95 21
3 charlie 80 22
100 \N 100 \N

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_initial --
1 1 1 1
2 2 2 2
3 3 3 3

-- !select_after_append --
1 10 1 1
2 20 2 2
3 3 3 3
4 40 \N \N
5 50 \N \N

-- !select_after_error_mode --
1 1 100 1
2 2 200 2
3 3 3 3
4 4 40 4
5 5 50 5

-- !select_after_error_rejected --
1 1 100 1
2 2 200 2
3 3 3 3
4 4 40 4
5 5 50 5

Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.util.RoutineLoadTestUtils
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

// Regression test: a Kafka routine load created with "partial_columns" = "true" must update only
// the columns listed in COLUMNS (id, score) and leave the remaining columns of existing rows as-is.
// The whole body is skipped unless the Kafka test environment is enabled.
suite("test_routine_load_partial_update", "nonConcurrent") {
    def kafkaCsvTopic = "test_routine_load_partial_update"

    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
        def runSql = { String q -> sql q }
        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)

        def tableName = "test_routine_load_partial_update"
        def job = "test_partial_update_job"

        // Everything after the producer is created runs under this try so the producer is
        // always closed, even when table setup or the load itself fails.
        try {
            sql """ DROP TABLE IF EXISTS ${tableName} force;"""
            sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`id` int NULL,
`name` varchar(65533) NULL,
`score` int NULL,
`age` int NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'test partial update'
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);
"""

            // insert initial data
            sql """
INSERT INTO ${tableName} VALUES
(1, 'alice', 100, 20),
(2, 'bob', 90, 21),
(3, 'charlie', 80, 22)
"""

            qt_select_initial "SELECT * FROM ${tableName} ORDER BY id"

            try {
                // create routine load with partial_columns=true
                // only update id and score columns
                sql """
CREATE ROUTINE LOAD ${job} ON ${tableName}
COLUMNS TERMINATED BY ",",
COLUMNS (id, score)
PROPERTIES
(
"max_batch_interval" = "10",
"partial_columns" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "${kafka_broker}",
"kafka_topic" = "${kafkaCsvTopic}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""

                // send partial update data to kafka
                // update score for id=1 from 100 to 150
                // update score for id=2 from 90 to 95
                // id=100 is a key that does not exist yet; no partial_update_new_key_behavior is
                // set, and the recorded result expects it to be inserted with name/age left NULL
                def data = [
                    "1,150",
                    "2,95",
                    "100,100"
                ]

                data.each { line ->
                    logger.info("Sending to Kafka: ${line}")
                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
                    // block on each send so a delivery failure surfaces here, not later
                    producer.send(record).get()
                }
                producer.flush()

                // wait for routine load task to finish
                RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 3)

                // verify partial update: score should be updated, name and age should remain unchanged
                qt_select_after_partial_update "SELECT * FROM ${tableName} ORDER BY id"
            } catch (Exception e) {
                logger.error("Error during test: " + e.getMessage())
                throw e
            } finally {
                sql "STOP ROUTINE LOAD FOR ${job}"
            }
        } finally {
            // release the producer's network threads and buffers; it was never closed before
            producer.close()
        }
    }
}
Loading
Loading