Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions compio-runtime/src/cancel.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,38 @@
use std::{
cell::{Cell, RefCell},
collections::HashSet,
fmt::Debug,
mem,
ops::DerefMut,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};

use compio_driver::{Cancel, Key, OpCode};
use compio_driver::{Cancel, Key, OpCode, Proactor};
use futures_util::{FutureExt, ready};
use synchrony::unsync::event::{Event, EventListener};

use crate::{ContextExt, Runtime};

#[derive(Debug)]
struct Inner {
tokens: RefCell<HashSet<Cancel>>,
is_cancelled: Cell<bool>,
runtime: Runtime,
driver: Rc<RefCell<Proactor>>,
notify: Event,
}

impl Debug for Inner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Inner")
.field("tokens", &self.tokens)
.field("is_cancelled", &self.is_cancelled)
.field("driver", &"...")
.field("notify", &self.notify)
.finish()
}
}

/// A token that can be used to cancel multiple operations at once.
///
/// When [`CancelToken::cancel`] is called, all operations that have been
Expand Down Expand Up @@ -58,7 +69,7 @@ impl CancelToken {
Self(Rc::new(Inner {
tokens: RefCell::new(HashSet::new()),
is_cancelled: Cell::new(false),
runtime: Runtime::current(),
driver: Runtime::with_current(|r| r.driver.clone()),
notify: Event::new(),
}))
}
Expand All @@ -75,7 +86,7 @@ impl CancelToken {
}
let tokens = mem::take(self.0.tokens.borrow_mut().deref_mut());
for t in tokens {
self.0.runtime.cancel_token(t);
self.0.driver.borrow_mut().cancel_token(t);
}
}

Expand All @@ -96,9 +107,9 @@ impl CancelToken {
/// [`with_cancel`]: crate::FutureExt::with_cancel
pub fn register<T: OpCode>(&self, key: &Key<T>) {
if self.0.is_cancelled.get() {
self.0.runtime.cancel(key.clone());
self.0.driver.borrow_mut().cancel(key.clone());
} else {
let token = self.0.runtime.register_cancel(key);
let token = self.0.driver.borrow_mut().register_cancel(key);
self.0.tokens.borrow_mut().insert(token);
}
}
Expand Down
53 changes: 31 additions & 22 deletions compio-runtime/src/future/future.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
//! Future for submitting operations to the runtime.

use std::{
cell::RefCell,
future::Future,
marker::PhantomData,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
};

use compio_buf::BufResult;
use compio_driver::{Extra, Key, OpCode, PushEntry};
use compio_driver::{Extra, Key, OpCode, Proactor, PushEntry};
use futures_util::future::FusedFuture;

use crate::{
CancelToken, Runtime,
CancelToken,
future::{poll_task, poll_task_with_extra, submit_raw},
waker::{get_ext, get_waker},
};

Expand Down Expand Up @@ -59,19 +62,18 @@ pin_project_lite::pin_project! {
///
/// [`.with_extra()`]: Submit::with_extra
pub struct Submit<T: OpCode, E = ()> {
runtime: Runtime,
driver: Rc<RefCell<Proactor>>,
state: Option<State<T, E>>,
}

impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
if let Some(State::Submitted { key, .. }) = this.state.take() {
this.runtime.cancel(key);
this.driver.borrow_mut().cancel(key);
}
}
}

}

enum State<T: OpCode, E> {
Expand All @@ -89,9 +91,9 @@ impl<T: OpCode, E> State<T, E> {
}

impl<T: OpCode> Submit<T, ()> {
pub(crate) fn new(runtime: Runtime, op: T) -> Self {
pub(crate) fn new(driver: Rc<RefCell<Proactor>>, op: T) -> Self {
Submit {
runtime,
driver,
state: Some(State::Idle { op }),
}
}
Expand All @@ -101,10 +103,10 @@ impl<T: OpCode> Submit<T, ()> {
/// This is useful if you need to access extra information provided by the
/// runtime upon completion of the operation.
pub fn with_extra(mut self) -> Submit<T, Extra> {
let runtime = self.runtime.clone();
let driver = self.driver.clone();
let Some(state) = self.state.take() else {
return Submit {
runtime,
driver,
state: None,
};
};
Expand All @@ -116,7 +118,7 @@ impl<T: OpCode> Submit<T, ()> {
State::Idle { op } => State::Idle { op },
};
Submit {
runtime,
driver,
state: Some(state),
}
}
Expand All @@ -130,16 +132,20 @@ impl<T: OpCode + 'static> Future for Submit<T, ()> {

loop {
match this.state.take().expect("Cannot poll after ready") {
State::Submitted { key, .. } => match this.runtime.poll_task(cx.get_waker(), key) {
PushEntry::Pending(key) => {
*this.state = Some(State::submitted(key));
return Poll::Pending;
State::Submitted { key, .. } => {
let entry = poll_task(&mut this.driver.borrow_mut(), cx.get_waker(), key);
match entry {
PushEntry::Pending(key) => {
*this.state = Some(State::submitted(key));
return Poll::Pending;
}
PushEntry::Ready(res) => return Poll::Ready(res),
}
PushEntry::Ready(res) => return Poll::Ready(res),
},
}
State::Idle { op } => {
let extra = cx.as_extra(|| this.runtime.default_extra());
match this.runtime.submit_raw(op, extra) {
let extra = cx.as_extra(|| this.driver.borrow().default_extra());
let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra);
match entry {
PushEntry::Pending(key) => {
// TODO: Should we register it only the first time or every time it's
// being polled?
Expand Down Expand Up @@ -168,7 +174,9 @@ impl<T: OpCode + 'static> Future for Submit<T, Extra> {
loop {
match this.state.take().expect("Cannot poll after ready") {
State::Submitted { key, .. } => {
match this.runtime.poll_task_with_extra(cx.get_waker(), key) {
let entry =
poll_task_with_extra(&mut this.driver.borrow_mut(), cx.get_waker(), key);
match entry {
PushEntry::Pending(key) => {
*this.state = Some(State::submitted(key));
return Poll::Pending;
Expand All @@ -177,8 +185,9 @@ impl<T: OpCode + 'static> Future for Submit<T, Extra> {
}
}
State::Idle { op } => {
let extra = cx.as_extra(|| this.runtime.default_extra());
match this.runtime.submit_raw(op, extra) {
let extra = cx.as_extra(|| this.driver.borrow().default_extra());
let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra);
match entry {
PushEntry::Pending(key) => {
if let Some(cancel) = cx.get_cancel() {
cancel.register(&key);
Expand All @@ -187,7 +196,7 @@ impl<T: OpCode + 'static> Future for Submit<T, Extra> {
*this.state = Some(State::submitted(key))
}
PushEntry::Ready(res) => {
return Poll::Ready((res, this.runtime.default_extra()));
return Poll::Ready((res, this.driver.borrow().default_extra()));
}
}
}
Expand Down
54 changes: 54 additions & 0 deletions compio-runtime/src/future/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,57 @@
use std::task::Waker;

use compio_buf::BufResult;
use compio_driver::{Extra, Key, OpCode, Proactor, PushEntry};
use compio_log::instrument;

fn poll_task<T: OpCode>(
driver: &mut Proactor,
waker: &Waker,
key: Key<T>,
) -> PushEntry<Key<T>, BufResult<usize, T>> {
instrument!(compio_log::Level::DEBUG, "poll_task", ?key);
driver.pop(key).map_pending(|k| {
driver.update_waker(&k, waker);
k
})
}

fn poll_task_with_extra<T: OpCode>(
driver: &mut Proactor,
waker: &Waker,
key: Key<T>,
) -> PushEntry<Key<T>, (BufResult<usize, T>, Extra)> {
instrument!(compio_log::Level::DEBUG, "poll_task_with_extra", ?key);
driver.pop_with_extra(key).map_pending(|k| {
driver.update_waker(&k, waker);
k
})
}

fn poll_multishot<T: OpCode>(
driver: &mut Proactor,
waker: &Waker,
key: &Key<T>,
) -> Option<BufResult<usize, Extra>> {
instrument!(compio_log::Level::DEBUG, "poll_multishot", ?key);
if let Some(res) = driver.pop_multishot(key) {
return Some(res);
}
driver.update_waker(key, waker);
None
}

fn submit_raw<T: OpCode + 'static>(
driver: &mut Proactor,
op: T,
extra: Option<Extra>,
) -> PushEntry<Key<T>, BufResult<usize, T>> {
match extra {
Some(e) => driver.push_with_extra(op, e),
None => driver.push(op),
}
}

mod combinator;
#[allow(clippy::module_inception)]
mod future;
Expand Down
32 changes: 21 additions & 11 deletions compio-runtime/src/future/stream.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
use std::{
cell::RefCell,
marker::PhantomData,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};

use compio_buf::{BufResult, SetLen};
use compio_driver::{
BufferPool, BufferRef, Extra, Key, OpCode, PushEntry, TakeBuffer,
BufferPool, BufferRef, Extra, Key, OpCode, Proactor, PushEntry, TakeBuffer,
op::{RecvFromMultiResult, RecvMsgMultiResult},
};
use futures_util::{Stream, StreamExt, stream::FusedStream};

use crate::{ContextExt, Runtime};
use crate::{
ContextExt,
future::{poll_multishot, poll_task_with_extra, submit_raw},
};

pin_project_lite::pin_project! {
/// Returned [`Stream`] for [`Runtime::submit_multi`].
///
/// When this is dropped and the operation hasn't finished yet, it will try to
/// cancel the operation.
pub struct SubmitMulti<T: OpCode> {
runtime: Runtime,
driver: Rc<RefCell<Proactor>>,
state: Option<State<T>>,
}

impl<T: OpCode> PinnedDrop for SubmitMulti<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
if let Some(State::Submitted { key }) = this.state.take() {
this.runtime.cancel(key);
this.driver.borrow_mut().cancel(key);
}
}
}
Expand All @@ -46,9 +51,9 @@ impl<T: OpCode> State<T> {
}

impl<T: OpCode> SubmitMulti<T> {
pub(crate) fn new(runtime: Runtime, op: T) -> Self {
pub(crate) fn new(driver: Rc<RefCell<Proactor>>, op: T) -> Self {
SubmitMulti {
runtime,
driver,
state: Some(State::Idle { op }),
}
}
Expand Down Expand Up @@ -82,8 +87,9 @@ impl<T: OpCode + 'static> Stream for SubmitMulti<T> {
loop {
match this.state.take().expect("State error, this is a bug") {
State::Idle { op } => {
let extra = cx.as_extra(|| this.runtime.default_extra());
match this.runtime.submit_raw(op, extra) {
let extra = cx.as_extra(|| this.driver.borrow().default_extra());
let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra);
match entry {
PushEntry::Pending(key) => {
if let Some(cancel) = cx.get_cancel() {
cancel.register(&key);
Expand All @@ -93,21 +99,25 @@ impl<T: OpCode + 'static> Stream for SubmitMulti<T> {
}
PushEntry::Ready(BufResult(res, op)) => {
*this.state = Some(State::Finished { op });
let extra = this.runtime.default_extra();
let extra = this.driver.borrow().default_extra();

return Poll::Ready(Some(BufResult(res, extra)));
}
}
}

State::Submitted { key, .. } => {
if let Some(res) = this.runtime.poll_multishot(cx.get_waker(), &key) {
if let Some(res) =
poll_multishot(&mut this.driver.borrow_mut(), cx.get_waker(), &key)
{
*this.state = Some(State::submitted(key));

return Poll::Ready(Some(res));
};

match this.runtime.poll_task_with_extra(cx.get_waker(), key) {
let entry =
poll_task_with_extra(&mut this.driver.borrow_mut(), cx.get_waker(), key);
match entry {
PushEntry::Pending(key) => {
*this.state = Some(State::submitted(key));

Expand Down
Loading
Loading