diff --git a/CHANGELOG.md b/CHANGELOG.md index a2a0ca7ac5d..19794ec872b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### General +- Support fixed bucket histogram aggregation and exporting histograms with the OTLP/Prometheus exporters. + #### Breaking Changes - Methods and classes deprecated in 0.14.x have been removed. diff --git a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/MetricAdapter.java b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/MetricAdapter.java index f0856846437..e2e7d3c2944 100644 --- a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/MetricAdapter.java +++ b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/MetricAdapter.java @@ -9,6 +9,7 @@ import io.opentelemetry.api.common.Labels; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoubleSumData; import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData; @@ -46,7 +47,9 @@ final class MetricAdapter { static final String SAMPLE_SUFFIX_COUNT = "_count"; static final String SAMPLE_SUFFIX_SUM = "_sum"; + static final String SAMPLE_SUFFIX_BUCKET = "_bucket"; static final String LABEL_NAME_QUANTILE = "quantile"; + static final String LABEL_NAME_LE = "le"; // Converts a MetricData to a Prometheus MetricFamilySamples. 
static MetricFamilySamples toMetricFamilySamples(MetricData metricData) { @@ -85,6 +88,8 @@ static Collector.Type toMetricFamilyType(MetricData metricData) { return Collector.Type.GAUGE; case SUMMARY: return Collector.Type.SUMMARY; + case HISTOGRAM: + return Collector.Type.HISTOGRAM; } return Collector.Type.UNTYPED; } @@ -122,6 +127,10 @@ static List toSamples( addSummarySamples( (DoubleSummaryPointData) pointData, name, labelNames, labelValues, samples); break; + case HISTOGRAM: + addHistogramSamples( + (DoubleHistogramPointData) pointData, name, labelNames, labelValues, samples); + break; } } return samples; @@ -169,6 +178,46 @@ private static void addSummarySamples( } } + private static void addHistogramSamples( + DoubleHistogramPointData doubleHistogramPointData, + String name, + List labelNames, + List labelValues, + List samples) { + samples.add( + new Sample( + name + SAMPLE_SUFFIX_COUNT, + labelNames, + labelValues, + doubleHistogramPointData.getCount())); + samples.add( + new Sample( + name + SAMPLE_SUFFIX_SUM, labelNames, labelValues, doubleHistogramPointData.getSum())); + + List labelNamesWithLe = new ArrayList<>(labelNames.size() + 1); + labelNamesWithLe.addAll(labelNames); + labelNamesWithLe.add(LABEL_NAME_LE); + long[] cumulativeCount = new long[] {0}; + doubleHistogramPointData.forEach( + (upperBound, count) -> { + List labelValuesWithLe = new ArrayList<>(labelValues.size() + 1); + labelValuesWithLe.addAll(labelValues); + labelValuesWithLe.add(doubleToGoString(upperBound)); + // According to + // https://github.com/open-telemetry/opentelemetry-proto/blob/v0.7.0/opentelemetry/proto/metrics/v1/metrics.proto#L505 + // the upper bound is exclusive while Prometheus requires them to be inclusive. + // There is not much we can do here until the proto add a field to support inclusive upper + // bounds. 
+ cumulativeCount[0] += count; + samples.add( + new Sample( + name + SAMPLE_SUFFIX_BUCKET, + labelNamesWithLe, + labelValuesWithLe, + cumulativeCount[0])); + }); + } + private static int estimateNumSamples(int numPoints, MetricDataType type) { if (type == MetricDataType.SUMMARY) { // count + sum + estimated 2 percentiles (default MinMaxSumCount aggregator). @@ -189,6 +238,8 @@ private static Collection getPoints(MetricData metricData) return metricData.getLongSumData().getPoints(); case SUMMARY: return metricData.getDoubleSummaryData().getPoints(); + case HISTOGRAM: + return metricData.getDoubleHistogramData().getPoints(); } return Collections.emptyList(); } diff --git a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/MetricAdapterTest.java b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/MetricAdapterTest.java index 99230bbab11..4563bedb017 100644 --- a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/MetricAdapterTest.java +++ b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/MetricAdapterTest.java @@ -14,6 +14,8 @@ import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.DoubleGaugeData; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramData; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoubleSumData; import io.opentelemetry.sdk.metrics.data.DoubleSummaryData; @@ -157,6 +159,24 @@ class MetricAdapterTest { Collections.singletonList( DoubleSummaryPointData.create( 123, 456, Labels.of("kp", "vp"), 5, 7, Collections.emptyList())))); + private static final MetricData HISTOGRAM = + MetricData.createDoubleHistogram( + Resource.create(Attributes.of(stringKey("kr"), "vr")), + InstrumentationLibraryInfo.create("full", "version"), + 
"instrument.name", + "description", + "1", + DoubleHistogramData.create( + AggregationTemporality.DELTA, + Collections.singletonList( + DoubleHistogramPointData.create( + 123, + 456, + Labels.of("kp", "vp"), + 1.0, + 2L, + Collections.emptyList(), + Collections.singletonList(2L))))); @Test void toProtoMetricDescriptorType() { @@ -204,6 +224,10 @@ void toProtoMetricDescriptorType() { metricFamilySamples = MetricAdapter.toMetricFamilySamples(LONG_GAUGE); assertThat(metricFamilySamples.type).isEqualTo(Collector.Type.GAUGE); assertThat(metricFamilySamples.samples).hasSize(1); + + metricFamilySamples = MetricAdapter.toMetricFamilySamples(HISTOGRAM); + assertThat(metricFamilySamples.type).isEqualTo(Collector.Type.HISTOGRAM); + assertThat(metricFamilySamples.samples).hasSize(3); } @Test @@ -323,6 +347,37 @@ void toSamples_SummaryPoints() { 12.3)); } + @Test + void toSamples_HistogramPoints() { + assertThat( + MetricAdapter.toSamples("full_name", MetricDataType.HISTOGRAM, Collections.emptyList())) + .isEmpty(); + + assertThat( + MetricAdapter.toSamples( + "full_name", + MetricDataType.HISTOGRAM, + ImmutableList.of( + DoubleHistogramPointData.create( + 321, + 654, + Labels.of("kp", "vp"), + 18.3, + 9, + ImmutableList.of(1.0), + ImmutableList.of(4L, 9L))))) + .containsExactly( + new Sample("full_name_count", ImmutableList.of("kp"), ImmutableList.of("vp"), 9), + new Sample("full_name_sum", ImmutableList.of("kp"), ImmutableList.of("vp"), 18.3), + new Sample( + "full_name_bucket", ImmutableList.of("kp", "le"), ImmutableList.of("vp", "1.0"), 4), + new Sample( + "full_name_bucket", + ImmutableList.of("kp", "le"), + ImmutableList.of("vp", "+Inf"), + 13)); + } + @Test void toMetricFamilySamples() { MetricData metricData = MONOTONIC_CUMULATIVE_DOUBLE_SUM; diff --git a/sdk-extensions/otproto/src/main/java/io/opentelemetry/sdk/extension/otproto/MetricAdapter.java b/sdk-extensions/otproto/src/main/java/io/opentelemetry/sdk/extension/otproto/MetricAdapter.java index 
965bd80ad5e..6b1a005b074 100644 --- a/sdk-extensions/otproto/src/main/java/io/opentelemetry/sdk/extension/otproto/MetricAdapter.java +++ b/sdk-extensions/otproto/src/main/java/io/opentelemetry/sdk/extension/otproto/MetricAdapter.java @@ -25,6 +25,8 @@ import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.metrics.data.DoubleGaugeData; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramData; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoubleSumData; import io.opentelemetry.sdk.metrics.data.DoubleSummaryData; @@ -149,6 +151,15 @@ static Metric toProtoMetric(MetricData metricData) { .addAllDataPoints(toDoubleDataPoints(doubleGaugeData.getPoints())) .build()); break; + case HISTOGRAM: + DoubleHistogramData doubleHistogramData = metricData.getDoubleHistogramData(); + builder.setDoubleHistogram( + DoubleHistogram.newBuilder() + .setAggregationTemporality( + mapToTemporality(doubleHistogramData.getAggregationTemporality())) + .addAllDataPoints(toDoubleHistogramDataPoints(doubleHistogramData.getPoints())) + .build()); + break; } return builder.build(); } @@ -198,6 +209,27 @@ static Collection toDoubleDataPoints(Collection toDoubleHistogramDataPoints( + Collection points) { + List result = new ArrayList<>(points.size()); + for (DoubleHistogramPointData doubleHistogramPoint : points) { + DoubleHistogramDataPoint.Builder builder = + DoubleHistogramDataPoint.newBuilder() + .setStartTimeUnixNano(doubleHistogramPoint.getStartEpochNanos()) + .setTimeUnixNano(doubleHistogramPoint.getEpochNanos()) + .setCount(doubleHistogramPoint.getCount()) + .setSum(doubleHistogramPoint.getSum()) + .addAllBucketCounts(doubleHistogramPoint.getCounts()) + .addAllExplicitBounds(doubleHistogramPoint.getBoundaries()); + Collection labels = 
toProtoLabels(doubleHistogramPoint.getLabels()); + if (!labels.isEmpty()) { + builder.addAllLabels(labels); + } + result.add(builder.build()); + } + return result; + } + static List toSummaryDataPoints( Collection points) { List result = new ArrayList<>(points.size()); diff --git a/sdk-extensions/otproto/src/test/java/io/opentelemetry/sdk/extension/otproto/MetricAdapterTest.java b/sdk-extensions/otproto/src/test/java/io/opentelemetry/sdk/extension/otproto/MetricAdapterTest.java index 9f6bec39e9a..4c09045ee66 100644 --- a/sdk-extensions/otproto/src/test/java/io/opentelemetry/sdk/extension/otproto/MetricAdapterTest.java +++ b/sdk-extensions/otproto/src/test/java/io/opentelemetry/sdk/extension/otproto/MetricAdapterTest.java @@ -33,6 +33,8 @@ import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.DoubleGaugeData; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramData; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoubleSumData; import io.opentelemetry.sdk.metrics.data.DoubleSummaryData; @@ -43,6 +45,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.ValueAtPercentile; import io.opentelemetry.sdk.resources.Resource; +import java.util.Arrays; import java.util.Collections; import org.junit.jupiter.api.Test; @@ -206,6 +209,48 @@ void toSummaryDataPoints() { .build()); } + @Test + void toHistogramDataPoints() { + assertThat( + MetricAdapter.toDoubleHistogramDataPoints( + ImmutableList.of( + DoubleHistogramPointData.create( + 123, + 456, + Labels.of("k", "v"), + 14.2, + 5, + Collections.singletonList(1.0), + Arrays.asList(1L, 5L)), + DoubleHistogramPointData.create( + 123, + 456, + Labels.empty(), + 15.3, + 7, + Collections.emptyList(), + Collections.singletonList(7L))))) + 
.containsExactly( + DoubleHistogramDataPoint.newBuilder() + .setStartTimeUnixNano(123) + .setTimeUnixNano(456) + .addAllLabels( + singletonList(StringKeyValue.newBuilder().setKey("k").setValue("v").build())) + .setCount(5) + .setSum(14.2) + .addBucketCounts(1) + .addBucketCounts(5) + .addExplicitBounds(1.0) + .build(), + DoubleHistogramDataPoint.newBuilder() + .setStartTimeUnixNano(123) + .setTimeUnixNano(456) + .setCount(7) + .setSum(15.3) + .addBucketCounts(7) + .build()); + } + @Test void toProtoMetric_monotonic() { assertThat( @@ -462,6 +507,53 @@ void toProtoMetric_summary() { .build()); } + @Test + void toProtoMetric_histogram() { + assertThat( + MetricAdapter.toProtoMetric( + MetricData.createDoubleHistogram( + Resource.getEmpty(), + InstrumentationLibraryInfo.getEmpty(), + "name", + "description", + "1", + DoubleHistogramData.create( + AggregationTemporality.DELTA, + singletonList( + DoubleHistogramPointData.create( + 123, + 456, + Labels.of("k", "v"), + 4.0, + 33L, + emptyList(), + Collections.singletonList(33L))))))) + .isEqualTo( + Metric.newBuilder() + .setName("name") + .setDescription("description") + .setUnit("1") + .setDoubleHistogram( + DoubleHistogram.newBuilder() + .setAggregationTemporality(AGGREGATION_TEMPORALITY_DELTA) + .addDataPoints( + DoubleHistogramDataPoint.newBuilder() + .setStartTimeUnixNano(123) + .setTimeUnixNano(456) + .addAllLabels( + singletonList( + StringKeyValue.newBuilder() + .setKey("k") + .setValue("v") + .build())) + .setCount(33) + .setSum(4.0) + .addBucketCounts(33) + .build()) + .build()) + .build()); + } + @Test void toProtoResourceMetrics() { Resource resource = Resource.create(Attributes.of(stringKey("ka"), "va")); diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/BucketSearch.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/BucketSearch.java new file mode 100644 index 00000000000..6f0b39594d7 --- /dev/null +++ 
b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/BucketSearch.java @@ -0,0 +1,244 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +public class BucketSearch { + private static final double[] arr5 = new double[] {5, 10, 25, 50, 100}; + private static final double[] arr10 = + new double[] {10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120}; + private static final double[] arrLarge = + new double[] { + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 12, + 14, + 16, + 18, + 20, + 25, + 30, + 35, + 40, + 45, + 50, + 60, + 70, + 80, + 90, + 100, + 120, + 140, + 160, + 180, + 200, + 250, + 300, + 350, + 400, + 450, + 500, + 600, + 700, + 800, + 900, + 1000, + 1200, + 1400, + 1600, + 1800, + 2000, + 2500, + 3000, + 3500, + 4000, + 4500, + 5000, + 6000, + 7000, + 8000, + 9000, + 10000, + 12000, + 14000, + 16000, + 18000, + 20000, + 25000, + 30000, + 35000, + 40000, + 45000, + 50000, + 60000, + 70000, + 80000, + 90000, + 100000, + 120000, + 140000, + 160000, + 180000, + 200000, + 250000, + 300000, + 350000, + 400000, + 450000, + 500000, + 600000, + 700000, + 800000, + 900000, + 1000000, + 1200000, + 1400000, + 1600000, + 1800000, + 2000000, + 2500000, + 3000000, + 3500000, + 4000000, + 4500000, + 5000000, + 6000000, + 7000000, + 8000000, + 9000000, + 10000000, + 12000000, + 14000000, + 16000000, + 18000000, + 20000000, + 25000000, + 30000000, + 35000000, + 40000000, + 45000000, + 50000000, + 60000000, + 
70000000, + 80000000, + 90000000, + 100000000, + 120000000, + 140000000, + 160000000, + 180000000, + 200000000, + 250000000, + 300000000, + 350000000, + 400000000, + 450000000, + 500000000, + 600000000, + 700000000, + 800000000, + 900000000, + 1000000000, + 1200000000, + 1400000000, + 1600000000, + 1800000000, + 2000000000, + 2500000000.0, + 3000000000.0, + 3500000000.0, + 4000000000.0, + 4500000000.0, + 5000000000.0, + 6000000000.0, + 7000000000.0, + 8000000000.0, + 9000000000.0, + 1e200 + }; + + private static int findBucketIndex(double[] values, double target) { + for (int i = 0; i < values.length; ++i) { + if (target < values[i]) { + return i; + } + } + return values.length; + } + + @Benchmark + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 1) + public void linearArr5() { + int ignored = findBucketIndex(arr5, ThreadLocalRandom.current().nextDouble(150)); + } + + @Benchmark + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 1) + public void linearArr10() { + int ignored = findBucketIndex(arr10, ThreadLocalRandom.current().nextDouble(5000)); + } + + @Benchmark + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 1) + public void linearArrLarge() { + int ignored = findBucketIndex(arrLarge, ThreadLocalRandom.current().nextDouble()); + } + + @Benchmark + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 1) + public void binaryArr5() { + int ignored = Arrays.binarySearch(arr5, ThreadLocalRandom.current().nextDouble(150)); + } + + @Benchmark + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 1) + public void binaryArr10() { + int ignored = 
Arrays.binarySearch(arr10, ThreadLocalRandom.current().nextDouble(5000)); + } + + @Benchmark + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 1) + public void binaryArrLarge() { + int ignored = Arrays.binarySearch(arrLarge, ThreadLocalRandom.current().nextDouble()); + } +} diff --git a/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramBenchmark.java b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramBenchmark.java new file mode 100644 index 00000000000..c35b2d41f45 --- /dev/null +++ b/sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramBenchmark.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.common.InstrumentType; +import io.opentelemetry.sdk.metrics.common.InstrumentValueType; +import io.opentelemetry.sdk.resources.Resource; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +public class DoubleHistogramBenchmark { + private static final Aggregator aggregator = + AggregatorFactory.histogram(new double[] {10, 100, 1_000}, /* stateful= */ false) + .create( + Resource.getDefault(), + InstrumentationLibraryInfo.getEmpty(), + InstrumentDescriptor.create( + 
"name", + "description", + "1", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.DOUBLE)); + private AggregatorHandle aggregatorHandle; + + @Setup(Level.Trial) + public final void setup() { + aggregatorHandle = aggregator.createHandle(); + } + + @Benchmark + @Fork(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 10) + public void aggregate_10Threads() { + aggregatorHandle.recordDouble(100.0056); + } + + @Benchmark + @Fork(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 5) + public void aggregate_5Threads() { + aggregatorHandle.recordDouble(100.0056); + } + + @Benchmark + @Fork(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Threads(value = 1) + public void aggregate_1Threads() { + aggregatorHandle.recordDouble(100.0056); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java index 05887a3fd20..920e1ea2f52 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java @@ -45,6 +45,18 @@ static AggregatorFactory count(AggregationTemporality temporality) { return new CountAggregatorFactory(temporality); } + /** + * Returns an {@code AggregatorFactory} that calculates an approximation of the distribution of + * the measurements taken. + * + * @param boundary configures the fixed bucket boundaries. + * @param stateful configures if the aggregator is stateful. + * @return an {@code AggregatorFactory} that calculates a histogram of recorded measurements. 
+ */ + static AggregatorFactory histogram(double[] boundary, boolean stateful) { + return new HistogramAggregatorFactory(boundary, stateful); + } + /** * Returns an {@code AggregationFactory} that calculates the last value of all recorded * measurements. diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregator.java new file mode 100644 index 00000000000..05f0ef567ad --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregator.java @@ -0,0 +1,168 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import io.opentelemetry.api.common.Labels; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import javax.annotation.concurrent.GuardedBy; + +final class DoubleHistogramAggregator extends AbstractAggregator { + private final double[] boundaries; + + DoubleHistogramAggregator( + Resource resource, + InstrumentationLibraryInfo instrumentationLibraryInfo, + InstrumentDescriptor instrumentDescriptor, + double[] boundaries, + boolean stateful) { + super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful); + this.boundaries = boundaries; + } + + @Override + public AggregatorHandle createHandle() { + return new Handle(this.boundaries); + } + + @Override + public final HistogramAccumulation 
merge(HistogramAccumulation x, HistogramAccumulation y) { + if (!x.getBoundaries().equals(y.getBoundaries())) { + throw new IllegalArgumentException("can't merge histograms with different boundaries"); + } + + long[] mergedCounts = new long[x.getCounts().size()]; + for (int i = 0; i < x.getCounts().size(); ++i) { + mergedCounts[i] = x.getCounts().get(i) + y.getCounts().get(i); + } + return HistogramAccumulation.create( + x.getCount() + y.getCount(), + x.getSum() + y.getSum(), + x.getBoundaries(), + Arrays.stream(mergedCounts).boxed().collect(Collectors.toList())); + } + + @Override + public final MetricData toMetricData( + Map accumulationByLabels, + long startEpochNanos, + long lastCollectionEpoch, + long epochNanos) { + return MetricData.createDoubleHistogram( + getResource(), + getInstrumentationLibraryInfo(), + getInstrumentDescriptor().getName(), + getInstrumentDescriptor().getDescription(), + getInstrumentDescriptor().getUnit(), + DoubleHistogramData.create( + isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA, + MetricDataUtils.toDoubleHistogramPointList( + accumulationByLabels, + isStateful() ? 
startEpochNanos : lastCollectionEpoch, + epochNanos))); + } + + @Override + public HistogramAccumulation accumulateDouble(double value) { + return HistogramAccumulation.create( + 1, value, Collections.emptyList(), Collections.singletonList(1L)); + } + + @Override + public HistogramAccumulation accumulateLong(long value) { + return HistogramAccumulation.create( + 1, value, Collections.emptyList(), Collections.singletonList(1L)); + } + + static final class Handle extends AggregatorHandle { + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + @GuardedBy("lock") + private final State current; + + Handle(double[] boundaries) { + current = new State(boundaries); + } + + @Override + protected HistogramAccumulation doAccumulateThenReset() { + lock.writeLock().lock(); + try { + HistogramAccumulation result = + HistogramAccumulation.create( + current.count, + current.sum, + Arrays.stream(current.boundaries).boxed().collect(Collectors.toList()), + Arrays.stream(current.counts).boxed().collect(Collectors.toList())); + current.reset(); + return result; + } finally { + lock.writeLock().unlock(); + } + } + + @Override + protected void doRecordDouble(double value) { + int bucketIndex = current.findBucketIndex(value); + + lock.writeLock().lock(); + try { + current.record(bucketIndex, value); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + protected void doRecordLong(long value) { + doRecordDouble((double) value); + } + + private static final class State { + private long count; + private double sum; + private final double[] boundaries; + private final long[] counts; + + public State(double[] boundaries) { + this.boundaries = Arrays.copyOf(boundaries, boundaries.length); + this.counts = new long[this.boundaries.length + 1]; + reset(); + } + + // Benchmark shows that linear search performs better with ordinary buckets. 
+ private int findBucketIndex(double value) { + for (int i = 0; i < this.boundaries.length; ++i) { + if (value < this.boundaries[i]) { + return i; + } + } + return this.boundaries.length; + } + + private void reset() { + this.count = 0; + this.sum = 0; + Arrays.fill(this.counts, 0); + } + + private void record(int bucketIndex, double value) { + this.count++; + this.sum += value; + this.counts[bucketIndex]++; + } + } + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulation.java new file mode 100644 index 00000000000..87d04ccd24b --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulation.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.api.common.Labels; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData; +import java.util.List; +import javax.annotation.concurrent.Immutable; + +@Immutable +@AutoValue +public abstract class HistogramAccumulation { + /** + * Creates a new {@link HistogramAccumulation} with the given values. + * + * @return a new {@link HistogramAccumulation} with the given values. + */ + static HistogramAccumulation create( + long count, double sum, List boundaries, List counts) { + // TODO make it immutable? + return new AutoValue_HistogramAccumulation(count, sum, boundaries, counts); + } + + HistogramAccumulation() {} + + /** + * The number of measurements taken. + * + * @return the count of recorded measurements. + */ + abstract long getCount(); + + /** + * The sum of all measurements recorded. + * + * @return the sum of recorded measurements. + */ + abstract double getSum(); + + /** + * The bucket boundaries. 
For a Histogram with N defined boundaries, e.g, [x, y, z]. There are N+1 + * counts: [-inf, x), [x, y), [y, z), [z, +inf]. + * + * @return the bucket boundaries in increasing order. + */ + abstract List getBoundaries(); + + /** + * The counts in each bucket. + * + * @return the counts in each bucket. + */ + abstract List getCounts(); + + final DoubleHistogramPointData toPoint(long startEpochNanos, long epochNanos, Labels labels) { + return DoubleHistogramPointData.create( + startEpochNanos, epochNanos, labels, getSum(), getCount(), getBoundaries(), getCounts()); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAggregatorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAggregatorFactory.java new file mode 100644 index 00000000000..3d33895eefe --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAggregatorFactory.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.resources.Resource; + +final class HistogramAggregatorFactory implements AggregatorFactory { + private final double[] boundaries; + private final boolean stateful; + + HistogramAggregatorFactory(double[] boundaries, boolean stateful) { + for (int i = 1; i < boundaries.length; ++i) { + if (Double.compare(boundaries[i - 1], boundaries[i]) >= 0) { + throw new IllegalArgumentException( + "invalid bucket boundary: " + boundaries[i - 1] + " >= " + boundaries[i]); + } + } + if (boundaries.length > 0) { + if (boundaries[0] == Double.NEGATIVE_INFINITY) { + throw new IllegalArgumentException("invalid bucket boundary: -Inf"); + } + if (boundaries[boundaries.length - 1] == Double.POSITIVE_INFINITY) { + throw new 
IllegalArgumentException("invalid bucket boundary: +Inf"); + } + } + this.boundaries = boundaries; + this.stateful = stateful; + } + + @Override + @SuppressWarnings("unchecked") + public Aggregator create( + Resource resource, + InstrumentationLibraryInfo instrumentationLibraryInfo, + InstrumentDescriptor descriptor) { + switch (descriptor.getValueType()) { + case LONG: + case DOUBLE: + return (Aggregator) + new DoubleHistogramAggregator( + resource, instrumentationLibraryInfo, descriptor, this.boundaries, this.stateful); + } + throw new IllegalArgumentException("Invalid instrument value type"); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java index d21f4965638..8efa36e203f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/MetricDataUtils.java @@ -6,6 +6,7 @@ package io.opentelemetry.sdk.metrics.aggregator; import io.opentelemetry.api.common.Labels; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; @@ -44,4 +45,13 @@ static List toDoubleSummaryPointList( points.add(aggregator.toPoint(startEpochNanos, epochNanos, labels))); return points; } + + static List toDoubleHistogramPointList( + Map accumulationMap, long startEpochNanos, long epochNanos) { + List points = new ArrayList<>(accumulationMap.size()); + accumulationMap.forEach( + (labels, aggregator) -> + points.add(aggregator.toPoint(startEpochNanos, epochNanos, labels))); + return points; + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramData.java 
b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramData.java new file mode 100644 index 00000000000..a5f90422faf --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramData.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.data; + +import com.google.auto.value.AutoValue; +import java.util.Collection; +import javax.annotation.concurrent.Immutable; + +@Immutable +@AutoValue +public abstract class DoubleHistogramData implements Data { + DoubleHistogramData() {} + + public static DoubleHistogramData create( + AggregationTemporality temporality, Collection points) { + return new AutoValue_DoubleHistogramData(temporality, points); + } + + /** + * Returns the {@code AggregationTemporality} of this metric, + * + *

AggregationTemporality describes if the aggregator reports delta changes since last report + * time, or cumulative changes since a fixed start time. + * + * @return the {@code AggregationTemporality} of this metric + */ + public abstract AggregationTemporality getAggregationTemporality(); + + @Override + public abstract Collection getPoints(); +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramPointData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramPointData.java new file mode 100644 index 00000000000..111be05fcf4 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/DoubleHistogramPointData.java @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.data; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.api.common.Labels; +import java.util.List; +import java.util.function.BiConsumer; +import javax.annotation.concurrent.Immutable; + +/** + * DoubleHistogramPointData represents an approximate representation of the distribution of + * measurements. + */ +@Immutable +@AutoValue +public abstract class DoubleHistogramPointData implements PointData { + /** + * Creates a DoubleHistogramPointData. + * + * @return a DoubleHistogramPointData. + */ + public static DoubleHistogramPointData create( + long startEpochNanos, + long epochNanos, + Labels labels, + double sum, + long count, + List boundaries, + List counts) { + return new AutoValue_DoubleHistogramPointData( + startEpochNanos, epochNanos, labels, sum, count, boundaries, counts); + } + + DoubleHistogramPointData() {} + + /** + * The sum of all measurements recorded. + * + * @return the sum of recorded measurements. + */ + public abstract double getSum(); + + /** + * The number of measurements taken. + * + * @return the count of recorded measurements. 
+ */ + public abstract long getCount(); + + /** + * The bucket boundaries. For a Histogram with N defined boundaries, e.g, [x, y, z]. There are N+1 + * counts: [-inf, x), [x, y), [y, z), [z, +inf]. + * + * @return the bucket boundaries in increasing order. + */ + public abstract List getBoundaries(); + + /** + * The counts in each bucket. + * + * @return the counts in each bucket. + */ + public abstract List getCounts(); + + /** Iterates over all the bucket boundaries and counts in this histogram. */ + public void forEach(BiConsumer action) { + List boundaries = getBoundaries(); + List counts = getCounts(); + for (int i = 0; i < boundaries.size(); ++i) { + action.accept(boundaries.get(i), counts.get(i)); + } + action.accept(Double.POSITIVE_INFINITY, counts.get(boundaries.size())); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricData.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricData.java index d7e2939635a..61df82f5db5 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricData.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricData.java @@ -29,6 +29,8 @@ public abstract class MetricData { /* isMonotonic= */ false, AggregationTemporality.CUMULATIVE, Collections.emptyList()); private static final DoubleSummaryData DEFAULT_DOUBLE_SUMMARY_DATA = DoubleSummaryData.create(Collections.emptyList()); + private static final DoubleHistogramData DEFAULT_DOUBLE_HISTOGRAM_DATA = + DoubleHistogramData.create(AggregationTemporality.CUMULATIVE, Collections.emptyList()); /** * Returns a new MetricData wih a {@link MetricDataType#DOUBLE_GAUGE} type. @@ -140,6 +142,28 @@ public static MetricData createDoubleSummary( data); } + /** + * Returns a new MetricData with a {@link MetricDataType#HISTOGRAM} type. + * + * @return a new MetricData wih a {@link MetricDataType#HISTOGRAM} type. 
+ */ + public static MetricData createDoubleHistogram( + Resource resource, + InstrumentationLibraryInfo instrumentationLibraryInfo, + String name, + String description, + String unit, + DoubleHistogramData data) { + return new AutoValue_MetricData( + resource, + instrumentationLibraryInfo, + name, + description, + unit, + MetricDataType.HISTOGRAM, + data); + } + MetricData() {} /** @@ -265,4 +289,18 @@ public final DoubleSummaryData getDoubleSummaryData() { } return DEFAULT_DOUBLE_SUMMARY_DATA; } + + /** + * Returns the {@code DoubleHistogramData} if type is {@link MetricDataType#HISTOGRAM}, otherwise + * a default empty data. + * + * @return the {@code DoubleHistogramData} if type is {@link MetricDataType#HISTOGRAM}, otherwise + * a default empty data. + */ + public final DoubleHistogramData getDoubleHistogramData() { + if (getType() == MetricDataType.HISTOGRAM) { + return (DoubleHistogramData) getData(); + } + return DEFAULT_DOUBLE_HISTOGRAM_DATA; + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricDataType.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricDataType.java index 50adc159fbd..0749807848f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricDataType.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/data/MetricDataType.java @@ -30,4 +30,10 @@ public enum MetricDataType { * value recorded, the sum of all measurements and the total number of measurements recorded. */ SUMMARY, + + /** + * A Histogram represents an approximate representation of the distribution of measurements + * recorded. 
+ */ + HISTOGRAM, } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java index 4c3f0df9b8d..42a4408cf6d 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java @@ -13,6 +13,7 @@ import io.opentelemetry.sdk.metrics.common.InstrumentValueType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.resources.Resource; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; class AggregatorFactoryTest { @@ -123,4 +124,73 @@ void getSumAggregatorFactory() { InstrumentValueType.DOUBLE))) .isInstanceOf(DoubleSumAggregator.class); } + + @Test + void getHistogramAggregatorFactory() { + AggregatorFactory histogram = + AggregatorFactory.histogram(new double[] {1.0}, /* stateful= */ false); + assertThat( + histogram.create( + Resource.getDefault(), + InstrumentationLibraryInfo.getEmpty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.LONG))) + .isInstanceOf(DoubleHistogramAggregator.class); + assertThat( + histogram.create( + Resource.getDefault(), + InstrumentationLibraryInfo.getEmpty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.DOUBLE))) + .isInstanceOf(DoubleHistogramAggregator.class); + + assertThat( + histogram + .create( + Resource.getDefault(), + InstrumentationLibraryInfo.getEmpty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.LONG)) + .isStateful()) + .isFalse(); + assertThat( + AggregatorFactory.histogram(new double[] {1.0}, /* stateful= */ true) + .create( + Resource.getDefault(), + 
InstrumentationLibraryInfo.getEmpty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.DOUBLE)) + .isStateful()) + .isTrue(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + AggregatorFactory.histogram( + new double[] {Double.NEGATIVE_INFINITY}, /* stateful= */ false)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + AggregatorFactory.histogram( + new double[] {1, Double.POSITIVE_INFINITY}, /* stateful= */ false)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> AggregatorFactory.histogram(new double[] {2, 1, 3}, /* stateful= */ false)); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregatorTest.java new file mode 100644 index 00000000000..0d8f1ec9e4b --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleHistogramAggregatorTest.java @@ -0,0 +1,182 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import io.opentelemetry.api.common.Labels; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.common.InstrumentType; +import io.opentelemetry.sdk.metrics.common.InstrumentValueType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import 
java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.Nullable; +import org.junit.jupiter.api.Test; + +public class DoubleHistogramAggregatorTest { + private static final ImmutableList BUCKET_BOUNDARIES = + ImmutableList.of(10.0, 100.0, 1000.0); + private static final DoubleHistogramAggregator aggregator = + new DoubleHistogramAggregator( + Resource.getDefault(), + InstrumentationLibraryInfo.getEmpty(), + InstrumentDescriptor.create( + "name", + "description", + "unit", + InstrumentType.VALUE_RECORDER, + InstrumentValueType.LONG), + BUCKET_BOUNDARIES.stream().mapToDouble(i -> i).toArray(), + /* stateful= */ false); + + @Test + void createHandle() { + assertThat(aggregator.createHandle()).isInstanceOf(DoubleHistogramAggregator.Handle.class); + } + + @Test + void testRecordings() { + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + aggregatorHandle.recordLong(20); + aggregatorHandle.recordLong(5); + aggregatorHandle.recordLong(150); + assertThat(aggregatorHandle.accumulateThenReset()) + .isEqualTo( + HistogramAccumulation.create( + 3, 175, BUCKET_BOUNDARIES, ImmutableList.of(1L, 1L, 1L, 0L))); + } + + @Test + void toAccumulationAndReset() { + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + assertThat(aggregatorHandle.accumulateThenReset()).isNull(); + + aggregatorHandle.recordLong(100); + assertThat(aggregatorHandle.accumulateThenReset()) + .isEqualTo( + HistogramAccumulation.create( + 1, 100, BUCKET_BOUNDARIES, ImmutableList.of(0L, 0L, 1L, 0L))); + assertThat(aggregatorHandle.accumulateThenReset()).isNull(); + + aggregatorHandle.recordLong(0); + assertThat(aggregatorHandle.accumulateThenReset()) + .isEqualTo( + HistogramAccumulation.create( + 1, 0, BUCKET_BOUNDARIES, ImmutableList.of(1L, 0L, 0L, 0L))); + assertThat(aggregatorHandle.accumulateThenReset()).isNull(); + 
} + + @Test + void toMetricData() { + AggregatorHandle aggregatorHandle = aggregator.createHandle(); + aggregatorHandle.recordLong(10); + + MetricData metricData = + aggregator.toMetricData( + Collections.singletonMap(Labels.empty(), aggregatorHandle.accumulateThenReset()), + 0, + 10, + 100); + assertThat(metricData).isNotNull(); + assertThat(metricData.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(metricData.getDoubleHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + } + + @Test + void accumulateData() { + assertThat(aggregator.accumulateDouble(2.0)) + .isEqualTo( + HistogramAccumulation.create(1, 2.0, Collections.emptyList(), ImmutableList.of(1L))); + assertThat(aggregator.accumulateLong(10)) + .isEqualTo( + HistogramAccumulation.create(1, 10.0, Collections.emptyList(), ImmutableList.of(1L))); + } + + @Test + void testMultithreadedUpdates() throws Exception { + final AggregatorHandle aggregatorHandle = aggregator.createHandle(); + final Histogram summarizer = new Histogram(); + int numberOfThreads = 10; + final long[] updates = new long[] {1, 2, 3, 5, 7, 11, 13, 17, 19, 23}; + final int numberOfUpdates = 1000; + final CountDownLatch startingGun = new CountDownLatch(numberOfThreads); + List workers = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + final int index = i; + Thread t = + new Thread( + () -> { + long update = updates[index]; + try { + startingGun.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + for (int j = 0; j < numberOfUpdates; j++) { + aggregatorHandle.recordLong(update); + if (ThreadLocalRandom.current().nextInt(10) == 0) { + summarizer.process(aggregatorHandle.accumulateThenReset()); + } + } + }); + workers.add(t); + t.start(); + } + for (int i = 0; i <= numberOfThreads; i++) { + startingGun.countDown(); + } + + for (Thread worker : workers) { + worker.join(); + } + // make sure everything gets merged when all the aggregation is done. 
+ summarizer.process(aggregatorHandle.accumulateThenReset()); + + assertThat(summarizer.accumulation) + .isEqualTo( + HistogramAccumulation.create( + numberOfThreads * numberOfUpdates, + 101000, + BUCKET_BOUNDARIES, + ImmutableList.of(5000L, 5000L, 0L, 0L))); + } + + private static final class Histogram { + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + @GuardedBy("lock") + @Nullable + private HistogramAccumulation accumulation; + + void process(@Nullable HistogramAccumulation other) { + if (other == null) { + return; + } + lock.writeLock().lock(); + try { + if (accumulation == null) { + accumulation = other; + return; + } + accumulation = aggregator.merge(accumulation, other); + } finally { + lock.writeLock().unlock(); + } + } + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulationTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulationTest.java new file mode 100644 index 00000000000..ec32f9f9c63 --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/HistogramAccumulationTest.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.aggregator; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; +import io.opentelemetry.api.common.Labels; +import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +public class HistogramAccumulationTest { + @Test + void toPoint() { + HistogramAccumulation accumulation = + HistogramAccumulation.create(12, 25, ImmutableList.of(1.0), ImmutableList.of(1L, 2L)); + DoubleHistogramPointData point = getPoint(accumulation); + assertThat(point.getCount()).isEqualTo(12); + assertThat(point.getSum()).isEqualTo(25); + 
assertThat(point.getBoundaries()).isEqualTo(ImmutableList.of(1.0)); + assertThat(point.getCounts()).isEqualTo(ImmutableList.of(1L, 2L)); + + List boundaries = new ArrayList<>(); + List counts = new ArrayList<>(); + point.forEach( + (b, c) -> { + boundaries.add(b); + counts.add(c); + }); + assertThat(boundaries).isEqualTo(ImmutableList.of(1.0, Double.POSITIVE_INFINITY)); + assertThat(counts).isEqualTo(point.getCounts()); + } + + private static DoubleHistogramPointData getPoint(HistogramAccumulation accumulation) { + DoubleHistogramPointData point = accumulation.toPoint(12345, 12358, Labels.of("key", "value")); + assertThat(point).isNotNull(); + assertThat(point.getStartEpochNanos()).isEqualTo(12345); + assertThat(point.getEpochNanos()).isEqualTo(12358); + assertThat(point.getLabels().size()).isEqualTo(1); + assertThat(point.getLabels().get("key")).isEqualTo("value"); + assertThat(point).isInstanceOf(DoubleHistogramPointData.class); + return point; + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java index b66a6e3312a..5fc164e4946 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/data/MetricDataTest.java @@ -10,8 +10,10 @@ import io.opentelemetry.api.common.Labels; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -40,6 +42,15 @@ class MetricDataTest { Arrays.asList( ValueAtPercentile.create(0.0, DOUBLE_VALUE), ValueAtPercentile.create(100, DOUBLE_VALUE))); + private static final DoubleHistogramPointData HISTOGRAM_POINT = + DoubleHistogramPointData.create( + START_EPOCH_NANOS, + EPOCH_NANOS, + 
Labels.of("key", "value"), + DOUBLE_VALUE, + LONG_VALUE, + Collections.singletonList(1.0), + Arrays.asList(1L, 1L)); @Test void metricData_Getters() { @@ -146,6 +157,39 @@ void metricData_SummaryPoints() { assertThat(metricData.getDoubleSummaryData().getPoints()).containsExactly(SUMMARY_POINT); } + @Test + void metricData_HistogramPoints() { + assertThat(HISTOGRAM_POINT.getStartEpochNanos()).isEqualTo(START_EPOCH_NANOS); + assertThat(HISTOGRAM_POINT.getEpochNanos()).isEqualTo(EPOCH_NANOS); + assertThat(HISTOGRAM_POINT.getLabels().size()).isEqualTo(1); + assertThat(HISTOGRAM_POINT.getLabels().get("key")).isEqualTo("value"); + assertThat(HISTOGRAM_POINT.getCount()).isEqualTo(LONG_VALUE); + assertThat(HISTOGRAM_POINT.getSum()).isEqualTo(DOUBLE_VALUE); + assertThat(HISTOGRAM_POINT.getBoundaries()).isEqualTo(Collections.singletonList(1.0)); + assertThat(HISTOGRAM_POINT.getCounts()).isEqualTo(Arrays.asList(1L, 1L)); + + List boundaries = new ArrayList<>(); + List counts = new ArrayList<>(); + HISTOGRAM_POINT.forEach( + (b, c) -> { + boundaries.add(b); + counts.add(c); + }); + assertThat(boundaries).isEqualTo(Arrays.asList(1.0, Double.POSITIVE_INFINITY)); + assertThat(counts).isEqualTo(HISTOGRAM_POINT.getCounts()); + + MetricData metricData = + MetricData.createDoubleHistogram( + Resource.getEmpty(), + InstrumentationLibraryInfo.getEmpty(), + "metric_name", + "metric_description", + "ms", + DoubleHistogramData.create( + AggregationTemporality.DELTA, Collections.singleton(HISTOGRAM_POINT))); + assertThat(metricData.getDoubleHistogramData().getPoints()).containsExactly(HISTOGRAM_POINT); + } + @Test void metricData_GetDefault() { MetricData metricData = @@ -160,6 +204,7 @@ void metricData_GetDefault() { assertThat(metricData.getLongGaugeData().getPoints()).isEmpty(); assertThat(metricData.getDoubleSumData().getPoints()).isEmpty(); assertThat(metricData.getLongGaugeData().getPoints()).isEmpty(); + assertThat(metricData.getDoubleHistogramData().getPoints()).isEmpty(); 
assertThat(metricData.getDoubleSummaryData().getPoints()).containsExactly(SUMMARY_POINT); metricData = @@ -174,6 +219,7 @@ void metricData_GetDefault() { assertThat(metricData.getLongGaugeData().getPoints()).isEmpty(); assertThat(metricData.getDoubleSumData().getPoints()).isEmpty(); assertThat(metricData.getLongGaugeData().getPoints()).isEmpty(); + assertThat(metricData.getDoubleHistogramData().getPoints()).isEmpty(); assertThat(metricData.getDoubleSummaryData().getPoints()).isEmpty(); } }