Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -623,89 +623,89 @@ public void testDataTypeIncompatible() {
// test INT32
assertTestFail(
"select s_int32 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0, value 0]");
assertTestFail(
"select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT32, timestamp 0, value 0]");

// test INT64
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type INT64, timestamp 0, value 0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type INT64, timestamp 0, value 0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0, value 0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT64, timestamp 0, value 0]");

// test FLOAT
assertTestFail(
"select s_float into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type FLOAT, timestamp 0, value 0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type FLOAT, timestamp 0, value 0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0, value 0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type FLOAT, timestamp 0, value 0.0]");

// test DOUBLE
assertTestFail(
"select s_double into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type DOUBLE, timestamp 0, value 0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type DOUBLE, timestamp 0, value 0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type DOUBLE, timestamp 0, value 0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0, value 0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type DOUBLE, timestamp 0, value 0.0]");

// test BOOLEAN
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
"301: Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_double) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, value true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type BOOLEAN, timestamp 0, value true]");

// test TEXT
assertTestFail(
"select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type TEXT, timestamp 0, value text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type TEXT, timestamp 0, value text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0, value text0]");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.utils.Binary;
Expand Down Expand Up @@ -82,18 +83,22 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();

protected final TsBlockBuilder resultTsBlockBuilder;

protected AbstractIntoOperator(
OperatorContext operatorContext,
Operator child,
List<TSDataType> inputColumnTypes,
ExecutorService intoOperationExecutor,
long statementSizePerLine) {
long statementSizePerLine,
List<TSDataType> outputDataTypes) {
this.operatorContext = operatorContext;
this.child = child;
this.typeConvertors =
inputColumnTypes.stream().map(TypeFactory::getType).collect(Collectors.toList());

this.writeOperationExecutor = intoOperationExecutor;
this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
initMemoryEstimates(statementSizePerLine);
}

Expand Down Expand Up @@ -152,15 +157,15 @@ public TsBlock next() throws Exception {
checkLastWriteOperation();

if (!processTsBlock(cachedTsBlock)) {
return null;
return tryToReturnPartialResult();
}
cachedTsBlock = null;
if (child.hasNextWithTimer()) {
TsBlock inputTsBlock = child.nextWithTimer();
processTsBlock(inputTsBlock);

// call child.next only once
return null;
return tryToReturnPartialResult();
} else {
return tryToReturnResultTsBlock();
}
Expand Down Expand Up @@ -218,6 +223,8 @@ private void checkLastWriteOperation() {

protected abstract TsBlock tryToReturnResultTsBlock();

protected abstract TsBlock tryToReturnPartialResult();

protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
Expand Down Expand Up @@ -286,7 +293,7 @@ protected void executeInsertMultiTabletsStatement(
() -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
}

private boolean existFullStatement(
protected boolean existFullStatement(
List<InsertTabletStatementGenerator> insertTabletStatementGenerators) {
for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
if (generator.isFull()) {
Expand Down Expand Up @@ -549,6 +556,10 @@ public String getDevice() {
return devicePath.toString();
}

public int getRowCount() {
return rowCount;
}

public int getWrittenCount(String measurement) {
if (!writtenCounter.containsKey(measurement)) {
return -1;
Expand Down
Loading
Loading