Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use threading.Events to communicate between shutdown and export #4511

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import annotations

import logging
import threading
from collections.abc import Sequence
from itertools import count
from typing import (
Expand Down Expand Up @@ -151,6 +152,27 @@ def _encode_attributes(
return pb2_attributes


class ThreadWithReturnValue(threading.Thread):
def __init__(
self,
group=None,
target=None,
name=None,
args=(),
kwargs=None,
):
threading.Thread.__init__(self, group, target, name, args, kwargs)
self._return = None

def run(self):
if self._target is not None: # type: ignore
self._return = self._target(*self._args, **self._kwargs) # type: ignore
Comment on lines +167 to +169
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we include the cleanup from the original run function or is that not a concern here?

Suggested change
def run(self):
if self._target is not None: # type: ignore
self._return = self._target(*self._args, **self._kwargs) # type: ignore
try:
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs


def join(self, *args): # type: ignore
threading.Thread.join(self, *args)
Comment on lines +171 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we avoid type ignore by explicitly passing the expected type?

Suggested change
def join(self, *args): # type: ignore
threading.Thread.join(self, *args)
def join(self, timeout: float | None = None) -> Any:
threading.Thread.join(self, timeout=timeout)

return self._return


def _get_resource_data(
sdk_resource_scope_data: Dict[Resource, _ResourceDataT],
resource_class: Callable[..., _TypingResourceT],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,4 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:

@property
def _exporting(self) -> str:
return "logs"
return "logs"
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from collections.abc import Sequence # noqa: F401
from logging import getLogger
from os import environ
from time import sleep
from typing import ( # noqa: F401
Any,
Callable,
Expand Down Expand Up @@ -47,7 +46,6 @@
ssl_channel_credentials,
)
from opentelemetry.exporter.otlp.proto.common._internal import (
_create_exp_backoff_generator,
_get_resource_data,
)
from opentelemetry.exporter.otlp.proto.grpc import (
Expand Down Expand Up @@ -175,7 +173,7 @@ def _get_credentials(
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
):
"""OTLP span exporter
"""OTLP exporter Mixin class.

Args:
endpoint: OpenTelemetry Collector receiver endpoint
Expand Down Expand Up @@ -258,8 +256,9 @@ def __init__(
)
self._client = self._stub(self._channel)

self._export_lock = threading.Lock()
self._shutdown = False
self._export_not_occuring = threading.Event()
self._export_not_occuring.set()
self._shutdown_occuring = threading.Event()

@abstractmethod
def _translate_data(
Expand All @@ -272,7 +271,7 @@ def _export(
) -> ExportResultT:
# After the call to shutdown, subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
if self._shutdown_occuring.is_set():
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE

Expand All @@ -285,82 +284,82 @@ def _export(
# data.__class__.__name__,
# delay,
# )
max_value = 64
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _create_exp_backoff_generator(max_value=max_value):
if delay == max_value or self._shutdown:
for delay in [1, 2, 4, 8, 16, 32]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it include 64 as well like max_value before?

if self._shutdown_occuring.is_set():
return self._result.FAILURE

with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
try:
self._export_not_occuring.clear()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage of _export_not_occuring looks like a lock to me. Is there a benefit to using an event for it ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an event allows the export thread to communicate to shutdown that there is / is not a pending RPC. In Shutdown we call the wait() method that blocks until the flag is true.

The problem with the lock is export gives it up, only to immediately require it. When 2 threads ask for a lock there's no guarantee on which gets it.

If the behavior that we want is for shutdown to block for any pending RPC and otherwise execute I think an event is best.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but if you're doing

while:
  if shutdown_occuring.is_set(): return
  
  event.clear()
  export()
  event.set()

there is no guarantee that the thing waiting for the event will have run and set shutdown_occuring before export() gets called again. I think even switching to a lock doesn't necessarily solve everything. Might need to rethink the approach a little.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There must be some delay for the shutdown thread to receive that notification and set shutdown_occurring, but that must be really small.

I'm sure it's less than the sleeps in the retry loop (I think my test covers this, but I'll double check). I can probably test and see exactly how small that delay is. Conceivably a new export call could occur in the milliseconds or nanoseconds it takes. If that happens shutdown will have proceeded and closed the channel which will interrupt this export call. I don't think it's that bad for this to happen, and it's unlikely. Still an improvement on the current behavior IMO.

self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)
self._export_not_occuring.set()

return self._result.SUCCESS

except RpcError as error:
# Important to set it here, b/c if Export fails the set() call above is not teached.
self._export_not_occuring.set()
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
# No more retry will happen. Return failure.
if delay == 32:
return self._result.FAILURE
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)

return self._result.SUCCESS

except RpcError as error:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s, retrying in %ss."
),
error.code(),
self._exporting,
self._endpoint,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s to %s, retrying in %ss."
),
error.code(),
self._exporting,
self._endpoint,
delay,
)
self._shutdown_occuring.wait(delay)
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this would be a little clearer to just return here

else:
# Should not be possible ?
if error.code() == StatusCode.OK:
return self._result.SUCCESS
logger.error(
"Failed to export %s to %s, error code: %s",
self._exporting,
self._endpoint,
error.code(),
exc_info=error.code() == StatusCode.UNKNOWN,
)

return self._result.FAILURE

return self._result.FAILURE

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
if self._shutdown_occuring.is_set():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This already had the same problem, but shutdown() is not thread safe. I guess for this PR we can assume only one thread calls it.

logger.warning("Exporter already shutdown, ignoring call")
return
# wait for the last export if any
self._export_lock.acquire(timeout=timeout_millis / 1e3)
self._shutdown = True
self._export_not_occuring.wait(timeout=timeout_millis / 1e3)
self._channel.close()
self._export_lock.release()
self._shutdown_occuring.set()

@property
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ def _translate_data(
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return self._export(spans)

def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)
def shutdown(self, timeout_millis: int = 30000) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis)

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
Expand Down
Loading