From ca4bc057a9226a0901bfd37133b3deced47de13c Mon Sep 17 00:00:00 2001 From: Patrik Wenger Date: Fri, 22 May 2026 12:17:04 +0200 Subject: [PATCH] fix(driver): use Poller::notify() to wake poll backend from wakers polling's Events::is_empty() filters NOTIFY_KEY events, so a Poller::notify() wakeup produces an empty event set. Check the AwakeFlag's NOTIFIED bit after wait() returns to detect this case and return Ok(()) instead of ETIMEDOUT. Fixes compio-rs/compio#928 --- compio-driver/src/sys/driver/mod.rs | 5 ++- compio-driver/src/sys/driver/poll/mod.rs | 13 +++--- compio-runtime/Cargo.toml | 7 +++ compio-runtime/tests/waker.rs | 54 ++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 8 deletions(-) create mode 100644 compio-runtime/tests/waker.rs diff --git a/compio-driver/src/sys/driver/mod.rs b/compio-driver/src/sys/driver/mod.rs index 1a49becc..a138c605 100644 --- a/compio-driver/src/sys/driver/mod.rs +++ b/compio-driver/src/sys/driver/mod.rs @@ -43,8 +43,9 @@ impl AwakeFlag { /// Mark the driver as awake by overwriting the flag byte with `AWAKE`. /// This intentionally clears any previously set `NOTIFIED` flag. - pub fn set(&self) { - self.0.store(AWAKE, Ordering::Release); + /// Returns true if the `NOTIFIED` flag was set. + pub fn set(&self) -> bool { + (self.0.swap(AWAKE, Ordering::AcqRel) & NOTIFIED) != 0 } /// Reset the flags. Returns true if it was notified. diff --git a/compio-driver/src/sys/driver/poll/mod.rs b/compio-driver/src/sys/driver/poll/mod.rs index 7d05a7b5..8344f5aa 100644 --- a/compio-driver/src/sys/driver/poll/mod.rs +++ b/compio-driver/src/sys/driver/poll/mod.rs @@ -450,13 +450,13 @@ impl Driver { if !need_wait || has_completed { timeout = Some(Duration::ZERO); } - // We need to poll the poller first to make sure it handles the internal notify - // event (if any). self.events.clear(); self.notify.poll.wait(&mut self.events, timeout)?; - self.notify.set_awake(); + // polling's Events::is_empty() filters NOTIFY_KEY events, so a + // Poller::notify() wakeup looks like an empty return. + let was_notified = self.notify.set_awake(); if self.events.is_empty() { - if self.poll_completed() { + if was_notified | self.poll_completed() { return Ok(()); } if timeout_is_some { @@ -544,6 +544,7 @@ impl Entry { } /// A notify handle to the inner driver. +#[derive(Debug)] pub(crate) struct Notify { poll: Poller, awake: AwakeFlag, @@ -557,8 +558,8 @@ impl Notify { } } - fn set_awake(&self) { - self.awake.set(); + fn set_awake(&self) -> bool { + self.awake.set() } fn reset(&self) -> bool { diff --git a/compio-runtime/Cargo.toml b/compio-runtime/Cargo.toml index c875d199..2176e4b5 100644 --- a/compio-runtime/Cargo.toml +++ b/compio-runtime/Cargo.toml @@ -37,6 +37,9 @@ windows-sys = { workspace = true, features = ["Win32_System_IO"] } [target.'cfg(unix)'.dependencies] libc = { workspace = true } +[dev-dependencies] +flume = { workspace = true, features = ["async"] } + [target.'cfg(windows)'.dev-dependencies] windows-sys = { workspace = true, features = ["Win32_UI_WindowsAndMessaging"] } @@ -66,3 +69,7 @@ name = "event" [[test]] name = "drop" required-features = ["time"] + +[[test]] +name = "waker" +required-features = ["time"] diff --git a/compio-runtime/tests/waker.rs b/compio-runtime/tests/waker.rs new file mode 100644 index 00000000..ff445b59 --- /dev/null +++ b/compio-runtime/tests/waker.rs @@ -0,0 +1,54 @@ +use std::time::{Duration, Instant}; + +use compio_runtime::ResumeUnwind; + +#[test] +fn cross_thread_waker_interrupts_poll() { + let rt = compio_runtime::Runtime::new().unwrap(); + rt.block_on(async { + let (tx, rx) = flume::bounded::(1); + + let handle = compio_runtime::spawn(async move { rx.recv_async().await.unwrap() }); + + std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(10)); + tx.send(42).unwrap(); + }); + + let start = Instant::now(); + let val = handle.await.resume_unwind().unwrap(); + let elapsed = start.elapsed(); + + assert_eq!(val, 42); + assert!( + elapsed < Duration::from_millis(200), + "took {elapsed:?}, expected < 200ms" + ); + }); +} + +#[test] +fn same_thread_waker_schedules_promptly() { + let rt = compio_runtime::Runtime::new().unwrap(); + rt.block_on(async { + let (tx, rx) = flume::bounded::(1); + + compio_runtime::spawn(async move { + let val = rx.recv_async().await.unwrap(); + assert_eq!(val, 42); + }) + .detach(); + + compio_runtime::time::sleep(Duration::from_millis(10)).await; + + let start = Instant::now(); + tx.send(42).unwrap(); + compio_runtime::time::sleep(Duration::from_millis(200)).await; + let elapsed = start.elapsed(); + + assert!( + elapsed < Duration::from_millis(400), + "took {elapsed:?}, expected < 400ms" + ); + }); +}