diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 91c36262cbd9..3c7514c384f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -152,6 +152,7 @@ import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager; import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; +import org.apache.iotdb.db.tools.DelayAnalyzer; import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.db.utils.DateTimeUtils; @@ -369,6 +370,9 @@ public class DataRegion implements IDataRegionForQuery { private ILoadDiskSelector ordinaryLoadDiskSelector; private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector; + /** Delay analyzer for tracking data arrival delays and calculating safe watermarks */ + private final DelayAnalyzer delayAnalyzer; + /** * Construct a database processor. * @@ -387,6 +391,7 @@ public DataRegion( this.dataRegionId = new DataRegionId(Integer.parseInt(dataRegionIdString)); this.databaseName = databaseName; this.fileFlushPolicy = fileFlushPolicy; + this.delayAnalyzer = new DelayAnalyzer(); acquireDirectBufferMemory(); dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionIdString); @@ -453,6 +458,7 @@ public DataRegion(String databaseName, String dataRegionIdString) { partitionMaxFileVersions.put(0L, 0L); upgradeModFileThreadPool = null; this.metrics = new DataRegionMetrics(this); + this.delayAnalyzer = new DelayAnalyzer(); initDiskSelector(); } @@ -1151,6 +1157,10 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException { if (deleted) { return; } + long arrivalTime = System.currentTimeMillis(); + long generationTime = insertRowNode.getTime(); + delayAnalyzer.update(generationTime, arrivalTime); + // init map long timePartitionId = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime()); initFlushTimeMap(timePartitionId); @@ -1315,6 +1325,11 @@ public void insertTablet(InsertTabletNode insertTabletNode) "Won't insert tablet {}, because region is deleted", insertTabletNode.getSearchIndex()); return; } + long arrivalTime = System.currentTimeMillis(); + long[] times = insertTabletNode.getTimes(); + for (long generationTime : times) { + delayAnalyzer.update(generationTime, arrivalTime); + } TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()]; Arrays.fill(results, RpcUtils.SUCCESS_STATUS); long[] infoForMetrics = new long[5]; @@ -4105,9 +4120,11 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) return; } long ttl = getTTL(insertRowsOfOneDeviceNode); + long arrivalTime = System.currentTimeMillis(); Map tsFileProcessorMap = new HashMap<>(); for (int i = 0; i < insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) { InsertRowNode insertRowNode = insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i); + delayAnalyzer.update(insertRowNode.getTime(), arrivalTime); if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) { // we do not need to write these part of data, as they can not be queried // or the sub-plan has already been executed, we are retrying other sub-plans @@ -4223,8 +4240,10 @@ public void insert(InsertRowsNode insertRowsNode) } boolean[] areSequence = new boolean[insertRowsNode.getInsertRowNodeList().size()]; long[] timePartitionIds = new long[insertRowsNode.getInsertRowNodeList().size()]; + long arrivalTime = System.currentTimeMillis(); for (int i = 0; i < insertRowsNode.getInsertRowNodeList().size(); i++) { InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i); + delayAnalyzer.update(insertRowNode.getTime(), arrivalTime); long ttl = getTTL(insertRowNode); if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) { insertRowsNode @@ -4314,8 +4333,13 @@ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode) // infoForMetrics[2]: ScheduleWalTimeCost // infoForMetrics[3]: ScheduleMemTableTimeCost // infoForMetrics[4]: InsertedPointsNumber + long arrivalTime = System.currentTimeMillis(); for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) { InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i); + long[] times = insertTabletNode.getTimes(); + for (long generationTime : times) { + delayAnalyzer.update(generationTime, arrivalTime); + } TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()]; Arrays.fill(results, RpcUtils.SUCCESS_STATUS); boolean noFailure = false; @@ -4619,6 +4643,15 @@ public TsFileManager getTsFileManager() { return tsFileManager; } + /** + * Get the delay analyzer instance for this data region + * + * @return DelayAnalyzer instance for tracking data arrival delays and calculating safe watermarks + */ + public DelayAnalyzer getDelayAnalyzer() { + return delayAnalyzer; + } + private long getTTL(InsertNode insertNode) { if (insertNode.getTableName() == null) { return DataNodeTTLCache.getInstance().getTTLForTree(insertNode.getTargetPath().getNodes()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/DelayAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/DelayAnalyzer.java new file mode 100644 index 000000000000..46f6f19dc133 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/DelayAnalyzer.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.tools; + +import org.apache.iotdb.commons.utils.TestOnly; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * DelayAnalyzer: Calculate watermarks for in-order and out-of-order data based on statistics and + * models + * + *

This implementation is based on the paper "Separation or Not: On Handling Out-of-Order + * Time-Series Data" (ICDE 2022) proposed method, which dynamically calculates safe watermarks by + * analyzing the Cumulative Distribution Function (CDF) of data arrival delays. + * + *

Core concepts: + * + *

+ * + *

Key formulas from the paper: + * + *

+ * + *

Features: + * + *

+ * + * @see ICDE 2022 Paper + */ +public class DelayAnalyzer { + + private static final Logger LOGGER = LoggerFactory.getLogger(DelayAnalyzer.class); + + /** Default window size, empirical value: 10000 sample points */ + public static final int DEFAULT_WINDOW_SIZE = 10000; + + /** Recommended window size range */ + public static final int MIN_WINDOW_SIZE = 1000; + + public static final int MAX_WINDOW_SIZE = 100000; + + /** Default confidence level: 99% */ + public static final double DEFAULT_CONFIDENCE_LEVEL = 0.99; + + /** + * Sliding window size for storing recent delay sample points. A larger window provides more + * accurate P99 calculations but increases memory and sorting overhead. The paper mentions using + * dynamic delay distribution to adapt to changes in delay patterns. + */ + private final int windowSize; + + /** Circular buffer storing delay samples (unit: milliseconds) */ + private final long[] delaySamples; + + /** Current write position (cursor of the circular buffer) */ + private int cursor = 0; + + /** Flag indicating whether the buffer is full */ + private boolean isFull = false; + + /** Total number of samples (for statistics) */ + private long totalSamples = 0; + + /** + * ReadWriteLock ensures thread safety in high-concurrency scenarios. Read operations (calculating + * quantiles) use read lock, write operations (updating delays) use write lock. + */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** Constructor using default window size */ + public DelayAnalyzer() { + this(DEFAULT_WINDOW_SIZE); + } + + /** + * Constructor + * + * @param windowSize Sliding window size, recommended range: 5000 - 10000. Larger windows provide + * more accurate statistics but increase memory and computational overhead. + * @throws IllegalArgumentException if window size is not within valid range + */ + public DelayAnalyzer(int windowSize) { + if (windowSize < MIN_WINDOW_SIZE || windowSize > MAX_WINDOW_SIZE) { + throw new IllegalArgumentException( + String.format( + "Window size must be between %d and %d, got %d", + MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, windowSize)); + } + this.windowSize = windowSize; + this.delaySamples = new long[windowSize]; + } + + @TestOnly + public DelayAnalyzer(int windowSize, int placeHolder) { + this.windowSize = windowSize; + this.delaySamples = new long[windowSize]; + } + + /** + * Core method: Record a new data point + * + *

Corresponds to the formula in the paper: p.t_d = p.t_a - p.t_g (Formula 160) + * + *

Delay calculation logic: + * + *

+ * + * @param generationTime Data generation time (Event Time), unit: milliseconds + * @param arrivalTime Data arrival time (System Time), unit: milliseconds + */ + public void update(long generationTime, long arrivalTime) { + // Calculate delay; delay cannot be negative (clock skew correction, Paper Reference 26) + long delay = Math.max(0, arrivalTime - generationTime); + + lock.writeLock().lock(); + try { + delaySamples[cursor] = delay; + cursor++; + totalSamples++; + + // Circular buffer: when cursor reaches window size, reset to 0 and mark as full + if (cursor >= windowSize) { + cursor = 0; + isFull = true; + } + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Get the delay threshold for a specific quantile + * + *

Corresponds to the method in the paper using F(x) (CDF) to estimate the probability of + * out-of-order data (Reference 110). + * + *

Calculation steps: + * + *

    + *
  1. Get a snapshot of the current window (to avoid holding write lock for too long) + *
  2. Sort the samples + *
  3. Calculate index position based on quantile + *
  4. Return the delay value at the corresponding position + *
+ * + * @param percentile Quantile, range (0, 1], e.g., 0.99 represents 99% quantile (P99) + * @return Delay threshold, unit: milliseconds + * @throws IllegalArgumentException if percentile is not within valid range + */ + public long getDelayQuantile(double percentile) { + if (percentile <= 0 || percentile > 1) { + throw new IllegalArgumentException( + String.format("Percentile must be between 0 and 1, got %f", percentile)); + } + + long[] snapshot; + int currentSize; + + // Get snapshot to avoid blocking write lock for too long + lock.readLock().lock(); + try { + if (!isFull && cursor == 0) { + // No data yet + return 0; + } + currentSize = isFull ? windowSize : cursor; + snapshot = Arrays.copyOf(delaySamples, currentSize); + } finally { + lock.readLock().unlock(); + } + + // Sort to calculate quantile (Arrays.sort performance is sufficient for small datasets) + Arrays.sort(snapshot); + + // Calculate index corresponding to quantile + int index = (int) Math.ceil(currentSize * percentile) - 1; + // Boundary correction to prevent index out of bounds + index = Math.max(0, Math.min(index, currentSize - 1)); + + return snapshot[index]; + } + + /** + * Get the recommended safe watermark + * + *

Definition: At confidence level confidenceLevel, the probability that all data before this + * timestamp has arrived. + * + *

Calculation formula: + * + *

+   *   SafeWatermark = CurrentSystemTime - P99_Delay
+   * 
+ * + *

Meaning: If current system time is T and P99 delay is D, then data before timestamp T-D has + * a 99% probability of having all arrived. + * + * @param currentSystemTime Current system time, unit: milliseconds + * @param confidenceLevel Confidence level, e.g., 0.99 represents 99% confidence + * @return Safe watermark timestamp, unit: milliseconds + */ + public long getSafeWatermark(long currentSystemTime, double confidenceLevel) { + long pDelay = getDelayQuantile(confidenceLevel); + // Watermark = Current time - P99 delay + long watermark = currentSystemTime - pDelay; + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Calculated safe watermark: {} (currentTime: {}, P{} delay: {}ms)", + watermark, + currentSystemTime, + (int) (confidenceLevel * 100), + pDelay); + } + + return watermark; + } + + /** + * Get the recommended safe watermark (using default confidence level) + * + * @param currentSystemTime Current system time, unit: milliseconds + * @return Safe watermark timestamp, unit: milliseconds + */ + public long getSafeWatermark(long currentSystemTime) { + return getSafeWatermark(currentSystemTime, DEFAULT_CONFIDENCE_LEVEL); + } + + /** + * Get current statistics + * + * @return String containing statistics such as P50, P95, P99, maximum value, etc. + */ + public String getStatistics() { + if (!isFull && cursor == 0) { + return "DelayAnalyzer: No data collected yet"; + } + + long p50 = getDelayQuantile(0.50); + long p95 = getDelayQuantile(0.95); + long p99 = getDelayQuantile(0.99); + long max = getDelayQuantile(1.00); + int currentSize = isFull ? windowSize : cursor; + + return String.format( + "DelayAnalyzer Statistics -> Samples: %d/%d, P50: %dms, P95: %dms, P99: %dms, Max: %dms", + currentSize, windowSize, p50, p95, p99, max); + } + + /** Print current statistics (for debugging) */ + public void printStatistics() { + LOGGER.info(getStatistics()); + } + + /** + * Get the number of samples in the current window + * + * @return Current valid number of samples + */ + public int getCurrentSampleCount() { + lock.readLock().lock(); + try { + return isFull ? windowSize : cursor; + } finally { + lock.readLock().unlock(); + } + } + + /** + * Get total number of samples (including samples that have been overwritten) + * + * @return Total number of samples recorded since creation + */ + public long getTotalSamples() { + lock.readLock().lock(); + try { + return totalSamples; + } finally { + lock.readLock().unlock(); + } + } + + /** + * Get window size + * + * @return Window size + */ + public int getWindowSize() { + return windowSize; + } + + /** Reset the analyzer, clearing all statistical data */ + public void reset() { + lock.writeLock().lock(); + try { + Arrays.fill(delaySamples, 0); + cursor = 0; + isFull = false; + totalSamples = 0; + LOGGER.debug("DelayAnalyzer has been reset"); + } finally { + lock.writeLock().unlock(); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/DelayAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/DelayAnalyzerTest.java new file mode 100644 index 000000000000..770c234560b9 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/tools/DelayAnalyzerTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.tools; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** Unit tests for DelayAnalyzer */ +public class DelayAnalyzerTest { + + /** + * Test 1: Basic functionality verification. Verifies simple data ingestion and P99 calculation to + * ensure CDF logic is correct. + */ + @Test + public void testBasicQuantileCalculation() { + // Window size of 100 + DelayAnalyzer analyzer = new DelayAnalyzer(100, 1); + + long now = System.currentTimeMillis(); + // Ingest delay data ranging from 0ms to 100ms + for (int i = 0; i <= 100; i++) { + // arrival = now, generation = now - delay + analyzer.update(now - i, now); + } + + // Verification: Since data is uniformly distributed from 0-99 + // P50 should be around 49 or 50 + // P99 should be 99 + long p50 = analyzer.getDelayQuantile(0.50); + long p99 = analyzer.getDelayQuantile(0.99); + long max = analyzer.getDelayQuantile(1.00); + + System.out.println("Basic Test -> P50: " + p50 + ", P99: " + p99); + + // Allow 1ms calculation error (depending on rounding logic) + Assert.assertTrue("P50 should be around 49", Math.abs(p50 - 49) <= 1); + Assert.assertEquals("P99 should be 99", 99, p99); + Assert.assertEquals("Max should be 100", 100, max); + } + + /** + * Test 2: Circular buffer eviction mechanism (sliding window). Corresponds to "dynamic delay + * distribution" mentioned in the paper. Scenario: Ingest low-latency data first, then + * high-latency data, verifying that old data is correctly evicted. + */ + @Test + public void testCircularBufferEviction() { + // Extremely small window size: 5 + DelayAnalyzer analyzer = new DelayAnalyzer(5, 1); + long now = System.currentTimeMillis(); + + // Phase 1: Fill the window with small delays (10ms) + for (int i = 0; i < 5; i++) { + analyzer.update(now - 10, now); + } + Assert.assertEquals("Initial max delay should be 10", 10, analyzer.getDelayQuantile(1.0)); + + // Phase 2: Ingest new high-latency data (100ms) + // Writing 5 new points should evict all previous 10ms points + for (int i = 0; i < 5; i++) { + analyzer.update(now - 100, now); + } + + // Verification: The minimum delay in the window should now be 100, old 10ms data should be + // evicted + // If the circular logic is wrong, old data might still be read + long minInWindow = analyzer.getDelayQuantile(0.01); // Approximate P0 + Assert.assertEquals("Old data (10ms) should be evicted, min should be 100", 100, minInWindow); + } + + /** + * Test 3: Negative delay handling (clock skew). The paper mentions clock skew can cause + * anomalies; the code should clamp negative values to 0. + */ + @Test + public void testNegativeDelayHandling() { + DelayAnalyzer analyzer = new DelayAnalyzer(1000); + long now = 10000; + + // Generation time is later than arrival time (future time), simulating clock rollback or + // desynchronization + analyzer.update(now + 5000, now); + + long maxDelay = analyzer.getDelayQuantile(1.0); + Assert.assertEquals("Negative delay should be clamped to 0", 0, maxDelay); + } + + /** + * Test 4: Watermark calculation logic. Verifies if getSafeWatermark correctly uses the quantile. + */ + @Test + public void testSafeWatermarkLogic() { + DelayAnalyzer analyzer = new DelayAnalyzer(100, 1); + + // Inject a fixed delay of 500ms + analyzer.update(1000, 1500); + + long currentSystemTime = 2000; + // P99 should be 500ms + // Watermark = Current(2000) - P99(500) = 1500 + long watermark = analyzer.getSafeWatermark(currentSystemTime, 0.99); + + Assert.assertEquals(1500, watermark); + } + + /** Test 5: Empty buffer and boundary handling */ + @Test + public void testEmptyAndBoundaries() { + DelayAnalyzer analyzer = new DelayAnalyzer(1000); + + // 1. When no data exists + Assert.assertEquals("Empty buffer should return 0", 0, analyzer.getDelayQuantile(0.99)); + + // 2. Only 1 data point + analyzer.update(100, 150); // delay 50 + Assert.assertEquals(50, analyzer.getDelayQuantile(0.01)); + Assert.assertEquals(50, analyzer.getDelayQuantile(0.99)); + + // 3. Illegal arguments + Assert.assertThrows( + IllegalArgumentException.class, + () -> { + analyzer.getDelayQuantile(1.5); + }); + + Assert.assertThrows( + IllegalArgumentException.class, + () -> { + analyzer.getDelayQuantile(-0.1); + }); + } + + /** + * Test 6: Concurrency safety test. Simulates high-concurrency write scenarios to verify if + * ReadWriteLock works effectively, ensuring no exceptions or deadlocks occur. + */ + @Test + public void testConcurrency() throws InterruptedException { + int windowSize = 2000; + final DelayAnalyzer analyzer = new DelayAnalyzer(windowSize); + int threadCount = 10; + final int writesPerThread = 5000; + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch latch = new CountDownLatch(threadCount); + final Random random = new Random(); + + // Start 10 threads writing aggressively + for (int i = 0; i < threadCount; i++) { + executor.submit( + () -> { + try { + for (int j = 0; j < writesPerThread; j++) { + long now = System.currentTimeMillis(); + // Random delay 0-100ms + analyzer.update(now - random.nextInt(100), now); + } + } finally { + latch.countDown(); + } + }); + } + + // Main thread attempts to read simultaneously + for (int i = 0; i < 10; i++) { + analyzer.getDelayQuantile(0.99); + Thread.sleep(10); + } + + latch.await(5, TimeUnit.SECONDS); // Wait for all write threads to finish + executor.shutdown(); + + // Verification: Internal state should remain consistent after concurrent writes + // Simple check: Should be able to read data normally without IndexOutOfBounds or + // ConcurrentModificationException + long p99 = analyzer.getDelayQuantile(0.99); + + Assert.assertTrue("P99 should be >= 0", p99 >= 0); + Assert.assertTrue("P99 should be <= 100", p99 <= 100); + + System.out.println("Concurrency Test Passed. P99: " + p99); + } + + /** Test 7: Default constructor and default confidence level */ + @Test + public void testDefaultConstructor() { + DelayAnalyzer analyzer = new DelayAnalyzer(); + Assert.assertEquals( + "Default window size should be " + DelayAnalyzer.DEFAULT_WINDOW_SIZE, + DelayAnalyzer.DEFAULT_WINDOW_SIZE, + analyzer.getWindowSize()); + + long now = System.currentTimeMillis(); + analyzer.update(now - 100, now); + + // Test default confidence level watermark calculation + long watermark1 = analyzer.getSafeWatermark(now); + long watermark2 = analyzer.getSafeWatermark(now, DelayAnalyzer.DEFAULT_CONFIDENCE_LEVEL); + Assert.assertEquals("Default confidence level should be consistent", watermark1, watermark2); + } + + /** Test 8: Window size validation */ + @Test + public void testWindowSizeValidation() { + // Test minimum window size + DelayAnalyzer analyzer1 = new DelayAnalyzer(DelayAnalyzer.MIN_WINDOW_SIZE); + Assert.assertEquals(DelayAnalyzer.MIN_WINDOW_SIZE, analyzer1.getWindowSize()); + + // Test maximum window size + DelayAnalyzer analyzer2 = new DelayAnalyzer(DelayAnalyzer.MAX_WINDOW_SIZE); + Assert.assertEquals(DelayAnalyzer.MAX_WINDOW_SIZE, analyzer2.getWindowSize()); + + // Test window size that is too small + Assert.assertThrows( + IllegalArgumentException.class, + () -> { + new DelayAnalyzer(DelayAnalyzer.MIN_WINDOW_SIZE - 1); + }); + + // Test window size that is too large + Assert.assertThrows( + IllegalArgumentException.class, + () -> { + new DelayAnalyzer(DelayAnalyzer.MAX_WINDOW_SIZE + 1); + }); + } + + /** Test 9: Statistics retrieval */ + @Test + public void testStatistics() { + DelayAnalyzer analyzer = new DelayAnalyzer(100, 1); + long now = System.currentTimeMillis(); + + // Add some data + for (int i = 0; i < 50; i++) { + analyzer.update(now - i, now); + } + + String stats = analyzer.getStatistics(); + Assert.assertNotNull("Statistics should not be null", stats); + Assert.assertTrue("Statistics should contain P50", stats.contains("P50")); + Assert.assertTrue("Statistics should contain P99", stats.contains("P99")); + + // Test empty statistics + DelayAnalyzer emptyAnalyzer = new DelayAnalyzer(1000); + String emptyStats = emptyAnalyzer.getStatistics(); + Assert.assertTrue("Empty statistics should indicate no data", emptyStats.contains("No data")); + } + + /** Test 10: Reset functionality */ + @Test + public void testReset() { + DelayAnalyzer analyzer = new DelayAnalyzer(100, 1); + long now = System.currentTimeMillis(); + + // Add data + for (int i = 0; i < 50; i++) { + analyzer.update(now - i, now); + } + + Assert.assertTrue("Should have data", analyzer.getCurrentSampleCount() > 0); + Assert.assertTrue("Total samples should be > 0", analyzer.getTotalSamples() > 0); + + // Reset + analyzer.reset(); + + Assert.assertEquals( + "After reset, current sample count should be 0", 0, analyzer.getCurrentSampleCount()); + Assert.assertEquals("After reset, total samples should be 0", 0, analyzer.getTotalSamples()); + Assert.assertEquals( + "After reset, quantile should return 0", 0, analyzer.getDelayQuantile(0.99)); + } + + /** Test 11: Sample count functionality */ + @Test + public void testSampleCount() { + DelayAnalyzer analyzer = new DelayAnalyzer(100, 1); + long now = System.currentTimeMillis(); + + // Initial state + Assert.assertEquals("Initial sample count should be 0", 0, analyzer.getCurrentSampleCount()); + Assert.assertEquals("Initial total samples should be 0", 0, analyzer.getTotalSamples()); + + // Add 50 samples + for (int i = 0; i < 50; i++) { + analyzer.update(now - i, now); + } + + Assert.assertEquals("Current sample count should be 50", 50, analyzer.getCurrentSampleCount()); + Assert.assertEquals("Total samples should be 50", 50, analyzer.getTotalSamples()); + + // Fill the window + for (int i = 50; i < 150; i++) { + analyzer.update(now - i, now); + } + + Assert.assertEquals( + "After window is full, current sample count should be 100", + 100, + analyzer.getCurrentSampleCount()); + Assert.assertEquals("Total samples should be 150", 150, analyzer.getTotalSamples()); + } + + /** Test 12: Accuracy of different quantiles */ + @Test + public void testDifferentQuantiles() { + DelayAnalyzer analyzer = new DelayAnalyzer(1000); + long now = System.currentTimeMillis(); + + // Create uniformly distributed delays: 0-999ms + for (int i = 0; i < 1000; i++) { + analyzer.update(now - i, now); + } + + // Verify different quantiles + long p10 = analyzer.getDelayQuantile(0.10); + long p25 = analyzer.getDelayQuantile(0.25); + long p50 = analyzer.getDelayQuantile(0.50); + long p75 = analyzer.getDelayQuantile(0.75); + long p90 = analyzer.getDelayQuantile(0.90); + long p99 = analyzer.getDelayQuantile(0.99); + + // Allow 2ms error + Assert.assertTrue("P10 should be around 100", Math.abs(p10 - 100) <= 2); + Assert.assertTrue("P25 should be around 250", Math.abs(p25 - 250) <= 2); + Assert.assertTrue("P50 should be around 500", Math.abs(p50 - 500) <= 2); + Assert.assertTrue("P75 should be around 750", Math.abs(p75 - 750) <= 2); + Assert.assertTrue("P90 should be around 900", Math.abs(p90 - 900) <= 2); + Assert.assertTrue("P99 should be around 990", Math.abs(p99 - 990) <= 2); + + // Verify monotonicity: quantiles should be increasing + Assert.assertTrue("P10 <= P25", p10 <= p25); + Assert.assertTrue("P25 <= P50", p25 <= p50); + Assert.assertTrue("P50 <= P75", p50 <= p75); + Assert.assertTrue("P75 <= P90", p75 <= p90); + Assert.assertTrue("P90 <= P99", p90 <= p99); + } +}