From a1b0ae1a34a16df46062d04d9ccd052ceda297be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Mon, 4 May 2026 14:05:01 +0100 Subject: [PATCH 1/7] [runtime/iouring/waker] add loom model --- runtime/src/iouring/waker.rs | 1136 +++++++++++++++++++++++++++++++++- 1 file changed, 1121 insertions(+), 15 deletions(-) diff --git a/runtime/src/iouring/waker.rs b/runtime/src/iouring/waker.rs index 0b516ba1bd2..b5b4995dd96 100644 --- a/runtime/src/iouring/waker.rs +++ b/runtime/src/iouring/waker.rs @@ -19,9 +19,38 @@ //! This keeps the arm-and-recheck handshake lock-free, enables futex sleep when //! the loop is truly idle, and avoids repeated wake writes while a wake is //! already pending. +//! +//! ## Loom Model +//! +//! The `loom` feature keeps the same packed state machine, but replaces the +//! kernel wake surfaces with loom-visible userspace models. The futex path uses +//! a mutex and condition variable to preserve the atomic compare-and-park +//! property of `FUTEX_WAIT`. The eventfd path uses a durable readiness counter +//! plus a condition variable to model both persistent wake readiness and a +//! blocked `submit_and_wait` returning after a wake CQE. +//! +//! The loom tests exercise the producer/loop protocol around that state word: +//! publishes must advance the submitted sequence exactly once, armed waits must +//! not lose concurrent publishes or out-of-band wakes, repeated wake attempts +//! within one epoch must coalesce, sticky unarmed wakes must be consumed by the +//! next arm cycle, and the `Release`/`Acquire` edges must make producer-side +//! state visible after the loop observes progress or clears a wait epoch. +//! +//! The model intentionally stops at this userspace protocol boundary. It does +//! not validate kernel CQE ordering, `io_uring_enter`, wake-poll rearming, or +//! syscall error handling; those are covered by the normal real-syscall tests. use super::UserData; -use io_uring::{opcode::PollAdd, squeue::SubmissionQueue, types::Fd}; +use io_uring::squeue::SubmissionQueue; +#[cfg(not(feature = "loom"))] +use io_uring::{opcode::PollAdd, types::Fd}; +#[cfg(feature = "loom")] +use loom::sync::{ + atomic::{AtomicU32, AtomicU64, Ordering}, + Arc, Condvar, Mutex, +}; +use std::time::{Duration, Instant}; +#[cfg(not(feature = "loom"))] use std::{ mem::size_of, os::fd::{AsRawFd, FromRawFd, OwnedFd}, @@ -29,8 +58,8 @@ use std::{ atomic::{AtomicU32, Ordering}, Arc, }, - time::{Duration, Instant}, }; +#[cfg(not(feature = "loom"))] use tracing::warn; /// Reserved `user_data` value for internal wake poll completions. @@ -113,11 +142,38 @@ impl Drop for ArmGuard<'_> { /// /// This makes submissions racing with the sleep transition observable either by /// sequence mismatch in the loop or by a futex/eventfd wakeup. +#[cfg(not(feature = "loom"))] struct WakerInner { + /// Non-blocking eventfd monitored by the loop's multishot wake poll. wake_fd: OwnedFd, + /// Packed wait-target, wake-latch, and submitted-sequence state. state: AtomicU32, } +/// Loom-only model of the waker state. +/// +/// Loom cannot observe real futexes, eventfds, or io_uring CQEs, so this +/// variant keeps the same packed atomic state as the production waker and +/// replaces the kernel wake surfaces with userspace condvar models. The goal is +/// to model the producer/loop atomic protocol closely enough for loom to +/// explore memory orderings and wake races. It is not a model of kernel CQE +/// ordering, `io_uring_enter`, or wake-poll rearm behavior. +#[cfg(feature = "loom")] +struct WakerInner { + /// Packed wait-target, wake-latch, and submitted-sequence state. + state: AtomicU32, + /// Mutex standing in for the kernel futex bucket lock. + futex_bucket: Mutex<()>, + /// Condvar standing in for the fully-idle futex wait queue. + futex_waiters: Condvar, + /// Durable eventfd readiness counter observed by the modeled eventfd wait. + eventfd_counter: AtomicU64, + /// Mutex pairing eventfd readiness checks with condvar parking. + eventfd_readiness: Mutex<()>, + /// Condvar standing in for `submit_and_wait` waking on a wake CQE. + eventfd_waiters: Condvar, +} + /// Internal hybrid futex/eventfd wake source for the io_uring loop. /// /// - Publish submissions from producers via [`Waker::publish`] @@ -143,6 +199,7 @@ pub struct Waker { impl Waker { /// Create a hybrid futex/eventfd wake source backed by a non-blocking /// `eventfd`. + #[cfg(not(feature = "loom"))] pub fn new() -> Result { // SAFETY: `eventfd` is called with valid flags and no aliasing pointers. let fd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) }; @@ -160,6 +217,25 @@ impl Waker { }) } + /// Create the loom model of the hybrid wake source. + /// + /// This keeps the same packed atomic state as production, but replaces the + /// eventfd and futex kernel objects with loom-visible counters and + /// condition variables. + #[cfg(feature = "loom")] + pub fn new() -> Result { + Ok(Self { + inner: Arc::new(WakerInner { + state: AtomicU32::new(0), + futex_bucket: Mutex::new(()), + futex_waiters: Condvar::new(), + eventfd_counter: AtomicU64::new(0), + eventfd_readiness: Mutex::new(()), + eventfd_waiters: Condvar::new(), + }), + }) + } + /// Latch one pending wake and, if a target is currently armed, wake it. /// /// The first caller to set `WAKE_SIGNALLED_BIT` in an epoch performs the @@ -328,6 +404,7 @@ impl Waker { /// /// Retries on `EINTR`. Treats `EAGAIN` as "nothing to drain". Without /// `EFD_SEMAPHORE`, one successful read drains the full counter to zero. + #[cfg(not(feature = "loom"))] pub fn acknowledge(&self) { let mut value: u64 = 0; loop { @@ -363,6 +440,16 @@ impl Waker { } } + /// Model an eventfd read that drains all pending readiness. + /// + /// Production eventfd reads without `EFD_SEMAPHORE` return the current + /// counter and reset it to zero atomically. The loom model uses one atomic + /// swap to preserve that contract for wake-coalescing tests. + #[cfg(feature = "loom")] + pub fn acknowledge(&self) { + self.inner.eventfd_counter.swap(0, Ordering::AcqRel); + } + /// Install the internal `eventfd` multishot poll request into the SQ. /// /// This uses multishot poll and is called on startup and whenever a wake @@ -370,6 +457,7 @@ impl Waker { /// /// Returns `false` if the local SQ is already full and the rearm must be /// retried in a later staging pass. + #[cfg(not(feature = "loom"))] pub fn reinstall(&self, submission_queue: &mut SubmissionQueue<'_>) -> bool { if submission_queue.is_full() { return false; @@ -390,6 +478,16 @@ impl Waker { true } + /// Model wake-poll reinstall as a successful no-op. + /// + /// The loom tests in this module do not model the `io_uring` submission + /// queue or wake-poll rearm state. Keeping this method present lets the + /// crate compile with `iouring,loom` while keeping that boundary explicit. + #[cfg(feature = "loom")] + pub const fn reinstall(&self, _submission_queue: &mut SubmissionQueue<'_>) -> bool { + true + } + /// Clear the current wait epoch after we resume running. /// /// Keeping wait bits clear while actively running avoids redundant futex @@ -410,6 +508,7 @@ impl Waker { /// This writes to the internal `eventfd` monitored by the ring's multishot /// poll request. The resulting wake CQE causes the loop to leave its /// eventfd-backed blocking section and resume in userspace. + #[cfg(not(feature = "loom"))] fn eventfd_wake(&self) { let value: u64 = 1; loop { @@ -444,10 +543,23 @@ impl Waker { } } + /// Model an eventfd write plus wake-CQE delivery. + /// + /// Incrementing `eventfd_counter` preserves the durable readiness bit of a + /// real eventfd, while notifying `eventfd_waiters` stands in for + /// `submit_and_wait` returning after the wake CQE becomes available. + #[cfg(feature = "loom")] + fn eventfd_wake(&self) { + self.inner.eventfd_counter.fetch_add(1, Ordering::Release); + let _guard = self.inner.eventfd_readiness.lock().unwrap(); + self.inner.eventfd_waiters.notify_one(); + } + /// Wake one thread sleeping on the fully-idle futex path. /// /// This is used only when the loop has no active ring waiters and is /// blocked in [`Waker::futex_wait`] on the packed wake-state word. + #[cfg(not(feature = "loom"))] fn futex_wake(&self) { loop { // SAFETY: `state` is a valid aligned futex word for the duration of @@ -484,6 +596,16 @@ impl Waker { } } + /// Model `FUTEX_WAKE` for the fully-idle path. + /// + /// Taking `futex_bucket` before notifying mirrors the serialization the + /// kernel futex bucket provides between compare-and-park and wake. + #[cfg(feature = "loom")] + fn futex_wake(&self) { + let _guard = self.inner.futex_bucket.lock().unwrap(); + self.inner.futex_waiters.notify_one(); + } + /// Sleep on the packed wake-state word for the fully-idle path. /// /// The caller must pass the exact post-arm snapshot from the same atomic @@ -497,6 +619,7 @@ impl Waker { /// Returns `true` only if the kernel actually blocked the thread and later /// resumed it. Returns `false` for stale-snapshot races, userspace /// equality mismatches, and unexpected futex wait failures. + #[cfg(not(feature = "loom"))] fn futex_wait(&self, snapshot: u32) -> bool { loop { // This is only a same-word equality check before entering the @@ -535,16 +658,32 @@ impl Waker { } } } + + /// Model `FUTEX_WAIT` for the fully-idle path. + /// + /// The condition variable wait keeps the compare and park under + /// `futex_bucket`, so loom can explore the same lost-wake boundary that the + /// kernel's atomic futex wait protects in production. + #[cfg(feature = "loom")] + fn futex_wait(&self, snapshot: u32) -> bool { + let mut guard = self.inner.futex_bucket.lock().unwrap(); + let mut slept = false; + while self.inner.state.load(Ordering::Acquire) == snapshot { + slept = true; + guard = self.inner.futex_waiters.wait(guard).unwrap(); + } + slept + } } #[cfg(test)] pub mod tests { use super::*; use io_uring::IoUring; + #[cfg(not(feature = "loom"))] use std::{ mem::size_of, os::fd::{AsRawFd, FromRawFd}, - sync::Arc, }; pub fn wait_until_futex_armed(waker: &Waker) { @@ -568,18 +707,26 @@ pub mod tests { } fn read_eventfd_count(waker: &Waker) -> u64 { - let mut value = 0u64; - // SAFETY: `wake_fd` is a valid eventfd descriptor and `value` points - // to writable 8-byte storage for the duration of the call. - let ret = unsafe { - libc::read( - waker.inner.wake_fd.as_raw_fd(), - &mut value as *mut u64 as *mut libc::c_void, - size_of::(), - ) - }; - assert_eq!(ret, size_of::() as isize); - value + #[cfg(not(feature = "loom"))] + { + let mut value = 0u64; + // SAFETY: `wake_fd` is a valid eventfd descriptor and `value` points + // to writable 8-byte storage for the duration of the call. + let ret = unsafe { + libc::read( + waker.inner.wake_fd.as_raw_fd(), + &mut value as *mut u64 as *mut libc::c_void, + size_of::(), + ) + }; + assert_eq!(ret, size_of::() as isize); + value + } + + #[cfg(feature = "loom")] + { + waker.inner.eventfd_counter.load(Ordering::Relaxed) + } } #[test] @@ -898,6 +1045,7 @@ pub mod tests { assert_eq!(sq.len(), before); } + #[cfg(not(feature = "loom"))] #[test] fn test_eventfd_wake_and_acknowledge_error_branches() { // Verify the explicit EAGAIN and generic error branches leave the @@ -947,3 +1095,961 @@ pub mod tests { assert_eq!(submitted_seq(&waker), before); } } + +#[cfg(all(test, feature = "loom"))] +mod loom_tests { + use super::*; + use loom::{ + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + thread, + }; + + // This module uses loom to model the waker's producer/loop protocol over + // the packed atomic state word. The model keeps the production sequence and + // wait-bit state machine, but replaces futex and eventfd kernel surfaces + // with loom-visible condvars and counters. The tests keep schedules small + // while exercising the important races: publish versus arm-and-recheck, + // futex idle parking, eventfd wake coalescing, sticky out-of-band wakes, + // sequence wraparound, and the Release/Acquire edges that make producer + // state visible after `pending()` or `clear_wait()`. + + fn waker() -> Waker { + Waker::new().unwrap() + } + + // Return only the low wake-state bits from the modeled packed state. + fn state_bits(waker: &Waker) -> u32 { + waker.inner.state.load(Ordering::Relaxed) & STATE_MASK + } + + // Return the submitted sequence from the modeled packed state. + fn submitted_seq(waker: &Waker) -> u32 { + (waker.inner.state.load(Ordering::Relaxed) >> STATE_BITS) & SUBMISSION_SEQ_MASK + } + + // Return the modeled eventfd readiness counter. + fn eventfd_count(waker: &Waker) -> u64 { + waker.inner.eventfd_counter.load(Ordering::Relaxed) + } + + fn advance_seq(seq: u32) -> u32 { + seq.wrapping_add(1) & SUBMISSION_SEQ_MASK + } + + // Model the eventfd-backed blocking section used by `submit_and_wait`. + fn wait_for_eventfd_signal(waker: &Waker) { + let mut guard = waker.inner.eventfd_readiness.lock().unwrap(); + while waker.inner.eventfd_counter.load(Ordering::Acquire) == 0 { + guard = waker.inner.eventfd_waiters.wait(guard).unwrap(); + } + } + + // Clear a wake bit that may be left behind by a raced publisher. This is a + // cleanup helper: tests that care about wake coalescing should assert the + // modeled eventfd counter before calling it. + fn clear_sticky_wake(waker: &Waker) { + assert_eq!(state_bits(waker) & WAITING_MASK, 0); + if (state_bits(waker) & WAKE_SIGNALLED_BIT) != 0 { + let guard = waker.arm(submitted_seq(waker)); + assert!(guard.wake_latched()); + drop(guard); + } + // A raced publisher can queue eventfd readiness after the loop has + // already observed its sequence bump. Finish that modeled wake CQE so + // cleanup assertions do not confuse it with the unarmed-wake cases. + waker.acknowledge(); + assert_eq!(state_bits(waker), 0); + assert_eq!(eventfd_count(waker), 0); + } + + // Drain with the same eventfd arm-and-recheck shape used before + // `submit_and_wait`: poll `pending()`, arm, block only if the post-arm + // snapshot is still idle, then clear the arm and acknowledge readiness. + // + // The final `acknowledge()` is model cleanup for any wake CQE readiness + // produced during the brief arm window. It may be a no-op when the loop did + // not actually block, so tests that validate exact wake counts assert the + // counter directly instead of relying on this helper. + fn drain_with_eventfd_until(waker: &Waker, mut processed: u32, target: u32) -> u32 { + while processed != target { + if waker.pending(processed) { + processed = advance_seq(processed); + continue; + } + + let guard = waker.arm(processed); + if guard.still_idle() { + wait_for_eventfd_signal(waker); + assert!( + eventfd_count(waker) > 0, + "blocking eventfd wait must observe queued readiness before cleanup", + ); + } + drop(guard); + waker.acknowledge(); + } + processed + } + + // Drain with the fully-idle futex path instead of the eventfd path. + fn drain_with_futex_until(waker: &Waker, mut processed: u32, target: u32) -> u32 { + while processed != target { + if waker.pending(processed) { + processed = advance_seq(processed); + continue; + } + let _ = waker.park_idle(processed); + } + processed + } + + #[test] + fn publish_pending_pairing() { + // `publish` must make the producer's earlier enqueue-side write visible + // to a loop that observes the published sequence through `pending()`. + // The loop deliberately spins on `pending()` before joining the producer + // so the only intended synchronization is publish Release to pending + // Acquire. + loom::model(|| { + let waker = waker(); + let aux = Arc::new(AtomicU32::new(0)); + + let producer = thread::spawn({ + let waker = waker.clone(); + let aux = aux.clone(); + move || { + aux.store(42, Ordering::Relaxed); + waker.publish(); + } + }); + + while !waker.pending(0) { + thread::yield_now(); + } + + assert_eq!(aux.load(Ordering::Relaxed), 42); + producer.join().unwrap(); + assert_eq!(submitted_seq(&waker), 1); + }); + } + + #[test] + fn wake_clear_wait_pairing() { + // `wake` is used by out-of-band callers such as final-handle drop. It + // must publish the caller's earlier state change to the loop even though + // it does not advance the submitted sequence. + // + // The loop waits for the wake bit before joining the notifier, arms + // against the current sequence, and drops the guard so `clear_wait()`'s + // Acquire can pair with `wake()`'s Release. + loom::model(|| { + let waker = waker(); + let aux = Arc::new(AtomicU32::new(0)); + + let notifier = thread::spawn({ + let waker = waker.clone(); + let aux = aux.clone(); + move || { + aux.store(42, Ordering::Relaxed); + waker.wake(); + } + }); + + while waker.inner.state.load(Ordering::Relaxed) & WAKE_SIGNALLED_BIT == 0 { + thread::yield_now(); + } + + assert_eq!(eventfd_count(&waker), 0); + let guard = waker.arm(0); + assert!(guard.wake_latched()); + drop(guard); + + assert_eq!(aux.load(Ordering::Relaxed), 42); + assert_eq!(eventfd_count(&waker), 0); + notifier.join().unwrap(); + }); + } + + #[test] + fn unarmed_sticky_wakes_coalesce_and_rearm_across_epochs() { + // Out-of-band wakes that arrive before the loop arms must stick around + // and force the next arm cycle to skip blocking. Repeating the pattern + // across epochs verifies guard drop clears the latch enough for later + // unarmed wakes to be observed independently. + loom::model(|| { + let waker = waker(); + + // One unarmed wake latches the bit without queuing eventfd readiness. + waker.wake(); + assert_eq!(eventfd_count(&waker), 0); + let guard = waker.arm(0); + assert!(!guard.still_idle()); + assert!(guard.wake_latched()); + drop(guard); + + assert_eq!(submitted_seq(&waker), 0); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + + // Concurrent unarmed wakes coalesce to the same single sticky bit. + let a = thread::spawn({ + let waker = waker.clone(); + move || waker.wake() + }); + let b = thread::spawn({ + let waker = waker.clone(); + move || waker.wake() + }); + + a.join().unwrap(); + b.join().unwrap(); + + assert_eq!(eventfd_count(&waker), 0); + let guard = waker.arm(0); + assert!(!guard.still_idle()); + assert!(guard.wake_latched()); + drop(guard); + + assert_eq!(submitted_seq(&waker), 0); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + + // A publish followed by an explicit unarmed wake advances the + // sequence once and leaves the wake sticky for the caught-up epoch. + waker.publish(); + waker.wake(); + assert_eq!(submitted_seq(&waker), 1); + assert_eq!(eventfd_count(&waker), 0); + let guard = waker.arm(1); + assert!(!guard.still_idle()); + assert!(guard.wake_latched()); + drop(guard); + + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn arm_distinguishes_sequence_progress_from_wake() { + // A published-ahead sequence must stop arming from looking idle, but it + // is not the same as a latched wake. This keeps `wake_latched()` useful + // for out-of-band wake decisions such as shutdown. + loom::model(|| { + let waker = waker(); + + waker.publish(); + let guard = waker.arm(0); + assert!(!guard.still_idle()); + assert!(!guard.wake_latched()); + drop(guard); + + assert_eq!(submitted_seq(&waker), 1); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn pending_modular_boundaries() { + // `pending()` uses half-range modular arithmetic to distinguish + // published-ahead sequences from equal sequences and from sequences the + // loop has already processed. The exact half-domain boundary is + // intentionally not considered pending because direction is ambiguous. + loom::model(|| { + let waker = waker(); + + let set_submitted = |seq| { + waker + .inner + .state + .store(seq << STATE_BITS, Ordering::Relaxed); + }; + + set_submitted(0); + assert!(!waker.pending(0)); + + set_submitted(1); + assert!(waker.pending(0)); + + set_submitted(HALF_SUBMISSION_SEQUENCE_DOMAIN - 1); + assert!(waker.pending(0)); + + set_submitted(HALF_SUBMISSION_SEQUENCE_DOMAIN); + assert!(!waker.pending(0)); + + set_submitted(HALF_SUBMISSION_SEQUENCE_DOMAIN + 1); + assert!(!waker.pending(0)); + + set_submitted(0); + assert!(!waker.pending(1)); + + set_submitted(SUBMISSION_SEQ_MASK); + assert!(!waker.pending(0)); + + set_submitted(0); + assert!(waker.pending(SUBMISSION_SEQ_MASK)); + }); + } + + #[test] + fn park_idle_skips_pre_latched_wake() { + // If a wake is already latched before the fully-idle path arms, the + // post-arm recheck must skip sleeping and clear the sticky wake. + loom::model(|| { + let waker = waker(); + + waker.wake(); + assert!(waker.park_idle(0).is_none()); + + assert_eq!(submitted_seq(&waker), 0); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn park_idle_skips_pre_published_sequence() { + // If a sequence is already published before the fully-idle path arms, + // the post-arm recheck must skip sleeping without manufacturing a wake. + loom::model(|| { + let waker = waker(); + + waker.publish(); + assert!(waker.park_idle(0).is_none()); + + assert_eq!(submitted_seq(&waker), 1); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn arm_and_recheck_eventfd_race() { + // A publish racing with the eventfd-backed arm path must be visible + // either in the post-arm sequence snapshot or through a modeled eventfd + // wake. After the blocking section exits, guard drop clears wait state + // and `acknowledge` drains the eventfd counter. + loom::model(|| { + let waker = waker(); + let producer = thread::spawn({ + let waker = waker.clone(); + move || waker.publish() + }); + + let guard = waker.arm(0); + if guard.still_idle() { + wait_for_eventfd_signal(&waker); + } + + drop(guard); + waker.acknowledge(); + producer.join().unwrap(); + + assert_eq!(submitted_seq(&waker), 1); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn publish_clear_wait_pairing_when_armed() { + // When a producer publishes into an armed eventfd epoch, the loop can + // resume without first observing `pending()`. `clear_wait()` must still + // acquire the producer's enqueue-side writes before the loop checks the + // queue after waking. + loom::model(|| { + let waker = waker(); + let aux = Arc::new(AtomicU32::new(0)); + let guard = waker.arm(0); + assert!(guard.still_idle()); + + let producer = thread::spawn({ + let waker = waker.clone(); + let aux = aux.clone(); + move || { + aux.store(42, Ordering::Relaxed); + waker.publish(); + } + }); + + while waker.inner.state.load(Ordering::Relaxed) & WAKE_SIGNALLED_BIT == 0 { + thread::yield_now(); + } + + drop(guard); + assert_eq!(aux.load(Ordering::Relaxed), 42); + producer.join().unwrap(); + + assert_eq!(submitted_seq(&waker), 1); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 1); + waker.acknowledge(); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn wake_clear_wait_pairing_when_armed() { + // When an out-of-band wake lands in an armed eventfd epoch, the loop + // resumes without any sequence progress. `clear_wait()` must still + // acquire the notifier's earlier state change before the loop checks + // for disconnect or shutdown state after waking. + loom::model(|| { + let waker = waker(); + let aux = Arc::new(AtomicU32::new(0)); + let guard = waker.arm(0); + assert!(guard.still_idle()); + + let notifier = thread::spawn({ + let waker = waker.clone(); + let aux = aux.clone(); + move || { + aux.store(42, Ordering::Relaxed); + waker.wake(); + } + }); + + while waker.inner.state.load(Ordering::Relaxed) & WAKE_SIGNALLED_BIT == 0 { + thread::yield_now(); + } + + drop(guard); + assert_eq!(aux.load(Ordering::Relaxed), 42); + notifier.join().unwrap(); + + assert_eq!(submitted_seq(&waker), 0); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 1); + waker.acknowledge(); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn arm_and_recheck_futex_race() { + // The fully-idle path arms a futex wait target on the same state word + // that producers update. A racing publish must either change the + // post-arm snapshot before sleep or wake the modeled futex waiter. + loom::model(|| { + let waker = waker(); + let producer = thread::spawn({ + let waker = waker.clone(); + move || waker.publish() + }); + + let _ = waker.park_idle(0); + producer.join().unwrap(); + + assert_eq!(submitted_seq(&waker), 1); + clear_sticky_wake(&waker); + }); + } + + #[test] + fn publishers_dedup_eventfd_wake() { + // Two publishers in one armed eventfd epoch must both advance the + // submitted sequence, but only the first wake claimant should increment + // the modeled eventfd counter. + loom::model(|| { + let waker = waker(); + let guard = waker.arm(0); + assert!(guard.still_idle()); + + let a = thread::spawn({ + let waker = waker.clone(); + move || waker.publish() + }); + let b = thread::spawn({ + let waker = waker.clone(); + move || waker.publish() + }); + + a.join().unwrap(); + b.join().unwrap(); + + assert_eq!(submitted_seq(&waker), 2); + assert_eq!(eventfd_count(&waker), 1); + + drop(guard); + waker.acknowledge(); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn mixed_publish_and_wake_dedup() { + // A publish and an out-of-band wake racing in the same armed eventfd + // epoch should coalesce to one eventfd signal while still preserving + // the publish's sequence increment. + loom::model(|| { + let waker = waker(); + let guard = waker.arm(0); + assert!(guard.still_idle()); + + let publisher = thread::spawn({ + let waker = waker.clone(); + move || waker.publish() + }); + let notifier = thread::spawn({ + let waker = waker.clone(); + move || waker.wake() + }); + + publisher.join().unwrap(); + notifier.join().unwrap(); + + assert_eq!(submitted_seq(&waker), 1); + assert_eq!(eventfd_count(&waker), 1); + + drop(guard); + waker.acknowledge(); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn mixed_publish_and_wake_futex_arm() { + // A publish and an out-of-band wake racing in the same futex-armed + // epoch should coalesce through the shared wake latch while preserving + // the publish's sequence increment. Unlike the eventfd path, there is + // no durable counter to inspect, so this splits `park_idle()` at the + // arm point and verifies the stale futex snapshot is rejected after the + // state changes. + loom::model(|| { + let waker = waker(); + let prev = waker + .inner + .state + .fetch_or(WAITING_ON_FUTEX_BIT, Ordering::Relaxed); + assert_eq!(prev & WAITING_MASK, 0); + let snapshot = prev | WAITING_ON_FUTEX_BIT; + + let publisher = thread::spawn({ + let waker = waker.clone(); + move || waker.publish() + }); + let notifier = thread::spawn({ + let waker = waker.clone(); + move || waker.wake() + }); + + publisher.join().unwrap(); + notifier.join().unwrap(); + + assert_eq!(submitted_seq(&waker), 1); + assert_eq!(eventfd_count(&waker), 0); + assert_eq!( + state_bits(&waker), + WAITING_ON_FUTEX_BIT | WAKE_SIGNALLED_BIT + ); + assert!(!waker.futex_wait(snapshot)); + waker.clear_wait(); + assert_eq!(state_bits(&waker), 0); + }); + } + + #[test] + fn drop_wake() { + // An out-of-band wake racing with the eventfd arm path must wake the + // loop without advancing the submitted sequence. If it arrives before + // arming, `wake_latched` skips the wait; otherwise the modeled eventfd + // signal releases the loop. + loom::model(|| { + let waker = waker(); + let notifier = thread::spawn({ + let waker = waker.clone(); + move || waker.wake() + }); + + let guard = waker.arm(0); + if guard.still_idle() { + wait_for_eventfd_signal(&waker); + } + + drop(guard); + waker.acknowledge(); + notifier.join().unwrap(); + + assert_eq!(submitted_seq(&waker), 0); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn sequence_wraparound() { + // Preload the sequence to the last representable value, then publish + // twice so the visible sequence wraps through zero to one. The + // half-range modular `pending()` check must remain directional across + // that boundary. + loom::model(|| { + let waker = waker(); + waker + .inner + .state + .store(SUBMISSION_SEQ_MASK << STATE_BITS, Ordering::Relaxed); + + let producer = thread::spawn({ + let waker = waker.clone(); + move || { + waker.publish(); + waker.publish(); + } + }); + + assert_eq!(drain_with_eventfd_until(&waker, SUBMISSION_SEQ_MASK, 1), 1); + producer.join().unwrap(); + assert_eq!(submitted_seq(&waker), 1); + clear_sticky_wake(&waker); + }); + } + + #[test] + fn two_producers_mixed_ops() { + // Producer-only mixed publish/wake programs should preserve submitted + // sequence conservation and must not queue eventfd readiness while the + // loop is unarmed. A sticky wake bit may remain for the next arm cycle. + loom::model(|| { + let waker = waker(); + let publishes = Arc::new(AtomicU32::new(0)); + + let a = thread::spawn({ + let waker = waker.clone(); + let publishes = publishes.clone(); + move || { + waker.publish(); + publishes.fetch_add(1, Ordering::Relaxed); + waker.wake(); + waker.publish(); + publishes.fetch_add(1, Ordering::Relaxed); + } + }); + + let b = thread::spawn({ + let waker = waker.clone(); + let publishes = publishes.clone(); + move || { + waker.wake(); + waker.publish(); + publishes.fetch_add(1, Ordering::Relaxed); + } + }); + + a.join().unwrap(); + b.join().unwrap(); + + assert_eq!(submitted_seq(&waker), publishes.load(Ordering::Relaxed)); + assert_eq!(state_bits(&waker) & WAITING_MASK, 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn producer_with_draining_loop() { + // A minimal loop simulator must drain both publishes from one producer + // using the eventfd arm-and-recheck path whenever no sequence progress + // is currently visible. + loom::model(|| { + let waker = waker(); + let producer = thread::spawn({ + let waker = waker.clone(); + move || { + waker.publish(); + waker.publish(); + } + }); + + let processed = drain_with_eventfd_until(&waker, 0, 2); + producer.join().unwrap(); + + assert_eq!(processed, 2); + assert_eq!(submitted_seq(&waker), 2); + clear_sticky_wake(&waker); + }); + } + + #[test] + fn park_idle_with_concurrent_wake() { + // The fully-idle futex path must also handle pure out-of-band wakes. + // The loop either sees the wake bit before sleeping or is resumed by + // the modeled futex wake. No submission sequence bump is involved. + loom::model(|| { + let waker = waker(); + let notifier = thread::spawn({ + let waker = waker.clone(); + move || waker.wake() + }); + + let _ = waker.park_idle(0); + notifier.join().unwrap(); + + assert_eq!(submitted_seq(&waker), 0); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + }); + } + + #[test] + fn two_cycle_drain_with_interleaved_wake() { + // A drain loop must survive an explicit wake between two publishes. + // The wake may be consumed as a sticky bit or as eventfd readiness, but + // both publishes must still be processed exactly once. + loom::model(|| { + let waker = waker(); + let producer = thread::spawn({ + let waker = waker.clone(); + move || { + waker.publish(); + waker.wake(); + waker.publish(); + } + }); + + let processed = drain_with_eventfd_until(&waker, 0, 2); + producer.join().unwrap(); + + assert_eq!(processed, 2); + assert_eq!(submitted_seq(&waker), 2); + clear_sticky_wake(&waker); + }); + } + + #[test] + fn multiple_park_idle_cycles() { + // Repeated fully-idle futex park cycles must continue to observe + // publishes. This uses `park_idle()` instead of the eventfd arm path + // whenever no sequence progress is currently visible. + loom::model(|| { + let waker = waker(); + let producer = thread::spawn({ + let waker = waker.clone(); + move || { + waker.publish(); + waker.publish(); + } + }); + + let processed = drain_with_futex_until(&waker, 0, 2); + producer.join().unwrap(); + + assert_eq!(processed, 2); + assert_eq!(submitted_seq(&waker), 2); + clear_sticky_wake(&waker); + }); + } + + #[test] + fn three_thread_stress() { + // Two producers publishing concurrently with one loop simulator should + // still preserve conservation and progress. This adds one more producer + // thread to the eventfd drain shape. + loom::model(|| { + let waker = waker(); + let a = thread::spawn({ + let waker = waker.clone(); + move || waker.publish() + }); + let b = thread::spawn({ + let waker = waker.clone(); + move || waker.publish() + }); + + let processed = drain_with_eventfd_until(&waker, 0, 2); + a.join().unwrap(); + b.join().unwrap(); + + assert_eq!(processed, 2); + assert_eq!(submitted_seq(&waker), 2); + clear_sticky_wake(&waker); + }); + } + + #[derive(Clone, Copy, Debug)] + enum Op { + Publish, + Wake, + } + + // Execute one generated producer operation. + fn execute_op(waker: &Waker, publishes: &AtomicU32, op: Op) { + match op { + Op::Publish => { + waker.publish(); + publishes.fetch_add(1, Ordering::Relaxed); + } + Op::Wake => waker.wake(), + } + } + + // Generate a deterministic publish/wake program for loom exploration. + fn random_program(rng: &mut rand::rngs::SmallRng, len: usize) -> Vec { + use rand::Rng; + + (0..len) + .map(|_| { + if rng.gen_bool(0.5) { + Op::Publish + } else { + Op::Wake + } + }) + .collect() + } + + #[test] + fn fuzz_generic_programs() { + // Generate deterministic producer-only publish/wake programs and run + // each program under exhaustive loom scheduling. These programs check + // sequence conservation and that unarmed producers never queue eventfd + // readiness. Sticky wake bits are allowed because there is no loop to + // consume pure wakes. + use rand::SeedableRng; + + let seed: u64 = std::env::var("LOOM_FUZZ_SEED") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(42); + let iters: usize = std::env::var("LOOM_FUZZ_ITERS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(16); + let ops: usize = std::env::var("LOOM_FUZZ_OPS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(3); + + let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); + let programs: Vec<(Vec, Vec)> = (0..iters) + .map(|_| (random_program(&mut rng, ops), random_program(&mut rng, ops))) + .collect(); + + for (iter, (program_a, program_b)) in programs.iter().enumerate() { + let program_a = std::sync::Arc::new(program_a.clone()); + let program_b = std::sync::Arc::new(program_b.clone()); + let report_a = program_a.clone(); + let report_b = program_b.clone(); + + loom::model(move || { + let waker = waker(); + let publishes = Arc::new(AtomicU32::new(0)); + + let a = thread::spawn({ + let program = program_a.clone(); + let waker = waker.clone(); + let publishes = publishes.clone(); + move || { + for op in program.iter() { + execute_op(&waker, &publishes, *op); + } + } + }); + + let b = thread::spawn({ + let program = program_b.clone(); + let waker = waker.clone(); + let publishes = publishes.clone(); + move || { + for op in program.iter() { + execute_op(&waker, &publishes, *op); + } + } + }); + + a.join().unwrap(); + b.join().unwrap(); + + let expected = publishes.load(Ordering::Relaxed); + let got = submitted_seq(&waker); + assert_eq!( + got, expected, + "publish conservation failed: seed={seed} iter={iter} a={report_a:?} b={report_b:?}", + ); + assert_eq!( + state_bits(&waker) & WAITING_MASK, + 0, + "wait target remained armed: seed={seed} iter={iter} a={report_a:?} b={report_b:?}", + ); + assert_eq!( + eventfd_count(&waker), + 0, + "eventfd readiness queued while unarmed: seed={seed} iter={iter} a={report_a:?} b={report_b:?}", + ); + }); + } + } + + #[test] + fn fuzz_with_loop_simulator() { + // Generate deterministic producer programs and run them against the + // eventfd loop simulator. The loop drains exactly the generated publish + // count while arbitrary generated wakes can arrive before, between, or + // after those publishes. + use rand::SeedableRng; + + let seed: u64 = std::env::var("LOOM_FUZZ_LOOP_SEED") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(7); + let iters: usize = std::env::var("LOOM_FUZZ_LOOP_ITERS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(16); + let ops: usize = std::env::var("LOOM_FUZZ_LOOP_OPS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(3); + + let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); + let programs: Vec> = (0..iters).map(|_| random_program(&mut rng, ops)).collect(); + + for (iter, program) in programs.iter().enumerate() { + let publish_count = program + .iter() + .filter(|op| matches!(op, Op::Publish)) + .count() as u32; + let program = std::sync::Arc::new(program.clone()); + let report_program = program.clone(); + + loom::model(move || { + let waker = waker(); + let publishes = Arc::new(AtomicU32::new(0)); + + let producer = thread::spawn({ + let program = program.clone(); + let waker = waker.clone(); + let publishes = publishes.clone(); + move || { + for op in program.iter() { + execute_op(&waker, &publishes, *op); + } + } + }); + + let processed = drain_with_eventfd_until(&waker, 0, publish_count); + producer.join().unwrap(); + + assert_eq!( + processed, publish_count, + "loop progress failed: seed={seed} iter={iter} program={report_program:?}", + ); + assert_eq!( + submitted_seq(&waker), + publish_count, + "publish conservation failed: seed={seed} iter={iter} program={report_program:?}", + ); + assert_eq!( + publishes.load(Ordering::Relaxed), + publish_count, + "producer accounting failed: seed={seed} iter={iter} program={report_program:?}", + ); + clear_sticky_wake(&waker); + }); + } + } +} From 7008e8e16cef6c038808a957f86c15f0fd7d3a1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Mon, 4 May 2026 14:11:18 +0100 Subject: [PATCH 2/7] ci: run loom on iouring --- .github/workflows/loom.yml | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index a75286c7440..4fb7ce4c928 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -18,9 +18,14 @@ env: jobs: Tests: - name: "Loom Tests" + name: "Loom Tests (flags: \"${{ matrix.flags }}\")" runs-on: ubuntu-latest timeout-minutes: 60 + strategy: + matrix: + flags: + - "--features iouring" + - "" steps: - name: Checkout repository uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4 @@ -29,10 +34,10 @@ jobs: - name: Run setup uses: ./.github/actions/setup with: - additional-cache-key: loom + additional-cache-key: loom-${{ matrix.flags || 'default' }} - name: Install just & nextest uses: taiki-e/install-action@0c5db7f7f897c03b771660e91d065338615679f4 # v2.60.0 with: tool: just@1.43.0,cargo-nextest@0.9.129 - name: Run loom tests - run: just test-loom --verbose + run: just test-loom ${{ matrix.flags }} --verbose From 3884284343b92cad06d971c42abb930b82a8737a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Mon, 4 May 2026 15:28:31 +0100 Subject: [PATCH 3/7] [runtime/iouring/waker] docs --- runtime/src/iouring/waker.rs | 42 +++++++++++++----------------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/runtime/src/iouring/waker.rs b/runtime/src/iouring/waker.rs index b5b4995dd96..7eacd63d347 100644 --- a/runtime/src/iouring/waker.rs +++ b/runtime/src/iouring/waker.rs @@ -19,26 +19,6 @@ //! This keeps the arm-and-recheck handshake lock-free, enables futex sleep when //! the loop is truly idle, and avoids repeated wake writes while a wake is //! already pending. -//! -//! ## Loom Model -//! -//! The `loom` feature keeps the same packed state machine, but replaces the -//! kernel wake surfaces with loom-visible userspace models. The futex path uses -//! a mutex and condition variable to preserve the atomic compare-and-park -//! property of `FUTEX_WAIT`. The eventfd path uses a durable readiness counter -//! plus a condition variable to model both persistent wake readiness and a -//! blocked `submit_and_wait` returning after a wake CQE. -//! -//! The loom tests exercise the producer/loop protocol around that state word: -//! publishes must advance the submitted sequence exactly once, armed waits must -//! not lose concurrent publishes or out-of-band wakes, repeated wake attempts -//! within one epoch must coalesce, sticky unarmed wakes must be consumed by the -//! next arm cycle, and the `Release`/`Acquire` edges must make producer-side -//! state visible after the loop observes progress or clears a wait epoch. -//! -//! The model intentionally stops at this userspace protocol boundary. It does -//! not validate kernel CQE ordering, `io_uring_enter`, wake-poll rearming, or -//! syscall error handling; those are covered by the normal real-syscall tests. use super::UserData; use io_uring::squeue::SubmissionQueue; @@ -482,7 +462,7 @@ impl Waker { /// /// The loom tests in this module do not model the `io_uring` submission /// queue or wake-poll rearm state. Keeping this method present lets the - /// crate compile with `iouring,loom` while keeping that boundary explicit. + /// crate compile with `loom` while keeping that boundary explicit. #[cfg(feature = "loom")] pub const fn reinstall(&self, _submission_queue: &mut SubmissionQueue<'_>) -> bool { true @@ -1109,12 +1089,20 @@ mod loom_tests { // This module uses loom to model the waker's producer/loop protocol over // the packed atomic state word. The model keeps the production sequence and - // wait-bit state machine, but replaces futex and eventfd kernel surfaces - // with loom-visible condvars and counters. The tests keep schedules small - // while exercising the important races: publish versus arm-and-recheck, - // futex idle parking, eventfd wake coalescing, sticky out-of-band wakes, - // sequence wraparound, and the Release/Acquire edges that make producer - // state visible after `pending()` or `clear_wait()`. + // wait-bit state machine, but replaces kernel wake surfaces with + // loom-visible userspace models: the futex path uses a mutex and condvar to + // preserve the atomic compare-and-park property of `FUTEX_WAIT`, and the + // eventfd path uses a durable readiness counter plus a condvar to model + // both persistent wake readiness and a blocked `submit_and_wait` returning + // after a wake CQE. + // + // The tests keep schedules small while exercising the important races and + // invariants: publish versus arm-and-recheck, futex idle parking, eventfd + // wake coalescing, sticky out-of-band wakes, sequence wraparound, and the + // Release/Acquire edges that make producer state visible after `pending()` + // or `clear_wait()`. The model intentionally stops at this userspace + // protocol boundary; it does not validate kernel CQE ordering, + // `io_uring_enter`, wake-poll rearming, or syscall error handling. fn waker() -> Waker { Waker::new().unwrap() From 169b9d956dc3fbb33bb5ebba77811f627a4f0c01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Mon, 4 May 2026 15:45:09 +0100 Subject: [PATCH 4/7] [runtime/iouring/waker] cleanup --- runtime/src/iouring/waker.rs | 211 +++++++++++++++++++---------------- 1 file changed, 117 insertions(+), 94 deletions(-) diff --git a/runtime/src/iouring/waker.rs b/runtime/src/iouring/waker.rs index 7eacd63d347..af54f93c467 100644 --- a/runtime/src/iouring/waker.rs +++ b/runtime/src/iouring/waker.rs @@ -678,15 +678,15 @@ pub mod tests { } } - fn state_bits(waker: &Waker) -> u32 { + pub fn state_bits(waker: &Waker) -> u32 { waker.inner.state.load(Ordering::Relaxed) & STATE_MASK } - fn submitted_seq(waker: &Waker) -> u32 { + pub fn submitted_seq(waker: &Waker) -> u32 { (waker.inner.state.load(Ordering::Relaxed) >> STATE_BITS) & SUBMISSION_SEQ_MASK } - fn read_eventfd_count(waker: &Waker) -> u64 { + pub fn eventfd_count(waker: &Waker) -> u64 { #[cfg(not(feature = "loom"))] { let mut value = 0u64; @@ -912,7 +912,7 @@ pub mod tests { } assert_eq!(submitted_seq(&waker), 4); - assert_eq!(read_eventfd_count(&waker), 1); + assert_eq!(eventfd_count(&waker), 1); drop(arm); } @@ -974,7 +974,7 @@ pub mod tests { } assert_eq!(submitted_seq(&waker), 0); - assert_eq!(read_eventfd_count(&waker), 1); + assert_eq!(eventfd_count(&waker), 1); drop(arm); } @@ -1078,7 +1078,10 @@ pub mod tests { #[cfg(all(test, feature = "loom"))] mod loom_tests { - use super::*; + use super::{ + tests::{eventfd_count, state_bits, submitted_seq}, + *, + }; use loom::{ sync::{ atomic::{AtomicU32, Ordering}, @@ -1101,44 +1104,56 @@ mod loom_tests { // wake coalescing, sticky out-of-band wakes, sequence wraparound, and the // Release/Acquire edges that make producer state visible after `pending()` // or `clear_wait()`. The model intentionally stops at this userspace - // protocol boundary; it does not validate kernel CQE ordering, + // protocol boundary. It does not validate kernel CQE ordering, // `io_uring_enter`, wake-poll rearming, or syscall error handling. - fn waker() -> Waker { - Waker::new().unwrap() - } - - // Return only the low wake-state bits from the modeled packed state. - fn state_bits(waker: &Waker) -> u32 { - waker.inner.state.load(Ordering::Relaxed) & STATE_MASK + // Minimal model of the inbound request queue that feeds the ring. + // + // The queue model deliberately uses only relaxed accesses. These tests rely + // on the waker's Release/Acquire edges to make an enqueued request visible + // to the loop after it observes progress or resumes from a wake. + struct QueuedRequest { + value: AtomicU32, } - // Return the submitted sequence from the modeled packed state. - fn submitted_seq(waker: &Waker) -> u32 { - (waker.inner.state.load(Ordering::Relaxed) >> STATE_BITS) & SUBMISSION_SEQ_MASK - } + impl QueuedRequest { + fn empty() -> Self { + Self { + value: AtomicU32::new(0), + } + } - // Return the modeled eventfd readiness counter. - fn eventfd_count(waker: &Waker) -> u64 { - waker.inner.eventfd_counter.load(Ordering::Relaxed) - } + fn enqueue(&self, value: u32) { + self.value.store(value, Ordering::Relaxed); + } - fn advance_seq(seq: u32) -> u32 { - seq.wrapping_add(1) & SUBMISSION_SEQ_MASK + fn read(&self) -> u32 { + self.value.load(Ordering::Relaxed) + } } - // Model the eventfd-backed blocking section used by `submit_and_wait`. - fn wait_for_eventfd_signal(waker: &Waker) { + // Wait until the modeled eventfd has durable readiness. + // + // In production, `submit_and_wait` returns after the wake poll produces a + // CQE. In the loom model, `eventfd_wake()` increments `eventfd_counter` and + // notifies this condvar, so this helper represents only that blocking + // boundary. + fn wait_for_eventfd_readiness(waker: &Waker) { let mut guard = waker.inner.eventfd_readiness.lock().unwrap(); while waker.inner.eventfd_counter.load(Ordering::Acquire) == 0 { guard = waker.inner.eventfd_waiters.wait(guard).unwrap(); } } - // Clear a wake bit that may be left behind by a raced publisher. This is a - // cleanup helper: tests that care about wake coalescing should assert the - // modeled eventfd counter before calling it. - fn clear_sticky_wake(waker: &Waker) { + // Finish any wake epoch left over after a loop-simulator test has already + // observed the sequence progress it cares about. + // + // A producer can claim `WAKE_SIGNALLED_BIT` and queue modeled eventfd + // readiness while the simulated loop is also able to make progress by + // observing `pending()`. At that point the leftover wake is cleanup noise, + // not the property under test. Tests that care about exact wake coalescing + // should assert the modeled eventfd counter before calling this helper. + fn finish_leftover_wake(waker: &Waker) { assert_eq!(state_bits(waker) & WAITING_MASK, 0); if (state_bits(waker) & WAKE_SIGNALLED_BIT) != 0 { let guard = waker.arm(submitted_seq(waker)); @@ -1153,24 +1168,26 @@ mod loom_tests { assert_eq!(eventfd_count(waker), 0); } - // Drain with the same eventfd arm-and-recheck shape used before - // `submit_and_wait`: poll `pending()`, arm, block only if the post-arm - // snapshot is still idle, then clear the arm and acknowledge readiness. + // Simulate the loop's eventfd-backed wait path until it has observed + // `target` published submissions. This is not modeling the request queue + // itself, only the waker-side control flow: check `pending()`, arm the + // eventfd target, block only if the post-arm snapshot is still idle, then + // drop the guard and acknowledge modeled eventfd readiness. // // The final `acknowledge()` is model cleanup for any wake CQE readiness // produced during the brief arm window. It may be a no-op when the loop did // not actually block, so tests that validate exact wake counts assert the // counter directly instead of relying on this helper. - fn drain_with_eventfd_until(waker: &Waker, mut processed: u32, target: u32) -> u32 { + fn simulate_eventfd_loop_until(waker: &Waker, mut processed: u32, target: u32) -> u32 { while processed != target { if waker.pending(processed) { - processed = advance_seq(processed); + processed = processed.wrapping_add(1) & SUBMISSION_SEQ_MASK; continue; } let guard = waker.arm(processed); if guard.still_idle() { - wait_for_eventfd_signal(waker); + wait_for_eventfd_readiness(waker); assert!( eventfd_count(waker) > 0, "blocking eventfd wait must observe queued readiness before cleanup", @@ -1182,11 +1199,14 @@ mod loom_tests { processed } - // Drain with the fully-idle futex path instead of the eventfd path. - fn drain_with_futex_until(waker: &Waker, mut processed: u32, target: u32) -> u32 { + // Simulate the loop's fully-idle futex path until it has observed `target` + // published submissions. Like the eventfd loop simulator, this models only + // waker-side control flow: check `pending()`, otherwise call `park_idle()` + // to arm the futex wait target and perform the stale-snapshot recheck. + fn simulate_futex_loop_until(waker: &Waker, mut processed: u32, target: u32) -> u32 { while processed != target { if waker.pending(processed) { - processed = advance_seq(processed); + processed = processed.wrapping_add(1) & SUBMISSION_SEQ_MASK; continue; } let _ = waker.park_idle(processed); @@ -1202,14 +1222,14 @@ mod loom_tests { // so the only intended synchronization is publish Release to pending // Acquire. loom::model(|| { - let waker = waker(); - let aux = Arc::new(AtomicU32::new(0)); + let waker = Waker::new().unwrap(); + let queued = Arc::new(QueuedRequest::empty()); let producer = thread::spawn({ let waker = waker.clone(); - let aux = aux.clone(); + let queued = queued.clone(); move || { - aux.store(42, Ordering::Relaxed); + queued.enqueue(42); waker.publish(); } }); @@ -1218,7 +1238,7 @@ mod loom_tests { thread::yield_now(); } - assert_eq!(aux.load(Ordering::Relaxed), 42); + assert_eq!(queued.read(), 42); producer.join().unwrap(); assert_eq!(submitted_seq(&waker), 1); }); @@ -1234,14 +1254,14 @@ mod loom_tests { // against the current sequence, and drops the guard so `clear_wait()`'s // Acquire can pair with `wake()`'s Release. loom::model(|| { - let waker = waker(); - let aux = Arc::new(AtomicU32::new(0)); + let waker = Waker::new().unwrap(); + let queued = Arc::new(QueuedRequest::empty()); let notifier = thread::spawn({ let waker = waker.clone(); - let aux = aux.clone(); + let queued = queued.clone(); move || { - aux.store(42, Ordering::Relaxed); + queued.enqueue(42); waker.wake(); } }); @@ -1255,7 +1275,7 @@ mod loom_tests { assert!(guard.wake_latched()); drop(guard); - assert_eq!(aux.load(Ordering::Relaxed), 42); + assert_eq!(queued.read(), 42); assert_eq!(eventfd_count(&waker), 0); notifier.join().unwrap(); }); @@ -1268,7 +1288,7 @@ mod loom_tests { // across epochs verifies guard drop clears the latch enough for later // unarmed wakes to be observed independently. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); // One unarmed wake latches the bit without queuing eventfd readiness. waker.wake(); @@ -1327,7 +1347,7 @@ mod loom_tests { // is not the same as a latched wake. This keeps `wake_latched()` useful // for out-of-band wake decisions such as shutdown. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); waker.publish(); let guard = waker.arm(0); @@ -1348,7 +1368,7 @@ mod loom_tests { // loop has already processed. The exact half-domain boundary is // intentionally not considered pending because direction is ambiguous. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let set_submitted = |seq| { waker @@ -1388,7 +1408,7 @@ mod loom_tests { // If a wake is already latched before the fully-idle path arms, the // post-arm recheck must skip sleeping and clear the sticky wake. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); waker.wake(); assert!(waker.park_idle(0).is_none()); @@ -1404,7 +1424,7 @@ mod loom_tests { // If a sequence is already published before the fully-idle path arms, // the post-arm recheck must skip sleeping without manufacturing a wake. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); waker.publish(); assert!(waker.park_idle(0).is_none()); @@ -1422,7 +1442,7 @@ mod loom_tests { // wake. After the blocking section exits, guard drop clears wait state // and `acknowledge` drains the eventfd counter. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let producer = thread::spawn({ let waker = waker.clone(); move || waker.publish() @@ -1430,7 +1450,7 @@ mod loom_tests { let guard = waker.arm(0); if guard.still_idle() { - wait_for_eventfd_signal(&waker); + wait_for_eventfd_readiness(&waker); } drop(guard); @@ -1450,16 +1470,16 @@ mod loom_tests { // acquire the producer's enqueue-side writes before the loop checks the // queue after waking. loom::model(|| { - let waker = waker(); - let aux = Arc::new(AtomicU32::new(0)); + let waker = Waker::new().unwrap(); + let queued = Arc::new(QueuedRequest::empty()); let guard = waker.arm(0); assert!(guard.still_idle()); let producer = thread::spawn({ let waker = waker.clone(); - let aux = aux.clone(); + let queued = queued.clone(); move || { - aux.store(42, Ordering::Relaxed); + queued.enqueue(42); waker.publish(); } }); @@ -1469,7 +1489,7 @@ mod loom_tests { } drop(guard); - assert_eq!(aux.load(Ordering::Relaxed), 42); + assert_eq!(queued.read(), 42); producer.join().unwrap(); assert_eq!(submitted_seq(&waker), 1); @@ -1487,16 +1507,16 @@ mod loom_tests { // acquire the notifier's earlier state change before the loop checks // for disconnect or shutdown state after waking. loom::model(|| { - let waker = waker(); - let aux = Arc::new(AtomicU32::new(0)); + let waker = Waker::new().unwrap(); + let queued = Arc::new(QueuedRequest::empty()); let guard = waker.arm(0); assert!(guard.still_idle()); let notifier = thread::spawn({ let waker = waker.clone(); - let aux = aux.clone(); + let queued = queued.clone(); move || { - aux.store(42, Ordering::Relaxed); + queued.enqueue(42); waker.wake(); } }); @@ -1506,7 +1526,7 @@ mod loom_tests { } drop(guard); - assert_eq!(aux.load(Ordering::Relaxed), 42); + assert_eq!(queued.read(), 42); notifier.join().unwrap(); assert_eq!(submitted_seq(&waker), 0); @@ -1523,7 +1543,7 @@ mod loom_tests { // that producers update. A racing publish must either change the // post-arm snapshot before sleep or wake the modeled futex waiter. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let producer = thread::spawn({ let waker = waker.clone(); move || waker.publish() @@ -1533,7 +1553,7 @@ mod loom_tests { producer.join().unwrap(); assert_eq!(submitted_seq(&waker), 1); - clear_sticky_wake(&waker); + finish_leftover_wake(&waker); }); } @@ -1543,7 +1563,7 @@ mod loom_tests { // submitted sequence, but only the first wake claimant should increment // the modeled eventfd counter. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let guard = waker.arm(0); assert!(guard.still_idle()); @@ -1575,7 +1595,7 @@ mod loom_tests { // epoch should coalesce to one eventfd signal while still preserving // the publish's sequence increment. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let guard = waker.arm(0); assert!(guard.still_idle()); @@ -1610,7 +1630,7 @@ mod loom_tests { // arm point and verifies the stale futex snapshot is rejected after the // state changes. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let prev = waker .inner .state @@ -1649,7 +1669,7 @@ mod loom_tests { // arming, `wake_latched` skips the wait; otherwise the modeled eventfd // signal releases the loop. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let notifier = thread::spawn({ let waker = waker.clone(); move || waker.wake() @@ -1657,7 +1677,7 @@ mod loom_tests { let guard = waker.arm(0); if guard.still_idle() { - wait_for_eventfd_signal(&waker); + wait_for_eventfd_readiness(&waker); } drop(guard); @@ -1677,7 +1697,7 @@ mod loom_tests { // half-range modular `pending()` check must remain directional across // that boundary. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); waker .inner .state @@ -1691,10 +1711,13 @@ mod loom_tests { } }); - assert_eq!(drain_with_eventfd_until(&waker, SUBMISSION_SEQ_MASK, 1), 1); + assert_eq!( + simulate_eventfd_loop_until(&waker, SUBMISSION_SEQ_MASK, 1), + 1 + ); producer.join().unwrap(); assert_eq!(submitted_seq(&waker), 1); - clear_sticky_wake(&waker); + finish_leftover_wake(&waker); }); } @@ -1704,7 +1727,7 @@ mod loom_tests { // sequence conservation and must not queue eventfd readiness while the // loop is unarmed. A sticky wake bit may remain for the next arm cycle. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let publishes = Arc::new(AtomicU32::new(0)); let a = thread::spawn({ @@ -1744,7 +1767,7 @@ mod loom_tests { // using the eventfd arm-and-recheck path whenever no sequence progress // is currently visible. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let producer = thread::spawn({ let waker = waker.clone(); move || { @@ -1753,12 +1776,12 @@ mod loom_tests { } }); - let processed = drain_with_eventfd_until(&waker, 0, 2); + let processed = simulate_eventfd_loop_until(&waker, 0, 2); producer.join().unwrap(); assert_eq!(processed, 2); assert_eq!(submitted_seq(&waker), 2); - clear_sticky_wake(&waker); + finish_leftover_wake(&waker); }); } @@ -1768,7 +1791,7 @@ mod loom_tests { // The loop either sees the wake bit before sleeping or is resumed by // the modeled futex wake. No submission sequence bump is involved. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let notifier = thread::spawn({ let waker = waker.clone(); move || waker.wake() @@ -1789,7 +1812,7 @@ mod loom_tests { // The wake may be consumed as a sticky bit or as eventfd readiness, but // both publishes must still be processed exactly once. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let producer = thread::spawn({ let waker = waker.clone(); move || { @@ -1799,12 +1822,12 @@ mod loom_tests { } }); - let processed = drain_with_eventfd_until(&waker, 0, 2); + let processed = simulate_eventfd_loop_until(&waker, 0, 2); producer.join().unwrap(); assert_eq!(processed, 2); assert_eq!(submitted_seq(&waker), 2); - clear_sticky_wake(&waker); + finish_leftover_wake(&waker); }); } @@ -1814,7 +1837,7 @@ mod loom_tests { // publishes. This uses `park_idle()` instead of the eventfd arm path // whenever no sequence progress is currently visible. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let producer = thread::spawn({ let waker = waker.clone(); move || { @@ -1823,12 +1846,12 @@ mod loom_tests { } }); - let processed = drain_with_futex_until(&waker, 0, 2); + let processed = simulate_futex_loop_until(&waker, 0, 2); producer.join().unwrap(); assert_eq!(processed, 2); assert_eq!(submitted_seq(&waker), 2); - clear_sticky_wake(&waker); + finish_leftover_wake(&waker); }); } @@ -1838,7 +1861,7 @@ mod loom_tests { // still preserve conservation and progress. This adds one more producer // thread to the eventfd drain shape. loom::model(|| { - let waker = waker(); + let waker = Waker::new().unwrap(); let a = thread::spawn({ let waker = waker.clone(); move || waker.publish() @@ -1848,13 +1871,13 @@ mod loom_tests { move || waker.publish() }); - let processed = drain_with_eventfd_until(&waker, 0, 2); + let processed = simulate_eventfd_loop_until(&waker, 0, 2); a.join().unwrap(); b.join().unwrap(); assert_eq!(processed, 2); assert_eq!(submitted_seq(&waker), 2); - clear_sticky_wake(&waker); + finish_leftover_wake(&waker); }); } @@ -1924,7 +1947,7 @@ mod loom_tests { let report_b = program_b.clone(); loom::model(move || { - let waker = waker(); + let waker = Waker::new().unwrap(); let publishes = Arc::new(AtomicU32::new(0)); let a = thread::spawn({ @@ -2005,7 +2028,7 @@ mod loom_tests { let report_program = program.clone(); loom::model(move || { - let waker = waker(); + let waker = Waker::new().unwrap(); let publishes = Arc::new(AtomicU32::new(0)); let producer = thread::spawn({ @@ -2019,7 +2042,7 @@ mod loom_tests { } }); - let processed = drain_with_eventfd_until(&waker, 0, publish_count); + let processed = simulate_eventfd_loop_until(&waker, 0, publish_count); producer.join().unwrap(); assert_eq!( @@ -2036,7 +2059,7 @@ mod loom_tests { publish_count, "producer accounting failed: seed={seed} iter={iter} program={report_program:?}", ); - clear_sticky_wake(&waker); + finish_leftover_wake(&waker); }); } } From 74b2adf0eb1a4b41384a1d133d7b7a1ac3ab5165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Mon, 4 May 2026 16:40:21 +0100 Subject: [PATCH 5/7] [runtime/iouring/waker] cleanup --- runtime/src/iouring/waker.rs | 471 +++++++++++++++-------------------- 1 file changed, 200 insertions(+), 271 deletions(-) diff --git a/runtime/src/iouring/waker.rs b/runtime/src/iouring/waker.rs index af54f93c467..8291fe4f8e8 100644 --- a/runtime/src/iouring/waker.rs +++ b/runtime/src/iouring/waker.rs @@ -699,6 +699,9 @@ pub mod tests { size_of::(), ) }; + if ret == -1 && std::io::Error::last_os_error().raw_os_error() == Some(libc::EAGAIN) { + return 0; + } assert_eq!(ret, size_of::() as isize); value } @@ -840,6 +843,20 @@ pub mod tests { assert_eq!(state_bits(&waker), 0); } + #[test] + fn test_publish_before_park_idle_skips_sleep() { + // Verify a sequence published before idle arming makes the next idle + // park return immediately without manufacturing a wake. + let waker = Waker::new().expect("eventfd creation should succeed"); + + waker.publish(); + assert!(waker.park_idle(0).is_none()); + + assert_eq!(submitted_seq(&waker), 1); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + } + #[test] fn test_publish_after_futex_arm_rejects_stale_snapshot() { // Verify the futex idle path tolerates a publish that lands after @@ -933,6 +950,45 @@ pub mod tests { assert_eq!(state_bits(&waker), 0); } + #[test] + fn test_unarmed_wakes_rearm_across_epochs() { + // Verify unarmed wake latches are consumed when the loop next arms, + // and that later unarmed wakes can be observed in later epochs. + let waker = Waker::new().expect("eventfd creation should succeed"); + + waker.wake(); + let arm = waker.arm(0); + assert!(!arm.still_idle()); + assert!(arm.wake_latched()); + drop(arm); + + assert_eq!(submitted_seq(&waker), 0); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + + waker.wake(); + let arm = waker.arm(0); + assert!(!arm.still_idle()); + assert!(arm.wake_latched()); + drop(arm); + + assert_eq!(submitted_seq(&waker), 0); + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + + waker.publish(); + waker.wake(); + assert_eq!(submitted_seq(&waker), 1); + assert_eq!(eventfd_count(&waker), 0); + let arm = waker.arm(1); + assert!(!arm.still_idle()); + assert!(arm.wake_latched()); + drop(arm); + + assert_eq!(state_bits(&waker), 0); + assert_eq!(eventfd_count(&waker), 0); + } + #[test] fn test_arm_after_publish_skips_blocking() { // Verify arming with a stale processed sequence notices the newly @@ -1082,6 +1138,7 @@ mod loom_tests { tests::{eventfd_count, state_bits, submitted_seq}, *, }; + use commonware_utils::test_rng; use loom::{ sync::{ atomic::{AtomicU32, Ordering}, @@ -1089,6 +1146,7 @@ mod loom_tests { }, thread, }; + use rand::Rng; // This module uses loom to model the waker's producer/loop protocol over // the packed atomic state word. The model keeps the production sequence and @@ -1145,6 +1203,16 @@ mod loom_tests { } } + // Wait until a producer/notifier has latched a wake bit. + // + // This is deliberately a relaxed spin: the tests using it pair with the + // producer's Release through the later `clear_wait()` Acquire. + fn wait_for_wake_signal(waker: &Waker) { + while state_bits(waker) & WAKE_SIGNALLED_BIT == 0 { + thread::yield_now(); + } + } + // Finish any wake epoch left over after a loop-simulator test has already // observed the sequence progress it cares about. // @@ -1214,6 +1282,51 @@ mod loom_tests { processed } + /// Producer operation used by the deterministic generated loom tests. + /// + /// Each generated program is built before entering `loom::model`, then run + /// by a producer thread. `Publish` models a producer making one request + /// visible to the loop, while `Wake` models an out-of-band notification that + /// must not affect submission sequence accounting. + #[derive(Clone, Copy, Debug)] + enum ProducerOp { + /// Publish one request to the waker. + Publish, + /// Notify the loop without publishing a request. + Wake, + } + + impl ProducerOp { + // Generate a deterministic publish/wake program for loom exploration. + fn generate_program(rng: &mut impl Rng, len: usize) -> Vec { + (0..len) + .map(|_| { + if rng.gen_bool(0.5) { + Self::Publish + } else { + Self::Wake + } + }) + .collect() + } + + // Execute one generated producer operation. + fn execute(self, waker: &Waker, publishes: &AtomicU32) { + match self { + ProducerOp::Publish => { + waker.publish(); + publishes.fetch_add(1, Ordering::Relaxed); + } + ProducerOp::Wake => waker.wake(), + } + } + } + + // Number of deterministic `ProducerOp` programs to generate per test. + const GENERATED_CASES: usize = 8; + // Number of operations in each generated `ProducerOp` program. + const OPS_PER_PROGRAM: usize = 3; + #[test] fn publish_pending_pairing() { // `publish` must make the producer's earlier enqueue-side write visible @@ -1266,9 +1379,7 @@ mod loom_tests { } }); - while waker.inner.state.load(Ordering::Relaxed) & WAKE_SIGNALLED_BIT == 0 { - thread::yield_now(); - } + wait_for_wake_signal(&waker); assert_eq!(eventfd_count(&waker), 0); let guard = waker.arm(0); @@ -1282,27 +1393,12 @@ mod loom_tests { } #[test] - fn unarmed_sticky_wakes_coalesce_and_rearm_across_epochs() { - // Out-of-band wakes that arrive before the loop arms must stick around - // and force the next arm cycle to skip blocking. Repeating the pattern - // across epochs verifies guard drop clears the latch enough for later - // unarmed wakes to be observed independently. + fn concurrent_unarmed_wakes_coalesce() { + // Concurrent out-of-band wakes that arrive before the loop arms should + // coalesce to one sticky wake bit without queuing eventfd readiness. loom::model(|| { let waker = Waker::new().unwrap(); - // One unarmed wake latches the bit without queuing eventfd readiness. - waker.wake(); - assert_eq!(eventfd_count(&waker), 0); - let guard = waker.arm(0); - assert!(!guard.still_idle()); - assert!(guard.wake_latched()); - drop(guard); - - assert_eq!(submitted_seq(&waker), 0); - assert_eq!(state_bits(&waker), 0); - assert_eq!(eventfd_count(&waker), 0); - - // Concurrent unarmed wakes coalesce to the same single sticky bit. let a = thread::spawn({ let waker = waker.clone(); move || waker.wake() @@ -1324,114 +1420,6 @@ mod loom_tests { assert_eq!(submitted_seq(&waker), 0); assert_eq!(state_bits(&waker), 0); assert_eq!(eventfd_count(&waker), 0); - - // A publish followed by an explicit unarmed wake advances the - // sequence once and leaves the wake sticky for the caught-up epoch. - waker.publish(); - waker.wake(); - assert_eq!(submitted_seq(&waker), 1); - assert_eq!(eventfd_count(&waker), 0); - let guard = waker.arm(1); - assert!(!guard.still_idle()); - assert!(guard.wake_latched()); - drop(guard); - - assert_eq!(state_bits(&waker), 0); - assert_eq!(eventfd_count(&waker), 0); - }); - } - - #[test] - fn arm_distinguishes_sequence_progress_from_wake() { - // A published-ahead sequence must stop arming from looking idle, but it - // is not the same as a latched wake. This keeps `wake_latched()` useful - // for out-of-band wake decisions such as shutdown. - loom::model(|| { - let waker = Waker::new().unwrap(); - - waker.publish(); - let guard = waker.arm(0); - assert!(!guard.still_idle()); - assert!(!guard.wake_latched()); - drop(guard); - - assert_eq!(submitted_seq(&waker), 1); - assert_eq!(state_bits(&waker), 0); - assert_eq!(eventfd_count(&waker), 0); - }); - } - - #[test] - fn pending_modular_boundaries() { - // `pending()` uses half-range modular arithmetic to distinguish - // published-ahead sequences from equal sequences and from sequences the - // loop has already processed. The exact half-domain boundary is - // intentionally not considered pending because direction is ambiguous. - loom::model(|| { - let waker = Waker::new().unwrap(); - - let set_submitted = |seq| { - waker - .inner - .state - .store(seq << STATE_BITS, Ordering::Relaxed); - }; - - set_submitted(0); - assert!(!waker.pending(0)); - - set_submitted(1); - assert!(waker.pending(0)); - - set_submitted(HALF_SUBMISSION_SEQUENCE_DOMAIN - 1); - assert!(waker.pending(0)); - - set_submitted(HALF_SUBMISSION_SEQUENCE_DOMAIN); - assert!(!waker.pending(0)); - - set_submitted(HALF_SUBMISSION_SEQUENCE_DOMAIN + 1); - assert!(!waker.pending(0)); - - set_submitted(0); - assert!(!waker.pending(1)); - - set_submitted(SUBMISSION_SEQ_MASK); - assert!(!waker.pending(0)); - - set_submitted(0); - assert!(waker.pending(SUBMISSION_SEQ_MASK)); - }); - } - - #[test] - fn park_idle_skips_pre_latched_wake() { - // If a wake is already latched before the fully-idle path arms, the - // post-arm recheck must skip sleeping and clear the sticky wake. - loom::model(|| { - let waker = Waker::new().unwrap(); - - waker.wake(); - assert!(waker.park_idle(0).is_none()); - - assert_eq!(submitted_seq(&waker), 0); - assert_eq!(state_bits(&waker), 0); - assert_eq!(eventfd_count(&waker), 0); - }); - } - - #[test] - fn park_idle_skips_pre_published_sequence() { - // If a sequence is already published before the fully-idle path arms, - // the post-arm recheck must skip sleeping without manufacturing a wake. - loom::model(|| { - let waker = Waker::new().unwrap(); - - waker.publish(); - assert!(waker.park_idle(0).is_none()); - - assert_eq!(submitted_seq(&waker), 1); - assert_eq!(state_bits(&waker), 0); - assert_eq!(eventfd_count(&waker), 0); }); } @@ -1484,9 +1472,7 @@ mod loom_tests { } }); - while waker.inner.state.load(Ordering::Relaxed) & WAKE_SIGNALLED_BIT == 0 { - thread::yield_now(); - } + wait_for_wake_signal(&waker); drop(guard); assert_eq!(queued.read(), 42); @@ -1504,8 +1490,8 @@ mod loom_tests { fn wake_clear_wait_pairing_when_armed() { // When an out-of-band wake lands in an armed eventfd epoch, the loop // resumes without any sequence progress. `clear_wait()` must still - // acquire the notifier's earlier state change before the loop checks - // for disconnect or shutdown state after waking. + // acquire the notifier's earlier state change before the loop checks for + // disconnect or shutdown state after waking. loom::model(|| { let waker = Waker::new().unwrap(); let queued = Arc::new(QueuedRequest::empty()); @@ -1521,9 +1507,7 @@ mod loom_tests { } }); - while waker.inner.state.load(Ordering::Relaxed) & WAKE_SIGNALLED_BIT == 0 { - thread::yield_now(); - } + wait_for_wake_signal(&waker); drop(guard); assert_eq!(queued.read(), 42); @@ -1592,8 +1576,8 @@ mod loom_tests { #[test] fn mixed_publish_and_wake_dedup() { // A publish and an out-of-band wake racing in the same armed eventfd - // epoch should coalesce to one eventfd signal while still preserving - // the publish's sequence increment. + // epoch should coalesce to one eventfd signal while still preserving the + // publish's sequence increment. loom::model(|| { let waker = Waker::new().unwrap(); let guard = waker.arm(0); @@ -1623,11 +1607,11 @@ mod loom_tests { #[test] fn mixed_publish_and_wake_futex_arm() { - // A publish and an out-of-band wake racing in the same futex-armed - // epoch should coalesce through the shared wake latch while preserving - // the publish's sequence increment. Unlike the eventfd path, there is - // no durable counter to inspect, so this splits `park_idle()` at the - // arm point and verifies the stale futex snapshot is rejected after the + // A publish and an out-of-band wake racing in the same futex-armed epoch + // should coalesce through the shared wake latch while preserving the + // publish's sequence increment. Unlike the eventfd path, there is no + // durable counter to inspect, so this splits `park_idle()` at the arm + // point and verifies the stale futex snapshot is rejected after the // state changes. loom::model(|| { let waker = Waker::new().unwrap(); @@ -1666,7 +1650,7 @@ mod loom_tests { fn drop_wake() { // An out-of-band wake racing with the eventfd arm path must wake the // loop without advancing the submitted sequence. If it arrives before - // arming, `wake_latched` skips the wait; otherwise the modeled eventfd + // arming, `wake_latched` skips the wait, otherwise the modeled eventfd // signal releases the loop. loom::model(|| { let waker = Waker::new().unwrap(); @@ -1788,8 +1772,8 @@ mod loom_tests { #[test] fn park_idle_with_concurrent_wake() { // The fully-idle futex path must also handle pure out-of-band wakes. - // The loop either sees the wake bit before sleeping or is resumed by - // the modeled futex wake. No submission sequence bump is involved. + // The loop either sees the wake bit before sleeping or is resumed by the + // modeled futex wake. No submission sequence bump is involved. loom::model(|| { let waker = Waker::new().unwrap(); let notifier = thread::spawn({ @@ -1808,9 +1792,9 @@ mod loom_tests { #[test] fn two_cycle_drain_with_interleaved_wake() { - // A drain loop must survive an explicit wake between two publishes. - // The wake may be consumed as a sticky bit or as eventfd readiness, but - // both publishes must still be processed exactly once. + // A drain loop must survive an explicit wake between two publishes. The + // wake may be consumed as a sticky bit or as eventfd readiness, but both + // publishes must still be processed exactly once. loom::model(|| { let waker = Waker::new().unwrap(); let producer = thread::spawn({ @@ -1881,151 +1865,96 @@ mod loom_tests { }); } - #[derive(Clone, Copy, Debug)] - enum Op { - Publish, - Wake, - } - - // Execute one generated producer operation. - fn execute_op(waker: &Waker, publishes: &AtomicU32, op: Op) { - match op { - Op::Publish => { - waker.publish(); - publishes.fetch_add(1, Ordering::Relaxed); - } - Op::Wake => waker.wake(), - } - } - - // Generate a deterministic publish/wake program for loom exploration. - fn random_program(rng: &mut rand::rngs::SmallRng, len: usize) -> Vec { - use rand::Rng; - - (0..len) + #[test] + fn generated_producer_only_programs() { + // Generate deterministic producer-only programs before entering loom, + // then model each case with four concurrent producers. Each producer + // runs a short sequence of `publish()` and out-of-band `wake()` calls + // without any loop thread consuming them. + // + // The invariant is producer-side conservation: every generated `Publish` + // must be reflected in the submitted sequence exactly once, while + // generated `Wake`s must not affect the sequence. Since the waker is + // never armed in this test, producers must also leave no wait target + // armed and must not queue modeled eventfd readiness. A sticky wake bit + // may remain because there is intentionally no loop to consume it. + let mut rng = test_rng(); + let programs = (0..GENERATED_CASES) .map(|_| { - if rng.gen_bool(0.5) { - Op::Publish - } else { - Op::Wake - } + [ + ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM), + ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM), + ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM), + ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM), + ] }) - .collect() - } - - #[test] - fn fuzz_generic_programs() { - // Generate deterministic producer-only publish/wake programs and run - // each program under exhaustive loom scheduling. These programs check - // sequence conservation and that unarmed producers never queue eventfd - // readiness. Sticky wake bits are allowed because there is no loop to - // consume pure wakes. - use rand::SeedableRng; - - let seed: u64 = std::env::var("LOOM_FUZZ_SEED") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(42); - let iters: usize = std::env::var("LOOM_FUZZ_ITERS") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(16); - let ops: usize = std::env::var("LOOM_FUZZ_OPS") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(3); - - let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); - let programs: Vec<(Vec, Vec)> = (0..iters) - .map(|_| (random_program(&mut rng, ops), random_program(&mut rng, ops))) - .collect(); - - for (iter, (program_a, program_b)) in programs.iter().enumerate() { - let program_a = std::sync::Arc::new(program_a.clone()); - let program_b = std::sync::Arc::new(program_b.clone()); - let report_a = program_a.clone(); - let report_b = program_b.clone(); + .collect::>(); + for (iter, programs) in programs.into_iter().enumerate() { loom::model(move || { let waker = Waker::new().unwrap(); let publishes = Arc::new(AtomicU32::new(0)); - let a = thread::spawn({ - let program = program_a.clone(); - let waker = waker.clone(); - let publishes = publishes.clone(); - move || { - for op in program.iter() { - execute_op(&waker, &publishes, *op); - } - } - }); - - let b = thread::spawn({ - let program = program_b.clone(); - let waker = waker.clone(); - let publishes = publishes.clone(); - move || { - for op in program.iter() { - execute_op(&waker, &publishes, *op); - } - } - }); - - a.join().unwrap(); - b.join().unwrap(); + let handles = programs + .iter() + .map(|program| { + let program = program.clone(); + let waker = waker.clone(); + let publishes = publishes.clone(); + thread::spawn(move || { + for &op in program.iter() { + op.execute(&waker, &publishes); + } + }) + }) + .collect::>(); + + for handle in handles { + handle.join().unwrap(); + } let expected = publishes.load(Ordering::Relaxed); let got = submitted_seq(&waker); assert_eq!( got, expected, - "publish conservation failed: seed={seed} iter={iter} a={report_a:?} b={report_b:?}", + "publish conservation failed: iter={iter} programs={programs:?}", ); assert_eq!( state_bits(&waker) & WAITING_MASK, 0, - "wait target remained armed: seed={seed} iter={iter} a={report_a:?} b={report_b:?}", + "wait target remained armed: iter={iter} programs={programs:?}", ); assert_eq!( eventfd_count(&waker), 0, - "eventfd readiness queued while unarmed: seed={seed} iter={iter} a={report_a:?} b={report_b:?}", + "eventfd readiness queued while unarmed: iter={iter} programs={programs:?}", ); }); } } #[test] - fn fuzz_with_loop_simulator() { - // Generate deterministic producer programs and run them against the - // eventfd loop simulator. The loop drains exactly the generated publish - // count while arbitrary generated wakes can arrive before, between, or - // after those publishes. - use rand::SeedableRng; - - let seed: u64 = std::env::var("LOOM_FUZZ_LOOP_SEED") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(7); - let iters: usize = std::env::var("LOOM_FUZZ_LOOP_ITERS") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(16); - let ops: usize = std::env::var("LOOM_FUZZ_LOOP_OPS") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(3); - - let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); - let programs: Vec> = (0..iters).map(|_| random_program(&mut rng, ops)).collect(); - - for (iter, program) in programs.iter().enumerate() { + fn generated_eventfd_loop_programs() { + // Generate deterministic single-producer programs before entering loom, + // then model each case with one producer and the eventfd loop simulator. + // The producer may interleave out-of-band `wake()` calls before, + // between, or after its generated `publish()` calls. + // + // The loop simulator must eventually observe exactly the generated + // publish count, regardless of whether progress arrives through + // `pending()` or through the arm, eventfd readiness, and `clear_wait()` + // path. Pure wakes are allowed to resume the loop, but they must not + // create sequence progress or disturb producer accounting. + let mut rng = test_rng(); + let programs = (0..GENERATED_CASES) + .map(|_| ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM)) + .collect::>(); + + for (iter, program) in programs.into_iter().enumerate() { let publish_count = program .iter() - .filter(|op| matches!(op, Op::Publish)) + .filter(|op| matches!(op, ProducerOp::Publish)) .count() as u32; - let program = std::sync::Arc::new(program.clone()); - let report_program = program.clone(); loom::model(move || { let waker = Waker::new().unwrap(); @@ -2036,8 +1965,8 @@ mod loom_tests { let waker = waker.clone(); let publishes = publishes.clone(); move || { - for op in program.iter() { - execute_op(&waker, &publishes, *op); + for &op in program.iter() { + op.execute(&waker, &publishes); } } }); @@ -2047,17 +1976,17 @@ mod loom_tests { assert_eq!( processed, publish_count, - "loop progress failed: seed={seed} iter={iter} program={report_program:?}", + "loop progress failed: iter={iter} program={program:?}", ); assert_eq!( submitted_seq(&waker), publish_count, - "publish conservation failed: seed={seed} iter={iter} program={report_program:?}", + "publish conservation failed: iter={iter} program={program:?}", ); assert_eq!( publishes.load(Ordering::Relaxed), publish_count, - "producer accounting failed: seed={seed} iter={iter} program={report_program:?}", + "producer accounting failed: iter={iter} program={program:?}", ); finish_leftover_wake(&waker); }); From 835399abdc6a851866e1f5722a11a8aee9f274da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Mon, 4 May 2026 20:49:46 +0100 Subject: [PATCH 6/7] [runtime/iouring/waker] cleanup --- runtime/src/iouring/waker.rs | 72 ++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 24 deletions(-) diff --git a/runtime/src/iouring/waker.rs b/runtime/src/iouring/waker.rs index 8291fe4f8e8..f3b0550af87 100644 --- a/runtime/src/iouring/waker.rs +++ b/runtime/src/iouring/waker.rs @@ -1322,11 +1322,6 @@ mod loom_tests { } } - // Number of deterministic `ProducerOp` programs to generate per test. - const GENERATED_CASES: usize = 8; - // Number of operations in each generated `ProducerOp` program. - const OPS_PER_PROGRAM: usize = 3; - #[test] fn publish_pending_pairing() { // `publish` must make the producer's earlier enqueue-side write visible @@ -1868,7 +1863,7 @@ mod loom_tests { #[test] fn generated_producer_only_programs() { // Generate deterministic producer-only programs before entering loom, - // then model each case with four concurrent producers. Each producer + // then model each case with two concurrent producers. Each producer // runs a short sequence of `publish()` and out-of-band `wake()` calls // without any loop thread consuming them. // @@ -1878,14 +1873,15 @@ mod loom_tests { // never armed in this test, producers must also leave no wait target // armed and must not queue modeled eventfd readiness. A sticky wake bit // may remain because there is intentionally no loop to consume it. + const CASES: usize = 24; + const OPS_PER_PROGRAM: usize = 5; + let mut rng = test_rng(); - let programs = (0..GENERATED_CASES) + let programs = (0..CASES) .map(|_| { [ ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM), ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM), - ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM), - ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM), ] }) .collect::>(); @@ -1933,21 +1929,14 @@ mod loom_tests { } } - #[test] - fn generated_eventfd_loop_programs() { - // Generate deterministic single-producer programs before entering loom, - // then model each case with one producer and the eventfd loop simulator. - // The producer may interleave out-of-band `wake()` calls before, - // between, or after its generated `publish()` calls. - // - // The loop simulator must eventually observe exactly the generated - // publish count, regardless of whether progress arrives through - // `pending()` or through the arm, eventfd readiness, and `clear_wait()` - // path. Pure wakes are allowed to resume the loop, but they must not - // create sequence progress or disturb producer accounting. + fn generated_loop_programs( + cases: usize, + ops_per_program: usize, + simulate_loop_until: fn(&Waker, u32, u32) -> u32, + ) { let mut rng = test_rng(); - let programs = (0..GENERATED_CASES) - .map(|_| ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM)) + let programs = (0..cases) + .map(|_| ProducerOp::generate_program(&mut rng, ops_per_program)) .collect::>(); for (iter, program) in programs.into_iter().enumerate() { @@ -1971,7 +1960,7 @@ mod loom_tests { } }); - let processed = simulate_eventfd_loop_until(&waker, 0, publish_count); + let processed = simulate_loop_until(&waker, 0, publish_count); producer.join().unwrap(); assert_eq!( @@ -1992,4 +1981,39 @@ mod loom_tests { }); } } + + #[test] + fn generated_eventfd_loop_programs() { + // Generate deterministic single-producer programs before entering loom, + // then model each case with one producer and the eventfd loop simulator. + // The producer may interleave out-of-band `wake()` calls before, + // between, or after its generated `publish()` calls. + // + // The loop simulator must eventually observe exactly the generated + // publish count, regardless of whether progress arrives through + // `pending()` or through the arm, eventfd readiness, and `clear_wait()` + // path. Pure wakes are allowed to resume the loop, but they must not + // create sequence progress or disturb producer accounting. + const CASES: usize = 96; + const OPS_PER_PROGRAM: usize = 3; + + generated_loop_programs(CASES, OPS_PER_PROGRAM, simulate_eventfd_loop_until); + } + + #[test] + fn generated_futex_loop_programs() { + // Generate deterministic single-producer programs before entering loom, + // then model each case with one producer and the futex idle loop + // simulator. The producer may interleave out-of-band `wake()` calls + // before, between, or after its generated `publish()` calls. + // + // This is the futex-path counterpart to `generated_eventfd_loop_programs`. + // The loop simulator must drain exactly the generated publish count + // through `pending()` or `park_idle()`, while pure wakes may resume the + // futex wait without creating sequence progress. + const CASES: usize = 16; + const OPS_PER_PROGRAM: usize = 3; + + generated_loop_programs(CASES, OPS_PER_PROGRAM, simulate_futex_loop_until); + } } From db06010d07f3b1a186ff8593102b101c7e96d31d Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 4 May 2026 21:57:20 -0700 Subject: [PATCH 7/7] nits --- .github/workflows/conformance-label.yml | 4 ++-- .github/workflows/loom.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/conformance-label.yml b/.github/workflows/conformance-label.yml index e12de9c3bcb..ea377a98e64 100644 --- a/.github/workflows/conformance-label.yml +++ b/.github/workflows/conformance-label.yml @@ -9,8 +9,8 @@ permissions: contents: read jobs: - conformance-label-check: - name: Conformance Label Check + Check: + name: Check runs-on: ubuntu-latest steps: - name: Checkout repository diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index 4fb7ce4c928..268eb42afa9 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -18,7 +18,7 @@ env: jobs: Tests: - name: "Loom Tests (flags: \"${{ matrix.flags }}\")" + name: "Tests (flags: \"${{ matrix.flags }}\")" runs-on: ubuntu-latest timeout-minutes: 60 strategy: