diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 8de08a1d588185..b3faa9b8ce8b34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -50,6 +50,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; @@ -783,6 +784,14 @@ private void modifyPropertiesInternal(Map jobProperties, if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS)); } + if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) { + String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY); + if ("ERROR".equalsIgnoreCase(policy)) { + this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR; + } else { + this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; + } + } } LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}", this.id, jobProperties, dataSourceProperties); @@ -955,6 +964,6 @@ public NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserExce return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS, partitions, mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs, precedingFilter, whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism, loadToSingleTablet, - isPartialUpdate, memtableOnSinkNode); + isPartialUpdate, partialUpdateNewKeyPolicy, memtableOnSinkNode); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index a3072b8dbd517b..f0d17e32e74dba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -68,6 +68,7 @@ import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; @@ -223,6 +224,7 @@ public boolean isFinalState() { protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE; protected boolean isPartialUpdate = false; + protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; protected String sequenceCol; @@ -388,6 +390,9 @@ protected void setOptional(CreateRoutineLoadInfo info) throws UserException { jobProperties.put(info.PARTIAL_COLUMNS, info.isPartialUpdate() ? "true" : "false"); if (info.isPartialUpdate()) { this.isPartialUpdate = true; + this.partialUpdateNewKeyPolicy = info.getPartialUpdateNewKeyPolicy(); + jobProperties.put(info.PARTIAL_UPDATE_NEW_KEY_POLICY, + this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND"); } jobProperties.put(info.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio)); @@ -1869,6 +1874,10 @@ public String jobPropertiesToJsonString() { // job properties defined in CreateRoutineLoadStmt jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate)); + if (isPartialUpdate) { + jobProperties.put(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY, + partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND"); + } jobProperties.put(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum)); jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS)); jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows)); @@ -1921,6 +1930,12 @@ public void gsonPostProcess() throws IOException { jobProperties.forEach((k, v) -> { if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { isPartialUpdate = Boolean.parseBoolean(v); + } else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) { + if ("ERROR".equalsIgnoreCase(v)) { + partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR; + } else { + partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; + } } }); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java index 381912882d4e55..eb0ae35eeab106 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java @@ -28,6 +28,8 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Strings; @@ -65,7 +67,8 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo { protected boolean emptyFieldAsNull; protected int sendBatchParallelism; protected boolean loadToSingleTablet; - protected boolean isPartialUpdate; + protected TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; + protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; protected boolean memtableOnSinkNode; protected int timeoutSec; @@ -77,7 +80,8 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map jobProp String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs, Expression precedingFilter, Expression whereExpr, Separator columnSeparator, Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism, - boolean loadToSingleTablet, boolean isPartialUpdate, boolean memtableOnSinkNode) { + boolean loadToSingleTablet, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, + boolean memtableOnSinkNode) { this.execMemLimit = execMemLimit; this.jobProperties = jobProperties; this.maxBatchIntervalS = maxBatchIntervalS; @@ -95,7 +99,10 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map jobProp this.escape = escape; this.sendBatchParallelism = sendBatchParallelism; this.loadToSingleTablet = loadToSingleTablet; - this.isPartialUpdate = isPartialUpdate; + if (isPartialUpdate) { + this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } + this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy; this.memtableOnSinkNode = memtableOnSinkNode; this.timeoutSec = calTimeoutSec(); } @@ -311,7 +318,22 @@ public List getHiddenColumns() { @Override public boolean isFixedPartialUpdate() { - return isPartialUpdate; + return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } + + @Override + public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() { + return uniquekeyUpdateMode; + } + + @Override + public boolean isFlexiblePartialUpdate() { + return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS; + } + + @Override + public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() { + return partialUpdateNewKeyPolicy; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java index e5fe196799ff02..4b91b0b705865f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java @@ -56,6 +56,7 @@ import org.apache.doris.nereids.util.PlanUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; @@ -88,6 +89,7 @@ public class CreateRoutineLoadInfo { public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit"; public static final String PARTIAL_COLUMNS = "partial_columns"; + public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior"; public static final String WORKLOAD_GROUP = "workload_group"; public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; @@ -122,6 +124,7 @@ public class CreateRoutineLoadInfo { .add(SEND_BATCH_PARALLELISM) .add(LOAD_TO_SINGLE_TABLET) .add(PARTIAL_COLUMNS) + .add(PARTIAL_UPDATE_NEW_KEY_POLICY) .add(WORKLOAD_GROUP) .add(FileFormatProperties.PROP_FORMAT) .add(JsonFileFormatProperties.PROP_JSON_PATHS) @@ -166,6 +169,7 @@ public class CreateRoutineLoadInfo { * support partial columns load(Only Unique Key Columns) */ private boolean isPartialUpdate = false; + private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; private String comment = ""; @@ -195,6 +199,15 @@ public CreateRoutineLoadInfo(LabelNameInfo labelNameInfo, String tableName, .createDataSource(typeName, dataSourceProperties, this.isMultiTable); this.mergeType = mergeType; this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true"); + if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) { + String policyStr = this.jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase(); + if ("APPEND".equals(policyStr)) { + this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND; + } else if ("ERROR".equals(policyStr)) { + this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR; + } + // validation will be done in checkJobProperties() + } if (comment != null) { this.comment = comment; } @@ -276,6 +289,10 @@ public boolean isPartialUpdate() { return isPartialUpdate; } + public TPartialUpdateNewRowPolicy getPartialUpdateNewKeyPolicy() { + return partialUpdateNewKeyPolicy; + } + public String getComment() { return comment; } @@ -515,6 +532,19 @@ public void checkJobProperties() throws UserException { } timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone)); + // check partial_update_new_key_behavior + if (jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) { + if (!isPartialUpdate) { + throw new AnalysisException( + PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial_columns is true"); + } + String policy = jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase(); + if (!"APPEND".equals(policy) && !"ERROR".equals(policy)) { + throw new AnalysisException( + PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of {'APPEND', 'ERROR'}, but found " + policy); + } + } + String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv"); fileFormatProperties = FileFormatProperties.createFileFormatProperties(format); fileFormatProperties.analyzeFileFormatProperties(jobProperties, false); diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out new file mode 100644 index 00000000000000..a8eed2b2a057fb --- /dev/null +++ b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update.out @@ -0,0 +1,12 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_initial -- +1 alice 100 20 +2 bob 90 21 +3 charlie 80 22 + +-- !select_after_partial_update -- +1 alice 150 20 +2 bob 95 21 +3 charlie 80 22 +100 \N 100 \N + diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out new file mode 100644 index 00000000000000..45b434b8cad23f --- /dev/null +++ b/regression-test/data/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.out @@ -0,0 +1,27 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_initial -- +1 1 1 1 +2 2 2 2 +3 3 3 3 + +-- !select_after_append -- +1 10 1 1 +2 20 2 2 +3 3 3 3 +4 40 \N \N +5 50 \N \N + +-- !select_after_error_mode -- +1 1 100 1 +2 2 200 2 +3 3 3 3 +4 4 40 4 +5 5 50 5 + +-- !select_after_error_rejected -- +1 1 100 1 +2 2 200 2 +3 3 3 3 +4 4 40 4 +5 5 50 5 + diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy new file mode 100644 index 00000000000000..85888b9e439b9b --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update.groovy @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.util.RoutineLoadTestUtils +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord + +suite("test_routine_load_partial_update", "nonConcurrent") { + def kafkaCsvTopic = "test_routine_load_partial_update" + + if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) { + def runSql = { String q -> sql q } + def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context) + def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker) + + def tableName = "test_routine_load_partial_update" + def job = "test_partial_update_job" + + sql """ DROP TABLE IF EXISTS ${tableName} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `id` int NULL, + `name` varchar(65533) NULL, + `score` int NULL, + `age` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + COMMENT 'test partial update' + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + // insert initial data + sql """ + INSERT INTO ${tableName} VALUES + (1, 'alice', 100, 20), + (2, 'bob', 90, 21), + (3, 'charlie', 80, 22) + """ + + qt_select_initial "SELECT * FROM ${tableName} ORDER BY id" + + try { + // create routine load with partial_columns=true + // only update id and score columns + sql """ + CREATE ROUTINE LOAD ${job} ON ${tableName} + COLUMNS TERMINATED BY ",", + COLUMNS (id, score) + PROPERTIES + ( + "max_batch_interval" = "10", + "partial_columns" = "true" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // send partial update data to kafka + // update score for id=1 from 100 to 150 + // update score for id=2 from 90 to 95 + def data = [ + "1,150", + "2,95", + "100,100" + ] + + data.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record).get() + } + producer.flush() + + // wait for routine load task to finish + RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 3) + + // verify partial update: score should be updated, name and age should remain unchanged + qt_select_after_partial_update "SELECT * FROM ${tableName} ORDER BY id" + } catch (Exception e) { + logger.error("Error during test: " + e.getMessage()) + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job}" + } + } +} diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy new file mode 100644 index 00000000000000..a6a97253e986d0 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_partial_update_new_key_behavior.groovy @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.util.RoutineLoadTestUtils +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord + +suite("test_routine_load_partial_update_new_key_behavior", "nonConcurrent") { + def kafkaCsvTopic = "test_routine_load_partial_update_new_key_behavior" + + if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) { + def runSql = { String q -> sql q } + def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context) + def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker) + + // Test 1: partial_update_new_key_behavior=APPEND (default) + def tableName1 = "test_routine_load_pu_new_key_append" + def job1 = "test_new_key_behavior_append" + + sql """ DROP TABLE IF EXISTS ${tableName1} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName1} ( + `k` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + sql """ + INSERT INTO ${tableName1} VALUES + (1, 1, 1, 1), + (2, 2, 2, 2), + (3, 3, 3, 3) + """ + + qt_select_initial "SELECT * FROM ${tableName1} ORDER BY k" + try { + sql """ + CREATE ROUTINE LOAD ${job1} ON ${tableName1} + COLUMNS TERMINATED BY ",", + COLUMNS (k, c1) + PROPERTIES + ( + "max_batch_interval" = "10", + "partial_columns" = "true", + "partial_update_new_key_behavior" = "append" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // send data with existing keys and new keys + def data1 = [ + "1,10", // update existing key + "2,20", // update existing key + "4,40", // new key - should be appended with default values for c2 and c3 + "5,50" // new key - should be appended with default values for c2 and c3 + ] + + data1.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record).get() + } + producer.flush() + + // wait for routine load task to finish + sql "set skip_delete_bitmap=true;" + sql "sync;" + RoutineLoadTestUtils.waitForTaskFinish(runSql, job1, tableName1, 6) + sql "set skip_delete_bitmap=false;" + sql "sync;" + + // verify: new keys should be appended + qt_select_after_append "SELECT * FROM ${tableName1} ORDER BY k" + + } catch (Exception e) { + logger.info("Caught expected exception: ${e.getMessage()}") + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job1}" + } + + // Test 2: partial_update_new_key_behavior=ERROR + def tableName2 = "test_routine_load_pu_new_key_error" + def job2 = "test_new_key_behavior_error" + + sql """ DROP TABLE IF EXISTS ${tableName2} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName2} ( + `k` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + sql """ + INSERT INTO ${tableName2} VALUES + (1, 1, 1, 1), + (2, 2, 2, 2), + (3, 3, 3, 3), + (4, 4, 4, 4), + (5, 5, 5, 5) + """ + try { + sql """ + CREATE ROUTINE LOAD ${job2} ON ${tableName2} + COLUMNS TERMINATED BY ",", + COLUMNS (k, c2) + PROPERTIES + ( + "max_batch_interval" = "10", + "partial_columns" = "true", + "partial_update_new_key_behavior" = "error" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // send data with only existing keys (should succeed) + def data2 = [ + "1,100", + "2,200" + ] + + data2.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record).get() + } + producer.flush() + + // wait for routine load task to finish + sql "set skip_delete_bitmap=true;" + sql "sync;" + RoutineLoadTestUtils.waitForTaskFinish(runSql, job2, tableName2, 6) + sql "set skip_delete_bitmap=false;" + sql "sync;" + + // verify: existing keys should be updated + qt_select_after_error_mode "SELECT * FROM ${tableName2} ORDER BY k" + + // Now send data with new keys - this should fail the task + def data3 = [ + "10,1000", // new key - should cause error + "11,1100" // new key - should cause error + ] + + data3.each { line -> + logger.info("Sending to Kafka with new keys: ${line}") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record).get() + } + producer.flush() + + RoutineLoadTestUtils.waitForTaskAbort(runSql, job2) + def state = sql "SHOW ROUTINE LOAD FOR ${job2}" + logger.info("routine load state after new keys: ${state[0][8].toString()}") + logger.info("routine load error rows: ${state[0][15].toString()}") + + // the data should not be loaded due to error + qt_select_after_error_rejected "SELECT * FROM ${tableName2} ORDER BY k" + + } catch (Exception e) { + logger.info("Caught expected exception: ${e.getMessage()}") + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job2}" + } + + // Test 3: Test invalid property value + def tableName3 = "test_routine_load_pu_invalid_prop" + sql """ DROP TABLE IF EXISTS ${tableName3} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName3} ( + `k` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD test_invalid_property ON ${tableName3} + COLUMNS TERMINATED BY ",", + COLUMNS (k, c3) + PROPERTIES + ( + "partial_columns" = "true", + "partial_update_new_key_behavior" = "invalid" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "partial_update_new_key_behavior should be one of {'APPEND', 'ERROR'}" + } + + // Test 4: Test setting property without partial_columns + def tableName4 = "test_routine_load_pu_without_partial" + sql """ DROP TABLE IF EXISTS ${tableName4} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName4} ( + `k` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD test_without_partial_columns ON ${tableName4} + COLUMNS TERMINATED BY ",", + COLUMNS (k, c3) + PROPERTIES + ( + "partial_update_new_key_behavior" = "append" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTopic}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "partial_update_new_key_behavior can only be set when partial_columns is true" + } + } +}