From fa450f17865ce878ba74e1c49b376d62f587cbc3 Mon Sep 17 00:00:00 2001 From: Sh-Zh-7 Date: Fri, 26 Dec 2025 03:15:01 +0800 Subject: [PATCH] Finish all rules, nodes and operators. --- .../operator/GroupedTopNBuilder.java | 13 + .../GroupedTopNRowNumberAccumulator.java | 538 ++++++++++++++++++ .../operator/GroupedTopNRowNumberBuilder.java | 162 ++++++ .../execution/operator/IdRegistry.java | 57 ++ .../operator/RowIdComparisonStrategy.java | 5 + .../execution/operator/RowIdHashStrategy.java | 8 + .../execution/operator/RowReference.java | 26 + .../operator/RowReferencePageManager.java | 361 ++++++++++++ .../SimpleTsBlockWithPositionComparator.java | 27 + .../TsBlockWithPositionComparator.java | 7 + .../operator/process/ValuesOperator.java | 120 ++++ .../process/window/RowNumberOperator.java | 214 +++++++ .../process/window/TopKRankingOperator.java | 277 +++++++++ .../grouped/array/IntArrayFIFOQueue.java | 182 ++++++ .../grouped/array/LongBigArrayFIFOQueue.java | 178 ++++++ .../grouped/hash/NoChannelGroupByHash.java | 53 ++ .../plan/planner/plan/node/PlanNodeType.java | 3 + .../plan/planner/plan/node/PlanVisitor.java | 11 + .../iterative/rule/GatherAndMergeWindows.java | 315 ++++++++++ .../rule/PushDownFilterIntoWindow.java | 147 +++++ .../rule/PushDownLimitIntoWindow.java | 81 +++ .../iterative/rule/RemoveRedundantWindow.java | 26 + .../rule/ReplaceWindowWithRowNumber.java | 43 ++ .../planner/iterative/rule/Util.java | 45 +- .../planner/node/RowNumberNode.java | 177 ++++++ .../planner/node/TopKRankingNode.java | 168 ++++++ .../relational/planner/node/ValuesNode.java | 211 +++++++ .../relational/planner/node/WindowNode.java | 4 + .../apache/iotdb/db/utils/HeapTraversal.java | 45 ++ 29 files changed, 3483 insertions(+), 21 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNBuilder.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberAccumulator.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberBuilder.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/IdRegistry.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonStrategy.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdHashStrategy.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReference.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReferencePageManager.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionComparator.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionComparator.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ValuesOperator.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/RowNumberOperator.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/IntArrayFIFOQueue.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/LongBigArrayFIFOQueue.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/NoChannelGroupByHash.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/GatherAndMergeWindows.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantWindow.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceWindowWithRowNumber.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/RowNumberNode.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ValuesNode.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/HeapTraversal.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNBuilder.java new file mode 100644 index 0000000000000..54b1a2bf66a9e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNBuilder.java @@ -0,0 +1,13 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.tsfile.read.common.block.TsBlock; + +import java.util.Iterator; + +public interface GroupedTopNBuilder { + void addTsBlock(TsBlock tsBlock); + + Iterator getResult(); + + long getEstimatedSizeInBytes(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberAccumulator.java new file mode 100644 index 0000000000000..5d53ff08b761c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberAccumulator.java @@ -0,0 +1,538 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArrayFIFOQueue; +import org.apache.iotdb.db.utils.HeapTraversal; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.function.LongConsumer; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.lang.Math.abs; +import static java.lang.Math.max; +import static java.util.Objects.requireNonNull; + +public class GroupedTopNRowNumberAccumulator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupedTopNRowNumberAccumulator.class); + private static final long UNKNOWN_INDEX = -1; + + private final GroupIdToHeapBuffer groupIdToHeapBuffer = new GroupIdToHeapBuffer(); + private final HeapNodeBuffer heapNodeBuffer = new HeapNodeBuffer(); + private final HeapTraversal heapTraversal = new HeapTraversal(); + + private final RowIdComparisonStrategy strategy; + private final int topN; + private final LongConsumer rowIdEvictionListener; + + public GroupedTopNRowNumberAccumulator( + RowIdComparisonStrategy strategy, int topN, LongConsumer rowIdEvictionListener) { + this.strategy = requireNonNull(strategy, "strategy is null"); + checkArgument(topN > 0, "topN must be greater than zero"); + this.topN = topN; + this.rowIdEvictionListener = + requireNonNull(rowIdEvictionListener, "rowIdEvictionListener is null"); + } + + public long sizeOf() { + return INSTANCE_SIZE + + groupIdToHeapBuffer.sizeOf() + + heapNodeBuffer.sizeOf() + + heapTraversal.sizeOf(); + } + + public int findFirstPositionToAdd( + TsBlock newPage, + int groupCount, + int[] groupIds, + TsBlockWithPositionComparator comparator, + RowReferencePageManager pageManager) { + int currentTotalGroups = groupIdToHeapBuffer.getTotalGroups(); + groupIdToHeapBuffer.allocateGroupIfNeeded(groupCount); + + for (int position = 0; position < newPage.getPositionCount(); position++) { + int groupId = groupIds[position]; + if (groupId >= currentTotalGroups || calculateRootRowNumber(groupId) < topN) { + return position; + } + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + if (heapRootNodeIndex == UNKNOWN_INDEX) { + return position; + } + long rowId = heapNodeBuffer.getRowId(heapRootNodeIndex); + TsBlock rightPage = pageManager.getPage(rowId); + int rightPosition = pageManager.getPosition(rowId); + if (comparator.compareTo(newPage, position, rightPage, rightPosition) < 0) { + return position; + } + } + return -1; + } + + /** + * Add the specified row to this accumulator. + * + *

This may trigger row eviction callbacks if other rows have to be evicted to make space. + * + * @return true if this row was incorporated, false otherwise + */ + public boolean add(int groupId, RowReference rowReference) { + groupIdToHeapBuffer.allocateGroupIfNeeded(groupId); + + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + if (heapRootNodeIndex == UNKNOWN_INDEX || calculateRootRowNumber(groupId) < topN) { + heapInsert(groupId, rowReference.allocateRowId()); + return true; + } + if (rowReference.compareTo(strategy, heapNodeBuffer.getRowId(heapRootNodeIndex)) < 0) { + heapPopAndInsert(groupId, rowReference.allocateRowId(), rowIdEvictionListener); + return true; + } + return false; + } + + /** + * Drain the contents of groupId from this accumulator to the provided output row ID buffer. + * + *

Rows will be presented in increasing rank order. Draining will not trigger any row eviction + * callbacks. After this method completion, the Accumulator will contain zero rows for the + * specified groupId. + * + * @return number of rows deposited to the output buffer + */ + public long drainTo(int groupId, LongBigArray rowIdOutput) { + long heapSize = groupIdToHeapBuffer.getHeapSize(groupId); + rowIdOutput.ensureCapacity(heapSize); + // Heap is inverted to output order, so insert back to front + for (long i = heapSize - 1; i >= 0; i--) { + rowIdOutput.set(i, peekRootRowId(groupId)); + // No eviction listener needed because this is an explicit caller directive to extract data + heapPop(groupId, null); + } + return heapSize; + } + + private long calculateRootRowNumber(int groupId) { + return groupIdToHeapBuffer.getHeapSize(groupId); + } + + private long peekRootRowId(int groupId) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + checkArgument(heapRootNodeIndex != UNKNOWN_INDEX, "No root to peek"); + return heapNodeBuffer.getRowId(heapRootNodeIndex); + } + + private long getChildIndex(long heapNodeIndex, HeapTraversal.Child child) { + return child == HeapTraversal.Child.LEFT + ? heapNodeBuffer.getLeftChildHeapIndex(heapNodeIndex) + : heapNodeBuffer.getRightChildHeapIndex(heapNodeIndex); + } + + private void setChildIndex(long heapNodeIndex, HeapTraversal.Child child, long newChildIndex) { + if (child == HeapTraversal.Child.LEFT) { + heapNodeBuffer.setLeftChildHeapIndex(heapNodeIndex, newChildIndex); + } else { + heapNodeBuffer.setRightChildHeapIndex(heapNodeIndex, newChildIndex); + } + } + + /** + * Pop the root node off the group ID's max heap. + * + * @param contextEvictionListener optional callback for the root node that gets popped off + */ + private void heapPop(int groupId, LongConsumer contextEvictionListener) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + checkArgument(heapRootNodeIndex != UNKNOWN_INDEX, "Group ID has an empty heap"); + + long lastNodeIndex = heapDetachLastInsertionLeaf(groupId); + long lastRowId = heapNodeBuffer.getRowId(lastNodeIndex); + heapNodeBuffer.deallocate(lastNodeIndex); + + if (lastNodeIndex == heapRootNodeIndex) { + // The root is the last node remaining + if (contextEvictionListener != null) { + contextEvictionListener.accept(lastRowId); + } + } else { + // Pop the root and insert lastRowId back into the heap to ensure a balanced tree + heapPopAndInsert(groupId, lastRowId, contextEvictionListener); + } + } + + /** + * Detaches (but does not deallocate) the leaf in the bottom right-most position in the heap. + * + *

Given the fixed insertion order, the bottom right-most leaf will correspond to the last leaf + * node inserted into the balanced heap. + * + * @return leaf node index that was detached from the heap + */ + private long heapDetachLastInsertionLeaf(int groupId) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + long heapSize = groupIdToHeapBuffer.getHeapSize(groupId); + + long previousNodeIndex = UNKNOWN_INDEX; + HeapTraversal.Child childPosition = null; + long currentNodeIndex = heapRootNodeIndex; + + heapTraversal.resetWithPathTo(heapSize); + while (!heapTraversal.isTarget()) { + previousNodeIndex = currentNodeIndex; + childPosition = heapTraversal.nextChild(); + currentNodeIndex = getChildIndex(currentNodeIndex, childPosition); + verify(currentNodeIndex != UNKNOWN_INDEX, "Target node must exist"); + } + + // Detach the last insertion leaf node, but do not deallocate yet + if (previousNodeIndex == UNKNOWN_INDEX) { + // Last insertion leaf was the root node + groupIdToHeapBuffer.setHeapRootNodeIndex(groupId, UNKNOWN_INDEX); + groupIdToHeapBuffer.setHeapSize(groupId, 0); + } else { + setChildIndex(previousNodeIndex, childPosition, UNKNOWN_INDEX); + groupIdToHeapBuffer.addHeapSize(groupId, -1); + } + + return currentNodeIndex; + } + + /** + * Inserts a new row into the heap for the specified group ID. + * + *

The technique involves traversing the heap from the root to a new bottom left-priority leaf + * position, potentially swapping heap nodes along the way to find the proper insertion position + * for the new row. Insertions always fill the left child before the right, and fill up an entire + * heap level before moving to the next level. + */ + private void heapInsert(int groupId, long newRowId) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + if (heapRootNodeIndex == UNKNOWN_INDEX) { + // Heap is currently empty, so this will be the first node + heapRootNodeIndex = heapNodeBuffer.allocateNewNode(newRowId); + + groupIdToHeapBuffer.setHeapRootNodeIndex(groupId, heapRootNodeIndex); + groupIdToHeapBuffer.setHeapSize(groupId, 1); + return; + } + + long previousHeapNodeIndex = UNKNOWN_INDEX; + HeapTraversal.Child childPosition = null; + long currentHeapNodeIndex = heapRootNodeIndex; + + heapTraversal.resetWithPathTo(groupIdToHeapBuffer.getHeapSize(groupId) + 1); + while (!heapTraversal.isTarget()) { + long currentRowId = heapNodeBuffer.getRowId(currentHeapNodeIndex); + if (strategy.compare(newRowId, currentRowId) > 0) { + // Swap the row values + heapNodeBuffer.setRowId(currentHeapNodeIndex, newRowId); + + newRowId = currentRowId; + } + + previousHeapNodeIndex = currentHeapNodeIndex; + childPosition = heapTraversal.nextChild(); + currentHeapNodeIndex = getChildIndex(currentHeapNodeIndex, childPosition); + } + + verify( + previousHeapNodeIndex != UNKNOWN_INDEX && childPosition != null, + "heap must have at least one node before starting traversal"); + verify(currentHeapNodeIndex == UNKNOWN_INDEX, "New child shouldn't exist yet"); + + long newHeapNodeIndex = heapNodeBuffer.allocateNewNode(newRowId); + + // Link the new child to the parent + setChildIndex(previousHeapNodeIndex, childPosition, newHeapNodeIndex); + + groupIdToHeapBuffer.incrementHeapSize(groupId); + } + + /** + * Pop the root node off the group ID's max heap and insert the newRowId. + * + *

These two operations are more efficient if performed together. The technique involves + * swapping the new row into the root position, and applying a heap down bubbling operation to + * heap-ify. + * + * @param contextEvictionListener optional callback for the root node that gets popped off + */ + private void heapPopAndInsert(int groupId, long newRowId, LongConsumer contextEvictionListener) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + checkState(heapRootNodeIndex != UNKNOWN_INDEX, "popAndInsert() requires at least a root node"); + + // Clear contents of the root node to create a vacancy for another row + long poppedRowId = heapNodeBuffer.getRowId(heapRootNodeIndex); + + long currentNodeIndex = heapRootNodeIndex; + while (true) { + long maxChildNodeIndex = heapNodeBuffer.getLeftChildHeapIndex(currentNodeIndex); + if (maxChildNodeIndex == UNKNOWN_INDEX) { + // Left is always inserted before right, so a missing left child means there can't be a + // right child, + // which means this must already be a leaf position. + break; + } + long maxChildRowId = heapNodeBuffer.getRowId(maxChildNodeIndex); + + long rightChildNodeIndex = heapNodeBuffer.getRightChildHeapIndex(currentNodeIndex); + if (rightChildNodeIndex != UNKNOWN_INDEX) { + long rightRowId = heapNodeBuffer.getRowId(rightChildNodeIndex); + if (strategy.compare(rightRowId, maxChildRowId) > 0) { + maxChildNodeIndex = rightChildNodeIndex; + maxChildRowId = rightRowId; + } + } + + if (strategy.compare(newRowId, maxChildRowId) >= 0) { + // New row is greater than or equal to both children, so the heap invariant is satisfied by + // inserting the + // new row at this position + break; + } + + // Swap the max child row value into the current node + heapNodeBuffer.setRowId(currentNodeIndex, maxChildRowId); + + // Max child now has an unfilled vacancy, so continue processing with that as the current node + currentNodeIndex = maxChildNodeIndex; + } + + heapNodeBuffer.setRowId(currentNodeIndex, newRowId); + + if (contextEvictionListener != null) { + contextEvictionListener.accept(poppedRowId); + } + } + + /** Sanity check the invariants of the underlying data structure. */ + @VisibleForTesting + void verifyIntegrity() { + long totalHeapNodes = 0; + for (int groupId = 0; groupId < groupIdToHeapBuffer.getTotalGroups(); groupId++) { + long heapSize = groupIdToHeapBuffer.getHeapSize(groupId); + long rootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + verify( + rootNodeIndex == UNKNOWN_INDEX || calculateRootRowNumber(groupId) <= topN, + "Max heap has more values than needed"); + IntegrityStats integrityStats = verifyHeapIntegrity(rootNodeIndex); + verify( + integrityStats.getNodeCount() == heapSize, + "Recorded heap size does not match actual heap size"); + totalHeapNodes += integrityStats.getNodeCount(); + } + verify( + totalHeapNodes == heapNodeBuffer.getActiveNodeCount(), + "Failed to deallocate some unused nodes"); + } + + private IntegrityStats verifyHeapIntegrity(long heapNodeIndex) { + if (heapNodeIndex == UNKNOWN_INDEX) { + return new IntegrityStats(0, 0); + } + long rowId = heapNodeBuffer.getRowId(heapNodeIndex); + long leftChildHeapIndex = heapNodeBuffer.getLeftChildHeapIndex(heapNodeIndex); + long rightChildHeapIndex = heapNodeBuffer.getRightChildHeapIndex(heapNodeIndex); + + if (leftChildHeapIndex != UNKNOWN_INDEX) { + verify( + strategy.compare(rowId, heapNodeBuffer.getRowId(leftChildHeapIndex)) >= 0, + "Max heap invariant violated"); + } + if (rightChildHeapIndex != UNKNOWN_INDEX) { + verify(leftChildHeapIndex != UNKNOWN_INDEX, "Left should always be inserted before right"); + verify( + strategy.compare(rowId, heapNodeBuffer.getRowId(rightChildHeapIndex)) >= 0, + "Max heap invariant violated"); + } + + IntegrityStats leftIntegrityStats = verifyHeapIntegrity(leftChildHeapIndex); + IntegrityStats rightIntegrityStats = verifyHeapIntegrity(rightChildHeapIndex); + + verify( + abs(leftIntegrityStats.getMaxDepth() - rightIntegrityStats.getMaxDepth()) <= 1, + "Heap not balanced"); + + return new IntegrityStats( + max(leftIntegrityStats.getMaxDepth(), rightIntegrityStats.getMaxDepth()) + 1, + leftIntegrityStats.getNodeCount() + rightIntegrityStats.getNodeCount() + 1); + } + + private static class IntegrityStats { + private final long maxDepth; + private final long nodeCount; + + public IntegrityStats(long maxDepth, long nodeCount) { + this.maxDepth = maxDepth; + this.nodeCount = nodeCount; + } + + public long getMaxDepth() { + return maxDepth; + } + + public long getNodeCount() { + return nodeCount; + } + } + + /** + * Buffer abstracting a mapping from group ID to a heap. The group ID provides the index for all + * operations. + */ + private static class GroupIdToHeapBuffer { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupIdToHeapBuffer.class); + + /* + * Memory layout: + * [LONG] heapNodeIndex1, + * [LONG] heapNodeIndex2, + * ... + */ + // Since we have a single element per group, this array is effectively indexed on group ID + private final LongBigArray heapIndexBuffer = new LongBigArray(UNKNOWN_INDEX); + + /* + * Memory layout: + * [LONG] heapSize1, + * [LONG] heapSize2, + * ... + */ + // Since we have a single element per group, this array is effectively indexed on group ID + private final LongBigArray sizeBuffer = new LongBigArray(0); + + private int totalGroups; + + public void allocateGroupIfNeeded(int groupId) { + if (totalGroups > groupId) { + return; + } + // Group IDs generated by GroupByHash are always generated consecutively starting from 0, so + // observing a + // group ID N means groups [0, N] inclusive must exist. + totalGroups = groupId + 1; + heapIndexBuffer.ensureCapacity(totalGroups); + sizeBuffer.ensureCapacity(totalGroups); + } + + public int getTotalGroups() { + return totalGroups; + } + + public long getHeapRootNodeIndex(int groupId) { + return heapIndexBuffer.get(groupId); + } + + public void setHeapRootNodeIndex(int groupId, long heapNodeIndex) { + heapIndexBuffer.set(groupId, heapNodeIndex); + } + + public long getHeapSize(int groupId) { + return sizeBuffer.get(groupId); + } + + public void setHeapSize(int groupId, long count) { + sizeBuffer.set(groupId, count); + } + + public void addHeapSize(int groupId, long delta) { + sizeBuffer.add(groupId, delta); + } + + public void incrementHeapSize(int groupId) { + sizeBuffer.increment(groupId); + } + + public long sizeOf() { + return INSTANCE_SIZE + heapIndexBuffer.sizeOf() + sizeBuffer.sizeOf(); + } + } + + /** + * Buffer abstracting storage of nodes in the heap. Nodes are referenced by their node index for + * operations. + */ + private static class HeapNodeBuffer { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(HeapNodeBuffer.class); + private static final int POSITIONS_PER_ENTRY = 3; + private static final int LEFT_CHILD_HEAP_INDEX_OFFSET = 1; + private static final int RIGHT_CHILD_HEAP_INDEX_OFFSET = 2; + + /* + * Memory layout: + * [LONG] rowId1, [LONG] leftChildNodeIndex1, [LONG] rightChildNodeIndex1, + * [LONG] rowId2, [LONG] leftChildNodeIndex2, [LONG] rightChildNodeIndex2, + * ... + */ + private final LongBigArray buffer = new LongBigArray(); + + private final LongBigArrayFIFOQueue emptySlots = new LongBigArrayFIFOQueue(); + + private long capacity; + + /** + * Allocates storage for a new heap node. + * + * @return index referencing the node + */ + public long allocateNewNode(long rowId) { + long newHeapIndex; + if (!emptySlots.isEmpty()) { + newHeapIndex = emptySlots.dequeueLong(); + } else { + newHeapIndex = capacity; + capacity++; + buffer.ensureCapacity(capacity * POSITIONS_PER_ENTRY); + } + + setRowId(newHeapIndex, rowId); + setLeftChildHeapIndex(newHeapIndex, UNKNOWN_INDEX); + setRightChildHeapIndex(newHeapIndex, UNKNOWN_INDEX); + + return newHeapIndex; + } + + public void deallocate(long index) { + emptySlots.enqueue(index); + } + + public long getActiveNodeCount() { + return capacity - emptySlots.longSize(); + } + + public long getRowId(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY); + } + + public void setRowId(long index, long rowId) { + buffer.set(index * POSITIONS_PER_ENTRY, rowId); + } + + public long getLeftChildHeapIndex(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY + LEFT_CHILD_HEAP_INDEX_OFFSET); + } + + public void setLeftChildHeapIndex(long index, long childHeapIndex) { + buffer.set(index * POSITIONS_PER_ENTRY + LEFT_CHILD_HEAP_INDEX_OFFSET, childHeapIndex); + } + + public long getRightChildHeapIndex(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY + RIGHT_CHILD_HEAP_INDEX_OFFSET); + } + + public void setRightChildHeapIndex(long index, long childHeapIndex) { + buffer.set(index * POSITIONS_PER_ENTRY + RIGHT_CHILD_HEAP_INDEX_OFFSET, childHeapIndex); + } + + public long sizeOf() { + return INSTANCE_SIZE + buffer.sizeOf() + emptySlots.sizeOf(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberBuilder.java new file mode 100644 index 0000000000000..941d2ccafa19d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberBuilder.java @@ -0,0 +1,162 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.Iterator; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; + +public class GroupedTopNRowNumberBuilder implements GroupedTopNBuilder { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupedTopNRowNumberBuilder.class); + + private final List sourceTypes; + private final boolean produceRowNumber; + private final int[] groupByChannels; + private final GroupByHash groupByHash; + private final RowReferencePageManager pageManager = new RowReferencePageManager(); + private final GroupedTopNRowNumberAccumulator groupedTopNRowNumberAccumulator; + private final TsBlockWithPositionComparator comparator; + + public GroupedTopNRowNumberBuilder( + List sourceTypes, + TsBlockWithPositionComparator comparator, + int topN, + boolean produceRowNumber, + int[] groupByChannels, + GroupByHash groupByHash) { + this.sourceTypes = sourceTypes; + this.produceRowNumber = produceRowNumber; + this.groupByChannels = groupByChannels; + this.groupByHash = groupByHash; + this.comparator = comparator; + + this.groupedTopNRowNumberAccumulator = + new GroupedTopNRowNumberAccumulator( + (leftRowId, rightRowId) -> { + TsBlock leftTsBlock = pageManager.getPage(leftRowId); + int leftPosition = pageManager.getPosition(leftRowId); + TsBlock rightTsBlock = pageManager.getPage(rightRowId); + int rightPosition = pageManager.getPosition(rightRowId); + return comparator.compareTo(leftTsBlock, leftPosition, rightTsBlock, rightPosition); + }, + topN, + pageManager::dereference); + } + + @Override + public void addTsBlock(TsBlock tsBlock) { + int[] groupIds = groupByHash.getGroupIds(tsBlock.getColumns(groupByChannels)); + int groupCount = groupByHash.getGroupCount(); + + processTsBlock(tsBlock, groupCount, groupIds); + } + + @Override + public Iterator getResult() { + return new ResultIterator(); + } + + @Override + public long getEstimatedSizeInBytes() { + return INSTANCE_SIZE + + groupByHash.getEstimatedSize() + + pageManager.sizeOf() + + groupedTopNRowNumberAccumulator.sizeOf(); + } + + private void processTsBlock(TsBlock newTsBlock, int groupCount, int[] groupIds) { + int firstPositionToAdd = + groupedTopNRowNumberAccumulator.findFirstPositionToAdd( + newTsBlock, groupCount, groupIds, comparator, pageManager); + if (firstPositionToAdd < 0) { + return; + } + + try (RowReferencePageManager.LoadCursor loadCursor = + pageManager.add(newTsBlock, firstPositionToAdd)) { + for (int position = firstPositionToAdd; + position < newTsBlock.getPositionCount(); + position++) { + int groupId = groupIds[position]; + loadCursor.advance(); + groupedTopNRowNumberAccumulator.add(groupId, loadCursor); + } + } + + pageManager.compactIfNeeded(); + } + + private class ResultIterator extends AbstractIterator { + private final TsBlockBuilder pageBuilder; + private final int groupIdCount = groupByHash.getGroupCount(); + private int currentGroupId = -1; + private final LongBigArray rowIdOutput = new LongBigArray(); + private long currentGroupSize; + private int currentIndexInGroup; + + ResultIterator() { + ImmutableList.Builder sourceTypesBuilders = + ImmutableList.builder().addAll(sourceTypes); + if (produceRowNumber) { + sourceTypesBuilders.add(TSDataType.INT64); + } + pageBuilder = new TsBlockBuilder(sourceTypesBuilders.build()); + } + + @Override + protected TsBlock computeNext() { + pageBuilder.reset(); + while (!pageBuilder.isFull()) { + while (currentIndexInGroup >= currentGroupSize) { + if (currentGroupId + 1 >= groupIdCount) { + if (pageBuilder.isEmpty()) { + return endOfData(); + } + return pageBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, pageBuilder.getPositionCount())); + } + currentGroupId++; + currentGroupSize = groupedTopNRowNumberAccumulator.drainTo(currentGroupId, rowIdOutput); + currentIndexInGroup = 0; + } + + long rowId = rowIdOutput.get(currentIndexInGroup); + TsBlock page = pageManager.getPage(rowId); + int position = pageManager.getPosition(rowId); + for (int i = 0; i < sourceTypes.size(); i++) { + ColumnBuilder builder = pageBuilder.getColumnBuilder(i); + Column column = page.getColumn(i); + builder.write(column, position); + } + if (produceRowNumber) { + ColumnBuilder builder = pageBuilder.getColumnBuilder(sourceTypes.size()); + builder.writeLong(currentGroupId + 1); + } + pageBuilder.declarePosition(); + currentIndexInGroup++; + + // Deference the row for hygiene, but no need to compact them at this point + pageManager.dereference(rowId); + } + + if (pageBuilder.isEmpty()) { + return endOfData(); + } + return pageBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, pageBuilder.getPositionCount())); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/IdRegistry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/IdRegistry.java new file mode 100644 index 0000000000000..99b2f53e9de57 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/IdRegistry.java @@ -0,0 +1,57 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.IntArrayFIFOQueue; + +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.HashMap; +import java.util.function.IntFunction; + +public class IdRegistry { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(IdRegistry.class); + + private final HashMap objects = new HashMap<>(); + private final IntFIFOQueue emptySlots = new IntFIFOQueue(); + + /** + * Provides a new ID referencing the provided object. + * + * @return ID referencing the provided object + */ + public T allocateId(IntFunction factory) { + T result; + if (emptySlots.size() != 0) { + int id = emptySlots.dequeueInt(); + result = factory.apply(id); + objects.put(id, result); + } else { + result = factory.apply(objects.size()); + objects.put(objects.size(), result); + } + return result; + } + + public void deallocate(int id) { + objects.remove(id); + emptySlots.enqueue(id); + } + + public T get(int id) { + return objects.get(id); + } + + /** Does not include the sizes of the referenced objects themselves. */ + public long sizeOf() { + return INSTANCE_SIZE + RamUsageEstimator.sizeOfMap(objects) + emptySlots.sizeOf(); + } + + private static class IntFIFOQueue extends IntArrayFIFOQueue { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(IntFIFOQueue.class); + + public long sizeOf() { + return INSTANCE_SIZE + RamUsageEstimator.sizeOf(array); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonStrategy.java new file mode 100644 index 0000000000000..9b6f3df232e7c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonStrategy.java @@ -0,0 +1,5 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +public interface RowIdComparisonStrategy { + int compare(long leftRowId, long rightRowId); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdHashStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdHashStrategy.java new file mode 100644 index 0000000000000..137b55499e55d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdHashStrategy.java @@ -0,0 +1,8 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +/** Hash strategy that evaluates over row IDs */ +public interface RowIdHashStrategy { + boolean equals(long leftRowId, long rightRowId); + + long hashCode(long rowId); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReference.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReference.java new file mode 100644 index 0000000000000..92e7a9fb4b450 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReference.java @@ -0,0 +1,26 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +/** + * Reference to a row. + * + *

Note: RowReference gives us the ability to defer row ID generation (which can be expensive in + * tight loops). + */ +public interface RowReference { + /** + * Compares the referenced row to the specified row ID using the provided RowIdComparisonStrategy. + */ + int compareTo(RowIdComparisonStrategy strategy, long rowId); + + /** + * Checks equality of the referenced row with the specified row ID using the provided + * RowIdHashStrategy. + */ + boolean equals(RowIdHashStrategy strategy, long rowId); + + /** Calculates the hash of the referenced row using the provided RowIdHashStrategy. */ + long hash(RowIdHashStrategy strategy); + + /** Allocate a stable row ID that can be used to reference this row at a future point. */ + long allocateRowId(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReferencePageManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReferencePageManager.java new file mode 100644 index 0000000000000..31a2de738f2ea --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowReferencePageManager.java @@ -0,0 +1,361 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.IntBigArray; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArrayFIFOQueue; + +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.Arrays; +import java.util.HashSet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; + +public class RowReferencePageManager { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(RowReferencePageManager.class); + private static final long PAGE_ACCOUNTING_INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(PageAccounting.class); + private static final int RESERVED_ROW_ID_FOR_CURSOR = -1; + + private final IdRegistry pages = new IdRegistry<>(); + private final RowIdBuffer rowIdBuffer = new RowIdBuffer(); + private final HashSet compactionCandidates = new HashSet<>(); + + private LoadCursor currentCursor; + private long pageBytes; + + public LoadCursor add(TsBlock tsBlock) { + return add(tsBlock, 0); + } + + public LoadCursor add(TsBlock page, int startingPosition) { + checkState(currentCursor == null, "Cursor still active"); + checkArgument( + startingPosition >= 0 && startingPosition <= page.getPositionCount(), + "invalid startingPosition: %s", + startingPosition); + + PageAccounting pageAccounting = pages.allocateId(id -> new PageAccounting(id, page)); + + pageAccounting.lockPage(); + currentCursor = + new LoadCursor( + pageAccounting, + startingPosition, + () -> { + // Initiate additional actions on close + checkState(currentCursor != null); + pageAccounting.unlockPage(); + pageAccounting.loadPageLoadIfNeeded(); + // Account for page size after lazy loading (which can change the page size) + pageBytes += pageAccounting.sizeOf(); + currentCursor = null; + + checkPageMaintenance(pageAccounting); + }); + + return currentCursor; + } + + public void dereference(long rowId) { + PageAccounting pageAccounting = pages.get(rowIdBuffer.getPageId(rowId)); + pageAccounting.dereference(rowId); + checkPageMaintenance(pageAccounting); + } + + private void checkPageMaintenance(PageAccounting pageAccounting) { + int pageId = pageAccounting.getPageId(); + if (pageAccounting.isPruneEligible()) { + compactionCandidates.remove(pageId); + pages.deallocate(pageId); + pageBytes -= pageAccounting.sizeOf(); + } else if (pageAccounting.isCompactionEligible()) { + compactionCandidates.add(pageId); + } + } + + public TsBlock getPage(long rowId) { + if (isCursorRowId(rowId)) { + checkState(currentCursor != null, "No active cursor"); + return currentCursor.getPage(); + } + int pageId = rowIdBuffer.getPageId(rowId); + return pages.get(pageId).getPage(); + } + + public int getPosition(long rowId) { + if (isCursorRowId(rowId)) { + checkState(currentCursor != null, "No active cursor"); + // rowId for cursors only reference the single current position + return currentCursor.getCurrentPosition(); + } + return rowIdBuffer.getPosition(rowId); + } + + private static boolean isCursorRowId(long rowId) { + return rowId == RESERVED_ROW_ID_FOR_CURSOR; + } + + public void compactIfNeeded() { + for (int pageId : compactionCandidates) { + PageAccounting pageAccounting = pages.get(pageId); + pageBytes -= pageAccounting.sizeOf(); + pageAccounting.compact(); + pageBytes += pageAccounting.sizeOf(); + } + compactionCandidates.clear(); + } + + public long sizeOf() { + return INSTANCE_SIZE + + pageBytes + + pages.sizeOf() + + rowIdBuffer.sizeOf() + + RamUsageEstimator.sizeOfHashSet(compactionCandidates); + } + + /** + * Cursor that allows callers to advance through the registered page and dictate whether a + * specific position should be preserved with a stable row ID. Row ID generation can be expensive + * in tight loops, so this allows callers to quickly skip positions that won't be needed. + */ + public static final class LoadCursor implements RowReference, AutoCloseable { + private final PageAccounting pageAccounting; + private final Runnable closeCallback; + + private int currentPosition; + + private LoadCursor( + PageAccounting pageAccounting, int startingPosition, Runnable closeCallback) { + this.pageAccounting = pageAccounting; + this.currentPosition = startingPosition - 1; + this.closeCallback = closeCallback; + } + + private TsBlock getPage() { + return pageAccounting.getPage(); + } + + private int getCurrentPosition() { + checkState(currentPosition >= 0, "Not yet advanced"); + return currentPosition; + } + + public boolean advance() { + if (currentPosition >= pageAccounting.getPage().getPositionCount() - 1) { + return false; + } + currentPosition++; + return true; + } + + @Override + public int compareTo(RowIdComparisonStrategy strategy, long rowId) { + checkState(currentPosition >= 0, "Not yet advanced"); + return strategy.compare(RESERVED_ROW_ID_FOR_CURSOR, rowId); + } + + @Override + public boolean equals(RowIdHashStrategy strategy, long rowId) { + checkState(currentPosition >= 0, "Not yet advanced"); + return strategy.equals(RESERVED_ROW_ID_FOR_CURSOR, rowId); + } + + @Override + public long hash(RowIdHashStrategy strategy) { + checkState(currentPosition >= 0, "Not yet advanced"); + return strategy.hashCode(RESERVED_ROW_ID_FOR_CURSOR); + } + + @Override + public long allocateRowId() { + checkState(currentPosition >= 0, "Not yet advanced"); + return pageAccounting.referencePosition(currentPosition); + } + + @Override + public void close() { + closeCallback.run(); + } + } + + private final class PageAccounting { + private static final int COMPACTION_MIN_FILL_MULTIPLIER = 2; + + private final int pageId; + private TsBlock page; + private boolean isPageLoaded; + private long[] rowIds; + // Start off locked to give the caller time to declare which rows to reference + private boolean lockedPage = true; + private int activePositions; + + public PageAccounting(int pageId, TsBlock page) { + this.pageId = pageId; + this.page = page; + rowIds = new long[page.getPositionCount()]; + Arrays.fill(rowIds, RowIdBuffer.UNKNOWN_ID); + } + + /** Record the position as referenced and return a corresponding stable row ID */ + public long referencePosition(int position) { + long rowId = rowIds[position]; + if (rowId == RowIdBuffer.UNKNOWN_ID) { + rowId = rowIdBuffer.allocateRowId(pageId, position); + rowIds[position] = rowId; + activePositions++; + } + return rowId; + } + + /** + * Locks the current page so that it can't be compacted (thus allowing for stable position-based + * access). + */ + public void lockPage() { + lockedPage = true; + } + + /** Unlocks the current page so that it becomes eligible for compaction. */ + public void unlockPage() { + lockedPage = false; + } + + public int getPageId() { + return pageId; + } + + public TsBlock getPage() { + return page; + } + + /** Dereferences the row ID from this page. */ + public void dereference(long rowId) { + int position = rowIdBuffer.getPosition(rowId); + checkArgument(rowId == rowIds[position], "rowId does not match this page"); + rowIds[position] = RowIdBuffer.UNKNOWN_ID; + activePositions--; + rowIdBuffer.deallocate(rowId); + } + + public boolean isPruneEligible() { + // Pruning is only allowed if the page is unlocked + return !lockedPage && activePositions == 0; + } + + public boolean isCompactionEligible() { + // Compaction is only allowed if the page is unlocked + return !lockedPage + && activePositions * COMPACTION_MIN_FILL_MULTIPLIER < page.getPositionCount(); + } + + public void loadPageLoadIfNeeded() { + if (!isPageLoaded && activePositions > 0) { + // page = page.getLoadedPage(); + isPageLoaded = true; + } + } + + public void compact() { + checkState(!lockedPage, "Should not attempt compaction when page is locked"); + + if (activePositions == page.getPositionCount()) { + return; + } + + loadPageLoadIfNeeded(); + + int newIndex = 0; + int[] positionsToKeep = new int[activePositions]; + long[] newRowIds = new long[activePositions]; + for (int i = 0; i < page.getPositionCount() && newIndex < positionsToKeep.length; i++) { + long rowId = rowIds[i]; + positionsToKeep[newIndex] = i; + newRowIds[newIndex] = rowId; + newIndex += rowId == RowIdBuffer.UNKNOWN_ID ? 0 : 1; + } + verify(newIndex == activePositions); + for (int i = 0; i < newRowIds.length; i++) { + rowIdBuffer.setPosition(newRowIds[i], i); + } + + // Compact page + // page = page.copyPositions(positionsToKeep, 0, positionsToKeep.length); + rowIds = newRowIds; + } + + public long sizeOf() { + // Getting the size of a page forces a lazy page to be loaded, so only provide the size after + // an explicit decision to load + long loadedPageSize = isPageLoaded ? page.getSizeInBytes() : 0; + return PAGE_ACCOUNTING_INSTANCE_SIZE + loadedPageSize + RamUsageEstimator.sizeOf(rowIds); + } + } + + /** Buffer abstracting a mapping between row IDs and their associated page IDs and positions. */ + private static class RowIdBuffer { + public static final long UNKNOWN_ID = -1; + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(RowIdBuffer.class); + + /* + * Memory layout: + * [INT] pageId1, [INT] position1, + * [INT] pageId2, [INT] position2, + * ... + */ + private final IntBigArray buffer = new IntBigArray(); + + private final LongBigArrayFIFOQueue emptySlots = new LongBigArrayFIFOQueue(); + + private long capacity; + + /** + * Provides a new row ID referencing the provided page position. + * + * @return ID referencing the provided page position + */ + public long allocateRowId(int pageId, int position) { + long newRowId; + if (!emptySlots.isEmpty()) { + newRowId = emptySlots.dequeueLong(); + } else { + newRowId = capacity; + capacity++; + buffer.ensureCapacity(capacity * 2); + } + + setPageId(newRowId, pageId); + setPosition(newRowId, position); + + return newRowId; + } + + public void deallocate(long rowId) { + emptySlots.enqueue(rowId); + } + + public int getPageId(long rowId) { + return buffer.get(rowId * 2); + } + + public void setPageId(long rowId, int pageId) { + buffer.set(rowId * 2, pageId); + } + + public int getPosition(long rowId) { + return buffer.get(rowId * 2 + 1); + } + + public void setPosition(long rowId, int position) { + buffer.set(rowId * 2 + 1, position); + } + + public long sizeOf() { + return INSTANCE_SIZE + buffer.sizeOf() + emptySlots.sizeOf(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionComparator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionComparator.java new file mode 100644 index 0000000000000..a675ec97fb13d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionComparator.java @@ -0,0 +1,27 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator; +import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; +import org.apache.iotdb.db.utils.datastructure.MergeSortKey; +import org.apache.iotdb.db.utils.datastructure.SortKey; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; + +import java.util.Comparator; +import java.util.List; + +public class SimpleTsBlockWithPositionComparator implements TsBlockWithPositionComparator { + private final Comparator comparator; + + public SimpleTsBlockWithPositionComparator( + List types, List sortChannels, List sortItems) { + this.comparator = MergeSortComparator.getComparator(sortItems, sortChannels, types); + } + + @Override + public int compareTo(TsBlock left, int leftPosition, TsBlock right, int rightPosition) { + return comparator.compare( + new MergeSortKey(left, leftPosition), new MergeSortKey(right, rightPosition)); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionComparator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionComparator.java new file mode 100644 index 0000000000000..dcb7e8ca6211e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionComparator.java @@ -0,0 +1,7 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.tsfile.read.common.block.TsBlock; + +public interface TsBlockWithPositionComparator { + int compareTo(TsBlock left, int leftPosition, TsBlock right, int rightPosition); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ValuesOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ValuesOperator.java new file mode 100644 index 0000000000000..eeac35d918908 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ValuesOperator.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.Iterator; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class ValuesOperator implements Operator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ValuesOperator.class); + + private final OperatorContext operatorContext; + private final Iterator tsBlocks; + private final long maxTsBlockSize; + private long currentRetainedSize; + + public ValuesOperator(OperatorContext operatorContext, List tsBlocks) { + this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); + requireNonNull(tsBlocks, "tsBlocks is null"); + + this.tsBlocks = ImmutableList.copyOf(tsBlocks).iterator(); + + long maxSize = 0; + long totalSize = 0; + for (TsBlock tsBlock : tsBlocks) { + long blockSize = tsBlock.getRetainedSizeInBytes(); + maxSize = Math.max(maxSize, blockSize); + totalSize += blockSize; + } + + this.maxTsBlockSize = maxSize; + this.currentRetainedSize = totalSize; + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public ListenableFuture isBlocked() { + return NOT_BLOCKED; + } + + @Override + public TsBlock next() throws Exception { + if (!tsBlocks.hasNext()) { + return null; + } + + TsBlock tsBlock = tsBlocks.next(); + if (tsBlock != null) { + currentRetainedSize -= tsBlock.getRetainedSizeInBytes(); + } + + return tsBlock; + } + + @Override + public boolean hasNext() throws Exception { + return tsBlocks.hasNext(); + } + + @Override + public void close() throws Exception {} + + @Override + public boolean isFinished() throws Exception { + return !tsBlocks.hasNext(); + } + + @Override + public long calculateMaxPeekMemory() { + return maxTsBlockSize; + } + + @Override + public long calculateMaxReturnSize() { + return maxTsBlockSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return currentRetainedSize; + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/RowNumberOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/RowNumberOperator.java new file mode 100644 index 0000000000000..8a31aa11c25ce --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/RowNumberOperator.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.execution.operator.process.window; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash; +import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.createGroupByHash; + +public class RowNumberOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(RowNumberOperator.class); + + private final OperatorContext operatorContext; + private final Operator inputOperator; + private final List outputChannels; + private final List partitionChannels; + private final TsBlockBuilder tsBlockBuilder; + + private final Optional groupByHash; + private final Optional maxRowsPerPartition; + private final Map partitionRowCounts; + + public RowNumberOperator( + OperatorContext operatorContext, + Operator inputOperator, + List inputDataTypes, + List outputChannels, + List partitionChannels, + Optional maxRowsPerPartition, + int expectedPositions) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.outputChannels = ImmutableList.copyOf(outputChannels); + this.partitionChannels = ImmutableList.copyOf(partitionChannels); + this.maxRowsPerPartition = maxRowsPerPartition; + + // Output data types + // original output channels + row number column + List outputDataTypes = new ArrayList<>(); + for (int channel : outputChannels) { + outputDataTypes.add(inputDataTypes.get(channel)); + } + outputDataTypes.add(TSDataType.INT64); + this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes); + + if (partitionChannels.isEmpty()) { + this.groupByHash = Optional.empty(); + } else { + // Partition data types + List partitionDataTypes = new ArrayList<>(); + for (int channel : partitionChannels) { + TSDataType tsDataType = inputDataTypes.get(channel); + Type convertType = InternalTypeManager.fromTSDataType(tsDataType); + partitionDataTypes.add(convertType); + } + this.groupByHash = + Optional.of( + createGroupByHash(partitionDataTypes, false, expectedPositions, UpdateMemory.NOOP)); + } + + this.partitionRowCounts = new HashMap<>(expectedPositions); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + TsBlock tsBlock = inputOperator.nextWithTimer(); + if (tsBlock == null) { + return null; + } + + int[] partitionIds = getTsBlockPartitionIds(tsBlock); + for (int position = 0; position < tsBlock.getPositionCount(); position++) { + int partitionId = groupByHash.isPresent() ? partitionIds[position] : 0; + long rowCount = partitionRowCounts.getOrDefault(partitionId, 0L); + processRow(tsBlock, partitionId, rowCount + 1); + partitionRowCounts.put(partitionId, rowCount + 1); + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + tsBlockBuilder.reset(); + return result; + } + + private void processRow(TsBlock tsBlock, int position, long rowNumber) { + // Check max rows per partition limit + if (maxRowsPerPartition.isPresent() && rowNumber >= maxRowsPerPartition.get()) { + return; // Skip this row, partition has reached limit + } + + // Copy origin values + for (int i = 0; i < outputChannels.size(); i++) { + Column column = tsBlock.getColumn(outputChannels.get(i)); + ColumnBuilder columnBuilder = tsBlockBuilder.getColumnBuilder(i); + if (column.isNull(position)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(column, position); + } + } + // Write row number + int rowNumberChannel = outputChannels.size(); + ColumnBuilder columnBuilder = tsBlockBuilder.getColumnBuilder(rowNumberChannel); + columnBuilder.writeLong(rowNumber); + + tsBlockBuilder.declarePosition(); + } + + private int[] getTsBlockPartitionIds(TsBlock tsBlock) { + if (groupByHash.isPresent()) { + Column[] partitionColumns = new Column[partitionChannels.size()]; + for (int i = 0; i < partitionChannels.size(); i++) { + partitionColumns[i] = tsBlock.getColumn(partitionChannels.get(i)); + } + return groupByHash.get().getGroupIds(partitionColumns); + } else { + return new int[] {0}; + } + } + + @Override + public boolean hasNext() throws Exception { + return inputOperator.hasNext(); + } + + @Override + public void close() throws Exception { + inputOperator.close(); + } + + @Override + public boolean isFinished() throws Exception { + return !this.hasNextWithTimer(); + } + + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemoryFromInput = inputOperator.calculateMaxPeekMemoryWithCounter(); + long maxPeekMemoryFromCurrent = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + return Math.max(maxPeekMemoryFromInput, maxPeekMemoryFromCurrent) + + inputOperator.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long calculateMaxReturnSize() { + return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return inputOperator.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + tsBlockBuilder.getRetainedSizeInBytes(); + } + + @Override + public ListenableFuture isBlocked() { + return inputOperator.isBlocked(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java new file mode 100644 index 0000000000000..b89ed378340f8 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java @@ -0,0 +1,277 @@ +package org.apache.iotdb.db.queryengine.execution.operator.process.window; + +import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.execution.operator.GroupedTopNBuilder; +import org.apache.iotdb.db.queryengine.execution.operator.GroupedTopNRowNumberBuilder; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.SimpleTsBlockWithPositionComparator; +import org.apache.iotdb.db.queryengine.execution.operator.TsBlockWithPositionComparator; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.NoChannelGroupByHash; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; +import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; +import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.type.Type; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class TopKRankingOperator implements ProcessOperator { + private final OperatorContext operatorContext; + private final Operator inputOperator; + private final TopKRankingNode.RankingType rankingType; + private final List inputTypes; + + private final List outputChannels; + private final List partitionChannels; + private final List partitionTypes; + private final List sortChannels; + private final List sortItems; + private final int maxRowCountPerPartition; + private final boolean partial; + private final boolean generateRanking; + private final Optional hashChannel; + private final int expectedPositions; + + private final long maxFlushableBytes; + + private final Supplier groupByHashSupplier; + private final Supplier groupedTopNBuilderSupplier; + + private GroupByHash groupByHash; + private GroupedTopNBuilder groupedTopNBuilder; + private boolean finishing; + private java.util.Iterator outputIterator; + + public TopKRankingOperator( + OperatorContext operatorContext, + Operator inputOperator, + TopKRankingNode.RankingType rankingType, + List inputTypes, + List outputChannels, + List partitionChannels, + List partitionTypes, + List sortChannels, + List sortItems, + int maxRowCountPerPartition, + boolean generateRanking, + Optional hashChannel, + int expectedPositions, + Optional maxPartialMemory) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.rankingType = rankingType; + this.inputTypes = inputTypes; + this.partitionChannels = partitionChannels; + this.partitionTypes = partitionTypes; + this.sortChannels = sortChannels; + this.sortItems = sortItems; + this.maxRowCountPerPartition = maxRowCountPerPartition; + this.partial = !generateRanking; + this.generateRanking = generateRanking; + this.hashChannel = hashChannel; + this.expectedPositions = expectedPositions; + this.maxFlushableBytes = maxPartialMemory.orElse(0L); + + ImmutableList.Builder outputChannelsBuilder = ImmutableList.builder(); + for (int channel : outputChannels) { + outputChannelsBuilder.add(channel); + } + if (generateRanking) { + outputChannelsBuilder.add(outputChannels.size()); + } + this.outputChannels = outputChannelsBuilder.build(); + + this.groupByHashSupplier = + getGroupByHashSupplier( + expectedPositions, + partitionTypes, + hashChannel.isPresent(), + operatorContext.getSessionInfo(), + UpdateMemory.NOOP); + + // Prepare grouped topN builder supplier + this.groupedTopNBuilderSupplier = + getGroupedTopNBuilderSupplier( + rankingType, + inputTypes, + partitionChannels, + sortChannels, + sortItems, + maxRowCountPerPartition, + generateRanking, + groupByHashSupplier); + } + + private static Supplier getGroupByHashSupplier( + int expectedPositions, + List partitionTsDataTypes, + boolean hasPrecomputedHash, + SessionInfo session, + UpdateMemory updateMemory) { + + if (partitionTsDataTypes.isEmpty()) { + return Suppliers.ofInstance(new NoChannelGroupByHash()); + } + + List partitionTypes = new ArrayList<>(partitionTsDataTypes.size()); + for (TSDataType partitionTsDataType : partitionTsDataTypes) { + partitionTypes.add(InternalTypeManager.fromTSDataType(partitionTsDataType)); + } + + return () -> + GroupByHash.createGroupByHash( + partitionTypes, hasPrecomputedHash, expectedPositions, updateMemory); + } + + private static Supplier getGroupedTopNBuilderSupplier( + TopKRankingNode.RankingType rankingType, + List sourceTypes, + List partitionChannels, + List sortChannels, + List sortItems, + int maxRankingPerPartition, + boolean generateRanking, + Supplier groupByHashSupplier) { + + if (rankingType == TopKRankingNode.RankingType.ROW_NUMBER) { + TsBlockWithPositionComparator comparator = + new SimpleTsBlockWithPositionComparator(sourceTypes, sortChannels, sortItems); + return () -> + new GroupedTopNRowNumberBuilder( + sourceTypes, + comparator, + maxRankingPerPartition, + generateRanking, + partitionChannels.stream().mapToInt(Integer::intValue).toArray(), + groupByHashSupplier.get()); + } + + // if (rankingType == TopKRankingNode.RankingType.RANK) { + // Comparator comparator = new SimpleTsBlockWithPositionComparator( + // sourceTypes, sortChannels, ascendingOrders); + // return () -> new GroupedTopNRankBuilder( + // sourceTypes, + // comparator, + // maxRankingPerPartition, + // generateRanking, + // groupByHashSupplier.get()); + // } + + if (rankingType == TopKRankingNode.RankingType.DENSE_RANK) { + throw new UnsupportedOperationException("DENSE_RANK not yet implemented"); + } + + throw new IllegalArgumentException("Unknown ranking type: " + rankingType); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + if (!finishing && (!partial || !isBuilderFull()) && outputIterator == null) { + // Still collecting input, nothing to output yet + return null; + } + + if (outputIterator == null && groupedTopNBuilder != null) { + // Start flushing results + outputIterator = groupedTopNBuilder.getResult(); + } + + if (outputIterator != null && outputIterator.hasNext()) { + return outputIterator.next(); + } else { + closeGroupedTopNBuilder(); + return null; + } + } + + @Override + public boolean hasNext() throws Exception { + // If we have an output iterator with more data, return true + if (outputIterator != null && outputIterator.hasNext()) { + return true; + } + + // If we're finishing and have no more output, return false + if (finishing && outputIterator == null && groupedTopNBuilder == null) { + return false; + } + + // If we have a builder that's full (partial) or we're finishing, we should have output + return (partial && isBuilderFull()) || finishing; + } + + @Override + public void close() throws Exception { + closeGroupedTopNBuilder(); + if (inputOperator != null) { + inputOperator.close(); + } + } + + @Override + public boolean isFinished() throws Exception { + return finishing && outputIterator == null && groupedTopNBuilder == null; + } + + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemoryFromInput = inputOperator.calculateMaxPeekMemoryWithCounter(); + long maxTsBlockMemory = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + long builderMemory = + groupedTopNBuilder != null ? groupedTopNBuilder.getEstimatedSizeInBytes() : 0; + return Math.max(maxTsBlockMemory + builderMemory, maxPeekMemoryFromInput) + + inputOperator.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long calculateMaxReturnSize() { + return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + long retainedSize = inputOperator.calculateRetainedSizeAfterCallingNext(); + if (groupedTopNBuilder != null) { + retainedSize += groupedTopNBuilder.getEstimatedSizeInBytes(); + } + + return retainedSize; + } + + private void closeGroupedTopNBuilder() { + if (groupedTopNBuilder != null) { + groupedTopNBuilder = null; + } + if (groupByHash != null) { + groupByHash = null; + } + outputIterator = null; + } + + private boolean isBuilderFull() { + return groupedTopNBuilder != null + && groupedTopNBuilder.getEstimatedSizeInBytes() >= maxFlushableBytes; + } + + @Override + public long ramBytesUsed() { + return 0; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/IntArrayFIFOQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/IntArrayFIFOQueue.java new file mode 100644 index 0000000000000..7ef6fdc01f199 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/IntArrayFIFOQueue.java @@ -0,0 +1,182 @@ +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.NoSuchElementException; + +public class IntArrayFIFOQueue implements Serializable { + private static final long serialVersionUID = 0L; + public static final int INITIAL_CAPACITY = 4; + protected transient int[] array; + protected transient int length; + protected transient int start; + protected transient int end; + + public IntArrayFIFOQueue(int capacity) { + if (capacity > 2147483638) { + throw new IllegalArgumentException( + "Initial capacity (" + capacity + ") exceeds " + 2147483638); + } else if (capacity < 0) { + throw new IllegalArgumentException("Initial capacity (" + capacity + ") is negative"); + } else { + this.array = new int[Math.max(1, capacity + 1)]; + this.length = this.array.length; + } + } + + public IntArrayFIFOQueue() { + this(4); + } + + public IntComparator comparator() { + return null; + } + + public int dequeueInt() { + if (this.start == this.end) { + throw new NoSuchElementException(); + } else { + int t = this.array[this.start]; + if (++this.start == this.length) { + this.start = 0; + } + + this.reduce(); + return t; + } + } + + public int dequeueLastInt() { + if (this.start == this.end) { + throw new NoSuchElementException(); + } else { + if (this.end == 0) { + this.end = this.length; + } + + int t = this.array[--this.end]; + this.reduce(); + return t; + } + } + + private final void resize(int size, int newLength) { + int[] newArray = new int[newLength]; + if (this.start >= this.end) { + if (size != 0) { + System.arraycopy(this.array, this.start, newArray, 0, this.length - this.start); + System.arraycopy(this.array, 0, newArray, this.length - this.start, this.end); + } + } else { + System.arraycopy(this.array, this.start, newArray, 0, this.end - this.start); + } + + this.start = 0; + this.end = size; + this.array = newArray; + this.length = newLength; + } + + private final void expand() { + this.resize(this.length, (int) Math.min(2147483639L, 2L * (long) this.length)); + } + + private final void reduce() { + int size = this.size(); + if (this.length > 4 && size <= this.length / 4) { + this.resize(size, this.length / 2); + } + } + + public void enqueue(int x) { + this.array[this.end++] = x; + if (this.end == this.length) { + this.end = 0; + } + + if (this.end == this.start) { + this.expand(); + } + } + + public void enqueueFirst(int x) { + if (this.start == 0) { + this.start = this.length; + } + + this.array[--this.start] = x; + if (this.end == this.start) { + this.expand(); + } + } + + public int firstInt() { + if (this.start == this.end) { + throw new NoSuchElementException(); + } else { + return this.array[this.start]; + } + } + + public int lastInt() { + if (this.start == this.end) { + throw new NoSuchElementException(); + } else { + return this.array[(this.end == 0 ? this.length : this.end) - 1]; + } + } + + public void clear() { + this.start = this.end = 0; + } + + public void trim() { + int size = this.size(); + int[] newArray = new int[size + 1]; + if (this.start <= this.end) { + System.arraycopy(this.array, this.start, newArray, 0, this.end - this.start); + } else { + System.arraycopy(this.array, this.start, newArray, 0, this.length - this.start); + System.arraycopy(this.array, 0, newArray, this.length - this.start, this.end); + } + + this.start = 0; + this.length = (this.end = size) + 1; + this.array = newArray; + } + + public int size() { + int apparentLength = this.end - this.start; + return apparentLength >= 0 ? apparentLength : this.length + apparentLength; + } + + private void writeObject(ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); + int size = this.size(); + s.writeInt(size); + int i = this.start; + + while (size-- != 0) { + s.writeInt(this.array[i++]); + if (i == this.length) { + i = 0; + } + } + } + + private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { + s.defaultReadObject(); + this.end = s.readInt(); + this.array = new int[this.length = nextPowerOfTwo(this.end + 1)]; + + for (int i = 0; i < this.end; ++i) { + this.array[i] = s.readInt(); + } + } + + private static int nextPowerOfTwo(int x) { + return 1 << 32 - Integer.numberOfLeadingZeros(x - 1); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/LongBigArrayFIFOQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/LongBigArrayFIFOQueue.java new file mode 100644 index 0000000000000..b87053254d948 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/LongBigArrayFIFOQueue.java @@ -0,0 +1,178 @@ +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array; + +import java.util.NoSuchElementException; + +import static java.lang.Math.toIntExact; +import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance; + +public class LongBigArrayFIFOQueue { + private static final long INSTANCE_SIZE = shallowSizeOfInstance(LongBigArrayFIFOQueue.class); + + /** The standard initial capacity of a queue. */ + public static final long INITIAL_CAPACITY = BigArrays.SEGMENT_SIZE; + + /** The backing array. */ + protected LongBigArray array; + + /** The current (cached) length of {@link #array}. */ + protected long length; + + /** The start position in {@link #array}. It is always strictly smaller than {@link #length}. */ + protected long start; + + /** + * The end position in {@link #array}. It is always strictly smaller than {@link #length}. Might + * be actually smaller than {@link #start} because {@link #array} is used cyclically. + */ + protected long end; + + /** + * Creates a new empty queue with given capacity. + * + * @param capacity the initial capacity of this queue. + */ + public LongBigArrayFIFOQueue(final long capacity) { + if (capacity < 0) { + throw new IllegalArgumentException("Initial capacity (" + capacity + ") is negative"); + } + array = new LongBigArray(); + length = + Math.max(INITIAL_CAPACITY, capacity); // Never build a queue smaller than INITIAL_CAPACITY + array.ensureCapacity(length); + } + + /** Creates a new empty queue with standard {@linkplain #INITIAL_CAPACITY initial capacity}. */ + public LongBigArrayFIFOQueue() { + this(INITIAL_CAPACITY); + } + + public long sizeOf() { + return INSTANCE_SIZE + array.sizeOf(); + } + + public long dequeueLong() { + if (start == end) { + throw new NoSuchElementException(); + } + final long t = array.get(start); + if (++start == length) { + start = 0; + } + reduce(); + return t; + } + + public long dequeueLastLong() { + if (start == end) { + throw new NoSuchElementException(); + } + if (end == 0) { + end = length; + } + final long t = array.get(--end); + reduce(); + return t; + } + + private void resize(final long size, final long newLength) { + final LongBigArray newArray = new LongBigArray(); + newArray.ensureCapacity(newLength); + if (start >= end) { + if (size != 0) { + array.copyTo(start, newArray, 0, length - start); + array.copyTo(0, newArray, length - start, end); + } + } else { + array.copyTo(start, newArray, 0, end - start); + } + start = 0; + end = size; + array = newArray; + length = newLength; + } + + private void expand() { + resize(length, 2L * length); + } + + private void reduce() { + final long size = longSize(); + if (length > INITIAL_CAPACITY && size <= length / 4) { + resize(size, length / 2); + } + } + + public void enqueue(long x) { + array.set(end++, x); + if (end == length) { + end = 0; + } + if (end == start) { + expand(); + } + } + + /** + * Enqueues a new element as the first element (in dequeuing order) of the queue. + * + * @param x the element to enqueue. + */ + public void enqueueFirst(long x) { + if (start == 0) { + start = length; + } + array.set(--start, x); + if (end == start) { + expand(); + } + } + + public long firstLong() { + if (start == end) { + throw new NoSuchElementException(); + } + return array.get(start); + } + + public long lastLong() { + if (start == end) { + throw new NoSuchElementException(); + } + return array.get((end == 0 ? length : end) - 1); + } + + public void clear() { + end = 0; + start = 0; + } + + /** Trims the queue to the smallest possible size. */ + public void trim() { + final long size = longSize(); + final LongBigArray newArray = new LongBigArray(); + newArray.ensureCapacity(size + 1); + if (start <= end) { + array.copyTo(start, newArray, 0, end - start); + } else { + array.copyTo(start, newArray, 0, length - start); + array.copyTo(0, newArray, length - start, end); + } + start = 0; + end = size; + length = size + 1; + array = newArray; + } + + public int size() { + return toIntExact(longSize()); + } + + public long longSize() { + final long apparentLength = end - start; + return apparentLength >= 0 ? apparentLength : length + apparentLength; + } + + public boolean isEmpty() { + return end == start; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/NoChannelGroupByHash.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/NoChannelGroupByHash.java new file mode 100644 index 0000000000000..3d530c809f929 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/NoChannelGroupByHash.java @@ -0,0 +1,53 @@ +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +public class NoChannelGroupByHash implements GroupByHash { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(NoChannelGroupByHash.class); + + private int groupCount; + + @Override + public long getEstimatedSize() { + return INSTANCE_SIZE; + } + + @Override + public int getGroupCount() { + return groupCount; + } + + @Override + public void appendValuesTo(int groupId, TsBlockBuilder pageBuilder) { + throw new UnsupportedOperationException("NoChannelGroupByHash does not support appendValuesTo"); + } + + @Override + public void addPage(Column[] groupedColumns) { + updateGroupCount(groupedColumns); + } + + @Override + public int[] getGroupIds(Column[] groupedColumns) { + return new int[groupedColumns[0].getPositionCount()]; + } + + @Override + public long getRawHash(int groupId) { + throw new UnsupportedOperationException("NoChannelGroupByHash does not support getRawHash"); + } + + @Override + public int getCapacity() { + return 2; + } + + private void updateGroupCount(Column[] columns) { + if (columns[0].getPositionCount() > 0 && groupCount == 0) { + groupCount = 1; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 56ad6d59ba0e7..6ab628a64b713 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -316,6 +316,9 @@ public enum PlanNodeType { TABLE_UNION_NODE((short) 1034), TABLE_INTERSECT_NODE((short) 1035), TABLE_EXCEPT_NODE((short) 1036), + TABLE_TOPK_RANKING_NODE((short) 1037), + TABLE_ROW_NUMBER_NODE((short) 1038), + TABLE_VALUES_NODE((short) 1039), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index ea669491a5f19..5658cf0412921 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -132,6 +132,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; @@ -773,6 +774,16 @@ public R visitTopK( return visitMultiChildProcess(node, context); } + public R visitTopKRanking( + org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode node, + C context) { + return visitMultiChildProcess(node, context); + } + + public R visitRowNumber(RowNumberNode node, C context) { + return visitSingleChildProcess(node, context); + } + public R visitJoin( org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode node, C context) { return visitTwoChildProcess(node, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/GatherAndMergeWindows.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/GatherAndMergeWindows.java new file mode 100644 index 0000000000000..3e301f8e3a3e6 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/GatherAndMergeWindows.java @@ -0,0 +1,315 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.PropertyPattern; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictOutputs; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.transpose; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.project; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window; +import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture; + +public class GatherAndMergeWindows { + private GatherAndMergeWindows() {} + + public static Set> rules() { + // TODO convert to a pattern that allows for a sequence of ProjectNode, instead + // of a canned number, once the pattern system supports it. + return IntStream.range(0, 5) + .boxed() + .flatMap( + numProjects -> + Stream.of( + new MergeAdjacentWindowsOverProjects(numProjects), + new SwapAdjacentWindowsBySpecifications(numProjects))) + .collect(toImmutableSet()); + } + + private abstract static class ManipulateAdjacentWindowsOverProjects implements Rule { + private final Capture childCapture = newCapture(); + private final List> projectCaptures; + private final Pattern pattern; + + protected ManipulateAdjacentWindowsOverProjects(int numProjects) { + PropertyPattern childPattern = + source().matching(window().capturedAs(childCapture)); + ImmutableList.Builder> projectCapturesBuilder = ImmutableList.builder(); + for (int i = 0; i < numProjects; ++i) { + Capture projectCapture = newCapture(); + projectCapturesBuilder.add(projectCapture); + childPattern = source().matching(project().capturedAs(projectCapture).with(childPattern)); + } + this.projectCaptures = projectCapturesBuilder.build(); + this.pattern = window().with(childPattern); + } + + @Override + public Pattern getPattern() { + return pattern; + } + + @Override + public Result apply(WindowNode parent, Captures captures, Context context) { + // Pulling the descendant WindowNode above projects is done as a part of this rule, as opposed + // in a + // separate rule, because that pullup is not useful on its own, and could be undone by other + // rules. + // For example, a rule could insert a project-off node between adjacent WindowNodes that use + // different + // input symbols. + List projects = + projectCaptures.stream().map(captures::get).collect(toImmutableList()); + + return pullWindowNodeAboveProjects(captures.get(childCapture), projects) + .flatMap(newChild -> manipulateAdjacentWindowNodes(parent, newChild, context)) + .map(Result::ofPlanNode) + .orElse(Result.empty()); + } + + protected abstract Optional manipulateAdjacentWindowNodes( + WindowNode parent, WindowNode child, Context context); + + /** + * Looks for the pattern (ProjectNode*)WindowNode, and rewrites it to WindowNode(ProjectNode*), + * returning an empty option if it can't rewrite the projects, for example because they rely on + * the output of the WindowNode. + * + * @param projects the nodes above the target, bottom first. + */ + protected static Optional pullWindowNodeAboveProjects( + WindowNode target, List projects) { + if (projects.isEmpty()) { + return Optional.of(target); + } + + PlanNode targetChild = target.getChild(); + + Set targetInputs = ImmutableSet.copyOf(targetChild.getOutputSymbols()); + Set targetOutputs = ImmutableSet.copyOf(target.getOutputSymbols()); + + PlanNode newTargetChild = targetChild; + + for (ProjectNode project : projects) { + Set newTargetChildOutputs = ImmutableSet.copyOf(newTargetChild.getOutputSymbols()); + + // The only kind of use of the output of the target that we can safely ignore is a simple + // identity propagation. + // The target node, when hoisted above the projections, will provide the symbols directly. + Map assignmentsWithoutTargetOutputIdentities = + Maps.filterKeys( + project.getAssignments().getMap(), + output -> + !(project.getAssignments().isIdentity(output) + && targetOutputs.contains(output))); + + if (targetInputs.stream().anyMatch(assignmentsWithoutTargetOutputIdentities::containsKey)) { + // Redefinition of an input to the target -- can't handle this case. + return Optional.empty(); + } + + Assignments newAssignments = + Assignments.builder() + .putAll(assignmentsWithoutTargetOutputIdentities) + .putIdentities(targetInputs) + .build(); + + if (!newTargetChildOutputs.containsAll( + SymbolsExtractor.extractUnique(newAssignments.getExpressions()))) { + // Projection uses an output of the target -- can't move the target above this projection. + return Optional.empty(); + } + + newTargetChild = new ProjectNode(project.getPlanNodeId(), newTargetChild, newAssignments); + } + + WindowNode newTarget = (WindowNode) target.replaceChildren(ImmutableList.of(newTargetChild)); + Set newTargetOutputs = ImmutableSet.copyOf(newTarget.getOutputSymbols()); + if (!newTargetOutputs.containsAll(projects.get(projects.size() - 1).getOutputSymbols())) { + // The new target node is hiding some of the projections, which makes this rewrite + // incorrect. + return Optional.empty(); + } + return Optional.of(newTarget); + } + } + + public static class MergeAdjacentWindowsOverProjects + extends ManipulateAdjacentWindowsOverProjects { + public MergeAdjacentWindowsOverProjects(int numProjects) { + super(numProjects); + } + + @Override + protected Optional manipulateAdjacentWindowNodes( + WindowNode parent, WindowNode child, Context context) { + if (!child.getSpecification().equals(parent.getSpecification()) || dependsOn(parent, child)) { + return Optional.empty(); + } + + ImmutableMap.Builder functionsBuilder = ImmutableMap.builder(); + functionsBuilder.putAll(parent.getWindowFunctions()); + functionsBuilder.putAll(child.getWindowFunctions()); + + WindowNode mergedWindowNode = + new WindowNode( + parent.getPlanNodeId(), + child.getChild(), + parent.getSpecification(), + functionsBuilder.buildOrThrow(), + parent.getHashSymbol(), + parent.getPrePartitionedInputs(), + parent.getPreSortedOrderPrefix()); + + return Optional.of( + restrictOutputs( + context.getIdAllocator(), + mergedWindowNode, + ImmutableSet.copyOf(parent.getOutputSymbols())) + .orElse(mergedWindowNode)); + } + } + + public static class SwapAdjacentWindowsBySpecifications + extends ManipulateAdjacentWindowsOverProjects { + public SwapAdjacentWindowsBySpecifications(int numProjects) { + super(numProjects); + } + + @Override + protected Optional manipulateAdjacentWindowNodes( + WindowNode parent, WindowNode child, Context context) { + if ((compare(parent, child) < 0) && (!dependsOn(parent, child))) { + PlanNode transposedWindows = transpose(parent, child); + return Optional.of( + restrictOutputs( + context.getIdAllocator(), + transposedWindows, + ImmutableSet.copyOf(parent.getOutputSymbols())) + .orElse(transposedWindows)); + } + return Optional.empty(); + } + + private static int compare(WindowNode o1, WindowNode o2) { + int comparison = comparePartitionBy(o1, o2); + if (comparison != 0) { + return comparison; + } + + comparison = compareOrderBy(o1, o2); + if (comparison != 0) { + return comparison; + } + + // If PartitionBy and OrderBy clauses are identical, let's establish an arbitrary order to + // prevent non-deterministic results of swapping WindowNodes in such a case + return o1.getPlanNodeId().toString().compareTo(o2.getPlanNodeId().toString()); + } + + private static int comparePartitionBy(WindowNode o1, WindowNode o2) { + Iterator iterator1 = o1.getSpecification().getPartitionBy().iterator(); + Iterator iterator2 = o2.getSpecification().getPartitionBy().iterator(); + + while (iterator1.hasNext() && iterator2.hasNext()) { + Symbol symbol1 = iterator1.next(); + Symbol symbol2 = iterator2.next(); + + int partitionByComparison = symbol1.compareTo(symbol2); + if (partitionByComparison != 0) { + return partitionByComparison; + } + } + + if (iterator1.hasNext()) { + return 1; + } + if (iterator2.hasNext()) { + return -1; + } + return 0; + } + + private static int compareOrderBy(WindowNode o1, WindowNode o2) { + if (!o1.getSpecification().getOrderingScheme().isPresent() + && !o2.getSpecification().getOrderingScheme().isPresent()) { + return 0; + } + if (o1.getSpecification().getOrderingScheme().isPresent() + && !o2.getSpecification().getOrderingScheme().isPresent()) { + return 1; + } + if (!o1.getSpecification().getOrderingScheme().isPresent() + && o2.getSpecification().getOrderingScheme().isPresent()) { + return -1; + } + + OrderingScheme o1OrderingScheme = o1.getSpecification().getOrderingScheme().get(); + OrderingScheme o2OrderingScheme = o2.getSpecification().getOrderingScheme().get(); + Iterator iterator1 = o1OrderingScheme.getOrderBy().iterator(); + Iterator iterator2 = o2OrderingScheme.getOrderBy().iterator(); + + while (iterator1.hasNext() && iterator2.hasNext()) { + Symbol symbol1 = iterator1.next(); + Symbol symbol2 = iterator2.next(); + + int orderByComparison = symbol1.compareTo(symbol2); + if (orderByComparison != 0) { + return orderByComparison; + } + int sortOrderComparison = + o1OrderingScheme.getOrdering(symbol1).compareTo(o2OrderingScheme.getOrdering(symbol2)); + if (sortOrderComparison != 0) { + return sortOrderComparison; + } + } + + if (iterator1.hasNext()) { + return 1; + } + if (iterator2.hasNext()) { + return -1; + } + return 0; + } + } + + private static boolean dependsOn(WindowNode parent, WindowNode child) { + return parent.getSpecification().getPartitionBy().stream() + .anyMatch(child.getCreatedSymbols()::contains) + || (parent.getSpecification().getOrderingScheme().isPresent() + && parent.getSpecification().getOrderingScheme().get().getOrderBy().stream() + .anyMatch(child.getCreatedSymbols()::contains)) + || parent.getWindowFunctions().values().stream() + .map(SymbolsExtractor::extractUnique) + .flatMap(Collection::stream) + .anyMatch(child.getCreatedSymbols()::contains); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java new file mode 100644 index 0000000000000..ad388c09bdab5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java @@ -0,0 +1,147 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValuesNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import com.google.common.collect.ImmutableList; + +import java.util.Optional; +import java.util.OptionalInt; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window; +import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture; + +public class PushDownFilterIntoWindow implements Rule { + private static final Capture childCapture = newCapture(); + + private final Pattern pattern; + private final PlannerContext plannerContext; + + public PushDownFilterIntoWindow(PlannerContext plannerContext) { + this.plannerContext = requireNonNull(plannerContext, "plannerContext is null"); + this.pattern = + filter() + .with( + source() + .matching( + window() + .matching( + window -> window.getSpecification().getOrderingScheme().isPresent()) + .matching(window -> toTopNRankingType(window).isPresent()) + .capturedAs(childCapture))); + } + + @Override + public Pattern getPattern() { + return pattern; + } + + @Override + public Result apply(FilterNode node, Captures captures, Context context) { + WindowNode windowNode = captures.get(childCapture); + Optional rankingType = toTopNRankingType(windowNode); + Symbol rankingSymbol = getOnlyElement(windowNode.getWindowFunctions().keySet()); + + OptionalInt upperBound = extractUpperBoundFromComparison(node.getPredicate(), rankingSymbol); + + if (!upperBound.isPresent()) { + return Result.empty(); + } + + if (upperBound.getAsInt() <= 0) { + return Result.ofPlanNode( + new ValuesNode(node.getPlanNodeId(), node.getOutputSymbols(), ImmutableList.of())); + } + + TopKRankingNode newSource = + new TopKRankingNode( + windowNode.getPlanNodeId(), + windowNode.getChildren(), + windowNode.getSpecification(), + rankingType.get(), + rankingSymbol, + upperBound.getAsInt(), + false); + + if (needToKeepFilter(node.getPredicate(), rankingSymbol, upperBound.getAsInt())) { + return Result.ofPlanNode( + new FilterNode(node.getPlanNodeId(), newSource, node.getPredicate())); + } + + return Result.ofPlanNode(newSource); + } + + private OptionalInt extractUpperBoundFromComparison(Expression predicate, Symbol rankingSymbol) { + if (!(predicate instanceof ComparisonExpression)) { + return OptionalInt.empty(); + } + + ComparisonExpression comparison = (ComparisonExpression) predicate; + Expression left = comparison.getLeft(); + Expression right = comparison.getRight(); + + if (!(left instanceof SymbolReference) || !(right instanceof Literal)) { + return OptionalInt.empty(); + } + + SymbolReference symbolRef = (SymbolReference) left; + if (!symbolRef.getName().equals(rankingSymbol.getName())) { + return OptionalInt.empty(); + } + + Literal literal = (Literal) right; + Object value = literal.getTsValue(); + if (!(value instanceof Number)) { + return OptionalInt.empty(); + } + + long constantValue = ((Number) value).longValue(); + + switch (comparison.getOperator()) { + case LESS_THAN: + return OptionalInt.of(toIntExact(constantValue - 1)); + case LESS_THAN_OR_EQUAL: + case EQUAL: + return OptionalInt.of(toIntExact(constantValue)); + default: + return OptionalInt.empty(); + } + } + + private boolean needToKeepFilter(Expression predicate, Symbol rankingSymbol, int upperBound) { + if (!(predicate instanceof ComparisonExpression)) { + return true; + } + + ComparisonExpression comparison = (ComparisonExpression) predicate; + + if (comparison.getOperator() == ComparisonExpression.Operator.EQUAL) { + return true; + } + + if (comparison.getOperator() == ComparisonExpression.Operator.LESS_THAN_OR_EQUAL + || comparison.getOperator() == ComparisonExpression.Operator.LESS_THAN) { + return false; + } + + return true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java new file mode 100644 index 0000000000000..94e52c5caa57d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java @@ -0,0 +1,81 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import com.google.common.collect.ImmutableList; + +import java.util.Optional; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static java.lang.Math.toIntExact; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChildReplacer.replaceChildren; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode.RankingType.ROW_NUMBER; +import static org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture; + +public class PushDownLimitIntoWindow implements Rule { + private static final Capture childCapture = newCapture(); + private final Pattern pattern; + + public PushDownLimitIntoWindow() { + this.pattern = + limit() + .matching( + limit -> + !limit.isWithTies() + && limit.getCount() != 0 + && limit.getCount() <= Integer.MAX_VALUE + && !limit.requiresPreSortedInputs()) + .with( + source() + .matching( + window() + .matching( + window -> window.getSpecification().getOrderingScheme().isPresent()) + .matching(window -> toTopNRankingType(window).isPresent()) + .capturedAs(childCapture))); + } + + // TODO: isOptimizeTopNRanking(session); + @Override + public boolean isEnabled(SessionInfo session) { + return true; + } + + @Override + public Pattern getPattern() { + return pattern; + } + + @Override + public Result apply(LimitNode node, Captures captures, Context context) { + WindowNode source = captures.get(childCapture); + + Optional rankingType = toTopNRankingType(source); + + int limit = toIntExact(node.getCount()); + TopKRankingNode topNRowNumberNode = + new TopKRankingNode( + source.getPlanNodeId(), + source.getChildren(), + source.getSpecification(), + rankingType.get(), + getOnlyElement(source.getWindowFunctions().keySet()), + limit, + false); + if (rankingType.get() == ROW_NUMBER && source.getSpecification().getPartitionBy().isEmpty()) { + return Result.ofPlanNode(topNRowNumberNode); + } + return Result.ofPlanNode(replaceChildren(node, ImmutableList.of(topNRowNumberNode))); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantWindow.java new file mode 100644 index 0000000000000..6a6b18c8a9f0b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/RemoveRedundantWindow.java @@ -0,0 +1,26 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window; + +public class RemoveRedundantWindow implements Rule { + private static final Pattern PATTERN = window(); + + @Override + public Pattern getPattern() { + return PATTERN; + } + + @Override + public Result apply(WindowNode window, Captures captures, Context context) { + // if (isEmpty(window.getChild(), context.getLookup())) { + // return Result.ofPlanNode(new ValuesNode(window.getPlanNodeId(), + // window.getOutputSymbols(), ImmutableList.of())); + // } + return Result.empty(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceWindowWithRowNumber.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceWindowWithRowNumber.java new file mode 100644 index 0000000000000..4c7778068001a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceWindowWithRowNumber.java @@ -0,0 +1,43 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window; + +public class ReplaceWindowWithRowNumber implements Rule { + private final Pattern pattern; + + public ReplaceWindowWithRowNumber(Metadata metadata) { + this.pattern = + window() + .matching( + window -> { + if (window.getWindowFunctions().size() != 1) { + return false; + } + BoundSignature signature = + getOnlyElement(window.getWindowFunctions().values()) + .getResolvedFunction() + .getSignature(); + return signature.getArgumentTypes().isEmpty() + && signature.getName().equals("row_number"); + }) + .matching(window -> !window.getSpecification().getOrderingScheme().isPresent()); + } + + @Override + public Pattern getPattern() { + return pattern; + } + + @Override + public Result apply(WindowNode node, Captures captures, Context context) { + return null; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java index 7447508ab1e8e..5845ffd422105 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java @@ -20,10 +20,13 @@ import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature; import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import com.google.common.collect.ImmutableList; @@ -37,12 +40,11 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode.RankingType.RANK; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode.RankingType.ROW_NUMBER; final class Util { - // private static final CatalogSchemaFunctionName ROW_NUMBER_NAME = - // builtinFunctionName("row_number"); - // private static final CatalogSchemaFunctionName RANK_NAME = builtinFunctionName("rank"); - private Util() {} /** @@ -121,22 +123,23 @@ public static Optional restrictChildOutputs( return Optional.of(node.replaceChildren(newChildrenBuilder.build())); } - /*public static Optional toTopNRankingType(WindowNode node) - { - if (node.getWindowFunctions().size() != 1 || node.getOrderingScheme().isEmpty()) { - return Optional.empty(); - } - - BoundSignature signature = getOnlyElement(node.getWindowFunctions().values()).getResolvedFunction().getSignature(); - if (!signature.getArgumentTypes().isEmpty()) { - return Optional.empty(); - } - if (signature.getName().equals(ROW_NUMBER_NAME)) { - return Optional.of(ROW_NUMBER); - } - if (signature.getName().equals(RANK_NAME)) { - return Optional.of(RANK); - } + public static Optional toTopNRankingType(WindowNode node) { + if (node.getWindowFunctions().size() != 1 + || !node.getSpecification().getOrderingScheme().isPresent()) { + return Optional.empty(); + } + + BoundSignature signature = + getOnlyElement(node.getWindowFunctions().values()).getResolvedFunction().getSignature(); + if (!signature.getArgumentTypes().isEmpty()) { return Optional.empty(); - }*/ + } + if (signature.getName().equals("row_number")) { + return Optional.of(ROW_NUMBER); + } + if (signature.getName().equals("rank")) { + return Optional.of(RANK); + } + return Optional.empty(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/RowNumberNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/RowNumberNode.java new file mode 100644 index 0000000000000..9c8c3408f4c91 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/RowNumberNode.java @@ -0,0 +1,177 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; + +public class RowNumberNode extends SingleChildProcessNode { + private final List partitionBy; + /* + * This flag indicates that the node depends on the row order established by the subplan. + * It is taken into account while adding local exchanges to the plan, ensuring that sorted order + * of data will be respected. + * Note: if the subplan doesn't produce sorted output, this flag doesn't change the resulting plan. + * Note: this flag is used for planning of queries involving ORDER BY and OFFSET. + */ + private final boolean orderSensitive; + private final Optional maxRowCountPerPartition; + private final Symbol rowNumberSymbol; + + public RowNumberNode( + PlanNodeId id, + List partitionBy, + boolean orderSensitive, + Symbol rowNumberSymbol, + Optional maxRowCountPerPartition) { + super(id); + + this.partitionBy = ImmutableList.copyOf(partitionBy); + this.orderSensitive = orderSensitive; + this.rowNumberSymbol = rowNumberSymbol; + this.maxRowCountPerPartition = maxRowCountPerPartition; + } + + public RowNumberNode( + PlanNodeId id, + PlanNode child, + List partitionBy, + boolean orderSensitive, + Symbol rowNumberSymbol, + Optional maxRowCountPerPartition) { + super(id, child); + + this.partitionBy = ImmutableList.copyOf(partitionBy); + this.orderSensitive = orderSensitive; + this.rowNumberSymbol = rowNumberSymbol; + this.maxRowCountPerPartition = maxRowCountPerPartition; + } + + @Override + public PlanNode clone() { + return new RowNumberNode( + getPlanNodeId(), partitionBy, orderSensitive, rowNumberSymbol, maxRowCountPerPartition); + } + + public R accept(PlanVisitor visitor, C context) { + return visitor.visitRowNumber(this, context); + } + + @Override + public List getOutputColumnNames() { + throw new UnsupportedOperationException(); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_ROW_NUMBER_NODE.serialize(byteBuffer); + ReadWriteIOUtils.write(partitionBy.size(), byteBuffer); + for (Symbol symbol : partitionBy) { + Symbol.serialize(symbol, byteBuffer); + } + ReadWriteIOUtils.write(orderSensitive, byteBuffer); + Symbol.serialize(rowNumberSymbol, byteBuffer); + if (maxRowCountPerPartition.isPresent()) { + ReadWriteIOUtils.write(true, byteBuffer); + ReadWriteIOUtils.write(maxRowCountPerPartition.get(), byteBuffer); + } else { + ReadWriteIOUtils.write(false, byteBuffer); + } + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_ROW_NUMBER_NODE.serialize(stream); + ReadWriteIOUtils.write(partitionBy.size(), stream); + for (Symbol symbol : partitionBy) { + Symbol.serialize(symbol, stream); + } + ReadWriteIOUtils.write(orderSensitive, stream); + Symbol.serialize(rowNumberSymbol, stream); + if (maxRowCountPerPartition.isPresent()) { + ReadWriteIOUtils.write(true, stream); + ReadWriteIOUtils.write(maxRowCountPerPartition.get(), stream); + } else { + ReadWriteIOUtils.write(false, stream); + } + } + + public static RowNumberNode deserialize(ByteBuffer buffer) { + int partitionBySize = ReadWriteIOUtils.readInt(buffer); + ImmutableList.Builder partitionBy = ImmutableList.builder(); + for (int i = 0; i < partitionBySize; i++) { + partitionBy.add(Symbol.deserialize(buffer)); + } + boolean orderSensitive = ReadWriteIOUtils.readBoolean(buffer); + Symbol rowNumberSymbol = Symbol.deserialize(buffer); + Optional maxRowCountPerPartition; + if (ReadWriteIOUtils.readBoolean(buffer)) { + maxRowCountPerPartition = Optional.of(ReadWriteIOUtils.readInt(buffer)); + } else { + maxRowCountPerPartition = Optional.empty(); + } + + PlanNodeId planNodeId = PlanNodeId.deserialize(buffer); + return new RowNumberNode( + planNodeId, partitionBy.build(), orderSensitive, rowNumberSymbol, maxRowCountPerPartition); + } + + @Override + public List getOutputSymbols() { + return Collections.singletonList(rowNumberSymbol); + } + + @Override + public PlanNode replaceChildren(List newChildren) { + checkArgument(newChildren.size() == 1, "wrong number of new children"); + return new RowNumberNode( + id, + newChildren.get(0), + partitionBy, + orderSensitive, + rowNumberSymbol, + maxRowCountPerPartition); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + RowNumberNode node = (RowNumberNode) o; + + if (node.partitionBy.size() != partitionBy.size()) return false; + for (int i = 0; i < partitionBy.size(); i++) { + if (!node.partitionBy.get(i).equals(partitionBy.get(i))) return false; + } + return Objects.equal(orderSensitive, node.orderSensitive) + && Objects.equal(rowNumberSymbol, node.rowNumberSymbol) + && Objects.equal(maxRowCountPerPartition, node.maxRowCountPerPartition); + } + + @Override + public int hashCode() { + return Objects.hashCode( + super.hashCode(), partitionBy, orderSensitive, rowNumberSymbol, maxRowCountPerPartition); + } + + @Override + public String toString() { + return "RowNumber-" + this.getPlanNodeId(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java new file mode 100644 index 0000000000000..1e5934f6ab889 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java @@ -0,0 +1,168 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import com.google.common.base.Objects; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; + +public class TopKRankingNode extends MultiChildProcessNode { + public enum RankingType { + ROW_NUMBER, + RANK, + DENSE_RANK + } + + private final DataOrganizationSpecification specification; + private final RankingType rankingType; + private final Symbol rankingSymbol; + private final int maxRankingPerPartition; + private final boolean partial; + + public TopKRankingNode( + PlanNodeId id, + DataOrganizationSpecification specification, + RankingType rankingType, + Symbol rankingSymbol, + int maxRankingPerPartition, + boolean partial) { + super(id); + + this.specification = specification; + this.rankingType = rankingType; + this.rankingSymbol = rankingSymbol; + this.maxRankingPerPartition = maxRankingPerPartition; + this.partial = partial; + } + + public TopKRankingNode( + PlanNodeId id, + List children, + DataOrganizationSpecification specification, + RankingType rankingType, + Symbol rankingSymbol, + int maxRankingPerPartition, + boolean partial) { + super(id, children); + + this.specification = specification; + this.rankingType = rankingType; + this.rankingSymbol = rankingSymbol; + this.maxRankingPerPartition = maxRankingPerPartition; + this.partial = partial; + } + + @Override + public PlanNode clone() { + return new TopKRankingNode( + getPlanNodeId(), + specification, + rankingType, + rankingSymbol, + maxRankingPerPartition, + partial); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitTopKRanking(this, context); + } + + @Override + public List getOutputColumnNames() { + throw new UnsupportedOperationException(); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_TOPK_RANKING_NODE.serialize(byteBuffer); + specification.serialize(byteBuffer); + ReadWriteIOUtils.write(rankingType.ordinal(), byteBuffer); + Symbol.serialize(rankingSymbol, byteBuffer); + ReadWriteIOUtils.write(maxRankingPerPartition, byteBuffer); + ReadWriteIOUtils.write(partial, byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_TOPK_RANKING_NODE.serialize(stream); + specification.serialize(stream); + ReadWriteIOUtils.write(rankingType.ordinal(), stream); + Symbol.serialize(rankingSymbol, stream); + ReadWriteIOUtils.write(maxRankingPerPartition, stream); + ReadWriteIOUtils.write(partial, stream); + } + + public static TopKRankingNode deserialize(ByteBuffer byteBuffer) { + DataOrganizationSpecification specification = + DataOrganizationSpecification.deserialize(byteBuffer); + RankingType rankingType = RankingType.values()[ReadWriteIOUtils.readInt(byteBuffer)]; + Symbol rankingSymbol = Symbol.deserialize(byteBuffer); + int maxRankingPerPartition = ReadWriteIOUtils.readInt(byteBuffer); + boolean partial = ReadWriteIOUtils.readBoolean(byteBuffer); + + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new TopKRankingNode( + planNodeId, specification, rankingType, rankingSymbol, maxRankingPerPartition, partial); + } + + @Override + public List getOutputSymbols() { + return Collections.singletonList(rankingSymbol); + } + + @Override + public PlanNode replaceChildren(List newChildren) { + checkArgument(children.size() == newChildren.size(), "wrong number of new children"); + return new TopKRankingNode( + id, + newChildren, + specification, + rankingType, + rankingSymbol, + maxRankingPerPartition, + partial); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + TopKRankingNode rankingNode = (TopKRankingNode) o; + return Objects.equal(specification, rankingNode.specification) + && Objects.equal(rankingType, rankingNode.rankingType) + && Objects.equal(rankingSymbol, rankingNode.rankingSymbol) + && Objects.equal(maxRankingPerPartition, rankingNode.maxRankingPerPartition) + && Objects.equal(partial, rankingNode.partial); + } + + @Override + public int hashCode() { + return Objects.hashCode( + super.hashCode(), + specification, + rankingType, + rankingSymbol, + maxRankingPerPartition, + partial); + } + + @Override + public String toString() { + return "TopKRankingNode-" + this.getPlanNodeId(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ValuesNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ValuesNode.java new file mode 100644 index 0000000000000..7a2a971e6af9a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ValuesNode.java @@ -0,0 +1,211 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static java.util.Objects.requireNonNull; + +public class ValuesNode extends SourceNode { + private final List outputSymbols; + private final int rowCount; + // If ValuesNode produces output symbols, each row in ValuesNode is represented by a single + // expression in `rows` list. + // It can be an expression of type Row or any other expression that evaluates to RowType. + // In case when output symbols are present but ValuesNode does not have any rows, `rows` is an + // Optional with empty list. + // If ValuesNode does not produce any output symbols, `rows` is Optional.empty(). + private final Optional> rows; + + protected TRegionReplicaSet regionReplicaSet; + + /** Constructor of ValuesNode with non-empty output symbols list */ + public ValuesNode(PlanNodeId id, List outputSymbols, List rows) { + this(id, outputSymbols, rows.size(), Optional.of(rows)); + } + + /** Constructor of ValuesNode with empty output symbols list */ + public ValuesNode(PlanNodeId id, int rowCount) { + this(id, ImmutableList.of(), rowCount, Optional.empty()); + } + + public ValuesNode( + PlanNodeId id, List outputSymbols, int rowCount, Optional> rows) { + super(id); + this.outputSymbols = + ImmutableList.copyOf(requireNonNull(outputSymbols, "outputSymbols is null")); + this.rowCount = rowCount; + + requireNonNull(rows, "rows is null"); + if (rows.isPresent()) { + checkArgument( + rowCount == rows.get().size(), + "declared and actual row counts don't match: %s vs %s", + rowCount, + rows.get().size()); + + // check row size consistency (only for rows specified as Row) + List rowSizes = + rows.get().stream() + .map(row -> requireNonNull(row, "row is null")) + .filter(expression -> expression instanceof Row) + .map(expression -> ((Row) expression).getItems().size()) + .distinct() + .collect(toImmutableList()); + checkState(rowSizes.size() <= 1, "mismatched rows. All rows must be the same size"); + + // check if row size matches the number of output symbols (only for rows specified as Row) + if (rowSizes.size() == 1) { + checkState( + getOnlyElement(rowSizes).equals(outputSymbols.size()), + "row size doesn't match the number of output symbols: %s vs %s", + getOnlyElement(rowSizes), + outputSymbols.size()); + } + } else { + checkArgument( + outputSymbols.isEmpty(), + "missing rows specification for Values with non-empty output symbols"); + } + + if (outputSymbols.isEmpty()) { + this.rows = Optional.empty(); + } else { + this.rows = rows.map(ImmutableList::copyOf); + } + } + + @Override + public List getChildren() { + return ImmutableList.of(); + } + + @Override + public void addChild(PlanNode child) {} + + @Override + public PlanNode clone() { + return new ValuesNode( + getPlanNodeId(), outputSymbols, rowCount, rows.map(ImmutableList::copyOf)); + } + + @Override + public int allowedChildCount() { + return 0; + } + + @Override + public List getOutputColumnNames() { + return outputSymbols.stream().map(Symbol::getName).collect(toImmutableList()); + } + + @Override + public void serializeAttributes(ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(outputSymbols.size(), byteBuffer); + outputSymbols.forEach(symbol -> ReadWriteIOUtils.write(symbol.getName(), byteBuffer)); + ReadWriteIOUtils.write(rowCount, byteBuffer); + if (rows.isPresent()) { + ReadWriteIOUtils.write(true, byteBuffer); + ReadWriteIOUtils.write(rows.get().size(), byteBuffer); + for (Expression expression : rows.get()) { + Expression.serialize(expression, byteBuffer); + } + } else { + ReadWriteIOUtils.write(false, byteBuffer); + } + } + + @Override + public void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_VALUES_NODE.serialize(stream); + + ReadWriteIOUtils.write(outputSymbols.size(), stream); + for (Symbol symbol : outputSymbols) { + ReadWriteIOUtils.write(symbol.getName(), stream); + } + ReadWriteIOUtils.write(rowCount, stream); + if (rows.isPresent()) { + ReadWriteIOUtils.write(true, stream); + ReadWriteIOUtils.write(rows.get().size(), stream); + for (Expression expression : rows.get()) { + Expression.serialize(expression, stream); + } + } else { + ReadWriteIOUtils.write(false, stream); + } + } + + public static ValuesNode deserialize(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_VALUES_NODE.serialize(byteBuffer); + + int size = ReadWriteIOUtils.read(byteBuffer); + List outputSymbols = new ArrayList<>(); + for (int i = 0; i < size; i++) { + outputSymbols.add(new Symbol(ReadWriteIOUtils.readString(byteBuffer))); + } + + int rowCount = ReadWriteIOUtils.read(byteBuffer); + + List rows = new ArrayList<>(); + boolean flag = ReadWriteIOUtils.readBool(byteBuffer); + if (flag) { + size = ReadWriteIOUtils.readInt(byteBuffer); + for (int i = 0; i < size; i++) { + rows.add(Expression.deserialize(byteBuffer)); + } + } + + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new ValuesNode( + planNodeId, outputSymbols, rowCount, flag ? Optional.of(rows) : Optional.empty()); + } + + public int getRowCount() { + return rowCount; + } + + public Optional> getRows() { + return rows; + } + + @Override + public PlanNode replaceChildren(List newChildren) { + checkArgument(newChildren.isEmpty(), "newChildren is not empty"); + return this; + } + + @Override + public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) { + this.regionReplicaSet = regionReplicaSet; + } + + @Override + public TRegionReplicaSet getRegionReplicaSet() { + return regionReplicaSet; + } + + @Override + public void open() throws Exception {} + + @Override + public void close() throws Exception {} +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/WindowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/WindowNode.java index 021d506054097..02a77920c3056 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/WindowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/WindowNode.java @@ -136,6 +136,10 @@ public List getOutputSymbols() { return ImmutableList.copyOf(concat(child.getOutputSymbols(), windowFunctions.keySet())); } + public Set getCreatedSymbols() { + return ImmutableSet.copyOf(windowFunctions.keySet()); + } + @Override public PlanNode replaceChildren(List newChildren) { return new WindowNode( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/HeapTraversal.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/HeapTraversal.java new file mode 100644 index 0000000000000..25eb13b30e5a3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/HeapTraversal.java @@ -0,0 +1,45 @@ +package org.apache.iotdb.db.utils; + +import org.apache.tsfile.utils.RamUsageEstimator; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +public class HeapTraversal { + public enum Child { + LEFT, + RIGHT + } + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(HeapTraversal.class); + private static final long TOP_BIT_MASK = 1L << (Long.SIZE - 1); + + private long shifted; + private int treeDepthToNode; + + public void resetWithPathTo(long targetNodeIndex) { + checkArgument(targetNodeIndex >= 1, "Target node index must be greater than or equal to one"); + int leadingZeros = Long.numberOfLeadingZeros(targetNodeIndex); + // Shift off the leading zeros PLUS the most significant one bit (which is not needed for this + // calculation) + shifted = targetNodeIndex << (leadingZeros + 1); + treeDepthToNode = Long.SIZE - (leadingZeros + 1); + } + + public boolean isTarget() { + return treeDepthToNode == 0; + } + + public Child nextChild() { + checkState(!isTarget(), "Already at target"); + Child childToFollow = (shifted & TOP_BIT_MASK) == 0 ? Child.LEFT : Child.RIGHT; + shifted <<= 1; + treeDepthToNode--; + return childToFollow; + } + + public long sizeOf() { + return INSTANCE_SIZE; + } +}