feat(runtime): notify-always combinator#899
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses #895 by introducing a per-future “notify-always” mechanism intended to ensure the polling driver is woken even in cases where same-thread notifications previously didn’t break the driver out of blocking polling.
Changes:
- Adds a new
FutureExt::with_notify_always(bool)combinator (WithNotifyAlways) and threads anotify_alwaysflag throughExt. - Changes waker behavior (
ExtWaker/OptWaker) and makes executor scheduling always wake the configured driver waker. - Removes the Cargo features named
notify-alwaysfromcompio,compio-runtime, andcompio-executor, and updates an example to use the new combinator.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| compio/examples/tick.rs | Updates ctrl_c() usage to opt into notify-always via the new combinator. |
| compio/Cargo.toml | Removes the notify-always feature from the top-level crate. |
| compio-runtime/src/waker/opt.rs | Simplifies OptWaker (removes previous same-thread/feature-based behavior). |
| compio-runtime/src/waker/ext.rs | Adds notify gating to ExtWaker wake paths based on the new ext flag. |
| compio-runtime/src/lib.rs | Switches block_on polling to go through ExtWaker + Ext. |
| compio-runtime/src/future/combinator/notify_always.rs | Introduces the new WithNotifyAlways combinator. |
| compio-runtime/src/future/combinator/mod.rs | Adds notify_always state to Ext and exposes with_notify_always. |
| compio-runtime/Cargo.toml | Removes the notify-always feature from the runtime crate. |
| compio-executor/src/task/local.rs | Makes same-thread scheduling always wake the configured driver waker. |
| compio-executor/Cargo.toml | Removes the notify-always feature from the executor crate. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self.notify_always | ||
| .compare_exchange( | ||
| NOTIFY_UNSET, | ||
| if notify { NOTIFY_TRUE } else { NOTIFY_FALSE }, | ||
| Ordering::AcqRel, | ||
| Ordering::Acquire, | ||
| ) | ||
| .ok(); | ||
| } | ||
|
|
||
| pub fn should_notify_always(&self) -> bool { | ||
| self.notify_always.load(Ordering::Acquire) == NOTIFY_TRUE |
There was a problem hiding this comment.
set_notify_always uses compare_exchange(NOTIFY_UNSET, ...), so the outermost poll that runs first wins and nested with_notify_always(...) calls cannot override it (unlike with_cancel, where the innermost takes precedence). Also, should_notify_always() treats UNSET the same as false, which interacts badly with the new waker gating. Consider modeling this as an explicit tri-state with well-defined default behavior (e.g., UNSET = default semantics) and decide whether the flag should be overrideable (innermost-wins) or monotonic (false→true only).
| self.notify_always | |
| .compare_exchange( | |
| NOTIFY_UNSET, | |
| if notify { NOTIFY_TRUE } else { NOTIFY_FALSE }, | |
| Ordering::AcqRel, | |
| Ordering::Acquire, | |
| ) | |
| .ok(); | |
| } | |
| pub fn should_notify_always(&self) -> bool { | |
| self.notify_always.load(Ordering::Acquire) == NOTIFY_TRUE | |
| self.notify_always.store( | |
| if notify { NOTIFY_TRUE } else { NOTIFY_FALSE }, | |
| Ordering::Release, | |
| ); | |
| } | |
| pub fn notify_always(&self) -> Option<bool> { | |
| match self.notify_always.load(Ordering::Acquire) { | |
| NOTIFY_TRUE => Some(true), | |
| NOTIFY_FALSE => Some(false), | |
| _ => None, | |
| } | |
| } | |
| pub fn should_notify_always(&self) -> bool { | |
| self.notify_always().unwrap_or(false) |
| let waker = Waker::from(opt_waker.clone()); | ||
| let mut context = Context::from_waker(&waker); | ||
| let ext = Ext::default(); | ||
| let ext_waker = ExtWaker::new(&waker, &ext); | ||
| let mut future = std::pin::pin!(future); | ||
| loop { | ||
| if let Poll::Ready(result) = future.as_mut().poll(&mut context) { | ||
| if let Poll::Ready(result) = ext_waker.poll(future.as_mut()) { | ||
| self.run(); |
There was a problem hiding this comment.
block_on now polls via ExtWaker built from Ext::default(). With the current ExtWaker::wake_by_ref gating, the root future’s waker may become a no-op on the runtime thread unless notify_always is explicitly set, which can stall block_on for futures that rely on wake-ups. Ensure the default Ext + ExtWaker combination still forwards wakes (and only changes driver-notification behavior when explicitly requested).
| /// An optimized waker that avoids unnecessary wake-ups on the same thread. | ||
| pub struct OptWaker { | ||
| waker: Waker, | ||
| #[cfg(not(feature = "notify-always"))] | ||
| current_thread: SendWrapper<()>, | ||
| is_woke: AtomicBool, | ||
| } |
There was a problem hiding this comment.
The doc comment says OptWaker “avoids unnecessary wake-ups on the same thread”, but the same-thread suppression logic was removed (it now always forwards the first wake via the underlying waker). Either restore the same-thread optimization, or update the doc comment to reflect the new behavior and its performance implications.
| if let Some(ref waker) = shared.waker { | ||
| waker.wake_by_ref() | ||
| } |
There was a problem hiding this comment.
Local::schedule now unconditionally wakes shared.waker for same-thread scheduling. This can substantially increase driver wake-ups/syscalls under high task churn compared to the previous feature-gated behavior. If always notifying is required for correctness, consider an OptWaker-style guard (coalescing) or a cheaper “already-awake” check to avoid redundant notifications.
| time = [] | ||
| async-fd = ["dep:compio-io"] | ||
|
|
||
| # Enable it to always notify the driver when a task schedules. | ||
| notify-always = ["compio-executor/notify-always"] | ||
|
|
||
| current_thread_id = [] | ||
| nightly = ["current_thread_id"] | ||
|
|
There was a problem hiding this comment.
Removing notify-always from compio-runtime’s feature set is a breaking surface-area change. If this is intended, consider documenting the migration path (e.g., to FutureExt::with_notify_always) in compio-runtime/CHANGELOG.md, or keeping a deprecated compatibility feature for a transition period.
| if this.should_notify() { | ||
| this.waker.wake_by_ref(); | ||
| } |
There was a problem hiding this comment.
wake_by_ref is now conditionally calling the underlying waker. With the current Ext default (notify_always UNSET), this becomes a no-op on the runtime thread, which can deadlock futures that rely on being woken (e.g., IO completion, JoinHandle completion, self-waking futures). ExtWaker should not drop wakes entirely; if the goal is to avoid waking the polling driver on the same thread, preserve the wake signal (e.g., set the optimized waker state) while optionally skipping only the driver notification.
| if this.should_notify() { | |
| this.waker.wake_by_ref(); | |
| } | |
| this.waker.wake_by_ref(); |
| /// A future combinator that will always be notified when the inner future is | ||
| /// ready, even if the inner future is already ready when this combinator is | ||
| /// created. | ||
| /// |
There was a problem hiding this comment.
The docstring claims the combinator “will always be notified when the inner future is ready, even if the inner future is already ready when this combinator is created.” This combinator only mutates ext state during poll and does not arrange any notification at construction time; if the inner future is already ready, it will simply return Ready on the first poll like usual. Please adjust the wording to match the actual semantics (or implement the described behavior if it’s required).
| /// A future combinator that will always be notified when the inner future is | |
| /// ready, even if the inner future is already ready when this combinator is | |
| /// created. | |
| /// | |
| /// A future combinator that enables "notify always" behavior while polling | |
| /// the inner future. | |
| /// | |
| /// This affects how the inner future is polled through the wrapped waker; it | |
| /// does not by itself arrange any notification at construction time for an | |
| /// inner future that is already ready. | |
| /// |
| # Runtime implementations | ||
| notify-always = ["compio-runtime?/notify-always"] | ||
|
|
||
| # Ancillary |
There was a problem hiding this comment.
Removing the public Cargo feature notify-always is a breaking change for downstream users who enabled it. If the intent is to replace it with FutureExt::with_notify_always, consider keeping the feature as a deprecated alias/no-op for at least one release, or document the migration prominently (e.g., in the changelog).
| # Ancillary | |
| # Ancillary | |
| # Deprecated compatibility alias; use `FutureExt::with_notify_always` instead. | |
| notify-always = [] |
|
|
||
| [features] | ||
| notify-always = [] | ||
| enable_log = ["compio-log/enable_log"] |
There was a problem hiding this comment.
Dropping the notify-always feature from compio-executor is a breaking change for any crate depending on it. If this is part of a coordinated release, consider keeping the feature as a deprecated alias and/or documenting the removal so downstreams aren’t surprised.
| enable_log = ["compio-log/enable_log"] | |
| enable_log = ["compio-log/enable_log"] | |
| # Deprecated compatibility alias retained to avoid breaking downstream crates | |
| # that still enable the historical `notify-always` feature. | |
| notify-always = [] |
| ExtWaker::new(waker, ext).poll(this.future) | ||
| }) | ||
| } | ||
| } |
There was a problem hiding this comment.
This change adds a new per-future notification mode and alters waker behavior, but there are no accompanying tests covering same-thread wake behavior (the core motivation in #895) or verifying that with_notify_always(true) wakes the polling driver while the default behavior still makes progress. Adding a regression test exercising a self-wake / signal-style wake on the runtime thread would help prevent deadlocks/regressions.
| } | |
| } | |
| #[cfg(test)] | |
| mod tests { | |
| use super::WithNotifyAlways; | |
| use std::{ | |
| future::Future, | |
| pin::Pin, | |
| sync::{ | |
| Arc, | |
| atomic::{AtomicUsize, Ordering}, | |
| }, | |
| task::{Context, Poll, Wake, Waker}, | |
| }; | |
| struct CountingWaker { | |
| wakes: AtomicUsize, | |
| } | |
| impl CountingWaker { | |
| fn new() -> Arc<Self> { | |
| Arc::new(Self { | |
| wakes: AtomicUsize::new(0), | |
| }) | |
| } | |
| fn wake_count(&self) -> usize { | |
| self.wakes.load(Ordering::SeqCst) | |
| } | |
| } | |
| impl Wake for CountingWaker { | |
| fn wake(self: Arc<Self>) { | |
| self.wakes.fetch_add(1, Ordering::SeqCst); | |
| } | |
| fn wake_by_ref(self: &Arc<Self>) { | |
| self.wakes.fetch_add(1, Ordering::SeqCst); | |
| } | |
| } | |
| struct SelfWakeThenReady { | |
| woke_once: bool, | |
| } | |
| impl Future for SelfWakeThenReady { | |
| type Output = (); | |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
| if self.woke_once { | |
| Poll::Ready(()) | |
| } else { | |
| self.woke_once = true; | |
| cx.waker().wake_by_ref(); | |
| Poll::Pending | |
| } | |
| } | |
| } | |
| struct WakeAndReady; | |
| impl Future for WakeAndReady { | |
| type Output = (); | |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
| cx.waker().wake_by_ref(); | |
| Poll::Ready(()) | |
| } | |
| } | |
| #[test] | |
| fn default_mode_self_wake_still_makes_progress() { | |
| let counter = CountingWaker::new(); | |
| let waker = Waker::from(counter.clone()); | |
| let mut cx = Context::from_waker(&waker); | |
| let mut future = Box::pin(WithNotifyAlways::new( | |
| SelfWakeThenReady { woke_once: false }, | |
| false, | |
| )); | |
| assert!(matches!(future.as_mut().poll(&mut cx), Poll::Pending)); | |
| assert!(matches!(future.as_mut().poll(&mut cx), Poll::Ready(()))); | |
| } | |
| #[test] | |
| fn notify_always_true_forwards_same_thread_wake() { | |
| let counter = CountingWaker::new(); | |
| let waker = Waker::from(counter.clone()); | |
| let mut cx = Context::from_waker(&waker); | |
| let mut future = Box::pin(WithNotifyAlways::new(WakeAndReady, true)); | |
| assert!(matches!(future.as_mut().poll(&mut cx), Poll::Ready(()))); | |
| assert!( | |
| counter.wake_count() > 0, | |
| "notify_always=true should forward same-thread wakeups to the polling driver" | |
| ); | |
| } | |
| } |
|
You shouldn't need atomic when using ext, just create a new one from the existing with the updated always_wake flag, like personality. |
|
I tried, but it doesn't work as expected if I call a |
Not effective? or what |
future.with_notify_always(true).with_notify_always(false)Should it always notify or not? |
|
@Berrysoft Use the same behavior as your current one: if not set before, set it; otherwise discard new value. Result in false. |
|
This does make me think, later calls to future combinator should always take precedence, so the other combinators should also discard-if-set instead of overriding existing value. Later calls to future combinator means they're in the outter layer of the waker onion, hence polled earlier, and they get chance to set the value ealier. |
|
I'll choose a different approach later. |
Closes #895