diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index faa0959fce..31ac1d40ef 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -432,11 +432,12 @@ def __init__( ) ), resource=resource, - metric_readers=metric_readers, views=views, ) + self._metric_readers = metric_readers self._measurement_consumer = SynchronousMeasurementConsumer( - sdk_config=self._sdk_config + sdk_config=self._sdk_config, + metric_readers=metric_readers, ) disabled = environ.get(OTEL_SDK_DISABLED, "") self._disabled = disabled.lower().strip() == "true" @@ -448,7 +449,7 @@ def __init__( self._shutdown_once = Once() self._shutdown = False - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: with self._all_metric_readers_lock: if metric_reader in self._all_metric_readers: # pylint: disable=broad-exception-raised @@ -468,7 +469,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: metric_reader_error = {} - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: current_ts = time_ns() try: if current_ts >= deadline_ns: @@ -513,7 +514,7 @@ def _shutdown(): metric_reader_error = {} - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: current_ts = time_ns() try: if current_ts >= deadline_ns: @@ -580,3 +581,31 @@ def get_meter( self._measurement_consumer, ) return self._meters[info] + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.export.MetricReader" + ) -> None: + with self._all_metric_readers_lock: + if metric_reader in self._all_metric_readers: + raise ValueError( + f"MetricReader {metric_reader} has been registered already!" + ) + self._measurement_consumer.add_metric_reader(metric_reader) + metric_reader._set_collect_callback( + self._measurement_consumer.collect + ) + self._all_metric_readers.add(metric_reader) + + def remove_metric_reader( + self, + metric_reader: "opentelemetry.sdk.metrics.export.MetricReader", + ) -> None: + with self._all_metric_readers_lock: + if metric_reader not in self._all_metric_readers: + raise ValueError( + f"MetricReader {metric_reader} has not been registered!" + ) + self._measurement_consumer.remove_metric_reader(metric_reader) + metric_reader._set_collect_callback(None) + metric_reader.shutdown() + self._all_metric_readers.remove(metric_reader) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index c651033051..f3cd4d43f5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -59,10 +59,12 @@ class SynchronousMeasurementConsumer(MeasurementConsumer): def __init__( self, sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration", + metric_readers: Iterable["opentelemetry.sdk.metrics.MetricReader"], ) -> None: self._lock = Lock() self._sdk_config = sdk_config # should never be mutated + self._metric_readers = tuple(metric_readers) self._reader_storages: Mapping[ "opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage ] = { @@ -71,7 +73,7 @@ def __init__( reader._instrument_class_temporality, reader._instrument_class_aggregation, ) - for reader in sdk_config.metric_readers + for reader in self._metric_readers } self._async_instruments: List[ "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" @@ -143,3 +145,27 @@ def collect( result = self._reader_storages[metric_reader].collect() return result + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Registers a new metric reader.""" + with self._lock: + self._metric_readers += (metric_reader,) + self._reader_storages[metric_reader] = MetricReaderStorage( + self._sdk_config, + metric_reader._instrument_class_temporality, + metric_reader._instrument_class_aggregation, + ) + + def remove_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Unregisters the given metric reader.""" + with self._lock: + self._reader_storages.pop(metric_reader) + self._metric_readers = tuple( + reader + for reader in self._metric_readers + if reader is not metric_reader + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py index 3d88facb0c..f87da33ce3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py @@ -26,5 +26,4 @@ class SdkConfiguration: exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter" resource: "opentelemetry.sdk.resources.Resource" - metric_readers: Sequence["opentelemetry.sdk.metrics.MetricReader"] views: Sequence["opentelemetry.sdk.metrics.View"] diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index 22abfbd3cf..d11a6fffb8 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -34,7 +34,8 @@ class TestSynchronousMeasurementConsumer(TestCase): def test_parent(self, _): self.assertIsInstance( - SynchronousMeasurementConsumer(MagicMock()), MeasurementConsumer + SynchronousMeasurementConsumer(MagicMock(), metric_readers=()), + MeasurementConsumer, ) def test_creates_metric_reader_storages(self, MockMetricReaderStorage): @@ -44,9 +45,9 @@ def test_creates_metric_reader_storages(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5) @@ -61,9 +62,9 @@ def test_measurements_passed_to_each_reader_storage( SdkConfiguration( exemplar_filter=Mock(should_sample=Mock(return_value=False)), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) measurement_mock = Mock() consumer.consume_measurement(measurement_mock) @@ -83,9 +84,9 @@ def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks): rs_mock.collect.assert_not_called() @@ -102,9 +103,9 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(should_sample=Mock(return_value=False)), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) async_instrument_mocks = [MagicMock() for _ in range(5)] for i_mock in async_instrument_mocks: @@ -133,9 +134,9 @@ def test_collect_timeout(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) def sleep_1(*args, **kwargs): @@ -166,9 +167,9 @@ def test_collect_deadline( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) def sleep_1(*args, **kwargs): diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index 7c9484b917..78200e0b7d 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -79,7 +79,6 @@ def test_creates_view_instrument_matches( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1, view2), ), MagicMock( @@ -146,7 +145,6 @@ def test_forwards_calls_to_view_instrument_match( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1, view2), ), MagicMock( @@ -256,7 +254,6 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1,), ), MagicMock( @@ -291,7 +288,6 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(), ), MagicMock( @@ -327,7 +323,6 @@ def test_drop_aggregation(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View( instrument_name="name", aggregation=DropAggregation() @@ -355,7 +350,6 @@ def test_same_collection_start(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(View(instrument_name="name"),), ), MagicMock( @@ -402,7 +396,6 @@ def test_conflicting_view_configuration(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View( instrument_name="observable_counter", @@ -451,7 +444,6 @@ def test_view_instrument_match_conflict_0(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -509,7 +501,6 @@ def test_view_instrument_match_conflict_1(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -578,7 +569,6 @@ def test_view_instrument_match_conflict_2(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="foo"), View(instrument_name="bar"), @@ -631,7 +621,6 @@ def test_view_instrument_match_conflict_3(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -682,7 +671,6 @@ def test_view_instrument_match_conflict_4(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -729,7 +717,6 @@ def test_view_instrument_match_conflict_5(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -784,7 +771,6 @@ def test_view_instrument_match_conflict_6(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter", name="foo"), View(instrument_name="histogram", name="foo"), @@ -839,7 +825,6 @@ def test_view_instrument_match_conflict_7(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -894,7 +879,6 @@ def test_view_instrument_match_conflict_8(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="up_down_counter", name="foo"), View( diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 3991fd6e15..e8a772439b 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -13,9 +13,8 @@ # limitations under the License. # pylint: disable=protected-access,no-self-use - import weakref -from logging import WARNING +from logging import DEBUG, WARNING from time import sleep from typing import Iterable, Sequence from unittest.mock import MagicMock, Mock, patch @@ -36,6 +35,7 @@ ) from opentelemetry.sdk.metrics._internal import SynchronousMeasurementConsumer from opentelemetry.sdk.metrics.export import ( + InMemoryMetricReader, Metric, MetricExporter, MetricExportResult, @@ -426,6 +426,36 @@ def test_consume_measurement_gauge(self, mock_sync_measurement_consumer): sync_consumer_instance.consume_measurement.assert_called() + def test_addition_of_metric_reader(self): + # Suppress warnings for calling collect on an unregistered metric reader + with self.assertLogs( + "opentelemetry.sdk.metrics._internal.export", DEBUG + ): + reader = InMemoryMetricReader() + meter_provider = MeterProvider() + meter = meter_provider.get_meter(__name__) + counter = meter.create_counter("counter") + counter.add(1) + self.assertIsNone(reader.get_metrics_data()) + + meter_provider.add_metric_reader(reader) + counter.add(1) + self.assertIsNotNone(reader.get_metrics_data()) + + with self.assertRaises(ValueError) as cm: + meter_provider.add_metric_reader(reader) + self.assertIn( + "has been registered already!", str(cm.exception) + ) + + meter_provider.remove_metric_reader(reader) + counter.add(1) + self.assertIsNone(reader.get_metrics_data()) + + with self.assertRaises(ValueError) as cm: + meter_provider.remove_metric_reader(reader) + self.assertIn("has not been registered!", str(cm.exception)) + class TestMeter(TestCase): def setUp(self): diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 38d36758f3..63a5edfed8 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -82,7 +82,6 @@ def setUpClass(cls): cls.sdk_configuration = SdkConfiguration( exemplar_filter=Mock(), resource=cls.mock_resource, - metric_readers=[], views=[], ) diff --git a/opentelemetry-sdk/tests/test_configurator.py b/opentelemetry-sdk/tests/test_configurator.py index 8edc9190da..6fb5693d36 100644 --- a/opentelemetry-sdk/tests/test_configurator.py +++ b/opentelemetry-sdk/tests/test_configurator.py @@ -1043,7 +1043,7 @@ def test_metrics_init_exporter(self): provider._sdk_config.resource.attributes.get("service.name"), "otlp-service", ) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, DummyMetricReader) self.assertIsInstance(reader.exporter, DummyOTLPMetricExporter) @@ -1056,7 +1056,7 @@ def test_metrics_init_pull_exporter(self): self.assertEqual(self.set_provider_mock.call_count, 1) provider = self.set_provider_mock.call_args[0][0] self.assertIsInstance(provider, DummyMeterProvider) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, DummyMetricReaderPullExporter) def test_metrics_init_exporter_uses_exporter_args_map(self): @@ -1070,7 +1070,7 @@ def test_metrics_init_exporter_uses_exporter_args_map(self): }, ) provider = self.set_provider_mock.call_args[0][0] - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader.exporter.compression, "gzip")