Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion compio-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ windows-sys = { workspace = true, optional = true, features = [
] }

[dev-dependencies]
aligned-array = "1.0.1"
tokio = { workspace = true, features = ["macros", "rt"] }
serde = { version = "1.0.219", features = ["derive"] }
futures-executor = "0.3.30"
Expand All @@ -57,3 +56,7 @@ required-features = ["compat"]
[[test]]
name = "framed"
required-features = ["codec-serde-json"]

[[test]]
name = "ancillary"
required-features = ["ancillary"]
132 changes: 106 additions & 26 deletions compio-io/src/ancillary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,36 @@
//! - [`AncillaryRef`]: A reference to a single ancillary data entry.
//! - [`AncillaryIter`]: An iterator over a buffer of ancillary messages.
//! - [`AncillaryBuilder`]: A builder for constructing ancillary messages into a
//! caller-supplied send buffer.
//! [`AncillaryBuf`].
//! - [`AncillaryBuf`]: A fixed-size, properly aligned stack buffer for
Comment thread
fantix marked this conversation as resolved.
Outdated
//! ancillary data
//! ancillary data.
//!
//! # Example
//!
//! ```
//! use compio_io::ancillary::{AncillaryBuf, AncillaryIter, ancillary_space};
//!
//! const LEVEL: i32 = 1;
//! const TYPE: i32 = 2;
//!
//! // Build a buffer containing two `u32` ancillary messages.
//! let mut buf = AncillaryBuf::<{ ancillary_space::<u32>() * 2 }>::new();
//! let mut builder = buf.builder();
//! builder.try_push(LEVEL, TYPE, 42u32).unwrap();
//! builder.try_push(LEVEL, TYPE, 43u32).unwrap();
//! assert!(builder.try_push(LEVEL, TYPE, 44u32).is_none()); // buffer is full
//!
//! // Read it back.
//! unsafe {
//! let mut iter = AncillaryIter::new(&buf);
//! let msg = iter.next().unwrap();
//! assert_eq!(msg.level(), LEVEL);
//! assert_eq!(msg.ty(), TYPE);
//! assert_eq!(*msg.data::<u32>(), 42u32);
//! assert_eq!(iter.next().unwrap().data::<u32>(), &43u32);
//! assert!(iter.next().is_none());
//! }
//! ```

use std::{
marker::PhantomData,
Expand Down Expand Up @@ -101,33 +128,18 @@ impl<'a> Iterator for AncillaryIter<'a> {
}

/// Helper to construct ancillary (control) messages.
pub struct AncillaryBuilder<'a> {
pub struct AncillaryBuilder<'a, const N: usize> {
inner: sys::CMsgIter,
len: usize,
_p: PhantomData<&'a mut ()>,
buffer: &'a mut AncillaryBuf<N>,
}

impl<'a> AncillaryBuilder<'a> {
/// Create [`AncillaryBuilder`] with the given buffer. The buffer will be
/// zeroed on creation.
///
/// # Panics
///
/// This function will panic if the buffer is too short or not properly
/// aligned.
pub fn new(buffer: &'a mut [MaybeUninit<u8>]) -> Self {
impl<'a, const N: usize> AncillaryBuilder<'a, N> {
fn new(buffer: &'a mut AncillaryBuf<N>) -> Self {
// TODO: optimize zeroing
buffer.fill(MaybeUninit::new(0));
Self {
inner: sys::CMsgIter::new(buffer.as_ptr().cast(), buffer.len()),
len: 0,
_p: PhantomData,
}
}

/// Finishes building, returns length of the control message.
pub fn finish(self) -> usize {
self.len
buffer.as_uninit().fill(MaybeUninit::new(0));
Comment thread
George-Miao marked this conversation as resolved.
buffer.len = 0;
let inner = sys::CMsgIter::new(buffer.as_ptr(), buffer.buf_capacity());
Self { inner, buffer }
}

/// Try to append a control message entry into the buffer. If the buffer
Expand All @@ -143,7 +155,7 @@ impl<'a> AncillaryBuilder<'a> {
let mut cmsg = self.inner.current_mut()?;
cmsg.set_level(level);
cmsg.set_ty(ty);
self.len += cmsg.set_data(value);
self.buffer.len += cmsg.set_data(value);

self.inner.next();
}
Expand Down Expand Up @@ -175,6 +187,16 @@ impl<const N: usize> AncillaryBuf<N> {
_align: [],
}
}

/// Create [`AncillaryBuilder`] with this buffer. The buffer will be zeroed
/// on creation.
///
/// # Panics
///
/// This function will panic if this buffer is too short.
pub fn builder(&mut self) -> AncillaryBuilder<'_, N> {
AncillaryBuilder::new(self)
}
Comment thread
fantix marked this conversation as resolved.
}

impl<const N: usize> Default for AncillaryBuf<N> {
Expand Down Expand Up @@ -215,3 +237,61 @@ impl<const N: usize> DerefMut for AncillaryBuf<N> {
&mut self.inner[0..self.len]
}
}

// Deprecated compio_net::CMsgBuilder
Comment thread
Berrysoft marked this conversation as resolved.
#[doc(hidden)]
pub struct CMsgBuilder<'a> {
inner: sys::CMsgIter,
len: usize,
_p: PhantomData<&'a mut ()>,
}

impl<'a> CMsgBuilder<'a> {
pub fn new(buffer: &'a mut [MaybeUninit<u8>]) -> Self {
buffer.fill(MaybeUninit::new(0));
Self {
inner: sys::CMsgIter::new(buffer.as_ptr().cast(), buffer.len()),
len: 0,
_p: PhantomData,
}
}

pub fn finish(self) -> usize {
self.len
}

pub fn try_push<T>(&mut self, level: i32, ty: i32, value: T) -> Option<()> {
if !self.inner.is_aligned::<T>() || !self.inner.is_space_enough::<T>() {
return None;
}

// SAFETY: the buffer is zeroed and the pointer is valid and aligned
unsafe {
let mut cmsg = self.inner.current_mut()?;
cmsg.set_level(level);
cmsg.set_ty(ty);
self.len += cmsg.set_data(value);
Comment thread
fantix marked this conversation as resolved.

self.inner.next();
}

Some(())
}
}

/// Returns the buffer size required to hold one ancillary message carrying a
/// value of type `T`.
///
/// This is the platform-appropriate equivalent of `CMSG_SPACE(sizeof(T))` on
/// Unix or `WSA_CMSG_SPACE(sizeof(T))` on Windows, and can be used as a const
/// generic argument for [`AncillaryBuf`].
pub const fn ancillary_space<T>() -> usize {
#[cfg(unix)]
// SAFETY: CMSG_SPACE is always safe
unsafe {
libc::CMSG_SPACE(std::mem::size_of::<T>() as libc::c_uint) as usize
}

#[cfg(windows)]
sys::wsa_cmsg_space(std::mem::size_of::<T>())
}
2 changes: 1 addition & 1 deletion compio-io/src/ancillary/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ unsafe fn wsa_cmsg_data(cmsg: *const CMSGHDR) -> *mut u8 {
}

#[inline]
const fn wsa_cmsg_space(length: usize) -> usize {
pub(crate) const fn wsa_cmsg_space(length: usize) -> usize {
WSA_CMSGDATA_OFFSET + wsa_cmsghdr_align(length)
}

Expand Down
18 changes: 7 additions & 11 deletions compio-io/tests/ancillary.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
use std::mem::MaybeUninit;

use aligned_array::{A8, Aligned};
use compio_buf::{IoBuf, IoBufMut};
use compio_io::ancillary::{AncillaryBuilder, AncillaryIter};
use compio_buf::IoBuf;
use compio_io::ancillary::{AncillaryBuf, AncillaryIter, CMsgBuilder};

#[test]
fn test_cmsg() {
let mut buf: Aligned<A8, [u8; 64]> = Aligned([0u8; 64]);
let mut builder = AncillaryBuilder::new(buf.as_uninit());
let mut buf = AncillaryBuf::<64>::new();
let mut builder = buf.builder();

builder.try_push(0, 0, ()).unwrap(); // 16 / 12
builder.try_push(1, 1, u32::MAX).unwrap(); // 16 + 4 + 4 / 12 + 4
builder.try_push(2, 2, i64::MIN).unwrap(); // 16 + 8 / 12 + 8
Comment thread
fantix marked this conversation as resolved.
Outdated
let len = builder.finish();
assert!(len == 64 || len == 48);
assert!(buf.buf_len() == 64 || buf.buf_len() == 48);

unsafe {
let buf = buf.slice(..len);
let mut iter = AncillaryIter::new(&buf);

let cmsg = iter.next().unwrap();
Expand All @@ -38,13 +35,12 @@ fn test_cmsg() {
#[test]
#[should_panic]
fn invalid_buffer_length() {
let mut buf = [MaybeUninit::new(0u8); 1];
AncillaryBuilder::new(&mut buf);
AncillaryBuf::<1>::new().builder();
}

#[test]
#[should_panic]
fn invalid_buffer_alignment() {
let mut buf = [MaybeUninit::new(0u8); 64];
AncillaryBuilder::new(&mut buf[1..]);
CMsgBuilder::new(&mut buf[1..]);
}
4 changes: 2 additions & 2 deletions compio-net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ pub type CMsgIter<'a> = compio_io::ancillary::AncillaryIter<'a>;
/// Helper to construct control message.
#[deprecated(
since = "0.12.0",
note = "use `compio_io::ancillary::AncillaryBuilder` instead"
note = "use `compio_io::ancillary::AncillaryBuf::builder()` instead"
)]
pub type CMsgBuilder<'a> = compio_io::ancillary::AncillaryBuilder<'a>;
pub type CMsgBuilder<'a> = compio_io::ancillary::CMsgBuilder<'a>;

/// Providing functionalities to wait for readiness.
#[deprecated(since = "0.12.0", note = "Use `compio::runtime::fd::PollFd` instead")]
Expand Down
10 changes: 3 additions & 7 deletions compio-quic/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use std::{
sync::atomic::Ordering,
};

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetLen, buf_try};
use compio_io::ancillary::{AncillaryBuf, AncillaryBuilder, AncillaryIter};
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, buf_try};
use compio_io::ancillary::{AncillaryBuf, AncillaryIter};
use compio_net::UdpSocket;
use quinn_proto::{EcnCodepoint, Transmit};
#[cfg(windows)]
Expand Down Expand Up @@ -368,7 +368,7 @@ impl Socket {
let ecn = transmit.ecn.map_or(0, |x| x as u8);

let mut control = AncillaryBuf::<CMSG_LEN>::new();
let mut builder = AncillaryBuilder::new(control.as_uninit());
let mut builder = control.builder();

// ECN
if is_ipv4 {
Expand Down Expand Up @@ -462,10 +462,6 @@ impl Socket {
let _ = segment_size;
}

let len = builder.finish();
// SAFETY: AncillaryBuilder ensures the buffer is initialized within len
unsafe { control.set_len(len) };

let mut buffer = buffer.slice(0..transmit.size);

loop {
Expand Down
Loading