From 973194a5242b519d8111ae4c5aaad9cdf86a30f1 Mon Sep 17 00:00:00 2001 From: skerroumi Date: Tue, 31 Oct 2023 15:19:19 +0100 Subject: [PATCH] feat: adding a custom Reader for prometheus metrics --- .../sdk/metrics/_internal/export/__init__.py | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 0568270ae6b..2957a727ba5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -552,3 +552,66 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: super().force_flush(timeout_millis=timeout_millis) self._exporter.force_flush(timeout_millis=timeout_millis) return True + + +class CustomMetricReader(MetricReader): + """Implementation of `MetricReader` that returns its metrics from :func:`get_metrics_data`. + + This is useful for e.g. unit tests. + """ + + def __init__( + self, + exporter: MetricExporter, + export_timeout_millis: Optional[float] = None, + ) -> None: + super().__init__( + preferred_temporality=exporter._preferred_temporality, + preferred_aggregation=exporter._preferred_aggregation, + ) + self._lock = RLock() + self._export_lock = Lock() + + self._exporter = exporter + self._metrics_data: ( + "opentelemetry.sdk.metrics.export.MetricsData" + ) = None + self._daemon_thread = None + self._export_timeout_millis = export_timeout_millis + + def get_metrics_data( + self, + ) -> ("opentelemetry.sdk.metrics.export.MetricsData"): + """Reads and returns current metrics from the SDK""" + with self._lock: + try: + self.collect(timeout_millis=self._export_timeout_millis) + except MetricsTimeoutError: + _logger.warning( + "Metric collection timed out", + exc_info=True, + ) + metrics_data = self._metrics_data + self._metrics_data = None + return metrics_data + + def _receive_metrics( + self, + metrics_data: "opentelemetry.sdk.metrics._internal.point.MetricsData", + timeout_millis: float = 10_000, + **kwargs, + ) -> None: + if metrics_data is None: + return + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + with self._lock: + self._exporter.export( + metrics_data, timeout_millis=timeout_millis + ) + except Exception as e: # pylint: disable=broad-except,invalid-name + _logger.exception("Exception while exporting metrics %s", str(e)) + detach(token) + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + pass \ No newline at end of file