Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -644,89 +644,74 @@ public void testDataTypeIncompatible() {
// test INT32
assertTestFail(
"select s_int32 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32]).");
assertTestFail(
"select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0, value 0]");

// test INT64
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type INT64, timestamp 0, value 0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type INT64, timestamp 0, value 0]");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
assertTestFail(
"select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0, value 0]");

// test FLOAT
assertTestFail(
"select s_float into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type FLOAT, timestamp 0, value 0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type FLOAT, timestamp 0, value 0.0]");
assertTestFail(
"select s_float into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
assertTestFail(
"select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0, value 0.0]");

// test DOUBLE
assertTestFail(
"select s_double into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type DOUBLE, timestamp 0, value 0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type DOUBLE, timestamp 0, value 0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type DOUBLE, timestamp 0, value 0.0]");
assertTestFail(
"select s_double into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
assertTestFail(
"select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0, value 0.0]");

// test BOOLEAN
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value true]");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_double) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
assertTestFail(
"select s_boolean into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, value true]");

// test TEXT
assertTestFail(
"select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type TEXT, timestamp 0, value text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type TEXT, timestamp 0, value text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value text0]");
assertTestFail(
"select s_text into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0, value text0]");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ public void testPrivileges() throws SQLException {

@Test
public void testConcurrentCteQueries() throws Exception {
final int threadCount = 10;
final int threadCount = 3;
final int queriesPerThread = 20;
final AtomicInteger successCount = new AtomicInteger(0);
final AtomicInteger failureCount = new AtomicInteger(0);
Expand Down Expand Up @@ -582,7 +582,7 @@ public void testConcurrentCteQueries() throws Exception {
int totalQueries = threadCount * queriesPerThread;
assertEquals("All queries should succeed", totalQueries, successCount.get());
assertEquals("No queries should fail", 0, failureCount.get());
assertEquals("Total query count should match", 340, totalCount.get());
assertEquals("Total query count should match", 102, totalCount.get());
}

private static void prepareData() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ public TsBlock next() throws Exception {
checkLastWriteOperation();

if (!processTsBlock(cachedTsBlock)) {
return null;
return tryToReturnPartialResult();
}
cachedTsBlock = null;
if (child.hasNextWithTimer()) {
TsBlock inputTsBlock = child.nextWithTimer();
processTsBlock(inputTsBlock);

// call child.next only once
return null;
return tryToReturnPartialResult();
} else {
return tryToReturnResultTsBlock();
}
Expand Down Expand Up @@ -204,6 +204,8 @@ protected void checkLastWriteOperation() {

protected abstract TsBlock tryToReturnResultTsBlock();

protected abstract TsBlock tryToReturnPartialResult();

protected abstract void resetInsertTabletStatementGenerators();

private void setMaxRowNumberInStatement(long statementSizePerLine) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.util.concurrent.Futures;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlockBuilder;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -39,15 +40,18 @@

public abstract class AbstractTreeIntoOperator extends AbstractIntoOperator {
protected List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
protected final TsBlockBuilder resultTsBlockBuilder;

protected AbstractTreeIntoOperator(
OperatorContext operatorContext,
Operator child,
List<TSDataType> inputColumnTypes,
ExecutorService intoOperationExecutor,
long statementSizePerLine) {
long statementSizePerLine,
List<TSDataType> outputDataTypes) {
super(operatorContext, child, inputColumnTypes, intoOperationExecutor, statementSizePerLine);
this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
}

protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
Expand Down Expand Up @@ -134,7 +138,7 @@ protected void executeInsertMultiTabletsStatement(
() -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
}

private boolean existFullStatement(
protected boolean existFullStatement(
List<InsertTabletStatementGenerator> insertTabletStatementGenerators) {
for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
if (generator.isFull()) {
Expand Down
Loading
Loading