Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions compio-driver/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ impl<T: ?Sized> RawOp<T> {
// SAFETY: inner is always pinned with ThinCell.
unsafe { Pin::new_unchecked(&mut self.op) }
}

#[cfg(io_uring)]
pub fn wake_by_ref(&mut self) {
if let PushEntry::Pending(Some(w)) = &self.result {
w.wake_by_ref();
}
}
}

#[cfg(windows)]
Expand Down
10 changes: 10 additions & 0 deletions compio-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ impl Proactor {
instrument!(compio_log::Level::DEBUG, "pop", ?key);
if key.has_result() {
self.cancel.remove(&key);
self.driver.cleanup_multishot(&key);
PushEntry::Ready(key.take_result())
} else {
PushEntry::Pending(key)
Expand All @@ -248,6 +249,7 @@ impl Proactor {
instrument!(compio_log::Level::DEBUG, "pop", ?key);
if key.has_result() {
self.cancel.remove(&key);
self.driver.cleanup_multishot(&key);
let extra = key.swap_extra(self.default_extra());
let res = key.take_result();
PushEntry::Ready((res, extra))
Expand All @@ -256,6 +258,14 @@ impl Proactor {
}
}

/// Get one completion entry for a multishot operation. If it returns
/// [`None`], the user should call [`Proactor::pop_with_extra`] to get the
/// final result of the operation.
pub fn pop_multishot<T>(&mut self, key: &Key<T>) -> Option<BufResult<usize, Extra>> {
instrument!(compio_log::Level::DEBUG, "pop_multishot", ?key);
self.driver.pop_multishot(key)
}

/// Update the waker of the specified op.
pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: &Waker) {
op.set_waker(waker);
Expand Down
74 changes: 46 additions & 28 deletions compio-driver/src/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub use crate::sys::op::{
WriteVectored, WriteVectoredAt,
};
#[cfg(io_uring)]
pub use crate::sys::op::{ReadManaged, ReadManagedAt, RecvFromManaged, RecvManaged};
pub use crate::sys::op::{
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
};
use crate::{Extra, OwnedFd, SharedFd, TakeBuffer, sys::aio::*};

/// Trait to update the buffer length inside the [`BufResult`].
Expand Down Expand Up @@ -177,6 +179,24 @@ impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, Extra) {
}
}

impl ResultTakeBuffer for BufResult<usize, Extra> {
type Buffer<'a> = crate::BorrowedBuffer<'a>;
type BufferPool = crate::BufferPool;

fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
#[cfg(io_uring)]
{
let BufResult(result, extra) = self;
crate::sys::take_buffer(pool, result, extra.buffer_id()?)
}
#[cfg(not(io_uring))]
{
let _pool = pool;
unreachable!("take_buffer should not be called for non-io-uring ops")
}
}
}

pin_project! {
/// Spawn a blocking function in the thread pool.
pub struct Asyncify<F, D> {
Expand Down Expand Up @@ -477,6 +497,21 @@ pub(crate) mod managed {
use super::{Read, ReadAt, Recv, RecvFrom};
use crate::{AsFd, BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};

fn take_buffer(
slice: OwnedBuffer,
buffer_pool: &BufferPool,
result: io::Result<usize>,
) -> io::Result<BorrowedBuffer<'_>> {
let result = result?;
#[cfg(fusion)]
let buffer_pool = buffer_pool.as_poll();
// SAFETY: result is valid
let res = unsafe { buffer_pool.create_proxy(slice, result) };
#[cfg(fusion)]
let res = BorrowedBuffer::new_poll(res);
Ok(res)
}

pin_project! {
/// Read a file at specified position into managed buffer.
pub struct ReadManagedAt<S> {
Expand Down Expand Up @@ -506,15 +541,7 @@ pub(crate) mod managed {
result: io::Result<usize>,
_: u16,
) -> io::Result<BorrowedBuffer<'_>> {
let result = result?;
#[cfg(fusion)]
let buffer_pool = buffer_pool.as_poll();
let slice = self.op.into_inner();
// SAFETY: result is valid
let res = unsafe { buffer_pool.create_proxy(slice, result) };
#[cfg(fusion)]
let res = BorrowedBuffer::new_poll(res);
Ok(res)
take_buffer(self.op.into_inner(), buffer_pool, result)
}
}

Expand Down Expand Up @@ -547,15 +574,7 @@ pub(crate) mod managed {
result: io::Result<usize>,
_: u16,
) -> io::Result<Self::Buffer<'_>> {
let result = result?;
#[cfg(fusion)]
let buffer_pool = buffer_pool.as_poll();
let slice = self.op.into_inner();
// SAFETY: result is valid
let res = unsafe { buffer_pool.create_proxy(slice, result) };
#[cfg(fusion)]
let res = BorrowedBuffer::new_poll(res);
Ok(res)
take_buffer(self.op.into_inner(), buffer_pool, result)
}
}

Expand Down Expand Up @@ -591,15 +610,7 @@ pub(crate) mod managed {
result: io::Result<usize>,
_: u16,
) -> io::Result<Self::Buffer<'_>> {
let result = result?;
#[cfg(fusion)]
let buffer_pool = buffer_pool.as_poll();
let slice = self.op.into_inner();
// SAFETY: result is valid
let res = unsafe { buffer_pool.create_proxy(slice, result) };
#[cfg(fusion)]
let res = BorrowedBuffer::new_poll(res);
Ok(res)
take_buffer(self.op.into_inner(), buffer_pool, result)
}
}

Expand Down Expand Up @@ -644,6 +655,13 @@ pub(crate) mod managed {
Ok((res, addr))
}
}

/// Read a file at specified position into multiple managed buffers.
pub type ReadMultiAt<S> = ReadManagedAt<S>;
/// Read a file into multiple managed buffers.
pub type ReadMulti<S> = ReadManaged<S>;
/// Receive data from remote into multiple managed buffers.
pub type RecvMulti<S> = RecvManaged<S>;
}

#[cfg(not(io_uring))]
Expand Down
20 changes: 19 additions & 1 deletion compio-driver/src/sys/fusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ use std::{
time::Duration,
};

use compio_buf::BufResult;
use compio_log::warn;
pub use iour::{IourOpCode, OpEntry};
pub use poll::{Decision, OpType, PollOpCode};

pub(crate) use super::iour::is_op_supported;
pub(crate) use super::iour::{is_op_supported, take_buffer};
use super::{iour, poll};
pub use crate::driver_type::DriverType; // Re-export so current user won't be broken
use crate::{BufferPool, ProactorBuilder, key::ErasedKey};
Expand Down Expand Up @@ -199,6 +200,23 @@ impl Driver {
}
}
}

pub fn pop_multishot(
&mut self,
key: &ErasedKey,
) -> Option<BufResult<usize, crate::sys::Extra>> {
match &mut self.fuse {
FuseDriver::Poll(driver) => driver.pop_multishot(key),
FuseDriver::IoUring(driver) => driver.pop_multishot(key),
}
}

pub fn cleanup_multishot(&mut self, key: &ErasedKey) {
match &mut self.fuse {
FuseDriver::Poll(driver) => driver.cleanup_multishot(key),
FuseDriver::IoUring(driver) => driver.cleanup_multishot(key),
}
}
}

impl AsRawFd for Driver {
Expand Down
3 changes: 3 additions & 0 deletions compio-driver/src/sys/fusion/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,6 @@ mop!(<S: AsFd> ReadManagedAt(fd: S, offset: u64, pool: &BufferPool, len: usize)
mop!(<S: AsFd> ReadManaged(fd: S, pool: &BufferPool, len: usize) with pool);
mop!(<S: AsFd> RecvManaged(fd: S, pool: &BufferPool, len: usize, flags: i32) with pool);
mop!(<S: AsFd> RecvFromManaged(fd: S, pool: &BufferPool, len: usize, flags: i32) with pool, buffer: (crate::BorrowedBuffer<'a>, Option<SockAddr>));
mop!(<S: AsFd> ReadMultiAt(fd: S, offset: u64, pool: &BufferPool, len: usize) with pool);
mop!(<S: AsFd> ReadMulti(fd: S, pool: &BufferPool, len: usize) with pool);
mop!(<S: AsFd> RecvMulti(fd: S, pool: &BufferPool, len: usize, flags: i32) with pool);
7 changes: 7 additions & 0 deletions compio-driver/src/sys/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
time::Duration,
};

use compio_buf::BufResult;
use compio_log::{instrument, trace};
use windows_sys::Win32::{
Foundation::{ERROR_CANCELLED, HANDLE},
Expand Down Expand Up @@ -494,6 +495,12 @@ impl Driver {
pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
Ok(())
}

pub fn pop_multishot(&mut self, _: &ErasedKey) -> Option<BufResult<usize, crate::sys::Extra>> {
None
}

pub fn cleanup_multishot(&mut self, _: &ErasedKey) {}
}

impl AsRawFd for Driver {
Expand Down
60 changes: 48 additions & 12 deletions compio-driver/src/sys/iour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
#[allow(unused_imports)]
pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use std::{
collections::{HashMap, VecDeque},
io,
marker::PhantomData,
os::fd::FromRawFd,
pin::Pin,
sync::Arc,
task::{Poll, Wake, Waker},
time::Duration,
};

use compio_buf::BufResult;
use compio_log::{instrument, trace, warn};
cfg_if::cfg_if! {
if #[cfg(feature = "io-uring-cqe32")] {
Expand All @@ -37,13 +38,14 @@ use slab::Slab;

use crate::{
AsyncifyPool, BufferPool, DriverType, Entry, ProactorBuilder,
key::{ErasedKey, RefExt},
key::{BorrowedKey, ErasedKey, RefExt},
syscall,
};

mod extra;
pub use extra::Extra;
pub(crate) mod op;
pub(crate) use op::take_buffer;

pub(crate) fn is_op_supported(code: u8) -> bool {
#[cfg(feature = "once_cell_try")]
Expand Down Expand Up @@ -157,7 +159,7 @@ pub(crate) struct Driver {
completed_rx: Receiver<Entry>,
buffer_group_ids: Slab<()>,
need_push_notifier: bool,
_local_marker: PhantomData<ErasedKey>,
multishot_results: HashMap<ErasedKey, VecDeque<CEntry>>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can implement Ord for ErasedKey if you prefer BTreeMap.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either one is OK.

}

impl Driver {
Expand Down Expand Up @@ -206,7 +208,7 @@ impl Driver {
pool: builder.create_or_get_thread_pool(),
buffer_group_ids: Slab::new(),
need_push_notifier: true,
_local_marker: PhantomData,
multishot_results: HashMap::new(),
})
}

Expand Down Expand Up @@ -298,7 +300,19 @@ impl Driver {
}
self.notifier.clear().expect("cannot clear notifier");
}
_ => create_entry(entry).notify(),
key => {
let flags = entry.flags();
if more(flags) {
let key = unsafe { BorrowedKey::from_raw(key as _) };
key.borrow().wake_by_ref();
self.multishot_results
.entry(key.clone())
.or_default()
.push_back(entry);
} else {
create_entry(entry).notify()
}
}
}
}
has_entry
Expand Down Expand Up @@ -331,6 +345,7 @@ impl Driver {
warn!("could not push AsyncCancel entry");
}
}
self.cleanup_multishot(&key);
}

fn push_raw_with_key(&mut self, entry: SEntry, key: ErasedKey) -> io::Result<()> {
Expand Down Expand Up @@ -512,6 +527,23 @@ impl Driver {

Ok(())
}

pub fn pop_multishot(
&mut self,
key: &ErasedKey,
) -> Option<BufResult<usize, crate::sys::Extra>> {
let queue = self.multishot_results.get_mut(key)?;
let entry = queue.pop_front()?;
let result = create_result(entry.result());
#[allow(clippy::useless_conversion)]
let mut extra: crate::sys::Extra = self.default_extra().into();
extra.set_flags(entry.flags());
Some(BufResult(result, extra))
}

pub fn cleanup_multishot(&mut self, key: &ErasedKey) {
self.multishot_results.remove(key);
}
}

impl AsRawFd for Driver {
Expand All @@ -522,7 +554,16 @@ impl AsRawFd for Driver {

fn create_entry(cq_entry: CEntry) -> Entry {
let result = cq_entry.result();
let result = if result < 0 {
let result = create_result(result);
let key = unsafe { ErasedKey::from_raw(cq_entry.user_data() as _) };
let mut entry = Entry::new(key, result);
entry.set_flags(cq_entry.flags());

entry
}

fn create_result(result: i32) -> io::Result<usize> {
if result < 0 {
let result = if result == -libc::ECANCELED {
libc::ETIMEDOUT
} else {
Expand All @@ -531,12 +572,7 @@ fn create_entry(cq_entry: CEntry) -> Entry {
Err(io::Error::from_raw_os_error(result))
} else {
Ok(result as _)
};
let key = unsafe { ErasedKey::from_raw(cq_entry.user_data() as _) };
let mut entry = Entry::new(key, result);
entry.set_flags(cq_entry.flags());

entry
}
}

fn timespec(duration: std::time::Duration) -> Timespec {
Expand Down
Loading