Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1086,13 +1086,14 @@ private void addSortForEachLastQueryNode(PlanNode root, Ordering timeseriesOrder
if (child instanceof LastQueryScanNode) {
// sort the measurements for LastQueryMergeOperator
LastQueryScanNode node = (LastQueryScanNode) child;
((LastQueryScanNode) child)
.getIdxOfMeasurementSchemas()
node.getIdxOfMeasurementSchemas()
.sort(
Comparator.comparing(
idx ->
new Binary(
node.getMeasurementSchema(idx).getMeasurementName(),
node.getGlobalMeasurementSchemaList()
.get(idx)
.getMeasurementName(),
TSFileConfig.STRING_CHARSET),
Comparator.naturalOrder()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ public void setGlobalMeasurementSchemaList(List<IMeasurementSchema> globalMeasur
this.globalMeasurementSchemaList = globalMeasurementSchemaList;
}

public List<IMeasurementSchema> getGlobalMeasurementSchemaList() {
return globalMeasurementSchemaList;
}

public IMeasurementSchema getMeasurementSchema(int idx) {
int globalIdx = indexOfMeasurementSchemas.get(idx);
return globalMeasurementSchemaList.get(globalIdx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -41,6 +47,57 @@

public class LastQueryTest {

@Test
public void testSortLastQueryScanNode() throws IllegalPathException {
LastQueryNode lastQueryNode = new LastQueryNode(new PlanNodeId("test"), null, true);

lastQueryNode.addDeviceLastQueryScanNode(
new PlanNodeId("test_last_query_scan1"),
new PartialPath("root.test.d1"),
true,
Arrays.asList(
new MeasurementSchema("s3", TSDataType.INT32),
new MeasurementSchema("s1", TSDataType.BOOLEAN),
new MeasurementSchema("s2", TSDataType.INT32)),
null,
null);
lastQueryNode.addDeviceLastQueryScanNode(
new PlanNodeId("test_last_query_scan2"),
new PartialPath("root.test.d0"),
false,
Collections.singletonList(new MeasurementSchema("s0", TSDataType.BOOLEAN)),
null,
null);

Analysis analysis = Util.constructAnalysis();
SourceRewriter sourceRewriter = new SourceRewriter(analysis);
DistributionPlanContext context =
new DistributionPlanContext(
new MPPQueryContext("", new QueryId("test"), null, new TEndPoint(), new TEndPoint()));
context.setOneSeriesInMultiRegion(true);
context.setQueryMultiRegion(true);
List<PlanNode> result = sourceRewriter.visitLastQuery(lastQueryNode, context);
Assert.assertEquals(1, result.size());
Assert.assertTrue(result.get(0) instanceof LastQueryMergeNode);
LastQueryMergeNode mergeNode = (LastQueryMergeNode) result.get(0);
Assert.assertEquals(1, mergeNode.getChildren().size());
Assert.assertTrue(mergeNode.getChildren().get(0) instanceof LastQueryNode);

LastQueryNode lastQueryNode2 = (LastQueryNode) mergeNode.getChildren().get(0);
Assert.assertEquals(2, lastQueryNode2.getChildren().size());
Assert.assertTrue(lastQueryNode2.getChildren().get(0) instanceof LastQueryScanNode);

LastQueryScanNode scanNodeChild1 = (LastQueryScanNode) lastQueryNode2.getChildren().get(0);
Assert.assertTrue(scanNodeChild1.getDevicePath().toString().contains("d0"));
Assert.assertEquals("s0", scanNodeChild1.getMeasurementSchemas().get(0).getMeasurementName());

LastQueryScanNode scanNodeChild2 = (LastQueryScanNode) lastQueryNode2.getChildren().get(1);
Assert.assertTrue(scanNodeChild2.getDevicePath().toString().contains("d1"));
Assert.assertEquals("s1", scanNodeChild2.getMeasurementSchemas().get(0).getMeasurementName());
Assert.assertEquals("s2", scanNodeChild2.getMeasurementSchemas().get(1).getMeasurementName());
Assert.assertEquals("s3", scanNodeChild2.getMeasurementSchemas().get(2).getMeasurementName());
}

@Test
public void testLastQuery1Series1Region() throws IllegalPathException {
String d2s1Path = "root.sg.d22.s1";
Expand Down
Loading