Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions compio-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ io-uring = { version = "0.7.12", optional = true }
once_cell = { workspace = true, optional = true }
polling = { version = "3.3.0", optional = true }
rustix = { workspace = true, features = ["linux_5_11"] }
linux-raw-sys = { version = "0.12.1", optional = true }

# Other platform dependencies
[target.'cfg(all(unix, not(target_os = "linux")))'.dependencies]
Expand All @@ -83,6 +84,7 @@ io-uring = [
"rustix/mm",
"rustix/event",
"rustix/system",
"linux-raw-sys/io_uring",
"dep:io-uring",
"dep:once_cell",
]
Expand Down
15 changes: 15 additions & 0 deletions compio-driver/src/sys/op/managed/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ impl<S> RecvManaged<S> {
op: Recv::new(fd, pool.pop()?.with_capacity(len), flags),
})
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
// This method has been added here for the sake of API compatibility.
pub fn poll_first(&mut self) {}
Comment thread
Berrysoft marked this conversation as resolved.
}

impl<S> TakeBuffer for RecvManaged<S> {
Expand All @@ -91,6 +96,11 @@ impl<S: AsFd> RecvFromManaged<S> {
op: RecvFrom::new(fd, pool.pop()?.with_capacity(len), flags),
})
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
// This method has been added here for the sake of API compatibility.
pub fn poll_first(&mut self) {}
}

impl<S: AsFd> TakeBuffer for RecvFromManaged<S> {
Expand Down Expand Up @@ -119,6 +129,11 @@ impl<C: IoBufMut, S: AsFd> RecvMsgManaged<C, S> {
op: RecvMsg::new(fd, [pool.pop()?.with_capacity(len)], control, flags),
})
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
// This method has been added here for the sake of API compatibility.
pub fn poll_first(&mut self) {}
}

impl<C: IoBufMut, S: AsFd> TakeBuffer for RecvMsgManaged<C, S> {
Expand Down
33 changes: 33 additions & 0 deletions compio-driver/src/sys/op/managed/fusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,39 @@ mop!(<S: AsFd> RecvMulti(fd: S, pool: &BufferPool, len: usize, flags: RecvFlags)
mop!(<S: AsFd> RecvFromMulti(fd: S, pool: &BufferPool, flags: RecvFlags) with pool; RecvFromMultiResult);
mop!(<S: AsFd> RecvMsgMulti(fd: S, pool: &BufferPool, control_len: usize, flags: RecvFlags) with pool; RecvMsgMultiResult);

impl<S: AsFd> RecvManaged<S> {
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
match self.inner {
RecvManagedInner::Poll(ref mut i) => i.poll_first(),
RecvManagedInner::IoUring(ref mut i) => i.poll_first(),
}
}
}

impl<S: AsFd> RecvFromManaged<S> {
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
match self.inner {
RecvFromManagedInner::Poll(ref mut i) => i.poll_first(),
RecvFromManagedInner::IoUring(ref mut i) => i.poll_first(),
}
}
}

impl<C: IoBufMut, S: AsFd> RecvMsgManaged<C, S> {
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
match self.inner {
RecvMsgManagedInner::Poll(ref mut i) => i.poll_first(),
RecvMsgManagedInner::IoUring(ref mut i) => i.poll_first(),
}
}
}

enum RecvFromMultiResultInner {
Poll(fallback::RecvFromMultiResult),
IoUring(iour::RecvFromMultiResult),
Expand Down
41 changes: 33 additions & 8 deletions compio-driver/src/sys/op/managed/iour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use rustix::net::RecvFlags;
use socket2::{SockAddr, SockAddrStorage, socklen_t};

use crate::{
BufferPool, BufferRef, Extra, IourOpCode as OpCode, OpEntry, op::TakeBuffer,
sys::pal::is_kernel_at_least,
BufferPool, BufferRef, Extra, IourOpCode as OpCode, OpEntry,
op::TakeBuffer,
sys::pal::{is_kernel_at_least, set_poll_first},
};

/// Read a file at specified position into specified buffer.
Expand Down Expand Up @@ -143,6 +144,7 @@ pub struct RecvManaged<S> {
buffer_group: u16,
buffer_pool: BufferPool,
buffer: Option<BufferRef>,
poll_first: bool,
}

impl<S> RecvManaged<S> {
Expand All @@ -157,21 +159,29 @@ impl<S> RecvManaged<S> {
flags,
buffer_pool: buffer_pool.clone(),
buffer: None,
poll_first: false,
})
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
self.poll_first = true;
}
}

unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
type Control = ();

fn create_entry(&mut self, _: &mut Self::Control) -> OpEntry {
let fd = self.fd.as_fd().as_raw_fd();
opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
let entry = opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
.flags(self.flags.bits() as _)
.buf_group(self.buffer_group)
.build()
.flags(Flags::BUFFER_SELECT)
.into()
.flags(Flags::BUFFER_SELECT);
let entry = set_poll_first(entry, self.poll_first);
entry.into()
}

unsafe fn set_result(&mut self, _: &mut Self::Control, _: &io::Result<usize>, extra: &Extra) {
Expand Down Expand Up @@ -205,6 +215,7 @@ pub struct RecvFromManaged<S> {
buffer_group: u16,
buffer_pool: BufferPool,
buffer: Option<BufferRef>,
poll_first: bool,
}

#[doc(hidden)]
Expand Down Expand Up @@ -236,8 +247,15 @@ impl<S> RecvFromManaged<S> {
addr,
buffer_pool: buffer_pool.clone(),
buffer: None,
poll_first: false,
})
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
self.poll_first = true;
}
}

impl<S> TakeBuffer for RecvFromManaged<S> {
Expand All @@ -262,12 +280,13 @@ unsafe impl<S: AsFd> OpCode for RecvFromManaged<S> {
}

fn create_entry(&mut self, control: &mut Self::Control) -> OpEntry {
opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &raw mut control.msg)
let entry = opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &raw mut control.msg)
.flags(self.flags.bits() as _)
.buf_group(self.buffer_group)
.build()
.flags(Flags::BUFFER_SELECT)
.into()
.flags(Flags::BUFFER_SELECT);
let entry = set_poll_first(entry, self.poll_first);
entry.into()
}

unsafe fn set_result(
Expand Down Expand Up @@ -311,6 +330,12 @@ impl<C: IoBufMut, S: AsFd> RecvMsgManaged<C, S> {
control_len: 0,
})
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
self.op.poll_first();
}
}

unsafe impl<C: IoBufMut, S: AsFd> OpCode for RecvMsgManaged<C, S> {
Expand Down
28 changes: 16 additions & 12 deletions compio-driver/src/sys/op/socket/iour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,15 @@ unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
let fd = self.fd.as_fd().as_raw_fd();
let slice = self.buffer.sys_slice_mut();

opcode::Recv::new(
let entry = opcode::Recv::new(
Fd(fd),
slice.ptr() as _,
slice.len().try_into().unwrap_or(u32::MAX),
)
.flags(self.flags.bits() as _)
.build()
.into()
.build();
let entry = set_poll_first(entry, self.poll_first);
entry.into()
}

fn call_blocking(&mut self, _: &mut Self::Control) -> io::Result<usize> {
Expand All @@ -261,10 +262,11 @@ unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
}

fn create_entry(&mut self, control: &mut Self::Control) -> OpEntry {
opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut control.msg)
let entry = opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut control.msg)
.flags(self.flags.bits() as _)
.build()
.into()
.build();
let entry = set_poll_first(entry, self.poll_first);
entry.into()
}

fn call_blocking(&mut self, control: &mut Self::Control) -> io::Result<usize> {
Expand All @@ -286,10 +288,11 @@ impl<S: AsFd> RecvFromHeader<S> {
}

pub fn create_entry(&mut self, control: &mut RecvMsgControl) -> OpEntry {
opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut control.msg)
let entry = opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut control.msg)
.flags(self.flags.bits() as _)
.build()
.into()
.build();
let entry = set_poll_first(entry, self.poll_first);
entry.into()
}

pub fn set_result(&mut self, control: &mut RecvMsgControl) {
Expand Down Expand Up @@ -357,10 +360,11 @@ unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C,
}

fn create_entry(&mut self, control: &mut Self::Control) -> OpEntry {
opcode::RecvMsg::new(Fd(self.header.fd.as_fd().as_raw_fd()), &mut control.msg)
let entry = opcode::RecvMsg::new(Fd(self.header.fd.as_fd().as_raw_fd()), &mut control.msg)
.flags(self.header.flags.bits() as _)
.build()
.into()
.build();
let entry = set_poll_first(entry, self.poll_first);
entry.into()
}

unsafe fn set_result(
Expand Down
50 changes: 48 additions & 2 deletions compio-driver/src/sys/op/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,23 @@ pub struct Recv<T: IoBufMut, S> {
pub(crate) fd: S,
pub(crate) buffer: T,
pub(crate) flags: RecvFlags,
poll_first: bool,
}

/// Receive data from remote into vectored buffer.
pub struct RecvVectored<T: IoVectoredBufMut, S> {
pub(crate) fd: S,
pub(crate) buffer: T,
pub(crate) flags: RecvFlags,
poll_first: bool,
}

pub(crate) struct RecvFromHeader<S> {
pub(crate) fd: S,
pub(crate) flags: RecvFlags,
pub(crate) addr: SockAddrStorage,
pub(crate) addr_len: socklen_t,
poll_first: bool,
}

/// Receive data and source address.
Expand All @@ -118,6 +121,7 @@ pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
pub(crate) buffer: T,
pub(crate) control: C,
pub(crate) control_len: usize,
poll_first: bool,
}

impl<S> Connect<S> {
Expand Down Expand Up @@ -254,8 +258,15 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
buffer,
control,
control_len: 0,
poll_first: false,
}
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
self.poll_first = true;
}
}

impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
Expand All @@ -273,7 +284,18 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
impl<T: IoBufMut, S> Recv<T, S> {
/// Create [`Recv`].
pub fn new(fd: S, buffer: T, flags: RecvFlags) -> Self {
Self { fd, buffer, flags }
Self {
fd,
buffer,
flags,
poll_first: false,
}
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
self.poll_first = true;
}
}

Expand All @@ -288,7 +310,18 @@ impl<T: IoBufMut, S> IntoInner for Recv<T, S> {
impl<T: IoVectoredBufMut, S> RecvVectored<T, S> {
/// Create [`RecvVectored`].
pub fn new(fd: S, buffer: T, flags: RecvFlags) -> Self {
Self { fd, buffer, flags }
Self {
fd,
buffer,
flags,
poll_first: false,
}
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
self.poll_first = true;
}
}

Expand All @@ -309,6 +342,7 @@ impl<S> RecvFromHeader<S> {
addr,
flags,
addr_len: name_len,
poll_first: false,
}
}

Expand All @@ -325,6 +359,12 @@ impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
buffer,
}
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
self.header.poll_first = true;
}
}

impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
Expand All @@ -344,6 +384,12 @@ impl<T: IoBufMut, S> RecvFrom<T, S> {
buffer,
}
}

/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
/// of the SQE on the IO_URING driver.
pub fn poll_first(&mut self) {
self.header.poll_first = true;
}
}

impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
Expand Down
Loading