From b1024a51477147390c165a1891765ede97d6b27f Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Wed, 11 Mar 2026 00:38:04 +0800 Subject: [PATCH 1/3] feat(driver,iour): send zerocopy --- compio-driver/src/op.rs | 4 +- compio-driver/src/sys/fusion/op.rs | 10 + compio-driver/src/sys/iour/op.rs | 285 ++++++++++++++++++++++++++++- compio-driver/src/sys/poll/op.rs | 5 + 4 files changed, 295 insertions(+), 9 deletions(-) diff --git a/compio-driver/src/op.rs b/compio-driver/src/op.rs index 7fc282b92..0f2f906af 100644 --- a/compio-driver/src/op.rs +++ b/compio-driver/src/op.rs @@ -10,8 +10,6 @@ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, SetLen}; use pin_project_lite::pin_project; use socket2::{SockAddr, SockAddrStorage, socklen_t}; -#[cfg(linux_all)] -pub use crate::sys::op::Splice; pub use crate::sys::op::{ Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo, SendToVectored, SendVectored, @@ -28,6 +26,8 @@ pub use crate::sys::op::{ pub use crate::sys::op::{ ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti, }; +#[cfg(linux_all)] +pub use crate::sys::op::{SendMsgZc, SendToVectoredZc, SendToZc, SendVectoredZc, SendZc, Splice}; use crate::{Extra, OwnedFd, SharedFd, TakeBuffer, sys::aio::*}; /// Trait to update the buffer length inside the [`BufResult`]. diff --git a/compio-driver/src/sys/fusion/op.rs b/compio-driver/src/sys/fusion/op.rs index 15c3f6b59..6f6fc2d2c 100644 --- a/compio-driver/src/sys/fusion/op.rs +++ b/compio-driver/src/sys/fusion/op.rs @@ -132,6 +132,16 @@ op!( RecvFromVectored(fd: S, buffer: T, flags: i32 op!( SendToVectored(fd: S, buffer: T, addr: SockAddr, flags: i32)); op!( FileStat(fd: S)); op!( PathStat(dirfd: S, path: CString, follow_symlink: bool)); +#[cfg(linux_all)] +op!( SendZc(fd: S, buffer: T, flags: i32)); +#[cfg(linux_all)] +op!( SendVectoredZc(fd: S, buffer: T, flags: i32)); +#[cfg(linux_all)] +op!( SendToZc(fd: S, buffer: T, addr: SockAddr, flags: i32)); +#[cfg(linux_all)] +op!( SendToVectoredZc(fd: S, buffer: T, addr: SockAddr, flags: i32)); +#[cfg(linux_all)] +op!( SendMsgZc(fd: S, buffer: T, control: C, addr: Option, flags: i32)); macro_rules! mop { (<$($ty:ident: $trait:ident),* $(,)?> $name:ident( $($arg:ident: $arg_t:ty),* $(,)? ) with $pool:ident) => { diff --git a/compio-driver/src/sys/iour/op.rs b/compio-driver/src/sys/iour/op.rs index 3470a3aa0..81e332b5d 100644 --- a/compio-driver/src/sys/iour/op.rs +++ b/compio-driver/src/sys/iour/op.rs @@ -610,6 +610,62 @@ unsafe impl OpCode for Send { } } +pin_project! { + /// Zerocopy [`Send`]. + pub struct SendZc { + #[pin] + pub(crate) op: Send, + pub(crate) res: Option>, + _p: PhantomPinned, + } +} + +impl SendZc { + /// Create [`SendZc`]. + pub fn new(fd: S, buffer: T, flags: i32) -> Self { + Self { + op: Send::new(fd, buffer, flags), + res: None, + _p: PhantomPinned, + } + } +} + +unsafe impl OpCode for SendZc { + fn create_entry(self: Pin<&mut Self>) -> OpEntry { + let this = self.project(); + let slice = this.op.buffer.as_init(); + opcode::SendZc::new( + Fd(this.op.fd.as_fd().as_raw_fd()), + slice.as_ptr(), + slice.len().try_into().unwrap_or(u32::MAX), + ) + .flags(this.op.flags) + .build() + .into() + } + + fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry { + self.project().op.create_entry() + } + + unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result, extra: crate::Extra) { + self.project().res.replace(BufResult(res, extra)); + } + + fn pop_multishot(self: Pin<&mut Self>) -> Option> { + self.project().res.take() + } +} + +impl IntoInner for SendZc { + type Inner = T; + + fn into_inner(self) -> Self::Inner { + self.op.into_inner() + } +} + unsafe impl OpCode for SendVectored { fn create_entry(mut self: Pin<&mut Self>) -> OpEntry { self.as_mut().set_msg(); @@ -621,6 +677,59 @@ unsafe impl OpCode for SendVectored { } } +pin_project! { + /// Zerocopy [`SendVectored`]. + pub struct SendVectoredZc { + #[pin] + pub(crate) op: SendVectored, + pub(crate) res: Option>, + _p: PhantomPinned, + } +} + +impl SendVectoredZc { + /// Create [`SendVectoredZc`]. + pub fn new(fd: S, buffer: T, flags: i32) -> Self { + Self { + op: SendVectored::new(fd, buffer, flags), + res: None, + _p: PhantomPinned, + } + } +} + +unsafe impl OpCode for SendVectoredZc { + fn create_entry(self: Pin<&mut Self>) -> OpEntry { + let mut this = self.project(); + this.op.as_mut().set_msg(); + let op = this.op.project(); + opcode::SendMsgZc::new(Fd(op.fd.as_fd().as_raw_fd()), op.msg) + .flags(*op.flags as _) + .build() + .into() + } + + fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry { + self.project().op.create_entry() + } + + unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result, extra: crate::Extra) { + self.project().res.replace(BufResult(res, extra)); + } + + fn pop_multishot(self: Pin<&mut Self>) -> Option> { + self.project().res.take() + } +} + +impl IntoInner for SendVectoredZc { + type Inner = T; + + fn into_inner(self) -> Self::Inner { + self.op.into_inner() + } +} + struct RecvFromHeader { pub(crate) fd: S, pub(crate) addr: SockAddrStorage, @@ -755,15 +864,11 @@ impl SendToHeader { } impl SendToHeader { - pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry { + pub fn set_msg(&mut self, slices: &mut [SysSlice]) { self.msg.msg_name = self.addr.as_ptr() as _; self.msg.msg_namelen = self.addr.len(); self.msg.msg_iov = slices.as_mut_ptr() as _; self.msg.msg_iovlen = slices.len() as _; - opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg) - .flags(self.flags as _) - .build() - .into() } } @@ -792,7 +897,11 @@ unsafe impl OpCode for SendTo { fn create_entry(self: Pin<&mut Self>) -> OpEntry { let this = self.project(); let slice = this.slice.insert(this.buffer.as_ref().sys_slice()); - this.header.create_entry(std::slice::from_mut(slice)) + this.header.set_msg(std::slice::from_mut(slice)); + opcode::SendMsg::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg) + .flags(this.header.flags as _) + .build() + .into() } } @@ -804,6 +913,59 @@ impl IntoInner for SendTo { } } +pin_project! { + /// Zerocopy [`SendTo`]. + pub struct SendToZc { + #[pin] + pub(crate) op: SendTo, + pub(crate) res: Option>, + _p: PhantomPinned, + } +} + +impl SendToZc { + /// Create [`SendToZc`]. + pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self { + Self { + op: SendTo::new(fd, buffer, addr, flags), + res: None, + _p: PhantomPinned, + } + } +} + +unsafe impl OpCode for SendToZc { + fn create_entry(self: Pin<&mut Self>) -> OpEntry { + let this = self.project().op.project(); + let slice = this.slice.insert(this.buffer.as_ref().sys_slice()); + this.header.set_msg(std::slice::from_mut(slice)); + opcode::SendMsgZc::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg) + .flags(this.header.flags as _) + .build() + .into() + } + + fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry { + self.project().op.create_entry() + } + + unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result, extra: crate::Extra) { + self.project().res.replace(BufResult(res, extra)); + } + + fn pop_multishot(self: Pin<&mut Self>) -> Option> { + self.project().res.take() + } +} + +impl IntoInner for SendToZc { + type Inner = T; + + fn into_inner(self) -> Self::Inner { + self.op.into_inner() + } +} + pin_project! { /// Send data to specified address from vectored buffer. pub struct SendToVectored { @@ -829,7 +991,11 @@ unsafe impl OpCode for SendToVectored { fn create_entry(self: Pin<&mut Self>) -> OpEntry { let this = self.project(); *this.slice = this.buffer.as_ref().sys_slices(); - this.header.create_entry(this.slice) + this.header.set_msg(this.slice); + opcode::SendMsg::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg) + .flags(this.header.flags as _) + .build() + .into() } } @@ -841,6 +1007,59 @@ impl IntoInner for SendToVectored { } } +pin_project! { + /// Zerocopy [`SendToVectored`]. + pub struct SendToVectoredZc { + #[pin] + pub(crate) op: SendToVectored, + pub(crate) res: Option>, + _p: PhantomPinned, + } +} + +impl SendToVectoredZc { + /// Create [`SendToVectoredZc`]. + pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self { + Self { + op: SendToVectored::new(fd, buffer, addr, flags), + res: None, + _p: PhantomPinned, + } + } +} + +unsafe impl OpCode for SendToVectoredZc { + fn create_entry(self: Pin<&mut Self>) -> OpEntry { + let this = self.project().op.project(); + *this.slice = this.buffer.as_ref().sys_slices(); + this.header.set_msg(this.slice); + opcode::SendMsgZc::new(Fd(this.header.fd.as_fd().as_raw_fd()), &this.header.msg) + .flags(this.header.flags as _) + .build() + .into() + } + + fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry { + self.project().op.create_entry() + } + + unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result, extra: crate::Extra) { + self.project().res.replace(BufResult(res, extra)); + } + + fn pop_multishot(self: Pin<&mut Self>) -> Option> { + self.project().res.take() + } +} + +impl IntoInner for SendToVectoredZc { + type Inner = T; + + fn into_inner(self) -> Self::Inner { + self.op.into_inner() + } +} + unsafe impl OpCode for RecvMsg { fn create_entry(mut self: Pin<&mut Self>) -> OpEntry { self.as_mut().set_msg(); @@ -863,6 +1082,58 @@ unsafe impl OpCode for SendMsg { } } +pin_project! { + /// Zerocopy [`SendMsg`]. + pub struct SendMsgZc { + #[pin] + pub(crate) op: SendMsg, + pub(crate) res: Option>, + _p: PhantomPinned, + } +} + +impl SendMsgZc { + /// Create [`SendMsgZc`]. + pub fn new(fd: S, buffer: T, control: C, addr: Option, flags: i32) -> Self { + Self { + op: SendMsg::new(fd, buffer, control, addr, flags), + res: None, + _p: PhantomPinned, + } + } +} + +unsafe impl OpCode for SendMsgZc { + fn create_entry(self: Pin<&mut Self>) -> OpEntry { + let mut this = self.project(); + this.op.as_mut().set_msg(); + let op = this.op.project(); + opcode::SendMsgZc::new(Fd(op.fd.as_fd().as_raw_fd()), op.msg) + .flags(*op.flags as _) + .build() + .into() + } + + fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry { + self.project().op.create_entry() + } + + unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result, extra: crate::Extra) { + self.project().res.replace(BufResult(res, extra)); + } + + fn pop_multishot(self: Pin<&mut Self>) -> Option> { + self.project().res.take() + } +} +impl IntoInner for SendMsgZc { + type Inner = (T, C); + + fn into_inner(self) -> Self::Inner { + self.op.into_inner() + } +} + unsafe impl OpCode for PollOnce { fn create_entry(self: Pin<&mut Self>) -> OpEntry { let flags = match self.interest { diff --git a/compio-driver/src/sys/poll/op.rs b/compio-driver/src/sys/poll/op.rs index be99dc513..164567324 100644 --- a/compio-driver/src/sys/poll/op.rs +++ b/compio-driver/src/sys/poll/op.rs @@ -17,6 +17,11 @@ use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 a use pin_project_lite::pin_project; use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t}; +#[cfg(linux_all)] +pub use self::{ + Send as SendZc, SendMsg as SendMsgZc, SendTo as SendToZc, SendToVectored as SendToVectoredZc, + SendVectored as SendVectoredZc, +}; use super::{AsFd, Decision, OpCode, OpType, syscall}; pub use crate::sys::unix_op::*; use crate::{op::*, sys_slice::*}; From f7744ce00ae26aa4c5e001cf4641caf5f5f87802 Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Wed, 11 Mar 2026 00:52:13 +0800 Subject: [PATCH 2/3] test(driver): send zerocopy on linux --- compio-driver/tests/send_zc.rs | 65 ++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 compio-driver/tests/send_zc.rs diff --git a/compio-driver/tests/send_zc.rs b/compio-driver/tests/send_zc.rs new file mode 100644 index 000000000..5412da8c7 --- /dev/null +++ b/compio-driver/tests/send_zc.rs @@ -0,0 +1,65 @@ +#![cfg(linux_all)] + +use std::{ + io::Read, + net::{TcpListener, TcpStream}, + os::fd::AsRawFd, +}; + +use compio_buf::BufResult; +use compio_driver::{Proactor, PushEntry, SharedFd, op::SendZc}; + +#[test] +fn send_zc() { + let mut driver = Proactor::new().unwrap(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + + let handle = std::thread::spawn(move || { + let (mut stream, _) = server.accept().unwrap(); + let mut buffer = [0u8; 12]; + stream.read_exact(&mut buffer).unwrap(); + assert_eq!(&buffer, b"Hello world!"); + }); + + let stream = TcpStream::connect(addr).unwrap(); + let stream = socket2::Socket::from(stream); + if driver.driver_type().is_polling() { + stream.set_nonblocking(true).unwrap(); + } + let stream = SharedFd::new(stream); + + driver.attach(stream.as_raw_fd()).unwrap(); + + let buffer: &'static [u8; 12] = b"Hello world!"; + let op = SendZc::new(stream.clone(), buffer, 0); + let res = match driver.push(op) { + PushEntry::Ready(BufResult(res, _)) => res.unwrap(), + PushEntry::Pending(mut key) => { + let mut len = None; + while len.is_none() { + driver.poll(None).unwrap(); + if let Some(BufResult(res, _)) = driver.pop_multishot(&key) { + len = Some(res.unwrap()); + } + match driver.pop(key) { + PushEntry::Pending(k) => key = k, + PushEntry::Ready(BufResult(res, _)) => { + if len.is_none() { + len = Some(res.unwrap()) + } else { + res.unwrap(); + } + break; + } + } + } + len.unwrap() + } + }; + assert_eq!(res, 12); + if let Err(e) = handle.join() { + std::panic::resume_unwind(e) + } +} From 822a4050114702995799174bb5bc2b006260b076 Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Wed, 11 Mar 2026 00:56:07 +0800 Subject: [PATCH 3/3] fix(driver,stub): send zerocopy --- compio-driver/src/sys/stub/op.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/compio-driver/src/sys/stub/op.rs b/compio-driver/src/sys/stub/op.rs index 29f71f8e3..3e6c3b1d9 100644 --- a/compio-driver/src/sys/stub/op.rs +++ b/compio-driver/src/sys/stub/op.rs @@ -5,6 +5,10 @@ use std::ffi::CString; use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use socket2::{SockAddr, SockAddrStorage, socklen_t}; +pub use self::{ + Send as SendZc, SendMsg as SendMsgZc, SendTo as SendToZc, SendToVectored as SendToVectoredZc, + SendVectored as SendVectoredZc, +}; use super::{OpCode, stub_unimpl}; pub use crate::sys::unix_op::*; use crate::{AsFd, op::*};