Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compio-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ cfg-if = { workspace = true }
either = "1.9.0"
once_cell = { workspace = true }
socket2 = { workspace = true }
futures-util = { workspace = true }

[target.'cfg(windows)'.dependencies]
widestring = { workspace = true }
Expand All @@ -40,7 +41,6 @@ libc = { workspace = true }
# Shared dev dependencies for all platforms
[dev-dependencies]
compio-macros = { workspace = true }
futures-util = { workspace = true }
tempfile = { workspace = true }

[features]
Expand Down
2 changes: 1 addition & 1 deletion compio-net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub type CMsgBuilder<'a> = compio_io::ancillary::AncillaryBuilder<'a>;
pub type PollFd<T> = compio_runtime::fd::PollFd<T>;
pub use opts::SocketOpts;
pub use resolve::ToSocketAddrsAsync;
pub(crate) use resolve::{each_addr, first_addr_buf};
pub(crate) use resolve::{each_addr, first_addr_buf, first_addr_buf_zerocopy};
pub(crate) use socket::*;
pub use split::*;
pub use tcp::*;
Expand Down
33 changes: 32 additions & 1 deletion compio-net/src/resolve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ cfg_if::cfg_if! {
}

use std::{
future::Future,
future::{Future, Ready, ready},
io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
};
Expand Down Expand Up @@ -169,3 +169,34 @@ pub async fn first_addr_buf<T, B, F: Future<Output = BufResult<T, B>>>(
)
}
}

pub async fn first_addr_buf_zerocopy<B, F1, F2>(
addr: impl ToSocketAddrsAsync,
buffer: B,
f: impl FnOnce(SocketAddr, B) -> F1,
) -> BufResult<usize, Either<Ready<B>, F2>>
where
F1: Future<Output = BufResult<usize, F2>>,
F2: Future<Output = B>,
{
fn ret<T, F>(fut: T) -> Either<Ready<T>, F> {
Either::Left(ready(fut))
}

let mut addrs = match addr.to_socket_addrs_async().await {
Ok(addrs) => addrs,
Err(e) => return BufResult(Err(e), ret(buffer)),
};
if let Some(addr) = addrs.next() {
let BufResult(res, fut) = f(addr, buffer).await;
BufResult(res, Either::Right(fut))
} else {
BufResult(
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"could not operate on first address",
)),
ret(buffer),
)
}
}
93 changes: 91 additions & 2 deletions compio-net/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectore
#[cfg(unix)]
use compio_driver::op::CreateSocket;
use compio_driver::{
AsRawFd, ToSharedFd, impl_raw_fd,
AsRawFd, OpCode, ToSharedFd, impl_raw_fd,
op::{
Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromManaged,
RecvFromVectored, RecvManaged, RecvMsg, RecvResultExt, RecvVectored, ResultTakeBuffer,
Send, SendMsg, SendTo, SendToVectored, SendVectored, ShutdownSocket, VecBufResultExt,
Send, SendMsg, SendMsgZc, SendTo, SendToVectored, SendToVectoredZc, SendToZc, SendVectored,
SendVectoredZc, SendZc, ShutdownSocket, VecBufResultExt,
},
syscall,
};
use compio_runtime::{Attacher, BorrowedBuffer, BufferPool, fd::PollFd};
use futures_util::StreamExt;
use socket2::{Domain, Protocol, SockAddr, Socket as Socket2, Type};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -208,6 +210,22 @@ impl Socket {
compio_runtime::submit(op).await.into_inner()
}

pub async fn send_zerocopy<T: IoBuf>(
&self,
buf: T,
flags: i32,
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
submit_zerocopy(SendZc::new(self.to_shared_fd(), buf, flags)).await
}

pub async fn send_zerocopy_vectored<T: IoVectoredBuf>(
&self,
buf: T,
flags: i32,
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
submit_zerocopy(SendVectoredZc::new(self.to_shared_fd(), buf, flags)).await
}

pub async fn recv_from<T: IoBufMut>(
&self,
buffer: T,
Expand Down Expand Up @@ -275,6 +293,26 @@ impl Socket {
compio_runtime::submit(op).await.into_inner()
}

pub async fn send_to_zerocopy<T: IoBuf>(
&self,
buffer: T,
addr: &SockAddr,
flags: i32,
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
let op = SendToZc::new(self.to_shared_fd(), buffer, addr.clone(), flags);
submit_zerocopy(op).await
}
Comment thread
George-Miao marked this conversation as resolved.

pub async fn send_to_zerocopy_vectored<T: IoVectoredBuf>(
&self,
buffer: T,
addr: &SockAddr,
flags: i32,
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
let op = SendToVectoredZc::new(self.to_shared_fd(), buffer, addr.clone(), flags);
submit_zerocopy(op).await
}
Comment thread
George-Miao marked this conversation as resolved.

pub async fn send_msg<T: IoBuf, C: IoBuf>(
&self,
buffer: T,
Expand All @@ -299,6 +337,33 @@ impl Socket {
compio_runtime::submit(op).await.into_inner()
}

pub async fn send_msg_zerocopy<T: IoBuf, C: IoBuf>(
&self,
buffer: T,
control: C,
addr: Option<&SockAddr>,
flags: i32,
) -> BufResult<usize, impl Future<Output = (T, C)> + use<T, C>> {
self.send_msg_zerocopy_vectored([buffer], control, addr, flags)
.await
.map_buffer(|fut| async move {
let ([buffer], control) = fut.await;
(buffer, control)
})
}

pub async fn send_msg_zerocopy_vectored<T: IoVectoredBuf, C: IoBuf>(
&self,
buffer: T,
control: C,
addr: Option<&SockAddr>,
flags: i32,
) -> BufResult<usize, impl Future<Output = (T, C)> + use<T, C>> {
let fd = self.to_shared_fd();
let op = SendMsgZc::new(fd, buffer, control, addr.cloned(), flags);
submit_zerocopy(op).await
}

#[cfg(unix)]
pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
Expand Down Expand Up @@ -377,3 +442,27 @@ impl Socket {
}

impl_raw_fd!(Socket, Socket2, socket, socket);

async fn submit_zerocopy<T: OpCode + IntoInner + 'static>(
op: T,
) -> BufResult<usize, impl Future<Output = T::Inner> + use<T>> {
let mut stream = compio_runtime::submit_multi(op);
let res = stream
.next()
.await
.expect("SubmitMulti should yield at least one item")
.0;

let fut = async move {
// we don't need 2nd CQE's result
_ = stream.next().await;

stream
.try_take()
.map_err(|_| ())
.expect("Cannot retrieve buffer")
.into_inner()
};

BufResult(res, fut)
}
8 changes: 4 additions & 4 deletions compio-net/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ where
for<'a> &'a T: AsyncWrite,
{
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
self.0.write(buf).await
(self.0).write(buf).await
}

async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
self.0.write_vectored(buf).await
(self.0).write_vectored(buf).await
}

async fn flush(&mut self) -> io::Result<()> {
self.0.flush().await
(self.0).flush().await
}

async fn shutdown(&mut self) -> io::Result<()> {
self.0.shutdown().await
(self.0).shutdown().await
}
}

Expand Down
30 changes: 28 additions & 2 deletions compio-net/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,30 @@ impl TcpStream {

self.inner.send(buf, MSG_OOB).await
}

/// Sends data using [zero-copy send](https://man7.org/linux/man-pages/man3/io_uring_prep_send_zc.3.html).
///
/// If the underlying platform doesn't support zero-copy send, it will fall
/// back to normal send.
pub async fn send_zerocopy<T: IoBuf>(
&self,
buf: T,
flags: i32,
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
self.inner.send_zerocopy(buf, flags).await
}

/// Sends vectorized data using [zero-copy send](https://man7.org/linux/man-pages/man3/io_uring_prep_send_zc.3.html).
///
/// If the underlying platform doesn't support zero-copy send, it will fall
/// back to normal send.
pub async fn send_zerocopy_vectored<T: IoVectoredBuf>(
&self,
buf: T,
flags: i32,
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
self.inner.send_zerocopy_vectored(buf, flags).await
Comment thread
George-Miao marked this conversation as resolved.
}
}

impl AsyncRead for TcpStream {
Expand Down Expand Up @@ -405,12 +429,14 @@ impl AsyncWrite for TcpStream {
impl AsyncWrite for &TcpStream {
#[inline]
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
self.inner.send(buf, 0).await
let BufResult(res, fut) = self.send_zerocopy(buf, 0).await;
BufResult(res, fut.await)
}

#[inline]
async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
self.inner.send_vectored(buf, 0).await
let BufResult(res, fut) = self.send_zerocopy_vectored(buf, 0).await;
BufResult(res, fut.await)
}
Comment thread
George-Miao marked this conversation as resolved.

#[inline]
Expand Down
70 changes: 70 additions & 0 deletions compio-net/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,76 @@ impl UdpSocket {
.await
}

/// Sends data on the socket to the given address with zero copy.
///
/// Returns the result of send and a future that resolves to the
/// original buffer when the send is complete.
pub async fn send_to_zerocopy<A: ToSocketAddrsAsync, T: IoBuf>(
&self,
buffer: T,
addr: A,
) -> BufResult<usize, impl Future<Output = T> + use<A, T>> {
Comment thread
George-Miao marked this conversation as resolved.
super::first_addr_buf_zerocopy(addr, buffer, |addr, buffer| async move {
self.inner.send_to_zerocopy(buffer, &addr.into(), 0).await
})
.await
}

/// Sends vectored data on the socket to the given address with zero copy.
///
/// Returns the result of send and a future that resolves to the
/// original buffer when the send is complete.
pub async fn send_to_zerocopy_vectored<A: ToSocketAddrsAsync, T: IoVectoredBuf>(
&self,
buffer: T,
addr: A,
) -> BufResult<usize, impl Future<Output = T> + use<A, T>> {
super::first_addr_buf_zerocopy(addr, buffer, |addr, buffer| async move {
self.inner
.send_to_zerocopy_vectored(buffer, &addr.into(), 0)
.await
})
.await
}

/// Sends data with control message on the socket to the given address with
/// zero copy.
///
/// Returns the result of send and a future that resolves to the
/// original buffer when the send is complete.
pub async fn send_msg_zerocopy<A: ToSocketAddrsAsync, T: IoBuf, C: IoBuf>(
&self,
buffer: T,
control: C,
addr: A,
) -> BufResult<usize, impl Future<Output = (T, C)> + use<A, T, C>> {
super::first_addr_buf_zerocopy(addr, (buffer, control), |addr, (b, c)| async move {
self.inner
.send_msg_zerocopy(b, c, Some(&addr.into()), 0)
.await
})
.await
}

/// Sends vectored data with control message on the socket to the given
/// address with zero copy.
///
/// Returns the result of send and a future that resolves to the
/// original buffer when the send is complete.
pub async fn send_msg_zerocopy_vectored<A: ToSocketAddrsAsync, T: IoVectoredBuf, C: IoBuf>(
&self,
buffer: T,
control: C,
addr: A,
) -> BufResult<usize, impl Future<Output = (T, C)> + use<A, T, C>> {
super::first_addr_buf_zerocopy(addr, (buffer, control), |addr, (b, c)| async move {
self.inner
.send_msg_zerocopy_vectored(b, c, Some(&addr.into()), 0)
.await
})
.await
}

/// Gets a socket option.
///
/// # Safety
Expand Down
Loading
Loading