Skip to content

Commit 23a60f8

Browse files
authored
feat(driver): multishot op (#715)
* feat(driver,iour): support multishot cqe * refactor(driver): take_buffer * feat(driver): take_buffer for multishot result * feat(driver): read multi * feat(driver): read multi at & recv multi * test(driver): recv multi * fix(driver): read file at * fix(driver): warnings * docs(driver): for read* * fix(driver): format * fix(driver): remove queue if empty * fix(driver,iour): use question mark * fix(driver,iour): cleanup multishot on pop * fix(driver,iour): move `wake_by_ref` * fix(driver,iour): avoid `as_raw()` * fix(driver,iour): use `ErasedKey` as key
1 parent 81e0b0d commit 23a60f8

11 files changed

Lines changed: 422 additions & 70 deletions

File tree

compio-driver/src/key.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ impl<T: ?Sized> RawOp<T> {
4949
// SAFETY: inner is always pinned with ThinCell.
5050
unsafe { Pin::new_unchecked(&mut self.op) }
5151
}
52+
53+
#[cfg(io_uring)]
54+
pub fn wake_by_ref(&mut self) {
55+
if let PushEntry::Pending(Some(w)) = &self.result {
56+
w.wake_by_ref();
57+
}
58+
}
5259
}
5360

5461
#[cfg(windows)]

compio-driver/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ impl Proactor {
232232
instrument!(compio_log::Level::DEBUG, "pop", ?key);
233233
if key.has_result() {
234234
self.cancel.remove(&key);
235+
self.driver.cleanup_multishot(&key);
235236
PushEntry::Ready(key.take_result())
236237
} else {
237238
PushEntry::Pending(key)
@@ -251,6 +252,7 @@ impl Proactor {
251252
instrument!(compio_log::Level::DEBUG, "pop", ?key);
252253
if key.has_result() {
253254
self.cancel.remove(&key);
255+
self.driver.cleanup_multishot(&key);
254256
let extra = key.swap_extra(self.default_extra());
255257
let res = key.take_result();
256258
PushEntry::Ready((res, extra))
@@ -259,6 +261,14 @@ impl Proactor {
259261
}
260262
}
261263

264+
/// Get one completion entry for a multishot operation. If it returns
265+
/// [`None`], the user should call [`Proactor::pop_with_extra`] to get the
266+
/// final result of the operation.
267+
pub fn pop_multishot<T>(&mut self, key: &Key<T>) -> Option<BufResult<usize, Extra>> {
268+
instrument!(compio_log::Level::DEBUG, "pop_multishot", ?key);
269+
self.driver.pop_multishot(key)
270+
}
271+
262272
/// Update the waker of the specified op.
263273
pub fn update_waker<T>(&mut self, op: &mut Key<T>, waker: &Waker) {
264274
op.set_waker(waker);

compio-driver/src/op.rs

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ pub use crate::sys::op::{
2525
WriteVectored, WriteVectoredAt,
2626
};
2727
#[cfg(io_uring)]
28-
pub use crate::sys::op::{ReadManaged, ReadManagedAt, RecvFromManaged, RecvManaged};
28+
pub use crate::sys::op::{
29+
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
30+
};
2931
use crate::{Extra, OwnedFd, SharedFd, TakeBuffer, sys::aio::*};
3032

3133
/// Trait to update the buffer length inside the [`BufResult`].
@@ -177,6 +179,24 @@ impl<T: TakeBuffer> ResultTakeBuffer for (BufResult<usize, T>, Extra) {
177179
}
178180
}
179181

182+
impl ResultTakeBuffer for BufResult<usize, Extra> {
183+
type Buffer<'a> = crate::BorrowedBuffer<'a>;
184+
type BufferPool = crate::BufferPool;
185+
186+
fn take_buffer(self, pool: &Self::BufferPool) -> io::Result<Self::Buffer<'_>> {
187+
#[cfg(io_uring)]
188+
{
189+
let BufResult(result, extra) = self;
190+
crate::sys::take_buffer(pool, result, extra.buffer_id()?)
191+
}
192+
#[cfg(not(io_uring))]
193+
{
194+
let _pool = pool;
195+
unreachable!("take_buffer should not be called for non-io-uring ops")
196+
}
197+
}
198+
}
199+
180200
pin_project! {
181201
/// Spawn a blocking function in the thread pool.
182202
pub struct Asyncify<F, D> {
@@ -477,6 +497,21 @@ pub(crate) mod managed {
477497
use super::{Read, ReadAt, Recv, RecvFrom};
478498
use crate::{AsFd, BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
479499

500+
fn take_buffer(
501+
slice: OwnedBuffer,
502+
buffer_pool: &BufferPool,
503+
result: io::Result<usize>,
504+
) -> io::Result<BorrowedBuffer<'_>> {
505+
let result = result?;
506+
#[cfg(fusion)]
507+
let buffer_pool = buffer_pool.as_poll();
508+
// SAFETY: result is valid
509+
let res = unsafe { buffer_pool.create_proxy(slice, result) };
510+
#[cfg(fusion)]
511+
let res = BorrowedBuffer::new_poll(res);
512+
Ok(res)
513+
}
514+
480515
pin_project! {
481516
/// Read a file at specified position into managed buffer.
482517
pub struct ReadManagedAt<S> {
@@ -506,15 +541,7 @@ pub(crate) mod managed {
506541
result: io::Result<usize>,
507542
_: u16,
508543
) -> io::Result<BorrowedBuffer<'_>> {
509-
let result = result?;
510-
#[cfg(fusion)]
511-
let buffer_pool = buffer_pool.as_poll();
512-
let slice = self.op.into_inner();
513-
// SAFETY: result is valid
514-
let res = unsafe { buffer_pool.create_proxy(slice, result) };
515-
#[cfg(fusion)]
516-
let res = BorrowedBuffer::new_poll(res);
517-
Ok(res)
544+
take_buffer(self.op.into_inner(), buffer_pool, result)
518545
}
519546
}
520547

@@ -547,15 +574,7 @@ pub(crate) mod managed {
547574
result: io::Result<usize>,
548575
_: u16,
549576
) -> io::Result<Self::Buffer<'_>> {
550-
let result = result?;
551-
#[cfg(fusion)]
552-
let buffer_pool = buffer_pool.as_poll();
553-
let slice = self.op.into_inner();
554-
// SAFETY: result is valid
555-
let res = unsafe { buffer_pool.create_proxy(slice, result) };
556-
#[cfg(fusion)]
557-
let res = BorrowedBuffer::new_poll(res);
558-
Ok(res)
577+
take_buffer(self.op.into_inner(), buffer_pool, result)
559578
}
560579
}
561580

@@ -591,15 +610,7 @@ pub(crate) mod managed {
591610
result: io::Result<usize>,
592611
_: u16,
593612
) -> io::Result<Self::Buffer<'_>> {
594-
let result = result?;
595-
#[cfg(fusion)]
596-
let buffer_pool = buffer_pool.as_poll();
597-
let slice = self.op.into_inner();
598-
// SAFETY: result is valid
599-
let res = unsafe { buffer_pool.create_proxy(slice, result) };
600-
#[cfg(fusion)]
601-
let res = BorrowedBuffer::new_poll(res);
602-
Ok(res)
613+
take_buffer(self.op.into_inner(), buffer_pool, result)
603614
}
604615
}
605616

@@ -644,6 +655,13 @@ pub(crate) mod managed {
644655
Ok((res, addr))
645656
}
646657
}
658+
659+
/// Read a file at specified position into multiple managed buffers.
660+
pub type ReadMultiAt<S> = ReadManagedAt<S>;
661+
/// Read a file into multiple managed buffers.
662+
pub type ReadMulti<S> = ReadManaged<S>;
663+
/// Receive data from remote into multiple managed buffers.
664+
pub type RecvMulti<S> = RecvManaged<S>;
647665
}
648666

649667
#[cfg(not(io_uring))]

compio-driver/src/sys/fusion/mod.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ use std::{
88
time::Duration,
99
};
1010

11+
use compio_buf::BufResult;
1112
use compio_log::warn;
1213
pub use iour::{IourOpCode, OpEntry};
1314
pub use poll::{Decision, OpType, PollOpCode};
1415

15-
pub(crate) use super::iour::is_op_supported;
16+
pub(crate) use super::iour::{is_op_supported, take_buffer};
1617
use super::{iour, poll};
1718
pub use crate::driver_type::DriverType; // Re-export so current user won't be broken
1819
use crate::{BufferPool, ProactorBuilder, key::ErasedKey};
@@ -199,6 +200,23 @@ impl Driver {
199200
}
200201
}
201202
}
203+
204+
pub fn pop_multishot(
205+
&mut self,
206+
key: &ErasedKey,
207+
) -> Option<BufResult<usize, crate::sys::Extra>> {
208+
match &mut self.fuse {
209+
FuseDriver::Poll(driver) => driver.pop_multishot(key),
210+
FuseDriver::IoUring(driver) => driver.pop_multishot(key),
211+
}
212+
}
213+
214+
pub fn cleanup_multishot(&mut self, key: &ErasedKey) {
215+
match &mut self.fuse {
216+
FuseDriver::Poll(driver) => driver.cleanup_multishot(key),
217+
FuseDriver::IoUring(driver) => driver.cleanup_multishot(key),
218+
}
219+
}
202220
}
203221

204222
impl AsRawFd for Driver {

compio-driver/src/sys/fusion/op.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,3 +210,6 @@ mop!(<S: AsFd> ReadManagedAt(fd: S, offset: u64, pool: &BufferPool, len: usize)
210210
mop!(<S: AsFd> ReadManaged(fd: S, pool: &BufferPool, len: usize) with pool);
211211
mop!(<S: AsFd> RecvManaged(fd: S, pool: &BufferPool, len: usize, flags: i32) with pool);
212212
mop!(<S: AsFd> RecvFromManaged(fd: S, pool: &BufferPool, len: usize, flags: i32) with pool, buffer: (crate::BorrowedBuffer<'a>, Option<SockAddr>));
213+
mop!(<S: AsFd> ReadMultiAt(fd: S, offset: u64, pool: &BufferPool, len: usize) with pool);
214+
mop!(<S: AsFd> ReadMulti(fd: S, pool: &BufferPool, len: usize) with pool);
215+
mop!(<S: AsFd> RecvMulti(fd: S, pool: &BufferPool, len: usize, flags: i32) with pool);

compio-driver/src/sys/iocp/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::{
1212
time::Duration,
1313
};
1414

15+
use compio_buf::BufResult;
1516
use compio_log::{instrument, trace};
1617
use windows_sys::Win32::{
1718
Foundation::{ERROR_CANCELLED, HANDLE},
@@ -497,6 +498,12 @@ impl Driver {
497498
pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
498499
Ok(())
499500
}
501+
502+
pub fn pop_multishot(&mut self, _: &ErasedKey) -> Option<BufResult<usize, crate::sys::Extra>> {
503+
None
504+
}
505+
506+
pub fn cleanup_multishot(&mut self, _: &ErasedKey) {}
500507
}
501508

502509
impl AsRawFd for Driver {

compio-driver/src/sys/iour/mod.rs

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22
#[allow(unused_imports)]
33
pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
44
use std::{
5+
collections::{HashMap, VecDeque},
56
io,
6-
marker::PhantomData,
77
os::fd::FromRawFd,
88
pin::Pin,
99
sync::Arc,
1010
task::{Poll, Wake, Waker},
1111
time::Duration,
1212
};
1313

14+
use compio_buf::BufResult;
1415
use compio_log::{instrument, trace, warn};
1516
cfg_if::cfg_if! {
1617
if #[cfg(feature = "io-uring-cqe32")] {
@@ -37,13 +38,14 @@ use slab::Slab;
3738

3839
use crate::{
3940
AsyncifyPool, BufferPool, DriverType, Entry, ProactorBuilder,
40-
key::{ErasedKey, RefExt},
41+
key::{BorrowedKey, ErasedKey, RefExt},
4142
syscall,
4243
};
4344

4445
mod extra;
4546
pub use extra::Extra;
4647
pub(crate) mod op;
48+
pub(crate) use op::take_buffer;
4749

4850
pub(crate) fn is_op_supported(code: u8) -> bool {
4951
#[cfg(feature = "once_cell_try")]
@@ -157,7 +159,7 @@ pub(crate) struct Driver {
157159
completed_rx: Receiver<Entry>,
158160
buffer_group_ids: Slab<()>,
159161
need_push_notifier: bool,
160-
_local_marker: PhantomData<ErasedKey>,
162+
multishot_results: HashMap<ErasedKey, VecDeque<CEntry>>,
161163
}
162164

163165
impl Driver {
@@ -206,7 +208,7 @@ impl Driver {
206208
pool: builder.create_or_get_thread_pool(),
207209
buffer_group_ids: Slab::new(),
208210
need_push_notifier: true,
209-
_local_marker: PhantomData,
211+
multishot_results: HashMap::new(),
210212
})
211213
}
212214

@@ -298,7 +300,19 @@ impl Driver {
298300
}
299301
self.notifier.clear().expect("cannot clear notifier");
300302
}
301-
_ => create_entry(entry).notify(),
303+
key => {
304+
let flags = entry.flags();
305+
if more(flags) {
306+
let key = unsafe { BorrowedKey::from_raw(key as _) };
307+
key.borrow().wake_by_ref();
308+
self.multishot_results
309+
.entry(key.clone())
310+
.or_default()
311+
.push_back(entry);
312+
} else {
313+
create_entry(entry).notify()
314+
}
315+
}
302316
}
303317
}
304318
has_entry
@@ -331,6 +345,7 @@ impl Driver {
331345
warn!("could not push AsyncCancel entry");
332346
}
333347
}
348+
self.cleanup_multishot(&key);
334349
}
335350

336351
fn push_raw_with_key(&mut self, entry: SEntry, key: ErasedKey) -> io::Result<()> {
@@ -512,6 +527,23 @@ impl Driver {
512527

513528
Ok(())
514529
}
530+
531+
pub fn pop_multishot(
532+
&mut self,
533+
key: &ErasedKey,
534+
) -> Option<BufResult<usize, crate::sys::Extra>> {
535+
let queue = self.multishot_results.get_mut(key)?;
536+
let entry = queue.pop_front()?;
537+
let result = create_result(entry.result());
538+
#[allow(clippy::useless_conversion)]
539+
let mut extra: crate::sys::Extra = self.default_extra().into();
540+
extra.set_flags(entry.flags());
541+
Some(BufResult(result, extra))
542+
}
543+
544+
pub fn cleanup_multishot(&mut self, key: &ErasedKey) {
545+
self.multishot_results.remove(key);
546+
}
515547
}
516548

517549
impl AsRawFd for Driver {
@@ -522,7 +554,16 @@ impl AsRawFd for Driver {
522554

523555
fn create_entry(cq_entry: CEntry) -> Entry {
524556
let result = cq_entry.result();
525-
let result = if result < 0 {
557+
let result = create_result(result);
558+
let key = unsafe { ErasedKey::from_raw(cq_entry.user_data() as _) };
559+
let mut entry = Entry::new(key, result);
560+
entry.set_flags(cq_entry.flags());
561+
562+
entry
563+
}
564+
565+
fn create_result(result: i32) -> io::Result<usize> {
566+
if result < 0 {
526567
let result = if result == -libc::ECANCELED {
527568
libc::ETIMEDOUT
528569
} else {
@@ -531,12 +572,7 @@ fn create_entry(cq_entry: CEntry) -> Entry {
531572
Err(io::Error::from_raw_os_error(result))
532573
} else {
533574
Ok(result as _)
534-
};
535-
let key = unsafe { ErasedKey::from_raw(cq_entry.user_data() as _) };
536-
let mut entry = Entry::new(key, result);
537-
entry.set_flags(cq_entry.flags());
538-
539-
entry
575+
}
540576
}
541577

542578
fn timespec(duration: std::time::Duration) -> Timespec {

0 commit comments

Comments
 (0)