Skip to content

Commit 7964618

Browse files
committed
feat(io,net): zerocopy API
1 parent 22cabd3 commit 7964618

6 files changed

Lines changed: 184 additions & 9 deletions

File tree

compio-io/src/write/mod.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#[cfg(feature = "allocator_api")]
22
use std::alloc::Allocator;
3-
use std::io::Cursor;
3+
use std::{future::ready, io::Cursor};
44

55
use compio_buf::{BufResult, IntoInner, IoBuf, IoVectoredBuf, buf_try, t_alloc};
66

@@ -35,6 +35,32 @@ pub trait AsyncWrite {
3535
loop_write_vectored!(buf, iter, self.write(iter))
3636
}
3737

38+
/// Like `write`, except that it attempts to write the buffer without
39+
/// copying in kernel.
40+
///
41+
/// The returned future can be used to wait for the buffer to be release by
42+
/// the kernel. By default this falls back to `write`, and the returned
43+
/// future will be ready immediately.
44+
async fn write_zerocopy<T: IoBuf>(
45+
&mut self,
46+
buf: T,
47+
) -> BufResult<usize, impl Future<Output = T> + use<'_, Self, T>> {
48+
self.write(buf).await.map_buffer(ready)
49+
}
50+
51+
/// Like `write_zerocopy`, except that it attempts to write the buffers
52+
/// without copying in kernel.
53+
///
54+
/// The returned future can be used to wait for the buffer to be release by
55+
/// the kernel. By default this falls back to `write_vectored`, and the
56+
/// returned future will be ready immediately.
57+
async fn write_zerocopy_vectored<T: IoVectoredBuf>(
58+
&mut self,
59+
buf: T,
60+
) -> BufResult<usize, impl Future<Output = T> + use<'_, Self, T>> {
61+
self.write_vectored(buf).await.map_buffer(ready)
62+
}
63+
3864
/// Attempts to flush the object, ensuring that any buffered data reach
3965
/// their destination.
4066
async fn flush(&mut self) -> IoResult<()>;

compio-net/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ cfg-if = { workspace = true }
2424
either = "1.9.0"
2525
once_cell = { workspace = true }
2626
socket2 = { workspace = true }
27+
futures-util = { workspace = true }
2728

2829
[target.'cfg(windows)'.dependencies]
2930
widestring = { workspace = true }
@@ -40,7 +41,6 @@ libc = { workspace = true }
4041
# Shared dev dependencies for all platforms
4142
[dev-dependencies]
4243
compio-macros = { workspace = true }
43-
futures-util = { workspace = true }
4444
tempfile = { workspace = true }
4545

4646
[features]

compio-net/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
55
#![cfg_attr(docsrs, feature(doc_cfg))]
66
#![cfg_attr(feature = "once_cell_try", feature(once_cell_try))]
7-
#![allow(unused_features)]
7+
#![allow(unused_features, refining_impl_trait)]
88
#![warn(missing_docs)]
99
#![deny(rustdoc::broken_intra_doc_links)]
1010
#![doc(

compio-net/src/socket.rs

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectore
88
#[cfg(unix)]
99
use compio_driver::op::CreateSocket;
1010
use compio_driver::{
11-
AsRawFd, ToSharedFd, impl_raw_fd,
11+
AsRawFd, OpCode, ToSharedFd, impl_raw_fd,
1212
op::{
1313
Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromManaged,
1414
RecvFromVectored, RecvManaged, RecvMsg, RecvResultExt, RecvVectored, ResultTakeBuffer,
15-
Send, SendMsg, SendTo, SendToVectored, SendVectored, ShutdownSocket, VecBufResultExt,
15+
Send, SendMsg, SendMsgZc, SendTo, SendToVectored, SendVectored, SendVectoredZc, SendZc,
16+
ShutdownSocket, VecBufResultExt,
1617
},
1718
syscall,
1819
};
1920
use compio_runtime::{Attacher, BorrowedBuffer, BufferPool, fd::PollFd};
21+
use futures_util::StreamExt;
2022
use socket2::{Domain, Protocol, SockAddr, Socket as Socket2, Type};
2123

2224
#[derive(Debug, Clone)]
@@ -208,6 +210,22 @@ impl Socket {
208210
compio_runtime::submit(op).await.into_inner()
209211
}
210212

213+
pub async fn send_zerocopy<T: IoBuf>(
214+
&self,
215+
buf: T,
216+
flags: i32,
217+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
218+
submit_zc(SendZc::new(self.to_shared_fd(), buf, flags)).await
219+
}
220+
221+
pub async fn send_zerocopy_vectored<T: IoVectoredBuf>(
222+
&self,
223+
buf: T,
224+
flags: i32,
225+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
226+
submit_zc(SendVectoredZc::new(self.to_shared_fd(), buf, flags)).await
227+
}
228+
211229
pub async fn recv_from<T: IoBufMut>(
212230
&self,
213231
buffer: T,
@@ -275,6 +293,26 @@ impl Socket {
275293
compio_runtime::submit(op).await.into_inner()
276294
}
277295

296+
pub async fn send_to_zerocopy<T: IoBuf>(
297+
&self,
298+
buffer: T,
299+
addr: &SockAddr,
300+
flags: i32,
301+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
302+
let op = SendTo::new(self.to_shared_fd(), buffer, addr.clone(), flags);
303+
submit_zc(op).await
304+
}
305+
306+
pub async fn send_to_zerocopy_vectored<T: IoVectoredBuf>(
307+
&self,
308+
buffer: T,
309+
addr: &SockAddr,
310+
flags: i32,
311+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
312+
let op = SendToVectored::new(self.to_shared_fd(), buffer, addr.clone(), flags);
313+
submit_zc(op).await
314+
}
315+
278316
pub async fn send_msg<T: IoBuf, C: IoBuf>(
279317
&self,
280318
buffer: T,
@@ -299,6 +337,33 @@ impl Socket {
299337
compio_runtime::submit(op).await.into_inner()
300338
}
301339

340+
pub async fn send_msg_zerocopy<T: IoBuf, C: IoBuf>(
341+
&self,
342+
buffer: T,
343+
control: C,
344+
addr: Option<&SockAddr>,
345+
flags: i32,
346+
) -> BufResult<usize, impl Future<Output = (T, C)> + use<T, C>> {
347+
self.send_msg_vectoed_zerocopy([buffer], control, addr, flags)
348+
.await
349+
.map_buffer(|fut| async move {
350+
let ([buffer], control) = fut.await;
351+
(buffer, control)
352+
})
353+
}
354+
355+
pub async fn send_msg_vectoed_zerocopy<T: IoVectoredBuf, C: IoBuf>(
356+
&self,
357+
buffer: T,
358+
control: C,
359+
addr: Option<&SockAddr>,
360+
flags: i32,
361+
) -> BufResult<usize, impl Future<Output = (T, C)> + use<T, C>> {
362+
let fd = self.to_shared_fd();
363+
let op = SendMsgZc::new(fd, buffer, control, addr.cloned(), flags);
364+
submit_zc(op).await
365+
}
366+
302367
#[cfg(unix)]
303368
pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
304369
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
@@ -377,3 +442,27 @@ impl Socket {
377442
}
378443

379444
impl_raw_fd!(Socket, Socket2, socket, socket);
445+
446+
async fn submit_zc<T: OpCode + IntoInner + 'static>(
447+
op: T,
448+
) -> BufResult<usize, impl Future<Output = T::Inner> + use<T>> {
449+
let mut stream = compio_runtime::submit_multi(op);
450+
let res = stream
451+
.next()
452+
.await
453+
.expect("SubmitMulti should yield at least one item")
454+
.0;
455+
456+
let fut = async move {
457+
loop {
458+
match stream.try_take() {
459+
Ok(op) => return op.into_inner(),
460+
Err(st) => stream = st,
461+
}
462+
463+
_ = stream.next().await;
464+
}
465+
};
466+
467+
BufResult(res, fut)
468+
}

compio-net/src/split.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,33 @@ where
3737
for<'a> &'a T: AsyncWrite,
3838
{
3939
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
40-
self.0.write(buf).await
40+
(self.0).write(buf).await
4141
}
4242

4343
async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
44-
self.0.write_vectored(buf).await
44+
(self.0).write_vectored(buf).await
45+
}
46+
47+
async fn write_zerocopy<B: IoBuf>(
48+
&mut self,
49+
buf: B,
50+
) -> BufResult<usize, impl Future<Output = B> + use<B, T>> {
51+
(self.0).write_zerocopy(buf).await
52+
}
53+
54+
async fn write_zerocopy_vectored<B: IoVectoredBuf>(
55+
&mut self,
56+
buf: B,
57+
) -> BufResult<usize, impl Future<Output = B> + use<B, T>> {
58+
(self.0).write_zerocopy_vectored(buf).await
4559
}
4660

4761
async fn flush(&mut self) -> io::Result<()> {
48-
self.0.flush().await
62+
(self.0).flush().await
4963
}
5064

5165
async fn shutdown(&mut self) -> io::Result<()> {
52-
self.0.shutdown().await
66+
(self.0).shutdown().await
5367
}
5468
}
5569

@@ -108,6 +122,20 @@ where
108122
(&self.0).write_vectored(buf).await
109123
}
110124

125+
async fn write_zerocopy<B: IoBuf>(
126+
&mut self,
127+
buf: B,
128+
) -> BufResult<usize, impl Future<Output = B> + use<'_, B, T>> {
129+
(&self.0).write_zerocopy(buf).await
130+
}
131+
132+
async fn write_zerocopy_vectored<B: IoVectoredBuf>(
133+
&mut self,
134+
buf: B,
135+
) -> BufResult<usize, impl Future<Output = B> + use<'_, B, T>> {
136+
(&self.0).write_zerocopy_vectored(buf).await
137+
}
138+
111139
async fn flush(&mut self) -> io::Result<()> {
112140
(&self.0).flush().await
113141
}

compio-net/src/tcp.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,22 @@ impl AsyncWrite for TcpStream {
391391
(&*self).write_vectored(buf).await
392392
}
393393

394+
#[inline]
395+
async fn write_zerocopy<T: IoBuf>(
396+
&mut self,
397+
buf: T,
398+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
399+
self.inner.send_zerocopy(buf, 0).await
400+
}
401+
402+
#[inline]
403+
async fn write_zerocopy_vectored<T: IoVectoredBuf>(
404+
&mut self,
405+
buf: T,
406+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
407+
self.inner.send_zerocopy_vectored(buf, 0).await
408+
}
409+
394410
#[inline]
395411
async fn flush(&mut self) -> io::Result<()> {
396412
(&*self).flush().await
@@ -413,6 +429,22 @@ impl AsyncWrite for &TcpStream {
413429
self.inner.send_vectored(buf, 0).await
414430
}
415431

432+
#[inline]
433+
async fn write_zerocopy<T: IoBuf>(
434+
&mut self,
435+
buf: T,
436+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
437+
self.inner.send_zerocopy(buf, 0).await
438+
}
439+
440+
#[inline]
441+
async fn write_zerocopy_vectored<T: IoVectoredBuf>(
442+
&mut self,
443+
buf: T,
444+
) -> BufResult<usize, impl Future<Output = T> + use<T>> {
445+
self.inner.send_zerocopy_vectored(buf, 0).await
446+
}
447+
416448
#[inline]
417449
async fn flush(&mut self) -> io::Result<()> {
418450
Ok(())

0 commit comments

Comments
 (0)