Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mutex = ["dep:slab"]
waker_slot = ["dep:futures-util"]
event = ["dep:event-listener", "dep:local-event"]
bilock = ["waker_slot"]
async_flag = ["waker_slot"]

[dev-dependencies]
futures = { version = "0.3.31", features = ["executor"] }
126 changes: 126 additions & 0 deletions src/async_flag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/// Multithreaded notifier
pub mod sync {
super::impl_notify!(sync);

impl crate::AssertMt for AsyncFlag {}
impl crate::AssertMt for AsyncFlagHandle {}
}

/// Singlethreaded notifier
pub mod unsync {
super::impl_notify!(unsync);
}

macro_rules! impl_notify {
($sync:ident) => {
use std::{
pin::Pin,
task::{Context, Poll},
};

use crate::$sync::{flag::Flag, shared::Shared, waker_slot::WakerSlot};

#[derive(Debug)]
struct Inner {
waker: WakerSlot,
set: Flag,
}

#[derive(Debug, Clone)]
struct AsyncFlagImpl(Shared<Inner>);

impl AsyncFlagImpl {
pub fn new() -> Self {
Self(Shared::new(Inner {
waker: WakerSlot::new(),
set: Flag::new(false),
}))
}

pub fn notify(&self) {
self.0.set.swap(true);
self.0.waker.wake();
}

pub fn notified(&self) -> bool {
self.0.set.get()
}
}

impl Future for AsyncFlagImpl {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// quick check to avoid registration if already done.
if self.0.set.get() {
return Poll::Ready(());
}

self.0.waker.register(cx.waker());

// Need to check condition **after** `register` to avoid a race
// condition that would result in lost notifications.
if self.0.set.get() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}

/// An event that won't wake until [`AsyncFlagHandle::notify`] is called
/// successfully.
#[derive(Debug)]
pub struct AsyncFlag {
flag: AsyncFlagImpl,
}

impl Default for AsyncFlag {
fn default() -> Self {
Self::new()
}
}

impl AsyncFlag {
/// Create [`AsyncFlag`].
pub fn new() -> Self {
Self {
flag: AsyncFlagImpl::new(),
}
}

/// Get a handle to notify the flag.
pub fn handle(&self) -> AsyncFlagHandle {
AsyncFlagHandle::new(self.flag.clone())
}

/// Returns whether the event has been notified.
pub fn notified(&self) -> bool {
self.flag.notified()
}

/// Wait for [`AsyncFlagHandle::notify`] to be called.
pub async fn wait(self) {
self.flag.await
}
}

/// A wake up handle to [`AsyncFlag`].
pub struct AsyncFlagHandle {
flag: AsyncFlagImpl,
}

impl AsyncFlagHandle {
fn new(flag: AsyncFlagImpl) -> Self {
Self { flag }
}

/// Notify the event.
pub fn notify(self) {
self.flag.notify()
}
}
};
}

use impl_notify;
1 change: 1 addition & 0 deletions src/flag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ macro_rules! impl_flag {
use crate::$sync::atomic::AtomicBool;

/// A boolean flag
#[derive(Debug)]
pub struct Flag(AtomicBool);

impl Flag {
Expand Down
8 changes: 8 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#![warn(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]

#[cfg(feature = "async_flag")]
mod async_flag;
#[cfg(feature = "bilock")]
mod bilock;
#[cfg(feature = "event")]
Expand All @@ -62,6 +64,9 @@ pub mod sync {
#[cfg(feature = "watch")]
pub use see::sync as watch;

#[doc(inline)]
#[cfg(feature = "async_flag")]
pub use crate::async_flag::sync as notify;
#[doc(inline)]
#[cfg(feature = "bilock")]
pub use crate::bilock::sync as bilock;
Expand All @@ -88,6 +93,9 @@ pub mod unsync {
#[cfg(feature = "watch")]
pub use see::unsync as watch;

#[doc(inline)]
#[cfg(feature = "async_flag")]
pub use crate::async_flag::unsync as notify;
#[doc(inline)]
#[cfg(feature = "bilock")]
pub use crate::bilock::unsync as bilock;
Expand Down