From 5643dba77de24313862e7a237b8214fcc71e7f69 Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Fri, 8 May 2026 00:18:10 +0800 Subject: [PATCH 1/2] fix(runtime): split the runtime to avoid ref cycle --- compio-runtime/src/cancel.rs | 25 +++-- compio-runtime/src/future/future.rs | 53 +++++---- compio-runtime/src/future/mod.rs | 54 +++++++++ compio-runtime/src/future/stream.rs | 32 ++++-- compio-runtime/src/lib.rs | 167 ++++++---------------------- compio-runtime/src/time.rs | 38 ++++++- compio-runtime/tests/drop.rs | 33 ++++++ 7 files changed, 225 insertions(+), 177 deletions(-) diff --git a/compio-runtime/src/cancel.rs b/compio-runtime/src/cancel.rs index c37fd7054..7440afd79 100644 --- a/compio-runtime/src/cancel.rs +++ b/compio-runtime/src/cancel.rs @@ -1,6 +1,7 @@ use std::{ cell::{Cell, RefCell}, collections::HashSet, + fmt::Debug, mem, ops::DerefMut, pin::Pin, @@ -8,20 +9,30 @@ use std::{ task::{Context, Poll}, }; -use compio_driver::{Cancel, Key, OpCode}; +use compio_driver::{Cancel, Key, OpCode, Proactor}; use futures_util::{FutureExt, ready}; use synchrony::unsync::event::{Event, EventListener}; use crate::{ContextExt, Runtime}; -#[derive(Debug)] struct Inner { tokens: RefCell>, is_cancelled: Cell, - runtime: Runtime, + driver: Rc>, notify: Event, } +impl Debug for Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Inner") + .field("tokens", &self.tokens) + .field("is_cancelled", &self.is_cancelled) + .field("driver", &"...") + .field("notify", &self.notify) + .finish() + } +} + /// A token that can be used to cancel multiple operations at once. /// /// When [`CancelToken::cancel`] is called, all operations that have been @@ -58,7 +69,7 @@ impl CancelToken { Self(Rc::new(Inner { tokens: RefCell::new(HashSet::new()), is_cancelled: Cell::new(false), - runtime: Runtime::current(), + driver: Runtime::current_driver(), notify: Event::new(), })) } @@ -75,7 +86,7 @@ impl CancelToken { } let tokens = mem::take(self.0.tokens.borrow_mut().deref_mut()); for t in tokens { - self.0.runtime.cancel_token(t); + self.0.driver.borrow_mut().cancel_token(t); } } @@ -96,9 +107,9 @@ impl CancelToken { /// [`with_cancel`]: crate::FutureExt::with_cancel pub fn register(&self, key: &Key) { if self.0.is_cancelled.get() { - self.0.runtime.cancel(key.clone()); + self.0.driver.borrow_mut().cancel(key.clone()); } else { - let token = self.0.runtime.register_cancel(key); + let token = self.0.driver.borrow_mut().register_cancel(key); self.0.tokens.borrow_mut().insert(token); } } diff --git a/compio-runtime/src/future/future.rs b/compio-runtime/src/future/future.rs index 8f9452315..7b2c420d8 100644 --- a/compio-runtime/src/future/future.rs +++ b/compio-runtime/src/future/future.rs @@ -1,18 +1,21 @@ //! Future for submitting operations to the runtime. use std::{ + cell::RefCell, future::Future, marker::PhantomData, pin::Pin, + rc::Rc, task::{Context, Poll, Waker}, }; use compio_buf::BufResult; -use compio_driver::{Extra, Key, OpCode, PushEntry}; +use compio_driver::{Extra, Key, OpCode, Proactor, PushEntry}; use futures_util::future::FusedFuture; use crate::{ - CancelToken, Runtime, + CancelToken, + future::{poll_task, poll_task_with_extra, submit_raw}, waker::{get_ext, get_waker}, }; @@ -59,7 +62,7 @@ pin_project_lite::pin_project! { /// /// [`.with_extra()`]: Submit::with_extra pub struct Submit { - runtime: Runtime, + driver: Rc>, state: Option>, } @@ -67,11 +70,10 @@ pin_project_lite::pin_project! { fn drop(this: Pin<&mut Self>) { let this = this.project(); if let Some(State::Submitted { key, .. }) = this.state.take() { - this.runtime.cancel(key); + this.driver.borrow_mut().cancel(key); } } } - } enum State { @@ -89,9 +91,9 @@ impl State { } impl Submit { - pub(crate) fn new(runtime: Runtime, op: T) -> Self { + pub(crate) fn new(driver: Rc>, op: T) -> Self { Submit { - runtime, + driver, state: Some(State::Idle { op }), } } @@ -101,10 +103,10 @@ impl Submit { /// This is useful if you need to access extra information provided by the /// runtime upon completion of the operation. pub fn with_extra(mut self) -> Submit { - let runtime = self.runtime.clone(); + let driver = self.driver.clone(); let Some(state) = self.state.take() else { return Submit { - runtime, + driver, state: None, }; }; @@ -116,7 +118,7 @@ impl Submit { State::Idle { op } => State::Idle { op }, }; Submit { - runtime, + driver, state: Some(state), } } @@ -130,16 +132,20 @@ impl Future for Submit { loop { match this.state.take().expect("Cannot poll after ready") { - State::Submitted { key, .. } => match this.runtime.poll_task(cx.get_waker(), key) { - PushEntry::Pending(key) => { - *this.state = Some(State::submitted(key)); - return Poll::Pending; + State::Submitted { key, .. } => { + let entry = poll_task(&mut this.driver.borrow_mut(), cx.get_waker(), key); + match entry { + PushEntry::Pending(key) => { + *this.state = Some(State::submitted(key)); + return Poll::Pending; + } + PushEntry::Ready(res) => return Poll::Ready(res), } - PushEntry::Ready(res) => return Poll::Ready(res), - }, + } State::Idle { op } => { - let extra = cx.as_extra(|| this.runtime.default_extra()); - match this.runtime.submit_raw(op, extra) { + let extra = cx.as_extra(|| this.driver.borrow().default_extra()); + let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra); + match entry { PushEntry::Pending(key) => { // TODO: Should we register it only the first time or every time it's // being polled? @@ -168,7 +174,9 @@ impl Future for Submit { loop { match this.state.take().expect("Cannot poll after ready") { State::Submitted { key, .. } => { - match this.runtime.poll_task_with_extra(cx.get_waker(), key) { + let entry = + poll_task_with_extra(&mut this.driver.borrow_mut(), cx.get_waker(), key); + match entry { PushEntry::Pending(key) => { *this.state = Some(State::submitted(key)); return Poll::Pending; @@ -177,8 +185,9 @@ impl Future for Submit { } } State::Idle { op } => { - let extra = cx.as_extra(|| this.runtime.default_extra()); - match this.runtime.submit_raw(op, extra) { + let extra = cx.as_extra(|| this.driver.borrow().default_extra()); + let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra); + match entry { PushEntry::Pending(key) => { if let Some(cancel) = cx.get_cancel() { cancel.register(&key); @@ -187,7 +196,7 @@ impl Future for Submit { *this.state = Some(State::submitted(key)) } PushEntry::Ready(res) => { - return Poll::Ready((res, this.runtime.default_extra())); + return Poll::Ready((res, this.driver.borrow().default_extra())); } } } diff --git a/compio-runtime/src/future/mod.rs b/compio-runtime/src/future/mod.rs index 8b9dc063e..0406d9606 100644 --- a/compio-runtime/src/future/mod.rs +++ b/compio-runtime/src/future/mod.rs @@ -1,3 +1,57 @@ +use std::task::Waker; + +use compio_buf::BufResult; +use compio_driver::{Extra, Key, OpCode, Proactor, PushEntry}; +use compio_log::instrument; + +fn poll_task( + driver: &mut Proactor, + waker: &Waker, + key: Key, +) -> PushEntry, BufResult> { + instrument!(compio_log::Level::DEBUG, "poll_task", ?key); + driver.pop(key).map_pending(|k| { + driver.update_waker(&k, waker); + k + }) +} + +fn poll_task_with_extra( + driver: &mut Proactor, + waker: &Waker, + key: Key, +) -> PushEntry, (BufResult, Extra)> { + instrument!(compio_log::Level::DEBUG, "poll_task_with_extra", ?key); + driver.pop_with_extra(key).map_pending(|k| { + driver.update_waker(&k, waker); + k + }) +} + +fn poll_multishot( + driver: &mut Proactor, + waker: &Waker, + key: &Key, +) -> Option> { + instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key); + if let Some(res) = driver.pop_multishot(key) { + return Some(res); + } + driver.update_waker(key, waker); + None +} + +fn submit_raw( + driver: &mut Proactor, + op: T, + extra: Option, +) -> PushEntry, BufResult> { + match extra { + Some(e) => driver.push_with_extra(op, e), + None => driver.push(op), + } +} + mod combinator; #[allow(clippy::module_inception)] mod future; diff --git a/compio-runtime/src/future/stream.rs b/compio-runtime/src/future/stream.rs index 602dc55a2..009e1d2b0 100644 --- a/compio-runtime/src/future/stream.rs +++ b/compio-runtime/src/future/stream.rs @@ -1,17 +1,22 @@ use std::{ + cell::RefCell, marker::PhantomData, pin::Pin, + rc::Rc, task::{Context, Poll}, }; use compio_buf::{BufResult, SetLen}; use compio_driver::{ - BufferPool, BufferRef, Extra, Key, OpCode, PushEntry, TakeBuffer, + BufferPool, BufferRef, Extra, Key, OpCode, Proactor, PushEntry, TakeBuffer, op::{RecvFromMultiResult, RecvMsgMultiResult}, }; use futures_util::{Stream, StreamExt, stream::FusedStream}; -use crate::{ContextExt, Runtime}; +use crate::{ + ContextExt, + future::{poll_multishot, poll_task_with_extra, submit_raw}, +}; pin_project_lite::pin_project! { /// Returned [`Stream`] for [`Runtime::submit_multi`]. @@ -19,7 +24,7 @@ pin_project_lite::pin_project! { /// When this is dropped and the operation hasn't finished yet, it will try to /// cancel the operation. pub struct SubmitMulti { - runtime: Runtime, + driver: Rc>, state: Option>, } @@ -27,7 +32,7 @@ pin_project_lite::pin_project! { fn drop(this: Pin<&mut Self>) { let this = this.project(); if let Some(State::Submitted { key }) = this.state.take() { - this.runtime.cancel(key); + this.driver.borrow_mut().cancel(key); } } } @@ -46,9 +51,9 @@ impl State { } impl SubmitMulti { - pub(crate) fn new(runtime: Runtime, op: T) -> Self { + pub(crate) fn new(driver: Rc>, op: T) -> Self { SubmitMulti { - runtime, + driver, state: Some(State::Idle { op }), } } @@ -82,8 +87,9 @@ impl Stream for SubmitMulti { loop { match this.state.take().expect("State error, this is a bug") { State::Idle { op } => { - let extra = cx.as_extra(|| this.runtime.default_extra()); - match this.runtime.submit_raw(op, extra) { + let extra = cx.as_extra(|| this.driver.borrow().default_extra()); + let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra); + match entry { PushEntry::Pending(key) => { if let Some(cancel) = cx.get_cancel() { cancel.register(&key); @@ -93,7 +99,7 @@ impl Stream for SubmitMulti { } PushEntry::Ready(BufResult(res, op)) => { *this.state = Some(State::Finished { op }); - let extra = this.runtime.default_extra(); + let extra = this.driver.borrow().default_extra(); return Poll::Ready(Some(BufResult(res, extra))); } @@ -101,13 +107,17 @@ impl Stream for SubmitMulti { } State::Submitted { key, .. } => { - if let Some(res) = this.runtime.poll_multishot(cx.get_waker(), &key) { + if let Some(res) = + poll_multishot(&mut this.driver.borrow_mut(), cx.get_waker(), &key) + { *this.state = Some(State::submitted(key)); return Poll::Ready(Some(res)); }; - match this.runtime.poll_task_with_extra(cx.get_waker(), key) { + let entry = + poll_task_with_extra(&mut this.driver.borrow_mut(), cx.get_waker(), key); + match entry { PushEntry::Pending(key) => { *this.state = Some(State::submitted(key)); diff --git a/compio-runtime/src/lib.rs b/compio-runtime/src/lib.rs index 6a450fa03..cb17a4e0f 100644 --- a/compio-runtime/src/lib.rs +++ b/compio-runtime/src/lib.rs @@ -37,17 +37,13 @@ use std::{ fmt::Debug, future::Future, io, - ops::Deref, rc::Rc, task::{Context, Poll, Waker}, time::Duration, }; use compio_buf::{BufResult, IntoInner}; -use compio_driver::{ - AsRawFd, Cancel, DriverType, Extra, Key, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd, - op::Asyncify, -}; +use compio_driver::{AsRawFd, DriverType, OpCode, Proactor, ProactorBuilder, RawFd, op::Asyncify}; pub use compio_driver::{BufferPool, ErrorExt}; use compio_executor::{Executor, ExecutorConfig}; pub use compio_executor::{JoinHandle, ResumeUnwind}; @@ -55,7 +51,7 @@ use compio_log::{debug, instrument}; use crate::affinity::bind_to_cpu_set; #[cfg(feature = "time")] -use crate::time::{TimerFuture, TimerKey, TimerRuntime}; +use crate::time::TimerRuntime; pub use crate::{attacher::*, cancel::CancelToken, future::*}; scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime); @@ -65,40 +61,28 @@ fn not_in_compio_runtime() -> ! { panic!("not in a compio runtime") } -/// Inner structure of [`Runtime`]. -pub struct RuntimeInner { - executor: Executor, - driver: RefCell, - #[cfg(feature = "time")] - timer_runtime: RefCell, -} - /// The async runtime of compio. /// /// It is a thread-local runtime, meaning it cannot be sent to other threads. #[derive(Clone)] -pub struct Runtime(Rc); +pub struct Runtime { + executor: Rc, + driver: Rc>, + #[cfg(feature = "time")] + timer_runtime: Rc>, +} impl Debug for Runtime { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut s = f.debug_struct("Runtime"); - s.field("executor", &self.0.executor) - .field("driver", &"...") - .field("scheduler", &"..."); + s.field("executor", &self.executor); + s.field("driver", &"..."); #[cfg(feature = "time")] s.field("timer_runtime", &"..."); s.finish() } } -impl Deref for Runtime { - type Target = RuntimeInner; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - impl Runtime { /// Create [`Runtime`] with default config. pub fn new() -> io::Result { @@ -161,6 +145,23 @@ impl Runtime { } } + pub(crate) fn current_driver() -> Rc> { + if CURRENT_RUNTIME.is_set() { + CURRENT_RUNTIME.with(|r| r.driver.clone()) + } else { + not_in_compio_runtime() + } + } + + #[cfg(feature = "time")] + pub(crate) fn current_timer_runtime() -> Rc> { + if CURRENT_RUNTIME.is_set() { + CURRENT_RUNTIME.with(|r| r.timer_runtime.clone()) + } else { + not_in_compio_runtime() + } + } + /// Set this runtime as current runtime, and perform a function in the /// current scope. pub fn enter T>(&self, f: F) -> T { @@ -209,7 +210,7 @@ impl Runtime { /// Spawning a task enables the task to execute concurrently to other tasks. /// There is no guarantee that a spawned task will execute to completion. pub fn spawn(&self, future: F) -> JoinHandle { - self.0.executor.spawn(future) + self.executor.spawn(future) } /// Spawns a blocking task in a new thread, and wait for it. @@ -237,34 +238,18 @@ impl Runtime { self.driver.borrow_mut().attach(fd) } - fn submit_raw( - &self, - op: T, - extra: Option, - ) -> PushEntry, BufResult> { - let mut this = self.driver.borrow_mut(); - match extra { - Some(e) => this.push_with_extra(op, e), - None => this.push(op), - } - } - - fn default_extra(&self) -> Extra { - self.driver.borrow().default_extra() - } - /// Submit an operation to the runtime. /// /// You only need this when authoring your own [`OpCode`]. pub fn submit(&self, op: T) -> Submit { - Submit::new(self.clone(), op) + Submit::new(self.driver.clone(), op) } /// Submit a multishot operation to the runtime. /// /// You only need this when authoring your own [`OpCode`]. pub fn submit_multi(&self, op: T) -> SubmitMulti { - SubmitMulti::new(self.clone(), op) + SubmitMulti::new(self.driver.clone(), op) } /// Flush the driver and return whether the driver has been notified. @@ -274,77 +259,6 @@ impl Runtime { self.driver.borrow_mut().flush() } - pub(crate) fn cancel(&self, key: Key) { - self.driver.borrow_mut().cancel(key); - } - - pub(crate) fn register_cancel(&self, key: &Key) -> Cancel { - self.driver.borrow_mut().register_cancel(key) - } - - pub(crate) fn cancel_token(&self, token: Cancel) -> bool { - self.driver.borrow_mut().cancel_token(token) - } - - #[cfg(feature = "time")] - pub(crate) fn cancel_timer(&self, key: &TimerKey) { - self.timer_runtime.borrow_mut().cancel(key); - } - - pub(crate) fn poll_task( - &self, - waker: &Waker, - key: Key, - ) -> PushEntry, BufResult> { - instrument!(compio_log::Level::DEBUG, "poll_task", ?key); - let mut driver = self.driver.borrow_mut(); - driver.pop(key).map_pending(|k| { - driver.update_waker(&k, waker); - k - }) - } - - pub(crate) fn poll_task_with_extra( - &self, - waker: &Waker, - key: Key, - ) -> PushEntry, (BufResult, Extra)> { - instrument!(compio_log::Level::DEBUG, "poll_task_with_extra", ?key); - let mut driver = self.driver.borrow_mut(); - driver.pop_with_extra(key).map_pending(|k| { - driver.update_waker(&k, waker); - k - }) - } - - pub(crate) fn poll_multishot( - &self, - waker: &Waker, - key: &Key, - ) -> Option> { - instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key); - let mut driver = self.driver.borrow_mut(); - if let Some(res) = driver.pop_multishot(key) { - return Some(res); - } - driver.update_waker(key, waker); - None - } - - #[cfg(feature = "time")] - pub(crate) fn poll_timer(&self, cx: &mut Context, key: &TimerKey) -> Poll<()> { - instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key); - let mut timer_runtime = self.timer_runtime.borrow_mut(); - if timer_runtime.is_completed(key) { - debug!("ready"); - Poll::Ready(()) - } else { - debug!("pending"); - timer_runtime.update_waker(key, cx.waker()); - Poll::Pending - } - } - /// Low level API to control the runtime. /// /// Get the timeout value to be passed to [`Proactor::poll`]. @@ -442,7 +356,7 @@ impl Runtime { impl Drop for Runtime { fn drop(&mut self) { // this is not the last runtime reference, no need to clear - if Rc::strong_count(&self.0) > 1 { + if Rc::strong_count(&self.executor) > 1 { return; } @@ -560,13 +474,12 @@ impl RuntimeBuilder { local_queue_size: *local_queue_size, waker: Some(driver.waker()), }); - let inner = RuntimeInner { - executor, - driver: RefCell::new(driver), + Ok(Runtime { + executor: Rc::new(executor), + driver: Rc::new(RefCell::new(driver)), #[cfg(feature = "time")] - timer_runtime: RefCell::new(TimerRuntime::new()), - }; - Ok(Runtime(Rc::new(inner))) + timer_runtime: Rc::new(RefCell::new(TimerRuntime::new())), + }) } } @@ -667,11 +580,3 @@ pub fn register_files(fds: &[RawFd]) -> io::Result<()> { pub fn unregister_files() -> io::Result<()> { Runtime::with_current(|r| r.unregister_files()) } - -#[cfg(feature = "time")] -pub(crate) async fn create_timer(instant: std::time::Instant) { - let key = Runtime::with_current(|r| r.timer_runtime.borrow_mut().insert(instant)); - if let Some(key) = key { - TimerFuture::new(key).await - } -} diff --git a/compio-runtime/src/time.rs b/compio-runtime/src/time.rs index 507ab4361..769ae8e09 100644 --- a/compio-runtime/src/time.rs +++ b/compio-runtime/src/time.rs @@ -1,6 +1,7 @@ //! Utilities for tracking time. use std::{ + cell::RefCell, collections::BTreeMap, error::Error, fmt::Display, @@ -8,10 +9,12 @@ use std::{ marker::PhantomData, mem::replace, pin::Pin, + rc::Rc, task::{Context, Poll, Waker}, time::{Duration, Instant}, }; +use compio_log::{debug, instrument}; use futures_util::{FutureExt, select}; use crate::Runtime; @@ -60,7 +63,15 @@ pub async fn sleep(duration: Duration) { /// # }) /// ``` pub async fn sleep_until(deadline: Instant) { - crate::create_timer(deadline).await + create_timer(deadline).await +} + +async fn create_timer(instant: std::time::Instant) { + let timer_runtime = Runtime::current_timer_runtime(); + let key = timer_runtime.borrow_mut().insert(instant); + if let Some(key) = key { + TimerFuture::new(timer_runtime, key).await; + } } /// Error returned by [`timeout`] or [`timeout_at`]. @@ -319,13 +330,28 @@ impl TimerRuntime { w.wake(); } } + + pub fn poll_timer(&mut self, cx: &mut Context<'_>, key: &TimerKey) -> Poll<()> { + instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key); + if self.is_completed(key) { + debug!("ready"); + Poll::Ready(()) + } else { + debug!("pending"); + self.update_waker(key, cx.waker()); + Poll::Pending + } + } } -pub(crate) struct TimerFuture(TimerKey); +pub(crate) struct TimerFuture { + runtime: Rc>, + key: TimerKey, +} impl TimerFuture { - pub fn new(key: TimerKey) -> Self { - Self(key) + pub fn new(runtime: Rc>, key: TimerKey) -> Self { + Self { runtime, key } } } @@ -333,13 +359,13 @@ impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Runtime::with_current(|r| r.poll_timer(cx, &self.0)) + self.runtime.borrow_mut().poll_timer(cx, &self.key) } } impl Drop for TimerFuture { fn drop(&mut self) { - Runtime::with_current(|r| r.cancel_timer(&self.0)); + self.runtime.borrow_mut().cancel(&self.key); } } diff --git a/compio-runtime/tests/drop.rs b/compio-runtime/tests/drop.rs index f5cc34e72..ae0a02eb7 100644 --- a/compio-runtime/tests/drop.rs +++ b/compio-runtime/tests/drop.rs @@ -1,11 +1,14 @@ use std::{ + cell::Cell, future::Future, pin::Pin, + rc::Rc, sync::Arc, task::{Context, Poll}, thread::{self, ThreadId}, }; +use compio_runtime::CancelToken; use futures_util::task::AtomicWaker; struct DropWatcher { @@ -88,3 +91,33 @@ fn test_wake_from_another_thread_after_runtime_drop() { .join() .unwrap(); } + +#[test] +fn test_task_dropped_when_runtime_drops() { + struct DropFlag(Rc>); + impl Drop for DropFlag { + fn drop(&mut self) { + self.0.set(true); + } + } + + let flag = Rc::new(Cell::new(false)); + let flag2 = flag.clone(); + + let rt = compio_runtime::Runtime::new().unwrap(); + rt.block_on(async move { + compio_runtime::spawn(async move { + let _guard = DropFlag(flag2); + + // `CancelToken` contains a strong reference to the driver, but should not + // prevent the task from being dropped when the runtime is dropped. + let _token = CancelToken::new(); + + compio_runtime::time::sleep(std::time::Duration::from_secs(3600)).await; + }) + .detach(); + }); + drop(rt); + + assert!(flag.get(), "spawned task was not dropped: Rc cycle?"); +} From db14930121d2ff2569a4eed9a80d3bb1d62d47e8 Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Fri, 8 May 2026 00:37:09 +0800 Subject: [PATCH 2/2] fix(runtime): ref cycle in timer --- compio-runtime/src/cancel.rs | 2 +- compio-runtime/src/lib.rs | 17 ----------------- compio-runtime/src/time.rs | 20 +++++++------------- 3 files changed, 8 insertions(+), 31 deletions(-) diff --git a/compio-runtime/src/cancel.rs b/compio-runtime/src/cancel.rs index 7440afd79..304619ce6 100644 --- a/compio-runtime/src/cancel.rs +++ b/compio-runtime/src/cancel.rs @@ -69,7 +69,7 @@ impl CancelToken { Self(Rc::new(Inner { tokens: RefCell::new(HashSet::new()), is_cancelled: Cell::new(false), - driver: Runtime::current_driver(), + driver: Runtime::with_current(|r| r.driver.clone()), notify: Event::new(), })) } diff --git a/compio-runtime/src/lib.rs b/compio-runtime/src/lib.rs index cb17a4e0f..a7077cb86 100644 --- a/compio-runtime/src/lib.rs +++ b/compio-runtime/src/lib.rs @@ -145,23 +145,6 @@ impl Runtime { } } - pub(crate) fn current_driver() -> Rc> { - if CURRENT_RUNTIME.is_set() { - CURRENT_RUNTIME.with(|r| r.driver.clone()) - } else { - not_in_compio_runtime() - } - } - - #[cfg(feature = "time")] - pub(crate) fn current_timer_runtime() -> Rc> { - if CURRENT_RUNTIME.is_set() { - CURRENT_RUNTIME.with(|r| r.timer_runtime.clone()) - } else { - not_in_compio_runtime() - } - } - /// Set this runtime as current runtime, and perform a function in the /// current scope. pub fn enter T>(&self, f: F) -> T { diff --git a/compio-runtime/src/time.rs b/compio-runtime/src/time.rs index 769ae8e09..12743d92e 100644 --- a/compio-runtime/src/time.rs +++ b/compio-runtime/src/time.rs @@ -1,7 +1,6 @@ //! Utilities for tracking time. use std::{ - cell::RefCell, collections::BTreeMap, error::Error, fmt::Display, @@ -9,7 +8,6 @@ use std::{ marker::PhantomData, mem::replace, pin::Pin, - rc::Rc, task::{Context, Poll, Waker}, time::{Duration, Instant}, }; @@ -67,10 +65,9 @@ pub async fn sleep_until(deadline: Instant) { } async fn create_timer(instant: std::time::Instant) { - let timer_runtime = Runtime::current_timer_runtime(); - let key = timer_runtime.borrow_mut().insert(instant); + let key = Runtime::with_current(|r| r.timer_runtime.borrow_mut().insert(instant)); if let Some(key) = key { - TimerFuture::new(timer_runtime, key).await; + TimerFuture::new(key).await; } } @@ -344,14 +341,11 @@ impl TimerRuntime { } } -pub(crate) struct TimerFuture { - runtime: Rc>, - key: TimerKey, -} +pub(crate) struct TimerFuture(TimerKey); impl TimerFuture { - pub fn new(runtime: Rc>, key: TimerKey) -> Self { - Self { runtime, key } + pub fn new(key: TimerKey) -> Self { + Self(key) } } @@ -359,13 +353,13 @@ impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.runtime.borrow_mut().poll_timer(cx, &self.key) + Runtime::with_current(|r| r.timer_runtime.borrow_mut().poll_timer(cx, &self.0)) } } impl Drop for TimerFuture { fn drop(&mut self) { - self.runtime.borrow_mut().cancel(&self.key); + Runtime::with_current(|r| r.timer_runtime.borrow_mut().cancel(&self.0)); } }