Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pollster"
version = "0.2.5"
version = "0.2.6"
description = "Synchronously block the thread until a future completes"
categories = ["asynchronous", "concurrency"]
keywords = ["async", "minimal", "executor", "runtime", "block_on"]
Expand All @@ -12,4 +12,5 @@ readme = "README.md"

[dev-dependencies]
futures-timer = "3.0"
tokio = { version = "1", features = ["sync"] }
tokio = { version = "1", features = ["sync", "time", "rt", "rt-multi-thread"] }
pollster_old = { package = "pollster", version = "=0.2.5" }
30 changes: 30 additions & 0 deletions benches/basic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#![feature(test)]
extern crate test;

use test::{black_box, Bencher};

fn simple_fut() -> std::pin::Pin<Box<dyn std::future::Future<Output = ()>>> {
Box::pin(tokio::time::sleep(std::time::Duration::from_nanos(1)))
}

#[bench]
fn basic_pollster(b: &mut Bencher) {
let rt = tokio::runtime::Runtime::new().unwrap();
let guard = rt.enter();

use pollster::FutureExt as _;
b.iter(|| {
black_box(simple_fut().block_on());
});
}

#[bench]
fn basic_pollster_old(b: &mut Bencher) {
let rt = tokio::runtime::Runtime::new().unwrap();
let guard = rt.enter();

use pollster_old::FutureExt as _;
b.iter(|| {
black_box(simple_fut().block_on());
});
}
140 changes: 70 additions & 70 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
#![doc = include_str!("../README.md")]
#![feature(once_cell, pin_macro)]

use std::{
future::Future,
sync::{Arc, Condvar, Mutex},
sync::Arc,
task::{Context, Poll, Wake, Waker},
pin::pin,
thread::{self, Thread},
sync::atomic::{AtomicUsize, Ordering},
mem,
};

/// An extension trait that allows blocking on a future in suffix position.
Expand All @@ -24,74 +29,71 @@ pub trait FutureExt: Future {

impl<F: Future> FutureExt for F {}

enum SignalState {
Empty,
Waiting,
Notified,
}
// struct Signal {
// thread: AtomicUsize,
// }

struct Signal {
state: Mutex<SignalState>,
cond: Condvar,
}
// impl Signal {
// fn new() -> Self {
// Self { thread: AtomicUsize::new(0) }
// }

impl Signal {
fn new() -> Self {
Self {
state: Mutex::new(SignalState::Empty),
cond: Condvar::new(),
}
}
// fn wait(&self) {
// let thread_ptr = unsafe { Arc::into_raw(mem::transmute::<Thread, Arc<()>>(thread::current())) };
// match self.thread.compare_exchange(
// 0,
// thread_ptr as usize,
// Ordering::Acquire,
// Ordering::Relaxed,
// ) {
// Ok(_) => {
// while self.thread.load(Ordering::Relaxed) == thread_ptr as usize {
// thread::park();
// }
// },
// Err(_) => {},
// }
// }

fn wait(&self) {
let mut state = self.state.lock().unwrap();
match *state {
SignalState::Notified => {
// Notify() was called before we got here, consume it here without waiting and return immediately.
*state = SignalState::Empty;
return;
}
// This should not be possible because our signal is created within a function and never handed out to any
// other threads. If this is the case, we have a serious problem so we panic immediately to avoid anything
// more problematic happening.
SignalState::Waiting => {
unreachable!("Multiple threads waiting on the same signal: Open a bug report!");
}
SignalState::Empty => {
// Nothing has happened yet, and we're the only thread waiting (as should be the case!). Set the state
// accordingly and begin polling the condvar in a loop until it's no longer telling us to wait. The
// loop prevents incorrect spurious wakeups.
*state = SignalState::Waiting;
while let SignalState::Waiting = *state {
state = self.cond.wait(state).unwrap();
}
}
}
}
// fn notify(&self) {
// match self.thread.swap(1, Ordering::Acquire) {
// 0 => {}, // No thread waiting yet
// 1 => {}, // Notified twice, no effect
// ptr => unsafe {
// let thread = mem::transmute::<Arc<()>, Thread>(Arc::from_raw(ptr as *mut ()));
// thread.unpark();
// mem::forget(thread);
// },
// }
// }
// }

fn notify(&self) {
let mut state = self.state.lock().unwrap();
match *state {
// The signal was already notified, no need to do anything because the thread will be waking up anyway
SignalState::Notified => {}
// The signal wasnt notified but a thread isnt waiting on it, so we can avoid doing unnecessary work by
// skipping the condvar and leaving behind a message telling the thread that a notification has already
// occurred should it come along in the future.
SignalState::Empty => *state = SignalState::Notified,
// The signal wasnt notified and there's a waiting thread. Reset the signal so it can be wait()'ed on again
// and wake up the thread. Because there should only be a single thread waiting, `notify_all` would also be
// valid.
SignalState::Waiting => {
*state = SignalState::Empty;
self.cond.notify_one();
}
}
}
}
// impl Drop for Signal {
// fn drop(&mut self) {
// match self.thread.load(Ordering::Relaxed) {
// 0 | 1 => {},
// ptr => unsafe {
// drop(mem::transmute::<Arc<()>, Thread>(Arc::from_raw(ptr as *mut ())));
// },
// }
// }
// }

impl Wake for Signal {
// impl Wake for Signal {
// fn wake(self: Arc<Self>) {
// self.notify();
// }
// }

struct ThreadSignal;

impl Wake for ThreadSignal {
fn wake(self: Arc<Self>) {
self.notify();
let this = unsafe { mem::transmute::<&Arc<Self>, &Thread>(&self) };
this.unpark();
std::mem::forget(this);

drop(unsafe { mem::transmute::<Arc<Self>, Thread>(self) });
}
}

Expand All @@ -105,25 +107,23 @@ impl Wake for Signal {
/// ```
pub fn block_on<F: Future>(mut fut: F) -> F::Output {
// Pin the future so that it can be polled.
// SAFETY: We shadow `fut` so that it cannot be used again. The future is now pinned to the stack and will not be
// moved until the end of this scope. This is, incidentally, exactly what the `pin_mut!` macro from `pin_utils`
// does.
let mut fut = unsafe { std::pin::Pin::new_unchecked(&mut fut) };
let mut fut = pin!(fut);

// Signal used to wake up the thread for polling as the future moves to completion. We need to use an `Arc`
// because, although the lifetime of `fut` is limited to this function, the underlying IO abstraction might keep
// the signal alive for far longer. `Arc` is a thread-safe way to allow this to happen.
// TODO: Investigate ways to reuse this `Arc<Signal>`... perhaps via a `static`?
let signal = Arc::new(Signal::new());
// let signal = Arc::new(Signal::new());
let signal = unsafe { mem::transmute::<Thread, Arc<ThreadSignal>>(thread::current()) };

// Create a context that will be passed to the future.
let waker = Waker::from(Arc::clone(&signal));
let waker = Waker::from(signal);
let mut context = Context::from_waker(&waker);

// Poll the future to completion
loop {
match fut.as_mut().poll(&mut context) {
Poll::Pending => signal.wait(),
Poll::Pending => thread::park(),//signal.wait(),
Poll::Ready(item) => break item,
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn mpsc() {
use tokio::sync::mpsc;

const BOUNDED: usize = 16;
const MESSAGES: usize = 100_000;
const MESSAGES: usize = 100;

let (a_tx, mut a_rx) = mpsc::channel(BOUNDED);
let (b_tx, mut b_rx) = mpsc::channel(BOUNDED);
Expand Down