Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions compio-driver/src/buffer_pool/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use std::{
};

use compio_buf::{IntoInner, IoBuf, IoBufMut, SetLen, Slice};
#[cfg(not(fusion))]
pub use {BufferPool as FallbackBufferPool, OwnedBuffer as FallbackOwnedBuffer};

struct BufferPoolInner {
buffers: RefCell<VecDeque<Vec<u8>>>,
Expand Down
3 changes: 2 additions & 1 deletion compio-driver/src/buffer_pool/fusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::{
};

pub use fallback::BufferPool as FallbackBufferPool;
pub(crate) use fallback::OwnedBuffer;
pub(crate) use fallback::OwnedBuffer as FallbackOwnedBuffer;
pub use iour::BufferPool as IoUringBufferPool;
pub(crate) use iour::OwnedBuffer as IoUringOwnedBuffer;

use super::{fallback, iour};

Expand Down
76 changes: 63 additions & 13 deletions compio-driver/src/buffer_pool/iour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,32 @@ use std::{
fmt::{Debug, Formatter},
io,
ops::{Deref, DerefMut},
rc::Rc,
};

use io_uring_buf_ring::IoUringBufRing;
#[cfg(not(fusion))]
pub use {BufferPool as IoUringBufferPool, OwnedBuffer as IoUringOwnedBuffer};

struct BufferPoolInner {
buf_ring: IoUringBufRing<Vec<u8>>,
}

impl BufferPoolInner {
fn reuse_buffer(&self, buffer_id: u16) {
// SAFETY: 0 is always valid length. We just want to get the buffer once and
// return it immediately.
unsafe { self.buf_ring.get_buf(buffer_id, 0) };
}
}

/// Buffer pool
///
/// A buffer pool to allow user no need to specify a specific buffer to do the
/// IO operation
#[derive(Clone)]
pub struct BufferPool {
buf_ring: IoUringBufRing<Vec<u8>>,
inner: Rc<BufferPoolInner>,
}

impl Debug for BufferPool {
Expand All @@ -27,33 +43,67 @@ impl Debug for BufferPool {

impl BufferPool {
pub(crate) fn new(buf_ring: IoUringBufRing<Vec<u8>>) -> Self {
Self { buf_ring }
Self {
inner: Rc::new(BufferPoolInner { buf_ring }),
}
}

pub(crate) fn buffer_group(&self) -> u16 {
self.buf_ring.buffer_group()
self.inner.buf_ring.buffer_group()
}

pub(crate) fn into_inner(self) -> Result<IoUringBufRing<Vec<u8>>, Self> {
Rc::try_unwrap(self.inner)
.map(|inner| inner.buf_ring)
.map_err(|inner| Self { inner })
}

pub(crate) fn into_inner(self) -> IoUringBufRing<Vec<u8>> {
self.buf_ring
#[doc(hidden)]
pub unsafe fn get_buffer(&self, buffer_id: u16, available_len: usize) -> OwnedBuffer {
OwnedBuffer {
pool: self.inner.clone(),
params: Some((buffer_id, available_len)),
}
}

/// ## Safety
/// * `available_len` should be the returned value from the op.
pub(crate) unsafe fn get_buffer(
/// * `len` should be the returned value from the op.
pub(crate) unsafe fn create_proxy(
&self,
buffer_id: u16,
available_len: usize,
slice: OwnedBuffer,
len: usize,
) -> io::Result<BorrowedBuffer<'_>> {
unsafe { self.buf_ring.get_buf(buffer_id, available_len) }
let Some((buffer_id, available_len)) = slice.leak() else {
return Err(io::Error::other("no buffer selected"));
};
debug_assert_eq!(available_len, len);
unsafe { self.inner.buf_ring.get_buf(buffer_id, available_len) }
.map(BorrowedBuffer)
.ok_or_else(|| io::Error::other(format!("cannot find buffer {buffer_id}")))
}

pub(crate) fn reuse_buffer(&self, buffer_id: u16) {
// SAFETY: 0 is always valid length. We just want to get the buffer once and
// return it immediately.
unsafe { self.buf_ring.get_buf(buffer_id, 0) };
self.inner.reuse_buffer(buffer_id);
}
}

#[doc(hidden)]
pub struct OwnedBuffer {
pool: Rc<BufferPoolInner>,
params: Option<(u16, usize)>,
}

impl OwnedBuffer {
pub fn leak(mut self) -> Option<(u16, usize)> {
self.params.take()
}
}

impl Drop for OwnedBuffer {
fn drop(&mut self) {
if let Some((buffer_id, _)) = self.params {
self.pool.reuse_buffer(buffer_id);
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions compio-driver/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,12 @@ impl ErasedKey {
pub(crate) fn set_result(&self, res: io::Result<usize>) {
let mut this = self.borrow();
#[cfg(io_uring)]
if let Ok(res) = res
&& this.extra.is_iour()
{
unsafe {
Pin::new_unchecked(&mut this.op).set_result(res);
let this = &mut *this;
if this.extra.is_iour() {
unsafe {
Pin::new_unchecked(&mut this.op).set_result(&res, &this.extra);
}
}
}
if let PushEntry::Pending(Some(w)) =
Expand Down
2 changes: 0 additions & 2 deletions compio-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ impl Proactor {
instrument!(compio_log::Level::DEBUG, "pop", ?key);
if key.has_result() {
self.cancel.remove(&key);
self.driver.cleanup_multishot(&key);
PushEntry::Ready(key.take_result())
} else {
PushEntry::Pending(key)
Expand All @@ -253,7 +252,6 @@ impl Proactor {
instrument!(compio_log::Level::DEBUG, "pop", ?key);
if key.has_result() {
self.cancel.remove(&key);
self.driver.cleanup_multishot(&key);
let extra = key.swap_extra(self.default_extra());
let res = key.take_result();
PushEntry::Ready((res, extra))
Expand Down
12 changes: 6 additions & 6 deletions compio-driver/src/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,10 @@ pub(crate) mod managed {
use socket2::SockAddr;

use super::{Read, ReadAt, Recv, RecvFrom};
use crate::{AsFd, BorrowedBuffer, BufferPool, OwnedBuffer, TakeBuffer};
use crate::{AsFd, BorrowedBuffer, BufferPool, FallbackOwnedBuffer, TakeBuffer};

fn take_buffer(
slice: OwnedBuffer,
slice: FallbackOwnedBuffer,
buffer_pool: &BufferPool,
result: io::Result<usize>,
) -> io::Result<BorrowedBuffer<'_>> {
Expand All @@ -516,7 +516,7 @@ pub(crate) mod managed {
/// Read a file at specified position into managed buffer.
pub struct ReadManagedAt<S> {
#[pin]
pub(crate) op: ReadAt<OwnedBuffer, S>,
pub(crate) op: ReadAt<FallbackOwnedBuffer, S>,
}
}

Expand Down Expand Up @@ -549,7 +549,7 @@ pub(crate) mod managed {
/// Read a file into managed buffer.
pub struct ReadManaged<S> {
#[pin]
pub(crate) op: Read<OwnedBuffer, S>,
pub(crate) op: Read<FallbackOwnedBuffer, S>,
}
}

Expand Down Expand Up @@ -585,7 +585,7 @@ pub(crate) mod managed {
/// use [`ReadManaged`].
pub struct RecvManaged<S> {
#[pin]
pub(crate) op: Recv<OwnedBuffer, S>,
pub(crate) op: Recv<FallbackOwnedBuffer, S>,
}
}

Expand Down Expand Up @@ -618,7 +618,7 @@ pub(crate) mod managed {
/// Receive data and source address into managed buffer.
pub struct RecvFromManaged<S: AsFd> {
#[pin]
pub(crate) op: RecvFrom<OwnedBuffer, S>,
pub(crate) op: RecvFrom<FallbackOwnedBuffer, S>,
}
}

Expand Down
7 changes: 0 additions & 7 deletions compio-driver/src/sys/fusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,6 @@ impl Driver {
FuseDriver::IoUring(driver) => driver.pop_multishot(key),
}
}

pub fn cleanup_multishot(&mut self, key: &ErasedKey) {
match &mut self.fuse {
FuseDriver::Poll(driver) => driver.cleanup_multishot(key),
FuseDriver::IoUring(driver) => driver.cleanup_multishot(key),
}
}
}

impl AsRawFd for Driver {
Expand Down
40 changes: 40 additions & 0 deletions compio-driver/src/sys/fusion/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ macro_rules! op {
fn create_entry(self: std::pin::Pin<&mut Self>) -> OpEntry {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.create_entry()
}

fn create_entry_fallback(self: std::pin::Pin<&mut Self>) -> OpEntry {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.create_entry_fallback()
}

fn call_blocking(self: std::pin::Pin<&mut Self>) -> std::io::Result<usize> {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.call_blocking()
}

unsafe fn set_result(self: std::pin::Pin<&mut Self>, result: &std::io::Result<usize>, extra: &crate::Extra) {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ).set_result(result, extra) }
}

unsafe fn push_multishot(self: std::pin::Pin<&mut Self>, result: std::io::Result<usize>, extra: crate::Extra) {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ).push_multishot(result, extra) }
}

fn pop_multishot(self: std::pin::Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.pop_multishot()
}
}
};
}
Expand Down Expand Up @@ -202,6 +222,26 @@ macro_rules! mop {
fn create_entry(self: std::pin::Pin<&mut Self>) -> OpEntry {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.create_entry()
}

fn create_entry_fallback(self: std::pin::Pin<&mut Self>) -> OpEntry {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.create_entry_fallback()
}

fn call_blocking(self: std::pin::Pin<&mut Self>) -> std::io::Result<usize> {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.call_blocking()
}

unsafe fn set_result(self: std::pin::Pin<&mut Self>, result: &std::io::Result<usize>, extra: &crate::Extra) {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ).set_result(result, extra) }
}

unsafe fn push_multishot(self: std::pin::Pin<&mut Self>, result: std::io::Result<usize>, extra: crate::Extra) {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ).push_multishot(result, extra) }
}

fn pop_multishot(self: std::pin::Pin<&mut Self>) -> Option<BufResult<usize, crate::Extra>> {
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.pop_multishot()
}
}
};
}
Expand Down
2 changes: 0 additions & 2 deletions compio-driver/src/sys/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,6 @@ impl Driver {
pub fn pop_multishot(&mut self, _: &ErasedKey) -> Option<BufResult<usize, crate::sys::Extra>> {
None
}

pub fn cleanup_multishot(&mut self, _: &ErasedKey) {}
}

impl AsRawFd for Driver {
Expand Down
Loading