Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crossbeam-utils/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod parker;
mod sharded_lock;
mod wait_group;

pub use self::parker::{Parker, UnparkReason, Unparker};
pub use self::parker::{Parker, UnparkReason, UnparkResult, Unparker};
#[cfg(not(crossbeam_loom))]
pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
pub use self::wait_group::WaitGroup;
62 changes: 58 additions & 4 deletions crossbeam-utils/src/sync/parker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,44 @@ impl Unparker {
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
pub fn unpark(&self) {
self.inner.unpark();
}

/// Atomically makes the token available if it is not already, returning an [`UnparkResult`]
/// indicating the result of the unpark operation.
///
/// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
/// any.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_utils::sync::{Parker, UnparkResult};
///
/// let p = Parker::new();
/// let u = p.unparker().clone();
///
/// let result = u.unpark_with_result();
/// assert_eq!(result, UnparkResult::NotParked);
/// p.park(); // consume the token and immediately return
///
/// # let t =
/// thread::spawn(move || {
/// thread::sleep(Duration::from_millis(500));
/// let result = u.unpark_with_result();
/// assert_eq!(result, UnparkResult::Notified);
/// });
///
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// # t.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
pub fn unpark_with_result(&self) -> UnparkResult {
self.inner.unpark()
}

Expand Down Expand Up @@ -324,6 +362,21 @@ pub enum UnparkReason {
Timeout,
}

/// An enum that reports the result of an `Unparker::unpark` call. This includes whether the parker
/// was parked when `unpark` was called, and if so, whether it was already notified by a previous
/// `unpark` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnparkResult {
/// The parker was not parked when `unpark` was called.
NotParked,

/// The parker was parked, but was already notified by an earlier `unpark` call.
AlreadyNotified,

/// The parker was parked and has been successfully notified by this `unpark` call.
Notified,
}

const EMPTY: usize = 0;
const PARKED: usize = 1;
const NOTIFIED: usize = 2;
Expand Down Expand Up @@ -407,15 +460,15 @@ impl Inner {
}
}

pub(crate) fn unpark(&self) {
pub(crate) fn unpark(&self) -> UnparkResult {
// To ensure the unparked thread will observe any writes we made before this call, we must
// perform a release operation that `park` can synchronize with. To do that we must write
// `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
// than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, // no one was waiting
NOTIFIED => return, // already unparked
PARKED => {} // gotta go wake someone up
EMPTY => return UnparkResult::NotParked, // no one was waiting
NOTIFIED => return UnparkResult::AlreadyNotified, // already unparked
PARKED => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}

Expand All @@ -429,5 +482,6 @@ impl Inner {
// it doesn't get woken only to have to wait for us to release `lock`.
drop(self.lock.lock().unwrap());
self.cvar.notify_one();
UnparkResult::Notified
}
}
30 changes: 28 additions & 2 deletions crossbeam-utils/tests/parker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::thread::sleep;
use std::thread::{sleep, spawn};
use std::time::Duration;

use crossbeam_utils::sync::{Parker, UnparkReason};
use crossbeam_utils::sync::{Parker, UnparkReason, UnparkResult};
use crossbeam_utils::thread;

#[test]
Expand Down Expand Up @@ -47,3 +47,29 @@ fn park_timeout_unpark_called_other_thread() {
.unwrap();
}
}

#[test]
fn unpark_with_result_called_before_park() {
let p = Parker::new();
let u = p.unparker().clone();

let result = u.unpark_with_result();
assert_eq!(result, UnparkResult::NotParked);

p.park(); // consume the token and immediately return
}

#[test]
fn unpark_with_result_called_after_park() {
let p = Parker::new();
let u = p.unparker().clone();

let t = spawn(move || {
sleep(Duration::from_millis(500));
let result = u.unpark_with_result();
assert_eq!(result, UnparkResult::Notified);
});

p.park(); // consume the token and immediately return
t.join().unwrap();
}