Skip to content

Commit ca4bc05

Browse files
committed
fix(driver): use Poller::notify() to wake poll backend from wakers
polling's Events::is_empty() filters NOTIFY_KEY events, so a Poller::notify() wakeup produces an empty event set. Check the AwakeFlag's NOTIFIED bit after wait() returns to detect this case and return Ok(()) instead of ETIMEDOUT. Fixes #928
1 parent cb8f77e commit ca4bc05

4 files changed

Lines changed: 71 additions & 8 deletions

File tree

compio-driver/src/sys/driver/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ impl AwakeFlag {
4343

4444
/// Mark the driver as awake by overwriting the flag byte with `AWAKE`.
4545
/// This intentionally clears any previously set `NOTIFIED` flag.
46-
pub fn set(&self) {
47-
self.0.store(AWAKE, Ordering::Release);
46+
/// Returns true if the `NOTIFIED` flag was set.
47+
pub fn set(&self) -> bool {
48+
(self.0.swap(AWAKE, Ordering::AcqRel) & NOTIFIED) != 0
4849
}
4950

5051
/// Reset the flags. Returns true if it was notified.

compio-driver/src/sys/driver/poll/mod.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -450,13 +450,13 @@ impl Driver {
450450
if !need_wait || has_completed {
451451
timeout = Some(Duration::ZERO);
452452
}
453-
// We need to poll the poller first to make sure it handles the internal notify
454-
// event (if any).
455453
self.events.clear();
456454
self.notify.poll.wait(&mut self.events, timeout)?;
457-
self.notify.set_awake();
455+
// polling's Events::is_empty() filters NOTIFY_KEY events, so a
456+
// Poller::notify() wakeup looks like an empty return.
457+
let was_notified = self.notify.set_awake();
458458
if self.events.is_empty() {
459-
if self.poll_completed() {
459+
if was_notified | self.poll_completed() {
460460
return Ok(());
461461
}
462462
if timeout_is_some {
@@ -544,6 +544,7 @@ impl Entry {
544544
}
545545

546546
/// A notify handle to the inner driver.
547+
#[derive(Debug)]
547548
pub(crate) struct Notify {
548549
poll: Poller,
549550
awake: AwakeFlag,
@@ -557,8 +558,8 @@ impl Notify {
557558
}
558559
}
559560

560-
fn set_awake(&self) {
561-
self.awake.set();
561+
fn set_awake(&self) -> bool {
562+
self.awake.set()
562563
}
563564

564565
fn reset(&self) -> bool {

compio-runtime/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ windows-sys = { workspace = true, features = ["Win32_System_IO"] }
3737
[target.'cfg(unix)'.dependencies]
3838
libc = { workspace = true }
3939

40+
[dev-dependencies]
41+
flume = { workspace = true, features = ["async"] }
42+
4043
[target.'cfg(windows)'.dev-dependencies]
4144
windows-sys = { workspace = true, features = ["Win32_UI_WindowsAndMessaging"] }
4245

@@ -66,3 +69,7 @@ name = "event"
6669
[[test]]
6770
name = "drop"
6871
required-features = ["time"]
72+
73+
[[test]]
74+
name = "waker"
75+
required-features = ["time"]

compio-runtime/tests/waker.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::time::{Duration, Instant};
2+
3+
use compio_runtime::ResumeUnwind;
4+
5+
#[test]
6+
fn cross_thread_waker_interrupts_poll() {
7+
let rt = compio_runtime::Runtime::new().unwrap();
8+
rt.block_on(async {
9+
let (tx, rx) = flume::bounded::<u32>(1);
10+
11+
let handle = compio_runtime::spawn(async move { rx.recv_async().await.unwrap() });
12+
13+
std::thread::spawn(move || {
14+
std::thread::sleep(Duration::from_millis(10));
15+
tx.send(42).unwrap();
16+
});
17+
18+
let start = Instant::now();
19+
let val = handle.await.resume_unwind().unwrap();
20+
let elapsed = start.elapsed();
21+
22+
assert_eq!(val, 42);
23+
assert!(
24+
elapsed < Duration::from_millis(200),
25+
"took {elapsed:?}, expected < 200ms"
26+
);
27+
});
28+
}
29+
30+
#[test]
31+
fn same_thread_waker_schedules_promptly() {
32+
let rt = compio_runtime::Runtime::new().unwrap();
33+
rt.block_on(async {
34+
let (tx, rx) = flume::bounded::<u32>(1);
35+
36+
compio_runtime::spawn(async move {
37+
let val = rx.recv_async().await.unwrap();
38+
assert_eq!(val, 42);
39+
})
40+
.detach();
41+
42+
compio_runtime::time::sleep(Duration::from_millis(10)).await;
43+
44+
let start = Instant::now();
45+
tx.send(42).unwrap();
46+
compio_runtime::time::sleep(Duration::from_millis(200)).await;
47+
let elapsed = start.elapsed();
48+
49+
assert!(
50+
elapsed < Duration::from_millis(400),
51+
"took {elapsed:?}, expected < 400ms"
52+
);
53+
});
54+
}

0 commit comments

Comments
 (0)