diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py index d1793a734ad..abb5ab3cc11 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py @@ -16,6 +16,7 @@ from __future__ import annotations import logging +import threading from collections.abc import Sequence from itertools import count from typing import ( @@ -151,6 +152,27 @@ def _encode_attributes( return pb2_attributes +class ThreadWithReturnValue(threading.Thread): + def __init__( + self, + group=None, + target=None, + name=None, + args=(), + kwargs=None, + ): + threading.Thread.__init__(self, group, target, name, args, kwargs) + self._return = None + + def run(self): + if self._target is not None: # type: ignore + self._return = self._target(*self._args, **self._kwargs) # type: ignore + + def join(self, *args): # type: ignore + threading.Thread.join(self, *args) + return self._return + + def _get_resource_data( sdk_resource_scope_data: Dict[Resource, _ResourceDataT], resource_class: Callable[..., _TypingResourceT], diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index 8f629899d77..22fbb44e7ef 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -119,4 +119,4 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: @property def _exporting(self) -> str: - return "logs" + return "logs" \ No newline at end of file diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 4be75c5335e..e65dbf645fc 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -19,7 +19,6 @@ from collections.abc import Sequence # noqa: F401 from logging import getLogger from os import environ -from time import sleep from typing import ( # noqa: F401 Any, Callable, @@ -47,7 +46,6 @@ ssl_channel_credentials, ) from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, _get_resource_data, ) from opentelemetry.exporter.otlp.proto.grpc import ( @@ -175,7 +173,7 @@ def _get_credentials( class OTLPExporterMixin( ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT] ): - """OTLP span exporter + """OTLP exporter Mixin class. Args: endpoint: OpenTelemetry Collector receiver endpoint @@ -258,8 +256,9 @@ def __init__( ) self._client = self._stub(self._channel) - self._export_lock = threading.Lock() - self._shutdown = False + self._export_not_occuring = threading.Event() + self._export_not_occuring.set() + self._shutdown_occuring = threading.Event() @abstractmethod def _translate_data( @@ -272,7 +271,7 @@ def _export( ) -> ExportResultT: # After the call to shutdown, subsequent calls to Export are # not allowed and should return a Failure result. - if self._shutdown: + if self._shutdown_occuring.is_set(): logger.warning("Exporter already shutdown, ignoring batch") return self._result.FAILURE @@ -285,82 +284,82 @@ def _export( # data.__class__.__name__, # delay, # ) - max_value = 64 - # expo returns a generator that yields delay values which grow - # exponentially. Once delay is greater than max_value, the yielded - # value will remain constant. - for delay in _create_exp_backoff_generator(max_value=max_value): - if delay == max_value or self._shutdown: + for delay in [1, 2, 4, 8, 16, 32]: + if self._shutdown_occuring.is_set(): return self._result.FAILURE - - with self._export_lock: - try: - self._client.Export( - request=self._translate_data(data), - metadata=self._headers, - timeout=self._timeout, + try: + self._export_not_occuring.clear() + self._client.Export( + request=self._translate_data(data), + metadata=self._headers, + timeout=self._timeout, + ) + self._export_not_occuring.set() + + return self._result.SUCCESS + + except RpcError as error: + # Important to set it here, b/c if Export fails the set() call above is not teached. + self._export_not_occuring.set() + if error.code() in [ + StatusCode.CANCELLED, + StatusCode.DEADLINE_EXCEEDED, + StatusCode.RESOURCE_EXHAUSTED, + StatusCode.ABORTED, + StatusCode.OUT_OF_RANGE, + StatusCode.UNAVAILABLE, + StatusCode.DATA_LOSS, + ]: + # No more retry will happen. Return failure. + if delay == 32: + return self._result.FAILURE + retry_info_bin = dict(error.trailing_metadata()).get( + "google.rpc.retryinfo-bin" ) - - return self._result.SUCCESS - - except RpcError as error: - if error.code() in [ - StatusCode.CANCELLED, - StatusCode.DEADLINE_EXCEEDED, - StatusCode.RESOURCE_EXHAUSTED, - StatusCode.ABORTED, - StatusCode.OUT_OF_RANGE, - StatusCode.UNAVAILABLE, - StatusCode.DATA_LOSS, - ]: - retry_info_bin = dict(error.trailing_metadata()).get( - "google.rpc.retryinfo-bin" - ) - if retry_info_bin is not None: - retry_info = RetryInfo() - retry_info.ParseFromString(retry_info_bin) - delay = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 - ) - - logger.warning( - ( - "Transient error %s encountered while exporting " - "%s to %s, retrying in %ss." - ), - error.code(), - self._exporting, - self._endpoint, - delay, - ) - sleep(delay) - continue - else: - logger.error( - "Failed to export %s to %s, error code: %s", - self._exporting, - self._endpoint, - error.code(), - exc_info=error.code() == StatusCode.UNKNOWN, + if retry_info_bin is not None: + retry_info = RetryInfo() + retry_info.ParseFromString(retry_info_bin) + delay = ( + retry_info.retry_delay.seconds + + retry_info.retry_delay.nanos / 1.0e9 ) + logger.warning( + ( + "Transient error %s encountered while exporting " + "%s to %s, retrying in %ss." + ), + error.code(), + self._exporting, + self._endpoint, + delay, + ) + self._shutdown_occuring.wait(delay) + continue + else: + # Should not be possible ? if error.code() == StatusCode.OK: return self._result.SUCCESS + logger.error( + "Failed to export %s to %s, error code: %s", + self._exporting, + self._endpoint, + error.code(), + exc_info=error.code() == StatusCode.UNKNOWN, + ) return self._result.FAILURE return self._result.FAILURE def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - if self._shutdown: + if self._shutdown_occuring.is_set(): logger.warning("Exporter already shutdown, ignoring call") return # wait for the last export if any - self._export_lock.acquire(timeout=timeout_millis / 1e3) - self._shutdown = True + self._export_not_occuring.wait(timeout=timeout_millis / 1e3) self._channel.close() - self._export_lock.release() + self._shutdown_occuring.set() @property @abstractmethod diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index c78c1b81bb6..333fe78d210 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -142,8 +142,8 @@ def _translate_data( def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return self._export(spans) - def shutdown(self) -> None: - OTLPExporterMixin.shutdown(self) + def shutdown(self, timeout_millis: int = 30000) -> None: + OTLPExporterMixin.shutdown(self, timeout_millis) def force_flush(self, timeout_millis: int = 30000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index d9b02611a07..5a7f471fd49 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -12,11 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading +import time +from concurrent.futures import ThreadPoolExecutor from logging import WARNING -from time import time_ns -from types import MethodType -from typing import Sequence from unittest import TestCase from unittest.mock import Mock, patch @@ -26,20 +24,95 @@ from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module RetryInfo, ) -from grpc import Compression +from grpc import Compression, server +from opentelemetry.exporter.otlp.proto.common._internal import ( + ThreadWithReturnValue, +) from opentelemetry.exporter.otlp.proto.grpc.exporter import ( - ExportServiceRequestT, InvalidCompressionValueException, - OTLPExporterMixin, - RpcError, - SDKDataT, StatusCode, environ_to_compression, ) +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTraceServiceResponse, +) +from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import ( + TraceServiceServicer, + add_TraceServiceServicer_to_server, +) +from opentelemetry.sdk.trace import _Span +from opentelemetry.sdk.trace.export import ( + SpanExportResult, +) + + +class TraceServiceServicerWithExportParams(TraceServiceServicer): + def __init__( + self, + export_result, + optional_export_sleep=None, + optional_export_retry=None, + ): + self.export_result = export_result + self.optional_export_sleep = optional_export_sleep + self.optional_export_retry = optional_export_retry + + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + if self.optional_export_sleep: + time.sleep(self.optional_export_sleep) + if self.optional_export_retry: + context.send_initial_metadata( + ( + ( + "google.rpc.retryinfo-bin", + RetryInfo().SerializeToString(), + ), + ) + ) + context.set_trailing_metadata( + ( + ( + "google.rpc.retryinfo-bin", + RetryInfo( + retry_delay=Duration( + seconds=self.optional_export_retry + ) + ).SerializeToString(), + ), + ) + ) + context.set_code(self.export_result) + + return ExportTraceServiceResponse() class TestOTLPExporterMixin(TestCase): + def setUp(self): + self.server = server(ThreadPoolExecutor(max_workers=10)) + + self.server.add_insecure_port("127.0.0.1:4317") + + self.server.start() + self.exporter = OTLPSpanExporter(insecure=True) + self.span = _Span( + "a", + context=Mock( + **{ + "trace_state": {"a": "b", "c": "d"}, + "span_id": 10217189687419569865, + "trace_id": 67545097771067222548457157018666467027, + } + ), + ) + + def tearDown(self): + self.server.stop(None) + def test_environ_to_compression(self): with patch.dict( "os.environ", @@ -64,91 +137,15 @@ def test_environ_to_compression(self): with self.assertRaises(InvalidCompressionValueException): environ_to_compression("test_invalid") - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - def test_export_warning(self, mock_expo): - mock_expo.configure_mock(**{"return_value": [0]}) - - rpc_error = RpcError() - - def code(self): - return None - - rpc_error.code = MethodType(code, rpc_error) - - class OTLPMockExporter(OTLPExporterMixin): - _result = Mock() - _stub = Mock( - **{"return_value": Mock(**{"Export.side_effect": rpc_error})} - ) - - def _translate_data( - self, data: Sequence[SDKDataT] - ) -> ExportServiceRequestT: - pass - - @property - def _exporting(self) -> str: - return "mock" - - otlp_mock_exporter = OTLPMockExporter() - - with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access - otlp_mock_exporter._export(Mock()) - self.assertEqual( - warning.records[0].message, - "Failed to export mock to localhost:4317, error code: None", - ) - - def code(self): # pylint: disable=function-redefined - return StatusCode.CANCELLED - - def trailing_metadata(self): - return {} - - rpc_error.code = MethodType(code, rpc_error) - rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error) - - with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access - otlp_mock_exporter._export([]) - self.assertEqual( - warning.records[0].message, - ( - "Transient error StatusCode.CANCELLED encountered " - "while exporting mock to localhost:4317, retrying in 0s." - ), - ) - - def test_shutdown(self): - result_mock = Mock() - - class OTLPMockExporter(OTLPExporterMixin): - _result = result_mock - _stub = Mock(**{"return_value": Mock()}) - - def _translate_data( - self, data: Sequence[SDKDataT] - ) -> ExportServiceRequestT: - pass - - @property - def _exporting(self) -> str: - return "mock" - - otlp_mock_exporter = OTLPMockExporter() - + def test_ignore_export_after_shutdown(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams(StatusCode.OK), + self.server, + ) + self.exporter.shutdown() with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access - self.assertEqual( - otlp_mock_exporter._export(data={}), result_mock.SUCCESS - ) - otlp_mock_exporter.shutdown() - # pylint: disable=protected-access self.assertEqual( - otlp_mock_exporter._export(data={}), result_mock.FAILURE + self.exporter.export([self.span]), SpanExportResult.FAILURE ) self.assertEqual( warning.records[0].message, @@ -156,55 +153,91 @@ def _exporting(self) -> str: ) def test_shutdown_wait_last_export(self): - result_mock = Mock() - rpc_error = RpcError() - - def code(self): - return StatusCode.UNAVAILABLE - - def trailing_metadata(self): - return { - "google.rpc.retryinfo-bin": RetryInfo( - retry_delay=Duration(nanos=int(1e7)) - ).SerializeToString() - } - - rpc_error.code = MethodType(code, rpc_error) - rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error) - - class OTLPMockExporter(OTLPExporterMixin): - _result = result_mock - _stub = Mock( - **{"return_value": Mock(**{"Export.side_effect": rpc_error})} - ) + # Shutdown waits 30 seconds for a pending Export call to finish. + # A 5 second delay is added, so it's expected that export will finish. + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams(StatusCode.OK, 5), + self.server, + ) - def _translate_data( - self, data: Sequence[SDKDataT] - ) -> ExportServiceRequestT: - pass + export_thread = ThreadWithReturnValue( + target=self.exporter.export, args=([self.span],) + ) + export_thread.start() + # Wait a sec for the export call. + time.sleep(1) + # pylint: disable=protected-access + self.assertFalse(self.exporter._export_not_occuring.is_set()) + # Should block until export is finished + self.exporter.shutdown() + # pylint: disable=protected-access + self.assertTrue(self.exporter._export_not_occuring.is_set()) + # pylint: disable=protected-access + self.assertTrue(self.exporter._shutdown_occuring.is_set()) + export_result = export_thread.join() + self.assertEqual(export_result, SpanExportResult.SUCCESS) + + def test_shutdown_doesnot_wait_last_export(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams(StatusCode.OK, 35), + self.server, + ) - @property - def _exporting(self) -> str: - return "mock" + export_thread = ThreadWithReturnValue( + target=self.exporter.export, args=([self.span],) + ) + export_thread.start() + # Wait a sec for exporter to get lock and make export call. + time.sleep(1) + # pylint: disable=protected-access + self.assertFalse(self.exporter._export_not_occuring.is_set()) + # Set to 6 seconds, so the 35 second server-side delay will not be reached. + self.exporter.shutdown(timeout_millis=6000) + # pylint: disable=protected-access + self.assertFalse(self.exporter._export_not_occuring.is_set()) + # pylint: disable=protected-access + self.assertTrue(self.exporter._shutdown_occuring.is_set()) + export_result = export_thread.join() + self.assertEqual(export_result, SpanExportResult.FAILURE) + + def test_shutdown_interrupts_export_sleep(self): + # Returns unavailable and asks for a 20 second sleep before retry. + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams( + StatusCode.UNAVAILABLE, 0, 20 + ), + self.server, + ) - otlp_mock_exporter = OTLPMockExporter() + export_thread = ThreadWithReturnValue( + target=self.exporter.export, args=([self.span],) + ) + export_thread.start() + # Wait a sec for call to fail and export sleep to begin. + time.sleep(1) + begin_wait = time.time_ns() + # pylint: disable=protected-access + self.assertTrue(self.exporter._export_not_occuring.is_set()) + self.exporter.shutdown() + self.assertTrue(self.exporter._shutdown_occuring.is_set()) + # pylint: disable=protected-access + export_result = export_thread.join() + end_wait = time.time_ns() + self.assertEqual(export_result, SpanExportResult.FAILURE) + # Less than a second for export to finish, because the 20 second sleep is interurpted by shutdown event. + self.assertTrue((end_wait - begin_wait) < 1e9) + def test_export_over_closed_grpc_channel(self): # pylint: disable=protected-access - export_thread = threading.Thread( - target=otlp_mock_exporter._export, args=({},) + + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams(StatusCode.OK), + self.server, + ) + self.exporter.shutdown() + data = self.exporter._translate_data([self.span]) + with self.assertRaises(ValueError) as err: + self.exporter._client.Export(request=data) + self.assertEqual( + str(err.exception), "Cannot invoke RPC on closed channel!" ) - export_thread.start() - try: - # pylint: disable=protected-access - self.assertTrue(otlp_mock_exporter._export_lock.locked()) - # delay is 1 second while the default shutdown timeout is 30_000 milliseconds - start_time = time_ns() - otlp_mock_exporter.shutdown() - now = time_ns() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access - self.assertTrue(otlp_mock_exporter._shutdown) - # pylint: disable=protected-access - self.assertFalse(otlp_mock_exporter._export_lock.locked()) - finally: - export_thread.join() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 21b877380c8..ebf9d56214d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -14,17 +14,14 @@ import gzip import logging +import threading import zlib from io import BytesIO from os import environ -from time import sleep from typing import Dict, Optional, Sequence import requests -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, -) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import ( _OTLP_HTTP_HEADERS, @@ -63,8 +60,6 @@ class OTLPLogExporter(LogExporter): - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: Optional[str] = None, @@ -121,7 +116,9 @@ def __init__( self._session.headers.update( {"Content-Encoding": self._compression.value} ) - self._shutdown = False + self._export_not_occuring = threading.Event() + self._export_not_occuring.set() + self._shutdown_occuring = threading.Event() def _export(self, serialized_data: bytes): data = serialized_data @@ -150,31 +147,26 @@ def _retryable(resp: requests.Response) -> bool: return False def export(self, batch: Sequence[LogData]) -> LogExportResult: - # After the call to Shutdown subsequent calls to Export are - # not allowed and should return a Failure result. - if self._shutdown: - _logger.warning("Exporter already shutdown, ignoring batch") - return LogExportResult.FAILURE - serialized_data = encode_logs(batch).SerializeToString() - - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: + for delay in [1, 2, 4, 8, 16, 32]: + if self._shutdown_occuring.is_set(): + _logger.warning("Exporter already shutdown, ignoring batch") return LogExportResult.FAILURE - + self._export_not_occuring.clear() resp = self._export(serialized_data) + self._export_not_occuring.set() # pylint: disable=no-else-return if resp.ok: return LogExportResult.SUCCESS elif self._retryable(resp): + if delay == 32: + return LogExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting logs batch, retrying in %ss.", resp.reason, delay, ) - sleep(delay) + self._shutdown_occuring.wait(delay) continue else: _logger.error( @@ -189,12 +181,14 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" return True - def shutdown(self): - if self._shutdown: + def shutdown(self, timeout_millis: float = 30_000, **kwargs): + if self._shutdown_occuring.is_set(): _logger.warning("Exporter already shutdown, ignoring call") return + # wait for the last export if any + self._export_not_occuring.wait(timeout=timeout_millis / 1e3) + self._shutdown_occuring.set() self._session.close() - self._shutdown = True def _compression_from_env() -> Compression: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 00f429e4c97..155a9b847c4 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -14,10 +14,10 @@ import gzip import logging +import threading import zlib from io import BytesIO from os import environ -from time import sleep from typing import ( # noqa: F401 Any, Callable, @@ -31,7 +31,6 @@ from deprecated import deprecated from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, _get_resource_data, ) from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( @@ -100,8 +99,6 @@ class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin): - _MAX_RETRY_TIMEOUT = 64 - def __init__( self, endpoint: str | None = None, @@ -164,6 +161,9 @@ def __init__( self._common_configuration( preferred_temporality, preferred_aggregation ) + self._export_not_occuring = threading.Event() + self._export_not_occuring.set() + self._shutdown_occuring = threading.Event() def _export(self, serialized_data: bytes): data = serialized_data @@ -197,24 +197,26 @@ def export( timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: - serialized_data = encode_metrics(metrics_data) - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: + serialized_data = encode_metrics(metrics_data).SerializeToString() + for delay in [1, 2, 4, 8, 16, 32]: + if self._shutdown_occuring.is_set(): + _logger.warning("Exporter already shutdown, ignoring batch") return MetricExportResult.FAILURE - - resp = self._export(serialized_data.SerializeToString()) + self._export_not_occuring.clear() + resp = self._export(serialized_data) + self._export_not_occuring.set() # pylint: disable=no-else-return if resp.ok: return MetricExportResult.SUCCESS elif self._retryable(resp): + if delay == 32: + return MetricExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting metric batch, retrying in %ss.", resp.reason, delay, ) - sleep(delay) + self._shutdown_occuring.wait(delay) continue else: _logger.error( @@ -225,8 +227,14 @@ def export( return MetricExportResult.FAILURE return MetricExportResult.FAILURE - def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - pass + def shutdown(self, timeout_millis: float = 30_000, **kwargs): + if self._shutdown_occuring.is_set(): + _logger.warning("Exporter already shutdown, ignoring call") + return + # wait for the last export if any + self._export_not_occuring.wait(timeout=timeout_millis / 1e3) + self._shutdown_occuring.set() + self._session.close() @property def _exporting(self) -> str: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 7bcf4b4ced1..5be2c7593f6 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -14,17 +14,14 @@ import gzip import logging +import threading import zlib from io import BytesIO from os import environ -from time import sleep from typing import Dict, Optional import requests -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, -) from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( encode_spans, ) @@ -118,7 +115,9 @@ def __init__( self._session.headers.update( {"Content-Encoding": self._compression.value} ) - self._shutdown = False + self._export_not_occuring = threading.Event() + self._export_not_occuring.set() + self._shutdown_occuring = threading.Event() def _export(self, serialized_data: bytes): data = serialized_data @@ -149,24 +148,27 @@ def _retryable(resp: requests.Response) -> bool: def _serialize_spans(self, spans): return encode_spans(spans).SerializePartialToString() - def _export_serialized_spans(self, serialized_data): - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - if delay == self._MAX_RETRY_TIMEOUT: + def export(self, spans) -> SpanExportResult: + serialized_data = self._serialize_spans(spans) + for delay in [1, 2, 4, 8, 16, 32]: + if self._shutdown_occuring.is_set(): + _logger.warning("Exporter already shutdown, ignoring batch") return SpanExportResult.FAILURE - + self._export_not_occuring.clear() resp = self._export(serialized_data) + self._export_not_occuring.set() # pylint: disable=no-else-return if resp.ok: return SpanExportResult.SUCCESS elif self._retryable(resp): + if delay == 32: + return SpanExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting span batch, retrying in %ss.", resp.reason, delay, ) - sleep(delay) + self._shutdown_occuring.wait(delay) continue else: _logger.error( @@ -177,23 +179,14 @@ def _export_serialized_spans(self, serialized_data): return SpanExportResult.FAILURE return SpanExportResult.FAILURE - def export(self, spans) -> SpanExportResult: - # After the call to Shutdown subsequent calls to Export are - # not allowed and should return a Failure result. - if self._shutdown: - _logger.warning("Exporter already shutdown, ignoring batch") - return SpanExportResult.FAILURE - - serialized_data = self._serialize_spans(spans) - - return self._export_serialized_spans(serialized_data) - - def shutdown(self): - if self._shutdown: + def shutdown(self, timeout_millis: float = 30_000, **kwargs): + if self._shutdown_occuring.is_set(): _logger.warning("Exporter already shutdown, ignoring call") return + # wait for the last export if any + self._export_not_occuring.wait(timeout=timeout_millis / 1e3) + self._shutdown_occuring.set() self._session.close() - self._shutdown = True def force_flush(self, timeout_millis: int = 30000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 16bb3e54286..56f2dfc1e0d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from logging import WARNING from os import environ from unittest import TestCase @@ -21,6 +22,9 @@ from requests.models import Response from responses import POST, activate, add +from opentelemetry.exporter.otlp.proto.common._internal import ( + ThreadWithReturnValue, +) from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( encode_metrics, ) @@ -296,6 +300,35 @@ def test_success(self, mock_post): MetricExportResult.SUCCESS, ) + @patch.object(Session, "post") + def test_shutdown_doesnot_wait_last_export(self, mock_post): + resp = Response() + resp.status_code = 401 + + def fake_post(**kwargs): + time.sleep(35) + return resp + + mock_post.side_effect = fake_post + exporter = OTLPMetricExporter() + + export_thread = ThreadWithReturnValue( + target=exporter.export, args=(self.metrics["sum_int"],) + ) + export_thread.start() + # Wait a sec for exporter to make export call + time.sleep(1) + # pylint: disable=protected-access + self.assertFalse(exporter._export_not_occuring.is_set()) + # Set to 6 seconds, so the 35 second server-side delay will not be reached. + exporter.shutdown(timeout_millis=6000) + # pylint: disable=protected-access + self.assertFalse(exporter._export_not_occuring.is_set()) + # pylint: disable=protected-access + self.assertTrue(exporter._shutdown_occuring.is_set()) + export_result = export_thread.join() + self.assertEqual(export_result, MetricExportResult.FAILURE) + @patch.object(Session, "post") def test_failure(self, mock_post): resp = Response() @@ -332,8 +365,7 @@ def test_serialization(self, mock_post): ) @activate - @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep") - def test_exponential_backoff(self, mock_sleep): + def test_exponential_backoff(self): # return a retryable error add( POST, @@ -346,11 +378,11 @@ def test_exponential_backoff(self, mock_sleep): endpoint="http://metrics.example.com/export" ) metrics_data = self.metrics["sum_int"] - - exporter.export(metrics_data) - mock_sleep.assert_has_calls( - [call(1), call(2), call(4), call(8), call(16), call(32)] - ) + with patch.object(exporter._shutdown_occuring, "wait") as wait_mock: + exporter.export(metrics_data) + wait_mock.assert_has_calls( + [call(1), call(2), call(4), call(8), call(16)] + ) def test_aggregation_temporality(self): otlp_metric_exporter = OTLPMetricExporter()