Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compio-io/src/ancillary/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub trait AsyncReadAncillaryMulti {
type Return;

/// Read data and ancillary data into multiple managed buffers.
fn read_multi_with_ancillary<C: IoBufMut>(
fn read_multi_with_ancillary(
&mut self,
control_len: usize,
) -> impl Stream<Item = IoResult<Self::Return>>;
Expand Down
124 changes: 93 additions & 31 deletions compio-net/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ use compio_driver::op::{Bind, CreateSocket, Listen, ShutdownSocket};
use compio_driver::{
AsRawFd, BufferRef, OpCode, ResultTakeBuffer, TakeBuffer, ToSharedFd, impl_raw_fd,
op::{
Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromManaged,
RecvFromVectored, RecvManaged, RecvMsg, RecvMulti, RecvResultExt, RecvVectored, Send,
SendMsg, SendMsgZc, SendTo, SendToVectored, SendToVectoredZc, SendToZc, SendVectored,
SendVectoredZc, SendZc, VecBufResultExt,
Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromManaged, RecvFromMulti,
RecvFromMultiResult, RecvFromVectored, RecvManaged, RecvMsg, RecvMsgManaged, RecvMsgMulti,
RecvMsgMultiResult, RecvMulti, RecvResultExt, RecvVectored, Send, SendMsg, SendMsgZc,
SendTo, SendToVectored, SendToVectoredZc, SendToZc, SendVectored, SendVectoredZc, SendZc,
VecBufResultExt,
},
syscall,
};
Expand Down Expand Up @@ -237,33 +238,6 @@ impl Socket {
.unwrap_or_else(|e| Either::Right(futures_util::stream::once(std::future::ready(Err(e)))))
}

pub async fn recv_from_managed(
&self,
len: usize,
flags: i32,
) -> io::Result<Option<(BufferRef, Option<SockAddr>)>> {
let fd = self.to_shared_fd();
let inner = Runtime::with_current(|rt| {
let buffer_pool = rt.buffer_pool()?;
let op = RecvFromManaged::new(fd, &buffer_pool, len, flags)?;
io::Result::Ok(rt.submit(op))
})?
.await;
let (len, op) = buf_try!(@try inner);
// Kernel returns 0 for the operation, drop the buffer and return Ok(None)
if len == 0 {
return Ok(None);
}
let Some((mut buf, addr)) = op.take_buffer() else {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!("Read {len} bytes, but no buffer was selected by kernel"),
));
};
unsafe { buf.advance_to(len) };
Ok(Some((buf, addr)))
}

pub async fn send<T: IoBuf>(&self, buffer: T, flags: i32) -> BufResult<usize, T> {
let fd = self.to_shared_fd();
let op = Send::new(fd, buffer, flags);
Expand Down Expand Up @@ -318,6 +292,47 @@ impl Socket {
unsafe { res.map_vec_advanced() }
}

pub async fn recv_from_managed(
&self,
len: usize,
flags: i32,
) -> io::Result<Option<(BufferRef, Option<SockAddr>)>> {
let fd = self.to_shared_fd();
let inner = Runtime::with_current(|rt| {
let buffer_pool = rt.buffer_pool()?;
let op = RecvFromManaged::new(fd, &buffer_pool, len, flags)?;
io::Result::Ok(rt.submit(op))
})?
.await;
let (len, op) = buf_try!(@try inner);
// Kernel returns 0 for the operation, drop the buffer and return Ok(None)
if len == 0 {
return Ok(None);
}
let Some((mut buf, addr)) = op.take_buffer() else {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!("Read {len} bytes, but no buffer was selected by kernel"),
));
};
unsafe { buf.advance_to(len) };
Ok(Some((buf, addr)))
}

pub fn recv_from_multi(
&self,
flags: i32,
) -> impl Stream<Item = io::Result<RecvFromMultiResult>> {
let fd = self.to_shared_fd();
Runtime::with_current(|rt| {
let buffer_pool = rt.buffer_pool()?;
let op = RecvFromMulti::new(fd, &buffer_pool, flags)?;
io::Result::Ok(rt.submit_multi(op).into_managed(buffer_pool))
})
.map(Either::Left)
.unwrap_or_else(|e| Either::Right(futures_util::stream::once(std::future::ready(Err(e)))))
}

pub async fn recv_msg<T: IoBufMut, C: IoBufMut>(
&self,
buffer: T,
Expand All @@ -341,6 +356,53 @@ impl Socket {
unsafe { res.map_vec_advanced() }
}

pub async fn recv_msg_managed<C: IoBufMut>(
&self,
len: usize,
control: C,
flags: i32,
) -> io::Result<Option<(BufferRef, C, Option<SockAddr>)>> {
let fd = self.to_shared_fd();
let inner = Runtime::with_current(|rt| {
let buffer_pool = rt.buffer_pool()?;
let op = RecvMsgManaged::new(fd, &buffer_pool, len, control, flags)?;
io::Result::Ok(rt.submit(op))
})?
.await;
let (len, op) = buf_try!(@try inner);
// Kernel returns 0 for the operation, drop the buffer and return Ok(None)
if len == 0 {
return Ok(None);
}
let Some(((mut buf, mut control), addr, control_len)) = op.take_buffer() else {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!("Read {len} bytes, but no buffer was selected by kernel"),
));
};
unsafe { buf.advance_to(len) };
unsafe { control.advance_to(control_len) };
Ok(Some((buf, control, addr)))
}

pub fn recv_msg_multi(
&self,
control_len: usize,
flags: i32,
) -> impl Stream<Item = io::Result<RecvMsgMultiResult>> {
let fd = self.to_shared_fd();
Runtime::with_current(|rt| {
let buffer_pool = rt.buffer_pool()?;
let op = RecvMsgMulti::new(fd, &buffer_pool, control_len, flags)?;
io::Result::Ok(
rt.submit_multi(op)
.into_managed_with(buffer_pool, control_len),
)
})
.map(Either::Left)
.unwrap_or_else(|e| Either::Right(futures_util::stream::once(std::future::ready(Err(e)))))
}

pub async fn send_to<T: IoBuf>(
&self,
buffer: T,
Expand Down
55 changes: 53 additions & 2 deletions compio-net/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use std::{
};

use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use compio_driver::{BufferRef, impl_raw_fd};
use compio_driver::{BufferRef, impl_raw_fd, op::RecvMsgMultiResult};
use compio_io::{
AsyncRead, AsyncReadManaged, AsyncReadMulti, AsyncWrite,
ancillary::{AsyncReadAncillary, AsyncWriteAncillary},
ancillary::{
AsyncReadAncillary, AsyncReadAncillaryManaged, AsyncReadAncillaryMulti, AsyncWriteAncillary,
},
util::Splittable,
};
use compio_runtime::fd::PollFd;
Expand Down Expand Up @@ -532,6 +534,55 @@ impl AsyncReadAncillary for &TcpStream {
}
}

impl AsyncReadAncillaryManaged for TcpStream {
#[inline]
async fn read_managed_with_ancillary<C: IoBufMut>(
&mut self,
len: usize,
control: C,
) -> io::Result<Option<(Self::Buffer, C)>> {
(&*self).read_managed_with_ancillary(len, control).await
}
}

impl AsyncReadAncillaryManaged for &TcpStream {
#[inline]
async fn read_managed_with_ancillary<C: IoBufMut>(
Comment thread
Berrysoft marked this conversation as resolved.
&mut self,
len: usize,
control: C,
) -> io::Result<Option<(Self::Buffer, C)>> {
self.inner
.recv_msg_managed(len, control, 0)
.await
.map(|res| res.map(|(res, len, _addr)| (res, len)))
}
}

impl AsyncReadAncillaryMulti for TcpStream {
type Return = RecvMsgMultiResult;

#[inline]
fn read_multi_with_ancillary(
&mut self,
control_len: usize,
) -> impl Stream<Item = io::Result<Self::Return>> {
self.inner.recv_msg_multi(control_len, 0)
}
}

impl AsyncReadAncillaryMulti for &TcpStream {
type Return = RecvMsgMultiResult;

#[inline]
fn read_multi_with_ancillary(
&mut self,
control_len: usize,
) -> impl Stream<Item = io::Result<Self::Return>> {
self.inner.recv_msg_multi(control_len, 0)
}
}

impl AsyncWrite for TcpStream {
#[inline]
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
Expand Down
91 changes: 67 additions & 24 deletions compio-net/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::{
};

use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use compio_driver::{BufferRef, impl_raw_fd};
use compio_driver::{
BufferRef, impl_raw_fd,
op::{RecvFromMultiResult, RecvMsgMultiResult},
};
use futures_util::Stream;
use socket2::{Protocol, SockAddr, Socket as Socket2, Type};

Expand Down Expand Up @@ -222,29 +225,6 @@ impl UdpSocket {
self.inner.recv_multi(len, 0)
}

/// Read some bytes from this source and the runtime's buffer pool and
/// return a [`BufferRef`] with the sender address.
///
/// If `len` == 0, will use buffer pool's inner buffer size as the max len;
/// if `len` > 0, `min(len, inner buffer size)` will be the read max len
pub async fn recv_from_managed(
&self,
len: usize,
) -> io::Result<Option<(BufferRef, SocketAddr)>> {
let res = self.inner.recv_from_managed(len, 0).await?;
let ret = match res {
Some((buffer, addr)) => {
let addr = addr
.expect("should have addr")
.as_socket()
.expect("should be SocketAddr");
Some((buffer, addr))
}
None => None,
};
Ok(ret)
}

/// Sends some data to the socket from the buffer, returning the original
/// buffer and quantity of data sent.
pub async fn send<T: IoBuf>(&self, buffer: T) -> BufResult<usize, T> {
Expand Down Expand Up @@ -287,6 +267,35 @@ impl UdpSocket {
})
}

/// Read some bytes from this source and the runtime's buffer pool and
/// return a [`BufferRef`] with the sender address.
///
/// If `len` == 0, will use buffer pool's inner buffer size as the max len;
/// if `len` > 0, `min(len, inner buffer size)` will be the read max len
pub async fn recv_from_managed(
&self,
len: usize,
) -> io::Result<Option<(BufferRef, SocketAddr)>> {
let res = self.inner.recv_from_managed(len, 0).await?;
let ret = match res {
Some((buffer, addr)) => {
let addr = addr
.expect("should have addr")
.as_socket()
.expect("should be SocketAddr");
Some((buffer, addr))
}
None => None,
};
Ok(ret)
}

/// Read some bytes from this source and the runtime's buffer pool and
/// return a stream of [`RecvFromMultiResult`].
pub fn recv_from_multi(&self) -> impl Stream<Item = io::Result<RecvFromMultiResult>> {
self.inner.recv_from_multi(0)
}

/// Receives a single datagram message and ancillary data on the socket. On
/// success, returns the number of bytes received and the origin.
pub async fn recv_msg<T: IoBufMut, C: IoBufMut>(
Expand Down Expand Up @@ -325,6 +334,40 @@ impl UdpSocket {
})
}

/// Receives a single datagram message on the socket from the runtime's
/// buffer pool, together with ancillary data. The ancillary data buffer is
/// provided by the caller.
///
/// If `len` == 0, will use buffer pool's inner buffer size as the max len;
/// if `len` > 0, `min(len, inner buffer size)` will be the read max len
pub async fn recv_msg_managed<C: IoBufMut>(
&self,
len: usize,
control: C,
) -> io::Result<Option<(BufferRef, C, SocketAddr)>> {
let res = self.inner.recv_msg_managed(len, control, 0).await?;
let ret = match res {
Some((buffer, control, addr)) => {
let addr = addr
.expect("should have addr")
.as_socket()
.expect("should be SocketAddr");
Some((buffer, control, addr))
}
None => None,
};
Ok(ret)
}

/// Receives multiple single datagram messages and ancillary data on the
/// socket from the runtime's buffer pool.
pub fn recv_msg_multi(
&self,
control_len: usize,
) -> impl Stream<Item = io::Result<RecvMsgMultiResult>> {
self.inner.recv_msg_multi(control_len, 0)
}

/// Sends data on the socket to the given address. On success, returns the
/// number of bytes sent.
pub async fn send_to<T: IoBuf>(
Expand Down
Loading