2424_UNSET = object ()
2525_PENDING_SINGLETON_CLEANUPS : deque [tuple [str , Callable [[Any ], Any ], Any ]] = deque ()
2626_PENDING_SINGLETON_CLEANUPS_LOCK = Lock ()
27+ _SCHEDULED_SINGLETON_CLEANUPS : set [asyncio .Task [None ]] = set ()
28+ _SCHEDULED_SINGLETON_CLEANUPS_LOCK = Lock ()
2729_REGISTERED_SINGLETON_CLEARERS : list [Callable [[], None ]] = []
2830_REGISTERED_SINGLETON_CLEARERS_LOCK = Lock ()
2931_cleanup_logger = logging .getLogger (__name__ )
@@ -410,6 +412,18 @@ async def _run_singleton_cleanup(name: str, cleanup: Callable[[Any], Any], value
410412 _cleanup_logger .exception ("reloadable_singleton_cleanup_failed" , extra = {"name" : name })
411413
412414
415+ def _track_singleton_cleanup_task (task : asyncio .Task [None ]) -> None :
416+ """Track one in-loop singleton cleanup task until it finishes."""
417+ with _SCHEDULED_SINGLETON_CLEANUPS_LOCK :
418+ _SCHEDULED_SINGLETON_CLEANUPS .add (task )
419+
420+ def _discard (completed_task : asyncio .Task [None ]) -> None :
421+ with _SCHEDULED_SINGLETON_CLEANUPS_LOCK :
422+ _SCHEDULED_SINGLETON_CLEANUPS .discard (completed_task )
423+
424+ task .add_done_callback (_discard )
425+
426+
413427def _schedule_singleton_cleanup (name : str , cleanup : Callable [[Any ], Any ], value : Any ) -> None :
414428 """Run cleanup immediately when possible or enqueue it for later draining."""
415429 try :
@@ -419,21 +433,50 @@ def _schedule_singleton_cleanup(name: str, cleanup: Callable[[Any], Any], value:
419433 _PENDING_SINGLETON_CLEANUPS .append ((name , cleanup , value ))
420434 return
421435
422- loop .create_task (_run_singleton_cleanup (name , cleanup , value ))
436+ task = loop .create_task (_run_singleton_cleanup (name , cleanup , value ))
437+ _track_singleton_cleanup_task (task )
438+
439+
440+ async def _await_scheduled_singleton_cleanups () -> None :
441+ """Await tracked cleanup tasks bound to the current running loop."""
442+ running_loop = asyncio .get_running_loop ()
443+
444+ while True :
445+ with _SCHEDULED_SINGLETON_CLEANUPS_LOCK :
446+ tasks = [
447+ task
448+ for task in _SCHEDULED_SINGLETON_CLEANUPS
449+ if not task .done () and task .get_loop () is running_loop
450+ ]
451+
452+ if not tasks :
453+ return
454+
455+ await asyncio .gather (* tasks )
423456
424457
425458async def flush_pending_singleton_cleanups () -> None :
426- """Drain any deferred singleton cleanup work."""
459+ """Drain deferred and in-flight singleton cleanup work."""
427460 while True :
428461 with _PENDING_SINGLETON_CLEANUPS_LOCK :
429- if not _PENDING_SINGLETON_CLEANUPS :
430- return
431462 pending = list (_PENDING_SINGLETON_CLEANUPS )
432463 _PENDING_SINGLETON_CLEANUPS .clear ()
433464
434465 for name , cleanup , value in pending :
435466 await _run_singleton_cleanup (name , cleanup , value )
436467
468+ await _await_scheduled_singleton_cleanups ()
469+
470+ with _PENDING_SINGLETON_CLEANUPS_LOCK :
471+ has_pending = bool (_PENDING_SINGLETON_CLEANUPS )
472+ with _SCHEDULED_SINGLETON_CLEANUPS_LOCK :
473+ has_scheduled = any (
474+ not task .done () and task .get_loop () is asyncio .get_running_loop ()
475+ for task in _SCHEDULED_SINGLETON_CLEANUPS
476+ )
477+ if not has_pending and not has_scheduled :
478+ return
479+
437480
438481def clear_reloadable_singletons () -> None :
439482 """Clear all registered reloadable singleton caches."""
0 commit comments