Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use threading.Events to communicate between shutdown and export #4511

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,4 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:

@property
def _exporting(self) -> str:
    # Identifies the signal type ("logs") handled by this exporter.
    # NOTE(review): presumably interpolated into the shared retry/error log
    # messages of the exporter mixin — confirm against the base class.
    return "logs"
return "logs"
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep
from typing import ( # noqa: F401
Any,
Callable,
Expand Down Expand Up @@ -47,7 +46,6 @@
ssl_channel_credentials,
)
from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.grpc import (
Expand Down Expand Up @@ -258,8 +256,9 @@ def __init__(
)
self._client = self._stub(self._channel)

self._export_lock = threading.Lock()
self._shutdown = False
self._export_not_occuring = threading.Event()
self._export_not_occuring.set()
self._shutdown_occuring = threading.Event()

@abstractmethod
def _translate_data(
Expand All @@ -285,82 +284,80 @@ def _export(
# data.__class__.__name__,
# delay,
# )
max_value = 64
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _create_exp_backoff_generator(max_value=max_value):
if delay == max_value or self._shutdown:
for delay in [1, 2, 4, 8, 16, 32]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this list also include 64, matching the previous max_value?

if self._shutdown_occuring.is_set():
return self._result.FAILURE

with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
try:
self._export_not_occuring.clear()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage of _export_not_occuring looks like a lock to me. Is there a benefit to using an event for it ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an event allows the export thread to communicate to shutdown that there is / is not a pending RPC. In Shutdown we call the wait() method that blocks until the flag is true.

The problem with the lock is that export gives it up, only to immediately reacquire it. When 2 threads ask for a lock there's no guarantee on which gets it.

If the behavior that we want is for shutdown to block for any pending RPC and otherwise execute I think an event is best.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but if you're doing

while:
  if shutdown_occuring.is_set(): return
  
  event.clear()
  export()
  event.set()

there is no guarantee that the thing waiting for the event will have run and set shutdown_occuring before export() gets called again. I think even switching to a lock doesn't necessarily solve everything. Might need to rethink the approach a little.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There must be some delay before the shutdown thread receives that notification and sets shutdown_occurring, but that delay should be very small.

I'm sure it's less than the sleeps in the retry loop (I think my test covers this, but I'll double check). I can probably test and see exactly how small that delay is. Conceivably a new export call could occur in the milliseconds or nanoseconds it takes. If that happens shutdown will have proceeded and closed the channel which will interrupt this export call. I don't think it's that bad for this to happen, and it's unlikely. Still an improvement on the current behavior IMO.

self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)
self._export_not_occuring.set()

return self._result.SUCCESS

except RpcError as error:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
# No more retry will happen. Return failure.
if delay == 32:
return self._result.FAILURE
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)

return self._result.SUCCESS

except RpcError as error:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s, retrying in %ss."
),
error.code(),
self._exporting,
self._endpoint,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s, retrying in %ss."
),
error.code(),
self._exporting,
self._endpoint,
delay,
)
self._shutdown_occuring.wait(delay)
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this would be a little clearer to just return here

else:
# Should not be possible ?
if error.code() == StatusCode.OK:
return self._result.SUCCESS
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)

return self._result.FAILURE

return self._result.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
if self._shutdown_occuring.is_set():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This already had the same problem, but shutdown() is not thread safe. I guess for this PR we can assume only one thread calls it.

logger.warning("Exporter already shutdown, ignoring call")
return
# wait for the last export if any
self._export_lock.acquire(timeout=timeout_millis / 1e3)
self._shutdown = True
self._export_not_occuring.wait(timeout=timeout_millis / 1e3)
self._channel.close()
self._export_lock.release()
self._shutdown_occuring.set()

@property
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@

import gzip
import logging
import threading
import zlib
from io import BytesIO
from os import environ
from time import sleep
from typing import Dict, Optional, Sequence

import requests

from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
)
from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
from opentelemetry.exporter.otlp.proto.http import (
_OTLP_HTTP_HEADERS,
Expand Down Expand Up @@ -63,8 +60,6 @@


class OTLPLogExporter(LogExporter):
_MAX_RETRY_TIMEOUT = 64

def __init__(
self,
endpoint: Optional[str] = None,
Expand Down Expand Up @@ -121,7 +116,9 @@ def __init__(
self._session.headers.update(
{"Content-Encoding": self._compression.value}
)
self._shutdown = False
self._export_not_occuring = threading.Event()
self._export_not_occuring.set()
self._shutdown_occuring = threading.Event()

def _export(self, serialized_data: bytes):
data = serialized_data
Expand Down Expand Up @@ -150,31 +147,26 @@ def _retryable(resp: requests.Response) -> bool:
return False

def export(self, batch: Sequence[LogData]) -> LogExportResult:
# After the call to Shutdown subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring batch")
return LogExportResult.FAILURE

serialized_data = encode_logs(batch).SerializeToString()

for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):
if delay == self._MAX_RETRY_TIMEOUT:
for delay in [1, 2, 4, 8, 16, 32]:
if self._shutdown_occuring.is_set():
_logger.warning("Exporter already shutdown, ignoring batch")
return LogExportResult.FAILURE

self._export_not_occuring.clear()
resp = self._export(serialized_data)
self._export_not_occuring.set()
# pylint: disable=no-else-return
if resp.ok:
return LogExportResult.SUCCESS
elif self._retryable(resp):
if delay == 32:
return LogExportResult.FAILURE
_logger.warning(
"Transient error %s encountered while exporting logs batch, retrying in %ss.",
resp.reason,
delay,
)
sleep(delay)
self._shutdown_occuring.wait(delay)
continue
else:
_logger.error(
Expand All @@ -189,12 +181,14 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
return True

def shutdown(self):
if self._shutdown:
def shutdown(self, timeout_millis: float = 30_000, **kwargs):
    """Shut down the exporter, waiting for any in-flight export to finish.

    Idempotent: a second call only logs a warning and returns.

    Args:
        timeout_millis: Maximum time in milliseconds to wait for a
            pending export before closing the HTTP session.
    """
    if self._shutdown_occuring.is_set():
        _logger.warning("Exporter already shutdown, ignoring call")
        return
    # wait for the last export if any
    # (the export loop clears this Event while an RPC is in flight and
    # sets it again when the call returns)
    self._export_not_occuring.wait(timeout=timeout_millis / 1e3)
    # NOTE(review): if the wait above times out, an export may still be in
    # flight when the session is closed below — confirm this is intended.
    self._shutdown_occuring.set()
    self._session.close()
self._shutdown = True


def _compression_from_env() -> Compression:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

import gzip
import logging
import threading
import zlib
from io import BytesIO
from os import environ
from time import sleep
from typing import ( # noqa: F401
Any,
Callable,
Expand All @@ -31,7 +31,6 @@
from deprecated import deprecated

from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
Expand Down Expand Up @@ -100,8 +99,6 @@


class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin):
_MAX_RETRY_TIMEOUT = 64

def __init__(
self,
endpoint: str | None = None,
Expand Down Expand Up @@ -164,6 +161,9 @@ def __init__(
self._common_configuration(
preferred_temporality, preferred_aggregation
)
self._export_not_occuring = threading.Event()
self._export_not_occuring.set()
self._shutdown_occuring = threading.Event()

def _export(self, serialized_data: bytes):
data = serialized_data
Expand Down Expand Up @@ -197,24 +197,26 @@ def export(
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
serialized_data = encode_metrics(metrics_data)
for delay in _create_exp_backoff_generator(
max_value=self._MAX_RETRY_TIMEOUT
):
if delay == self._MAX_RETRY_TIMEOUT:
serialized_data = encode_metrics(metrics_data).SerializeToString()
for delay in [1, 2, 4, 8, 16, 32]:
if self._shutdown_occuring.is_set():
_logger.warning("Exporter already shutdown, ignoring batch")
return MetricExportResult.FAILURE

resp = self._export(serialized_data.SerializeToString())
self._export_not_occuring.clear()
resp = self._export(serialized_data)
self._export_not_occuring.set()
# pylint: disable=no-else-return
if resp.ok:
return MetricExportResult.SUCCESS
elif self._retryable(resp):
if delay == 32:
return MetricExportResult.FAILURE
_logger.warning(
"Transient error %s encountered while exporting metric batch, retrying in %ss.",
resp.reason,
delay,
)
sleep(delay)
self._shutdown_occuring.wait(delay)
continue
else:
_logger.error(
Expand All @@ -225,8 +227,14 @@ def export(
return MetricExportResult.FAILURE
return MetricExportResult.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass
def shutdown(self, timeout_millis: float = 30_000, **kwargs):
    """Shut down the exporter, waiting for any in-flight export to finish.

    Idempotent: a second call only logs a warning and returns.

    Args:
        timeout_millis: Maximum time in milliseconds to wait for a
            pending export before closing the HTTP session.
    """
    if self._shutdown_occuring.is_set():
        _logger.warning("Exporter already shutdown, ignoring call")
        return
    # wait for the last export if any
    # (the export loop clears this Event while an RPC is in flight and
    # sets it again when the call returns)
    self._export_not_occuring.wait(timeout=timeout_millis / 1e3)
    # NOTE(review): if the wait above times out, an export may still be in
    # flight when the session is closed below — confirm this is intended.
    self._shutdown_occuring.set()
    self._session.close()

@property
def _exporting(self) -> str:
Expand Down
Loading
Loading