Use threading.Events to communicate between shutdown and export #4511

Open
Wants to merge 7 commits into base: main

DylanRussell commented Mar 26, 2025

Description

It seems like the behavior we want for Shutdown() is:

  • Don't interrupt an in flight export RPC/Request.
  • Prevent any new export RPC/Request from being made.
  • Interrupt the sleep call in export, so we don't idle only to report Failure.

This PR accomplishes these via threading events.

  • We use an event for export to communicate to shutdown that an RPC is in progress, and to wait until it's done or the shutdown timeout finishes.
  • We use another event for shutdown to communicate to export that shutdown is happening, and it doesn't need to sleep.

We use these 2 events to communicate between the 2 threads. AFAIK there are only 2 threads we need to worry about: one thread where export is repeatedly called, and the main thread where shutdown is called.
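The handshake described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the class name `SketchExporter`, the `send` callable, and the default delays are assumptions made up for the example. Only the two event names are taken from the diff.

```python
import threading
from typing import Callable, Sequence


class SketchExporter:
    """Hypothetical exporter illustrating the two-event handshake."""

    def __init__(self, send: Callable[[], bool], delays: Sequence[float] = (1, 2, 4)):
        self._send = send  # hypothetical RPC stand-in: returns True on success
        self._delays = tuple(delays)
        # Set while no RPC is in flight; shutdown() waits on it.
        self._export_not_occuring = threading.Event()
        self._export_not_occuring.set()
        # Set once shutdown() has been called; export() checks and waits on it.
        self._shutdown_occuring = threading.Event()

    def export(self) -> bool:
        for attempt in range(len(self._delays) + 1):
            if self._shutdown_occuring.is_set():
                return False  # no new RPC once shutdown has started
            self._export_not_occuring.clear()
            try:
                ok = self._send()
            finally:
                self._export_not_occuring.set()
            if ok:
                return True
            if attempt == len(self._delays):
                return False  # out of retries: fail now instead of sleeping again
            # Interruptible sleep: wait() returns True as soon as shutdown() sets the event.
            if self._shutdown_occuring.wait(self._delays[attempt]):
                return False
        return False

    def shutdown(self, timeout_millis: float = 30_000) -> None:
        self._shutdown_occuring.set()
        # Let an in-flight RPC finish, but only for as long as the timeout allows.
        self._export_not_occuring.wait(timeout_millis / 1000)
```

Because the retry sleep is an `Event.wait(delay)` rather than `time.sleep(delay)`, a call to `shutdown()` from the main thread wakes the export thread immediately. Note that this sketch still has the check-then-act window discussed in the review comments.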

Note that this PR also fixes a bug where we were needlessly sleeping for 32 seconds only to report failure, because we would simply break out of the loop in the next iteration. I also did some minor code cleanup in the exporters in this PR.

Type of change

Please delete options that are not relevant.

  • [x] Bug fix (non-breaking change which fixes an issue)

How Has This Been Tested?

Still need to write tests. Putting this out there now to get early feedback.

  • Test A

Does This PR Require a Contrib Repo Change?

  • [x] No.

Checklist:

  • [x] Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

export call is occurring, so that shutdown waits for
the export call to finish.

Use threading.Event() to communicate when shutdown
is occurring, so that the sleep in export is interrupted if a
shutdown is occurring.
metadata=self._headers,
timeout=self._timeout,
try:
self._export_not_occuring.clear()

Member

The usage of _export_not_occuring looks like a lock to me. Is there a benefit to using an event for it?

Author

Using an event allows the export thread to communicate to shutdown that there is / is not a pending RPC. In Shutdown we call the wait() method that blocks until the flag is true.

The problem with the lock is that export gives it up, only to immediately reacquire it. When 2 threads ask for a lock, there's no guarantee on which gets it.

If the behavior that we want is for shutdown to block for any pending RPC and otherwise execute I think an event is best.

Member

Maybe I'm missing something, but if you're doing

while True:
  if shutdown_occuring.is_set(): return

  event.clear()
  export()
  event.set()

there is no guarantee that the thing waiting for the event will have run and set shutdown_occuring before export() gets called again. I think even switching to a lock doesn't necessarily solve everything. Might need to rethink the approach a little.
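One possible way to narrow that window, offered only as a sketch and not as what the PR does, is to make "check the shutdown flag and mark an export as in flight" a single atomic step guarded by a lock, while still using an event for the timed wait. The `Gate` class and its method names are hypothetical, and the sketch assumes a single exporting thread.

```python
import threading


class Gate:
    """Hypothetical helper: decides atomically whether an export may start."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._shutdown = False
        self._idle = threading.Event()  # set while no export is in flight
        self._idle.set()

    def try_begin_export(self) -> bool:
        # Check-and-mark happens under one lock, so shutdown cannot slip in between.
        with self._lock:
            if self._shutdown:
                return False
            self._idle.clear()
            return True

    def end_export(self) -> None:
        self._idle.set()

    def shutdown(self, timeout_s: float) -> bool:
        with self._lock:
            self._shutdown = True
        # Any export that began before the flag flipped has already cleared _idle,
        # so this wait covers it. Returns False if the timeout expired first.
        return self._idle.wait(timeout_s)
```

With this shape, once `shutdown()` has taken the lock, either the export already marked itself in flight (and shutdown waits for it) or the next `try_begin_export()` sees the flag and refuses. With more than one exporting thread a counter would be needed instead of a single event.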

delay,
)
self._shutdown_occuring.wait(delay)
continue

Member

IMO this would be a little clearer to just return here

Comment on lines +167 to +169
    def run(self):
        if self._target is not None:  # type: ignore
            self._return = self._target(*self._args, **self._kwargs)  # type: ignore

Contributor

Should we include the cleanup from the original run function or is that not a concern here?

Suggested change
-    def run(self):
-        if self._target is not None:  # type: ignore
-            self._return = self._target(*self._args, **self._kwargs)  # type: ignore
+        try:
+            if self._target is not None:
+                self._return = self._target(*self._args, **self._kwargs)
+        finally:
+            # Avoid a refcycle if the thread is running a function with
+            # an argument that has a member that points to the thread.
+            del self._target, self._args, self._kwargs

Comment on lines +171 to +172
    def join(self, *args):  # type: ignore
        threading.Thread.join(self, *args)

Contributor

nit: Could we avoid type ignore by explicitly passing the expected type?

Suggested change
-    def join(self, *args):  # type: ignore
-        threading.Thread.join(self, *args)
+    def join(self, timeout: float | None = None) -> Any:
+        threading.Thread.join(self, timeout=timeout)
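Taken together, the two suggestions above point toward something like the following sketch of a thread that hands back its target's return value. The class name `ReturnValueThread` is hypothetical, and it is an assumption here that `join()` is meant to return the stored result (the `-> Any` annotation hints at it). The sketch keeps its own copies of the callable and arguments instead of touching `threading.Thread`'s private `_target`, `_args`, and `_kwargs`, which also removes the need for `# type: ignore`.

```python
import threading
from typing import Any, Callable, Optional


class ReturnValueThread(threading.Thread):
    """Hypothetical Thread subclass whose join() returns the target's result."""

    def __init__(
        self,
        target: Callable[..., Any],
        args: tuple = (),
        kwargs: Optional[dict] = None,
    ) -> None:
        super().__init__(daemon=True)
        self._fn = target
        self._fn_args = args
        self._fn_kwargs = kwargs or {}
        self._return: Any = None

    def run(self) -> None:
        try:
            self._return = self._fn(*self._fn_args, **self._fn_kwargs)
        finally:
            # Same idea as the stdlib cleanup: drop the references so a target
            # whose arguments point back at the thread does not form a refcycle.
            del self._fn, self._fn_args, self._fn_kwargs

    def join(self, timeout: Optional[float] = None) -> Any:
        super().join(timeout=timeout)
        # Still None if the timeout expired before the target finished.
        return self._return
```

For example, `ReturnValueThread(target=pow, args=(2, 5))` followed by `start()` and `join(timeout=1)` would be expected to hand back the value computed by `pow`.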

     # value will remain constant.
-    for delay in _create_exp_backoff_generator(max_value=max_value):
-        if delay == max_value or self._shutdown:
+    for delay in [1, 2, 4, 8, 16, 32]:

Member

Should it include 64 as well like max_value before?
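To make the question concrete, here is a small model of the old loop, assuming `max_value` was 64 as the comment above implies. `exp_backoff` is a hypothetical stand-in for `_create_exp_backoff_generator`, written from the "value will remain constant" comment in the diff rather than copied from the library.

```python
from itertools import count
from typing import Iterator, List


def exp_backoff(max_value: int) -> Iterator[int]:
    """Hypothetical stand-in: yields 1, 2, 4, ... and stays at max_value once reached."""
    for i in count(0):
        yield min(2**i, max_value)


def delays_slept_by_old_loop(max_value: int) -> List[int]:
    """Delays the old loop would sleep through if every attempt failed."""
    slept = []
    for delay in exp_backoff(max_value):
        if delay == max_value:
            break  # old code reported failure here without another attempt
        slept.append(delay)
    return slept


print(delays_slept_by_old_loop(64))  # [1, 2, 4, 8, 16, 32]
```

Under that assumption the old loop never actually slept for 64 seconds: it slept for 32, then hit `delay == max_value` and reported failure, which matches the bug called out in the description. Whether 64 belongs in the new list depends on whether one more attempt is wanted.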


return self._result.FAILURE

return self._result.FAILURE

     def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
-        if self._shutdown:
+        if self._shutdown_occuring.is_set():

Member

This already had the same problem, but shutdown() is not thread safe. I guess for this PR we can assume only one thread calls it.
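For reference, the gap is the usual check-then-act pattern: two threads can both see `is_set()` return False before either calls `set()`. A minimal sketch of one way to close it, should that ever be needed, is below. The `ShutdownOnce` class and its `begin` method are hypothetical and not part of this PR.

```python
import threading


class ShutdownOnce:
    """Hypothetical guard: only the first caller gets to perform the shutdown work."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._shutdown_occuring = threading.Event()

    def begin(self) -> bool:
        # The lock makes the is_set() check and the set() call one atomic step.
        with self._lock:
            if self._shutdown_occuring.is_set():
                return False  # someone else already started shutting down
            self._shutdown_occuring.set()
            return True
```

A `shutdown()` method could call `begin()` first and return early (perhaps with a warning) when it returns False.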

3 participants