Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions compio-driver/src/sys/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ impl AwakeFlag {

/// Mark the driver as awake by overwriting the flag byte with `AWAKE`.
/// This intentionally clears any previously set `NOTIFIED` flag.
pub fn set(&self) {
self.0.store(AWAKE, Ordering::Release);
/// Returns true if the `NOTIFIED` flag was set.
pub fn set(&self) -> bool {
(self.0.swap(AWAKE, Ordering::AcqRel) & NOTIFIED) != 0
}

/// Reset the flags. Returns true if it was notified.
Expand Down
13 changes: 7 additions & 6 deletions compio-driver/src/sys/driver/poll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,13 +450,13 @@ impl Driver {
if !need_wait || has_completed {
timeout = Some(Duration::ZERO);
}
// We need to poll the poller first to make sure it handles the internal notify
// event (if any).
self.events.clear();
self.notify.poll.wait(&mut self.events, timeout)?;
self.notify.set_awake();
// polling's Events::is_empty() filters NOTIFY_KEY events, so a
// Poller::notify() wakeup looks like an empty return.
let was_notified = self.notify.set_awake();
if self.events.is_empty() {
if self.poll_completed() {
if was_notified | self.poll_completed() {
return Ok(());
}
if timeout_is_some {
Expand Down Expand Up @@ -544,6 +544,7 @@ impl Entry {
}

/// A notify handle to the inner driver.
#[derive(Debug)]
pub(crate) struct Notify {
poll: Poller,
awake: AwakeFlag,
Expand All @@ -557,8 +558,8 @@ impl Notify {
}
}

fn set_awake(&self) {
self.awake.set();
fn set_awake(&self) -> bool {
self.awake.set()
}

fn reset(&self) -> bool {
Expand Down
7 changes: 7 additions & 0 deletions compio-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ windows-sys = { workspace = true, features = ["Win32_System_IO"] }
[target.'cfg(unix)'.dependencies]
libc = { workspace = true }

[dev-dependencies]
flume = { workspace = true, features = ["async"] }

[target.'cfg(windows)'.dev-dependencies]
windows-sys = { workspace = true, features = ["Win32_UI_WindowsAndMessaging"] }

Expand Down Expand Up @@ -66,3 +69,7 @@ name = "event"
[[test]]
name = "drop"
required-features = ["time"]

[[test]]
name = "waker"
required-features = ["time"]
54 changes: 54 additions & 0 deletions compio-runtime/tests/waker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::time::{Duration, Instant};

use compio_runtime::ResumeUnwind;

#[test]
fn cross_thread_waker_interrupts_poll() {
let rt = compio_runtime::Runtime::new().unwrap();
rt.block_on(async {
let (tx, rx) = flume::bounded::<u32>(1);

let handle = compio_runtime::spawn(async move { rx.recv_async().await.unwrap() });

std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(10));
tx.send(42).unwrap();
});

let start = Instant::now();
let val = handle.await.resume_unwind().unwrap();
let elapsed = start.elapsed();

assert_eq!(val, 42);
assert!(
elapsed < Duration::from_millis(200),
"took {elapsed:?}, expected < 200ms"
);
});
}

#[test]
fn same_thread_waker_schedules_promptly() {
let rt = compio_runtime::Runtime::new().unwrap();
rt.block_on(async {
let (tx, rx) = flume::bounded::<u32>(1);

compio_runtime::spawn(async move {
let val = rx.recv_async().await.unwrap();
assert_eq!(val, 42);
})
.detach();

compio_runtime::time::sleep(Duration::from_millis(10)).await;

let start = Instant::now();
tx.send(42).unwrap();
compio_runtime::time::sleep(Duration::from_millis(200)).await;
let elapsed = start.elapsed();

assert!(
elapsed < Duration::from_millis(400),
"took {elapsed:?}, expected < 400ms"
);
});
}
Loading