diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java index 3e013c898ba83..f4acd5257c9aa 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java @@ -623,89 +623,89 @@ public void testDataTypeIncompatible() { // test INT32 assertTestFail( "select s_int32 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0, value 0]"); assertTestFail( "select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT32, timestamp 0, value 0]"); // test INT64 assertTestFail( "select s_int64 into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type INT64, timestamp 0, value 0]"); assertTestFail( "select s_int64 into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type INT64, timestamp 0, value 0]"); assertTestFail( "select s_int64 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0, value 0]"); assertTestFail( "select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT64, timestamp 0, value 0]"); // test FLOAT assertTestFail( "select s_float into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type FLOAT, timestamp 0, value 0.0]"); assertTestFail( "select s_float into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type FLOAT, timestamp 0, value 0.0]"); assertTestFail( "select s_float into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0, value 0.0]"); assertTestFail( "select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type FLOAT, timestamp 0, value 0.0]"); // test DOUBLE assertTestFail( "select s_double into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type DOUBLE, timestamp 0, value 0.0]"); // test BOOLEAN assertTestFail( "select s_boolean into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "301: Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_double) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type BOOLEAN, timestamp 0, value true]"); // test TEXT assertTestFail( "select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0, value text0]"); } @Test diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java index 99317ba7b09e5..7730dbe17f092 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java @@ -38,6 +38,7 @@ import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.type.Type; import org.apache.tsfile.read.common.type.TypeFactory; import org.apache.tsfile.utils.Binary; @@ -82,18 +83,22 @@ public abstract class AbstractIntoOperator implements ProcessOperator { private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + protected final TsBlockBuilder resultTsBlockBuilder; + protected AbstractIntoOperator( OperatorContext operatorContext, Operator child, List inputColumnTypes, ExecutorService intoOperationExecutor, - long statementSizePerLine) { + long statementSizePerLine, + List outputDataTypes) { this.operatorContext = operatorContext; this.child = child; this.typeConvertors = inputColumnTypes.stream().map(TypeFactory::getType).collect(Collectors.toList()); this.writeOperationExecutor = intoOperationExecutor; + this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); initMemoryEstimates(statementSizePerLine); } @@ -152,7 +157,7 @@ public TsBlock next() throws Exception { checkLastWriteOperation(); if (!processTsBlock(cachedTsBlock)) { - return null; + return tryToReturnPartialResult(); } cachedTsBlock = null; if (child.hasNextWithTimer()) { @@ -160,7 +165,7 @@ public TsBlock next() throws Exception { processTsBlock(inputTsBlock); // call child.next only once - return null; + return tryToReturnPartialResult(); } else { return tryToReturnResultTsBlock(); } @@ -218,6 +223,8 @@ private void checkLastWriteOperation() { protected abstract TsBlock tryToReturnResultTsBlock(); + protected abstract TsBlock tryToReturnPartialResult(); + protected static List constructInsertTabletStatementGenerators( Map> targetPathToSourceInputLocationMap, Map> targetPathToDataTypeMap, @@ -286,7 +293,7 @@ protected void executeInsertMultiTabletsStatement( () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor); } - private boolean existFullStatement( + protected boolean existFullStatement( List insertTabletStatementGenerators) { for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { if (generator.isFull()) { @@ -549,6 +556,10 @@ public String getDevice() { return devicePath.toString(); } + public int getRowCount() { + return rowCount; + } + public int getWrittenCount(String measurement) { if (!writtenCounter.containsKey(measurement)) { return -1; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index e584f79d1b65e..14ad2b4a3d46e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.queryengine.execution.operator.process; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.runtime.IntoProcessException; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; @@ -27,17 +30,18 @@ import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.RamUsageEstimator; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -48,6 +52,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(DeviceViewIntoOperator.class); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); private final Map>> deviceToTargetPathSourceInputLocationMap; @@ -59,7 +64,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { private final int deviceColumnIndex; private String currentDevice; - private final TsBlockBuilder resultTsBlockBuilder; + private int batchedRowCount = 0; @SuppressWarnings("squid:S107") public DeviceViewIntoOperator( @@ -74,18 +79,19 @@ public DeviceViewIntoOperator( Map sourceColumnToInputLocationMap, ExecutorService intoOperationExecutor, long statementSizePerLine) { - super(operatorContext, child, inputColumnTypes, intoOperationExecutor, statementSizePerLine); + super( + operatorContext, + child, + inputColumnTypes, + intoOperationExecutor, + statementSizePerLine, + ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList())); this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap; this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap; this.targetDeviceToAlignedMap = targetDeviceToAlignedMap; this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap; - - List outputDataTypes = - ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream() - .map(ColumnHeader::getColumnType) - .collect(Collectors.toList()); - this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); - this.deviceColumnIndex = sourceColumnToInputLocationMap.get(ColumnHeaderConstant.DEVICE).getValueColumnIndex(); } @@ -102,7 +108,12 @@ protected boolean processTsBlock(TsBlock inputTsBlock) { constructInsertMultiTabletsStatement(false); updateResultTsBlock(); - insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); + if (insertMultiTabletsStatement != null || insertTabletStatementGenerators == null) { + insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); + } else { + insertTabletStatementGenerators.addAll( + constructInsertTabletStatementGeneratorsByDevice(device)); + } currentDevice = device; if (insertMultiTabletsStatement != null) { @@ -115,9 +126,15 @@ protected boolean processTsBlock(TsBlock inputTsBlock) { int readIndex = 0; while (readIndex < inputTsBlock.getPositionCount()) { int lastReadIndex = readIndex; - for (AbstractIntoOperator.InsertTabletStatementGenerator generator : - insertTabletStatementGenerators) { - lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex)); + + if (!insertTabletStatementGenerators.isEmpty()) { + InsertTabletStatementGenerator generatorOfCurrentDevice = + insertTabletStatementGenerators.get(insertTabletStatementGenerators.size() - 1); + int rowCountBeforeProcess = generatorOfCurrentDevice.getRowCount(); + lastReadIndex = + Math.max( + lastReadIndex, generatorOfCurrentDevice.processTsBlock(inputTsBlock, readIndex)); + batchedRowCount += generatorOfCurrentDevice.getRowCount() - rowCountBeforeProcess; } readIndex = lastReadIndex; if (insertMultiTabletsInternally(true)) { @@ -144,6 +161,16 @@ protected TsBlock tryToReturnResultTsBlock() { return resultTsBlockBuilder.build(); } + @Override + protected TsBlock tryToReturnPartialResult() { + if (resultTsBlockBuilder.isFull()) { + TsBlock res = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return res; + } + return null; + } + private List constructInsertTabletStatementGeneratorsByDevice(String currentDevice) { Map> targetPathToSourceInputLocationMap = @@ -187,4 +214,60 @@ public long ramBytesUsed() { + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + resultTsBlockBuilder.getRetainedSizeInBytes(); } + + @Override + protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boolean needCheck) { + if (insertTabletStatementGenerators == null) { + return null; + } + boolean hasFullStatement = existFullStatement(insertTabletStatementGenerators); + if (needCheck) { + // When needCheck is true, we only proceed if there already exists a full statement. + if (!hasFullStatement) { + return null; + } + } else { + // When needCheck is false, we may delay flushing to accumulate more rows + // if the batch is not yet at the configured row limit and the child has more data. + try { + if (batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit() + && child.hasNextWithTimer()) { + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IntoProcessException(e.getMessage()); + } catch (Exception e) { + throw new IntoProcessException(e.getMessage()); + } + } + List insertTabletStatementList = new ArrayList<>(); + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + if (!generator.isEmpty()) { + insertTabletStatementList.add(generator.constructInsertTabletStatement()); + } + } + if (insertTabletStatementList.isEmpty()) { + return null; + } + + InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement(); + insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList); + batchedRowCount = 0; + return insertMultiTabletsStatement; + } + + @Override + protected int findWritten(String device, String measurement) { + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + if (!Objects.equals(generator.getDevice(), device)) { + continue; + } + int writtenCountInCurrentGenerator = generator.getWrittenCount(measurement); + if (writtenCountInCurrentGenerator >= 0) { + return writtenCountInCurrentGenerator; + } + } + return 0; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java index 5e516eeacfa02..227b299c2d46f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/IntoOperator.java @@ -31,7 +31,6 @@ import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; @@ -49,6 +48,8 @@ public class IntoOperator extends AbstractIntoOperator { private final List> sourceTargetPathPairList; + private int outputIndex = 0; + @SuppressWarnings("squid:S107") public IntoOperator( OperatorContext operatorContext, @@ -60,7 +61,15 @@ public IntoOperator( List> sourceTargetPathPairList, ExecutorService intoOperationExecutor, long statementSizePerLine) { - super(operatorContext, child, inputColumnTypes, intoOperationExecutor, statementSizePerLine); + super( + operatorContext, + child, + inputColumnTypes, + intoOperationExecutor, + statementSizePerLine, + ColumnHeaderConstant.selectIntoColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList())); this.sourceTargetPathPairList = sourceTargetPathPairList; insertTabletStatementGenerators = constructInsertTabletStatementGenerators( @@ -98,19 +107,23 @@ protected TsBlock tryToReturnResultTsBlock() { return null; } - finished = true; - return constructResultTsBlock(); + TsBlock res = constructResultTsBlock(); + finished = (outputIndex == sourceTargetPathPairList.size()); + return res; + } + + @Override + protected TsBlock tryToReturnPartialResult() { + return null; } private TsBlock constructResultTsBlock() { - List outputDataTypes = - ColumnHeaderConstant.selectIntoColumnHeaders.stream() - .map(ColumnHeader::getColumnType) - .collect(Collectors.toList()); - TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - for (Pair sourceTargetPathPair : sourceTargetPathPairList) { + for (int size = sourceTargetPathPairList.size(); + outputIndex < size && !resultTsBlockBuilder.isFull(); + outputIndex++) { + Pair sourceTargetPathPair = sourceTargetPathPairList.get(outputIndex); timeColumnBuilder.writeLong(0); columnBuilders[0].writeBinary( new Binary(sourceTargetPathPair.left, TSFileConfig.STRING_CHARSET)); @@ -121,7 +134,9 @@ private TsBlock constructResultTsBlock() { sourceTargetPathPair.right.getDevice(), sourceTargetPathPair.right.getMeasurement())); resultTsBlockBuilder.declarePosition(); } - return resultTsBlockBuilder.build(); + TsBlock res = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return res; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index dab3163920588..e79fe9e3f4063 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2383,7 +2383,6 @@ private void analyzeInto( intoComponent.validate(sourceDevices, sourceColumns); DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor = new DeviceViewIntoPathDescriptor(); - PathPatternTree targetPathTree = new PathPatternTree(); IntoComponent.IntoDeviceMeasurementIterator intoDeviceMeasurementIterator = intoComponent.getIntoDeviceMeasurementIterator(); for (PartialPath sourceDevice : sourceDevices) { @@ -2405,7 +2404,6 @@ private void analyzeInto( deviceViewIntoPathDescriptor.specifyTargetDeviceMeasurement( sourceDevice, targetDevice, sourceColumn.getExpressionString(), targetMeasurement); - targetPathTree.appendFullPath(targetDevice, targetMeasurement); deviceViewIntoPathDescriptor.recordSourceColumnDataType( sourceColumn.getExpressionString(), analysis.getType(sourceColumn)); @@ -2415,13 +2413,7 @@ private void analyzeInto( intoDeviceMeasurementIterator.nextDevice(); } deviceViewIntoPathDescriptor.validate(); - - // fetch schema of target paths - long startTime = System.nanoTime(); - ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree, true, context); - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime); - deviceViewIntoPathDescriptor.bindType(targetSchemaTree); + deviceViewIntoPathDescriptor.bindType(); analysis.setDeviceViewIntoPathDescriptor(deviceViewIntoPathDescriptor); } @@ -2445,7 +2437,6 @@ private void analyzeInto( intoComponent.validate(sourceColumns); IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor(); - PathPatternTree targetPathTree = new PathPatternTree(); IntoComponent.IntoPathIterator intoPathIterator = intoComponent.getIntoPathIterator(); for (Pair pair : outputExpressions) { Expression sourceExpression = pair.left; @@ -2477,7 +2468,6 @@ private void analyzeInto( intoPathDescriptor.specifyDeviceAlignment( targetPath.getDevicePath().toString(), isAlignedDevice); - targetPathTree.appendFullPath(targetPath); intoPathDescriptor.recordSourceColumnDataType( sourceColumn, analysis.getType(sourceExpression)); @@ -2485,13 +2475,7 @@ private void analyzeInto( } intoPathDescriptor.validate(); - // fetch schema of target paths - long startTime = System.nanoTime(); - ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree, true, context); - updateSchemaTreeByViews(analysis, targetSchemaTree, context); - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime); - intoPathDescriptor.bindType(targetSchemaTree); + intoPathDescriptor.bindType(); analysis.setIntoPathDescriptor(intoPathDescriptor); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java index 2a64f06d18684..10056b8f3aa05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java @@ -21,12 +21,9 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.commons.schema.view.LogicalViewSchema; import org.apache.iotdb.db.exception.sql.SemanticException; -import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; -import org.apache.iotdb.db.utils.TypeInferenceUtils; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Pair; @@ -37,7 +34,6 @@ import java.util.Map; import java.util.regex.Matcher; -import static com.google.common.base.Preconditions.checkState; import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS; import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN; import static org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString; @@ -134,48 +130,14 @@ public static boolean checkIsAllRawSeriesQuery(List expressions) { public static List> bindTypeForSourceTargetPathPairList( List> sourceTargetPathPairList, - Map sourceToDataTypeMap, - ISchemaTree targetSchemaTree) { + Map sourceToDataTypeMap) { List> sourceTypeBoundTargetPathPairList = new ArrayList<>(); for (Pair sourceTargetPathPair : sourceTargetPathPairList) { String sourceColumn = sourceTargetPathPair.left; TSDataType sourceColumnType = sourceToDataTypeMap.get(sourceColumn); - MeasurementPath targetPathWithSchema; PartialPath targetPath = sourceTargetPathPair.right; - List actualTargetPaths = - targetSchemaTree.searchMeasurementPaths(targetPath).left; - if (actualTargetPaths.isEmpty()) { - targetPathWithSchema = new MeasurementPath(targetPath, sourceColumnType); - } else { - checkState(actualTargetPaths.size() == 1); - MeasurementPath actualTargetPath = actualTargetPaths.get(0); - if (actualTargetPath.getMeasurementSchema().isLogicalView()) { - LogicalViewSchema viewSchema = - (LogicalViewSchema) actualTargetPath.getMeasurementSchema(); - if (viewSchema.isWritable()) { - MeasurementPath viewSourceSeriesPath = - targetSchemaTree - .searchMeasurementPaths(viewSchema.getSourcePathIfWritable()) - .left - .get(0); - actualTargetPath = - new MeasurementPath(targetPath, viewSourceSeriesPath.getSeriesType()); - actualTargetPath.setUnderAlignedEntity(viewSourceSeriesPath.isUnderAlignedEntity()); - } else { - throw new SemanticException( - String.format("View %s doesn't support data insertion.", targetPath)); - } - } - if (!TypeInferenceUtils.canAutoCast(sourceColumnType, actualTargetPath.getSeriesType())) { - throw new SemanticException( - String.format( - "The data type of target path (%s[%s]) is not compatible with the data type of source column (%s[%s]).", - targetPath, actualTargetPath.getSeriesType(), sourceColumn, sourceColumnType)); - } - // no need to check alignment, because the interface is common - targetPathWithSchema = actualTargetPath; - } + targetPathWithSchema = new MeasurementPath(targetPath, sourceColumnType); sourceTypeBoundTargetPathPairList.add(new Pair<>(sourceColumn, targetPathWithSchema)); } return sourceTypeBoundTargetPathPairList; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 350dd6394d90a..0ae403452683a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -617,7 +617,7 @@ private ExecutionResult getExecutionResult(QueryState state) { // info to client if (!CONFIG.isEnable13DataInsertAdapt() || IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion())) { - planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus, statusCode); + planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus); } return new ExecutionResult(context.getQueryId(), tsstatus); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java index 616cca3efb750..b9b239b5e26e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java @@ -27,7 +27,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; -import org.apache.iotdb.rpc.TSStatusCode; import java.util.concurrent.ScheduledExecutorService; @@ -49,6 +48,5 @@ IScheduler doSchedule( ScheduledExecutorService getScheduledExecutorService(); - void setRedirectInfo( - IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode); + void setRedirectInfo(IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index 6ec87d089188d..1da701a8e3529 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -156,8 +156,7 @@ public ScheduledExecutorService getScheduledExecutorService() { } @Override - public void setRedirectInfo( - IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode) { + public void setRedirectInfo(IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus) { Analysis analysis = (Analysis) iAnalysis; // Get the inner statement of PipeEnrichedStatement @@ -173,7 +172,7 @@ public void setRedirectInfo( if (insertStatement instanceof InsertRowsStatement || insertStatement instanceof InsertMultiTabletsStatement) { // multiple devices - if (statusCode == TSStatusCode.SUCCESS_STATUS) { + if (tsstatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { boolean needRedirect = false; List subStatus = new ArrayList<>(); for (TEndPoint endPoint : redirectNodeList) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java index 4b43abe5d72aa..7197f4c7ca82b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.db.exception.sql.SemanticException; -import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils; import org.apache.tsfile.enums.TSDataType; @@ -100,7 +99,7 @@ public void validate() { } } - public void bindType(ISchemaTree targetSchemaTree) { + public void bindType() { Map>> deviceToSourceTypeBoundTargetPathPairListMap = new HashMap<>(); for (Map.Entry>> sourceTargetEntry : @@ -108,7 +107,7 @@ public void bindType(ISchemaTree targetSchemaTree) { deviceToSourceTypeBoundTargetPathPairListMap.put( sourceTargetEntry.getKey(), SelectIntoUtils.bindTypeForSourceTargetPathPairList( - sourceTargetEntry.getValue(), sourceToDataTypeMap, targetSchemaTree)); + sourceTargetEntry.getValue(), sourceToDataTypeMap)); } this.deviceToSourceTargetPathPairListMap = deviceToSourceTypeBoundTargetPathPairListMap; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java index 35765ee5c0797..29f04834f67dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.db.exception.sql.SemanticException; -import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils; import org.apache.commons.lang3.StringUtils; @@ -101,10 +100,10 @@ public void validate() { } } - public void bindType(ISchemaTree targetSchemaTree) { + public void bindType() { this.sourceTargetPathPairList = SelectIntoUtils.bindTypeForSourceTargetPathPairList( - sourceTargetPathPairList, sourceToDataTypeMap, targetSchemaTree); + sourceTargetPathPairList, sourceToDataTypeMap); } public List> getSourceTargetPathPairList() {