Skip to content

Commit 1b266e5

Browse files
committed
fix: Replace atexit with async-compatible join_pending_shutdown
Remove atexit handler mechanism and replace with explicit join function: - Add join_pending_shutdown(py) that blocks with GIL released - Remove PENDING_CLEANUPS, ATEXIT_REGISTERED, and related code - Add PENDING_SHUTDOWN to store single thread handle for joining The new approach allows Python code to await the shutdown via asyncio.to_thread(), ensuring cleanup completes within the async context before the event loop closes.
1 parent a1f372e commit 1b266e5

File tree

1 file changed

+49
-72
lines changed

1 file changed

+49
-72
lines changed

src/tokio.rs

Lines changed: 49 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
//! ```
1515
1616
use std::cell::OnceCell;
17-
use std::sync::atomic::{AtomicBool, Ordering};
1817
use std::sync::{Arc, Mutex, OnceLock};
1918
use std::thread::{self, JoinHandle};
2019
use std::{future::Future, pin::Pin};
@@ -27,7 +26,6 @@ use ::tokio::{
2726
use once_cell::sync::Lazy;
2827
use parking_lot::RwLock;
2928
use pyo3::prelude::*;
30-
use pyo3::types::PyCFunction;
3129

3230
use crate::{
3331
generic::{self, ContextExt, LocalContextExt, Runtime as GenericRuntime, SpawnLocalExt},
@@ -184,70 +182,9 @@ static TOKIO_BUILDER: Lazy<Mutex<Builder>> = Lazy::new(|| Mutex::new(multi_threa
184182
static RUNTIME_WRAPPER: RwLock<Option<Arc<RuntimeWrapper>>> = RwLock::new(None);
185183
static TOKIO_RUNTIME: OnceLock<&'static Runtime> = OnceLock::new();
186184

187-
/// Pending runtime threads that need cleanup at process exit.
188-
/// Used by request_shutdown_background to defer thread joining until Python's atexit.
189-
static PENDING_CLEANUPS: Lazy<Mutex<Vec<JoinHandle<()>>>> = Lazy::new(|| Mutex::new(Vec::new()));
190-
/// Flag to track if we've registered the atexit handler.
191-
static ATEXIT_REGISTERED: AtomicBool = AtomicBool::new(false);
192-
193-
/// Join all pending runtime threads.
194-
///
195-
/// This function is called from Python's atexit to ensure all runtime threads
196-
/// are properly joined before Python finalizes. It should be registered via
197-
/// [`register_atexit_cleanup`] during module initialization.
198-
fn join_pending_cleanups() {
199-
let threads: Vec<JoinHandle<()>> = {
200-
let mut guard = PENDING_CLEANUPS.lock().unwrap();
201-
std::mem::take(&mut *guard)
202-
};
203-
204-
for thread in threads {
205-
let _ = thread.join();
206-
}
207-
}
208-
209-
/// Register the atexit cleanup handler for this module.
210-
///
211-
/// Call this function during your `#[pymodule]` initialization to ensure
212-
/// tokio runtime threads are properly cleaned up before Python finalizes.
213-
///
214-
/// # Example
215-
///
216-
/// ```rust,ignore
217-
/// use pyo3::prelude::*;
218-
///
219-
/// #[pymodule]
220-
/// fn my_module(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
221-
/// // Register atexit cleanup for tokio runtime
222-
/// pyo3_async_runtimes::tokio::register_atexit_cleanup(py)?;
223-
///
224-
/// // ... rest of module init
225-
/// Ok(())
226-
/// }
227-
/// ```
228-
pub fn register_atexit_cleanup(py: Python<'_>) -> PyResult<()> {
229-
if ATEXIT_REGISTERED
230-
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
231-
.is_ok()
232-
{
233-
// Create a Python function that calls our Rust cleanup
234-
let cleanup_fn = PyCFunction::new_closure(
235-
py,
236-
None,
237-
None,
238-
|_args: &Bound<'_, pyo3::types::PyTuple>,
239-
_kwargs: Option<&Bound<'_, pyo3::types::PyDict>>| {
240-
join_pending_cleanups();
241-
Ok::<(), PyErr>(())
242-
},
243-
)?;
244-
245-
// Register with Python's atexit module
246-
let atexit = py.import("atexit")?;
247-
atexit.call_method1("register", (cleanup_fn,))?;
248-
}
249-
Ok(())
250-
}
185+
/// Pending runtime thread that needs cleanup.
186+
/// Stored when request_shutdown_background is called, joined by join_pending_shutdown.
187+
static PENDING_SHUTDOWN: Mutex<Option<JoinHandle<()>>> = Mutex::new(None);
251188

252189
/// Get or create the runtime wrapper.
253190
///
@@ -639,16 +576,12 @@ pub fn request_shutdown_background(timeout_ms: u64) -> bool {
639576
let _ = sender.send(timeout_ms);
640577
}
641578

642-
// Take the thread handle and store it for cleanup at process exit
579+
// Store the thread handle for later joining via join_pending_shutdown()
643580
if let Some(thread) = wrapper.runtime_thread.lock().unwrap().take() {
644-
// Store the thread handle for later cleanup via atexit.
645-
// This ensures the thread is properly joined before Python finalizes,
646-
// preventing SIGSEGV from tokio threads running during interpreter shutdown.
647-
PENDING_CLEANUPS.lock().unwrap().push(thread);
581+
*PENDING_SHUTDOWN.lock().unwrap() = Some(thread);
648582
}
649583

650584
// The wrapper is dropped here, releasing all Arc references.
651-
// The thread will be joined later via join_pending_cleanups() called from atexit.
652585

653586
true
654587
} else {
@@ -657,6 +590,50 @@ pub fn request_shutdown_background(timeout_ms: u64) -> bool {
657590
}
658591
}
659592

593+
/// Join any pending runtime shutdown thread.
594+
///
595+
/// This function should be called from Python (via `asyncio.to_thread`) after
596+
/// `request_shutdown_background` signals shutdown. It blocks until the runtime
597+
/// thread completes, with the GIL released to allow other Python threads to run.
598+
///
599+
/// This is the key to proper cleanup: by calling this from Python's async context
600+
/// (via `asyncio.to_thread`), we ensure the runtime thread is fully terminated
601+
/// before Python's event loop closes.
602+
///
603+
/// # Arguments
604+
///
605+
/// * `py` - Python GIL token (will be released during blocking wait)
606+
///
607+
/// # Returns
608+
///
609+
/// Returns `true` if a thread was joined, `false` if no pending shutdown.
610+
///
611+
/// # Example
612+
///
613+
/// ```python
614+
/// import asyncio
615+
/// from etcd_client import _join_pending_shutdown
616+
///
617+
/// async def cleanup():
618+
/// # ... signal shutdown ...
619+
/// # Wait for runtime to fully terminate
620+
/// await asyncio.to_thread(_join_pending_shutdown)
621+
/// ```
622+
pub fn join_pending_shutdown(py: Python<'_>) -> bool {
623+
let thread = PENDING_SHUTDOWN.lock().unwrap().take();
624+
625+
if let Some(thread) = thread {
626+
// Release GIL while blocking on thread join
627+
#[allow(deprecated)] // py.allow_threads is deprecated but detach doesn't fit our use case
628+
py.allow_threads(|| {
629+
let _ = thread.join();
630+
});
631+
true
632+
} else {
633+
false
634+
}
635+
}
636+
660637
// ============================================================================
661638
// Public API - Future Conversion
662639
// ============================================================================

0 commit comments

Comments
 (0)