From e6bfb7b6c23882f492ce7ef1b6107efe50b6ebbc Mon Sep 17 00:00:00 2001 From: Jackie Tien Date: Tue, 6 Jan 2026 23:10:13 +0800 Subject: [PATCH 1/6] [From dev/1.3] Improve DeviceViewIntoOperator's return style to pipeline (#16980) --- .../db/it/selectinto/IoTDBSelectIntoIT.java | 50 +++++----- .../process/AbstractIntoOperator.java | 6 +- .../process/AbstractTreeIntoOperator.java | 8 +- .../process/DeviceViewIntoOperator.java | 95 ++++++++++++++++--- .../InsertTabletStatementGenerator.java | 4 + .../operator/process/TableIntoOperator.java | 5 + .../operator/process/TreeIntoOperator.java | 37 +++++--- .../plan/analyze/AnalyzeVisitor.java | 21 +--- .../plan/analyze/SelectIntoUtils.java | 42 +------- .../DeviceViewIntoPathDescriptor.java | 5 +- .../plan/parameter/IntoPathDescriptor.java | 5 +- 11 files changed, 160 insertions(+), 118 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java index a728ac49e699..6e5a79fcf9b5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java @@ -644,89 +644,89 @@ public void testDataTypeIncompatible() { // test INT32 assertTestFail( "select s_int32 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0, value 0]"); assertTestFail( "select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT32, timestamp 0, value 0]"); // test INT64 assertTestFail( "select s_int64 into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type INT64, timestamp 0, value 0]"); assertTestFail( "select s_int64 into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type INT64, timestamp 0, value 0]"); assertTestFail( "select s_int64 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0, value 0]"); assertTestFail( "select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT64, timestamp 0, value 0]"); // test FLOAT assertTestFail( "select s_float into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type FLOAT, timestamp 0, value 0.0]"); assertTestFail( "select s_float into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type FLOAT, timestamp 0, value 0.0]"); assertTestFail( "select s_float into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0, value 0.0]"); assertTestFail( "select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type FLOAT, timestamp 0, value 0.0]"); // test DOUBLE assertTestFail( "select s_double into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0, value 0.0]"); assertTestFail( "select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type DOUBLE, timestamp 0, value 0.0]"); // test BOOLEAN assertTestFail( "select s_boolean into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "301: Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_double) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type BOOLEAN, timestamp 0, value true]"); // test TEXT assertTestFail( "select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value text0]"); assertTestFail( "select s_text into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", - "The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT])."); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0, value text0]"); } @Test diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java index 444c50019753..425449469b8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java @@ -102,7 +102,7 @@ public TsBlock next() throws Exception { checkLastWriteOperation(); if (!processTsBlock(cachedTsBlock)) { - return null; + return tryToReturnPartialResult(); } cachedTsBlock = null; if (child.hasNextWithTimer()) { @@ -110,7 +110,7 @@ public TsBlock next() throws Exception { processTsBlock(inputTsBlock); // call child.next only once - return null; + return tryToReturnPartialResult(); } else { return tryToReturnResultTsBlock(); } @@ -204,6 +204,8 @@ protected void checkLastWriteOperation() { protected abstract TsBlock tryToReturnResultTsBlock(); + protected abstract TsBlock tryToReturnPartialResult(); + protected abstract void resetInsertTabletStatementGenerators(); private void setMaxRowNumberInStatement(long statementSizePerLine) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java index a72907b977b2..0d15c72cfa1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractTreeIntoOperator.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.Futures; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlockBuilder; import java.util.ArrayList; import java.util.List; @@ -39,15 +40,18 @@ public abstract class AbstractTreeIntoOperator extends AbstractIntoOperator { protected List insertTabletStatementGenerators; + protected final TsBlockBuilder resultTsBlockBuilder; protected AbstractTreeIntoOperator( OperatorContext operatorContext, Operator child, List inputColumnTypes, ExecutorService intoOperationExecutor, - long statementSizePerLine) { + long statementSizePerLine, + List outputDataTypes) { super(operatorContext, child, inputColumnTypes, intoOperationExecutor, statementSizePerLine); this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); } protected static List constructInsertTabletStatementGenerators( @@ -134,7 +138,7 @@ protected void executeInsertMultiTabletsStatement( () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor); } - private boolean existFullStatement( + protected boolean existFullStatement( List insertTabletStatementGenerators) { for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { if (generator.isFull()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index 6321a52914b6..03acc28d5d07 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -22,22 +22,26 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.runtime.IntoProcessException; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.RamUsageEstimator; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -48,6 +52,7 @@ public class DeviceViewIntoOperator extends AbstractTreeIntoOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(DeviceViewIntoOperator.class); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); private final Map>> deviceToTargetPathSourceInputLocationMap; @@ -59,7 +64,7 @@ public class DeviceViewIntoOperator extends AbstractTreeIntoOperator { private final int deviceColumnIndex; private String currentDevice; - private final TsBlockBuilder resultTsBlockBuilder; + private int batchedRowCount = 0; @SuppressWarnings("squid:S107") public DeviceViewIntoOperator( @@ -74,18 +79,19 @@ public DeviceViewIntoOperator( Map sourceColumnToInputLocationMap, ExecutorService intoOperationExecutor, long statementSizePerLine) { - super(operatorContext, child, inputColumnTypes, intoOperationExecutor, statementSizePerLine); + super( + operatorContext, + child, + inputColumnTypes, + intoOperationExecutor, + statementSizePerLine, + ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList())); this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap; this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap; this.targetDeviceToAlignedMap = targetDeviceToAlignedMap; this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap; - - List outputDataTypes = - ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream() - .map(ColumnHeader::getColumnType) - .collect(Collectors.toList()); - this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); - this.deviceColumnIndex = sourceColumnToInputLocationMap.get(ColumnHeaderConstant.DEVICE).getValueColumnIndex(); } @@ -102,7 +108,12 @@ protected boolean processTsBlock(TsBlock inputTsBlock) { constructInsertMultiTabletsStatement(false); updateResultTsBlock(); - insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); + if (insertMultiTabletsStatement != null || insertTabletStatementGenerators == null) { + insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); + } else { + insertTabletStatementGenerators.addAll( + constructInsertTabletStatementGeneratorsByDevice(device)); + } currentDevice = device; if (insertMultiTabletsStatement != null) { @@ -115,8 +126,14 @@ protected boolean processTsBlock(TsBlock inputTsBlock) { int readIndex = 0; while (readIndex < inputTsBlock.getPositionCount()) { int lastReadIndex = readIndex; - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { - lastReadIndex = Math.max(lastReadIndex, generator.processTsBlock(inputTsBlock, readIndex)); + if (!insertTabletStatementGenerators.isEmpty()) { + InsertTabletStatementGenerator generatorOfCurrentDevice = + insertTabletStatementGenerators.get(insertTabletStatementGenerators.size() - 1); + int rowCountBeforeProcess = generatorOfCurrentDevice.getRowCount(); + lastReadIndex = + Math.max( + lastReadIndex, generatorOfCurrentDevice.processTsBlock(inputTsBlock, readIndex)); + batchedRowCount += generatorOfCurrentDevice.getRowCount() - rowCountBeforeProcess; } readIndex = lastReadIndex; if (insertMultiTabletsInternally(true)) { @@ -143,6 +160,16 @@ protected TsBlock tryToReturnResultTsBlock() { return resultTsBlockBuilder.build(); } + @Override + protected TsBlock tryToReturnPartialResult() { + if (resultTsBlockBuilder.isFull()) { + TsBlock res = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return res; + } + return null; + } + private List constructInsertTabletStatementGeneratorsByDevice( String currentDevice) { Map> targetPathToSourceInputLocationMap = @@ -192,4 +219,46 @@ public long ramBytesUsed() { .mapToLong(InsertTabletStatementGenerator::ramBytesUsed) .sum()); } + + @Override + protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boolean needCheck) { + if (insertTabletStatementGenerators == null) { + return null; + } + boolean hasFullStatement = existFullStatement(insertTabletStatementGenerators); + if (needCheck) { + // When needCheck is true, we only proceed if there already exists a full statement. + if (!hasFullStatement) { + return null; + } + } else { + // When needCheck is false, we may delay flushing to accumulate more rows + // if the batch is not yet at the configured row limit and the child has more data. + try { + if (batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit() + && child.hasNextWithTimer()) { + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IntoProcessException(e.getMessage()); + } catch (Exception e) { + throw new IntoProcessException(e.getMessage()); + } + } + List insertTabletStatementList = new ArrayList<>(); + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + if (!generator.isEmpty()) { + insertTabletStatementList.add(generator.constructInsertTabletStatement()); + } + } + if (insertTabletStatementList.isEmpty()) { + return null; + } + + InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement(); + insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList); + batchedRowCount = 0; + return insertMultiTabletsStatement; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java index b2fae4c25b10..47ff31573e3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java @@ -169,6 +169,10 @@ public boolean isEmpty() { return rowCount == 0; } + public int getRowCount() { + return rowCount; + } + public String getDevice() { return devicePath.toString(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java index d9af3229ea2e..40b1781b69d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java @@ -114,6 +114,11 @@ protected TsBlock tryToReturnResultTsBlock() { return constructResultTsBlock(); } + @Override + protected TsBlock tryToReturnPartialResult() { + return null; + } + @Override public long ramBytesUsed() { return INSTANCE_SIZE diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java index 32838483cef0..bed98c420864 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TreeIntoOperator.java @@ -31,7 +31,6 @@ import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; @@ -49,6 +48,8 @@ public class TreeIntoOperator extends AbstractTreeIntoOperator { private final List> sourceTargetPathPairList; + private int outputIndex = 0; + @SuppressWarnings("squid:S107") public TreeIntoOperator( OperatorContext operatorContext, @@ -60,7 +61,15 @@ public TreeIntoOperator( List> sourceTargetPathPairList, ExecutorService intoOperationExecutor, long statementSizePerLine) { - super(operatorContext, child, inputColumnTypes, intoOperationExecutor, statementSizePerLine); + super( + operatorContext, + child, + inputColumnTypes, + intoOperationExecutor, + statementSizePerLine, + ColumnHeaderConstant.selectIntoColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList())); this.sourceTargetPathPairList = sourceTargetPathPairList; insertTabletStatementGenerators = constructInsertTabletStatementGenerators( @@ -98,19 +107,23 @@ protected TsBlock tryToReturnResultTsBlock() { return null; } - finished = true; - return constructResultTsBlock(); + TsBlock res = constructResultTsBlock(); + finished = (outputIndex == sourceTargetPathPairList.size()); + return res; + } + + @Override + protected TsBlock tryToReturnPartialResult() { + return null; } private TsBlock constructResultTsBlock() { - List outputDataTypes = - ColumnHeaderConstant.selectIntoColumnHeaders.stream() - .map(ColumnHeader::getColumnType) - .collect(Collectors.toList()); - TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - for (Pair sourceTargetPathPair : sourceTargetPathPairList) { + for (int size = sourceTargetPathPairList.size(); + outputIndex < size && !resultTsBlockBuilder.isFull(); + outputIndex++) { + Pair sourceTargetPathPair = sourceTargetPathPairList.get(outputIndex); timeColumnBuilder.writeLong(0); columnBuilders[0].writeBinary( new Binary(sourceTargetPathPair.left, TSFileConfig.STRING_CHARSET)); @@ -122,7 +135,9 @@ private TsBlock constructResultTsBlock() { sourceTargetPathPair.right.getMeasurement())); resultTsBlockBuilder.declarePosition(); } - return resultTsBlockBuilder.build(); + TsBlock res = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); + return res; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 4cae2c0f2884..32bc088ff6e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2276,7 +2276,6 @@ private void analyzeInto( intoComponent.validate(sourceDevices, sourceColumns); DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor = new DeviceViewIntoPathDescriptor(); - PathPatternTree targetPathTree = new PathPatternTree(); IntoComponent.IntoDeviceMeasurementIterator intoDeviceMeasurementIterator = intoComponent.getIntoDeviceMeasurementIterator(); for (PartialPath sourceDevice : sourceDevices) { @@ -2301,7 +2300,6 @@ private void analyzeInto( deviceViewIntoPathDescriptor.specifyTargetDeviceMeasurement( sourceDevice, targetDevice, sourceColumn.getExpressionString(), targetMeasurement); - targetPathTree.appendFullPath(targetDevice, targetMeasurement); deviceViewIntoPathDescriptor.recordSourceColumnDataType( sourceColumn.getExpressionString(), analysis.getType(sourceColumn)); @@ -2311,13 +2309,7 @@ private void analyzeInto( intoDeviceMeasurementIterator.nextDevice(); } deviceViewIntoPathDescriptor.validate(); - - // fetch schema of target paths - long startTime = System.nanoTime(); - ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree, true, context, false); - QueryPlanCostMetricSet.getInstance() - .recordTreePlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime); - deviceViewIntoPathDescriptor.bindType(targetSchemaTree); + deviceViewIntoPathDescriptor.bindType(); analysis.setDeviceViewIntoPathDescriptor(deviceViewIntoPathDescriptor); } @@ -2341,7 +2333,6 @@ private void analyzeInto( intoComponent.validate(sourceColumns); IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor(); - PathPatternTree targetPathTree = new PathPatternTree(); IntoComponent.IntoPathIterator intoPathIterator = intoComponent.getIntoPathIterator(); for (Pair pair : outputExpressions) { Expression sourceExpression = pair.left; @@ -2381,21 +2372,13 @@ private void analyzeInto( intoPathDescriptor.specifyDeviceAlignment( targetPath.getDevicePath().toString(), isAlignedDevice); - targetPathTree.appendFullPath(targetPath); intoPathDescriptor.recordSourceColumnDataType( sourceColumn, analysis.getType(sourceExpression)); intoPathIterator.next(); } intoPathDescriptor.validate(); - - // fetch schema of target paths - long startTime = System.nanoTime(); - ISchemaTree targetSchemaTree = schemaFetcher.fetchSchema(targetPathTree, true, context, false); - updateSchemaTreeByViews(analysis, targetSchemaTree, context, false); - QueryPlanCostMetricSet.getInstance() - .recordTreePlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime); - intoPathDescriptor.bindType(targetSchemaTree); + intoPathDescriptor.bindType(); analysis.setIntoPathDescriptor(intoPathDescriptor); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java index 9245d2eb6343..7e23b0b4590f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java @@ -21,12 +21,9 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.commons.schema.view.LogicalViewSchema; import org.apache.iotdb.db.exception.sql.SemanticException; -import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; -import org.apache.iotdb.db.utils.TypeInferenceUtils; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Pair; @@ -37,7 +34,6 @@ import java.util.Map; import java.util.regex.Matcher; -import static com.google.common.base.Preconditions.checkState; import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS; import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN; import static org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString; @@ -134,48 +130,14 @@ public static boolean checkIsAllRawSeriesQuery(List expressions) { public static List> bindTypeForSourceTargetPathPairList( List> sourceTargetPathPairList, - Map sourceToDataTypeMap, - ISchemaTree targetSchemaTree) { + Map sourceToDataTypeMap) { List> sourceTypeBoundTargetPathPairList = new ArrayList<>(); for (Pair sourceTargetPathPair : sourceTargetPathPairList) { String sourceColumn = sourceTargetPathPair.left; TSDataType sourceColumnType = sourceToDataTypeMap.get(sourceColumn); - MeasurementPath targetPathWithSchema; PartialPath targetPath = sourceTargetPathPair.right; - List actualTargetPaths = - targetSchemaTree.searchMeasurementPaths(targetPath).left; - if (actualTargetPaths.isEmpty()) { - targetPathWithSchema = new MeasurementPath(targetPath, sourceColumnType); - } else { - checkState(actualTargetPaths.size() == 1); - MeasurementPath actualTargetPath = actualTargetPaths.get(0); - if (actualTargetPath.getMeasurementSchema().isLogicalView()) { - LogicalViewSchema viewSchema = - (LogicalViewSchema) actualTargetPath.getMeasurementSchema(); - if (viewSchema.isWritable()) { - MeasurementPath viewSourceSeriesPath = - targetSchemaTree - .searchMeasurementPaths(viewSchema.getSourcePathIfWritable()) - .left - .get(0); - actualTargetPath = - new MeasurementPath(targetPath, viewSourceSeriesPath.getSeriesType()); - actualTargetPath.setUnderAlignedEntity(viewSourceSeriesPath.isUnderAlignedEntity()); - } else { - throw new SemanticException( - String.format("View %s doesn't support data insertion.", targetPath)); - } - } - if (!TypeInferenceUtils.canAutoCast(sourceColumnType, actualTargetPath.getSeriesType())) { - throw new SemanticException( - String.format( - "The data type of target path (%s[%s]) is not compatible with the data type of source column (%s[%s]).", - targetPath, actualTargetPath.getSeriesType(), sourceColumn, sourceColumnType)); - } - // no need to check alignment, because the interface is common - targetPathWithSchema = actualTargetPath; - } + targetPathWithSchema = new MeasurementPath(targetPath, sourceColumnType); sourceTypeBoundTargetPathPairList.add(new Pair<>(sourceColumn, targetPathWithSchema)); } return sourceTypeBoundTargetPathPairList; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java index eed727f8135d..a34bf2e1b79d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.db.exception.sql.SemanticException; -import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils; import org.apache.tsfile.enums.TSDataType; @@ -100,7 +99,7 @@ public void validate() { } } - public void bindType(ISchemaTree targetSchemaTree) { + public void bindType() { Map>> deviceToSourceTypeBoundTargetPathPairListMap = new HashMap<>(); for (Map.Entry>> sourceTargetEntry : @@ -108,7 +107,7 @@ public void bindType(ISchemaTree targetSchemaTree) { deviceToSourceTypeBoundTargetPathPairListMap.put( sourceTargetEntry.getKey(), SelectIntoUtils.bindTypeForSourceTargetPathPairList( - sourceTargetEntry.getValue(), sourceToDataTypeMap, targetSchemaTree)); + sourceTargetEntry.getValue(), sourceToDataTypeMap)); } this.deviceToSourceTargetPathPairListMap = deviceToSourceTypeBoundTargetPathPairListMap; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java index bfb03b4ea5ae..5dfc1e504280 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/IntoPathDescriptor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.db.exception.sql.SemanticException; -import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; import org.apache.iotdb.db.queryengine.plan.analyze.SelectIntoUtils; import org.apache.tsfile.enums.TSDataType; @@ -101,10 +100,10 @@ public void validate() { } } - public void bindType(ISchemaTree targetSchemaTree) { + public void bindType() { this.sourceTargetPathPairList = SelectIntoUtils.bindTypeForSourceTargetPathPairList( - sourceTargetPathPairList, sourceToDataTypeMap, targetSchemaTree); + sourceTargetPathPairList, sourceToDataTypeMap); } public List> getSourceTargetPathPairList() { From a38228c0638c224194dc77113efa3e413d32dd12 Mon Sep 17 00:00:00 2001 From: shizy Date: Wed, 7 Jan 2026 20:41:07 +0800 Subject: [PATCH 2/6] DeviceViewIntoOperatorTest & TreeIntoOperatorTest --- .../operator/DeviceViewIntoOperatorTest.java | 487 ++++++++++++++++++ .../operator/TreeIntoOperatorTest.java | 303 +++++++++++ 2 files changed, 790 insertions(+) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java new file mode 100644 index 000000000000..f7fe5fcf367d --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.IFullPath; +import org.apache.iotdb.commons.path.NonAlignedFullPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger; +import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class DeviceViewIntoOperatorTest { + + private static final String TEST_SG = "root.test"; + + private Map>> + deviceToTargetPathSourceInputLocationMap; + private Map>> deviceToTargetPathDataTypeMap; + private Map targetDeviceToAlignedMap; + private Map>> deviceToSourceTargetPathPairListMap; + private Map sourceColumnToInputLocationMap; + private List inputColumnTypes; + private List deviceColumnIndex; + + private final List deviceIds = new ArrayList<>(); + private final List measurementSchemas = new ArrayList<>(); + private final List seqResources = new ArrayList<>(); + private final List unSeqResources = new ArrayList<>(); + + private DeviceViewIntoOperator operator; + + @Before + public void setUp() throws MetadataException, IOException, WriteProcessException { + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0); + IoTDBDescriptor.getInstance().getConfig().setSelectIntoInsertTabletPlanRowLimit(4); + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(512); + + deviceToTargetPathSourceInputLocationMap = new HashMap<>(); + deviceToTargetPathDataTypeMap = new HashMap<>(); + targetDeviceToAlignedMap = new HashMap<>(); + deviceToSourceTargetPathPairListMap = new HashMap<>(); + sourceColumnToInputLocationMap = new HashMap<>(); + inputColumnTypes = new ArrayList<>(); + deviceColumnIndex = new ArrayList<>(); + + SeriesReaderTestUtil.setUp( + measurementSchemas, deviceIds, seqResources, unSeqResources, TEST_SG); + } + + @After + public void tearDown() throws Exception { + SeriesReaderTestUtil.tearDown(seqResources, unSeqResources); + } + + private DeviceViewIntoOperator createAndInitOperatorForSingleDevices(int sensorNum) { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + + // Create SeriesScanOperator for each sensor + List scanOperators = new ArrayList<>(); + List dataTypes = new ArrayList<>(); + List columnMergers = new ArrayList<>(); + + for (int i = 0; i < sensorNum; i++) { + scanOperators.add( + createSeriesScanOperator(driverContext, i, "device0", i, measurementSchemas.get(i))); + dataTypes.add(TSDataType.INT32); + columnMergers.add(new SingleColumnMerger(new InputLocation(i, 0), new AscTimeComparator())); + } + + FullOuterTimeJoinOperator timeJoinOperator = + createTimeJoinOperator(driverContext, sensorNum, scanOperators, dataTypes, columnMergers); + + // Prepare data types for SingleDeviceViewOperator (device column + sensor columns) + inputColumnTypes.add(TSDataType.TEXT); // Device column + for (int i = 0; i < sensorNum; i++) { + deviceColumnIndex.add(i + 1); + inputColumnTypes.add(TSDataType.INT32); // Sensor columns + } + + SingleDeviceViewOperator singleDeviceViewOperator = + new SingleDeviceViewOperator( + addOperatorContext(driverContext, sensorNum + 1, SingleDeviceViewOperator.class), + TEST_SG + ".device0", + timeJoinOperator, + deviceColumnIndex, + inputColumnTypes); + + return createTestDeviceViewIntoOperator( + driverContext, + sensorNum + 2, + singleDeviceViewOperator, + inputColumnTypes, + instanceNotificationExecutor); + } + + private DeviceViewIntoOperator createAndInitOperatorForMultipleDevices( + int deviceNum, int sensorNum) { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + + List devices = new ArrayList<>(); + List deviceOperators = new ArrayList<>(); + List> deviceColumnIndexes = new ArrayList<>(); + + List singleDeviceColumnIndex = new ArrayList<>(); + for (int i = 0; i < sensorNum; i++) { + singleDeviceColumnIndex.add(i + 1); + } + + int operatorIndex = 0; + for (int deviceIdx = 0; deviceIdx < deviceNum; deviceIdx++) { + String device = "device" + deviceIdx; + + List scanOperators = new ArrayList<>(); + List scanDataTypes = new ArrayList<>(); + List columnMergers = new ArrayList<>(); + + for (int sensorIdx = 0; sensorIdx < sensorNum; sensorIdx++) { + scanOperators.add( + createSeriesScanOperator( + driverContext, + operatorIndex, + device, + sensorIdx, + measurementSchemas.get(sensorIdx))); + scanDataTypes.add(TSDataType.INT32); + columnMergers.add( + new SingleColumnMerger(new InputLocation(sensorIdx, 0), new AscTimeComparator())); + operatorIndex++; + } + + FullOuterTimeJoinOperator timeJoinOperator = + createTimeJoinOperator( + driverContext, operatorIndex, scanOperators, scanDataTypes, columnMergers); + operatorIndex++; + + devices.add(IDeviceID.Factory.DEFAULT_FACTORY.create(TEST_SG + "." + device)); + deviceOperators.add(timeJoinOperator); + deviceColumnIndexes.add(singleDeviceColumnIndex); + } + + List dataTypes = new ArrayList<>(); + dataTypes.add(TSDataType.TEXT); // Device column + for (int i = 0; i < sensorNum; i++) { + dataTypes.add(TSDataType.INT32); // Sensor columns + } + + DeviceViewOperator deviceViewOperator = + new DeviceViewOperator( + addOperatorContext(driverContext, operatorIndex, DeviceViewOperator.class), + devices, + deviceOperators, + deviceColumnIndexes, + dataTypes); + + inputColumnTypes.add(TSDataType.TEXT); // Device column + for (int i = 0; i < sensorNum; i++) { + deviceColumnIndex.add(i + 1); + inputColumnTypes.add(TSDataType.INT32); // Sensor columns + } + + return createTestDeviceViewIntoOperator( + driverContext, + operatorIndex + 1, + deviceViewOperator, + inputColumnTypes, + instanceNotificationExecutor); + } + + /** Test scenario 1: single device with small amount of data, should return in single TsBlock */ + @Test + public void testSingleDeviceSmallData() throws Exception { + prepareDeviceData("device0", 2); + operator = createAndInitOperatorForSingleDevices(2); + + TsBlock result = null; + while (operator.isBlocked().isDone() && operator.hasNext()) { + result = operator.next(); + } + assertNotNull(result); + assertEquals(2, result.getPositionCount()); + + // Verify 4 value columns (device, source, target, count) + assertEquals(4, result.getValueColumnCount()); + assertTrue(operator.isFinished()); + } + + /** Test scenario 2: Single device with result set exceeds maxTsBlockSize */ + @Test + public void testSingleDeviceExceedsMaxTsBlockSize() throws Exception { + prepareDeviceData("device0", 10); + operator = createAndInitOperatorForSingleDevices(10); + + TsBlock result = null; + while (operator.isBlocked().isDone() && operator.hasNext()) { + result = operator.next(); + } + assertNotNull(result); + assertEquals(10, result.getPositionCount()); + + // Verify 4 value columns (device, source, target, count) + assertEquals(4, result.getValueColumnCount()); + assertTrue(operator.isFinished()); + } + + /** Test scenario 3: Multiple device with small amount of data */ + @Test + public void testMultipleDeviceSmallData() throws Exception { + prepareDeviceData("device0", 1); + prepareDeviceData("device1", 1); + operator = createAndInitOperatorForMultipleDevices(2, 1); + + TsBlock result = null; + while (operator.isBlocked().isDone() && operator.hasNext()) { + result = operator.next(); + } + assertNotNull(result); + assertEquals(2, result.getPositionCount()); + + // Verify 4 value columns (device, source, target, count) + assertEquals(4, result.getValueColumnCount()); + assertTrue(operator.isFinished()); + } + + /** Test scenario 4: Multiple devices, total size exceeds maxTsBlockSize */ + @Test + public void testMultipleDevicesExceedsTsBlockSize() throws Exception { + prepareDeviceData("device0", 2); + prepareDeviceData("device1", 2); + prepareDeviceData("device2", 2); + operator = createAndInitOperatorForMultipleDevices(3, 2); + + int totalRows = 0; + int totalBatches = 0; + // Loop through all batches + while (operator.isBlocked().isDone() && operator.hasNext()) { + TsBlock result = operator.next(); + if (result != null && !result.isEmpty()) { + totalRows += result.getPositionCount(); + totalBatches += 1; + } + } + + assertEquals(6, totalRows); + assertEquals(2, totalBatches); + assertTrue(operator.isFinished()); + } + + /** + * Helper method: Prepare test data for specified device + * + * @param device Device name + * @param sensorNum Number of path pairs for this device + */ + private void prepareDeviceData(String device, int sensorNum) throws Exception { + String sourceDevicePath = TEST_SG + "." + device; + String targetDevicePath = TEST_SG + ".new_" + device; + + Map> targetPathToSourceInputMap = + deviceToTargetPathSourceInputLocationMap.computeIfAbsent( + sourceDevicePath, k -> new HashMap<>()); + Map> targetDataTypeMap = + deviceToTargetPathDataTypeMap.computeIfAbsent(sourceDevicePath, k -> new HashMap<>()); + List> pairList = + deviceToSourceTargetPathPairListMap.computeIfAbsent( + sourceDevicePath, k -> new ArrayList<>()); + + Map columnToInputLocationMap = new HashMap<>(); + Map dataTypeMap = new HashMap<>(); + + columnToInputLocationMap.put(ColumnHeaderConstant.DEVICE, new InputLocation(0, 0)); + dataTypeMap.put(ColumnHeaderConstant.DEVICE, TSDataType.TEXT); + sourceColumnToInputLocationMap.put(ColumnHeaderConstant.DEVICE, new InputLocation(0, 0)); + + for (int i = 0; i < sensorNum; i++) { + String targetMeasurement = "sensor" + i; + String targetPath = targetDevicePath + "." + targetMeasurement; + PartialPath targetPartialPath = new PartialPath(targetPath); + + pairList.add(new Pair<>(targetMeasurement, targetPartialPath)); + columnToInputLocationMap.put(targetMeasurement, new InputLocation(0, i + 1)); + dataTypeMap.put(targetMeasurement, TSDataType.INT32); + + sourceColumnToInputLocationMap.put(targetMeasurement, new InputLocation(0, i + 1)); + } + PartialPath targetDevicePartialPath = new PartialPath(targetDevicePath); + targetPathToSourceInputMap.put(targetDevicePartialPath, columnToInputLocationMap); + targetDataTypeMap.put(targetDevicePartialPath, dataTypeMap); + + targetDeviceToAlignedMap.put(targetDevicePath, false); + } + + private SeriesScanOperator createSeriesScanOperator( + DriverContext driverContext, + int index, + String device, + int sensorIdx, + IMeasurementSchema measurementSchema) { + PlanNodeId planNodeId = new PlanNodeId(String.valueOf(index)); + driverContext.addOperatorContext(index, planNodeId, SeriesScanOperator.class.getSimpleName()); + + Set allSensors = new HashSet<>(); + allSensors.add("sensor" + sensorIdx); + SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); + scanOptionsBuilder.withAllSensors(allSensors); + + IFullPath measurementPath = + new NonAlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(TEST_SG + "." + device), measurementSchema); + + SeriesScanOperator seriesScanOperator = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(index), + planNodeId, + measurementPath, + Ordering.ASC, + scanOptionsBuilder.build()); + seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + return seriesScanOperator; + } + + private FullOuterTimeJoinOperator createTimeJoinOperator( + DriverContext driverContext, + int index, + List scanOperators, + List dataTypes, + List columnMergers) { + addOperatorContext(driverContext, index, FullOuterTimeJoinOperator.class); + return new FullOuterTimeJoinOperator( + driverContext.getOperatorContexts().get(index), + scanOperators, + Ordering.ASC, + dataTypes, + columnMergers, + new AscTimeComparator()); + } + + private OperatorContext addOperatorContext( + DriverContext driverContext, int index, Class operatorClass) { + driverContext.addOperatorContext( + index, new PlanNodeId(String.valueOf(index)), operatorClass.getSimpleName()); + return driverContext.getOperatorContexts().get(index); + } + + private DeviceViewIntoOperator createTestDeviceViewIntoOperator( + DriverContext driverContext, + int index, + Operator child, + List types, + ExecutorService executor) { + addOperatorContext(driverContext, index, TestDeviceViewIntoOperator.class); + return new TestDeviceViewIntoOperator( + driverContext.getOperatorContexts().get(index), + child, + types, + deviceToTargetPathSourceInputLocationMap, + deviceToTargetPathDataTypeMap, + targetDeviceToAlignedMap, + deviceToSourceTargetPathPairListMap, + sourceColumnToInputLocationMap, + executor, + 100); + } + + /** + * Test version of DeviceViewIntoOperator that mocks the write operation. Instead of actually + * executing insertTablets, it returns an immediately completed Future. + */ + private static class TestDeviceViewIntoOperator extends DeviceViewIntoOperator { + + public TestDeviceViewIntoOperator( + OperatorContext operatorContext, + Operator child, + List inputColumnTypes, + Map>> + deviceToTargetPathSourceInputLocationMap, + Map>> deviceToTargetPathDataTypeMap, + Map targetDeviceToAlignedMap, + Map>> deviceToSourceTargetPathPairListMap, + Map sourceColumnToInputLocationMap, + ExecutorService intoOperationExecutor, + long statementSizePerLine) { + super( + operatorContext, + child, + inputColumnTypes, + deviceToTargetPathSourceInputLocationMap, + deviceToTargetPathDataTypeMap, + targetDeviceToAlignedMap, + deviceToSourceTargetPathPairListMap, + sourceColumnToInputLocationMap, + intoOperationExecutor, + statementSizePerLine); + } + + @Override + protected void executeInsertMultiTabletsStatement( + InsertMultiTabletsStatement insertMultiTabletsStatement) { + // Mock the write operation by setting an immediately completed Future + writeOperationFuture = immediateFuture(SUCCESS_STATUS); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java new file mode 100644 index 000000000000..4b4d08f558eb --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.IFullPath; +import org.apache.iotdb.commons.path.NonAlignedFullPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.execution.operator.process.TreeIntoOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger; +import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TreeIntoOperatorTest { + + private static final String TEST_SG = "root.test"; + + private List> sourceTargetPathPairList; + private Map> targetPathToSourceInputLocationMap; + private Map> targetPathToDataTypeMap; + private Map targetDeviceToAlignedMap; + private List inputColumnTypes; + + private final List deviceIds = new ArrayList<>(); + private final List measurementSchemas = new ArrayList<>(); + private final List seqResources = new ArrayList<>(); + private final List unSeqResources = new ArrayList<>(); + + private TreeIntoOperator operator; + + @Before + public void setUp() throws MetadataException, IOException, WriteProcessException { + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0); + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(512); + + sourceTargetPathPairList = new ArrayList<>(); + targetPathToSourceInputLocationMap = new HashMap<>(); + targetPathToDataTypeMap = new HashMap<>(); + targetDeviceToAlignedMap = new HashMap<>(); + inputColumnTypes = new ArrayList<>(); + + SeriesReaderTestUtil.setUp( + measurementSchemas, deviceIds, seqResources, unSeqResources, TEST_SG); + } + + @After + public void tearDown() throws Exception { + SeriesReaderTestUtil.tearDown(seqResources, unSeqResources); + } + + /** + * Common setup method to create and initialize TreeIntoOperator with SeriesScanOperator as child. + */ + private TreeIntoOperator createAndInitOperator(int sensorNum) { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + + // Create SeriesScanOperator for each sensor + List scanOperators = new ArrayList<>(); + List dataTypes = new ArrayList<>(); + List columnMergers = new ArrayList<>(); + + for (int i = 0; i < sensorNum; i++) { + PlanNodeId planNodeId = new PlanNodeId(String.valueOf(i)); + driverContext.addOperatorContext(i, planNodeId, SeriesScanOperator.class.getSimpleName()); + + Set allSensors = new HashSet<>(); + allSensors.add("sensor" + i); + SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); + scanOptionsBuilder.withAllSensors(allSensors); + + IFullPath measurementPath = + new NonAlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(TEST_SG + ".device0"), + measurementSchemas.get(i)); + + SeriesScanOperator seriesScanOperator = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(i), + planNodeId, + measurementPath, + Ordering.ASC, + scanOptionsBuilder.build()); + seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + scanOperators.add(seriesScanOperator); + dataTypes.add(TSDataType.INT32); + columnMergers.add(new SingleColumnMerger(new InputLocation(i, 0), new AscTimeComparator())); + } + + // Add context for FullOuterTimeJoinOperator + driverContext.addOperatorContext( + sensorNum, + new PlanNodeId(String.valueOf(sensorNum)), + FullOuterTimeJoinOperator.class.getSimpleName()); + // Add context for TreeIntoOperator + driverContext.addOperatorContext( + sensorNum + 1, + new PlanNodeId(String.valueOf(sensorNum + 1)), + TestTreeIntoOperator.class.getSimpleName()); + + // Join all sensor scans with FullOuterTimeJoinOperator + FullOuterTimeJoinOperator timeJoinOperator = + new FullOuterTimeJoinOperator( + driverContext.getOperatorContexts().get(sensorNum), + scanOperators, + Ordering.ASC, + dataTypes, + columnMergers, + new AscTimeComparator()); + + return new TestTreeIntoOperator( + driverContext.getOperatorContexts().get(sensorNum + 1), + timeJoinOperator, + inputColumnTypes, + targetPathToSourceInputLocationMap, + targetPathToDataTypeMap, + targetDeviceToAlignedMap, + sourceTargetPathPairList, + instanceNotificationExecutor, + 100); + } + + /** Test scenario 1: small amount of data, should return in single TsBlock */ + @Test + public void testAllResultsInSingleTsBlock() throws Exception { + prepareSourceTargetPairs(2); + operator = createAndInitOperator(2); + + TsBlock result = null; + while (operator.isBlocked().isDone() && operator.hasNext()) { + result = operator.next(); + } + assertNotNull(result); + assertEquals(2, result.getPositionCount()); + + // Verify 3 value columns (source, target, count) + assertEquals(3, result.getValueColumnCount()); + assertTrue(operator.isFinished()); + } + + /** + * Test scenario 2: Result set exceeds maxTsBlockSize, should return in batches Create a large + * number of path pairs to trigger size limit + */ + @Test + public void testResultsExceedMaxTsBlockSize() throws Exception { + prepareSourceTargetPairs(10); + operator = createAndInitOperator(10); + + int totalRows = 0; + // Loop through all batches + while (operator.isBlocked().isDone() && operator.hasNext()) { + TsBlock result = operator.next(); + if (result != null && !result.isEmpty()) { + int rowCount = result.getPositionCount(); + assertTrue(rowCount == 4 || rowCount == 2); + totalRows += rowCount; + } + } + + // Verify all data is returned + assertEquals(10, totalRows); + assertTrue(operator.isFinished()); + } + + private void prepareSourceTargetPairs(int sensorNum) throws Exception { + String sourceDevicePath = TEST_SG + ".device0"; + String targetDevicePath = TEST_SG + ".new_device0"; + targetDeviceToAlignedMap.put(targetDevicePath, false); + PartialPath targetDevicePartialPath = new PartialPath(targetDevicePath); + + for (int i = 0; i < sensorNum; i++) { + String targetMeasurement = "sensor" + i; + String sourcePath = sourceDevicePath + "." + targetMeasurement; + String targetPath = targetDevicePath + "." + targetMeasurement; + + sourceTargetPathPairList.add(new Pair<>(sourcePath, new PartialPath(targetPath))); + + Map inputLocationMap = + targetPathToSourceInputLocationMap.computeIfAbsent( + targetDevicePartialPath, k -> new HashMap<>()); + // Each sensor comes from a different input location (different SeriesScanOperator) + inputLocationMap.put(targetMeasurement, new InputLocation(0, i)); + + // Prepare targetPathToDataTypeMap + Map dataTypeMap = + targetPathToDataTypeMap.computeIfAbsent(targetDevicePartialPath, k -> new HashMap<>()); + dataTypeMap.put(targetMeasurement, TSDataType.INT32); + } + + // Prepare inputColumnTypes (one column for each sensor) + for (int i = 0; i < sensorNum; i++) { + inputColumnTypes.add(TSDataType.INT32); + } + } + + /** + * Test version of TreeIntoOperator that mocks the write operation. Instead of actually executing + * insertTablets, it returns an immediately completed Future. + */ + private static class TestTreeIntoOperator extends TreeIntoOperator { + + public TestTreeIntoOperator( + OperatorContext operatorContext, + Operator child, + List inputColumnTypes, + Map> targetPathToSourceInputLocationMap, + Map> targetPathToDataTypeMap, + Map targetDeviceToAlignedMap, + List> sourceTargetPathPairList, + ExecutorService intoOperationExecutor, + long statementSizePerLine) { + super( + operatorContext, + child, + inputColumnTypes, + targetPathToSourceInputLocationMap, + targetPathToDataTypeMap, + targetDeviceToAlignedMap, + sourceTargetPathPairList, + intoOperationExecutor, + statementSizePerLine); + } + + @Override + protected void executeInsertMultiTabletsStatement( + InsertMultiTabletsStatement insertMultiTabletsStatement) { + // Mock the write operation by setting an immediately completed Future + writeOperationFuture = immediateFuture(SUCCESS_STATUS); + } + } +} From e73f2b3e3285632acf7283b5b7f03b45b7390121 Mon Sep 17 00:00:00 2001 From: shizy Date: Wed, 7 Jan 2026 21:26:10 +0800 Subject: [PATCH 3/6] missing part --- .../operator/process/DeviceViewIntoOperator.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index 03acc28d5d07..96de3eefcbe2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -261,4 +261,18 @@ protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boole batchedRowCount = 0; return insertMultiTabletsStatement; } + + @Override + protected long findWritten(String device, String measurement) { + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + if (!Objects.equals(generator.getDevice(), device)) { + continue; + } + long writtenCountInCurrentGenerator = generator.getWrittenCount(measurement); + if (writtenCountInCurrentGenerator >= 0) { + return writtenCountInCurrentGenerator; + } + } + return 0; + } } From 95e7bab61477a55850639d2e427d576ed078619d Mon Sep 17 00:00:00 2001 From: shizy Date: Fri, 9 Jan 2026 16:34:25 +0800 Subject: [PATCH 4/6] Optimize DeviceViewIntoOperator.findWritten --- .../execution/operator/process/DeviceViewIntoOperator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java index 96de3eefcbe2..727ef26c1c71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java @@ -264,7 +264,8 @@ protected InsertMultiTabletsStatement constructInsertMultiTabletsStatement(boole @Override protected long findWritten(String device, String measurement) { - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + for (int i = insertTabletStatementGenerators.size() - 1; i >= 0; i--) { + InsertTabletStatementGenerator generator = insertTabletStatementGenerators.get(i); if (!Objects.equals(generator.getDevice(), device)) { continue; } From 0ecf519048e4db22e2a8e93eb0e3b53ee7c41394 Mon Sep 17 00:00:00 2001 From: shizy Date: Mon, 12 Jan 2026 11:00:02 +0800 Subject: [PATCH 5/6] fix IoTDBSelectIntoIT --- .../db/it/selectinto/IoTDBSelectIntoIT.java | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java index 6e5a79fcf9b5..5a6b3be3db12 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java @@ -645,9 +645,6 @@ public void testDataTypeIncompatible() { assertTestFail( "select s_int32 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0, value 0]"); - assertTestFail( - "select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT32, timestamp 0, value 0]"); // test INT64 assertTestFail( @@ -659,9 +656,6 @@ public void testDataTypeIncompatible() { assertTestFail( "select s_int64 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0, value 0]"); - assertTestFail( - "select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT64, timestamp 0, value 0]"); // test FLOAT assertTestFail( @@ -673,9 +667,6 @@ public void testDataTypeIncompatible() { assertTestFail( "select s_float into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0, value 0.0]"); - assertTestFail( - "select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type FLOAT, timestamp 0, value 0.0]"); // test DOUBLE assertTestFail( @@ -690,9 +681,6 @@ public void testDataTypeIncompatible() { assertTestFail( "select s_double into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;", "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0, value 0.0]"); - assertTestFail( - "select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type DOUBLE, timestamp 0, value 0.0]"); // test BOOLEAN assertTestFail( @@ -700,16 +688,13 @@ public void testDataTypeIncompatible() { "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_int64) from root.sg_type.d_0;", - "301: Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value true]"); + "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_float) from root.sg_type.d_0;", "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value true]"); assertTestFail( "select s_boolean into root.sg_type.d_1(s_double) from root.sg_type.d_0;", "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, value true]"); - assertTestFail( - "select s_boolean into root.sg_type.d_1(s_text) from root.sg_type.d_0;", - "Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type BOOLEAN, timestamp 0, value true]"); // test TEXT assertTestFail( From 527045f92f7a765f07b55a39f497e46785503100 Mon Sep 17 00:00:00 2001 From: shizy Date: Mon, 12 Jan 2026 11:06:38 +0800 Subject: [PATCH 6/6] change concurrent thread number for testConcurrentCteQueries --- .../apache/iotdb/relational/it/query/recent/IoTDBCteIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java index b79a5b7b7fa6..7f29e3f0197f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java @@ -470,7 +470,7 @@ public void testPrivileges() throws SQLException { @Test public void testConcurrentCteQueries() throws Exception { - final int threadCount = 10; + final int threadCount = 3; final int queriesPerThread = 20; final AtomicInteger successCount = new AtomicInteger(0); final AtomicInteger failureCount = new AtomicInteger(0); @@ -582,7 +582,7 @@ public void testConcurrentCteQueries() throws Exception { int totalQueries = threadCount * queriesPerThread; assertEquals("All queries should succeed", totalQueries, successCount.get()); assertEquals("No queries should fail", 0, failureCount.get()); - assertEquals("Total query count should match", 340, totalCount.get()); + assertEquals("Total query count should match", 102, totalCount.get()); } private static void prepareData() {