From 5e46ad6feeefba09cf47b1a489eaf72d778dee7c Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Fri, 16 Jan 2026 16:51:30 -0600 Subject: [PATCH 1/6] Add synchronous instrument stress test --- .../SynchronousInstrumentStressTest.java | 479 ++++++++++++++++++ 1 file changed, 479 insertions(+) create mode 100644 sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java new file mode 100644 index 00000000000..a88c90ec3c5 --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java @@ -0,0 +1,479 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics; + +import static io.opentelemetry.api.common.Attributes.empty; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.util.concurrent.Uninterruptibles; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.internal.testing.CleanupExtension; +import io.opentelemetry.sdk.common.export.MemoryMode; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.internal.aggregator.ExplicitBucketHistogramUtils; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.LongStream; +import java.util.stream.Stream; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * {@link #stressTest(AggregationTemporality, InstrumentType, MemoryMode, InstrumentValueType)} + * performs a stress test to confirm simultaneous record and collections do not have concurrency + * issues like lost writes, partial writes, duplicate writes, etc. All combinations of the following + * dimensions are tested: aggregation temporality, instrument type (synchronous), memory mode, + * instrument value type. + */ +// TODO: add support for exponential histogram +class SynchronousInstrumentStressTest { + + private static final String instrumentName = "instrument"; + private static final Duration oneMicrosecond = Duration.ofNanos(1000); + private static final List instrumentTypes = + Arrays.asList( + InstrumentType.COUNTER, + InstrumentType.HISTOGRAM, + InstrumentType.UP_DOWN_COUNTER, + InstrumentType.GAUGE); + private static final List bucketBoundaries = + ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES; + private static final double[] bucketBoundariesArr = + bucketBoundaries.stream().mapToDouble(Double::doubleValue).toArray(); + + @RegisterExtension CleanupExtension cleanup = new CleanupExtension(); + + @ParameterizedTest + @MethodSource("stressTestArgs") + void stressTest( + AggregationTemporality aggregationTemporality, + InstrumentType instrumentType, + MemoryMode memoryMode, + InstrumentValueType instrumentValueType) { + // Initialize metric SDK + InMemoryMetricReader reader = + InMemoryMetricReader.builder() + .setAggregationTemporalitySelector(unused -> aggregationTemporality) + .setMemoryMode(memoryMode) + .build(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(reader).build(); + cleanup.addCloseable(meterProvider); + Meter meter = meterProvider.get("test"); + Instrument instrument = getInstrument(meter, instrumentType, instrumentValueType); + + // Define list of measurements to record + // Later, we'll assert that the data collected matches these measurements, with no lost writes, + // partial writes, duplicate writes, etc. + int measurementCount = 2000; + List measurements = new ArrayList<>(); + for (int i = 0; i < measurementCount; i++) { + measurements.add((long) i); + } + Collections.shuffle(measurements); + + // Define recording threads + int threadCount = 4; + List recordThreads = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(threadCount); + for (int i = 0; i < 4; i++) { + recordThreads.add( + new Thread( + () -> { + for (Long measurement : measurements) { + instrument.record(measurement); + Uninterruptibles.sleepUninterruptibly(oneMicrosecond); + } + latch.countDown(); + })); + } + + // Define collecting thread + // NOTE: collect makes a copy of MetricData because REUSEABLE_MEMORY mode reuses MetricData + List collectedMetrics = new ArrayList<>(); + Thread collectThread = + new Thread( + () -> { + while (latch.getCount() != 0) { + Uninterruptibles.sleepUninterruptibly(oneMicrosecond); + collectedMetrics.addAll( + reader.collectAllMetrics().stream() + .map(SynchronousInstrumentStressTest::copy) + .collect(toList())); + } + collectedMetrics.addAll( + reader.collectAllMetrics().stream() + .map(SynchronousInstrumentStressTest::copy) + .collect(toList())); + }); + + // Start all the threads + collectThread.start(); + recordThreads.forEach(Thread::start); + + // Wait for the collect thread to end, which collects until the record threads are done + Uninterruptibles.joinUninterruptibly(collectThread); + + // Assert collected data is consistent with recorded measurements by independently computing the + // expected aggregated value and comparing to the actual results. + // NOTE: this does not validate the absence of partial writes for cumulative instruments which + // track multiple fields. For example, explicit histogram tracks sum and bucket counts. These + // should be atomically updated such that we never collect the sum without corresponding bucket + // counts update, or vice verse. This test asserts that the cumulative state at the end is + // consistent, and interim collects unknowingly see partial writes. + AtomicLong lastValue = new AtomicLong(0); + AtomicLong sum = new AtomicLong(0); + AtomicLong min = new AtomicLong(Long.MAX_VALUE); + AtomicLong max = new AtomicLong(-1); + List bucketCounts = new ArrayList<>(); + for (int i = 0; i < bucketBoundaries.size() + 1; i++) { + bucketCounts.add(0L); + } + LongStream.range(0, threadCount) + .flatMap(i -> measurements.stream().mapToLong(l -> l)) + .forEach( + measurement -> { + lastValue.set(measurement); + sum.addAndGet(measurement); + min.updateAndGet(v -> Math.min(v, measurement)); + max.updateAndGet(v -> Math.max(v, measurement)); + int bucketIndex = + ExplicitBucketHistogramUtils.findBucketIndex( + bucketBoundariesArr, (double) measurement); + bucketCounts.set(bucketIndex, bucketCounts.get(bucketIndex) + 1); + }); + + PointData reducedPointData = + getReducedPointData( + collectedMetrics, aggregationTemporality == AggregationTemporality.CUMULATIVE); + if (instrumentType == InstrumentType.COUNTER + || instrumentType == InstrumentType.UP_DOWN_COUNTER) { + if (instrumentValueType == InstrumentValueType.DOUBLE) { + assertThat(reducedPointData) + .isInstanceOfSatisfying( + DoublePointData.class, + point -> assertThat(point.getValue()).isEqualTo((double) sum.get())); + } else { + assertThat(reducedPointData) + .isInstanceOfSatisfying( + LongPointData.class, point -> assertThat(point.getValue()).isEqualTo(sum.get())); + } + } else if (instrumentType == InstrumentType.GAUGE) { + if (instrumentValueType == InstrumentValueType.DOUBLE) { + assertThat(reducedPointData) + .isInstanceOfSatisfying( + DoublePointData.class, + point -> assertThat(point.getValue()).isEqualTo((double) lastValue.get())); + } else { + assertThat(reducedPointData) + .isInstanceOfSatisfying( + LongPointData.class, + point -> assertThat(point.getValue()).isEqualTo(lastValue.get())); + } + } else if (instrumentType == InstrumentType.HISTOGRAM) { + assertThat(reducedPointData) + .isInstanceOfSatisfying( + HistogramPointData.class, + point -> { + assertThat(point.getSum()).isEqualTo((double) sum.get()); + assertThat(point.getMin()).isEqualTo((double) min.get()); + assertThat(point.getMax()).isEqualTo((double) max.get()); + assertThat(point.getCount()).isEqualTo(bucketCounts.stream().reduce(0L, Long::sum)); + assertThat(point.getCounts()).isEqualTo(bucketCounts); + }); + } else { + throw new IllegalArgumentException(); + } + } + + private static Stream stressTestArgs() { + List argumentsList = new ArrayList<>(); + for (AggregationTemporality aggregationTemporality : AggregationTemporality.values()) { + for (InstrumentType instrumentType : instrumentTypes) { + for (MemoryMode memoryMode : MemoryMode.values()) { + for (InstrumentValueType instrumentValueType : InstrumentValueType.values()) { + argumentsList.add( + Arguments.of( + aggregationTemporality, instrumentType, memoryMode, instrumentValueType)); + } + } + } + } + return argumentsList.stream(); + } + + private static Instrument getInstrument( + Meter meter, InstrumentType instrumentType, InstrumentValueType instrumentValueType) { + switch (instrumentType) { + case COUNTER: + return instrumentValueType == InstrumentValueType.DOUBLE + ? meter.counterBuilder(instrumentName).ofDoubles().build()::add + : meter.counterBuilder(instrumentName).build()::add; + case UP_DOWN_COUNTER: + return instrumentValueType == InstrumentValueType.DOUBLE + ? meter.upDownCounterBuilder(instrumentName).ofDoubles().build()::add + : meter.upDownCounterBuilder(instrumentName).build()::add; + case HISTOGRAM: + return instrumentValueType == InstrumentValueType.DOUBLE + ? meter + .histogramBuilder(instrumentName) + .setExplicitBucketBoundariesAdvice(bucketBoundaries) + .build() + ::record + : meter + .histogramBuilder(instrumentName) + .setExplicitBucketBoundariesAdvice(bucketBoundaries) + .ofLongs() + .build() + ::record; + case GAUGE: + return instrumentValueType == InstrumentValueType.DOUBLE + ? meter.gaugeBuilder(instrumentName).build()::set + : meter.gaugeBuilder(instrumentName).ofLongs().build()::set; + case OBSERVABLE_COUNTER: + case OBSERVABLE_UP_DOWN_COUNTER: + case OBSERVABLE_GAUGE: + } + throw new IllegalArgumentException(); + } + + private interface Instrument { + void record(long value); + } + + private static MetricData copy(MetricData m) { + switch (m.getType()) { + case LONG_GAUGE: + return ImmutableMetricData.createLongGauge( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableGaugeData.create( + m.getLongGaugeData().getPoints().stream() + .map( + p -> + ImmutableLongPointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getValue(), + p.getExemplars())) + .collect(toList()))); + case DOUBLE_GAUGE: + return ImmutableMetricData.createDoubleGauge( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableGaugeData.create( + m.getDoubleGaugeData().getPoints().stream() + .map( + p -> + ImmutableDoublePointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getValue(), + p.getExemplars())) + .collect(toList()))); + case LONG_SUM: + return ImmutableMetricData.createLongSum( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableSumData.create( + m.getLongSumData().isMonotonic(), + m.getLongSumData().getAggregationTemporality(), + m.getLongSumData().getPoints().stream() + .map( + p -> + ImmutableLongPointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getValue(), + p.getExemplars())) + .collect(toList()))); + case DOUBLE_SUM: + return ImmutableMetricData.createDoubleSum( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableSumData.create( + m.getDoubleSumData().isMonotonic(), + m.getDoubleSumData().getAggregationTemporality(), + m.getDoubleSumData().getPoints().stream() + .map( + p -> + ImmutableDoublePointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getValue(), + p.getExemplars())) + .collect(toList()))); + case HISTOGRAM: + return ImmutableMetricData.createDoubleHistogram( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableHistogramData.create( + m.getHistogramData().getAggregationTemporality(), + m.getHistogramData().getPoints().stream() + .map( + p -> + ImmutableHistogramPointData.create( + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getSum(), + p.hasMin(), + p.getMin(), + p.hasMax(), + p.getMax(), + p.getBoundaries(), + p.getCounts(), + p.getExemplars())) + .collect(toList()))); + case SUMMARY: + case EXPONENTIAL_HISTOGRAM: + } + throw new IllegalArgumentException(); + } + + /** + * Reduce a list of metric data assumed to be uniform and for a single instrument to a single + * point data. If cumulative, return the last point data. If delta, merge the data points. + */ + private static PointData getReducedPointData(List metrics, boolean isCumulative) { + metrics.stream() + .forEach(metricData -> assertThat(metricData.getName()).isEqualTo(instrumentName)); + MetricData first = metrics.get(0); + switch (first.getType()) { + case LONG_GAUGE: + List lgaugePoints = + metrics.stream() + .flatMap(m -> m.getLongGaugeData().getPoints().stream()) + .collect(toList()); + return lgaugePoints.get(lgaugePoints.size() - 1); + case DOUBLE_GAUGE: + List dgaugePoints = + metrics.stream() + .flatMap(m -> m.getDoubleGaugeData().getPoints().stream()) + .collect(toList()); + return dgaugePoints.get(dgaugePoints.size() - 1); + case LONG_SUM: + List lsumPoints = + metrics.stream() + .flatMap(m -> m.getLongSumData().getPoints().stream()) + .collect(toList()); + return isCumulative + ? lsumPoints.get(lsumPoints.size() - 1) + : lsumPoints.stream() + .reduce( + ImmutableLongPointData.create(0, 0, empty(), 0), + (p1, p2) -> + ImmutableLongPointData.create( + 0, 0, empty(), p1.getValue() + p2.getValue(), emptyList())); + case DOUBLE_SUM: + List dsumPoints = + metrics.stream() + .flatMap(m -> m.getDoubleSumData().getPoints().stream()) + .collect(toList()); + return isCumulative + ? dsumPoints.get(dsumPoints.size() - 1) + : dsumPoints.stream() + .reduce( + ImmutableDoublePointData.create(0, 0, empty(), 0), + (p1, p2) -> + ImmutableDoublePointData.create( + 0, 0, empty(), p1.getValue() + p2.getValue(), emptyList())); + case HISTOGRAM: + List histPoints = + metrics.stream() + .flatMap(m -> m.getHistogramData().getPoints().stream()) + .collect(toList()); + return isCumulative + ? histPoints.get(histPoints.size() - 1) + : histPoints.stream() + .reduce( + ImmutableHistogramPointData.create( + 0, + 0, + empty(), + 0, + /* hasMin= */ true, + 0, + /* hasMax= */ true, + 0, + emptyList(), + singletonList(0L)), + (p1, p2) -> + ImmutableHistogramPointData.create( + 0, + 0, + empty(), + p1.getSum() + p2.getSum(), + p1.hasMin() || p2.hasMin(), + Math.min(p1.getMin(), p2.getMin()), + p2.hasMax() || p1.hasMax(), + Math.max(p1.getMax(), p2.getMax()), + p2.getBoundaries(), + mergeCounts(p1.getCounts(), p2.getCounts()))); + case EXPONENTIAL_HISTOGRAM: + case SUMMARY: + } + throw new IllegalArgumentException(); + } + + private static List mergeCounts(List l1, List l2) { + int size = Math.max(l1.size(), l2.size()); + List merged = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + long mergedCount = 0; + if (i < l1.size()) { + mergedCount += l1.get(i); + } + if (i < l2.size()) { + mergedCount += l2.get(i); + } + merged.add(mergedCount); + } + return merged; + } +} From e33650210d322b1d76a54441737c6922787c7bbe Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Fri, 16 Jan 2026 17:14:01 -0600 Subject: [PATCH 2/6] Extend to attributes and delete existing stress tests --- .../sdk/metrics/SdkDoubleCounterTest.java | 93 --------------- .../sdk/metrics/SdkDoubleGaugeTest.java | 95 --------------- .../sdk/metrics/SdkDoubleHistogramTest.java | 105 ----------------- .../metrics/SdkDoubleUpDownCounterTest.java | 98 ---------------- .../sdk/metrics/SdkLongCounterTest.java | 96 --------------- .../sdk/metrics/SdkLongGaugeTest.java | 95 --------------- .../sdk/metrics/SdkLongHistogramTest.java | 105 ----------------- .../sdk/metrics/SdkLongUpDownCounterTest.java | 98 ---------------- .../sdk/metrics/StressTestRunner.java | 104 ---------------- .../SynchronousInstrumentStressTest.java | 99 +++++++++++----- ...se2ExponentialHistogramAggregatorTest.java | 1 + ...ExplicitBucketHistogramAggregatorTest.java | 47 -------- .../state/SynchronousMetricStorageTest.java | 111 ------------------ 13 files changed, 69 insertions(+), 1078 deletions(-) delete mode 100644 sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/StressTestRunner.java diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java index d733821972b..34882efc751 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleCounterTest.java @@ -21,7 +21,6 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -174,96 +173,4 @@ void doubleCounterAdd_NaN() { doubleCounter.add(Double.NaN); assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0); } - - @Test - void stressTest() { - DoubleCounter doubleCounter = sdkMeter.counterBuilder("testCounter").ofDoubles().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, 1, () -> doubleCounter.add(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testCounter") - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .isMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(40000) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - DoubleCounter doubleCounter = sdkMeter.counterBuilder("testCounter").ofDoubles().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> - doubleCounter.add( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .isMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java index 61c0d3ad0b9..740775e766f 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeTest.java @@ -27,7 +27,6 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import java.time.Duration; import java.util.Collections; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; /** Unit tests for {@link SdkDoubleGauge}. */ @@ -271,98 +270,4 @@ void collectMetrics_WithMultipleCollects() { .hasValue(222d) .hasAttributes(attributeEntry("K", "V"))))); } - - @Test - void stressTest() { - DoubleGauge doubleGauge = sdkMeter.gaugeBuilder("testGauge").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 1, - () -> { - doubleGauge.set(10, Attributes.builder().put("K", "V").build()); - doubleGauge.set(11, Attributes.builder().put("K", "V").build()); - })); - } - - stressTestBuilder.build().run(); - assertThat(cumulativeReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testGauge") - .hasDoubleGaugeSatisfying( - gauge -> - gauge.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - DoubleGauge doubleGauge = sdkMeter.gaugeBuilder("testGauge").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> { - doubleGauge.set(10, Attributes.builder().put(keys[i], values[i]).build()); - doubleGauge.set(11, Attributes.builder().put(keys[i], values[i]).build()); - }))); - - stressTestBuilder.build().run(); - assertThat(cumulativeReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasDoubleGaugeSatisfying( - gauge -> - gauge.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java index dd210faad46..4da66aca7df 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogramTest.java @@ -27,7 +27,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -369,108 +368,4 @@ void collectMetrics_ExemplarsWithExplicitBucketHistogram() { .put("key", "value") .build()))))); } - - @Test - void stressTest() { - DoubleHistogram doubleHistogram = sdkMeter.histogramBuilder("testHistogram").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 1, - () -> doubleHistogram.record(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testHistogram") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasAttributes(attributeEntry("K", "V")) - .hasCount(4_000) - .hasSum(40_000)))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - DoubleHistogram doubleHistogram = sdkMeter.histogramBuilder("testHistogram").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> - doubleHistogram.record( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testHistogram") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(2_000) - .hasSum(20_000) - .hasBucketCounts( - 0, 0, 2000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(2_000) - .hasSum(20_000) - .hasBucketCounts( - 0, 0, 2000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(2_000) - .hasSum(20_000) - .hasBucketCounts( - 0, 0, 2000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(2_000) - .hasSum(20_000) - .hasBucketCounts( - 0, 0, 2000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java index a8dc0041c0d..a15dcf422cc 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounterTest.java @@ -18,7 +18,6 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; /** Unit tests for {@link SdkDoubleUpDownCounter}. */ @@ -158,103 +157,6 @@ void collectMetrics_WithMultipleCollects() { .hasValue(777.9)))); } - @Test - void stressTest() { - DoubleUpDownCounter doubleUpDownCounter = - sdkMeter.upDownCounterBuilder("testUpDownCounter").ofDoubles().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 2, - () -> doubleUpDownCounter.add(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testUpDownCounter") - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .isNotMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(40_000) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - DoubleUpDownCounter doubleUpDownCounter = - sdkMeter.upDownCounterBuilder("testUpDownCounter").ofDoubles().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> - doubleUpDownCounter.add( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testUpDownCounter") - .hasDoubleSumSatisfying( - sum -> - sum.isCumulative() - .isNotMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(20_000) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } - @Test void testToString() { String expected = diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java index 30f9c27ff22..160a07b9e48 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongCounterTest.java @@ -20,7 +20,6 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -163,99 +162,4 @@ void longCounterAdd_Monotonicity() { logs.assertContains( "Counters can only increase. Instrument testCounter has recorded a negative value."); } - - @Test - void stressTest() { - LongCounter longCounter = sdkMeter.counterBuilder("testCounter").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, 1, () -> longCounter.add(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testCounter") - .hasLongSumSatisfying( - longSum -> - longSum - .isCumulative() - .isMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(80_000) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - LongCounter longCounter = sdkMeter.counterBuilder("testCounter").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 2, - () -> - longCounter.add( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testCounter") - .hasLongSumSatisfying( - longSum -> - longSum - .isCumulative() - .isMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java index 0a117e48d82..3879f2e8a76 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeTest.java @@ -24,7 +24,6 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import java.time.Duration; import java.util.Collections; -import java.util.stream.IntStream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -265,98 +264,4 @@ void collectMetrics_WithMultipleCollects() { .hasValue(222) .hasAttributes(attributeEntry("K", "V"))))); } - - @Test - void stressTest() { - LongGauge longGauge = sdkMeter.gaugeBuilder("testGauge").ofLongs().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 1, - () -> { - longGauge.set(10, Attributes.builder().put("K", "V").build()); - longGauge.set(11, Attributes.builder().put("K", "V").build()); - })); - } - - stressTestBuilder.build().run(); - assertThat(cumulativeReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testGauge") - .hasLongGaugeSatisfying( - gauge -> - gauge.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - LongGauge longGauge = sdkMeter.gaugeBuilder("testGauge").ofLongs().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> { - longGauge.set(10, Attributes.builder().put(keys[i], values[i]).build()); - longGauge.set(11, Attributes.builder().put(keys[i], values[i]).build()); - }))); - - stressTestBuilder.build().run(); - assertThat(cumulativeReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasLongGaugeSatisfying( - gauge -> - gauge.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(11) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java index dd802ff613f..4ecde4c0c28 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongHistogramTest.java @@ -26,7 +26,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -529,108 +528,4 @@ void collectMetrics_ExemplarsWithExplicitBucketHistogram() { .put("key", "value") .build()))))); } - - @Test - void stressTest() { - LongHistogram longHistogram = sdkMeter.histogramBuilder("testHistogram").ofLongs().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> longHistogram.record(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(reader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testHistogram") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasAttributes(attributeEntry("K", "V")) - .hasCount(8_000) - .hasSum(80_000)))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - LongHistogram longHistogram = sdkMeter.histogramBuilder("testHistogram").ofLongs().build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 2, - () -> - longHistogram.record( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(reader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testHistogram") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(1_000) - .hasSum(10_000) - .hasBucketCounts( - 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(1_000) - .hasSum(10_000) - .hasBucketCounts( - 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(1_000) - .hasSum(10_000) - .hasBucketCounts( - 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasCount(1_000) - .hasSum(10_000) - .hasBucketCounts( - 0, 0, 1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java index 50feee13094..39eb6500207 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounterTest.java @@ -18,7 +18,6 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; import java.time.Duration; -import java.util.stream.IntStream; import org.junit.jupiter.api.Test; /** Unit tests for {@link SdkLongUpDownCounter}. */ @@ -150,101 +149,4 @@ void collectMetrics_WithMultipleCollects() { .hasAttributes(attributeEntry("K", "V")) .hasValue(777)))); } - - @Test - void stressTest() { - LongUpDownCounter longUpDownCounter = - sdkMeter.upDownCounterBuilder("testUpDownCounter").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - for (int i = 0; i < 4; i++) { - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 2_000, - 1, - () -> longUpDownCounter.add(10, Attributes.builder().put("K", "V").build()))); - } - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testUpDownCounter") - .hasLongSumSatisfying( - sum -> - sum.isCumulative() - .isNotMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(80_000) - .hasAttributes(attributeEntry("K", "V"))))); - } - - @Test - void stressTest_WithDifferentLabelSet() { - String[] keys = {"Key_1", "Key_2", "Key_3", "Key_4"}; - String[] values = {"Value_1", "Value_2", "Value_3", "Value_4"}; - LongUpDownCounter longUpDownCounter = - sdkMeter.upDownCounterBuilder("testUpDownCounter").build(); - - StressTestRunner.Builder stressTestBuilder = - StressTestRunner.builder().setCollectionIntervalMs(100); - - IntStream.range(0, 4) - .forEach( - i -> - stressTestBuilder.addOperation( - StressTestRunner.Operation.create( - 1_000, - 2, - () -> - longUpDownCounter.add( - 10, Attributes.builder().put(keys[i], values[i]).build())))); - - stressTestBuilder.build().run(); - assertThat(sdkMeterReader.collectAllMetrics()) - .satisfiesExactly( - metric -> - assertThat(metric) - .hasResource(RESOURCE) - .hasInstrumentationScope(INSTRUMENTATION_SCOPE_INFO) - .hasName("testUpDownCounter") - .hasLongSumSatisfying( - sum -> - sum.isCumulative() - .isNotMonotonic() - .hasPointsSatisfying( - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[0], values[0])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[1], values[1])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[2], values[2])), - point -> - point - .hasStartEpochNanos(testClock.now()) - .hasEpochNanos(testClock.now()) - .hasValue(10_000) - .hasAttributes(attributeEntry(keys[3], values[3]))))); - } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/StressTestRunner.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/StressTestRunner.java deleted file mode 100644 index 22f5f475187..00000000000 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/StressTestRunner.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.sdk.metrics; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.Uninterruptibles; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import javax.annotation.concurrent.Immutable; - -@AutoValue -@Immutable -abstract class StressTestRunner { - abstract ImmutableList getOperations(); - - abstract int getCollectionIntervalMs(); - - final void run() { - List operations = getOperations(); - int numThreads = operations.size(); - CountDownLatch countDownLatch = new CountDownLatch(numThreads); - Thread collectionThread = - new Thread( - () -> { - // While workers still work, do collections. - while (countDownLatch.getCount() != 0) { - Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(getCollectionIntervalMs())); - } - }); - List operationThreads = new ArrayList<>(numThreads); - for (Operation operation : operations) { - operationThreads.add( - new Thread( - () -> { - for (int i = 0; i < operation.getNumOperations(); i++) { - operation.getUpdater().update(); - Uninterruptibles.sleepUninterruptibly( - Duration.ofMillis(operation.getOperationDelayMs())); - } - countDownLatch.countDown(); - })); - } - - // Start collection thread then the rest of the worker threads. - collectionThread.start(); - for (Thread thread : operationThreads) { - thread.start(); - } - - // Wait for all the thread to finish. - for (Thread thread : operationThreads) { - Uninterruptibles.joinUninterruptibly(thread); - } - Uninterruptibles.joinUninterruptibly(collectionThread); - } - - static Builder builder() { - return new AutoValue_StressTestRunner.Builder(); - } - - @AutoValue.Builder - abstract static class Builder { - - abstract ImmutableList.Builder operationsBuilder(); - - abstract Builder setCollectionIntervalMs(int collectionInterval); - - Builder addOperation(Operation operation) { - operationsBuilder().add(operation); - return this; - } - - public abstract StressTestRunner build(); - } - - @AutoValue - @Immutable - abstract static class Operation { - - abstract int getNumOperations(); - - abstract int getOperationDelayMs(); - - abstract OperationUpdater getUpdater(); - - static Operation create(int numOperations, int operationDelayMs, OperationUpdater updater) { - return new AutoValue_StressTestRunner_Operation(numOperations, operationDelayMs, updater); - } - } - - interface OperationUpdater { - - /** Called every operation. */ - void update(); - } - - StressTestRunner() {} -} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java index a88c90ec3c5..263a0ba1dd4 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java @@ -12,6 +12,8 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.common.util.concurrent.Uninterruptibles; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.internal.testing.CleanupExtension; import io.opentelemetry.sdk.common.export.MemoryMode; @@ -66,6 +68,11 @@ class SynchronousInstrumentStressTest { ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES; private static final double[] bucketBoundariesArr = bucketBoundaries.stream().mapToDouble(Double::doubleValue).toArray(); + private static final AttributeKey attributesKey = AttributeKey.stringKey("key"); + private static final Attributes attr1 = Attributes.of(attributesKey, "value1"); + private static final Attributes attr2 = Attributes.of(attributesKey, "value2"); + private static final Attributes attr3 = Attributes.of(attributesKey, "value3"); + private static final Attributes attr4 = Attributes.of(attributesKey, "value4"); @RegisterExtension CleanupExtension cleanup = new CleanupExtension(); @@ -106,8 +113,12 @@ void stressTest( recordThreads.add( new Thread( () -> { + List attributes = Arrays.asList(attr1, attr2, attr3, attr4); + Collections.shuffle(attributes); for (Long measurement : measurements) { - instrument.record(measurement); + for (Attributes attr : attributes) { + instrument.record(measurement, attr); + } Uninterruptibles.sleepUninterruptibly(oneMicrosecond); } latch.countDown(); @@ -169,44 +180,64 @@ void stressTest( bucketCounts.set(bucketIndex, bucketCounts.get(bucketIndex) + 1); }); - PointData reducedPointData = - getReducedPointData( - collectedMetrics, aggregationTemporality == AggregationTemporality.CUMULATIVE); + boolean isCumulative = aggregationTemporality == AggregationTemporality.CUMULATIVE; + List points = + Stream.of(attr1, attr2, attr3, attr4) + .map(attr -> getReducedPointData(collectedMetrics, isCumulative, attr)) + .collect(toList()); if (instrumentType == InstrumentType.COUNTER || instrumentType == InstrumentType.UP_DOWN_COUNTER) { if (instrumentValueType == InstrumentValueType.DOUBLE) { - assertThat(reducedPointData) - .isInstanceOfSatisfying( - DoublePointData.class, - point -> assertThat(point.getValue()).isEqualTo((double) sum.get())); + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + DoublePointData.class, + p -> assertThat(p.getValue()).isEqualTo((double) sum.get()))); + } else { - assertThat(reducedPointData) - .isInstanceOfSatisfying( - LongPointData.class, point -> assertThat(point.getValue()).isEqualTo(sum.get())); + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + LongPointData.class, + p -> assertThat(p.getValue()).isEqualTo(sum.get()))); } } else if (instrumentType == InstrumentType.GAUGE) { if (instrumentValueType == InstrumentValueType.DOUBLE) { - assertThat(reducedPointData) - .isInstanceOfSatisfying( - DoublePointData.class, - point -> assertThat(point.getValue()).isEqualTo((double) lastValue.get())); + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + DoublePointData.class, + p -> assertThat(p.getValue()).isEqualTo((double) lastValue.get()))); } else { - assertThat(reducedPointData) - .isInstanceOfSatisfying( - LongPointData.class, - point -> assertThat(point.getValue()).isEqualTo(lastValue.get())); + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + LongPointData.class, + p -> assertThat(p.getValue()).isEqualTo(lastValue.get()))); } } else if (instrumentType == InstrumentType.HISTOGRAM) { - assertThat(reducedPointData) - .isInstanceOfSatisfying( - HistogramPointData.class, - point -> { - assertThat(point.getSum()).isEqualTo((double) sum.get()); - assertThat(point.getMin()).isEqualTo((double) min.get()); - assertThat(point.getMax()).isEqualTo((double) max.get()); - assertThat(point.getCount()).isEqualTo(bucketCounts.stream().reduce(0L, Long::sum)); - assertThat(point.getCounts()).isEqualTo(bucketCounts); - }); + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + HistogramPointData.class, + p -> { + assertThat(p.getSum()).isEqualTo((double) sum.get()); + assertThat(p.getMin()).isEqualTo((double) min.get()); + assertThat(p.getMax()).isEqualTo((double) max.get()); + assertThat(p.getCount()) + .isEqualTo(bucketCounts.stream().reduce(0L, Long::sum)); + assertThat(p.getCounts()).isEqualTo(bucketCounts); + })); } else { throw new IllegalArgumentException(); } @@ -264,7 +295,7 @@ private static Instrument getInstrument( } private interface Instrument { - void record(long value); + void record(long value, Attributes attributes); } private static MetricData copy(MetricData m) { @@ -380,7 +411,8 @@ private static MetricData copy(MetricData m) { * Reduce a list of metric data assumed to be uniform and for a single instrument to a single * point data. If cumulative, return the last point data. If delta, merge the data points. */ - private static PointData getReducedPointData(List metrics, boolean isCumulative) { + private static PointData getReducedPointData( + List metrics, boolean isCumulative, Attributes attributes) { metrics.stream() .forEach(metricData -> assertThat(metricData.getName()).isEqualTo(instrumentName)); MetricData first = metrics.get(0); @@ -389,18 +421,21 @@ private static PointData getReducedPointData(List metrics, boolean i List lgaugePoints = metrics.stream() .flatMap(m -> m.getLongGaugeData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) .collect(toList()); return lgaugePoints.get(lgaugePoints.size() - 1); case DOUBLE_GAUGE: List dgaugePoints = metrics.stream() .flatMap(m -> m.getDoubleGaugeData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) .collect(toList()); return dgaugePoints.get(dgaugePoints.size() - 1); case LONG_SUM: List lsumPoints = metrics.stream() .flatMap(m -> m.getLongSumData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) .collect(toList()); return isCumulative ? lsumPoints.get(lsumPoints.size() - 1) @@ -414,6 +449,7 @@ private static PointData getReducedPointData(List metrics, boolean i List dsumPoints = metrics.stream() .flatMap(m -> m.getDoubleSumData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) .collect(toList()); return isCumulative ? dsumPoints.get(dsumPoints.size() - 1) @@ -427,6 +463,7 @@ private static PointData getReducedPointData(List metrics, boolean i List histPoints = metrics.stream() .flatMap(m -> m.getHistogramData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) .collect(toList()); return isCumulative ? histPoints.get(histPoints.size() - 1) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java index 763d36fbec6..34da3bfef7b 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java @@ -413,6 +413,7 @@ void testToMetricData(MemoryMode memoryMode) { @ParameterizedTest @EnumSource(MemoryMode.class) + // TODO: incorporate into SynchronousInstrumentStressTest and delete void testMultithreadedUpdates(MemoryMode memoryMode) throws InterruptedException { initialize(memoryMode); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java index e883b5d2698..2e425e5cd0c 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregatorTest.java @@ -7,7 +7,6 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; -import com.google.common.collect.ImmutableList; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -31,9 +30,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; import java.util.stream.DoubleStream; import org.junit.jupiter.api.Test; @@ -268,49 +264,6 @@ void testHistogramCounts(MemoryMode memoryMode) { assertThat(point.getCounts().size()).isEqualTo(boundaries.length + 1); } - @ParameterizedTest - @EnumSource(MemoryMode.class) - void testMultithreadedUpdates(MemoryMode memoryMode) throws InterruptedException { - init(memoryMode); - AggregatorHandle aggregatorHandle = aggregator.createHandle(); - ImmutableList updates = ImmutableList.of(1L, 2L, 3L, 5L, 7L, 11L, 13L, 17L, 19L, 23L); - int numberOfThreads = updates.size(); - int numberOfUpdates = 10000; - ThreadPoolExecutor executor = - (ThreadPoolExecutor) Executors.newFixedThreadPool(numberOfThreads); - - executor.invokeAll( - updates.stream() - .map( - v -> - Executors.callable( - () -> { - for (int j = 0; j < numberOfUpdates; j++) { - aggregatorHandle.recordLong(v, Attributes.empty(), Context.current()); - if (ThreadLocalRandom.current().nextInt(10) == 0) { - aggregatorHandle.aggregateThenMaybeReset( - 0, 1, Attributes.empty(), /* reset= */ false); - } - } - })) - .collect(Collectors.toList())); - - assertThat( - aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ false)) - .isEqualTo( - ImmutableHistogramPointData.create( - 0, - 1, - Attributes.empty(), - 1010000, - /* hasMin= */ true, - 1d, - /* hasMax= */ true, - 23d, - boundariesList, - Arrays.asList(50000L, 50000L, 0L, 0L))); - } - @Test void testReusableDataMemoryMode() { init(MemoryMode.REUSABLE_DATA); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index a3175578d77..0d74e5ee9cb 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -14,8 +14,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import com.google.common.util.concurrent.AtomicDouble; -import com.google.common.util.concurrent.Uninterruptibles; import io.github.netmikey.logunit.api.LogCapturer; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -30,7 +28,6 @@ import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; @@ -44,19 +41,11 @@ import io.opentelemetry.sdk.testing.assertj.DoubleSumAssert; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.time.TestClock; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.function.BiConsumer; -import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.event.Level; @SuppressLogger(DefaultSynchronousMetricStorage.class) @@ -768,106 +757,6 @@ private static void assertOverflowDoesNotExists(MetricData metricData) { .equals(MetricStorage.CARDINALITY_OVERFLOW)))); } - @ParameterizedTest - @MethodSource("concurrentStressTestArguments") - void recordAndCollect_concurrentStressTest( - DefaultSynchronousMetricStorage storage, BiConsumer collect) { - // Define record threads. Each records a value of 1.0, 2000 times - List threads = new ArrayList<>(); - CountDownLatch latch = new CountDownLatch(4); - for (int i = 0; i < 4; i++) { - Thread thread = - new Thread( - () -> { - for (int j = 0; j < 2000; j++) { - storage.recordDouble(1.0, Attributes.empty(), Context.current()); - Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); - } - latch.countDown(); - }); - threads.add(thread); - } - - // Define collect thread. Collect thread collects and aggregates the - AtomicDouble cumulativeSum = new AtomicDouble(); - Thread collectThread = - new Thread( - () -> { - int extraCollects = 0; - // If we terminate when latch.count() == 0, the last collect may have occurred before - // the last recorded measurement. To ensure we collect all measurements, we collect - // one extra time after latch.count() == 0. - while (latch.getCount() != 0 || extraCollects <= 1) { - Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1)); - MetricData metricData = - storage.collect(Resource.empty(), InstrumentationScopeInfo.empty(), 0, 1); - if (!metricData.isEmpty()) { - metricData.getDoubleSumData().getPoints().stream() - .findFirst() - .ifPresent(pointData -> collect.accept(pointData.getValue(), cumulativeSum)); - } - if (latch.getCount() == 0) { - extraCollects++; - } - } - }); - - // Start all the threads - collectThread.start(); - threads.forEach(Thread::start); - - // Wait for the collect thread to end, which collects until the record threads are done - Uninterruptibles.joinUninterruptibly(collectThread); - - assertThat(cumulativeSum.get()).isEqualTo(8000.0); - } - - private static Stream concurrentStressTestArguments() { - List argumentsList = new ArrayList<>(); - - for (MemoryMode memoryMode : MemoryMode.values()) { - Aggregator aggregator = - ((AggregatorFactory) Aggregation.sum()) - .createAggregator( - DESCRIPTOR, asExemplarFilterInternal(ExemplarFilter.alwaysOff()), memoryMode); - - argumentsList.add( - Arguments.of( - // Delta - new DefaultSynchronousMetricStorage<>( - RegisteredReader.create( - InMemoryMetricReader.builder() - .setAggregationTemporalitySelector(unused -> AggregationTemporality.DELTA) - .setMemoryMode(memoryMode) - .build(), - ViewRegistry.create()), - METRIC_DESCRIPTOR, - aggregator, - AttributesProcessor.noop(), - CARDINALITY_LIMIT, - /* enabled= */ true), - (BiConsumer) - (value, cumulativeCount) -> cumulativeCount.addAndGet(value))); - - argumentsList.add( - Arguments.of( - // Cumulative - new DefaultSynchronousMetricStorage<>( - RegisteredReader.create( - InMemoryMetricReader.builder().setMemoryMode(memoryMode).build(), - ViewRegistry.create()), - METRIC_DESCRIPTOR, - aggregator, - AttributesProcessor.noop(), - CARDINALITY_LIMIT, - /* enabled= */ true), - (BiConsumer) - (value, cumulativeCount) -> cumulativeCount.set(value))); - } - - return argumentsList.stream(); - } - @ParameterizedTest @EnumSource(MemoryMode.class) void enabledThenDisable_isEnabled(MemoryMode memoryMode) { From 2566c097123d39192b36a5e081c87ef9b47fbce5 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Mon, 19 Jan 2026 10:56:41 -0600 Subject: [PATCH 3/6] Add support for expo histogram --- .../SynchronousInstrumentStressTest.java | 184 +++++++++++++++--- ...se2ExponentialHistogramAggregatorTest.java | 1 - 2 files changed, 154 insertions(+), 31 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java index 263a0ba1dd4..6506c2fb119 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java @@ -19,12 +19,17 @@ import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.data.HistogramPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; import io.opentelemetry.sdk.metrics.internal.aggregator.ExplicitBucketHistogramUtils; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData; @@ -47,23 +52,16 @@ import org.junit.jupiter.params.provider.MethodSource; /** - * {@link #stressTest(AggregationTemporality, InstrumentType, MemoryMode, InstrumentValueType)} - * performs a stress test to confirm simultaneous record and collections do not have concurrency - * issues like lost writes, partial writes, duplicate writes, etc. All combinations of the following - * dimensions are tested: aggregation temporality, instrument type (synchronous), memory mode, - * instrument value type. + * {@link #stressTest(AggregationTemporality, SyncInstrumentAggregation, MemoryMode, + * InstrumentValueType)} performs a stress test to confirm simultaneous record and collections do + * not have concurrency issues like lost writes, partial writes, duplicate writes, etc. All + * combinations of the following dimensions are tested: aggregation temporality, instrument type + * (synchronous), memory mode, instrument value type. */ -// TODO: add support for exponential histogram class SynchronousInstrumentStressTest { private static final String instrumentName = "instrument"; private static final Duration oneMicrosecond = Duration.ofNanos(1000); - private static final List instrumentTypes = - Arrays.asList( - InstrumentType.COUNTER, - InstrumentType.HISTOGRAM, - InstrumentType.UP_DOWN_COUNTER, - InstrumentType.GAUGE); private static final List bucketBoundaries = ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES; private static final double[] bucketBoundariesArr = @@ -80,12 +78,19 @@ class SynchronousInstrumentStressTest { @MethodSource("stressTestArgs") void stressTest( AggregationTemporality aggregationTemporality, - InstrumentType instrumentType, + SyncInstrumentAggregation instrumentType, MemoryMode memoryMode, InstrumentValueType instrumentValueType) { // Initialize metric SDK + DefaultAggregationSelector aggregationSelector = DefaultAggregationSelector.getDefault(); + if (instrumentType == SyncInstrumentAggregation.HISTOGRAM_EXPONENTIAL_HISTOGRAM) { + aggregationSelector = + aggregationSelector.with( + InstrumentType.HISTOGRAM, Aggregation.base2ExponentialBucketHistogram()); + } InMemoryMetricReader reader = InMemoryMetricReader.builder() + .setDefaultAggregationSelector(aggregationSelector) .setAggregationTemporalitySelector(unused -> aggregationTemporality) .setMemoryMode(memoryMode) .build(); @@ -166,6 +171,8 @@ void stressTest( for (int i = 0; i < bucketBoundaries.size() + 1; i++) { bucketCounts.add(0L); } + AtomicLong totalCount = new AtomicLong(0); + AtomicLong zeroCount = new AtomicLong(0); LongStream.range(0, threadCount) .flatMap(i -> measurements.stream().mapToLong(l -> l)) .forEach( @@ -174,10 +181,14 @@ void stressTest( sum.addAndGet(measurement); min.updateAndGet(v -> Math.min(v, measurement)); max.updateAndGet(v -> Math.max(v, measurement)); + totalCount.incrementAndGet(); int bucketIndex = ExplicitBucketHistogramUtils.findBucketIndex( bucketBoundariesArr, (double) measurement); bucketCounts.set(bucketIndex, bucketCounts.get(bucketIndex) + 1); + if (measurement == 0) { + zeroCount.incrementAndGet(); + } }); boolean isCumulative = aggregationTemporality == AggregationTemporality.CUMULATIVE; @@ -185,8 +196,8 @@ void stressTest( Stream.of(attr1, attr2, attr3, attr4) .map(attr -> getReducedPointData(collectedMetrics, isCumulative, attr)) .collect(toList()); - if (instrumentType == InstrumentType.COUNTER - || instrumentType == InstrumentType.UP_DOWN_COUNTER) { + if (instrumentType == SyncInstrumentAggregation.COUNTER_SUM + || instrumentType == SyncInstrumentAggregation.UP_DOWN_COUNTER_SUM) { if (instrumentValueType == InstrumentValueType.DOUBLE) { assertThat(points) .allSatisfy( @@ -205,7 +216,7 @@ void stressTest( LongPointData.class, p -> assertThat(p.getValue()).isEqualTo(sum.get()))); } - } else if (instrumentType == InstrumentType.GAUGE) { + } else if (instrumentType == SyncInstrumentAggregation.GAUGE_LAST_VALUE) { if (instrumentValueType == InstrumentValueType.DOUBLE) { assertThat(points) .allSatisfy( @@ -223,7 +234,7 @@ void stressTest( LongPointData.class, p -> assertThat(p.getValue()).isEqualTo(lastValue.get()))); } - } else if (instrumentType == InstrumentType.HISTOGRAM) { + } else if (instrumentType == SyncInstrumentAggregation.HISTOGRAM_EXPLICIT_HISTOGRAM) { assertThat(points) .allSatisfy( point -> @@ -238,6 +249,23 @@ void stressTest( .isEqualTo(bucketCounts.stream().reduce(0L, Long::sum)); assertThat(p.getCounts()).isEqualTo(bucketCounts); })); + } else if (instrumentType == SyncInstrumentAggregation.HISTOGRAM_EXPONENTIAL_HISTOGRAM) { + assertThat(points) + .allSatisfy( + point -> + assertThat(point) + .isInstanceOfSatisfying( + ExponentialHistogramPointData.class, + p -> { + assertThat(p.getSum()).isEqualTo((double) sum.get()); + assertThat(p.getMin()).isEqualTo((double) min.get()); + assertThat(p.getMax()).isEqualTo((double) max.get()); + assertThat(p.getZeroCount()).isEqualTo(zeroCount.get()); + assertThat( + p.getPositiveBuckets().getBucketCounts().stream() + .reduce(0L, Long::sum)) + .isEqualTo(totalCount.get() - zeroCount.get()); + })); } else { throw new IllegalArgumentException(); } @@ -246,7 +274,7 @@ void stressTest( private static Stream stressTestArgs() { List argumentsList = new ArrayList<>(); for (AggregationTemporality aggregationTemporality : AggregationTemporality.values()) { - for (InstrumentType instrumentType : instrumentTypes) { + for (SyncInstrumentAggregation instrumentType : SyncInstrumentAggregation.values()) { for (MemoryMode memoryMode : MemoryMode.values()) { for (InstrumentValueType instrumentValueType : InstrumentValueType.values()) { argumentsList.add( @@ -260,17 +288,20 @@ private static Stream stressTestArgs() { } private static Instrument getInstrument( - Meter meter, InstrumentType instrumentType, InstrumentValueType instrumentValueType) { + Meter meter, + SyncInstrumentAggregation instrumentType, + InstrumentValueType instrumentValueType) { switch (instrumentType) { - case COUNTER: + case COUNTER_SUM: return instrumentValueType == InstrumentValueType.DOUBLE ? meter.counterBuilder(instrumentName).ofDoubles().build()::add : meter.counterBuilder(instrumentName).build()::add; - case UP_DOWN_COUNTER: + case UP_DOWN_COUNTER_SUM: return instrumentValueType == InstrumentValueType.DOUBLE ? meter.upDownCounterBuilder(instrumentName).ofDoubles().build()::add : meter.upDownCounterBuilder(instrumentName).build()::add; - case HISTOGRAM: + case HISTOGRAM_EXPLICIT_HISTOGRAM: + case HISTOGRAM_EXPONENTIAL_HISTOGRAM: return instrumentValueType == InstrumentValueType.DOUBLE ? meter .histogramBuilder(instrumentName) @@ -283,13 +314,10 @@ private static Instrument getInstrument( .ofLongs() .build() ::record; - case GAUGE: + case GAUGE_LAST_VALUE: return instrumentValueType == InstrumentValueType.DOUBLE ? meter.gaugeBuilder(instrumentName).build()::set : meter.gaugeBuilder(instrumentName).ofLongs().build()::set; - case OBSERVABLE_COUNTER: - case OBSERVABLE_UP_DOWN_COUNTER: - case OBSERVABLE_GAUGE: } throw new IllegalArgumentException(); } @@ -398,11 +426,43 @@ private static MetricData copy(MetricData m) { p.hasMax(), p.getMax(), p.getBoundaries(), - p.getCounts(), + new ArrayList<>(p.getCounts()), p.getExemplars())) .collect(toList()))); - case SUMMARY: case EXPONENTIAL_HISTOGRAM: + return ImmutableMetricData.createExponentialHistogram( + m.getResource(), + m.getInstrumentationScopeInfo(), + m.getName(), + m.getDescription(), + m.getUnit(), + ImmutableExponentialHistogramData.create( + m.getExponentialHistogramData().getAggregationTemporality(), + m.getExponentialHistogramData().getPoints().stream() + .map( + p -> + ImmutableExponentialHistogramPointData.create( + p.getScale(), + p.getSum(), + p.getZeroCount(), + p.hasMin(), + p.getMin(), + p.hasMax(), + p.getMax(), + ExponentialHistogramBuckets.create( + p.getPositiveBuckets().getScale(), + p.getPositiveBuckets().getOffset(), + new ArrayList<>(p.getPositiveBuckets().getBucketCounts())), + ExponentialHistogramBuckets.create( + p.getNegativeBuckets().getScale(), + p.getNegativeBuckets().getOffset(), + new ArrayList<>(p.getPositiveBuckets().getBucketCounts())), + p.getStartEpochNanos(), + p.getEpochNanos(), + p.getAttributes(), + p.getExemplars())) + .collect(toList()))); + case SUMMARY: } throw new IllegalArgumentException(); } @@ -491,14 +551,66 @@ private static PointData getReducedPointData( p2.hasMax() || p1.hasMax(), Math.max(p1.getMax(), p2.getMax()), p2.getBoundaries(), - mergeCounts(p1.getCounts(), p2.getCounts()))); + mergeBuckets(p1.getCounts(), p2.getCounts()))); case EXPONENTIAL_HISTOGRAM: + List expoHistPoints = + metrics.stream() + .flatMap(m -> m.getExponentialHistogramData().getPoints().stream()) + .filter(p -> attributes.equals(p.getAttributes())) + .collect(toList()); + return isCumulative + ? expoHistPoints.get(expoHistPoints.size() - 1) + : expoHistPoints.stream() + .reduce( + // NOTE: we're only testing the correctness of sum, count, min, and max, so we + // skip the complexity of correctly merge which involves re-bucketing when the + // scale changes. The result is bucket counts with meaningless values, but + // correct aggregate counts. + ImmutableExponentialHistogramPointData.create( + 0, + 0, + 0, + /* hasMin= */ true, + 0, + /* hasMax= */ true, + 0, + ExponentialHistogramBuckets.create(0, 0, emptyList()), + ExponentialHistogramBuckets.create(0, 0, emptyList()), + 0, + 0, + empty(), + emptyList()), + (p1, p2) -> + ImmutableExponentialHistogramPointData.create( + Math.min(p1.getScale(), p2.getScale()), + p1.getSum() + p2.getSum(), + p1.getZeroCount() + p2.getZeroCount(), + p1.hasMin() || p2.hasMin(), + Math.min(p1.getMin(), p2.getMin()), + p1.hasMax() || p2.hasMax(), + Math.max(p1.getMax(), p2.getMax()), + ExponentialHistogramBuckets.create( + 0, + 0, + mergeBuckets( + p1.getPositiveBuckets().getBucketCounts(), + p2.getPositiveBuckets().getBucketCounts())), + ExponentialHistogramBuckets.create( + 0, + 0, + mergeBuckets( + p1.getNegativeBuckets().getBucketCounts(), + p2.getNegativeBuckets().getBucketCounts())), + 0, + 0, + empty(), + emptyList())); case SUMMARY: } throw new IllegalArgumentException(); } - private static List mergeCounts(List l1, List l2) { + private static List mergeBuckets(List l1, List l2) { int size = Math.max(l1.size(), l2.size()); List merged = new ArrayList<>(size); for (int i = 0; i < size; i++) { @@ -513,4 +625,16 @@ private static List mergeCounts(List l1, List l2) { } return merged; } + + /** + * Enum that is the composite of the instrument type and aggregation. {@link InstrumentType} would + * be preferred, but doesn't include an option for the exponential histogram aggregation. + */ + private enum SyncInstrumentAggregation { + COUNTER_SUM, + UP_DOWN_COUNTER_SUM, + GAUGE_LAST_VALUE, + HISTOGRAM_EXPLICIT_HISTOGRAM, + HISTOGRAM_EXPONENTIAL_HISTOGRAM + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java index 34da3bfef7b..763d36fbec6 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregatorTest.java @@ -413,7 +413,6 @@ void testToMetricData(MemoryMode memoryMode) { @ParameterizedTest @EnumSource(MemoryMode.class) - // TODO: incorporate into SynchronousInstrumentStressTest and delete void testMultithreadedUpdates(MemoryMode memoryMode) throws InterruptedException { initialize(memoryMode); From 08f48555100f69a31c36e06fb394af4bbd6649e7 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Wed, 21 Jan 2026 11:34:53 -0600 Subject: [PATCH 4/6] Slight reworking --- .../ResourceConfigurationTest.java | 3 +- .../SynchronousInstrumentStressTest.java | 82 +++++++++++-------- 2 files changed, 51 insertions(+), 34 deletions(-) diff --git a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/ResourceConfigurationTest.java b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/ResourceConfigurationTest.java index 1678fb29576..820af042138 100644 --- a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/ResourceConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/ResourceConfigurationTest.java @@ -34,7 +34,8 @@ void customConfigResourceWithDisabledKeys() { Map props = new HashMap<>(); props.put("otel.service.name", "test-service"); props.put( - "otel.resource.attributes", "food=cheesecake,drink=juice,animal= ,color=,shape=square"); + "otel.resource.attributes", + "food=cheesecake,drink=juice,animal= ,color=,shape=square,animal=d g"); props.put("otel.resource.disabled-keys", "drink"); assertThat( diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java index 6506c2fb119..0d658593c46 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java @@ -52,7 +52,7 @@ import org.junit.jupiter.params.provider.MethodSource; /** - * {@link #stressTest(AggregationTemporality, SyncInstrumentAggregation, MemoryMode, + * {@link #stressTest(AggregationTemporality, InstrumentTypeAndAggregation, MemoryMode, * InstrumentValueType)} performs a stress test to confirm simultaneous record and collections do * not have concurrency issues like lost writes, partial writes, duplicate writes, etc. All * combinations of the following dimensions are tested: aggregation temporality, instrument type @@ -66,6 +66,22 @@ class SynchronousInstrumentStressTest { ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES; private static final double[] bucketBoundariesArr = bucketBoundaries.stream().mapToDouble(Double::doubleValue).toArray(); + + /** + * Tracks the unique combinations of synchronous {@link InstrumentType} and {@link Aggregation}. + * {@link InstrumentType#HISTOGRAM} aggregates to either explicit or base2 exponential histogram. + * Other instrument types have a single (typical) aggregation. + */ + private static final List instrumentAggregations = + Arrays.asList( + new InstrumentTypeAndAggregation(InstrumentType.COUNTER, Aggregation.sum()), + new InstrumentTypeAndAggregation(InstrumentType.UP_DOWN_COUNTER, Aggregation.sum()), + new InstrumentTypeAndAggregation( + InstrumentType.HISTOGRAM, Aggregation.explicitBucketHistogram()), + new InstrumentTypeAndAggregation(InstrumentType.GAUGE, Aggregation.lastValue()), + new InstrumentTypeAndAggregation( + InstrumentType.HISTOGRAM, Aggregation.base2ExponentialBucketHistogram())); + private static final AttributeKey attributesKey = AttributeKey.stringKey("key"); private static final Attributes attr1 = Attributes.of(attributesKey, "value1"); private static final Attributes attr2 = Attributes.of(attributesKey, "value2"); @@ -78,16 +94,14 @@ class SynchronousInstrumentStressTest { @MethodSource("stressTestArgs") void stressTest( AggregationTemporality aggregationTemporality, - SyncInstrumentAggregation instrumentType, + InstrumentTypeAndAggregation instrumentTypeAndAggregation, MemoryMode memoryMode, InstrumentValueType instrumentValueType) { + InstrumentType instrumentType = instrumentTypeAndAggregation.instrumentType; + Aggregation aggregation = instrumentTypeAndAggregation.aggregation; // Initialize metric SDK - DefaultAggregationSelector aggregationSelector = DefaultAggregationSelector.getDefault(); - if (instrumentType == SyncInstrumentAggregation.HISTOGRAM_EXPONENTIAL_HISTOGRAM) { - aggregationSelector = - aggregationSelector.with( - InstrumentType.HISTOGRAM, Aggregation.base2ExponentialBucketHistogram()); - } + DefaultAggregationSelector aggregationSelector = + DefaultAggregationSelector.getDefault().with(instrumentType, aggregation); InMemoryMetricReader reader = InMemoryMetricReader.builder() .setDefaultAggregationSelector(aggregationSelector) @@ -98,7 +112,7 @@ void stressTest( SdkMeterProvider.builder().registerMetricReader(reader).build(); cleanup.addCloseable(meterProvider); Meter meter = meterProvider.get("test"); - Instrument instrument = getInstrument(meter, instrumentType, instrumentValueType); + Instrument instrument = getInstrument(meter, instrumentTypeAndAggregation, instrumentValueType); // Define list of measurements to record // Later, we'll assert that the data collected matches these measurements, with no lost writes, @@ -196,8 +210,7 @@ void stressTest( Stream.of(attr1, attr2, attr3, attr4) .map(attr -> getReducedPointData(collectedMetrics, isCumulative, attr)) .collect(toList()); - if (instrumentType == SyncInstrumentAggregation.COUNTER_SUM - || instrumentType == SyncInstrumentAggregation.UP_DOWN_COUNTER_SUM) { + if (aggregation == Aggregation.sum()) { if (instrumentValueType == InstrumentValueType.DOUBLE) { assertThat(points) .allSatisfy( @@ -216,7 +229,7 @@ void stressTest( LongPointData.class, p -> assertThat(p.getValue()).isEqualTo(sum.get()))); } - } else if (instrumentType == SyncInstrumentAggregation.GAUGE_LAST_VALUE) { + } else if (aggregation == Aggregation.lastValue()) { if (instrumentValueType == InstrumentValueType.DOUBLE) { assertThat(points) .allSatisfy( @@ -234,7 +247,7 @@ void stressTest( LongPointData.class, p -> assertThat(p.getValue()).isEqualTo(lastValue.get()))); } - } else if (instrumentType == SyncInstrumentAggregation.HISTOGRAM_EXPLICIT_HISTOGRAM) { + } else if (aggregation == Aggregation.explicitBucketHistogram()) { assertThat(points) .allSatisfy( point -> @@ -249,7 +262,7 @@ void stressTest( .isEqualTo(bucketCounts.stream().reduce(0L, Long::sum)); assertThat(p.getCounts()).isEqualTo(bucketCounts); })); - } else if (instrumentType == SyncInstrumentAggregation.HISTOGRAM_EXPONENTIAL_HISTOGRAM) { + } else if (aggregation == Aggregation.base2ExponentialBucketHistogram()) { assertThat(points) .allSatisfy( point -> @@ -274,12 +287,15 @@ void stressTest( private static Stream stressTestArgs() { List argumentsList = new ArrayList<>(); for (AggregationTemporality aggregationTemporality : AggregationTemporality.values()) { - for (SyncInstrumentAggregation instrumentType : SyncInstrumentAggregation.values()) { + for (InstrumentTypeAndAggregation instrumentTypeAndAggregation : instrumentAggregations) { for (MemoryMode memoryMode : MemoryMode.values()) { for (InstrumentValueType instrumentValueType : InstrumentValueType.values()) { argumentsList.add( Arguments.of( - aggregationTemporality, instrumentType, memoryMode, instrumentValueType)); + aggregationTemporality, + instrumentTypeAndAggregation, + memoryMode, + instrumentValueType)); } } } @@ -289,19 +305,18 @@ private static Stream stressTestArgs() { private static Instrument getInstrument( Meter meter, - SyncInstrumentAggregation instrumentType, + InstrumentTypeAndAggregation instrumentTypeAndAggregation, InstrumentValueType instrumentValueType) { - switch (instrumentType) { - case COUNTER_SUM: + switch (instrumentTypeAndAggregation.instrumentType) { + case COUNTER: return instrumentValueType == InstrumentValueType.DOUBLE ? meter.counterBuilder(instrumentName).ofDoubles().build()::add : meter.counterBuilder(instrumentName).build()::add; - case UP_DOWN_COUNTER_SUM: + case UP_DOWN_COUNTER: return instrumentValueType == InstrumentValueType.DOUBLE ? meter.upDownCounterBuilder(instrumentName).ofDoubles().build()::add : meter.upDownCounterBuilder(instrumentName).build()::add; - case HISTOGRAM_EXPLICIT_HISTOGRAM: - case HISTOGRAM_EXPONENTIAL_HISTOGRAM: + case HISTOGRAM: return instrumentValueType == InstrumentValueType.DOUBLE ? meter .histogramBuilder(instrumentName) @@ -314,10 +329,13 @@ private static Instrument getInstrument( .ofLongs() .build() ::record; - case GAUGE_LAST_VALUE: + case GAUGE: return instrumentValueType == InstrumentValueType.DOUBLE ? meter.gaugeBuilder(instrumentName).build()::set : meter.gaugeBuilder(instrumentName).ofLongs().build()::set; + case OBSERVABLE_COUNTER: + case OBSERVABLE_UP_DOWN_COUNTER: + case OBSERVABLE_GAUGE: } throw new IllegalArgumentException(); } @@ -626,15 +644,13 @@ private static List mergeBuckets(List l1, List l2) { return merged; } - /** - * Enum that is the composite of the instrument type and aggregation. {@link InstrumentType} would - * be preferred, but doesn't include an option for the exponential histogram aggregation. - */ - private enum SyncInstrumentAggregation { - COUNTER_SUM, - UP_DOWN_COUNTER_SUM, - GAUGE_LAST_VALUE, - HISTOGRAM_EXPLICIT_HISTOGRAM, - HISTOGRAM_EXPONENTIAL_HISTOGRAM + private static class InstrumentTypeAndAggregation { + private final InstrumentType instrumentType; + private final Aggregation aggregation; + + private InstrumentTypeAndAggregation(InstrumentType instrumentType, Aggregation aggregation) { + this.instrumentType = instrumentType; + this.aggregation = aggregation; + } } } From 03670f6fe29a906193e8224aeff85e41d2286f9a Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Wed, 21 Jan 2026 11:45:13 -0600 Subject: [PATCH 5/6] revert unreleated --- .../sdk/autoconfigure/ResourceConfigurationTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/ResourceConfigurationTest.java b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/ResourceConfigurationTest.java index 820af042138..1678fb29576 100644 --- a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/ResourceConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/ResourceConfigurationTest.java @@ -34,8 +34,7 @@ void customConfigResourceWithDisabledKeys() { Map props = new HashMap<>(); props.put("otel.service.name", "test-service"); props.put( - "otel.resource.attributes", - "food=cheesecake,drink=juice,animal= ,color=,shape=square,animal=d g"); + "otel.resource.attributes", "food=cheesecake,drink=juice,animal= ,color=,shape=square"); props.put("otel.resource.disabled-keys", "drink"); assertThat( From 934ffcf34007a711539643cf0c1978e21d6318d7 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Mon, 26 Jan 2026 14:11:03 -0600 Subject: [PATCH 6/6] feedback --- .../SynchronousInstrumentStressTest.java | 118 +++++++++--------- 1 file changed, 56 insertions(+), 62 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java index 0d658593c46..af12cc6af12 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java @@ -6,6 +6,10 @@ package io.opentelemetry.sdk.metrics; import static io.opentelemetry.api.common.Attributes.empty; +import static io.opentelemetry.sdk.metrics.InstrumentType.COUNTER; +import static io.opentelemetry.sdk.metrics.InstrumentType.GAUGE; +import static io.opentelemetry.sdk.metrics.InstrumentType.HISTOGRAM; +import static io.opentelemetry.sdk.metrics.InstrumentType.UP_DOWN_COUNTER; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; @@ -52,7 +56,7 @@ import org.junit.jupiter.params.provider.MethodSource; /** - * {@link #stressTest(AggregationTemporality, InstrumentTypeAndAggregation, MemoryMode, + * {@link #stressTest(AggregationTemporality, InstrumentType, Aggregation, MemoryMode, * InstrumentValueType)} performs a stress test to confirm simultaneous record and collections do * not have concurrency issues like lost writes, partial writes, duplicate writes, etc. All * combinations of the following dimensions are tested: aggregation temporality, instrument type @@ -60,33 +64,18 @@ */ class SynchronousInstrumentStressTest { - private static final String instrumentName = "instrument"; - private static final Duration oneMicrosecond = Duration.ofNanos(1000); - private static final List bucketBoundaries = + private static final String INSTRUMENT_NAME = "instrument"; + private static final Duration ONE_MICROSECOND = Duration.ofNanos(1000); + private static final List BUCKET_BOUNDARIES = ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES; - private static final double[] bucketBoundariesArr = - bucketBoundaries.stream().mapToDouble(Double::doubleValue).toArray(); + private static final double[] BUCKET_BOUNDARIES_ARR = + BUCKET_BOUNDARIES.stream().mapToDouble(Double::doubleValue).toArray(); - /** - * Tracks the unique combinations of synchronous {@link InstrumentType} and {@link Aggregation}. - * {@link InstrumentType#HISTOGRAM} aggregates to either explicit or base2 exponential histogram. - * Other instrument types have a single (typical) aggregation. - */ - private static final List instrumentAggregations = - Arrays.asList( - new InstrumentTypeAndAggregation(InstrumentType.COUNTER, Aggregation.sum()), - new InstrumentTypeAndAggregation(InstrumentType.UP_DOWN_COUNTER, Aggregation.sum()), - new InstrumentTypeAndAggregation( - InstrumentType.HISTOGRAM, Aggregation.explicitBucketHistogram()), - new InstrumentTypeAndAggregation(InstrumentType.GAUGE, Aggregation.lastValue()), - new InstrumentTypeAndAggregation( - InstrumentType.HISTOGRAM, Aggregation.base2ExponentialBucketHistogram())); - - private static final AttributeKey attributesKey = AttributeKey.stringKey("key"); - private static final Attributes attr1 = Attributes.of(attributesKey, "value1"); - private static final Attributes attr2 = Attributes.of(attributesKey, "value2"); - private static final Attributes attr3 = Attributes.of(attributesKey, "value3"); - private static final Attributes attr4 = Attributes.of(attributesKey, "value4"); + private static final AttributeKey KEY = AttributeKey.stringKey("key"); + private static final Attributes ATTR_1 = Attributes.of(KEY, "value1"); + private static final Attributes ATTR_2 = Attributes.of(KEY, "value2"); + private static final Attributes ATTR_3 = Attributes.of(KEY, "value3"); + private static final Attributes ATTR_4 = Attributes.of(KEY, "value4"); @RegisterExtension CleanupExtension cleanup = new CleanupExtension(); @@ -94,11 +83,10 @@ class SynchronousInstrumentStressTest { @MethodSource("stressTestArgs") void stressTest( AggregationTemporality aggregationTemporality, - InstrumentTypeAndAggregation instrumentTypeAndAggregation, + InstrumentType instrumentType, + Aggregation aggregation, MemoryMode memoryMode, InstrumentValueType instrumentValueType) { - InstrumentType instrumentType = instrumentTypeAndAggregation.instrumentType; - Aggregation aggregation = instrumentTypeAndAggregation.aggregation; // Initialize metric SDK DefaultAggregationSelector aggregationSelector = DefaultAggregationSelector.getDefault().with(instrumentType, aggregation); @@ -112,7 +100,7 @@ void stressTest( SdkMeterProvider.builder().registerMetricReader(reader).build(); cleanup.addCloseable(meterProvider); Meter meter = meterProvider.get("test"); - Instrument instrument = getInstrument(meter, instrumentTypeAndAggregation, instrumentValueType); + Instrument instrument = getInstrument(meter, instrumentType, instrumentValueType); // Define list of measurements to record // Later, we'll assert that the data collected matches these measurements, with no lost writes, @@ -128,30 +116,30 @@ void stressTest( int threadCount = 4; List recordThreads = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(threadCount); - for (int i = 0; i < 4; i++) { + for (int i = 0; i < threadCount; i++) { recordThreads.add( new Thread( () -> { - List attributes = Arrays.asList(attr1, attr2, attr3, attr4); + List attributes = Arrays.asList(ATTR_1, ATTR_2, ATTR_3, ATTR_4); Collections.shuffle(attributes); for (Long measurement : measurements) { for (Attributes attr : attributes) { instrument.record(measurement, attr); } - Uninterruptibles.sleepUninterruptibly(oneMicrosecond); + Uninterruptibles.sleepUninterruptibly(ONE_MICROSECOND); } latch.countDown(); })); } // Define collecting thread - // NOTE: collect makes a copy of MetricData because REUSEABLE_MEMORY mode reuses MetricData + // NOTE: collect makes a copy of MetricData because REUSABLE_DATA mode reuses MetricData List collectedMetrics = new ArrayList<>(); Thread collectThread = new Thread( () -> { while (latch.getCount() != 0) { - Uninterruptibles.sleepUninterruptibly(oneMicrosecond); + Uninterruptibles.sleepUninterruptibly(ONE_MICROSECOND); collectedMetrics.addAll( reader.collectAllMetrics().stream() .map(SynchronousInstrumentStressTest::copy) @@ -175,14 +163,14 @@ void stressTest( // NOTE: this does not validate the absence of partial writes for cumulative instruments which // track multiple fields. For example, explicit histogram tracks sum and bucket counts. These // should be atomically updated such that we never collect the sum without corresponding bucket - // counts update, or vice verse. This test asserts that the cumulative state at the end is + // counts update, or vice versa. This test asserts that the cumulative state at the end is // consistent, and interim collects unknowingly see partial writes. AtomicLong lastValue = new AtomicLong(0); AtomicLong sum = new AtomicLong(0); AtomicLong min = new AtomicLong(Long.MAX_VALUE); AtomicLong max = new AtomicLong(-1); List bucketCounts = new ArrayList<>(); - for (int i = 0; i < bucketBoundaries.size() + 1; i++) { + for (int i = 0; i < BUCKET_BOUNDARIES.size() + 1; i++) { bucketCounts.add(0L); } AtomicLong totalCount = new AtomicLong(0); @@ -198,7 +186,7 @@ void stressTest( totalCount.incrementAndGet(); int bucketIndex = ExplicitBucketHistogramUtils.findBucketIndex( - bucketBoundariesArr, (double) measurement); + BUCKET_BOUNDARIES_ARR, (double) measurement); bucketCounts.set(bucketIndex, bucketCounts.get(bucketIndex) + 1); if (measurement == 0) { zeroCount.incrementAndGet(); @@ -207,7 +195,7 @@ void stressTest( boolean isCumulative = aggregationTemporality == AggregationTemporality.CUMULATIVE; List points = - Stream.of(attr1, attr2, attr3, attr4) + Stream.of(ATTR_1, ATTR_2, ATTR_3, ATTR_4) .map(attr -> getReducedPointData(collectedMetrics, isCumulative, attr)) .collect(toList()); if (aggregation == Aggregation.sum()) { @@ -287,13 +275,15 @@ void stressTest( private static Stream stressTestArgs() { List argumentsList = new ArrayList<>(); for (AggregationTemporality aggregationTemporality : AggregationTemporality.values()) { - for (InstrumentTypeAndAggregation instrumentTypeAndAggregation : instrumentAggregations) { + for (InstrumentTypeAndAggregation instrumentTypeAndAggregation : + InstrumentTypeAndAggregation.values()) { for (MemoryMode memoryMode : MemoryMode.values()) { for (InstrumentValueType instrumentValueType : InstrumentValueType.values()) { argumentsList.add( Arguments.of( aggregationTemporality, - instrumentTypeAndAggregation, + instrumentTypeAndAggregation.instrumentType, + instrumentTypeAndAggregation.aggregation, memoryMode, instrumentValueType)); } @@ -304,35 +294,33 @@ private static Stream stressTestArgs() { } private static Instrument getInstrument( - Meter meter, - InstrumentTypeAndAggregation instrumentTypeAndAggregation, - InstrumentValueType instrumentValueType) { - switch (instrumentTypeAndAggregation.instrumentType) { + Meter meter, InstrumentType instrumentType, InstrumentValueType instrumentValueType) { + switch (instrumentType) { case COUNTER: return instrumentValueType == InstrumentValueType.DOUBLE - ? meter.counterBuilder(instrumentName).ofDoubles().build()::add - : meter.counterBuilder(instrumentName).build()::add; + ? meter.counterBuilder(INSTRUMENT_NAME).ofDoubles().build()::add + : meter.counterBuilder(INSTRUMENT_NAME).build()::add; case UP_DOWN_COUNTER: return instrumentValueType == InstrumentValueType.DOUBLE - ? meter.upDownCounterBuilder(instrumentName).ofDoubles().build()::add - : meter.upDownCounterBuilder(instrumentName).build()::add; + ? meter.upDownCounterBuilder(INSTRUMENT_NAME).ofDoubles().build()::add + : meter.upDownCounterBuilder(INSTRUMENT_NAME).build()::add; case HISTOGRAM: return instrumentValueType == InstrumentValueType.DOUBLE ? meter - .histogramBuilder(instrumentName) - .setExplicitBucketBoundariesAdvice(bucketBoundaries) + .histogramBuilder(INSTRUMENT_NAME) + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) .build() ::record : meter - .histogramBuilder(instrumentName) - .setExplicitBucketBoundariesAdvice(bucketBoundaries) + .histogramBuilder(INSTRUMENT_NAME) + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) .ofLongs() .build() ::record; case GAUGE: return instrumentValueType == InstrumentValueType.DOUBLE - ? meter.gaugeBuilder(instrumentName).build()::set - : meter.gaugeBuilder(instrumentName).ofLongs().build()::set; + ? meter.gaugeBuilder(INSTRUMENT_NAME).build()::set + : meter.gaugeBuilder(INSTRUMENT_NAME).ofLongs().build()::set; case OBSERVABLE_COUNTER: case OBSERVABLE_UP_DOWN_COUNTER: case OBSERVABLE_GAUGE: @@ -474,7 +462,7 @@ private static MetricData copy(MetricData m) { ExponentialHistogramBuckets.create( p.getNegativeBuckets().getScale(), p.getNegativeBuckets().getOffset(), - new ArrayList<>(p.getPositiveBuckets().getBucketCounts())), + new ArrayList<>(p.getNegativeBuckets().getBucketCounts())), p.getStartEpochNanos(), p.getEpochNanos(), p.getAttributes(), @@ -491,8 +479,7 @@ private static MetricData copy(MetricData m) { */ private static PointData getReducedPointData( List metrics, boolean isCumulative, Attributes attributes) { - metrics.stream() - .forEach(metricData -> assertThat(metricData.getName()).isEqualTo(instrumentName)); + metrics.forEach(metricData -> assertThat(metricData.getName()).isEqualTo(INSTRUMENT_NAME)); MetricData first = metrics.get(0); switch (first.getType()) { case LONG_GAUGE: @@ -644,13 +631,20 @@ private static List mergeBuckets(List l1, List l2) { return merged; } - private static class InstrumentTypeAndAggregation { - private final InstrumentType instrumentType; - private final Aggregation aggregation; + @SuppressWarnings("ImmutableEnumChecker") + private enum InstrumentTypeAndAggregation { + COUNTER_SUM(COUNTER, Aggregation.sum()), + UP_DOWN_COUNTER_SUM(UP_DOWN_COUNTER, Aggregation.sum()), + GAUGE_LAST_VALUE(GAUGE, Aggregation.lastValue()), + HISTOGRAM_EXPLICIT(HISTOGRAM, Aggregation.explicitBucketHistogram()), + HISTOGRAM_BASE2_EXPONENTIAL(HISTOGRAM, Aggregation.base2ExponentialBucketHistogram()); - private InstrumentTypeAndAggregation(InstrumentType instrumentType, Aggregation aggregation) { + InstrumentTypeAndAggregation(InstrumentType instrumentType, Aggregation aggregation) { this.instrumentType = instrumentType; this.aggregation = aggregation; } + + private final InstrumentType instrumentType; + private final Aggregation aggregation; } }