Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
import org.apache.iotdb.db.tools.DelayAnalyzer;
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.DateTimeUtils;
Expand Down Expand Up @@ -369,6 +370,9 @@ public class DataRegion implements IDataRegionForQuery {
private ILoadDiskSelector ordinaryLoadDiskSelector;
private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector;

/** Delay analyzer for tracking data arrival delays and calculating safe watermarks */
private final DelayAnalyzer delayAnalyzer;

Comment on lines +373 to +375
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a switch for this to avoid affecting performance of existing scenarios.

/**
* Construct a database processor.
*
Expand All @@ -387,6 +391,7 @@ public DataRegion(
this.dataRegionId = new DataRegionId(Integer.parseInt(dataRegionIdString));
this.databaseName = databaseName;
this.fileFlushPolicy = fileFlushPolicy;
this.delayAnalyzer = new DelayAnalyzer();
acquireDirectBufferMemory();

dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionIdString);
Expand Down Expand Up @@ -453,6 +458,7 @@ public DataRegion(String databaseName, String dataRegionIdString) {
partitionMaxFileVersions.put(0L, 0L);
upgradeModFileThreadPool = null;
this.metrics = new DataRegionMetrics(this);
this.delayAnalyzer = new DelayAnalyzer();

initDiskSelector();
}
Expand Down Expand Up @@ -1151,6 +1157,10 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException {
if (deleted) {
return;
}
long arrivalTime = System.currentTimeMillis();
long generationTime = insertRowNode.getTime();
delayAnalyzer.update(generationTime, arrivalTime);

// init map
long timePartitionId = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
initFlushTimeMap(timePartitionId);
Expand Down Expand Up @@ -1315,6 +1325,11 @@ public void insertTablet(InsertTabletNode insertTabletNode)
"Won't insert tablet {}, because region is deleted", insertTabletNode.getSearchIndex());
return;
}
long arrivalTime = System.currentTimeMillis();
long[] times = insertTabletNode.getTimes();
for (long generationTime : times) {
delayAnalyzer.update(generationTime, arrivalTime);
}
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
long[] infoForMetrics = new long[5];
Expand Down Expand Up @@ -4105,9 +4120,11 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
return;
}
long ttl = getTTL(insertRowsOfOneDeviceNode);
long arrivalTime = System.currentTimeMillis();
Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new HashMap<>();
for (int i = 0; i < insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode = insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
delayAnalyzer.update(insertRowNode.getTime(), arrivalTime);
if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) {
// we do not need to write these part of data, as they can not be queried
// or the sub-plan has already been executed, we are retrying other sub-plans
Expand Down Expand Up @@ -4223,8 +4240,10 @@ public void insert(InsertRowsNode insertRowsNode)
}
boolean[] areSequence = new boolean[insertRowsNode.getInsertRowNodeList().size()];
long[] timePartitionIds = new long[insertRowsNode.getInsertRowNodeList().size()];
long arrivalTime = System.currentTimeMillis();
for (int i = 0; i < insertRowsNode.getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i);
delayAnalyzer.update(insertRowNode.getTime(), arrivalTime);
long ttl = getTTL(insertRowNode);
if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) {
insertRowsNode
Expand Down Expand Up @@ -4314,8 +4333,13 @@ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode)
// infoForMetrics[2]: ScheduleWalTimeCost
// infoForMetrics[3]: ScheduleMemTableTimeCost
// infoForMetrics[4]: InsertedPointsNumber
long arrivalTime = System.currentTimeMillis();
for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i);
long[] times = insertTabletNode.getTimes();
for (long generationTime : times) {
delayAnalyzer.update(generationTime, arrivalTime);
}
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure = false;
Expand Down Expand Up @@ -4619,6 +4643,15 @@ public TsFileManager getTsFileManager() {
return tsFileManager;
}

/**
* Get the delay analyzer instance for this data region
*
* @return DelayAnalyzer instance for tracking data arrival delays and calculating safe watermarks
*/
public DelayAnalyzer getDelayAnalyzer() {
return delayAnalyzer;
}

private long getTTL(InsertNode insertNode) {
if (insertNode.getTableName() == null) {
return DataNodeTTLCache.getInstance().getTTLForTree(insertNode.getTargetPath().getNodes());
Expand Down
Loading
Loading