diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java index d733821972b..34882efc751 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java @@ -21,7 +21,6 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -174,96 +173,4 @@ void doubleCounterAdd_NaN() { doubleCounter.add(Double.NaN); assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0); } - - @Test - void stressTest() { - DoubleCounter doubleCounter = sdkMeter.counterBuilder("testCounter").ofDoubles().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, 1, () -> doubleCounter.add(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testCounter") - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .isMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(40000) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - DoubleCounter doubleCounter = sdkMeter.counterBuilder("testCounter").ofDoubles().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> - doubleCounter.add( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .isMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java index 61c0d3ad0b9..740775e766f 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java @@ -27,7 +27,6 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import java.time.Duration; import java.util.Collections; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; /** Unit tests for {@link SdkDoubleGauge}. */ @@ -271,98 +270,4 @@ void collectMetrics_WithMultipleCollects() { .hasValue(222d) .hasAttributes(attributeEntry("K", "V"))))); } - - @Test - void stressTest() { - DoubleGauge doubleGauge = sdkMeter.gaugeBuilder("testGauge").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 1, - () -> { - doubleGauge.set(10, Attributes.builder().put("K", "V").build()); - doubleGauge.set(11, Attributes.builder().put("K", "V").build()); - })); - } - - stressTestBuilder.build().run(); - assertThat(cumulativeReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testGauge") - .hasDoubleGaugeSatisfying( - gauge -> - gauge.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - DoubleGauge doubleGauge = sdkMeter.gaugeBuilder("testGauge").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> { - doubleGauge.set(10, Attributes.builder().put(keys[i], values[i]).build()); - doubleGauge.set(11, Attributes.builder().put(keys[i], values[i]).build()); - }))); - - stressTestBuilder.build().run(); - assertThat(cumulativeReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasDoubleGaugeSatisfying( - gauge -> - gauge.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java index dd210faad46..4da66aca7df 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java @@ -27,7 +27,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -369,108 +368,4 @@ void collectMetrics_ExemplarsWithExplicitBucketHistogram() { .put("key", "value") .build()))))); } - - @Test - void stressTest() { - DoubleHistogram doubleHistogram = sdkMeter.histogramBuilder("testHistogram").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 1, - () -> doubleHistogram.record(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testHistogram") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasAttributes(attributeEntry("K", "V")) - .hasCount(4_000) - .hasSum(40_000)))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - DoubleHistogram doubleHistogram = sdkMeter.histogramBuilder("testHistogram").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> - doubleHistogram.record( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testHistogram") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(2_000) - .hasSum(20_000) - .hasBucketCounts( - 0, 0, 2000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(2_000) - .hasSum(20_000) - .hasBucketCounts( - 0, 0, 2000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(2_000) - .hasSum(20_000) - .hasBucketCounts( - 0, 0, 2000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(2_000) - .hasSum(20_000) - .hasBucketCounts( - 0, 0, 2000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java index a8dc0041c0d..a15dcf422cc 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java @@ -18,7 +18,6 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; /** Unit tests for {@link SdkDoubleUpDownCounter}. */ @@ -158,103 +157,6 @@ void collectMetrics_WithMultipleCollects() { .hasValue(777.9)))); } - @Test - void stressTest() { - DoubleUpDownCounter doubleUpDownCounter = - sdkMeter.upDownCounterBuilder("testUpDownCounter").ofDoubles().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 2, - () -> doubleUpDownCounter.add(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testUpDownCounter") - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .isNotMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(40_000) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - DoubleUpDownCounter doubleUpDownCounter = - sdkMeter.upDownCounterBuilder("testUpDownCounter").ofDoubles().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> - doubleUpDownCounter.add( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testUpDownCounter") - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .isNotMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } - @Test void testToString() { String expected = diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java index 30f9c27ff22..160a07b9e48 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java @@ -20,7 +20,6 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -163,99 +162,4 @@ void longCounterAdd_Monotonicity() { logs.assertContains( "Counters can only increase. Instrument testCounter has recorded a negative value."); } - - @Test - void stressTest() { - LongCounter longCounter = sdkMeter.counterBuilder("testCounter").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, 1, () -> longCounter.add(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testCounter") - .hasLongSumSatisfying( - longSum -> - longSum - .isCumulative() - .isMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(80_000) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - LongCounter longCounter = sdkMeter.counterBuilder("testCounter").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 2, - () -> - longCounter.add( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testCounter") - .hasLongSumSatisfying( - longSum -> - longSum - .isCumulative() - .isMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java index 0a117e48d82..3879f2e8a76 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java @@ -24,7 +24,6 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import java.time.Duration; import java.util.Collections; -import java.util.stream.IntStream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -265,98 +264,4 @@ void collectMetrics_WithMultipleCollects() { .hasValue(222) .hasAttributes(attributeEntry("K", "V"))))); } - - @Test - void stressTest() { - LongGauge longGauge = sdkMeter.gaugeBuilder("testGauge").ofLongs().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 1, - () -> { - longGauge.set(10, Attributes.builder().put("K", "V").build()); - longGauge.set(11, Attributes.builder().put("K", "V").build()); - })); - } - - stressTestBuilder.build().run(); - assertThat(cumulativeReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testGauge") - .hasLongGaugeSatisfying( - gauge -> - gauge.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - LongGauge longGauge = sdkMeter.gaugeBuilder("testGauge").ofLongs().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> { - longGauge.set(10, Attributes.builder().put(keys[i], values[i]).build()); - longGauge.set(11, Attributes.builder().put(keys[i], values[i]).build()); - }))); - - stressTestBuilder.build().run(); - assertThat(cumulativeReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasLongGaugeSatisfying( - gauge -> - gauge.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java index dd802ff613f..4ecde4c0c28 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java @@ -26,7 +26,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -529,108 +528,4 @@ void collectMetrics_ExemplarsWithExplicitBucketHistogram() { .put("key", "value") .build()))))); } - - @Test - void stressTest() { - LongHistogram longHistogram = sdkMeter.histogramBuilder("testHistogram").ofLongs().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> longHistogram.record(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(reader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testHistogram") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasAttributes(attributeEntry("K", "V")) - .hasCount(8_000) - .hasSum(80_000)))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - LongHistogram longHistogram = sdkMeter.histogramBuilder("testHistogram").ofLongs().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 2, - () -> - longHistogram.record( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(reader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testHistogram") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(1_000) - .hasSum(10_000) - .hasBucketCounts( - 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(1_000) - .hasSum(10_000) - .hasBucketCounts( - 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(1_000) - .hasSum(10_000) - .hasBucketCounts( - 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(1_000) - .hasSum(10_000) - .hasBucketCounts( - 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java index 50feee13094..39eb6500207 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java @@ -18,7 +18,6 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; /** Unit tests for {@link SdkLongUpDownCounter}. */ @@ -150,101 +149,4 @@ void collectMetrics_WithMultipleCollects() { .hasAttributes(attributeEntry("K", "V")) .hasValue(777)))); } - - @Test - void stressTest() { - LongUpDownCounter longUpDownCounter = - sdkMeter.upDownCounterBuilder("testUpDownCounter").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> longUpDownCounter.add(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testUpDownCounter") - .hasLongSumSatisfying( - sum -> - sum.isCumulative() - .isNotMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(80_000) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - LongUpDownCounter longUpDownCounter = - sdkMeter.upDownCounterBuilder("testUpDownCounter").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 2, - () -> - longUpDownCounter.add( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testUpDownCounter") - .hasLongSumSatisfying( - sum -> - sum.isCumulative() - .isNotMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/StressTestRunner.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/StressTestRunner.java deleted file mode 100644 index 22f5f475187..00000000000 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/StressTestRunner.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.Uninterruptibles; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import javax.annotation.concurrent.Immutable; - -@AutoValue -@Immutable -abstract class StressTestRunner { - abstract ImmutableList getOperations(); - - abstract int getCollectionIntervalMs(); - - final void run() { - List operations = getOperations(); - int numThreads = operations.size(); - CountDownLatch countDownLatch = new CountDownLatch(numThreads); - Thread collectionThread = - new Thread( - () -> { - // While workers still work, do collections. - while (countDownLatch.getCount() != 0) { - Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(getCollectionIntervalMs())); - } - }); - List operationThreads = new ArrayList<>(numThreads); - for (Operation operation : operations) { - operationThreads.add( - new Thread( - () -> { - for (int i = 0; i < operation.getNumOperations(); i++) { - operation.getUpdater().update(); - Uninterruptibles.sleepUninterruptibly( - Duration.ofMillis(operation.getOperationDelayMs())); - } - countDownLatch.countDown(); - })); - } - - // Start collection thread then the rest of the worker threads. - collectionThread.start(); - for (Thread thread : operationThreads) { - thread.start(); - } - - // Wait for all the thread to finish. - for (Thread thread : operationThreads) { - Uninterruptibles.joinUninterruptibly(thread); - } - Uninterruptibles.joinUninterruptibly(collectionThread); - } - - static Builder builder() { - return new AutoValue_StressTestRunner.Builder(); - } - - @AutoValue.Builder - abstract static class Builder { - - abstract ImmutableList.Builder operationsBuilder(); - - abstract Builder setCollectionIntervalMs(int collectionInterval); - - Builder addOperation(Operation operation) { - operationsBuilder().add(operation); - return this; - } - - public abstract StressTestRunner build(); - } - - @AutoValue - @Immutable - abstract static class Operation { - - abstract int getNumOperations(); - - abstract int getOperationDelayMs(); - - abstract OperationUpdater getUpdater(); - - static Operation create(int numOperations, int operationDelayMs, OperationUpdater updater) { - return new AutoValue_StressTestRunner_Operation(numOperations, operationDelayMs, updater); - } - } - - interface OperationUpdater { - - /** Called every operation. */ - void update(); - } - - StressTestRunner() {} -} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java new file mode 100644 index 00000000000..af12cc6af12 --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java @@ -0,0 +1,650 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics; + +import static io.opentelemetry.api.common.Attributes.empty; +import static io.opentelemetry.sdk.metrics.InstrumentType.COUNTER; +import static io.opentelemetry.sdk.metrics.InstrumentType.GAUGE; +import static io.opentelemetry.sdk.metrics.InstrumentType.HISTOGRAM; +import static io.opentelemetry.sdk.metrics.InstrumentType.UP_DOWN_COUNTER; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.util.concurrent.Uninterruptibles; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.internal.testing.CleanupExtension; +import io.opentelemetry.sdk.common.export.MemoryMode; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; +import io.opentelemetry.sdk.metrics.internal.aggregator.ExplicitBucketHistogramUtils; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.LongStream; +import java.util.stream.Stream; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * {@link #stressTest(AggregationTemporality, InstrumentType, Aggregation, MemoryMode, + * InstrumentValueType)} performs a stress test to confirm simultaneous record and collections do + * not have concurrency issues like lost writes, partial writes, duplicate writes, etc. All + * combinations of the following dimensions are tested: aggregation temporality, instrument type + * (synchronous), memory mode, instrument value type. + */ +class SynchronousInstrumentStressTest { + + private static final String INSTRUMENT_NAME = "instrument"; + private static final Duration ONE_MICROSECOND = Duration.ofNanos(1000); + private static final List BUCKET_BOUNDARIES = + ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES; + private static final double[] BUCKET_BOUNDARIES_ARR = + BUCKET_BOUNDARIES.stream().mapToDouble(Double::doubleValue).toArray(); + + private static final AttributeKey KEY = AttributeKey.stringKey("key"); + private static final Attributes ATTR_1 = Attributes.of(KEY, "value1"); + private static final Attributes ATTR_2 = Attributes.of(KEY, "value2"); + private static final Attributes ATTR_3 = Attributes.of(KEY, "value3"); + private static final Attributes ATTR_4 = Attributes.of(KEY, "value4"); + + @RegisterExtension CleanupExtension cleanup = new CleanupExtension(); + + @ParameterizedTest + @MethodSource("stressTestArgs") + void stressTest( + AggregationTemporality aggregationTemporality, + InstrumentType instrumentType, + Aggregation aggregation, + MemoryMode memoryMode, + InstrumentValueType instrumentValueType) { + // Initialize metric SDK + DefaultAggregationSelector aggregationSelector = + DefaultAggregationSelector.getDefault().with(instrumentType, aggregation); + InMemoryMetricReader reader = + InMemoryMetricReader.builder() + .setDefaultAggregationSelector(aggregationSelector) + .setAggregationTemporalitySelector(unused -> aggregationTemporality) + .setMemoryMode(memoryMode) + .build(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(reader).build(); + cleanup.addCloseable(meterProvider); + Meter meter = meterProvider.get("test"); + Instrument instrument = getInstrument(meter, instrumentType, instrumentValueType); + + // Define list of measurements to record + // Later, we'll assert that the data collected matches these measurements, with no lost writes, + // partial writes, duplicate writes, etc. + int measurementCount = 2000; + List measurements = new ArrayList<>(); + for (int i = 0; i < measurementCount; i++) { + measurements.add((long) i); + } + Collections.shuffle(measurements); + + // Define recording threads + int threadCount = 4; + List recordThreads = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(threadCount); + for (int i = 0; i < threadCount; i++) { + recordThreads.add( + new Thread( + () -> { + List attributes = Arrays.asList(ATTR_1, ATTR_2, ATTR_3, ATTR_4); + Collections.shuffle(attributes); + for (Long measurement : measurements) { + for (Attributes attr : attributes) { + instrument.record(measurement, attr); + } + Uninterruptibles.sleepUninterruptibly(ONE_MICROSECOND); + } + latch.countDown(); + })); + } + + // Define collecting thread + // NOTE: collect makes a copy of MetricData because REUSABLE_DATA mode reuses MetricData + List collectedMetrics = new ArrayList<>(); + Thread collectThread = + new Thread( + () -> { + while (latch.getCount() != 0) { + Uninterruptibles.sleepUninterruptibly(ONE_MICROSECOND); + collectedMetrics.addAll( + reader.collectAllMetrics().stream() + .map(SynchronousInstrumentStressTest::copy) + .collect(toList())); + } + collectedMetrics.addAll( + reader.collectAllMetrics().stream() + .map(SynchronousInstrumentStressTest::copy) + .collect(toList())); + }); + + // Start all the threads + collectThread.start(); + recordThreads.forEach(Thread::start); + + // Wait for the collect thread to end, which collects until the record threads are done + Uninterruptibles.joinUninterruptibly(collectThread); + + // Assert collected data is consistent with recorded measurements by independently computing the + // expected aggregated value and comparing to the actual results. + // NOTE: this does not validate the absence of partial writes for cumulative instruments which + // track multiple fields. For example, explicit histogram tracks sum and bucket counts. These + // should be atomically updated such that we never collect the sum without corresponding bucket + // counts update, or vice versa. This test asserts that the cumulative state at the end is + // consistent, and interim collects unknowingly see partial writes. + AtomicLong lastValue = new AtomicLong(0); + AtomicLong sum = new AtomicLong(0); + AtomicLong min = new AtomicLong(Long.MAX_VALUE); + AtomicLong max = new AtomicLong(-1); + List bucketCounts = new ArrayList<>(); + for (int i = 0; i < BUCKET_BOUNDARIES.size() + 1; i++) { + bucketCounts.add(0L); + } + AtomicLong totalCount = new AtomicLong(0); + AtomicLong zeroCount = new AtomicLong(0); + LongStream.range(0, threadCount) + .flatMap(i -> measurements.stream().mapToLong(l -> l)) + .forEach( + measurement -> { + lastValue.set(measurement); + sum.addAndGet(measurement); + min.updateAndGet(v -> Math.min(v, measurement)); + max.updateAndGet(v -> Math.max(v, measurement)); + totalCount.incrementAndGet(); + int bucketIndex = + ExplicitBucketHistogramUtils.findBucketIndex( + BUCKET_BOUNDARIES_ARR, (double) measurement); + bucketCounts.set(bucketIndex, bucketCounts.get(bucketIndex) + 1); + if (measurement == 0) { + zeroCount.incrementAndGet(); + } + }); + + boolean isCumulative = aggregationTemporality == AggregationTemporality.CUMULATIVE; + List points = + Stream.of(ATTR_1, ATTR_2, ATTR_3, ATTR_4) + .map(attr -> getReducedPointData(collectedMetrics, isCumulative, attr)) + .collect(toList()); + if (aggregation == Aggregation.sum()) { + if (instrumentValueType == InstrumentValueType.DOUBLE) { + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + DoublePointData.class, + p -> assertThat(p.getValue()).isEqualTo((double) sum.get()))); + + } else { + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + LongPointData.class, + p -> assertThat(p.getValue()).isEqualTo(sum.get()))); + } + } else if (aggregation == Aggregation.lastValue()) { + if (instrumentValueType == InstrumentValueType.DOUBLE) { + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + DoublePointData.class, + p -> assertThat(p.getValue()).isEqualTo((double) lastValue.get()))); + } else { + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + LongPointData.class, + p -> assertThat(p.getValue()).isEqualTo(lastValue.get()))); + } + } else if (aggregation == Aggregation.explicitBucketHistogram()) { + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + HistogramPointData.class, + p -> { + assertThat(p.getSum()).isEqualTo((double) sum.get()); + assertThat(p.getMin()).isEqualTo((double) min.get()); + assertThat(p.getMax()).isEqualTo((double) max.get()); + assertThat(p.getCount()) + .isEqualTo(bucketCounts.stream().reduce(0L, Long::sum)); + assertThat(p.getCounts()).isEqualTo(bucketCounts); + })); + } else if (aggregation == Aggregation.base2ExponentialBucketHistogram()) { + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + ExponentialHistogramPointData.class, + p -> { + assertThat(p.getSum()).isEqualTo((double) sum.get()); + assertThat(p.getMin()).isEqualTo((double) min.get()); + assertThat(p.getMax()).isEqualTo((double) max.get()); + assertThat(p.getZeroCount()).isEqualTo(zeroCount.get()); + assertThat( + p.getPositiveBuckets().getBucketCounts().stream() + .reduce(0L, Long::sum)) + .isEqualTo(totalCount.get() - zeroCount.get()); + })); + } else { + throw new IllegalArgumentException(); + } + } + + private static Stream stressTestArgs() { + List argumentsList = new ArrayList<>(); + for (AggregationTemporality aggregationTemporality : AggregationTemporality.values()) { + for (InstrumentTypeAndAggregation instrumentTypeAndAggregation : + InstrumentTypeAndAggregation.values()) { + for (MemoryMode memoryMode : MemoryMode.values()) { + for (InstrumentValueType instrumentValueType : InstrumentValueType.values()) { + argumentsList.add( + Arguments.of( + aggregationTemporality, + instrumentTypeAndAggregation.instrumentType, + instrumentTypeAndAggregation.aggregation, + memoryMode, + instrumentValueType)); + } + } + } + } + return argumentsList.stream(); + } + + private static Instrument getInstrument( + Meter meter, InstrumentType instrumentType, InstrumentValueType instrumentValueType) { + switch (instrumentType) { + case COUNTER: + return instrumentValueType == InstrumentValueType.DOUBLE + ? meter.counterBuilder(INSTRUMENT_NAME).ofDoubles().build()::add + : meter.counterBuilder(INSTRUMENT_NAME).build()::add; + case UP_DOWN_COUNTER: + return instrumentValueType == InstrumentValueType.DOUBLE + ? meter.upDownCounterBuilder(INSTRUMENT_NAME).ofDoubles().build()::add + : meter.upDownCounterBuilder(INSTRUMENT_NAME).build()::add; + case HISTOGRAM: + return instrumentValueType == InstrumentValueType.DOUBLE + ? meter + .histogramBuilder(INSTRUMENT_NAME) + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) + .build() + ::record + : meter + .histogramBuilder(INSTRUMENT_NAME) + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) + .ofLongs() + .build() + ::record; + case GAUGE: + return instrumentValueType == InstrumentValueType.DOUBLE + ? meter.gaugeBuilder(INSTRUMENT_NAME).build()::set + : meter.gaugeBuilder(INSTRUMENT_NAME).ofLongs().build()::set; + case OBSERVABLE_COUNTER: + case OBSERVABLE_UP_DOWN_COUNTER: + case OBSERVABLE_GAUGE: + } + throw new IllegalArgumentException(); + } + + private interface Instrument { + void record(long value, Attributes attributes); + } + + private static MetricData copy(MetricData m) { + switch (m.getType()) { + case LONG_GAUGE: + return ImmutableMetricData.createLongGauge( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableGaugeData.create( + m.getLongGaugeData().getPoints().stream() + .map( + p -> + ImmutableLongPointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getValue(), + p.getExemplars())) + .collect(toList()))); + case DOUBLE_GAUGE: + return ImmutableMetricData.createDoubleGauge( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableGaugeData.create( + m.getDoubleGaugeData().getPoints().stream() + .map( + p -> + ImmutableDoublePointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getValue(), + p.getExemplars())) + .collect(toList()))); + case LONG_SUM: + return ImmutableMetricData.createLongSum( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableSumData.create( + m.getLongSumData().isMonotonic(), + m.getLongSumData().getAggregationTemporality(), + m.getLongSumData().getPoints().stream() + .map( + p -> + ImmutableLongPointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getValue(), + p.getExemplars())) + .collect(toList()))); + case DOUBLE_SUM: + return ImmutableMetricData.createDoubleSum( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableSumData.create( + m.getDoubleSumData().isMonotonic(), + m.getDoubleSumData().getAggregationTemporality(), + m.getDoubleSumData().getPoints().stream() + .map( + p -> + ImmutableDoublePointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getValue(), + p.getExemplars())) + .collect(toList()))); + case HISTOGRAM: + return ImmutableMetricData.createDoubleHistogram( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableHistogramData.create( + m.getHistogramData().getAggregationTemporality(), + m.getHistogramData().getPoints().stream() + .map( + p -> + ImmutableHistogramPointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getSum(), + p.hasMin(), + p.getMin(), + p.hasMax(), + p.getMax(), + p.getBoundaries(), + new ArrayList<>(p.getCounts()), + p.getExemplars())) + .collect(toList()))); + case EXPONENTIAL_HISTOGRAM: + return ImmutableMetricData.createExponentialHistogram( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableExponentialHistogramData.create( + m.getExponentialHistogramData().getAggregationTemporality(), + m.getExponentialHistogramData().getPoints().stream() + .map( + p -> + ImmutableExponentialHistogramPointData.create( + p.getScale(), + p.getSum(), + p.getZeroCount(), + p.hasMin(), + p.getMin(), + p.hasMax(), + p.getMax(), + ExponentialHistogramBuckets.create( + p.getPositiveBuckets().getScale(), + p.getPositiveBuckets().getOffset(), + new ArrayList<>(p.getPositiveBuckets().getBucketCounts())), + ExponentialHistogramBuckets.create( + p.getNegativeBuckets().getScale(), + p.getNegativeBuckets().getOffset(), + new ArrayList<>(p.getNegativeBuckets().getBucketCounts())), + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getExemplars())) + .collect(toList()))); + case SUMMARY: + } + throw new IllegalArgumentException(); + } + + /** + * Reduce a list of metric data assumed to be uniform and for a single instrument to a single + * point data. If cumulative, return the last point data. If delta, merge the data points. + */ + private static PointData getReducedPointData( + List metrics, boolean isCumulative, Attributes attributes) { + metrics.forEach(metricData -> assertThat(metricData.getName()).isEqualTo(INSTRUMENT_NAME)); + MetricData first = metrics.get(0); + switch (first.getType()) { + case LONG_GAUGE: + List lgaugePoints = + metrics.stream() + .flatMap(m -> m.getLongGaugeData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) + .collect(toList()); + return lgaugePoints.get(lgaugePoints.size() - 1); + case DOUBLE_GAUGE: + List dgaugePoints = + metrics.stream() + .flatMap(m -> m.getDoubleGaugeData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) + .collect(toList()); + return dgaugePoints.get(dgaugePoints.size() - 1); + case LONG_SUM: + List lsumPoints = + metrics.stream() + .flatMap(m -> m.getLongSumData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) + .collect(toList()); + return isCumulative + ? lsumPoints.get(lsumPoints.size() - 1) + : lsumPoints.stream() + .reduce( + ImmutableLongPointData.create(0, 0, empty(), 0), + (p1, p2) -> + ImmutableLongPointData.create( + 0, 0, empty(), p1.getValue() + p2.getValue(), emptyList())); + case DOUBLE_SUM: + List dsumPoints = + metrics.stream() + .flatMap(m -> m.getDoubleSumData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) + .collect(toList()); + return isCumulative + ? dsumPoints.get(dsumPoints.size() - 1) + : dsumPoints.stream() + .reduce( + ImmutableDoublePointData.create(0, 0, empty(), 0), + (p1, p2) -> + ImmutableDoublePointData.create( + 0, 0, empty(), p1.getValue() + p2.getValue(), emptyList())); + case HISTOGRAM: + List histPoints = + metrics.stream() + .flatMap(m -> m.getHistogramData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) + .collect(toList()); + return isCumulative + ? histPoints.get(histPoints.size() - 1) + : histPoints.stream() + .reduce( + ImmutableHistogramPointData.create( + 0, + 0, + empty(), + 0, + /* hasMin= */ true, + 0, + /* hasMax= */ true, + 0, + emptyList(), + singletonList(0L)), + (p1, p2) -> + ImmutableHistogramPointData.create( + 0, + 0, + empty(), + p1.getSum() + p2.getSum(), + p1.hasMin() || p2.hasMin(), + Math.min(p1.getMin(), p2.getMin()), + p2.hasMax() || p1.hasMax(), + Math.max(p1.getMax(), p2.getMax()), + p2.getBoundaries(), + mergeBuckets(p1.getCounts(), p2.getCounts()))); + case EXPONENTIAL_HISTOGRAM: + List expoHistPoints = + metrics.stream() + .flatMap(m -> m.getExponentialHistogramData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) + .collect(toList()); + return isCumulative + ? expoHistPoints.get(expoHistPoints.size() - 1) + : expoHistPoints.stream() + .reduce( + // NOTE: we're only testing the correctness of sum, count, min, and max, so we + // skip the complexity of correctly merge which involves re-bucketing when the + // scale changes. The result is bucket counts with meaningless values, but + // correct aggregate counts. + ImmutableExponentialHistogramPointData.create( + 0, + 0, + 0, + /* hasMin= */ true, + 0, + /* hasMax= */ true, + 0, + ExponentialHistogramBuckets.create(0, 0, emptyList()), + ExponentialHistogramBuckets.create(0, 0, emptyList()), + 0, + 0, + empty(), + emptyList()), + (p1, p2) -> + ImmutableExponentialHistogramPointData.create( + Math.min(p1.getScale(), p2.getScale()), + p1.getSum() + p2.getSum(), + p1.getZeroCount() + p2.getZeroCount(), + p1.hasMin() || p2.hasMin(), + Math.min(p1.getMin(), p2.getMin()), + p1.hasMax() || p2.hasMax(), + Math.max(p1.getMax(), p2.getMax()), + ExponentialHistogramBuckets.create( + 0, + 0, + mergeBuckets( + p1.getPositiveBuckets().getBucketCounts(), + p2.getPositiveBuckets().getBucketCounts())), + ExponentialHistogramBuckets.create( + 0, + 0, + mergeBuckets( + p1.getNegativeBuckets().getBucketCounts(), + p2.getNegativeBuckets().getBucketCounts())), + 0, + 0, + empty(), + emptyList())); + case SUMMARY: + } + throw new IllegalArgumentException(); + } + + private static List mergeBuckets(List l1, List l2) { + int size = Math.max(l1.size(), l2.size()); + List merged = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + long mergedCount = 0; + if (i < l1.size()) { + mergedCount += l1.get(i); + } + if (i < l2.size()) { + mergedCount += l2.get(i); + } + merged.add(mergedCount); + } + return merged; + } + + @SuppressWarnings("ImmutableEnumChecker") + private enum InstrumentTypeAndAggregation { + COUNTER_SUM(COUNTER, Aggregation.sum()), + UP_DOWN_COUNTER_SUM(UP_DOWN_COUNTER, Aggregation.sum()), + GAUGE_LAST_VALUE(GAUGE, Aggregation.lastValue()), + HISTOGRAM_EXPLICIT(HISTOGRAM, Aggregation.explicitBucketHistogram()), + HISTOGRAM_BASE2_EXPONENTIAL(HISTOGRAM, Aggregation.base2ExponentialBucketHistogram()); + + InstrumentTypeAndAggregation(InstrumentType instrumentType, Aggregation aggregation) { + this.instrumentType = instrumentType; + this.aggregation = aggregation; + } + + private final InstrumentType instrumentType; + private final Aggregation aggregation; + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java index e883b5d2698..2e425e5cd0c 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java @@ -7,7 +7,6 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; -import com.google.common.collect.ImmutableList; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -31,9 +30,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; import java.util.stream.DoubleStream; import org.junit.jupiter.api.Test; @@ -268,49 +264,6 @@ void testHistogramCounts(MemoryMode memoryMode) { assertThat(point.getCounts().size()).isEqualTo(boundaries.length + 1); } - @ParameterizedTest - @EnumSource(MemoryMode.class) - void testMultithreadedUpdates(MemoryMode memoryMode) throws InterruptedException { - init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); - ImmutableList updates = ImmutableList.of(1L, 2L, 3L, 5L, 7L, 11L, 13L, 17L, 19L, 23L); - int numberOfThreads = updates.size(); - int numberOfUpdates = 10000; - ThreadPoolExecutor executor = - (ThreadPoolExecutor) Executors.newFixedThreadPool(numberOfThreads); - - executor.invokeAll( - updates.stream() - .map( - v -> - Executors.callable( - () -> { - for (int j = 0; j < numberOfUpdates; j++) { - aggregatorHandle.recordLong(v, Attributes.empty(), Context.current()); - if (ThreadLocalRandom.current().nextInt(10) == 0) { - aggregatorHandle.aggregateThenMaybeReset( - 0, 1, Attributes.empty(), /* reset= */ false); - } - } - })) - .collect(Collectors.toList())); - - assertThat( - aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ false)) - .isEqualTo( - ImmutableHistogramPointData.create( - 0, - 1, - Attributes.empty(), - 1010000, - /* hasMin= */ true, - 1d, - /* hasMax= */ true, - 23d, - boundariesList, - Arrays.asList(50000L, 50000L, 0L, 0L))); - } - @Test void testReusableDataMemoryMode() { init(MemoryMode.REUSABLE_DATA); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index a3175578d77..0d74e5ee9cb 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -14,8 +14,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import com.google.common.util.concurrent.AtomicDouble; -import com.google.common.util.concurrent.Uninterruptibles; import io.github.netmikey.logunit.api.LogCapturer; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -30,7 +28,6 @@ import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; @@ -44,19 +41,11 @@ import io.opentelemetry.sdk.testing.assertj.DoubleSumAssert; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.function.BiConsumer; -import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.event.Level; @SuppressLogger(DefaultSynchronousMetricStorage.class) @@ -768,106 +757,6 @@ private static void assertOverflowDoesNotExists(MetricData metricData) { .equals(MetricStorage.CARDINALITY_OVERFLOW)))); } - @ParameterizedTest - @MethodSource("concurrentStressTestArguments") - void recordAndCollect_concurrentStressTest( - DefaultSynchronousMetricStorage storage, BiConsumer collect) { - // Define record threads. Each records a value of 1.0, 2000 times - List threads = new ArrayList<>(); - CountDownLatch latch = new CountDownLatch(4); - for (int i = 0; i < 4; i++) { - Thread thread = - new Thread( - () -> { - for (int j = 0; j < 2000; j++) { - storage.recordDouble(1.0, Attributes.empty(), Context.current()); - Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); - } - latch.countDown(); - }); - threads.add(thread); - } - - // Define collect thread. Collect thread collects and aggregates the - AtomicDouble cumulativeSum = new AtomicDouble(); - Thread collectThread = - new Thread( - () -> { - int extraCollects = 0; - // If we terminate when latch.count() == 0, the last collect may have occurred before - // the last recorded measurement. To ensure we collect all measurements, we collect - // one extra time after latch.count() == 0. - while (latch.getCount() != 0 || extraCollects <= 1) { - Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); - MetricData metricData = - storage.collect(Resource.empty(), InstrumentationScopeInfo.empty(), 0, 1); - if (!metricData.isEmpty()) { - metricData.getDoubleSumData().getPoints().stream() - .findFirst() - .ifPresent(pointData -> collect.accept(pointData.getValue(), cumulativeSum)); - } - if (latch.getCount() == 0) { - extraCollects++; - } - } - }); - - // Start all the threads - collectThread.start(); - threads.forEach(Thread::start); - - // Wait for the collect thread to end, which collects until the record threads are done - Uninterruptibles.joinUninterruptibly(collectThread); - - assertThat(cumulativeSum.get()).isEqualTo(8000.0); - } - - private static Stream concurrentStressTestArguments() { - List argumentsList = new ArrayList<>(); - - for (MemoryMode memoryMode : MemoryMode.values()) { - Aggregator aggregator = - ((AggregatorFactory) Aggregation.sum()) - .createAggregator( - DESCRIPTOR, asExemplarFilterInternal(ExemplarFilter.alwaysOff()), memoryMode); - - argumentsList.add( - Arguments.of( - // Delta - new DefaultSynchronousMetricStorage<>( - RegisteredReader.create( - InMemoryMetricReader.builder() - .setAggregationTemporalitySelector(unused -> AggregationTemporality.DELTA) - .setMemoryMode(memoryMode) - .build(), - ViewRegistry.create()), - METRIC_DESCRIPTOR, - aggregator, - AttributesProcessor.noop(), - CARDINALITY_LIMIT, - /* enabled= */ true), - (BiConsumer) - (value, cumulativeCount) -> cumulativeCount.addAndGet(value))); - - argumentsList.add( - Arguments.of( - // Cumulative - new DefaultSynchronousMetricStorage<>( - RegisteredReader.create( - InMemoryMetricReader.builder().setMemoryMode(memoryMode).build(), - ViewRegistry.create()), - METRIC_DESCRIPTOR, - aggregator, - AttributesProcessor.noop(), - CARDINALITY_LIMIT, - /* enabled= */ true), - (BiConsumer) - (value, cumulativeCount) -> cumulativeCount.set(value))); - } - - return argumentsList.stream(); - } - @ParameterizedTest @EnumSource(MemoryMode.class) void enabledThenDisable_isEnabled(MemoryMode memoryMode) {