Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions compio-runtime/src/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ use crate::{ContextExt, Runtime};
struct Inner {
tokens: RefCell<HashSet<Cancel>>,
is_cancelled: Cell<bool>,
runtime: Runtime,
// No runtime handle stored here. Every method that needs the runtime
// obtains it on demand via the thread-local (`Runtime::try_with_current`).
// Storing a strong `Rc<RuntimeInner>` (or even a `Weak`) was the root of
// the reference cycle: task → CancelToken → Rc<RuntimeInner> → executor →
// task. Using the thread-local avoids the cycle entirely with no atomic
// overhead.
notify: Event,
}

Expand Down Expand Up @@ -58,7 +63,6 @@ impl CancelToken {
Self(Rc::new(Inner {
tokens: RefCell::new(HashSet::new()),
is_cancelled: Cell::new(false),
runtime: Runtime::current(),
notify: Event::new(),
}))
}
Expand All @@ -74,9 +78,13 @@ impl CancelToken {
return;
}
let tokens = mem::take(self.0.tokens.borrow_mut().deref_mut());
for t in tokens {
self.0.runtime.cancel_token(t);
}
// If the runtime is no longer active, the io_uring fd is already
// closed and all pending ops have been cancelled by the kernel.
let _ = Runtime::try_with_current(move |rt| {
for t in tokens {
rt.cancel_token(t);
}
});
}

/// Check if this token has been cancelled.
Expand All @@ -95,12 +103,16 @@ impl CancelToken {
///
/// [`with_cancel`]: crate::FutureExt::with_cancel
pub fn register<T: OpCode>(&self, key: &Key<T>) {
if self.0.is_cancelled.get() {
self.0.runtime.cancel(key.clone());
} else {
let token = self.0.runtime.register_cancel(key);
self.0.tokens.borrow_mut().insert(token);
}
// If no runtime is active (rare: the op's task should have been
// dropped first), there is nothing to register against.
let _ = Runtime::try_with_current(|rt| {
if self.0.is_cancelled.get() {
rt.cancel(key.clone());
} else {
let token = rt.register_cancel(key);
self.0.tokens.borrow_mut().insert(token);
}
});
}

/// Wait until this token is cancelled.
Expand Down
44 changes: 24 additions & 20 deletions compio-runtime/src/future/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,25 @@ pin_project_lite::pin_project! {
///
/// [`.with_extra()`]: Submit::with_extra
pub struct Submit<T: OpCode, E = ()> {
runtime: Runtime,
// No runtime handle stored here — the runtime is obtained on demand
// via the thread-local (`Runtime::current` / `try_with_current`).
// Storing any form of `Rc<RuntimeInner>` (strong or weak) from inside
// a task — which lives inside the executor — creates a reference cycle
// that prevents `executor.clear()` from running on runtime drop,
// leaking the io_uring fd and every fd owned by in-flight ops.
state: Option<State<T, E>>,
}

impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
// `try_with_current` no-ops if called outside a runtime context.
// That happens when `executor.clear()` drops tasks; it runs inside
// `Runtime::enter`, so the thread-local IS set and the cancel goes
// through. If somehow called after the runtime is fully gone, the
// io_uring fd is already closed — no need to cancel.
if let Some(State::Submitted { key, .. }) = this.state.take() {
this.runtime.cancel(key);
let _ = Runtime::try_with_current(|rt| rt.cancel(key));
}
}
}
Expand All @@ -89,9 +99,8 @@ impl<T: OpCode, E> State<T, E> {
}

impl<T: OpCode> Submit<T, ()> {
pub(crate) fn new(runtime: Runtime, op: T) -> Self {
pub(crate) fn new(op: T) -> Self {
Submit {
runtime,
state: Some(State::Idle { op }),
}
}
Expand All @@ -101,12 +110,8 @@ impl<T: OpCode> Submit<T, ()> {
/// This is useful if you need to access extra information provided by the
/// runtime upon completion of the operation.
pub fn with_extra(mut self) -> Submit<T, Extra> {
let runtime = self.runtime.clone();
let Some(state) = self.state.take() else {
return Submit {
runtime,
state: None,
};
return Submit { state: None };
};
let state = match state {
State::Submitted { key, .. } => State::Submitted {
Expand All @@ -115,10 +120,7 @@ impl<T: OpCode> Submit<T, ()> {
},
State::Idle { op } => State::Idle { op },
};
Submit {
runtime,
state: Some(state),
}
Submit { state: Some(state) }
}
}

Expand All @@ -127,19 +129,20 @@ impl<T: OpCode + 'static> Future for Submit<T, ()> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let runtime = Runtime::current();

loop {
match this.state.take().expect("Cannot poll after ready") {
State::Submitted { key, .. } => match this.runtime.poll_task(cx.get_waker(), key) {
State::Submitted { key, .. } => match runtime.poll_task(cx.get_waker(), key) {
PushEntry::Pending(key) => {
*this.state = Some(State::submitted(key));
return Poll::Pending;
}
PushEntry::Ready(res) => return Poll::Ready(res),
},
State::Idle { op } => {
let extra = cx.as_extra(|| this.runtime.default_extra());
match this.runtime.submit_raw(op, extra) {
let extra = cx.as_extra(|| runtime.default_extra());
match runtime.submit_raw(op, extra) {
PushEntry::Pending(key) => {
// TODO: Should we register it only the first time or every time it's
// being polled?
Expand All @@ -164,11 +167,12 @@ impl<T: OpCode + 'static> Future for Submit<T, Extra> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let runtime = Runtime::current();

loop {
match this.state.take().expect("Cannot poll after ready") {
State::Submitted { key, .. } => {
match this.runtime.poll_task_with_extra(cx.get_waker(), key) {
match runtime.poll_task_with_extra(cx.get_waker(), key) {
PushEntry::Pending(key) => {
*this.state = Some(State::submitted(key));
return Poll::Pending;
Expand All @@ -177,8 +181,8 @@ impl<T: OpCode + 'static> Future for Submit<T, Extra> {
}
}
State::Idle { op } => {
let extra = cx.as_extra(|| this.runtime.default_extra());
match this.runtime.submit_raw(op, extra) {
let extra = cx.as_extra(|| runtime.default_extra());
match runtime.submit_raw(op, extra) {
PushEntry::Pending(key) => {
if let Some(cancel) = cx.get_cancel() {
cancel.register(&key);
Expand All @@ -187,7 +191,7 @@ impl<T: OpCode + 'static> Future for Submit<T, Extra> {
*this.state = Some(State::submitted(key))
}
PushEntry::Ready(res) => {
return Poll::Ready((res, this.runtime.default_extra()));
return Poll::Ready((res, runtime.default_extra()));
}
}
}
Expand Down
20 changes: 11 additions & 9 deletions compio-runtime/src/future/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ pin_project_lite::pin_project! {
/// When this is dropped and the operation hasn't finished yet, it will try to
/// cancel the operation.
pub struct SubmitMulti<T: OpCode> {
runtime: Runtime,
// No runtime handle stored here — see `Submit` in `future.rs` for the
// explanation of why storing any `Rc<RuntimeInner>` (strong or weak)
// from inside a task forms a cycle that leaks the io_uring fd.
state: Option<State<T>>,
}

impl<T: OpCode> PinnedDrop for SubmitMulti<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
if let Some(State::Submitted { key }) = this.state.take() {
this.runtime.cancel(key);
let _ = Runtime::try_with_current(|rt| rt.cancel(key));
}
}
}
Expand All @@ -46,9 +48,8 @@ impl<T: OpCode> State<T> {
}

impl<T: OpCode> SubmitMulti<T> {
pub(crate) fn new(runtime: Runtime, op: T) -> Self {
pub(crate) fn new(op: T) -> Self {
SubmitMulti {
runtime,
state: Some(State::Idle { op }),
}
}
Expand Down Expand Up @@ -78,12 +79,13 @@ impl<T: OpCode + 'static> Stream for SubmitMulti<T> {

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let runtime = Runtime::current();

loop {
match this.state.take().expect("State error, this is a bug") {
State::Idle { op } => {
let extra = cx.as_extra(|| this.runtime.default_extra());
match this.runtime.submit_raw(op, extra) {
let extra = cx.as_extra(|| runtime.default_extra());
match runtime.submit_raw(op, extra) {
PushEntry::Pending(key) => {
if let Some(cancel) = cx.get_cancel() {
cancel.register(&key);
Expand All @@ -93,21 +95,21 @@ impl<T: OpCode + 'static> Stream for SubmitMulti<T> {
}
PushEntry::Ready(BufResult(res, op)) => {
*this.state = Some(State::Finished { op });
let extra = this.runtime.default_extra();
let extra = runtime.default_extra();

return Poll::Ready(Some(BufResult(res, extra)));
}
}
}

State::Submitted { key, .. } => {
if let Some(res) = this.runtime.poll_multishot(cx.get_waker(), &key) {
if let Some(res) = runtime.poll_multishot(cx.get_waker(), &key) {
*this.state = Some(State::submitted(key));

return Poll::Ready(Some(res));
};

match this.runtime.poll_task_with_extra(cx.get_waker(), key) {
match runtime.poll_task_with_extra(cx.get_waker(), key) {
PushEntry::Pending(key) => {
*this.state = Some(State::submitted(key));

Expand Down
10 changes: 7 additions & 3 deletions compio-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,14 @@ impl Runtime {
///
/// You only need this when authoring your own [`OpCode`].
pub fn submit<T: OpCode + 'static>(&self, op: T) -> Submit<T> {
Submit::new(self.clone(), op)
Submit::new(op)
}

/// Submit a multishot operation to the runtime.
///
/// You only need this when authoring your own [`OpCode`].
pub fn submit_multi<T: OpCode + 'static>(&self, op: T) -> SubmitMulti<T> {
SubmitMulti::new(self.clone(), op)
SubmitMulti::new(op)
}

pub(crate) fn cancel<T: OpCode>(&self, key: Key<T>) {
Expand Down Expand Up @@ -445,7 +445,11 @@ impl Runtime {

impl Drop for Runtime {
fn drop(&mut self) {
// this is not the last runtime reference, no need to clear
// Only the last owner clears the executor. With no stored
// `Rc<RuntimeInner>` inside tasks (Submit/SubmitMulti/CancelToken all
// use the thread-local now), `strong_count` is 1 whenever the
// user-facing `Runtime` drops. The guard still protects against the
// edge case of a user explicitly cloning `Runtime` via `current()`.
if Rc::strong_count(&self.0) > 1 {
return;
}
Expand Down
55 changes: 54 additions & 1 deletion compio-runtime/tests/drop.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{
future::Future,
pin::Pin,
sync::Arc,
sync::{
Arc,
atomic::{AtomicBool, Ordering::SeqCst},
},
task::{Context, Poll},
thread::{self, ThreadId},
};
Expand Down Expand Up @@ -51,6 +54,56 @@ fn test_drop_with_timer() {
})
}

/// Regression test for the `Rc` cycle that prevented `executor.clear()` from
/// running when a spawned task parked on `sleep` (or any other future that
/// stores a `CancelToken`/`Submit`/`SubmitMulti`).
///
/// Before the fix, `CancelToken::Inner` held a strong `Runtime` clone. A
/// parked task therefore formed:
/// task → CancelToken → Rc<RuntimeInner> → executor → task
/// `Runtime::drop` saw `strong_count > 1` and early-returned, so
/// `executor.clear()` never ran, the task was never dropped, and the
/// io_uring fd (plus every socket fd owned by in-flight ops) leaked for
/// the life of the process.
///
/// After the fix, `CancelToken::Inner` (and `Submit`/`SubmitMulti`) hold
/// `Weak<RuntimeInner>`, so `strong_count` is always 1 when the last user
/// `Runtime` drops, `executor.clear()` always runs, and tasks are dropped.
#[test]
#[cfg(feature = "time")]
fn test_task_dropped_when_runtime_drops() {
struct DropFlag(Arc<AtomicBool>);
impl Drop for DropFlag {
fn drop(&mut self) {
self.0.store(true, SeqCst);
}
}

let flag = Arc::new(AtomicBool::new(false));
let flag2 = flag.clone();

let rt = compio_runtime::Runtime::new().unwrap();
rt.block_on(async move {
compio_runtime::spawn(async move {
let _guard = DropFlag(flag2);
// Parking on sleep is the exact pattern that formed the Rc cycle:
// CancelToken held a strong Runtime, keeping the executor alive,
// keeping the task alive, keeping the CancelToken alive, ...
compio_runtime::time::sleep(std::time::Duration::from_secs(3600)).await;
})
.detach();
// `block_on` calls `self.run()` once after the main future resolves,
// which is enough to poll the spawned task and park it on the timer.
// No explicit yield needed.
});
drop(rt);

assert!(
flag.load(SeqCst),
"spawned task was not dropped: Rc cycle still present, executor.clear() never ran"
);
}

#[test]
fn test_wake_after_runtime_drop() {
let waker = Arc::new(AtomicWaker::new());
Expand Down
Loading