From 62ad263486d4891d9eb09c24278a3ad264bf2143 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Mon, 5 Jan 2026 09:58:52 +0800 Subject: [PATCH 01/10] Improve DeviceViewIntoOperator's return style to pipeline --- .../operator/process/AbstractIntoOperator.java | 6 ++++-- .../operator/process/DeviceViewIntoOperator.java | 10 ++++++++++ .../execution/operator/process/IntoOperator.java | 5 +++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java index 99317ba7b09e..507e6946bcf0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java @@ -152,7 +152,7 @@ public TsBlock next() throws Exception { checkLastWriteOperation(); if (!processTsBlock(cachedTsBlock)) { - return null; + return tryToReturnPartialResult(); } cachedTsBlock = null; if (child.hasNextWithTimer()) { @@ -160,7 +160,7 @@ public TsBlock next() throws Exception { processTsBlock(inputTsBlock); // call child.next only once - return null; + return tryToReturnPartialResult(); } else { return tryToReturnResultTsBlock(); } @@ -218,6 +218,8 @@ private void checkLastWriteOperation() { protected abstract TsBlock tryToReturnResultTsBlock(); + protected abstract TsBlock tryToReturnPartialResult(); + protected static List constructInsertTabletStatementGenerators( Map> targetPathToSourceInputLocationMap, Map> targetPathToDataTypeMap, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index e584f79d1b65..539a65bfede8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -144,6 +144,16 @@ protected TsBlock tryToReturnResultTsBlock() { return resultTsBlockBuilder.build(); } + @Override + protected TsBlock tryToReturnPartialResult() { + if (resultTsBlockBuilder.isFull()) { + TsBlock res = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return res; + } + return null; + } + private List constructInsertTabletStatementGeneratorsByDevice(String currentDevice) { Map> targetPathToSourceInputLocationMap = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java index 5e516eeacfa0..a30cab76ae0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java @@ -102,6 +102,11 @@ protected TsBlock tryToReturnResultTsBlock() { return constructResultTsBlock(); } + @Override + protected TsBlock tryToReturnPartialResult() { + return null; + } + private TsBlock constructResultTsBlock() { List outputDataTypes = ColumnHeaderConstant.selectIntoColumnHeaders.stream() From 2dc26012d6f356a0dd98aa54c25aec53e191e21a Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 5 Jan 2026 17:05:47 +0800 Subject: [PATCH 02/10] Optimize DeviceViewIntoOperator to batch insert tablets across multiple devices --- .../process/AbstractIntoOperator.java | 9 ++- .../process/DeviceViewIntoOperator.java | 61 +++++++++++++++++-- .../plan/analyze/AnalyzeVisitor.java | 10 +-- .../plan/analyze/SelectIntoUtils.java | 42 +------------ .../DeviceViewIntoPathDescriptor.java | 5 +- .../plan/parameter/IntoPathDescriptor.java | 2 +- 6 files changed, 70 insertions(+), 59 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java index 507e6946bcf0..777a477eea76 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.process; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -191,7 +192,7 @@ private void checkLastWriteOperation() { String.format( "Error occurred while inserting tablets in SELECT INTO: %s", executionStatus.getMessage()); - throw new IntoProcessException(message); + throw new IoTDBRuntimeException(message, executionStatus.getCode()); } for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { @@ -288,7 +289,7 @@ protected void executeInsertMultiTabletsStatement( () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor); } - private boolean existFullStatement( + protected boolean existFullStatement( List insertTabletStatementGenerators) { for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { if (generator.isFull()) { @@ -551,6 +552,10 @@ public String getDevice() { return devicePath.toString(); } + public int getRowCount() { + return rowCount; + } + public int getWrittenCount(String measurement) { if (!writtenCounter.containsKey(measurement)) { return -1; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index 539a65bfede8..ae87862add96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.queryengine.execution.operator.process; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.runtime.IntoProcessException; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; @@ -27,6 +30,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileConfig; @@ -38,6 +42,7 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.RamUsageEstimator; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -48,6 +53,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(DeviceViewIntoOperator.class); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); private final Map>> deviceToTargetPathSourceInputLocationMap; @@ -59,6 +65,8 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { private final int deviceColumnIndex; private String currentDevice; + private int batchedRowCount = 0; + private final TsBlockBuilder resultTsBlockBuilder; @SuppressWarnings("squid:S107") @@ -102,7 +110,12 @@ protected boolean processTsBlock(TsBlock inputTsBlock) { constructInsertMultiTabletsStatement(false); updateResultTsBlock(); - insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); + if (insertMultiTabletsStatement != null || insertTabletStatementGenerators == null) { + insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); + } else { + insertTabletStatementGenerators.addAll( + constructInsertTabletStatementGeneratorsByDevice(device)); + } currentDevice = device; if (insertMultiTabletsStatement != null) { @@ -115,9 +128,15 @@ protected boolean processTsBlock(TsBlock inputTsBlock) { int readIndex = 0; while (readIndex < inputTsBlock.getPositionCount()) { int lastReadIndex = readIndex; - for (AbstractIntoOperator.InsertTabletStatementGenerator generator : - insertTabletStatementGenerators) { - lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex)); + + if (!insertTabletStatementGenerators.isEmpty()) { + InsertTabletStatementGenerator generatorOfCurrentDevice = + insertTabletStatementGenerators.get(insertTabletStatementGenerators.size() - 1); + int rowCountBeforeProcess = generatorOfCurrentDevice.getRowCount(); + lastReadIndex = + Math.max( + lastReadIndex, generatorOfCurrentDevice.processTsBlock(inputTsBlock, readIndex)); + batchedRowCount += generatorOfCurrentDevice.getRowCount() - rowCountBeforeProcess; } readIndex = lastReadIndex; if (insertMultiTabletsInternally(true)) { @@ -197,4 +216,38 @@ public long ramBytesUsed() { + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + resultTsBlockBuilder.getRetainedSizeInBytes(); } + + @Override + protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boolean needCheck) { + if (insertTabletStatementGenerators == null + || (needCheck && !existFullStatement(insertTabletStatementGenerators))) { + return null; + } + + List insertTabletStatementList = new ArrayList<>(); + try { + if (child.hasNextWithTimer() + && batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit()) { + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IntoProcessException(e.getMessage()); + } catch (Exception e) { + throw new IntoProcessException(e.getMessage()); + } + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + if (!generator.isEmpty()) { + insertTabletStatementList.add(generator.constructInsertTabletStatement()); + } + } + if (insertTabletStatementList.isEmpty()) { + return null; + } + + InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement(); + insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList); + batchedRowCount = 0; + return insertMultiTabletsStatement; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index dab316392058..6098123a2be6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2383,7 +2383,6 @@ private void analyzeInto( intoComponent.validate(sourceDevices, sourceColumns); DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor = new DeviceViewIntoPathDescriptor(); - PathPatternTree targetPathTree = new PathPatternTree(); IntoComponent.IntoDeviceMeasurementIterator intoDeviceMeasurementIterator = intoComponent.getIntoDeviceMeasurementIterator(); for (PartialPath sourceDevice : sourceDevices) { @@ -2405,7 +2404,6 @@ private void analyzeInto( deviceViewIntoPathDescriptor.specifyTargetDeviceMeasurement( sourceDevice, targetDevice, sourceColumn.getExpressionString(), targetMeasurement); - targetPathTree.appendFullPath(targetDevice, targetMeasurement); deviceViewIntoPathDescriptor.recordSourceColumnDataType( sourceColumn.getExpressionString(), analysis.getType(sourceColumn)); @@ -2415,13 +2413,7 @@ private void analyzeInto( intoDeviceMeasurementIterator.nextDevice(); } deviceViewIntoPathDescriptor.validate(); - - // fetch schema of target paths - long startTime = System.nanoTime(); - ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree, true, context); - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime); - deviceViewIntoPathDescriptor.bindType(targetSchemaTree); + deviceViewIntoPathDescriptor.bindType(); analysis.setDeviceViewIntoPathDescriptor(deviceViewIntoPathDescriptor); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java index 2a64f06d1868..10056b8f3aa0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java @@ -21,12 +21,9 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.commons.schema.view.LogicalViewSchema; import org.apache.iotdb.db.exception.sql.SemanticException; -import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; -import org.apache.iotdb.db.utils.TypeInferenceUtils; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Pair; @@ -37,7 +34,6 @@ import java.util.Map; import java.util.regex.Matcher; -import static com.google.common.base.Preconditions.checkState; import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS; import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN; import static org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString; @@ -134,48 +130,14 @@ public static boolean checkIsAllRawSeriesQuery(List expressions) { public static List> bindTypeForSourceTargetPathPairList( List> sourceTargetPathPairList, - Map sourceToDataTypeMap, - ISchemaTree targetSchemaTree) { + Map sourceToDataTypeMap) { List> sourceTypeBoundTargetPathPairList = new ArrayList<>(); for (Pair sourceTargetPathPair : sourceTargetPathPairList) { String sourceColumn = sourceTargetPathPair.left; TSDataType sourceColumnType = sourceToDataTypeMap.get(sourceColumn); - MeasurementPath targetPathWithSchema; PartialPath targetPath = sourceTargetPathPair.right; - List actualTargetPaths = - targetSchemaTree.searchMeasurementPaths(targetPath).left; - if (actualTargetPaths.isEmpty()) { - targetPathWithSchema = new MeasurementPath(targetPath, sourceColumnType); - } else { - checkState(actualTargetPaths.size() == 1); - MeasurementPath actualTargetPath = actualTargetPaths.get(0); - if (actualTargetPath.getMeasurementSchema().isLogicalView()) { - LogicalViewSchema viewSchema = - (LogicalViewSchema) actualTargetPath.getMeasurementSchema(); - if (viewSchema.isWritable()) { - MeasurementPath viewSourceSeriesPath = - targetSchemaTree - .searchMeasurementPaths(viewSchema.getSourcePathIfWritable()) - .left - .get(0); - actualTargetPath = - new MeasurementPath(targetPath, viewSourceSeriesPath.getSeriesType()); - actualTargetPath.setUnderAlignedEntity(viewSourceSeriesPath.isUnderAlignedEntity()); - } else { - throw new SemanticException( - String.format("View %s doesn't support data insertion.", targetPath)); - } - } - if (!TypeInferenceUtils.canAutoCast(sourceColumnType, actualTargetPath.getSeriesType())) { - throw new SemanticException( - String.format( - "The data type of target path (%s[%s]) is not compatible with the data type of source column (%s[%s]).", - targetPath, actualTargetPath.getSeriesType(), sourceColumn, sourceColumnType)); - } - // no need to check alignment, because the interface is common - targetPathWithSchema = actualTargetPath; - } + targetPathWithSchema = new MeasurementPath(targetPath, sourceColumnType); sourceTypeBoundTargetPathPairList.add(new Pair<>(sourceColumn, targetPathWithSchema)); } return sourceTypeBoundTargetPathPairList; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java index 4b43abe5d72a..7197f4c7ca82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.db.exception.sql.SemanticException; -import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils; import org.apache.tsfile.enums.TSDataType; @@ -100,7 +99,7 @@ public void validate() { } } - public void bindType(ISchemaTree targetSchemaTree) { + public void bindType() { Map>> deviceToSourceTypeBoundTargetPathPairListMap = new HashMap<>(); for (Map.Entry>> sourceTargetEntry : @@ -108,7 +107,7 @@ public void bindType(ISchemaTree targetSchemaTree) { deviceToSourceTypeBoundTargetPathPairListMap.put( sourceTargetEntry.getKey(), SelectIntoUtils.bindTypeForSourceTargetPathPairList( - sourceTargetEntry.getValue(), sourceToDataTypeMap, targetSchemaTree)); + sourceTargetEntry.getValue(), sourceToDataTypeMap)); } this.deviceToSourceTargetPathPairListMap = deviceToSourceTypeBoundTargetPathPairListMap; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java index 35765ee5c079..de163628eb02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java @@ -104,7 +104,7 @@ public void validate() { public void bindType(ISchemaTree targetSchemaTree) { this.sourceTargetPathPairList = SelectIntoUtils.bindTypeForSourceTargetPathPairList( - sourceTargetPathPairList, sourceToDataTypeMap, targetSchemaTree); + sourceTargetPathPairList, sourceToDataTypeMap); } public List> getSourceTargetPathPairList() { From 2c1d71c8d71e968b043ad285682a0e54435def9b Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 5 Jan 2026 17:27:25 +0800 Subject: [PATCH 03/10] remove another 'fetchSchema' --- .../db/queryengine/plan/analyze/AnalyzeVisitor.java | 10 +--------- .../planner/plan/parameter/IntoPathDescriptor.java | 3 +-- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 6098123a2be6..e79fe9e3f406 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2437,7 +2437,6 @@ private void analyzeInto( intoComponent.validate(sourceColumns); IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor(); - PathPatternTree targetPathTree = new PathPatternTree(); IntoComponent.IntoPathIterator intoPathIterator = intoComponent.getIntoPathIterator(); for (Pair pair : outputExpressions) { Expression sourceExpression = pair.left; @@ -2469,7 +2468,6 @@ private void analyzeInto( intoPathDescriptor.specifyDeviceAlignment( targetPath.getDevicePath().toString(), isAlignedDevice); - targetPathTree.appendFullPath(targetPath); intoPathDescriptor.recordSourceColumnDataType( sourceColumn, analysis.getType(sourceExpression)); @@ -2477,13 +2475,7 @@ private void analyzeInto( } intoPathDescriptor.validate(); - // fetch schema of target paths - long startTime = System.nanoTime(); - ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree, true, context); - updateSchemaTreeByViews(analysis, targetSchemaTree, context); - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime); - intoPathDescriptor.bindType(targetSchemaTree); + intoPathDescriptor.bindType(); analysis.setIntoPathDescriptor(intoPathDescriptor); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java index de163628eb02..29f04834f67d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.db.exception.sql.SemanticException; -import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils; import org.apache.commons.lang3.StringUtils; @@ -101,7 +100,7 @@ public void validate() { } } - public void bindType(ISchemaTree targetSchemaTree) { + public void bindType() { this.sourceTargetPathPairList = SelectIntoUtils.bindTypeForSourceTargetPathPairList( sourceTargetPathPairList, sourceToDataTypeMap); From 6a8354d151ce5b6e41aa583ab7cc49bf1bcea3d9 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 5 Jan 2026 18:57:45 +0800 Subject: [PATCH 04/10] fix it --- .../operator/process/AbstractIntoOperator.java | 3 +-- .../operator/process/DeviceViewIntoOperator.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java index 777a477eea76..2e40f92a2bdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.queryengine.execution.operator.process; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -192,7 +191,7 @@ private void checkLastWriteOperation() { String.format( "Error occurred while inserting tablets in SELECT INTO: %s", executionStatus.getMessage()); - throw new IoTDBRuntimeException(message, executionStatus.getCode()); + throw new IntoProcessException(message); } for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index ae87862add96..4a6e09e028d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -250,4 +250,19 @@ protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boole batchedRowCount = 0; return insertMultiTabletsStatement; } + + @Override + protected int findWritten(String device, String measurement) { + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + if (!Objects.equals(generator.getDevice(), device)) { + continue; + } + int writtenCountInCurrentGenerator = generator.getWrittenCount(measurement); + if (writtenCountInCurrentGenerator >= 0) { + return writtenCountInCurrentGenerator; + } + continue; + } + return 0; + } } From e5a3b808c973b7886524626e68e0b3c3cdbc7ceb Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 5 Jan 2026 19:10:09 +0800 Subject: [PATCH 05/10] fix it --- .../db/it/selectinto/IoTDBSelectIntoIT.java | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java index 3e013c898ba8..4abd542befcc 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java @@ -623,89 +623,89 @@ public void testDataTypeIncompatible() { // test INT32 assertTestFail( "select s_int32 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0, value 0]"); assertTestFail( "select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT32, timestamp 0, value 0]"); // test INT64 assertTestFail( "select s_int64 into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type INT64, timestamp 0, value 0]"); assertTestFail( "select s_int64 into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type INT64, timestamp 0, value 0]"); assertTestFail( "select s_int64 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0, value 0]"); assertTestFail( "select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT64, timestamp 0, value 0]"); // test FLOAT assertTestFail( "select s_float into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type FLOAT, timestamp 0, value 0.0]"); assertTestFail( "select s_float into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type FLOAT, timestamp 0, value 0.0]"); assertTestFail( "select s_float into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0, value 0.0]"); assertTestFail( "select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type FLOAT, timestamp 0, value 0.0]"); // test DOUBLE assertTestFail( "select s_double into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type DOUBLE, timestamp 0, value 0.0]"); // test BOOLEAN assertTestFail( "select s_boolean into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value false]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value false]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value false]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_double) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, value false]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type BOOLEAN, timestamp 0, value false]"); // test TEXT assertTestFail( "select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type TEXT, timestamp 0, value 0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type TEXT, timestamp 0, value 0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value 0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value 0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0, value 0]"); } @Test From 4fefd5c8ed4ee2b3d9a82e3127231f9ec8212016 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 6 Jan 2026 09:39:28 +0800 Subject: [PATCH 06/10] fix it --- .../db/it/selectinto/IoTDBSelectIntoIT.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java index 4abd542befcc..f4acd5257c9a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java @@ -676,36 +676,36 @@ public void testDataTypeIncompatible() { // test BOOLEAN assertTestFail( "select s_boolean into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value false]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value false]"); + "301: Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value false]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_double) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, value false]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type BOOLEAN, timestamp 0, value false]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type BOOLEAN, timestamp 0, value true]"); // test TEXT assertTestFail( "select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type TEXT, timestamp 0, value 0]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type TEXT, timestamp 0, value 0]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value 0]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value 0]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0, value 0]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0, value text0]"); } @Test From 39997ea458c245223a9d9bc364a9dea0e0d340d9 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 6 Jan 2026 14:45:58 +0800 Subject: [PATCH 07/10] fix status code --- .../iotdb/db/queryengine/plan/execution/QueryExecution.java | 2 +- .../apache/iotdb/db/queryengine/plan/planner/IPlanner.java | 4 +--- .../iotdb/db/queryengine/plan/planner/TreeModelPlanner.java | 5 ++--- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 350dd6394d90..0ae403452683 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -617,7 +617,7 @@ private ExecutionResult getExecutionResult(QueryState state) { // info to client if (!CONFIG.isEnable13DataInsertAdapt() || IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion())) { - planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus, statusCode); + planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus); } return new ExecutionResult(context.getQueryId(), tsstatus); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java index 616cca3efb75..b9b239b5e26e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java @@ -27,7 +27,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; -import org.apache.iotdb.rpc.TSStatusCode; import java.util.concurrent.ScheduledExecutorService; @@ -49,6 +48,5 @@ IScheduler doSchedule( ScheduledExecutorService getScheduledExecutorService(); - void setRedirectInfo( - IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode); + void setRedirectInfo(IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index 6ec87d089188..1da701a8e352 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -156,8 +156,7 @@ public ScheduledExecutorService getScheduledExecutorService() { } @Override - public void setRedirectInfo( - IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode) { + public void setRedirectInfo(IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus) { Analysis analysis = (Analysis) iAnalysis; // Get the inner statement of PipeEnrichedStatement @@ -173,7 +172,7 @@ public void setRedirectInfo( if (insertStatement instanceof InsertRowsStatement || insertStatement instanceof InsertMultiTabletsStatement) { // multiple devices - if (statusCode == TSStatusCode.SUCCESS_STATUS) { + if (tsstatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { boolean needRedirect = false; List subStatus = new ArrayList<>(); for (TEndPoint endPoint : redirectNodeList) { From b54851d4dd244d441435edcc9adef6b941df5fd8 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 6 Jan 2026 15:59:54 +0800 Subject: [PATCH 08/10] fix reviewed issues --- .../process/DeviceViewIntoOperator.java | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index 4a6e09e028d8..64f8872a1a81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -219,23 +219,31 @@ public long ramBytesUsed() { @Override protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boolean needCheck) { - if (insertTabletStatementGenerators == null - || (needCheck && !existFullStatement(insertTabletStatementGenerators))) { + if (insertTabletStatementGenerators == null) { return null; } - - List insertTabletStatementList = new ArrayList<>(); - try { - if (child.hasNextWithTimer() - && batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit()) { + boolean hasFullStatement = existFullStatement(insertTabletStatementGenerators); + if (needCheck) { + // When needCheck is true, we only proceed if there already exists a full statement. + if (!hasFullStatement) { return null; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IntoProcessException(e.getMessage()); - } catch (Exception e) { - throw new IntoProcessException(e.getMessage()); + } else { + // When needCheck is false, we may delay flushing to accumulate more rows + // if the batch is not yet at the configured row limit and the child has more data. + try { + if (batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit() + && child.hasNextWithTimer()) { + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IntoProcessException(e.getMessage(), e); + } catch (Exception e) { + throw new IntoProcessException(e.getMessage(), e); + } } + List insertTabletStatementList = new ArrayList<>(); for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { if (!generator.isEmpty()) { insertTabletStatementList.add(generator.constructInsertTabletStatement()); @@ -261,7 +269,6 @@ protected int findWritten(String device, String measurement) { if (writtenCountInCurrentGenerator >= 0) { return writtenCountInCurrentGenerator; } - continue; } return 0; } From 39a05fd9190c7789b28f25ddccb8cf7eab0b0223 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 6 Jan 2026 16:19:04 +0800 Subject: [PATCH 09/10] fix compile --- .../execution/operator/process/DeviceViewIntoOperator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index 64f8872a1a81..51ab502fba3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -238,9 +238,9 @@ protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boole } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IntoProcessException(e.getMessage(), e); + throw new IntoProcessException(e.getMessage()); } catch (Exception e) { - throw new IntoProcessException(e.getMessage(), e); + throw new IntoProcessException(e.getMessage()); } } List insertTabletStatementList = new ArrayList<>(); From 7f4c0d37cb1389af5e504a88ea82c14da49ff4aa Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Tue, 6 Jan 2026 17:03:23 +0800 Subject: [PATCH 10/10] change to pipeline return style for IntoOperator --- .../process/AbstractIntoOperator.java | 7 +++- .../process/DeviceViewIntoOperator.java | 20 ++++++------ .../operator/process/IntoOperator.java | 32 ++++++++++++------- 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java index 2e40f92a2bdf..7730dbe17f09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java @@ -38,6 +38,7 @@ import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.type.Type; import org.apache.tsfile.read.common.type.TypeFactory; import org.apache.tsfile.utils.Binary; @@ -82,18 +83,22 @@ public abstract class AbstractIntoOperator implements ProcessOperator { private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + protected final TsBlockBuilder resultTsBlockBuilder; + protected AbstractIntoOperator( OperatorContext operatorContext, Operator child, List inputColumnTypes, ExecutorService intoOperationExecutor, - long statementSizePerLine) { + long statementSizePerLine, + List outputDataTypes) { this.operatorContext = operatorContext; this.child = child; this.typeConvertors = inputColumnTypes.stream().map(TypeFactory::getType).collect(Collectors.toList()); this.writeOperationExecutor = intoOperationExecutor; + this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); initMemoryEstimates(statementSizePerLine); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index 4a6e09e028d8..79da4f496fd0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -36,7 +36,6 @@ import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; @@ -67,8 +66,6 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { private int batchedRowCount = 0; - private final TsBlockBuilder resultTsBlockBuilder; - @SuppressWarnings("squid:S107") public DeviceViewIntoOperator( OperatorContext operatorContext, @@ -82,18 +79,19 @@ public DeviceViewIntoOperator( Map sourceColumnToInputLocationMap, ExecutorService intoOperationExecutor, long statementSizePerLine) { - super(operatorContext, child, inputColumnTypes, intoOperationExecutor, statementSizePerLine); + super( + operatorContext, + child, + inputColumnTypes, + intoOperationExecutor, + statementSizePerLine, + ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList())); this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap; this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap; this.targetDeviceToAlignedMap = targetDeviceToAlignedMap; this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap; - - List outputDataTypes = - ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream() - .map(ColumnHeader::getColumnType) - .collect(Collectors.toList()); - this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); - this.deviceColumnIndex = sourceColumnToInputLocationMap.get(ColumnHeaderConstant.DEVICE).getValueColumnIndex(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java index a30cab76ae0d..227b299c2d46 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java @@ -31,7 +31,6 @@ import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; @@ -49,6 +48,8 @@ public class IntoOperator extends AbstractIntoOperator { private final List> sourceTargetPathPairList; + private int outputIndex = 0; + @SuppressWarnings("squid:S107") public IntoOperator( OperatorContext operatorContext, @@ -60,7 +61,15 @@ public IntoOperator( List> sourceTargetPathPairList, ExecutorService intoOperationExecutor, long statementSizePerLine) { - super(operatorContext, child, inputColumnTypes, intoOperationExecutor, statementSizePerLine); + super( + operatorContext, + child, + inputColumnTypes, + intoOperationExecutor, + statementSizePerLine, + ColumnHeaderConstant.selectIntoColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList())); this.sourceTargetPathPairList = sourceTargetPathPairList; insertTabletStatementGenerators = constructInsertTabletStatementGenerators( @@ -98,8 +107,9 @@ protected TsBlock tryToReturnResultTsBlock() { return null; } - finished = true; - return constructResultTsBlock(); + TsBlock res = constructResultTsBlock(); + finished = (outputIndex == sourceTargetPathPairList.size()); + return res; } @Override @@ -108,14 +118,12 @@ protected TsBlock tryToReturnPartialResult() { } private TsBlock constructResultTsBlock() { - List outputDataTypes = - ColumnHeaderConstant.selectIntoColumnHeaders.stream() - .map(ColumnHeader::getColumnType) - .collect(Collectors.toList()); - TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - for (Pair sourceTargetPathPair : sourceTargetPathPairList) { + for (int size = sourceTargetPathPairList.size(); + outputIndex < size && !resultTsBlockBuilder.isFull(); + outputIndex++) { + Pair sourceTargetPathPair = sourceTargetPathPairList.get(outputIndex); timeColumnBuilder.writeLong(0); columnBuilders[0].writeBinary( new Binary(sourceTargetPathPair.left, TSFileConfig.STRING_CHARSET)); @@ -126,7 +134,9 @@ private TsBlock constructResultTsBlock() { sourceTargetPathPair.right.getDevice(), sourceTargetPathPair.right.getMeasurement())); resultTsBlockBuilder.declarePosition(); } - return resultTsBlockBuilder.build(); + TsBlock res = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return res; } @Override