Skip to content

Commit 1063a02

Browse files
BerrysoftCopilot
andauthored
feat(driver,executor,runtime)!: remove notify-always (#902)
* feat(executor,runtime): remove notify-always * feat(driver,poll): store awake Co-authored-by: Copilot <copilot@github.com> * feat(driver,iocp): store awake * feat(driver,iour): store awake * fix(driver,iour): avoid empty loop * docs(executor): the waker is always used * fix(driver,iour): avoid changing timeout * fix(driver): use seq-cst for AwakeFlag * fix(driver): don't set awake when polling blocking * fix(driver,iour): only mask `awake` when polling * fix(driver,poll): reset `awake` before waiting * docs(executor,runtime): apply suggestions * fix(driver,poll): reset `awake` before return * fix(driver,poll): wait first * refactor(driver): make the flag more meaningful * fix(driver,iour): avoid timedout if don't wait * fix(driver,iocp): set awake if has entries * fix(driver): loosen the ordering --------- Co-authored-by: Copilot <copilot@github.com>
1 parent e7fd1ae commit 1063a02

13 files changed

Lines changed: 140 additions & 129 deletions

File tree

compio-driver/src/sys/driver/iocp/mod.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use windows_sys::Win32::{Foundation::ERROR_OPERATION_ABORTED, System::IO::OVERLA
99
use crate::{
1010
AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder,
1111
control::Carrier,
12-
sys::{extra::IocpExtra, prelude::*},
12+
sys::{driver::AwakeFlag, extra::IocpExtra, prelude::*},
1313
};
1414

1515
mod cp;
@@ -186,14 +186,19 @@ impl Driver {
186186
entry.notify();
187187
has_entry = true;
188188
}
189+
if self.notify.reset() {
190+
has_entry = true;
191+
}
189192

190193
if !has_entry {
191194
for e in self.notify.port.poll(timeout)? {
192195
if let Some(e) = Self::create_entry(notify, &mut self.waits, e) {
196+
self.notify.set_awake();
193197
e.notify()
194198
}
195199
}
196200
}
201+
self.notify.set_awake();
197202

198203
Ok(())
199204
}
@@ -217,16 +222,24 @@ impl AsRawFd for Driver {
217222
pub(crate) struct Notify {
218223
port: cp::Port,
219224
overlapped: Overlapped,
225+
awake: AwakeFlag,
220226
}
221227

222228
impl Notify {
223229
fn new(port: cp::Port, overlapped: Overlapped) -> Self {
224-
Self { port, overlapped }
230+
Self {
231+
port,
232+
overlapped,
233+
awake: AwakeFlag::new(),
234+
}
225235
}
226236

227-
/// Notify the inner driver.
228-
pub fn notify(&self) -> io::Result<()> {
229-
self.port.post_raw(&self.overlapped)
237+
fn set_awake(&self) {
238+
self.awake.set();
239+
}
240+
241+
fn reset(&self) -> bool {
242+
self.awake.reset()
230243
}
231244
}
232245

@@ -236,6 +249,8 @@ impl Wake for Notify {
236249
}
237250

238251
fn wake_by_ref(self: &Arc<Self>) {
239-
self.notify().ok();
252+
if !self.awake.wake() {
253+
self.port.post_raw(&self.overlapped).ok();
254+
}
240255
}
241256
}

compio-driver/src/sys/driver/iour/mod.rs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,12 @@ impl Driver {
149149
}
150150

151151
// Auto means that it choose to wait or not automatically.
152-
fn submit_auto(&mut self, timeout: Option<Duration>) -> io::Result<()> {
152+
fn submit_auto(&mut self, timeout: Option<Duration>, need_wait: bool) -> io::Result<()> {
153153
instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout);
154154

155155
// when taskrun is true, there are completed cqes wait to handle, no need to
156156
// block the submit
157-
let want_sqe = if self.inner.submission().taskrun() {
157+
let want_sqe = if !need_wait || self.inner.submission().taskrun() {
158158
0
159159
} else {
160160
1
@@ -173,7 +173,7 @@ impl Driver {
173173
trace!("submit result: {res:?}");
174174
match res {
175175
Ok(_) => {
176-
if self.inner.completion().is_empty() {
176+
if want_sqe > 0 && self.inner.completion().is_empty() {
177177
Err(io::ErrorKind::TimedOut.into())
178178
} else {
179179
Ok(())
@@ -197,11 +197,8 @@ impl Driver {
197197
}
198198

199199
fn poll_entries(&mut self) -> bool {
200-
let mut has_entry = self.poll_blocking();
201-
202-
let mut cqueue = self.inner.completion();
203-
cqueue.sync();
204-
has_entry |= !cqueue.is_empty();
200+
let cqueue = self.inner.completion();
201+
let has_entry = !cqueue.is_empty();
205202
for entry in cqueue {
206203
match entry.user_data() {
207204
Self::CANCEL => {}
@@ -210,7 +207,9 @@ impl Driver {
210207
if !more(flags) {
211208
self.need_push_notifier = true;
212209
}
213-
self.notifier.clear().expect("cannot clear notifier");
210+
if let Err(_e) = self.notifier.clear() {
211+
error!("failed to clear notifier: {_e}");
212+
}
214213
}
215214
key => {
216215
let flags = entry.flags();
@@ -282,8 +281,7 @@ impl Driver {
282281
}
283282
Err(_) => {
284283
drop(squeue);
285-
self.poll_entries();
286-
match self.submit_auto(Some(Duration::ZERO)) {
284+
match self.submit_auto(Some(Duration::ZERO), true) {
287285
Ok(()) => {}
288286
Err(e)
289287
if matches!(
@@ -292,6 +290,12 @@ impl Driver {
292290
) => {}
293291
Err(e) => return Err(e),
294292
}
293+
// If the CQEs are consumed here, we should make the driver aware of it. We
294+
// should not mask `awake` here, otherwise the driver may wait for the next
295+
// event indefinitely.
296+
//
297+
// Anyway it is not a hot path, so we can afford an extra `write` syscall here.
298+
self.poll_entries();
295299
}
296300
}
297301
}
@@ -349,14 +353,19 @@ impl Driver {
349353
closure = e.0;
350354
std::thread::yield_now();
351355
}
352-
self.poll_blocking();
353356
}
354357

355358
pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
356359
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
357-
// Anyway we need to submit once, no matter if there are entries in squeue.
360+
361+
if self.poll_blocking() {
362+
return Ok(());
363+
}
364+
358365
trace!("start polling");
359366

367+
let need_wait = !self.notifier.reset();
368+
360369
if self.need_push_notifier {
361370
#[allow(clippy::useless_conversion)]
362371
self.push_raw(
@@ -369,10 +378,10 @@ impl Driver {
369378
self.need_push_notifier = false;
370379
}
371380

372-
if !self.poll_entries() {
373-
self.submit_auto(timeout)?;
374-
self.poll_entries();
375-
}
381+
self.submit_auto(timeout, need_wait)?;
382+
383+
self.notifier.set_awake();
384+
self.poll_entries();
376385

377386
Ok(())
378387
}

compio-driver/src/sys/driver/iour/notify.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use rustix::event::{EventfdFlags, eventfd};
22

33
use super::*;
4+
use crate::sys::driver::AwakeFlag;
45

56
#[derive(Debug)]
67
pub(super) struct Notifier {
@@ -29,6 +30,14 @@ impl Notifier {
2930
Ok(())
3031
}
3132

33+
pub fn set_awake(&self) {
34+
self.notify.set_awake();
35+
}
36+
37+
pub fn reset(&self) -> bool {
38+
self.notify.reset()
39+
}
40+
3241
pub fn waker(&self) -> Waker {
3342
Waker::from(self.notify.clone())
3443
}
@@ -50,18 +59,23 @@ impl AsRawFd for Notifier {
5059
#[derive(Debug)]
5160
pub(super) struct Notify {
5261
fd: OwnedFd,
62+
awake: AwakeFlag,
5363
}
5464

5565
impl Notify {
5666
pub fn new(fd: OwnedFd) -> Self {
57-
Self { fd }
67+
Self {
68+
fd,
69+
awake: AwakeFlag::new(),
70+
}
5871
}
5972

60-
/// Notify the inner driver.
61-
pub fn notify(&self) -> io::Result<()> {
62-
rustix::io::write(&self.fd, &u64::to_be_bytes(1))?;
73+
pub fn set_awake(&self) {
74+
self.awake.set();
75+
}
6376

64-
Ok(())
77+
pub fn reset(&self) -> bool {
78+
self.awake.reset()
6579
}
6680
}
6781

@@ -71,6 +85,8 @@ impl Wake for Notify {
7185
}
7286

7387
fn wake_by_ref(self: &Arc<Self>) {
74-
self.notify().ok();
88+
if !self.awake.wake() {
89+
rustix::io::write(&self.fd, &u64::to_be_bytes(1)).ok();
90+
}
7591
}
7692
}

compio-driver/src/sys/driver/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::atomic::{AtomicU8, Ordering};
2+
13
cfg_if::cfg_if! {
24
if #[cfg(windows)] {
35
mod iocp;
@@ -21,3 +23,34 @@ cfg_if::cfg_if! {
2123

2224
crate::assert_not_impl!(Driver, Send);
2325
crate::assert_not_impl!(Driver, Sync);
26+
27+
const IDLE: u8 = 0b00;
28+
const NOTIFIED: u8 = 0b01;
29+
const AWAKE: u8 = 0b10;
30+
31+
#[derive(Debug)]
32+
struct AwakeFlag(AtomicU8);
33+
34+
impl AwakeFlag {
35+
pub fn new() -> Self {
36+
Self(AtomicU8::new(IDLE))
37+
}
38+
39+
/// Set the awake flag. It is true before the driver sleeps, and false after
40+
/// it wakes up.
41+
pub fn set(&self) {
42+
self.0.fetch_or(AWAKE, Ordering::AcqRel);
43+
}
44+
45+
/// Reset the flags. Returns true if it was notified.
46+
pub fn reset(&self) -> bool {
47+
(self.0.swap(IDLE, Ordering::AcqRel) & NOTIFIED) != 0
48+
}
49+
50+
/// Set the notified flag. Returns true if the awake flag is set or the
51+
/// notified flag is set. If the awake flag is not set, the driver needs
52+
/// to be notified through a syscall.
53+
pub fn wake(&self) -> bool {
54+
self.0.fetch_or(NOTIFIED, Ordering::AcqRel) != 0
55+
}
56+
}

compio-driver/src/sys/driver/poll/mod.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
AsyncifyPool, Entry,
1616
key::BorrowedKey,
1717
panic::catch_unwind_io,
18-
sys::{extra::PollExtra, prelude::*},
18+
sys::{driver::AwakeFlag, extra::PollExtra, prelude::*},
1919
};
2020

2121
#[derive(Debug, Default)]
@@ -436,15 +436,28 @@ impl Driver {
436436
self.renew(fd, renew_event)
437437
}
438438

439-
pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
439+
pub fn poll(&mut self, mut timeout: Option<Duration>) -> io::Result<()> {
440440
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
441-
if self.poll_completed() {
442-
return Ok(());
441+
let timeout_is_some = timeout.is_some();
442+
let has_completed = !self.completed_rx.is_empty();
443+
let need_wait = !self.notify.reset();
444+
if !need_wait || has_completed {
445+
timeout = Some(Duration::ZERO);
443446
}
447+
// We need to poll the poller first to make sure it handles the internal notify
448+
// event (if any).
444449
self.events.clear();
445450
self.notify.poll.wait(&mut self.events, timeout)?;
446-
if self.events.is_empty() && timeout.is_some() {
447-
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
451+
self.notify.set_awake();
452+
if self.events.is_empty() {
453+
if self.poll_completed() {
454+
return Ok(());
455+
}
456+
if timeout_is_some {
457+
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
458+
}
459+
} else if has_completed {
460+
self.poll_completed();
448461
}
449462
self.with_events(|this, events| {
450463
for event in events.iter() {
@@ -527,16 +540,23 @@ impl Entry {
527540
/// A notify handle to the inner driver.
528541
pub(crate) struct Notify {
529542
poll: Poller,
543+
awake: AwakeFlag,
530544
}
531545

532546
impl Notify {
533547
fn new(poll: Poller) -> Self {
534-
Self { poll }
548+
Self {
549+
poll,
550+
awake: AwakeFlag::new(),
551+
}
552+
}
553+
554+
fn set_awake(&self) {
555+
self.awake.set();
535556
}
536557

537-
/// Notify the inner driver.
538-
pub fn notify(&self) -> io::Result<()> {
539-
self.poll.notify()
558+
fn reset(&self) -> bool {
559+
self.awake.reset()
540560
}
541561
}
542562

@@ -546,6 +566,8 @@ impl Wake for Notify {
546566
}
547567

548568
fn wake_by_ref(self: &Arc<Self>) {
549-
self.notify().ok();
569+
if !self.awake.wake() {
570+
self.poll.notify().ok();
571+
}
550572
}
551573
}

compio-executor/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
3131
nix = { workspace = true, features = ["resource", "signal"] }
3232

3333
[features]
34-
notify-always = []
3534
enable_log = ["compio-log/enable_log"]
3635

3736
[lints.rust]

compio-executor/src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,10 @@ pub struct ExecutorConfig {
106106
/// The maximum number of hot tasks to run in each tick.
107107
pub max_interval: u32,
108108

109-
/// A waker to be waken when a task is scheduled from other thread.
109+
/// A waker to be woken when a task is scheduled.
110110
///
111111
/// This is useful for waking up drivers that switch to kernel state when
112112
/// idle.
113-
///
114-
/// Enable `notify-always` feature to wake this waker on every schedule,
115-
/// even if the executor is already awake.
116113
pub waker: Option<Waker>,
117114
}
118115

compio-executor/src/task/local.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ impl<'a> Local<'a> {
6060

6161
queue.make_hot(self.header().id);
6262

63-
if cfg!(feature = "notify-always")
64-
&& let Some(ref waker) = shared.waker
65-
{
63+
if let Some(ref waker) = shared.waker {
6664
waker.wake_by_ref()
6765
}
6866
}

0 commit comments

Comments
 (0)