From 478d1f3a03813e340d00831b006f39503fceabb6 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Wed, 30 Apr 2025 17:41:59 +0000 Subject: [PATCH 01/11] Initial commit to add timeout as a parm to export, make retries encompass timeout --- .../otlp/proto/grpc/_log_exporter/__init__.py | 12 +- .../exporter/otlp/proto/grpc/exporter.py | 137 ++++++--------- .../proto/grpc/metric_exporter/__init__.py | 22 ++- .../proto/grpc/trace_exporter/__init__.py | 14 +- .../test-requirements.txt | 1 + .../tests/test_otlp_exporter_mixin.py | 166 ++++++++++-------- .../tests/test_otlp_metrics_exporter.py | 6 +- .../tests/test_otlp_trace_exporter.py | 10 +- .../otlp/proto/http/_log_exporter/__init__.py | 37 ++-- .../proto/http/metric_exporter/__init__.py | 35 ++-- .../proto/http/trace_exporter/__init__.py | 60 +++---- .../metrics/test_otlp_metrics_exporter.py | 70 +++++--- .../tests/test_proto_log_exporter.py | 68 ++++--- .../tests/test_proto_span_exporter.py | 90 ++++++---- .../sdk/_logs/_internal/export/__init__.py | 17 +- .../export/in_memory_log_exporter.py | 1 + .../sdk/trace/export/__init__.py | 12 +- 17 files changed, 431 insertions(+), 327 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index 8f629899d77..d13f662ccab 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -58,7 +58,7 @@ def __init__( headers: Optional[ Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): if insecure is None: @@ -79,7 +79,7 @@ def __init__( environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT) environ_timeout = ( - int(environ_timeout) if environ_timeout is not None else None + float(environ_timeout) if environ_timeout is not None else None ) compression = ( @@ -107,8 +107,12 @@ def _translate_data( ) -> ExportLogsServiceRequest: return encode_logs(data) - def export(self, batch: Sequence[LogData]) -> LogExportResult: - return self._export(batch) + def export( + self, batch: Sequence[LogData], timeout_millis: Optional[int] = None + ) -> LogExportResult: + return self._export( + batch, timeout_millis / 1e3 if timeout_millis else None + ) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 79270b99a0c..d169d1e5a80 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -14,12 +14,12 @@ """OTLP Exporter""" +import json import threading from abc import ABC, abstractmethod from collections.abc import Sequence # noqa: F401 from logging import getLogger from os import environ -from time import sleep from typing import ( # noqa: F401 Any, Callable, @@ -35,7 +35,6 @@ from urllib.parse import urlparse from deprecated import deprecated -from google.rpc.error_details_pb2 import RetryInfo from grpc import ( ChannelCredentials, @@ -47,7 +46,6 @@ ssl_channel_credentials, ) from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, _get_resource_data, ) from opentelemetry.exporter.otlp.proto.grpc import ( @@ -74,6 +72,29 @@ from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.util.re import parse_env_headers +json_config = json.dumps( + { + "methodConfig": [ + { + "name": [dict()], + "retryPolicy": { + "maxAttempts": 5, + "initialBackoff": "1s", + "maxBackoff": "64s", + "backoffMultiplier": 2, + "retryableStatusCodes": [ + "UNAVAILABLE", + "CANCELLED", + "RESOURCE_EXHAUSTED", + "ABORTED", + "OUT_OF_RANGE", + "DATA_LOSS", + ], + }, + } + ] + } +) logger = getLogger(__name__) SDKDataT = TypeVar("SDKDataT") ResourceDataT = TypeVar("ResourceDataT") @@ -195,7 +216,7 @@ def __init__( headers: Optional[ Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): super().__init__() @@ -232,7 +253,7 @@ def __init__( else: self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS) - self._timeout = timeout or int( + self._timeout = timeout or float( environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10) ) self._collector_kwargs = None @@ -245,7 +266,11 @@ def __init__( if insecure: self._channel = insecure_channel( - self._endpoint, compression=compression + self._endpoint, + compression=compression, + options=[ + ("grpc.service_config", json_config), + ], ) else: credentials = _get_credentials( @@ -255,7 +280,12 @@ def __init__( OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, ) self._channel = secure_channel( - self._endpoint, credentials, compression=compression + self._endpoint, + credentials, + compression=compression, + options=[ + ("grpc.service_config", json_config), + ], ) self._client = self._stub(self._channel) @@ -269,10 +299,10 @@ def _translate_data( pass def _export( - self, data: Union[TypingSequence[ReadableSpan], MetricsData] + self, + data: Union[TypingSequence[ReadableSpan], MetricsData], + timeout_sec: Optional[float] = None, ) -> ExportResultT: - # After the call to shutdown, subsequent calls to Export are - # not allowed and should return a Failure result. if self._shutdown: logger.warning("Exporter already shutdown, ignoring batch") return self._result.FAILURE @@ -280,79 +310,24 @@ def _export( # FIXME remove this check if the export type for traces # gets updated to a class that represents the proto # TracesData and use the code below instead. - # logger.warning( - # "Transient error %s encountered while exporting %s, retrying in %ss.", - # error.code(), - # data.__class__.__name__, - # delay, - # ) - max_value = 64 - # expo returns a generator that yields delay values which grow - # exponentially. Once delay is greater than max_value, the yielded - # value will remain constant. - for delay in _create_exp_backoff_generator(max_value=max_value): - if delay == max_value or self._shutdown: + with self._export_lock: + try: + self._client.Export( + request=self._translate_data(data), + metadata=self._headers, + timeout=(timeout_sec or self._timeout), + ) + return self._result.SUCCESS + except RpcError as error: + logger.error( + "Failed to export %s to %s, error code: %s", + self._exporting, + self._endpoint, + error.code(), + exc_info=error.code() == StatusCode.UNKNOWN, + ) return self._result.FAILURE - with self._export_lock: - try: - self._client.Export( - request=self._translate_data(data), - metadata=self._headers, - timeout=self._timeout, - ) - - return self._result.SUCCESS - - except RpcError as error: - if error.code() in [ - StatusCode.CANCELLED, - StatusCode.DEADLINE_EXCEEDED, - StatusCode.RESOURCE_EXHAUSTED, - StatusCode.ABORTED, - StatusCode.OUT_OF_RANGE, - StatusCode.UNAVAILABLE, - StatusCode.DATA_LOSS, - ]: - retry_info_bin = dict(error.trailing_metadata()).get( - "google.rpc.retryinfo-bin" - ) - if retry_info_bin is not None: - retry_info = RetryInfo() - retry_info.ParseFromString(retry_info_bin) - delay = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 - ) - - logger.warning( - ( - "Transient error %s encountered while exporting " - "%s to %s, retrying in %ss." - ), - error.code(), - self._exporting, - self._endpoint, - delay, - ) - sleep(delay) - continue - else: - logger.error( - "Failed to export %s to %s, error code: %s", - self._exporting, - self._endpoint, - error.code(), - exc_info=error.code() == StatusCode.UNKNOWN, - ) - - if error.code() == StatusCode.OK: - return self._result.SUCCESS - - return self._result.FAILURE - - return self._result.FAILURE - def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: if self._shutdown: logger.warning("Exporter already shutdown, ignoring call") diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index 8580dbb7386..64c10ae9056 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -13,10 +13,11 @@ from __future__ import annotations +import time from dataclasses import replace from logging import getLogger from os import environ -from typing import Iterable, List, Tuple, Union +from typing import Iterable, List, Optional, Tuple, Union from typing import Sequence as TypingSequence from grpc import ChannelCredentials, Compression @@ -99,7 +100,7 @@ def __init__( credentials: ChannelCredentials | None = None, headers: Union[TypingSequence[Tuple[str, str]], dict[str, str], str] | None = None, - timeout: int | None = None, + timeout: float | None = None, compression: Compression | None = None, preferred_temporality: dict[type, AggregationTemporality] | None = None, @@ -124,7 +125,7 @@ def __init__( environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT) environ_timeout = ( - int(environ_timeout) if environ_timeout is not None else None + float(environ_timeout) if environ_timeout is not None else None ) compression = ( @@ -158,17 +159,22 @@ def _translate_data( def export( self, metrics_data: MetricsData, - timeout_millis: float = 10_000, + timeout_millis: Optional[int] = None, **kwargs, ) -> MetricExportResult: - # TODO(#2663): OTLPExporterMixin should pass timeout to gRPC + timeout_sec = ( + timeout_millis / 1e3 if timeout_millis else self._timeout # pylint: disable=protected-access + ) if self._max_export_batch_size is None: - return self._export(data=metrics_data) + return self._export(metrics_data, timeout_sec) export_result = MetricExportResult.SUCCESS - + deadline_sec = time.time() + timeout_sec for split_metrics_data in self._split_metrics_data(metrics_data): - split_export_result = self._export(data=split_metrics_data) + time_remaining_sec = deadline_sec - time.time() + split_export_result = self._export( + split_metrics_data, time_remaining_sec + ) if split_export_result is MetricExportResult.FAILURE: export_result = MetricExportResult.FAILURE diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index c78c1b81bb6..b092b5b3b9e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -91,7 +91,7 @@ def __init__( headers: Optional[ Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): if insecure is None: @@ -112,7 +112,7 @@ def __init__( environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT) environ_timeout = ( - int(environ_timeout) if environ_timeout is not None else None + float(environ_timeout) if environ_timeout is not None else None ) compression = ( @@ -139,8 +139,14 @@ def _translate_data( ) -> ExportTraceServiceRequest: return encode_spans(data) - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: - return self._export(spans) + def export( + self, + spans: Sequence[ReadableSpan], + timeout_millis: Optional[int] = None, + ) -> SpanExportResult: + return self._export( + spans, timeout_millis / 1e3 if timeout_millis else None + ) def shutdown(self) -> None: OTLPExporterMixin.shutdown(self) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt b/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt index 28d778461a9..01c9f1ddadd 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt @@ -2,6 +2,7 @@ asgiref==3.7.2 Deprecated==1.2.14 googleapis-common-protos==1.63.2 grpcio==1.66.2 +grpcio-status==1.66.0 importlib-metadata==6.11.0 iniconfig==2.0.0 packaging==24.0 diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 656d9a6cb79..5a75595f693 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -15,18 +15,14 @@ import threading import time from concurrent.futures import ThreadPoolExecutor -from logging import WARNING +from logging import WARNING, getLogger from typing import Any, Optional, Sequence from unittest import TestCase -from unittest.mock import Mock, patch +from unittest.mock import ANY, Mock, patch -from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module - Duration, -) -from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module - RetryInfo, -) -from grpc import Compression, StatusCode, server +from google.rpc import code_pb2, status_pb2 +from grpc import Compression, server +from grpc_status import rpc_status from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, @@ -55,6 +51,8 @@ SpanExportResult, ) +logger = getLogger(__name__) + # The below tests use this test SpanExporter and Spans, but are testing the # underlying behavior in the mixin. A MetricExporter or LogExporter could @@ -73,8 +71,14 @@ def _translate_data( ) -> ExportTraceServiceRequest: return encode_spans(data) - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: - return self._export(spans) + def export( + self, + spans: Sequence[ReadableSpan], + timeout_millis: Optional[float] = None, + ) -> SpanExportResult: + return self._export( + spans, timeout_millis / 1e3 if timeout_millis else None + ) @property def _exporting(self): @@ -87,40 +91,25 @@ def shutdown(self, timeout_millis=30_000): class TraceServiceServicerWithExportParams(TraceServiceServicer): def __init__( self, - export_result: StatusCode, + export_result: code_pb2, optional_export_sleep: Optional[float] = None, - optional_export_retry_millis: Optional[float] = None, ): self.export_result = export_result self.optional_export_sleep = optional_export_sleep - self.optional_export_retry_millis = optional_export_retry_millis # pylint: disable=invalid-name,unused-argument def Export(self, request, context): + logger.warning("Export Request Recieved") if self.optional_export_sleep: time.sleep(self.optional_export_sleep) - if self.optional_export_retry_millis: - context.send_initial_metadata( - ( - ( - "google.rpc.retryinfo-bin", - RetryInfo().SerializeToString(), - ), - ) - ) - context.set_trailing_metadata( - ( - ( - "google.rpc.retryinfo-bin", - RetryInfo( - retry_delay=Duration( - nanos=int(self.optional_export_retry_millis) - ) - ).SerializeToString(), - ), + if self.export_result != code_pb2.OK: + context.abort_with_status( + rpc_status.to_status( + status_pb2.Status( + code=self.export_result, + ) ) ) - context.set_code(self.export_result) return ExportTraceServiceResponse() @@ -268,7 +257,9 @@ def test_otlp_exporter_otlp_compression_unspecified( """No env or kwarg should be NoCompression""" OTLPSpanExporterForTesting(insecure=True) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.NoCompression + "localhost:4317", + compression=Compression.NoCompression, + options=ANY, ) # pylint: disable=no-self-use, disable=unused-argument @@ -292,12 +283,12 @@ def test_otlp_exporter_otlp_compression_envvar( """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" OTLPSpanExporterForTesting(insecure=True) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.Gzip + "localhost:4317", compression=Compression.Gzip, options=ANY ) def test_shutdown(self): add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(StatusCode.OK), + TraceServiceServicerWithExportParams(code_pb2.OK), self.server, ) self.assertEqual( @@ -316,7 +307,7 @@ def test_shutdown(self): def test_shutdown_wait_last_export(self): add_TraceServiceServicer_to_server( TraceServiceServicerWithExportParams( - StatusCode.OK, optional_export_sleep=1 + code_pb2.OK, optional_export_sleep=1 ), self.server, ) @@ -337,7 +328,7 @@ def test_shutdown_wait_last_export(self): def test_shutdown_doesnot_wait_last_export(self): add_TraceServiceServicer_to_server( TraceServiceServicerWithExportParams( - StatusCode.OK, optional_export_sleep=3 + code_pb2.OK, optional_export_sleep=3 ), self.server, ) @@ -360,7 +351,7 @@ def test_export_over_closed_grpc_channel(self): # pylint: disable=protected-access add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(StatusCode.OK), + TraceServiceServicerWithExportParams(code_pb2.OK), self.server, ) self.exporter.export([self.span]) @@ -372,52 +363,79 @@ def test_export_over_closed_grpc_channel(self): str(err.exception), "Cannot invoke RPC on closed channel!" ) - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): - mock_expo.configure_mock(**{"return_value": [0.01]}) - + def test_retry_timeout(self): add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE), + TraceServiceServicerWithExportParams(code_pb2.UNAVAILABLE), self.server, ) - result = self.exporter.export([self.span]) - self.assertEqual(result, SpanExportResult.FAILURE) - mock_sleep.assert_called_with(0.01) - - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep): + with self.assertLogs(level=WARNING) as warning: + # Set timeout to 1.5 seconds + self.assertEqual( + self.exporter.export([self.span], 1500), + SpanExportResult.FAILURE, + ) + # Our GRPC retry policy starts with a 1 second backoff then doubles. + # So we expect just two calls: one at time 0, one at time 1. + # The final log is from when export fails. + self.assertEqual(len(warning.records), 3) + for idx, log in enumerate(warning.records): + if idx != 2: + self.assertEqual( + "Export Request Recieved", + log.message, + ) + else: + self.assertEqual( + "Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED", + log.message, + ) + with self.assertLogs(level=WARNING) as warning: + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=3.5) + # This time don't pass in a timeout to export, so it should fallback to the timeout + # passed to the exporter class. + # pylint: disable=protected-access + self.assertEqual(exporter._timeout, 3.5) + self.assertEqual( + exporter.export([self.span]), + SpanExportResult.FAILURE, + ) + # We expect 3 calls: time 0, time 1, time 3, but not time 7. + # The final log is from when export fails. + self.assertEqual(len(warning.records), 4) + for idx, log in enumerate(warning.records): + if idx != 3: + self.assertEqual( + "Export Request Recieved", + log.message, + ) + else: + self.assertEqual( + "Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED", + log.message, + ) + + def test_timeout_set_correctly(self): add_TraceServiceServicer_to_server( TraceServiceServicerWithExportParams( - StatusCode.UNAVAILABLE, - optional_export_sleep=None, - optional_export_retry_millis=1e7, + code_pb2.OK, optional_export_sleep=0.5 ), self.server, ) + # Should timeout. Deadline should be set to now + timeout. + # That is 400 millis from now, and export sleeps for 500 millis. with self.assertLogs(level=WARNING) as warning: self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.FAILURE + self.exporter.export([self.span], 400), + SpanExportResult.FAILURE, ) - mock_sleep.assert_called_with(0.01) - self.assertEqual( - warning.records[0].message, - ( - "Transient error StatusCode.UNAVAILABLE encountered " - "while exporting traces to localhost:4317, retrying in 0.01s." - ), + "Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED", + warning.records[-1].message, ) - def test_success(self): - add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(StatusCode.OK), - self.server, - ) self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.SUCCESS + self.exporter.export([self.span], 600), + SpanExportResult.SUCCESS, ) def test_otlp_headers_from_env(self): @@ -431,15 +449,13 @@ def test_otlp_headers_from_env(self): def test_permanent_failure(self): with self.assertLogs(level=WARNING) as warning: add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams( - StatusCode.ALREADY_EXISTS - ), + TraceServiceServicerWithExportParams(code_pb2.ALREADY_EXISTS), self.server, ) self.assertEqual( self.exporter.export([self.span]), SpanExportResult.FAILURE ) self.assertEqual( - warning.records[0].message, + warning.records[-1].message, "Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS", ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 2ea12f660fb..ceda6e72a8e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -18,7 +18,7 @@ from os.path import dirname from typing import List from unittest import TestCase -from unittest.mock import patch +from unittest.mock import ANY, patch from grpc import ChannelCredentials, Compression @@ -297,7 +297,9 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): insecure=True, compression=Compression.NoCompression ) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.NoCompression + "localhost:4317", + compression=Compression.NoCompression, + options=ANY, ) def test_split_metrics_data_many_data_points(self): diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index 73d8d6c7a20..5238dc91224 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -16,7 +16,7 @@ import os from unittest import TestCase -from unittest.mock import Mock, PropertyMock, patch +from unittest.mock import ANY, Mock, PropertyMock, patch from grpc import ChannelCredentials, Compression @@ -333,7 +333,9 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): """Specifying kwarg should take precedence over env""" OTLPSpanExporter(insecure=True, compression=Compression.NoCompression) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.NoCompression + "localhost:4317", + compression=Compression.NoCompression, + options=ANY, ) # pylint: disable=no-self-use @@ -350,7 +352,9 @@ def test_otlp_exporter_otlp_compression_precendence( """ OTLPSpanExporter(insecure=True) mock_insecure_channel.assert_called_once_with( - "localhost:4317", compression=Compression.Gzip + "localhost:4317", + compression=Compression.Gzip, + options=ANY, ) def test_translate_spans(self): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index f86f0113833..1a46cab057c 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -17,15 +17,12 @@ import zlib from io import BytesIO from os import environ -from time import sleep +from time import sleep, time from typing import Dict, Optional, Sequence import requests from requests.exceptions import ConnectionError -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, -) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import ( _OTLP_HTTP_HEADERS, @@ -64,8 +61,6 @@ class OTLPLogExporter(LogExporter): - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: Optional[str] = None, @@ -73,7 +68,7 @@ def __init__( client_key_file: Optional[str] = None, client_certificate_file: Optional[str] = None, headers: Optional[Dict[str, str]] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, ): @@ -108,7 +103,7 @@ def __init__( self._headers = headers or parse_env_headers( headers_string, liberal=True ) - self._timeout = timeout or int( + self._timeout = timeout or float( environ.get( OTEL_EXPORTER_OTLP_LOGS_TIMEOUT, environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), @@ -124,7 +119,7 @@ def __init__( ) self._shutdown = False - def _export(self, serialized_data: bytes): + def _export(self, serialized_data: bytes, timeout_sec: float): data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -143,7 +138,7 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec cert=self._client_cert, ) except ConnectionError: @@ -151,7 +146,7 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec cert=self._client_cert, ) return resp @@ -164,7 +159,9 @@ def _retryable(resp: requests.Response) -> bool: return True return False - def export(self, batch: Sequence[LogData]) -> LogExportResult: + def export( + self, batch: Sequence[LogData], timeout_millis: Optional[float] = None + ) -> LogExportResult: # After the call to Shutdown subsequent calls to Export are # not allowed and should return a Failure result. if self._shutdown: @@ -172,18 +169,20 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult: return LogExportResult.FAILURE serialized_data = encode_logs(batch).SerializeToString() - - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: + deadline_sec = time() + ( + timeout_millis / 1e3 if timeout_millis else self._timeout + ) + for delay in [1, 2, 4, 8, 16, 32]: + remaining_time_sec = deadline_sec - time() + if remaining_time_sec < 1e-09: return LogExportResult.FAILURE - - resp = self._export(serialized_data) + resp = self._export(serialized_data, remaining_time_sec) # pylint: disable=no-else-return if resp.ok: return LogExportResult.SUCCESS elif self._retryable(resp): + if delay > (deadline_sec - time()): + return LogExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting logs batch, retrying in %ss.", resp.reason, diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 4feea8d4302..6c8b930fbc7 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -17,13 +17,14 @@ import zlib from io import BytesIO from os import environ -from time import sleep +from time import sleep, time from typing import ( # noqa: F401 Any, Callable, Dict, List, Mapping, + Optional, Sequence, ) @@ -32,7 +33,6 @@ from requests.exceptions import ConnectionError from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, _get_resource_data, ) from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( @@ -101,8 +101,6 @@ class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin): - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: str | None = None, @@ -110,7 +108,7 @@ def __init__( client_key_file: str | None = None, client_certificate_file: str | None = None, headers: dict[str, str] | None = None, - timeout: int | None = None, + timeout: float | None = None, compression: Compression | None = None, session: requests.Session | None = None, preferred_temporality: dict[type, AggregationTemporality] @@ -147,7 +145,7 @@ def __init__( self._headers = headers or parse_env_headers( headers_string, liberal=True ) - self._timeout = timeout or int( + self._timeout = timeout or float( environ.get( OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), @@ -166,7 +164,7 @@ def __init__( preferred_temporality, preferred_aggregation ) - def _export(self, serialized_data: bytes): + def _export(self, serialized_data: bytes, timeout_sec: float): data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -185,7 +183,7 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) except ConnectionError: @@ -193,7 +191,7 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) return resp @@ -209,21 +207,26 @@ def _retryable(resp: requests.Response) -> bool: def export( self, metrics_data: MetricsData, - timeout_millis: float = 10_000, + timeout_millis: Optional[float] = None, **kwargs, ) -> MetricExportResult: serialized_data = encode_metrics(metrics_data) - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: + deadline_sec = time() + ( + timeout_millis / 1e3 if timeout_millis else self._timeout + ) + for delay in [1, 2, 4, 8, 16, 32]: + remaining_time_sec = deadline_sec - time() + if remaining_time_sec < 1e-09: return MetricExportResult.FAILURE - - resp = self._export(serialized_data.SerializeToString()) + resp = self._export( + serialized_data.SerializeToString(), remaining_time_sec + ) # pylint: disable=no-else-return if resp.ok: return MetricExportResult.SUCCESS elif self._retryable(resp): + if delay > (deadline_sec - time()): + return MetricExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting metric batch, retrying in %ss.", resp.reason, diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 1841e5210a4..6fa0f1c1bdd 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -17,15 +17,12 @@ import zlib from io import BytesIO from os import environ -from time import sleep -from typing import Dict, Optional +from time import sleep, time +from typing import Dict, Optional, Sequence import requests from requests.exceptions import ConnectionError -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, -) from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, ) @@ -49,6 +46,7 @@ OTEL_EXPORTER_OTLP_TRACES_HEADERS, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, ) +from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.util.re import parse_env_headers @@ -62,8 +60,6 @@ class OTLPSpanExporter(SpanExporter): - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: Optional[str] = None, @@ -71,7 +67,7 @@ def __init__( client_key_file: Optional[str] = None, client_certificate_file: Optional[str] = None, headers: Optional[Dict[str, str]] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, session: Optional[requests.Session] = None, ): @@ -105,7 +101,7 @@ def __init__( self._headers = headers or parse_env_headers( headers_string, liberal=True ) - self._timeout = timeout or int( + self._timeout = timeout or float( environ.get( OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), @@ -121,7 +117,7 @@ def __init__( ) self._shutdown = False - def _export(self, serialized_data: bytes): + def _export(self, serialized_data: bytes, timeout_sec: float): data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -140,7 +136,7 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) except ConnectionError: @@ -148,7 +144,7 @@ def _export(self, serialized_data: bytes): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_sec, cert=self._client_cert, ) return resp @@ -161,21 +157,32 @@ def _retryable(resp: requests.Response) -> bool: return True return False - def _serialize_spans(self, spans): - return encode_spans(spans).SerializePartialToString() + def export( + self, + spans: Sequence[ReadableSpan], + timeout_millis: Optional[float] = None, + ) -> SpanExportResult: + # After the call to Shutdown subsequent calls to Export are + # not allowed and should return a Failure result. + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring batch") + return SpanExportResult.FAILURE - def _export_serialized_spans(self, serialized_data): - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: + serialized_data = encode_spans(spans).SerializePartialToString() + deadline_sec = time() + ( + timeout_millis / 1e3 if timeout_millis else self._timeout + ) + for delay in [1, 2, 4, 8, 16, 32]: + remaining_time_sec = deadline_sec - time() + if remaining_time_sec < 1e-09: return SpanExportResult.FAILURE - - resp = self._export(serialized_data) + resp = self._export(serialized_data, remaining_time_sec) # pylint: disable=no-else-return if resp.ok: return SpanExportResult.SUCCESS elif self._retryable(resp): + if delay > (deadline_sec - time()): + return SpanExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting span batch, retrying in %ss.", resp.reason, @@ -192,17 +199,6 @@ def _export_serialized_spans(self, serialized_data): return SpanExportResult.FAILURE return SpanExportResult.FAILURE - def export(self, spans) -> SpanExportResult: - # After the call to Shutdown subsequent calls to Export are - # not allowed and should return a Failure result. - if self._shutdown: - _logger.warning("Exporter already shutdown, ignoring batch") - return SpanExportResult.FAILURE - - serialized_data = self._serialize_spans(spans) - - return self._export_serialized_spans(serialized_data) - def shutdown(self): if self._shutdown: _logger.warning("Exporter already shutdown, ignoring call") diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 16bb3e54286..df7c0c17ea3 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -15,11 +15,10 @@ from logging import WARNING from os import environ from unittest import TestCase -from unittest.mock import MagicMock, Mock, call, patch +from unittest.mock import ANY, MagicMock, Mock, patch from requests import Session from requests.models import Response -from responses import POST, activate, add from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( encode_metrics, @@ -327,31 +326,10 @@ def test_serialization(self, mock_post): url=exporter._endpoint, data=serialized_data.SerializeToString(), verify=exporter._certificate_file, - timeout=exporter._timeout, + timeout=ANY, # Timeout is a float based on real time, can't put an exact value here. cert=exporter._client_cert, ) - @activate - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): - # return a retryable error - add( - POST, - "http://metrics.example.com/export", - json={"error": "something exploded"}, - status=500, - ) - - exporter = OTLPMetricExporter( - endpoint="http://metrics.example.com/export" - ) - metrics_data = self.metrics["sum_int"] - - exporter.export(metrics_data) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) - def test_aggregation_temporality(self): otlp_metric_exporter = OTLPMetricExporter() @@ -523,3 +501,47 @@ def test_preferred_aggregation_override(self): self.assertEqual( exporter._preferred_aggregation[Histogram], histogram_aggregation ) + + @patch.object(Session, "post") + def test_retry_timeout(self, mock_post): + exporter = OTLPMetricExporter(timeout=3.5) + + resp = Response() + resp.status_code = 503 + resp.reason = "UNAVAILABLE" + mock_post.return_value = resp + with self.assertLogs(level=WARNING) as warning: + # Set timeout to 1.5 seconds + self.assertEqual( + exporter.export(self.metrics["sum_int"], 1500), + MetricExportResult.FAILURE, + ) + # Code should return failure before the final retry which would exceed timeout. + # Code should return failure after retrying once. + self.assertEqual(len(warning.records), 1) + self.assertEqual( + "Transient error UNAVAILABLE encountered while exporting metric batch, retrying in 1s.", + warning.records[0].message, + ) + with self.assertLogs(level=WARNING) as warning: + # This time don't pass in a timeout, so it will fallback to 3.5 second set on class. + self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + # 2 retrys (after 1s, 3s). + self.assertEqual(len(warning.records), 2) + + @patch.object(Session, "post") + def test_timeout_set_correctly(self, mock_post): + resp = Response() + resp.status_code = 200 + + def export_side_effect(*args, **kwargs): + # Timeout should be set to something slightly less than 400 milliseconds depending on how much time has passed. + self.assertTrue(0.4 - kwargs["timeout"] < 0.0005) + return resp + + mock_post.side_effect = export_side_effect + exporter = OTLPMetricExporter() + exporter.export(self.metrics["sum_int"], 400) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 66b0f890d76..00a00ae3aa9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -15,12 +15,14 @@ # pylint: disable=protected-access import unittest +from logging import WARNING from typing import List -from unittest.mock import MagicMock, Mock, call, patch +from unittest.mock import MagicMock, Mock, patch import requests -import responses from google.protobuf.json_format import MessageToDict +from requests import Session +from requests.models import Response from opentelemetry._logs import SeverityNumber from opentelemetry.exporter.otlp.proto.http import Compression @@ -267,25 +269,6 @@ def test_exported_log_without_span_id(self): else: self.fail("No log records found") - @responses.activate - @patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): - # return a retryable error - responses.add( - responses.POST, - "http://logs.example.com/export", - json={"error": "something exploded"}, - status=500, - ) - - exporter = OTLPLogExporter(endpoint="http://logs.example.com/export") - logs = self._get_sdk_log_data() - - exporter.export(logs) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) - @staticmethod def _get_sdk_log_data() -> List[LogData]: log1 = LogData( @@ -365,3 +348,46 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): self.assertEqual( OTLPLogExporter().export(MagicMock()), LogExportResult.SUCCESS ) + + @patch.object(Session, "post") + def test_retry_timeout(self, mock_post): + exporter = OTLPLogExporter(timeout=3.5) + + resp = Response() + resp.status_code = 503 + resp.reason = "UNAVAILABLE" + mock_post.return_value = resp + with self.assertLogs(level=WARNING) as warning: + # Set timeout to 1.5 seconds + self.assertEqual( + exporter.export(self._get_sdk_log_data(), 1500), + LogExportResult.FAILURE, + ) + # Code should return failure after retrying once. + self.assertEqual(len(warning.records), 1) + self.assertEqual( + "Transient error UNAVAILABLE encountered while exporting logs batch, retrying in 1s.", + warning.records[0].message, + ) + with self.assertLogs(level=WARNING) as warning: + # This time don't pass in a timeout, so it will fallback to 3.5 second set on class. + self.assertEqual( + exporter.export(self._get_sdk_log_data()), + LogExportResult.FAILURE, + ) + # 2 retrys (after 1s, 3s). + self.assertEqual(len(warning.records), 2) + + @patch.object(Session, "post") + def test_timeout_set_correctly(self, mock_post): + resp = Response() + resp.status_code = 200 + + def export_side_effect(*args, **kwargs): + # Timeout should be set to something slightly less than 400 milliseconds depending on how much time has passed. + self.assertTrue(0.4 - kwargs["timeout"] < 0.0005) + return resp + + mock_post.side_effect = export_side_effect + exporter = OTLPLogExporter() + exporter.export(self._get_sdk_log_data(), 400) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 8d8ff6037aa..b7e357bbe4c 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -13,10 +13,12 @@ # limitations under the License. import unittest -from unittest.mock import MagicMock, Mock, call, patch +from logging import WARNING +from unittest.mock import MagicMock, Mock, patch import requests -import responses +from requests import Session +from requests.models import Response from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( @@ -52,6 +54,16 @@ OS_ENV_CLIENT_KEY = "os/env/client-key.pem" OS_ENV_HEADERS = "envHeader1=val1,envHeader2=val2" OS_ENV_TIMEOUT = "30" +BASIC_SPAN = _Span( + "abc", + context=Mock( + **{ + "trace_state": {"a": "b", "c": "d"}, + "span_id": 10217189687419569865, + "trace_id": 67545097771067222548457157018666467027, + } + ), +) # pylint: disable=protected-access @@ -227,37 +239,6 @@ def test_headers_parse_from_env(self): ), ) - # pylint: disable=no-self-use - @responses.activate - @patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): - # return a retryable error - responses.add( - responses.POST, - "http://traces.example.com/export", - json={"error": "something exploded"}, - status=500, - ) - - exporter = OTLPSpanExporter( - endpoint="http://traces.example.com/export" - ) - span = _Span( - "abc", - context=Mock( - **{ - "trace_state": {"a": "b", "c": "d"}, - "span_id": 10217189687419569865, - "trace_id": 67545097771067222548457157018666467027, - } - ), - ) - - exporter.export([span]) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) - @patch.object(OTLPSpanExporter, "_export", return_value=Mock(ok=True)) def test_2xx_status_code(self, mock_otlp_metric_exporter): """ @@ -267,3 +248,46 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): self.assertEqual( OTLPSpanExporter().export(MagicMock()), SpanExportResult.SUCCESS ) + + @patch.object(Session, "post") + def test_retry_timeout(self, mock_post): + exporter = OTLPSpanExporter(timeout=3.5) + + resp = Response() + resp.status_code = 503 + resp.reason = "UNAVAILABLE" + mock_post.return_value = resp + with self.assertLogs(level=WARNING) as warning: + # Set timeout to 1.5 seconds + self.assertEqual( + exporter.export([BASIC_SPAN], 1500), + SpanExportResult.FAILURE, + ) + # Code should return failure after retrying once. + self.assertEqual(len(warning.records), 1) + self.assertEqual( + "Transient error UNAVAILABLE encountered while exporting span batch, retrying in 1s.", + warning.records[0].message, + ) + with self.assertLogs(level=WARNING) as warning: + # This time don't pass in a timeout, so it will fallback to 3.5 second set on class. + self.assertEqual( + exporter.export([BASIC_SPAN]), + SpanExportResult.FAILURE, + ) + # 2 retrys (after 1s, 3s). + self.assertEqual(len(warning.records), 2) + + @patch.object(Session, "post") + def test_timeout_set_correctly(self, mock_post): + resp = Response() + resp.status_code = 200 + + def export_side_effect(*args, **kwargs): + # Timeout should be set to something slightly less than 400 milliseconds depending on how much time has passed. + self.assertTrue(0.4 - kwargs["timeout"] < 0.0005) + return resp + + mock_post.side_effect = export_side_effect + exporter = OTLPSpanExporter() + exporter.export([BASIC_SPAN], 400) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index 254c5f6b96d..36382ea9b29 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -72,11 +72,14 @@ class LogExporter(abc.ABC): """ @abc.abstractmethod - def export(self, batch: Sequence[LogData]): + def export( + self, batch: Sequence[LogData], timeout_millis: Optional[int] = None + ): """Exports a batch of logs. Args: - batch: The list of `LogData` objects to be exported + batch: The list of `LogData` objects to be exported. + timeout_millis: Optional milliseconds until Export should timeout if it hasn't succeded. Returns: The result of the export @@ -89,6 +92,13 @@ def shutdown(self): Called when the SDK is shut down. """ + @abc.abstractmethod + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Hint to ensure that the export of any spans the exporter has received + prior to the call to ForceFlush SHOULD be completed as soon as possible, preferably + before returning from this method. + """ + class ConsoleLogExporter(LogExporter): """Implementation of :class:`LogExporter` that prints log records to the @@ -107,6 +117,7 @@ def __init__( self.out = out self.formatter = formatter + # pylint: disable=arguments-differ def export(self, batch: Sequence[LogData]): for data in batch: self.out.write(self.formatter(data.log_record)) @@ -310,7 +321,7 @@ def shutdown(self): self._worker_thread.join() self._exporter.shutdown() - def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + def force_flush(self, timeout_millis: Optional[int] = None): if self._shutdown: return # Blocking call to export. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py index 68cb6b7389a..910e2cb17c2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py @@ -40,6 +40,7 @@ def get_finished_logs(self) -> typing.Tuple[LogData, ...]: with self._lock: return tuple(self._logs) + # pylint: disable=arguments-differ def export(self, batch: typing.Sequence[LogData]) -> LogExportResult: if self._stopped: return LogExportResult.FAILURE diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 9e60d6cff9b..5fccdf6fc94 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import annotations +import abc import collections import logging import os @@ -56,7 +57,8 @@ class SpanExportResult(Enum): FAILURE = 1 -class SpanExporter: + +class SpanExporter(abc.ABC): """Interface for exporting spans. Interface to be implemented by services that want to export spans recorded @@ -66,24 +68,30 @@ class SpanExporter: `SimpleSpanProcessor` or a `BatchSpanProcessor`. """ + @abc.abstractmethod def export( - self, spans: typing.Sequence[ReadableSpan] + self, + spans: typing.Sequence[ReadableSpan], + timeout_millis: typing.Optional[int] = None, ) -> "SpanExportResult": """Exports a batch of telemetry data. Args: spans: The list of `opentelemetry.trace.Span` objects to be exported + timeout_millis: Optional milliseconds until Export should timeout if it hasn't succeded. Returns: The result of the export """ + @abc.abstractmethod def shutdown(self) -> None: """Shuts down the exporter. Called when the SDK is shut down. """ + @abc.abstractmethod def force_flush(self, timeout_millis: int = 30000) -> bool: """Hint to ensure that the export of any spans the exporter has received prior to the call to ForceFlush SHOULD be completed as soon as possible, preferably From ccdd2246bbb4328bd0247472236da1c8fbc7b160 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Wed, 30 Apr 2025 18:41:38 +0000 Subject: [PATCH 02/11] Fix lint issues --- .../exporter/otlp/proto/http/_log_exporter/__init__.py | 4 ++-- .../opentelemetry/sdk/_logs/_internal/export/__init__.py | 8 -------- .../src/opentelemetry/sdk/trace/export/__init__.py | 8 +++++--- .../sdk/trace/export/in_memory_span_exporter.py | 1 + opentelemetry-sdk/tests/trace/export/test_export.py | 4 ++++ 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 1a46cab057c..c8530972ade 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -138,7 +138,7 @@ def _export(self, serialized_data: bytes, timeout_sec: float): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=timeout_sec + timeout=timeout_sec, cert=self._client_cert, ) except ConnectionError: @@ -146,7 +146,7 @@ def _export(self, serialized_data: bytes, timeout_sec: float): url=self._endpoint, data=data, verify=self._certificate_file, - timeout=timeout_sec + timeout=timeout_sec, cert=self._client_cert, ) return resp diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index 36382ea9b29..e42cc1532fe 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -92,13 +92,6 @@ def shutdown(self): Called when the SDK is shut down. """ - @abc.abstractmethod - def force_flush(self, timeout_millis: int = 30000) -> bool: - """Hint to ensure that the export of any spans the exporter has received - prior to the call to ForceFlush SHOULD be completed as soon as possible, preferably - before returning from this method. - """ - class ConsoleLogExporter(LogExporter): """Implementation of :class:`LogExporter` that prints log records to the @@ -127,7 +120,6 @@ def export(self, batch: Sequence[LogData]): def shutdown(self): pass - class SimpleLogRecordProcessor(LogRecordProcessor): """This is an implementation of LogRecordProcessor which passes received logs in the export-friendly LogData representation to the diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 5fccdf6fc94..035f67d9f3f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -57,7 +57,6 @@ class SpanExportResult(Enum): FAILURE = 1 - class SpanExporter(abc.ABC): """Interface for exporting spans. @@ -91,7 +90,6 @@ def shutdown(self) -> None: Called when the SDK is shut down. """ - @abc.abstractmethod def force_flush(self, timeout_millis: int = 30000) -> bool: """Hint to ensure that the export of any spans the exporter has received prior to the call to ForceFlush SHOULD be completed as soon as possible, preferably @@ -517,11 +515,15 @@ def __init__( self.formatter = formatter self.service_name = service_name + # pylint: disable=arguments-differ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: for span in spans: self.out.write(self.formatter(span)) self.out.flush() return SpanExportResult.SUCCESS - def force_flush(self, timeout_millis: int = 30000) -> bool: + def force_flush(self, timeout_millis: int = 30000): return True + + def shutdown(self): + pass diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py index c28ecfd214f..eadef019514 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py @@ -42,6 +42,7 @@ def get_finished_spans(self) -> typing.Tuple[ReadableSpan, ...]: with self._lock: return tuple(self._finished_spans) + # pylint: disable=arguments-differ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: """Stores a list of spans in memory.""" if self._stopped: diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index a6d9c36875b..770b3ff123c 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -60,6 +60,7 @@ def __init__( self.export_timeout = export_timeout_millis / 1e3 self.export_event = export_event + # pylint: disable=arguments-differ def export(self, spans: trace.Span) -> export.SpanExportResult: if ( self.max_export_batch_size is not None @@ -75,6 +76,9 @@ def export(self, spans: trace.Span) -> export.SpanExportResult: def shutdown(self): self.is_shutdown = True + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + class TestSimpleSpanProcessor(unittest.TestCase): def test_simple_span_processor(self): From 5bc8894521323424c42daa5acfe5c043c863d43a Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Thu, 1 May 2025 19:04:05 +0000 Subject: [PATCH 03/11] Fix a bunch of failing style/lint/spellcheck checks --- .../opencensus/trace_exporter/__init__.py | 1 + .../exporter/otlp/proto/grpc/exporter.py | 6 +-- .../tests/test_otlp_exporter_mixin.py | 37 ++++++++++--------- .../exporter/zipkin/json/__init__.py | 1 + .../sdk/_logs/_internal/export/__init__.py | 3 +- .../sdk/trace/export/__init__.py | 8 +--- .../trace/export/in_memory_span_exporter.py | 3 -- .../tests/trace/export/test_export.py | 3 -- 8 files changed, 28 insertions(+), 34 deletions(-) diff --git a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/trace_exporter/__init__.py index 0b79bbb2073..5dd1ae2df09 100644 --- a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/trace_exporter/__init__.py @@ -69,6 +69,7 @@ def __init__( self.host_name = host_name self.node = utils.get_node(service_name, host_name) + # pylint: disable=arguments-differ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: # Populate service_name from first span # We restrict any SpanProcessor to be only associated with a single diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index d169d1e5a80..e4022e9bdd4 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -72,7 +72,7 @@ from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.util.re import parse_env_headers -json_config = json.dumps( +_JSON_CONFIG = json.dumps( { "methodConfig": [ { @@ -269,7 +269,7 @@ def __init__( self._endpoint, compression=compression, options=[ - ("grpc.service_config", json_config), + ("grpc.service_config", _JSON_CONFIG), ], ) else: @@ -284,7 +284,7 @@ def __init__( credentials, compression=compression, options=[ - ("grpc.service_config", json_config), + ("grpc.service_config", _JSON_CONFIG), ], ) self._client = self._stub(self._channel) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 5a75595f693..674e8589645 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -20,7 +20,12 @@ from unittest import TestCase from unittest.mock import ANY, Mock, patch -from google.rpc import code_pb2, status_pb2 +from google.rpc.code_pb2 import ( # pylint: disable=no-name-in-module + ALREADY_EXISTS, + OK, + UNAVAILABLE, +) +from google.rpc.status_pb2 import Status # pylint: disable=no-name-in-module from grpc import Compression, server from grpc_status import rpc_status @@ -91,7 +96,7 @@ def shutdown(self, timeout_millis=30_000): class TraceServiceServicerWithExportParams(TraceServiceServicer): def __init__( self, - export_result: code_pb2, + export_result: int, optional_export_sleep: Optional[float] = None, ): self.export_result = export_result @@ -99,13 +104,13 @@ def __init__( # pylint: disable=invalid-name,unused-argument def Export(self, request, context): - logger.warning("Export Request Recieved") + logger.warning("Export Request Received") if self.optional_export_sleep: time.sleep(self.optional_export_sleep) - if self.export_result != code_pb2.OK: + if self.export_result != OK: context.abort_with_status( rpc_status.to_status( - status_pb2.Status( + Status( code=self.export_result, ) ) @@ -288,7 +293,7 @@ def test_otlp_exporter_otlp_compression_envvar( def test_shutdown(self): add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(code_pb2.OK), + TraceServiceServicerWithExportParams(OK), self.server, ) self.assertEqual( @@ -306,9 +311,7 @@ def test_shutdown(self): def test_shutdown_wait_last_export(self): add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams( - code_pb2.OK, optional_export_sleep=1 - ), + TraceServiceServicerWithExportParams(OK, optional_export_sleep=1), self.server, ) @@ -327,9 +330,7 @@ def test_shutdown_wait_last_export(self): def test_shutdown_doesnot_wait_last_export(self): add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams( - code_pb2.OK, optional_export_sleep=3 - ), + TraceServiceServicerWithExportParams(OK, optional_export_sleep=3), self.server, ) @@ -351,7 +352,7 @@ def test_export_over_closed_grpc_channel(self): # pylint: disable=protected-access add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(code_pb2.OK), + TraceServiceServicerWithExportParams(OK), self.server, ) self.exporter.export([self.span]) @@ -365,7 +366,7 @@ def test_export_over_closed_grpc_channel(self): def test_retry_timeout(self): add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(code_pb2.UNAVAILABLE), + TraceServiceServicerWithExportParams(UNAVAILABLE), self.server, ) with self.assertLogs(level=WARNING) as warning: @@ -381,7 +382,7 @@ def test_retry_timeout(self): for idx, log in enumerate(warning.records): if idx != 2: self.assertEqual( - "Export Request Recieved", + "Export Request Received", log.message, ) else: @@ -405,7 +406,7 @@ def test_retry_timeout(self): for idx, log in enumerate(warning.records): if idx != 3: self.assertEqual( - "Export Request Recieved", + "Export Request Received", log.message, ) else: @@ -417,7 +418,7 @@ def test_retry_timeout(self): def test_timeout_set_correctly(self): add_TraceServiceServicer_to_server( TraceServiceServicerWithExportParams( - code_pb2.OK, optional_export_sleep=0.5 + OK, optional_export_sleep=0.5 ), self.server, ) @@ -449,7 +450,7 @@ def test_otlp_headers_from_env(self): def test_permanent_failure(self): with self.assertLogs(level=WARNING) as warning: add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(code_pb2.ALREADY_EXISTS), + TraceServiceServicerWithExportParams(ALREADY_EXISTS), self.server, ) self.assertEqual( diff --git a/exporter/opentelemetry-exporter-zipkin-json/src/opentelemetry/exporter/zipkin/json/__init__.py b/exporter/opentelemetry-exporter-zipkin-json/src/opentelemetry/exporter/zipkin/json/__init__.py index ba313db942a..d0d559981cf 100644 --- a/exporter/opentelemetry-exporter-zipkin-json/src/opentelemetry/exporter/zipkin/json/__init__.py +++ b/exporter/opentelemetry-exporter-zipkin-json/src/opentelemetry/exporter/zipkin/json/__init__.py @@ -149,6 +149,7 @@ def __init__( environ.get(OTEL_EXPORTER_ZIPKIN_TIMEOUT, 10) ) + # pylint: disable=arguments-differ def export(self, spans: Sequence[Span]) -> SpanExportResult: # After the call to Shutdown subsequent calls to Export are # not allowed and should return a Failure result diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index e42cc1532fe..ffcdc6f2343 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -79,7 +79,7 @@ def export( Args: batch: The list of `LogData` objects to be exported. - timeout_millis: Optional milliseconds until Export should timeout if it hasn't succeded. + timeout_millis: Optional milliseconds until Export should timeout if it hasn't succeeded. Returns: The result of the export @@ -120,6 +120,7 @@ def export(self, batch: Sequence[LogData]): def shutdown(self): pass + class SimpleLogRecordProcessor(LogRecordProcessor): """This is an implementation of LogRecordProcessor which passes received logs in the export-friendly LogData representation to the diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 035f67d9f3f..a4a1ba19d72 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -77,13 +77,12 @@ def export( Args: spans: The list of `opentelemetry.trace.Span` objects to be exported - timeout_millis: Optional milliseconds until Export should timeout if it hasn't succeded. + timeout_millis: Optional milliseconds until Export should timeout if it hasn't succeeded. Returns: The result of the export """ - @abc.abstractmethod def shutdown(self) -> None: """Shuts down the exporter. @@ -522,8 +521,5 @@ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: self.out.flush() return SpanExportResult.SUCCESS - def force_flush(self, timeout_millis: int = 30000): + def force_flush(self, timeout_millis: int = 30000) -> bool: return True - - def shutdown(self): - pass diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py index eadef019514..f13b9c512ef 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py @@ -57,6 +57,3 @@ def shutdown(self) -> None: Calls to export after the exporter has been shut down will fail. """ self._stopped = True - - def force_flush(self, timeout_millis: int = 30000) -> bool: - return True diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 770b3ff123c..f20656ecb5a 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -76,9 +76,6 @@ def export(self, spans: trace.Span) -> export.SpanExportResult: def shutdown(self): self.is_shutdown = True - def force_flush(self, timeout_millis: int = 30000) -> bool: - return True - class TestSimpleSpanProcessor(unittest.TestCase): def test_simple_span_processor(self): From ba92c5aa7537965436d8120ff42bb3b59b9e7b70 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 2 May 2025 21:05:01 +0000 Subject: [PATCH 04/11] Remove timeout param from the export calls. --- .../otlp/proto/grpc/_log_exporter/__init__.py | 8 ++----- .../exporter/otlp/proto/grpc/exporter.py | 3 +-- .../proto/grpc/metric_exporter/__init__.py | 19 +++++---------- .../proto/grpc/trace_exporter/__init__.py | 10 ++------ .../tests/test_otlp_exporter_mixin.py | 24 +++++++------------ .../otlp/proto/http/_log_exporter/__init__.py | 8 ++----- .../proto/http/trace_exporter/__init__.py | 10 ++------ .../tests/test_proto_log_exporter.py | 11 ++++----- .../tests/test_proto_span_exporter.py | 11 ++++----- .../exporter/zipkin/json/__init__.py | 1 - .../sdk/_logs/_internal/export/__init__.py | 8 ++----- .../export/in_memory_log_exporter.py | 1 - .../sdk/trace/export/__init__.py | 10 ++------ .../trace/export/in_memory_span_exporter.py | 4 +++- .../tests/trace/export/test_export.py | 1 - 15 files changed, 41 insertions(+), 88 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index d13f662ccab..e66f9dbcca7 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -107,12 +107,8 @@ def _translate_data( ) -> ExportLogsServiceRequest: return encode_logs(data) - def export( - self, batch: Sequence[LogData], timeout_millis: Optional[int] = None - ) -> LogExportResult: - return self._export( - batch, timeout_millis / 1e3 if timeout_millis else None - ) + def export(self, batch: Sequence[LogData]) -> LogExportResult: + return self._export(batch) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index e4022e9bdd4..0f280dc9986 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -301,7 +301,6 @@ def _translate_data( def _export( self, data: Union[TypingSequence[ReadableSpan], MetricsData], - timeout_sec: Optional[float] = None, ) -> ExportResultT: if self._shutdown: logger.warning("Exporter already shutdown, ignoring batch") @@ -315,7 +314,7 @@ def _export( self._client.Export( request=self._translate_data(data), metadata=self._headers, - timeout=(timeout_sec or self._timeout), + timeout=self._timeout, ) return self._result.SUCCESS except RpcError as error: diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index 64c10ae9056..dbb2a8e1dee 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -13,11 +13,10 @@ from __future__ import annotations -import time from dataclasses import replace from logging import getLogger from os import environ -from typing import Iterable, List, Optional, Tuple, Union +from typing import Iterable, List, Tuple, Union from typing import Sequence as TypingSequence from grpc import ChannelCredentials, Compression @@ -159,26 +158,20 @@ def _translate_data( def export( self, metrics_data: MetricsData, - timeout_millis: Optional[int] = None, + timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: - timeout_sec = ( - timeout_millis / 1e3 if timeout_millis else self._timeout # pylint: disable=protected-access - ) + # TODO(#2663): OTLPExporterMixin should pass timeout to gRPC if self._max_export_batch_size is None: - return self._export(metrics_data, timeout_sec) + return self._export(data=metrics_data) export_result = MetricExportResult.SUCCESS - deadline_sec = time.time() + timeout_sec + for split_metrics_data in self._split_metrics_data(metrics_data): - time_remaining_sec = deadline_sec - time.time() - split_export_result = self._export( - split_metrics_data, time_remaining_sec - ) + split_export_result = self._export(data=split_metrics_data) if split_export_result is MetricExportResult.FAILURE: export_result = MetricExportResult.FAILURE - return export_result def _split_metrics_data( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index b092b5b3b9e..7aef65a2ca9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -139,14 +139,8 @@ def _translate_data( ) -> ExportTraceServiceRequest: return encode_spans(data) - def export( - self, - spans: Sequence[ReadableSpan], - timeout_millis: Optional[int] = None, - ) -> SpanExportResult: - return self._export( - spans, timeout_millis / 1e3 if timeout_millis else None - ) + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + return self._export(spans) def shutdown(self) -> None: OTLPExporterMixin.shutdown(self) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 674e8589645..2e116f4218b 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -76,14 +76,8 @@ def _translate_data( ) -> ExportTraceServiceRequest: return encode_spans(data) - def export( - self, - spans: Sequence[ReadableSpan], - timeout_millis: Optional[float] = None, - ) -> SpanExportResult: - return self._export( - spans, timeout_millis / 1e3 if timeout_millis else None - ) + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + return self._export(spans) @property def _exporting(self): @@ -369,10 +363,11 @@ def test_retry_timeout(self): TraceServiceServicerWithExportParams(UNAVAILABLE), self.server, ) + # Set timeout to 1.5 seconds. + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=1.5) with self.assertLogs(level=WARNING) as warning: - # Set timeout to 1.5 seconds self.assertEqual( - self.exporter.export([self.span], 1500), + exporter.export([self.span]), SpanExportResult.FAILURE, ) # Our GRPC retry policy starts with a 1 second backoff then doubles. @@ -392,8 +387,6 @@ def test_retry_timeout(self): ) with self.assertLogs(level=WARNING) as warning: exporter = OTLPSpanExporterForTesting(insecure=True, timeout=3.5) - # This time don't pass in a timeout to export, so it should fallback to the timeout - # passed to the exporter class. # pylint: disable=protected-access self.assertEqual(exporter._timeout, 3.5) self.assertEqual( @@ -422,20 +415,21 @@ def test_timeout_set_correctly(self): ), self.server, ) + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.4) # Should timeout. Deadline should be set to now + timeout. # That is 400 millis from now, and export sleeps for 500 millis. with self.assertLogs(level=WARNING) as warning: self.assertEqual( - self.exporter.export([self.span], 400), + exporter.export([self.span]), SpanExportResult.FAILURE, ) self.assertEqual( "Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED", warning.records[-1].message, ) - + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.6) self.assertEqual( - self.exporter.export([self.span], 600), + exporter.export([self.span]), SpanExportResult.SUCCESS, ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index c8530972ade..6a6e8821feb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -159,9 +159,7 @@ def _retryable(resp: requests.Response) -> bool: return True return False - def export( - self, batch: Sequence[LogData], timeout_millis: Optional[float] = None - ) -> LogExportResult: + def export(self, batch: Sequence[LogData]) -> LogExportResult: # After the call to Shutdown subsequent calls to Export are # not allowed and should return a Failure result. if self._shutdown: @@ -169,9 +167,7 @@ def export( return LogExportResult.FAILURE serialized_data = encode_logs(batch).SerializeToString() - deadline_sec = time() + ( - timeout_millis / 1e3 if timeout_millis else self._timeout - ) + deadline_sec = time() + self._timeout for delay in [1, 2, 4, 8, 16, 32]: remaining_time_sec = deadline_sec - time() if remaining_time_sec < 1e-09: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 6fa0f1c1bdd..3feb8731c5e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -157,11 +157,7 @@ def _retryable(resp: requests.Response) -> bool: return True return False - def export( - self, - spans: Sequence[ReadableSpan], - timeout_millis: Optional[float] = None, - ) -> SpanExportResult: + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: # After the call to Shutdown subsequent calls to Export are # not allowed and should return a Failure result. if self._shutdown: @@ -169,9 +165,7 @@ def export( return SpanExportResult.FAILURE serialized_data = encode_spans(spans).SerializePartialToString() - deadline_sec = time() + ( - timeout_millis / 1e3 if timeout_millis else self._timeout - ) + deadline_sec = time() + self._timeout for delay in [1, 2, 4, 8, 16, 32]: remaining_time_sec = deadline_sec - time() if remaining_time_sec < 1e-09: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 00a00ae3aa9..1ba1db06567 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -351,16 +351,15 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): @patch.object(Session, "post") def test_retry_timeout(self, mock_post): - exporter = OTLPLogExporter(timeout=3.5) + exporter = OTLPLogExporter(timeout=1.5) resp = Response() resp.status_code = 503 resp.reason = "UNAVAILABLE" mock_post.return_value = resp with self.assertLogs(level=WARNING) as warning: - # Set timeout to 1.5 seconds self.assertEqual( - exporter.export(self._get_sdk_log_data(), 1500), + exporter.export(self._get_sdk_log_data()), LogExportResult.FAILURE, ) # Code should return failure after retrying once. @@ -369,8 +368,8 @@ def test_retry_timeout(self, mock_post): "Transient error UNAVAILABLE encountered while exporting logs batch, retrying in 1s.", warning.records[0].message, ) + exporter = OTLPLogExporter(timeout=3.5) with self.assertLogs(level=WARNING) as warning: - # This time don't pass in a timeout, so it will fallback to 3.5 second set on class. self.assertEqual( exporter.export(self._get_sdk_log_data()), LogExportResult.FAILURE, @@ -389,5 +388,5 @@ def export_side_effect(*args, **kwargs): return resp mock_post.side_effect = export_side_effect - exporter = OTLPLogExporter() - exporter.export(self._get_sdk_log_data(), 400) + exporter = OTLPLogExporter(timeout=0.4) + exporter.export(self._get_sdk_log_data()) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index b7e357bbe4c..2096c727462 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -251,16 +251,15 @@ def test_2xx_status_code(self, mock_otlp_metric_exporter): @patch.object(Session, "post") def test_retry_timeout(self, mock_post): - exporter = OTLPSpanExporter(timeout=3.5) + exporter = OTLPSpanExporter(timeout=1.5) resp = Response() resp.status_code = 503 resp.reason = "UNAVAILABLE" mock_post.return_value = resp with self.assertLogs(level=WARNING) as warning: - # Set timeout to 1.5 seconds self.assertEqual( - exporter.export([BASIC_SPAN], 1500), + exporter.export([BASIC_SPAN]), SpanExportResult.FAILURE, ) # Code should return failure after retrying once. @@ -269,8 +268,8 @@ def test_retry_timeout(self, mock_post): "Transient error UNAVAILABLE encountered while exporting span batch, retrying in 1s.", warning.records[0].message, ) + exporter = OTLPSpanExporter(timeout=3.5) with self.assertLogs(level=WARNING) as warning: - # This time don't pass in a timeout, so it will fallback to 3.5 second set on class. self.assertEqual( exporter.export([BASIC_SPAN]), SpanExportResult.FAILURE, @@ -289,5 +288,5 @@ def export_side_effect(*args, **kwargs): return resp mock_post.side_effect = export_side_effect - exporter = OTLPSpanExporter() - exporter.export([BASIC_SPAN], 400) + exporter = OTLPSpanExporter(timeout=0.4) + exporter.export([BASIC_SPAN]) diff --git a/exporter/opentelemetry-exporter-zipkin-json/src/opentelemetry/exporter/zipkin/json/__init__.py b/exporter/opentelemetry-exporter-zipkin-json/src/opentelemetry/exporter/zipkin/json/__init__.py index d0d559981cf..ba313db942a 100644 --- a/exporter/opentelemetry-exporter-zipkin-json/src/opentelemetry/exporter/zipkin/json/__init__.py +++ b/exporter/opentelemetry-exporter-zipkin-json/src/opentelemetry/exporter/zipkin/json/__init__.py @@ -149,7 +149,6 @@ def __init__( environ.get(OTEL_EXPORTER_ZIPKIN_TIMEOUT, 10) ) - # pylint: disable=arguments-differ def export(self, spans: Sequence[Span]) -> SpanExportResult: # After the call to Shutdown subsequent calls to Export are # not allowed and should return a Failure result diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index ffcdc6f2343..c331fa76c6b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -72,14 +72,11 @@ class LogExporter(abc.ABC): """ @abc.abstractmethod - def export( - self, batch: Sequence[LogData], timeout_millis: Optional[int] = None - ): + def export(self, batch: Sequence[LogData]): """Exports a batch of logs. Args: batch: The list of `LogData` objects to be exported. - timeout_millis: Optional milliseconds until Export should timeout if it hasn't succeeded. Returns: The result of the export @@ -110,7 +107,6 @@ def __init__( self.out = out self.formatter = formatter - # pylint: disable=arguments-differ def export(self, batch: Sequence[LogData]): for data in batch: self.out.write(self.formatter(data.log_record)) @@ -314,7 +310,7 @@ def shutdown(self): self._worker_thread.join() self._exporter.shutdown() - def force_flush(self, timeout_millis: Optional[int] = None): + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: if self._shutdown: return # Blocking call to export. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py index 910e2cb17c2..68cb6b7389a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/in_memory_log_exporter.py @@ -40,7 +40,6 @@ def get_finished_logs(self) -> typing.Tuple[LogData, ...]: with self._lock: return tuple(self._logs) - # pylint: disable=arguments-differ def export(self, batch: typing.Sequence[LogData]) -> LogExportResult: if self._stopped: return LogExportResult.FAILURE diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index a4a1ba19d72..9e60d6cff9b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -13,7 +13,6 @@ # limitations under the License. from __future__ import annotations -import abc import collections import logging import os @@ -57,7 +56,7 @@ class SpanExportResult(Enum): FAILURE = 1 -class SpanExporter(abc.ABC): +class SpanExporter: """Interface for exporting spans. Interface to be implemented by services that want to export spans recorded @@ -67,17 +66,13 @@ class SpanExporter(abc.ABC): `SimpleSpanProcessor` or a `BatchSpanProcessor`. """ - @abc.abstractmethod def export( - self, - spans: typing.Sequence[ReadableSpan], - timeout_millis: typing.Optional[int] = None, + self, spans: typing.Sequence[ReadableSpan] ) -> "SpanExportResult": """Exports a batch of telemetry data. Args: spans: The list of `opentelemetry.trace.Span` objects to be exported - timeout_millis: Optional milliseconds until Export should timeout if it hasn't succeeded. Returns: The result of the export @@ -514,7 +509,6 @@ def __init__( self.formatter = formatter self.service_name = service_name - # pylint: disable=arguments-differ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: for span in spans: self.out.write(self.formatter(span)) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py index f13b9c512ef..c28ecfd214f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/in_memory_span_exporter.py @@ -42,7 +42,6 @@ def get_finished_spans(self) -> typing.Tuple[ReadableSpan, ...]: with self._lock: return tuple(self._finished_spans) - # pylint: disable=arguments-differ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: """Stores a list of spans in memory.""" if self._stopped: @@ -57,3 +56,6 @@ def shutdown(self) -> None: Calls to export after the exporter has been shut down will fail. """ self._stopped = True + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index f20656ecb5a..a6d9c36875b 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -60,7 +60,6 @@ def __init__( self.export_timeout = export_timeout_millis / 1e3 self.export_event = export_event - # pylint: disable=arguments-differ def export(self, spans: trace.Span) -> export.SpanExportResult: if ( self.max_export_batch_size is not None From 29144a1dfb6e982cea8c5fcaee32dfe1e5f6d325 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 2 May 2025 21:13:42 +0000 Subject: [PATCH 05/11] Fix flaky windows test ? --- .../tests/test_otlp_exporter_mixin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 2e116f4218b..c1b2fbb6997 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -427,7 +427,7 @@ def test_timeout_set_correctly(self): "Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED", warning.records[-1].message, ) - exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.6) + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.8) self.assertEqual( exporter.export([self.span]), SpanExportResult.SUCCESS, From 95ccfea3f9af5d10c6220b9788e9825d4958eec4 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 9 May 2025 17:43:25 +0000 Subject: [PATCH 06/11] Respond to review comments.. --- .../exporter/opencensus/trace_exporter/__init__.py | 1 - .../src/opentelemetry/exporter/otlp/proto/grpc/exporter.py | 7 ++++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/trace_exporter/__init__.py index 5dd1ae2df09..0b79bbb2073 100644 --- a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/trace_exporter/__init__.py @@ -69,7 +69,6 @@ def __init__( self.host_name = host_name self.node = utils.get_node(service_name, host_name) - # pylint: disable=arguments-differ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: # Populate service_name from first span # We restrict any SpanProcessor to be only associated with a single diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 0f280dc9986..b373df1c8df 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -78,9 +78,14 @@ { "name": [dict()], "retryPolicy": { + # 5 is the maximum allowable attempts allowed by grpc retry policy. + # This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt. + # Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting + # for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the + # timeout. See https://grpc.io/docs/guides/retry/ for more details. "maxAttempts": 5, "initialBackoff": "1s", - "maxBackoff": "64s", + "maxBackoff": "9s", "backoffMultiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE", From 8770e1518c38f145c52f6835d52f2324de06a63c Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 13 May 2025 18:08:47 +0000 Subject: [PATCH 07/11] Delete exponential backoff code that is now unused --- .../otlp/proto/common/_internal/__init__.py | 35 -------------- .../tests/test_backoff.py | 46 ------------------- .../exporter/otlp/proto/grpc/exporter.py | 2 +- 3 files changed, 1 insertion(+), 82 deletions(-) delete mode 100644 exporter/opentelemetry-exporter-otlp-proto-common/tests/test_backoff.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py index 2f49502cf1d..4f83db24eeb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py @@ -177,38 +177,3 @@ def _get_resource_data( ) ) return resource_data - - -def _create_exp_backoff_generator(max_value: int = 0) -> Iterator[int]: - """ - Generates an infinite sequence of exponential backoff values. The sequence starts - from 1 (2^0) and doubles each time (2^1, 2^2, 2^3, ...). If a max_value is specified - and non-zero, the generated values will not exceed this maximum, capping at max_value - instead of growing indefinitely. - - Parameters: - - max_value (int, optional): The maximum value to yield. If 0 or not provided, the - sequence grows without bound. - - Returns: - Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or - capped at max_value. - - Example: - ``` - gen = _create_exp_backoff_generator(max_value=10) - for _ in range(5): - print(next(gen)) - ``` - This will print: - 1 - 2 - 4 - 8 - 10 - - Note: this functionality used to be handled by the 'backoff' package. - """ - for i in count(0): - out = 2**i - yield min(out, max_value) if max_value else out diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_backoff.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_backoff.py deleted file mode 100644 index 789a184ad04..00000000000 --- a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_backoff.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from unittest import TestCase - -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, -) - - -class TestBackoffGenerator(TestCase): - def test_exp_backoff_generator(self): - generator = _create_exp_backoff_generator() - self.assertEqual(next(generator), 1) - self.assertEqual(next(generator), 2) - self.assertEqual(next(generator), 4) - self.assertEqual(next(generator), 8) - self.assertEqual(next(generator), 16) - - def test_exp_backoff_generator_with_max(self): - generator = _create_exp_backoff_generator(max_value=4) - self.assertEqual(next(generator), 1) - self.assertEqual(next(generator), 2) - self.assertEqual(next(generator), 4) - self.assertEqual(next(generator), 4) - self.assertEqual(next(generator), 4) - - def test_exp_backoff_generator_with_odd_max(self): - # use a max_value that's not in the set - generator = _create_exp_backoff_generator(max_value=11) - self.assertEqual(next(generator), 1) - self.assertEqual(next(generator), 2) - self.assertEqual(next(generator), 4) - self.assertEqual(next(generator), 8) - self.assertEqual(next(generator), 11) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index df2d64a8d0e..e998147da31 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -82,7 +82,7 @@ # This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt. # Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting # for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the - # timeout. See https://grpc.io/docs/guides/retry/ for more details. + # timeout, and all fail. See https://grpc.io/docs/guides/retry/ for more details. "maxAttempts": 5, "initialBackoff": "1s", "maxBackoff": "9s", From f373caa572fb30410c3d9ca93a44ff781a9798be Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 13 May 2025 18:59:03 +0000 Subject: [PATCH 08/11] Add changelog and remove some unused imports.. --- CHANGELOG.md | 3 +++ .../exporter/otlp/proto/common/_internal/__init__.py | 2 -- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85ab5a84852..3fa22b0b45b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/) and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)). +- Update OTLP gRPC/HTTP exporters: the export timeout is now inclusive of all retries and backoffs, + and an unnecessary 32 second sleep that occured after all retries had completed/failed was removed + ([#4564](https://github.com/open-telemetry/opentelemetry-python/pull/4564)). ## Version 1.33.0/0.54b0 (2025-05-09) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py index 4f83db24eeb..200644368df 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py @@ -17,12 +17,10 @@ import logging from collections.abc import Sequence -from itertools import count from typing import ( Any, Callable, Dict, - Iterator, List, Mapping, Optional, From d1e04e102a81238e3937722885cbe95794077905 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 13 May 2025 19:05:49 +0000 Subject: [PATCH 09/11] fix typo and unit test flaking on windows --- CHANGELOG.md | 2 +- .../tests/test_otlp_exporter_mixin.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fa22b0b45b..7b9053270e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/) and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)). - Update OTLP gRPC/HTTP exporters: the export timeout is now inclusive of all retries and backoffs, - and an unnecessary 32 second sleep that occured after all retries had completed/failed was removed + and an unnecessary 32 second sleep that occurred after all retries had completed/failed was removed ([#4564](https://github.com/open-telemetry/opentelemetry-python/pull/4564)). diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index c1b2fbb6997..aa620ed390a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -386,9 +386,9 @@ def test_retry_timeout(self): log.message, ) with self.assertLogs(level=WARNING) as warning: - exporter = OTLPSpanExporterForTesting(insecure=True, timeout=3.5) + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=5) # pylint: disable=protected-access - self.assertEqual(exporter._timeout, 3.5) + self.assertEqual(exporter._timeout, 5) self.assertEqual( exporter.export([self.span]), SpanExportResult.FAILURE, From f42ecd31bea8dab7a52f0a0def30bbbc190b38bc Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Thu, 22 May 2025 14:13:41 +0000 Subject: [PATCH 10/11] Refactor tests, HTTP exporters a bit --- .../pyproject.toml | 1 + .../otlp/proto/common/_internal/__init__.py | 10 ++ .../test-requirements.txt | 1 + .../exporter/otlp/proto/grpc/exporter.py | 19 ++- .../tests/test_otlp_exporter_mixin.py | 140 +++++++++--------- .../otlp/proto/http/_log_exporter/__init__.py | 48 +++--- .../proto/http/metric_exporter/__init__.py | 62 ++++---- .../proto/http/trace_exporter/__init__.py | 50 +++---- .../metrics/test_otlp_metrics_exporter.py | 42 ++++-- .../tests/test_proto_log_exporter.py | 22 ++- .../tests/test_proto_span_exporter.py | 22 ++- uv.lock | 6 +- 12 files changed, 221 insertions(+), 202 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml b/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml index 40b4950b99a..77ff64ea5f8 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml +++ b/exporter/opentelemetry-exporter-otlp-proto-common/pyproject.toml @@ -29,6 +29,7 @@ classifiers = [ ] dependencies = [ "opentelemetry-proto == 1.34.0.dev", + "requests ~= 2.7", ] [project.urls] diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py index 200644368df..c239a4ed18a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py @@ -27,6 +27,8 @@ TypeVar, ) +import requests + from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue from opentelemetry.proto.common.v1.common_pb2 import ( ArrayValue as PB2ArrayValue, @@ -108,6 +110,14 @@ def _encode_key_value( ) +def _is_retryable(resp: requests.Response) -> bool: + if resp.status_code == 408: + return True + if resp.status_code >= 500 and resp.status_code <= 599: + return True + return False + + def _encode_array( array: Sequence[Any], allow_null: bool = False ) -> Sequence[PB2AnyValue]: diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/test-requirements.txt b/exporter/opentelemetry-exporter-otlp-proto-common/test-requirements.txt index 6ab1dbf75b7..49ffef98ac2 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/test-requirements.txt +++ b/exporter/opentelemetry-exporter-otlp-proto-common/test-requirements.txt @@ -2,6 +2,7 @@ asgiref==3.7.2 Deprecated==1.2.14 importlib-metadata==6.11.0 iniconfig==2.0.0 +requests == 2.7.0 packaging==24.0 pluggy==1.5.0 protobuf==5.26.1 diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index e998147da31..baccd38d3d2 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -72,17 +72,18 @@ from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.util.re import parse_env_headers -_JSON_CONFIG = json.dumps( +# 5 is the maximum allowable attempts allowed by grpc retry policy. +# This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt, +# plus or minus a 20% jitter. Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting +# for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the +# timeout, and all fail. A header `grpc-retry-pushback-ms` when set by the server will override +# and take precedence over this backoff. See https://grpc.io/docs/guides/retry/ for more details. +_GRPC_RETRY_POLICY = json.dumps( { "methodConfig": [ { "name": [dict()], "retryPolicy": { - # 5 is the maximum allowable attempts allowed by grpc retry policy. - # This policy results in backoffs of 1s, 2s, 4s, and then 8s after the initial failed attempt. - # Timeout set on the RPC call encompasses the retry backoffs AND time spent waiting - # for a response. DEADLINE_EXCEEDED is returned if all the attempts cannot complete within the - # timeout, and all fail. See https://grpc.io/docs/guides/retry/ for more details. "maxAttempts": 5, "initialBackoff": "1s", "maxBackoff": "9s", @@ -213,8 +214,6 @@ class OTLPExporterMixin( compression: gRPC compression method to use """ - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: Optional[str] = None, @@ -276,7 +275,7 @@ def __init__( self._endpoint, compression=compression, options=[ - ("grpc.service_config", _JSON_CONFIG), + ("grpc.service_config", _GRPC_RETRY_POLICY), ], ) else: @@ -291,7 +290,7 @@ def __init__( credentials, compression=compression, options=[ - ("grpc.service_config", _JSON_CONFIG), + ("grpc.service_config", _GRPC_RETRY_POLICY), ], ) self._client = self._stub(self._channel) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index aa620ed390a..3c752d70e50 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -20,14 +20,7 @@ from unittest import TestCase from unittest.mock import ANY, Mock, patch -from google.rpc.code_pb2 import ( # pylint: disable=no-name-in-module - ALREADY_EXISTS, - OK, - UNAVAILABLE, -) -from google.rpc.status_pb2 import Status # pylint: disable=no-name-in-module -from grpc import Compression, server -from grpc_status import rpc_status +from grpc import Compression, StatusCode, server from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, @@ -90,25 +83,36 @@ def shutdown(self, timeout_millis=30_000): class TraceServiceServicerWithExportParams(TraceServiceServicer): def __init__( self, - export_result: int, + export_result: StatusCode, optional_export_sleep: Optional[float] = None, + optional_first_time_retry_millis: Optional[int] = None, ): self.export_result = export_result self.optional_export_sleep = optional_export_sleep + self.optional_first_time_retry_millis = ( + optional_first_time_retry_millis + ) + self.first_attempt = True + self.num_requests = 0 + self.now = time.time() # pylint: disable=invalid-name,unused-argument def Export(self, request, context): - logger.warning("Export Request Received") + self.num_requests += 1 if self.optional_export_sleep: time.sleep(self.optional_export_sleep) - if self.export_result != OK: - context.abort_with_status( - rpc_status.to_status( - Status( - code=self.export_result, + if self.export_result != StatusCode.OK: + if self.optional_first_time_retry_millis and self.first_attempt: + self.first_attempt = False + context.set_trailing_metadata( + ( + ( + "grpc-retry-pushback-ms", + str(self.optional_first_time_retry_millis), + ), ) ) - ) + context.abort(self.export_result, "") return ExportTraceServiceResponse() @@ -287,7 +291,7 @@ def test_otlp_exporter_otlp_compression_envvar( def test_shutdown(self): add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(OK), + TraceServiceServicerWithExportParams(StatusCode.OK), self.server, ) self.assertEqual( @@ -305,7 +309,9 @@ def test_shutdown(self): def test_shutdown_wait_last_export(self): add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(OK, optional_export_sleep=1), + TraceServiceServicerWithExportParams( + StatusCode.OK, optional_export_sleep=1 + ), self.server, ) @@ -324,7 +330,9 @@ def test_shutdown_wait_last_export(self): def test_shutdown_doesnot_wait_last_export(self): add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(OK, optional_export_sleep=3), + TraceServiceServicerWithExportParams( + StatusCode.OK, optional_export_sleep=3 + ), self.server, ) @@ -346,7 +354,7 @@ def test_export_over_closed_grpc_channel(self): # pylint: disable=protected-access add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(OK), + TraceServiceServicerWithExportParams(StatusCode.OK), self.server, ) self.exporter.export([self.span]) @@ -358,61 +366,56 @@ def test_export_over_closed_grpc_channel(self): str(err.exception), "Cannot invoke RPC on closed channel!" ) + def test_retry_with_server_pushback(self): + mock_trace_service = TraceServiceServicerWithExportParams( + StatusCode.UNAVAILABLE, optional_first_time_retry_millis=200 + ) + add_TraceServiceServicer_to_server( + mock_trace_service, + self.server, + ) + exporter = OTLPSpanExporterForTesting(insecure=True, timeout=10) + before = time.time() + self.assertEqual( + exporter.export([self.span]), + SpanExportResult.FAILURE, + ) + after = time.time() + # We set the `grpc-retry-pushback-ms` header to 200 millis on the first server response. + # So we expect the first request at time 0, second at time 0.2, + # third at 1.2 (start of backoff policy), fourth at time 3.2, last at time 7.2. + self.assertEqual(mock_trace_service.num_requests, 5) + # The backoffs have a jitter +- 20%, so we have to put a higher bound than 7.2. + self.assertTrue(after - before < 8.8) + def test_retry_timeout(self): + mock_trace_service = TraceServiceServicerWithExportParams( + StatusCode.UNAVAILABLE + ) add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(UNAVAILABLE), + mock_trace_service, self.server, ) # Set timeout to 1.5 seconds. exporter = OTLPSpanExporterForTesting(insecure=True, timeout=1.5) - with self.assertLogs(level=WARNING) as warning: - self.assertEqual( - exporter.export([self.span]), - SpanExportResult.FAILURE, - ) - # Our GRPC retry policy starts with a 1 second backoff then doubles. - # So we expect just two calls: one at time 0, one at time 1. - # The final log is from when export fails. - self.assertEqual(len(warning.records), 3) - for idx, log in enumerate(warning.records): - if idx != 2: - self.assertEqual( - "Export Request Received", - log.message, - ) - else: - self.assertEqual( - "Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED", - log.message, - ) - with self.assertLogs(level=WARNING) as warning: - exporter = OTLPSpanExporterForTesting(insecure=True, timeout=5) - # pylint: disable=protected-access - self.assertEqual(exporter._timeout, 5) - self.assertEqual( - exporter.export([self.span]), - SpanExportResult.FAILURE, - ) - # We expect 3 calls: time 0, time 1, time 3, but not time 7. - # The final log is from when export fails. - self.assertEqual(len(warning.records), 4) - for idx, log in enumerate(warning.records): - if idx != 3: - self.assertEqual( - "Export Request Received", - log.message, - ) - else: - self.assertEqual( - "Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED", - log.message, - ) + before = time.time() + self.assertEqual( + exporter.export([self.span]), + SpanExportResult.FAILURE, + ) + after = time.time() + # Our retry starts with a 1 second backoff then doubles. + # So we expect just two calls: one at time 0, one at time 1. + self.assertEqual(mock_trace_service.num_requests, 2) + # gRPC retry config waits for the timeout (1.5) before cancelling the request. + self.assertTrue(after - before < 1.6) def test_timeout_set_correctly(self): + mock_trace_service = TraceServiceServicerWithExportParams( + StatusCode.OK, optional_export_sleep=0.5 + ) add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams( - OK, optional_export_sleep=0.5 - ), + mock_trace_service, self.server, ) exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.4) @@ -427,6 +430,7 @@ def test_timeout_set_correctly(self): "Failed to export traces to localhost:4317, error code: StatusCode.DEADLINE_EXCEEDED", warning.records[-1].message, ) + self.assertEqual(mock_trace_service.num_requests, 1) exporter = OTLPSpanExporterForTesting(insecure=True, timeout=0.8) self.assertEqual( exporter.export([self.span]), @@ -444,7 +448,9 @@ def test_otlp_headers_from_env(self): def test_permanent_failure(self): with self.assertLogs(level=WARNING) as warning: add_TraceServiceServicer_to_server( - TraceServiceServicerWithExportParams(ALREADY_EXISTS), + TraceServiceServicerWithExportParams( + StatusCode.ALREADY_EXISTS + ), self.server, ) self.assertEqual( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 6a6e8821feb..9dda3cca5cd 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -14,6 +14,7 @@ import gzip import logging +import random import zlib from io import BytesIO from os import environ @@ -23,6 +24,9 @@ import requests from requests.exceptions import ConnectionError +from opentelemetry.exporter.otlp.proto.common._internal import ( + _is_retryable, +) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import ( _OTLP_HTTP_HEADERS, @@ -58,6 +62,7 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_LOGS_EXPORT_PATH = "v1/logs" DEFAULT_TIMEOUT = 10 # in seconds +_MAX_RETRYS = 6 class OTLPLogExporter(LogExporter): @@ -151,48 +156,37 @@ def _export(self, serialized_data: bytes, timeout_sec: float): ) return resp - @staticmethod - def _retryable(resp: requests.Response) -> bool: - if resp.status_code == 408: - return True - if resp.status_code >= 500 and resp.status_code <= 599: - return True - return False - def export(self, batch: Sequence[LogData]) -> LogExportResult: - # After the call to Shutdown subsequent calls to Export are - # not allowed and should return a Failure result. if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") return LogExportResult.FAILURE serialized_data = encode_logs(batch).SerializeToString() deadline_sec = time() + self._timeout - for delay in [1, 2, 4, 8, 16, 32]: - remaining_time_sec = deadline_sec - time() - if remaining_time_sec < 1e-09: - return LogExportResult.FAILURE - resp = self._export(serialized_data, remaining_time_sec) - # pylint: disable=no-else-return + backoff_seconds = 1 * random.uniform(0.8, 1.2) + for retry_num in range(1, _MAX_RETRYS + 1): + resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: return LogExportResult.SUCCESS - elif self._retryable(resp): - if delay > (deadline_sec - time()): - return LogExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: + if ( + not _is_retryable(resp) + or retry_num == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + ): _logger.error( "Failed to export logs batch code: %s, reason: %s", resp.status_code, resp.text, ) return LogExportResult.FAILURE + _logger.warning( + "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", + resp.reason, + backoff_seconds, + ) + sleep(backoff_seconds) + backoff_seconds *= 2 * random.uniform(0.8, 1.2) + # Not possible to reach here but the linter is complaining. return LogExportResult.FAILURE def force_flush(self, timeout_millis: float = 10_000) -> bool: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 6c8b930fbc7..bed515b0bc5 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -14,6 +14,7 @@ import gzip import logging +import random import zlib from io import BytesIO from os import environ @@ -34,6 +35,7 @@ from opentelemetry.exporter.otlp.proto.common._internal import ( _get_resource_data, + _is_retryable, ) from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( OTLPMetricExporterMixin, @@ -48,7 +50,7 @@ from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 ExportMetricsServiceRequest, ) -from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 # noqa: F401 +from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401 AnyValue, ArrayValue, InstrumentationScope, @@ -98,6 +100,7 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_METRICS_EXPORT_PATH = "v1/metrics" DEFAULT_TIMEOUT = 10 # in seconds +_MAX_RETRYS = 6 class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin): @@ -163,6 +166,7 @@ def __init__( self._common_configuration( preferred_temporality, preferred_aggregation ) + self._shutdown = False def _export(self, serialized_data: bytes, timeout_sec: float): data = serialized_data @@ -196,55 +200,51 @@ def _export(self, serialized_data: bytes, timeout_sec: float): ) return resp - @staticmethod - def _retryable(resp: requests.Response) -> bool: - if resp.status_code == 408: - return True - if resp.status_code >= 500 and resp.status_code <= 599: - return True - return False - def export( self, metrics_data: MetricsData, timeout_millis: Optional[float] = None, **kwargs, ) -> MetricExportResult: - serialized_data = encode_metrics(metrics_data) + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring batch") + return MetricExportResult.FAILURE + serialized_data = encode_metrics(metrics_data).SerializeToString() deadline_sec = time() + ( timeout_millis / 1e3 if timeout_millis else self._timeout ) - for delay in [1, 2, 4, 8, 16, 32]: - remaining_time_sec = deadline_sec - time() - if remaining_time_sec < 1e-09: - return MetricExportResult.FAILURE - resp = self._export( - serialized_data.SerializeToString(), remaining_time_sec - ) - # pylint: disable=no-else-return + backoff_seconds = 1 * random.uniform(0.8, 1.2) + for retry_num in range(1, _MAX_RETRYS + 1): + resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: return MetricExportResult.SUCCESS - elif self._retryable(resp): - if delay > (deadline_sec - time()): - return MetricExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: + if ( + not _is_retryable(resp) + or retry_num == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + ): _logger.error( - "Failed to export batch code: %s, reason: %s", + "Failed to export metrics batch code: %s, reason: %s", resp.status_code, resp.text, ) return MetricExportResult.FAILURE + _logger.warning( + "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", + resp.reason, + backoff_seconds, + ) + sleep(backoff_seconds) + backoff_seconds *= 2 * random.uniform(0.8, 1.2) + # Not possible to reach here but the linter is complaining. return MetricExportResult.FAILURE def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - pass + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring call") + return + self._session.close() + self._shutdown = True @property def _exporting(self) -> str: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 3feb8731c5e..7bce9494648 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -14,6 +14,7 @@ import gzip import logging +import random import zlib from io import BytesIO from os import environ @@ -23,6 +24,9 @@ import requests from requests.exceptions import ConnectionError +from opentelemetry.exporter.otlp.proto.common._internal import ( + _is_retryable, +) from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, ) @@ -57,6 +61,7 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_TRACES_EXPORT_PATH = "v1/traces" DEFAULT_TIMEOUT = 10 # in seconds +_MAX_RETRYS = 6 class OTLPSpanExporter(SpanExporter): @@ -149,48 +154,37 @@ def _export(self, serialized_data: bytes, timeout_sec: float): ) return resp - @staticmethod - def _retryable(resp: requests.Response) -> bool: - if resp.status_code == 408: - return True - if resp.status_code >= 500 and resp.status_code <= 599: - return True - return False - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: - # After the call to Shutdown subsequent calls to Export are - # not allowed and should return a Failure result. if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") return SpanExportResult.FAILURE serialized_data = encode_spans(spans).SerializePartialToString() deadline_sec = time() + self._timeout - for delay in [1, 2, 4, 8, 16, 32]: - remaining_time_sec = deadline_sec - time() - if remaining_time_sec < 1e-09: - return SpanExportResult.FAILURE - resp = self._export(serialized_data, remaining_time_sec) - # pylint: disable=no-else-return + backoff_seconds = 1 * random.uniform(0.8, 1.2) + for retry_num in range(1, _MAX_RETRYS + 1): + resp = self._export(serialized_data, deadline_sec - time()) if resp.ok: return SpanExportResult.SUCCESS - elif self._retryable(resp): - if delay > (deadline_sec - time()): - return SpanExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting span batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: + if ( + not _is_retryable(resp) + or retry_num == _MAX_RETRYS + or backoff_seconds > (deadline_sec - time()) + ): _logger.error( - "Failed to export batch code: %s, reason: %s", + "Failed to export span batch code: %s, reason: %s", resp.status_code, resp.text, ) return SpanExportResult.FAILURE + _logger.warning( + "Transient error %s encountered while exporting span batch, retrying in %.2fs.", + resp.reason, + backoff_seconds, + ) + sleep(backoff_seconds) + backoff_seconds *= 2 * random.uniform(0.8, 1.2) + # Not possible to reach here but the linter is complaining. return SpanExportResult.FAILURE def shutdown(self): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index df7c0c17ea3..85422e2b281 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from logging import WARNING from os import environ from unittest import TestCase @@ -511,26 +512,39 @@ def test_retry_timeout(self, mock_post): resp.reason = "UNAVAILABLE" mock_post.return_value = resp with self.assertLogs(level=WARNING) as warning: - # Set timeout to 1.5 seconds + before = time.time() + # Set timeout to 1.5 seconds, takes precedence over the 3.5 second class timeout. self.assertEqual( exporter.export(self.metrics["sum_int"], 1500), MetricExportResult.FAILURE, ) - # Code should return failure before the final retry which would exceed timeout. - # Code should return failure after retrying once. - self.assertEqual(len(warning.records), 1) - self.assertEqual( - "Transient error UNAVAILABLE encountered while exporting metric batch, retrying in 1s.", + after = time.time() + + # First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout. + self.assertEqual(mock_post.call_count, 2) + # There's a +/-20% jitter on each backoff. + self.assertTrue(after - before < 1.3) + self.assertIn( + "Transient error UNAVAILABLE encountered while exporting metrics batch, retrying in", warning.records[0].message, ) - with self.assertLogs(level=WARNING) as warning: - # This time don't pass in a timeout, so it will fallback to 3.5 second set on class. - self.assertEqual( - exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - # 2 retrys (after 1s, 3s). - self.assertEqual(len(warning.records), 2) + mock_post.reset_mock() + before = time.time() + # This time the class level 3.5s timeout should be used. + self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + after = time.time() + + # First call at time 0, second at time 1, third at time 3. + self.assertEqual(mock_post.call_count, 3) + # There's a +/-20% jitter on each backoff. + self.assertTrue(after - before < 3.7) + self.assertIn( + "Transient error UNAVAILABLE encountered while exporting metrics batch, retrying in", + warning.records[0].message, + ) @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 1ba1db06567..d646968fa74 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -14,6 +14,7 @@ # pylint: disable=protected-access +import time import unittest from logging import WARNING from typing import List @@ -358,24 +359,21 @@ def test_retry_timeout(self, mock_post): resp.reason = "UNAVAILABLE" mock_post.return_value = resp with self.assertLogs(level=WARNING) as warning: + before = time.time() + # Set timeout to 1.5 seconds self.assertEqual( exporter.export(self._get_sdk_log_data()), LogExportResult.FAILURE, ) - # Code should return failure after retrying once. - self.assertEqual(len(warning.records), 1) - self.assertEqual( - "Transient error UNAVAILABLE encountered while exporting logs batch, retrying in 1s.", + after = time.time() + # First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout. + self.assertEqual(mock_post.call_count, 2) + # There's a +/-20% jitter on each backoff. + self.assertTrue(after - before < 1.3) + self.assertIn( + "Transient error UNAVAILABLE encountered while exporting logs batch, retrying in", warning.records[0].message, ) - exporter = OTLPLogExporter(timeout=3.5) - with self.assertLogs(level=WARNING) as warning: - self.assertEqual( - exporter.export(self._get_sdk_log_data()), - LogExportResult.FAILURE, - ) - # 2 retrys (after 1s, 3s). - self.assertEqual(len(warning.records), 2) @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 2096c727462..0707f7ef73f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import unittest from logging import WARNING from unittest.mock import MagicMock, Mock, patch @@ -258,24 +259,21 @@ def test_retry_timeout(self, mock_post): resp.reason = "UNAVAILABLE" mock_post.return_value = resp with self.assertLogs(level=WARNING) as warning: + before = time.time() + # Set timeout to 1.5 seconds self.assertEqual( exporter.export([BASIC_SPAN]), SpanExportResult.FAILURE, ) - # Code should return failure after retrying once. - self.assertEqual(len(warning.records), 1) - self.assertEqual( - "Transient error UNAVAILABLE encountered while exporting span batch, retrying in 1s.", + after = time.time() + # First call at time 0, second at time 1, then an early return before the second backoff sleep b/c it would exceed timeout. + self.assertEqual(mock_post.call_count, 2) + # There's a +/-20% jitter on each backoff. + self.assertTrue(after - before < 1.3) + self.assertIn( + "Transient error UNAVAILABLE encountered while exporting span batch, retrying in", warning.records[0].message, ) - exporter = OTLPSpanExporter(timeout=3.5) - with self.assertLogs(level=WARNING) as warning: - self.assertEqual( - exporter.export([BASIC_SPAN]), - SpanExportResult.FAILURE, - ) - # 2 retrys (after 1s, 3s). - self.assertEqual(len(warning.records), 2) @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): diff --git a/uv.lock b/uv.lock index adb4be16d17..4ba10c0dd5f 100644 --- a/uv.lock +++ b/uv.lock @@ -293,10 +293,14 @@ name = "opentelemetry-exporter-otlp-proto-common" source = { editable = "exporter/opentelemetry-exporter-otlp-proto-common" } dependencies = [ { name = "opentelemetry-proto" }, + { name = "requests" }, ] [package.metadata] -requires-dist = [{ name = "opentelemetry-proto", editable = "opentelemetry-proto" }] +requires-dist = [ + { name = "opentelemetry-proto", editable = "opentelemetry-proto" }, + { name = "requests", specifier = "~=2.7" }, +] [[package]] name = "opentelemetry-exporter-otlp-proto-grpc" From 46e15f16b26ff94ff6b0b48215cae23ec7687dff Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Thu, 22 May 2025 15:38:35 +0000 Subject: [PATCH 11/11] Remove unneeded test reqs --- .../test-requirements.txt | 2 -- 1 file changed, 2 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt b/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt index b90eb642adb..67a94245243 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt @@ -1,7 +1,5 @@ asgiref==3.7.2 -googleapis-common-protos==1.63.2 grpcio==1.66.2 -grpcio-status==1.66.0 importlib-metadata==6.11.0 iniconfig==2.0.0 packaging==24.0