Skip to content

Commit a40dbc4

Browse files
committed
fix(io,net): change of design
1 parent ada6666 commit a40dbc4

9 files changed

Lines changed: 248 additions & 185 deletions

File tree

compio-driver/src/sys/unix_op.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,7 @@ impl<T: IoVectoredBuf, C: IoBuf, S> SendMsg<T, C, S> {
802802
/// This function will panic if the control message buffer is misaligned.
803803
pub fn new(fd: S, buffer: T, control: C, addr: Option<SockAddr>, flags: i32) -> Self {
804804
assert!(
805-
control.buf_ptr().cast::<libc::cmsghdr>().is_aligned(),
805+
control.buf_len() == 0 || control.buf_ptr().cast::<libc::cmsghdr>().is_aligned(),
806806
"misaligned control message buffer"
807807
);
808808
Self {
@@ -832,7 +832,11 @@ impl<T: IoVectoredBuf, C: IoBuf, S> SendMsg<T, C, S> {
832832
}
833833
this.msg.msg_iov = this.slices.as_ptr() as _;
834834
this.msg.msg_iovlen = this.slices.len() as _;
835-
this.msg.msg_control = this.control.buf_ptr() as _;
835+
this.msg.msg_control = if this.control.buf_len() == 0 {
836+
std::ptr::null_mut()
837+
} else {
838+
this.control.buf_ptr() as _
839+
};
836840
this.msg.msg_controllen = this.control.buf_len() as _;
837841
}
838842
}

compio-io/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ windows-sys = { workspace = true, optional = true, features = [
3131
] }
3232

3333
[dev-dependencies]
34+
compio-net = { workspace = true }
35+
compio-runtime = { workspace = true }
3436
tokio = { workspace = true, features = ["macros", "rt"] }
3537
serde = { version = "1.0.219", features = ["derive"] }
3638
futures-executor = "0.3.30"

compio-io/src/ancillary/io.rs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
#[cfg(feature = "allocator_api")]
2+
use std::alloc::Allocator;
3+
4+
use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, t_alloc};
5+
6+
/// Trait for asynchronous read with ancillary (control) data.
7+
/// Intended for connected stream sockets (TCP, Unix streams) where no source
8+
/// address is needed.
9+
pub trait AsyncReadAncillary {
10+
/// Read data with ancillary data into an owned buffer.
11+
async fn read_with_ancillary<T: IoBufMut, C: IoBufMut>(
12+
&mut self,
13+
buffer: T,
14+
control: C,
15+
) -> BufResult<(usize, usize), (T, C)>;
16+
17+
/// Read data with ancillary data into a vectored buffer.
18+
async fn read_vectored_with_ancillary<T: IoVectoredBufMut, C: IoBufMut>(
19+
&mut self,
20+
buffer: T,
21+
control: C,
22+
) -> BufResult<(usize, usize), (T, C)>;
23+
}
24+
25+
impl<A: AsyncReadAncillary + ?Sized> AsyncReadAncillary for &mut A {
26+
#[inline]
27+
async fn read_with_ancillary<T: IoBufMut, C: IoBufMut>(
28+
&mut self,
29+
buffer: T,
30+
control: C,
31+
) -> BufResult<(usize, usize), (T, C)> {
32+
(**self).read_with_ancillary(buffer, control).await
33+
}
34+
35+
#[inline]
36+
async fn read_vectored_with_ancillary<T: IoVectoredBufMut, C: IoBufMut>(
37+
&mut self,
38+
buffer: T,
39+
control: C,
40+
) -> BufResult<(usize, usize), (T, C)> {
41+
(**self).read_vectored_with_ancillary(buffer, control).await
42+
}
43+
}
44+
45+
impl<A: AsyncReadAncillary + ?Sized, #[cfg(feature = "allocator_api")] Alloc: Allocator>
46+
AsyncReadAncillary for t_alloc!(Box, A, Alloc)
47+
{
48+
#[inline]
49+
async fn read_with_ancillary<T: IoBufMut, C: IoBufMut>(
50+
&mut self,
51+
buffer: T,
52+
control: C,
53+
) -> BufResult<(usize, usize), (T, C)> {
54+
(**self).read_with_ancillary(buffer, control).await
55+
}
56+
57+
#[inline]
58+
async fn read_vectored_with_ancillary<T: IoVectoredBufMut, C: IoBufMut>(
59+
&mut self,
60+
buffer: T,
61+
control: C,
62+
) -> BufResult<(usize, usize), (T, C)> {
63+
(**self).read_vectored_with_ancillary(buffer, control).await
64+
}
65+
}
66+
67+
/// Trait for asynchronous write with ancillary (control) data.
68+
/// Intended for connected stream sockets (TCP, Unix streams) where no
69+
/// destination address is needed.
70+
pub trait AsyncWriteAncillary {
71+
/// Write data with ancillary data from an owned buffer.
72+
async fn write_with_ancillary<T: IoBuf, C: IoBuf>(
73+
&mut self,
74+
buffer: T,
75+
control: C,
76+
) -> BufResult<usize, (T, C)>;
77+
78+
/// Write data with ancillary data from a vectored buffer.
79+
async fn write_vectored_with_ancillary<T: IoVectoredBuf, C: IoBuf>(
80+
&mut self,
81+
buffer: T,
82+
control: C,
83+
) -> BufResult<usize, (T, C)>;
84+
}
85+
86+
impl<A: AsyncWriteAncillary + ?Sized> AsyncWriteAncillary for &mut A {
87+
#[inline]
88+
async fn write_with_ancillary<T: IoBuf, C: IoBuf>(
89+
&mut self,
90+
buffer: T,
91+
control: C,
92+
) -> BufResult<usize, (T, C)> {
93+
(**self).write_with_ancillary(buffer, control).await
94+
}
95+
96+
#[inline]
97+
async fn write_vectored_with_ancillary<T: IoVectoredBuf, C: IoBuf>(
98+
&mut self,
99+
buffer: T,
100+
control: C,
101+
) -> BufResult<usize, (T, C)> {
102+
(**self)
103+
.write_vectored_with_ancillary(buffer, control)
104+
.await
105+
}
106+
}
107+
108+
impl<A: AsyncWriteAncillary + ?Sized, #[cfg(feature = "allocator_api")] Alloc: Allocator>
109+
AsyncWriteAncillary for t_alloc!(Box, A, Alloc)
110+
{
111+
#[inline]
112+
async fn write_with_ancillary<T: IoBuf, C: IoBuf>(
113+
&mut self,
114+
buffer: T,
115+
control: C,
116+
) -> BufResult<usize, (T, C)> {
117+
(**self).write_with_ancillary(buffer, control).await
118+
}
119+
120+
#[inline]
121+
async fn write_vectored_with_ancillary<T: IoVectoredBuf, C: IoBuf>(
122+
&mut self,
123+
buffer: T,
124+
control: C,
125+
) -> BufResult<usize, (T, C)> {
126+
(**self)
127+
.write_vectored_with_ancillary(buffer, control)
128+
.await
129+
}
130+
}

compio-io/src/ancillary/mod.rs

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
//! ancillary data payloads.
1616
//! - [`CodecError`]: Error type for encoding/decoding operations.
1717
//!
18+
//! # Traits
19+
//!
20+
//! - [`AsyncReadAncillary`]: read data together with ancillary data
21+
//! - [`AsyncWriteAncillary`]: write data together with ancillary data
22+
//!
1823
//! # Functions
1924
//!
2025
//! - [`ancillary_space`]: Helper function to calculate ancillary message size
@@ -27,33 +32,52 @@
2732
//!
2833
//! # Example
2934
//!
35+
//! Send and receive a file descriptor over a Unix socket pair using
36+
//! `SCM_RIGHTS`:
37+
//!
3038
//! ```
31-
//! use compio_io::ancillary::{AncillaryBuf, AncillaryIter, CodecError, ancillary_space};
39+
//! # #[cfg(unix)] {
40+
//! use std::os::unix::io::RawFd;
41+
//!
42+
//! use compio_io::ancillary::*;
43+
//! use compio_net::UnixStream;
44+
//!
45+
//! const BUF_SIZE: usize = ancillary_space::<RawFd>();
3246
//!
33-
//! const LEVEL: i32 = 1;
34-
//! const TYPE: i32 = 2;
47+
//! # compio_runtime::Runtime::new().unwrap().block_on(async {
48+
//! // Create a socket pair.
49+
//! let (std_a, std_b) = std::os::unix::net::UnixStream::pair().unwrap();
50+
//! let mut a = UnixStream::from_std(std_a).unwrap();
51+
//! let mut b = UnixStream::from_std(std_b).unwrap();
3552
//!
36-
//! // Build a buffer containing two `u32` ancillary messages.
37-
//! let mut buf = AncillaryBuf::<{ ancillary_space::<u32>() * 2 }>::new();
38-
//! let mut builder = buf.builder();
39-
//! builder.push(LEVEL, TYPE, &42u32).unwrap();
40-
//! builder.push(LEVEL, TYPE, &43u32).unwrap();
41-
//! // Buffer is full, cannot add more messages.
42-
//! assert!(matches!(
43-
//! builder.push(LEVEL, TYPE, &44u32),
44-
//! Err(CodecError::BufferTooSmall)
45-
//! ));
53+
//! // Pass fd 0 (stdin) as ancillary data via SCM_RIGHTS.
54+
//! let mut ctrl_send = AncillaryBuf::<BUF_SIZE>::new();
55+
//! let mut builder = ctrl_send.builder();
56+
//! builder
57+
//! .push(libc::SOL_SOCKET, libc::SCM_RIGHTS, &(0 as RawFd))
58+
//! .unwrap();
4659
//!
47-
//! // Read back the messages.
48-
//! unsafe {
49-
//! let mut iter = AncillaryIter::new(&buf);
50-
//! let msg = iter.next().unwrap();
51-
//! assert_eq!(msg.level(), LEVEL);
52-
//! assert_eq!(msg.ty(), TYPE);
53-
//! assert_eq!(msg.data::<u32>().unwrap(), 42u32);
54-
//! assert_eq!(iter.next().unwrap().data::<u32>().unwrap(), 43u32);
55-
//! assert!(iter.next().is_none());
56-
//! }
60+
//! // Send the payload together with the ancillary data.
61+
//! a.write_with_ancillary(b"hello", ctrl_send).await.0.unwrap();
62+
//!
63+
//! // Receive on the other end.
64+
//! let payload = Vec::with_capacity(5);
65+
//! let ctrl_recv = AncillaryBuf::<BUF_SIZE>::new();
66+
//! let ((_, ctrl_len), (payload, ctrl_recv)) =
67+
//! b.read_with_ancillary(payload, ctrl_recv).await.unwrap();
68+
//!
69+
//! assert_eq!(&payload[..], b"hello");
70+
//!
71+
//! // Parse the received ancillary messages.
72+
//! let mut iter = unsafe { AncillaryIter::new(&ctrl_recv[..ctrl_len]) };
73+
//! let msg = iter.next().unwrap();
74+
//! assert_eq!(msg.level(), libc::SOL_SOCKET);
75+
//! assert_eq!(msg.ty(), libc::SCM_RIGHTS);
76+
//! // The kernel duplicates the fd, so the received value may differ.
77+
//! let _received_fd = unsafe { msg.data::<RawFd>() };
78+
//! assert!(iter.next().is_none());
79+
//! # });
80+
//! # }
5781
//! ```
5882
5983
use std::{
@@ -67,6 +91,10 @@ use compio_buf::{IoBuf, IoBufMut, SetLen};
6791
#[cfg(windows)]
6892
use windows_sys::Win32::Networking::WinSock;
6993

94+
mod io;
95+
96+
pub use self::io::*;
97+
7098
cfg_if::cfg_if! {
7199
if #[cfg(windows)] {
72100
#[path = "windows.rs"]

compio-io/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ pub mod framed;
137137
#[cfg(feature = "compat")]
138138
pub mod compat;
139139
mod read;
140-
pub mod socket;
141140
pub mod util;
142141
mod write;
143142

compio-io/src/socket.rs

Lines changed: 0 additions & 61 deletions
This file was deleted.

0 commit comments

Comments
 (0)