Skip to content

Commit 8dff845

Browse files
authored
feat(driver): recvmsg multi (#842)
* feat(driver): recvmsg multi * refactor(driver): hide impl details
1 parent 9143e8a commit 8dff845

8 files changed

Lines changed: 911 additions & 27 deletions

File tree

compio-driver/src/op/managed.rs

Lines changed: 154 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use std::io;
22

3-
use compio_buf::IntoInner;
3+
use compio_buf::{IntoInner, IoBuf, IoBufMut, SetLen};
44
use socket2::SockAddr;
55

66
use super::{Read, ReadAt, Recv, RecvFrom};
7-
use crate::{AsFd, BufferPool, BufferRef, op::TakeBuffer};
7+
use crate::{
8+
AsFd, BufferPool, BufferRef,
9+
op::{RecvMsg, TakeBuffer},
10+
};
811

912
/// Read a file at specified position into managed buffer.
1013
pub struct ReadManagedAt<S> {
@@ -97,9 +100,158 @@ impl<S: AsFd> TakeBuffer for RecvFromManaged<S> {
97100
}
98101
}
99102

103+
/// Receive data into managed buffer, and ancillary data into control buffer.
104+
pub struct RecvMsgManaged<C: IoBufMut, S: AsFd> {
105+
pub(crate) op: RecvMsg<[BufferRef; 1], C, S>,
106+
}
107+
108+
impl<C: IoBufMut, S: AsFd> RecvMsgManaged<C, S> {
109+
/// Create [`RecvMsgManaged`].
110+
pub fn new(fd: S, pool: &BufferPool, len: usize, control: C, flags: i32) -> io::Result<Self> {
111+
Ok(Self {
112+
op: RecvMsg::new(fd, [pool.pop()?.with_capacity(len)], control, flags),
113+
})
114+
}
115+
}
116+
117+
impl<C: IoBufMut, S: AsFd> TakeBuffer for RecvMsgManaged<C, S> {
118+
type Buffer = ((BufferRef, C), Option<SockAddr>, usize);
119+
120+
fn take_buffer(self) -> Option<Self::Buffer> {
121+
let (([buf], control), addr, len) = self.op.into_inner();
122+
Some(((buf, control), addr, len))
123+
}
124+
}
125+
100126
/// Read a file at specified position into multiple managed buffers.
101127
pub type ReadMultiAt<S> = ReadManagedAt<S>;
102128
/// Read a file into multiple managed buffers.
103129
pub type ReadMulti<S> = ReadManaged<S>;
104130
/// Receive data from remote into multiple managed buffers.
105131
pub type RecvMulti<S> = RecvManaged<S>;
132+
133+
/// Result of [`RecvFromMulti`].
134+
pub struct RecvFromMultiResult {
135+
buffer: BufferRef,
136+
addr: Option<SockAddr>,
137+
}
138+
139+
impl RecvFromMultiResult {
140+
#[doc(hidden)]
141+
pub unsafe fn new(_: BufferRef) -> Self {
142+
unreachable!("should not be called directly")
143+
}
144+
145+
/// Get the payload data.
146+
pub fn data(&self) -> &[u8] {
147+
self.buffer.as_init()
148+
}
149+
150+
/// Get the source address if applicable.
151+
pub fn addr(&self) -> Option<SockAddr> {
152+
self.addr.clone()
153+
}
154+
}
155+
156+
impl IntoInner for RecvFromMultiResult {
157+
type Inner = BufferRef;
158+
159+
fn into_inner(self) -> Self::Inner {
160+
self.buffer
161+
}
162+
}
163+
164+
/// Receive data and source address multi times into multiple managed buffers.
165+
pub struct RecvFromMulti<S: AsFd> {
166+
pub(crate) op: RecvFromManaged<S>,
167+
pub(crate) len: usize,
168+
}
169+
170+
impl<S: AsFd> RecvFromMulti<S> {
171+
/// Create [`RecvFromMulti`].
172+
pub fn new(fd: S, pool: &BufferPool, flags: i32) -> io::Result<Self> {
173+
Ok(Self {
174+
op: RecvFromManaged::new(fd, pool, 0, flags)?,
175+
len: 0,
176+
})
177+
}
178+
}
179+
180+
impl<S: AsFd> TakeBuffer for RecvFromMulti<S> {
181+
type Buffer = RecvFromMultiResult;
182+
183+
fn take_buffer(self) -> Option<Self::Buffer> {
184+
let (mut buffer, addr) = self.op.take_buffer()?;
185+
unsafe { buffer.advance_to(self.len) };
186+
Some(RecvFromMultiResult { buffer, addr })
187+
}
188+
}
189+
190+
/// Result of [`RecvMsgMulti`].
191+
pub struct RecvMsgMultiResult {
192+
buffer: BufferRef,
193+
control: BufferRef,
194+
addr: Option<SockAddr>,
195+
}
196+
197+
impl RecvMsgMultiResult {
198+
#[doc(hidden)]
199+
pub unsafe fn new(_: BufferRef, _: usize) -> Self {
200+
unreachable!("should not be called directly")
201+
}
202+
203+
/// Get the payload data.
204+
pub fn data(&self) -> &[u8] {
205+
self.buffer.as_init()
206+
}
207+
208+
/// Get the source address if applicable.
209+
pub fn addr(&self) -> Option<SockAddr> {
210+
self.addr.clone()
211+
}
212+
213+
/// Get the ancillary data.
214+
pub fn ancillary(&self) -> &[u8] {
215+
self.control.as_init()
216+
}
217+
}
218+
219+
impl IntoInner for RecvMsgMultiResult {
220+
type Inner = BufferRef;
221+
222+
fn into_inner(self) -> Self::Inner {
223+
self.buffer
224+
}
225+
}
226+
227+
/// Receive data, ancillary data and source address multi times into multiple
228+
/// managed buffers.
229+
pub struct RecvMsgMulti<S: AsFd> {
230+
pub(crate) op: RecvMsgManaged<BufferRef, S>,
231+
pub(crate) len: usize,
232+
}
233+
234+
impl<S: AsFd> RecvMsgMulti<S> {
235+
/// Create [`RecvMsgMulti`].
236+
pub fn new(fd: S, pool: &BufferPool, control_len: usize, flags: i32) -> io::Result<Self> {
237+
Ok(Self {
238+
op: RecvMsgManaged::new(fd, pool, 0, pool.pop()?.with_capacity(control_len), flags)?,
239+
len: 0,
240+
})
241+
}
242+
}
243+
244+
impl<S: AsFd> TakeBuffer for RecvMsgMulti<S> {
245+
type Buffer = RecvMsgMultiResult;
246+
247+
fn take_buffer(self) -> Option<Self::Buffer> {
248+
let ((mut buffer, mut control), addr, control_len) = self.op.take_buffer()?;
249+
unsafe { buffer.advance_to(self.len) };
250+
unsafe { control.advance_to(control_len) };
251+
Some(RecvMsgMultiResult {
252+
buffer,
253+
control,
254+
addr,
255+
})
256+
}
257+
}

compio-driver/src/op/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ pub(crate) mod managed;
1414

1515
#[cfg(not(io_uring))]
1616
pub use managed::{
17-
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
17+
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvFromMulti,
18+
RecvFromMultiResult, RecvManaged, RecvMsgManaged, RecvMsgMulti, RecvMsgMultiResult, RecvMulti,
1819
};
1920

2021
#[cfg(linux_all)]
@@ -33,7 +34,8 @@ pub use crate::sys::op::{
3334
pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
3435
#[cfg(io_uring)]
3536
pub use crate::sys::op::{
36-
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
37+
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvFromMulti,
38+
RecvFromMultiResult, RecvManaged, RecvMsgManaged, RecvMsgMulti, RecvMsgMultiResult, RecvMulti,
3739
};
3840
use crate::{BufferRef, OwnedFd, SharedFd};
3941

compio-driver/src/sys/fusion/op.rs

Lines changed: 153 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,8 @@ macro_rules! mop {
205205

206206
fn take_buffer(self) -> Option<$inner> {
207207
match self.inner {
208-
[< $name Inner >]::IoUring(op) => op.take_buffer(),
209-
[< $name Inner >]::Poll(op) => op.take_buffer(),
208+
[< $name Inner >]::IoUring(op) => op.take_buffer().map(Into::into),
209+
[< $name Inner >]::Poll(op) => op.take_buffer().map(Into::into),
210210
}
211211
}
212212
}
@@ -272,6 +272,157 @@ mop!(<S: AsFd> ReadManagedAt(fd: S, offset: u64, pool: &BufferPool, len: usize)
272272
mop!(<S: AsFd> ReadManaged(fd: S, pool: &BufferPool, len: usize) with pool);
273273
mop!(<S: AsFd> RecvManaged(fd: S, pool: &BufferPool, len: usize, flags: i32) with pool);
274274
mop!(<S: AsFd> RecvFromManaged(fd: S, pool: &BufferPool, len: usize, flags: i32) with pool; (BufferRef, Option<SockAddr>));
275+
mop!(<C: IoBufMut, S: AsFd> RecvMsgManaged(fd: S, pool: &BufferPool, len: usize, control: C, flags: i32) with pool; ((BufferRef, C), Option<SockAddr>, usize));
275276
mop!(<S: AsFd> ReadMultiAt(fd: S, offset: u64, pool: &BufferPool, len: usize) with pool);
276277
mop!(<S: AsFd> ReadMulti(fd: S, pool: &BufferPool, len: usize) with pool);
277278
mop!(<S: AsFd> RecvMulti(fd: S, pool: &BufferPool, len: usize, flags: i32) with pool);
279+
mop!(<S: AsFd> RecvFromMulti(fd: S, pool: &BufferPool, flags: i32) with pool; RecvFromMultiResult);
280+
mop!(<S: AsFd> RecvMsgMulti(fd: S, pool: &BufferPool, control_len: usize, flags: i32) with pool; RecvMsgMultiResult);
281+
282+
enum RecvFromMultiResultInner {
283+
Poll(crate::op::managed::RecvFromMultiResult),
284+
IoUring(iour::RecvFromMultiResult),
285+
}
286+
287+
/// Result of [`RecvFromMulti`].
288+
pub struct RecvFromMultiResult {
289+
inner: RecvFromMultiResultInner,
290+
}
291+
292+
impl From<crate::op::managed::RecvFromMultiResult> for RecvFromMultiResult {
293+
fn from(result: crate::op::managed::RecvFromMultiResult) -> Self {
294+
Self {
295+
inner: RecvFromMultiResultInner::Poll(result),
296+
}
297+
}
298+
}
299+
300+
impl From<iour::RecvFromMultiResult> for RecvFromMultiResult {
301+
fn from(result: iour::RecvFromMultiResult) -> Self {
302+
Self {
303+
inner: RecvFromMultiResultInner::IoUring(result),
304+
}
305+
}
306+
}
307+
308+
impl RecvFromMultiResult {
309+
/// Create [`RecvFromMultiResult`] from a buffer received from
310+
/// [`RecvFromMulti`]. It should be used for io-uring only.
311+
///
312+
/// # Safety
313+
///
314+
/// The buffer must be received from [`RecvFromMulti`] or have the same
315+
/// format as the buffer received from [`RecvFromMulti`].
316+
pub unsafe fn new(buffer: BufferRef) -> Self {
317+
Self {
318+
inner: RecvFromMultiResultInner::IoUring(unsafe {
319+
iour::RecvFromMultiResult::new(buffer)
320+
}),
321+
}
322+
}
323+
324+
/// Get the payload data.
325+
pub fn data(&self) -> &[u8] {
326+
match &self.inner {
327+
RecvFromMultiResultInner::Poll(result) => result.data(),
328+
RecvFromMultiResultInner::IoUring(result) => result.data(),
329+
}
330+
}
331+
332+
/// Get the source address if applicable.
333+
pub fn addr(&self) -> Option<SockAddr> {
334+
match &self.inner {
335+
RecvFromMultiResultInner::Poll(result) => result.addr(),
336+
RecvFromMultiResultInner::IoUring(result) => result.addr(),
337+
}
338+
}
339+
}
340+
341+
impl IntoInner for RecvFromMultiResult {
342+
type Inner = BufferRef;
343+
344+
fn into_inner(self) -> Self::Inner {
345+
match self.inner {
346+
RecvFromMultiResultInner::Poll(result) => result.into_inner(),
347+
RecvFromMultiResultInner::IoUring(result) => result.into_inner(),
348+
}
349+
}
350+
}
351+
352+
enum RecvMsgMultiResultInner {
353+
Poll(crate::op::managed::RecvMsgMultiResult),
354+
IoUring(iour::RecvMsgMultiResult),
355+
}
356+
357+
/// Result of [`RecvMsgMulti`].
358+
pub struct RecvMsgMultiResult {
359+
inner: RecvMsgMultiResultInner,
360+
}
361+
362+
impl From<crate::op::managed::RecvMsgMultiResult> for RecvMsgMultiResult {
363+
fn from(result: crate::op::managed::RecvMsgMultiResult) -> Self {
364+
Self {
365+
inner: RecvMsgMultiResultInner::Poll(result),
366+
}
367+
}
368+
}
369+
370+
impl From<iour::RecvMsgMultiResult> for RecvMsgMultiResult {
371+
fn from(result: iour::RecvMsgMultiResult) -> Self {
372+
Self {
373+
inner: RecvMsgMultiResultInner::IoUring(result),
374+
}
375+
}
376+
}
377+
378+
impl RecvMsgMultiResult {
379+
/// Create [`RecvMsgMultiResult`] from a buffer received from
380+
/// [`RecvMsgMulti`]. It should be used for io-uring only.
381+
///
382+
/// # Safety
383+
///
384+
/// The buffer must be received from [`RecvMsgMulti`] or have the same
385+
/// format as the buffer received from [`RecvMsgMulti`].
386+
pub unsafe fn new(buffer: BufferRef, clen: usize) -> Self {
387+
Self {
388+
inner: RecvMsgMultiResultInner::IoUring(unsafe {
389+
iour::RecvMsgMultiResult::new(buffer, clen)
390+
}),
391+
}
392+
}
393+
394+
/// Get the payload data.
395+
pub fn data(&self) -> &[u8] {
396+
match &self.inner {
397+
RecvMsgMultiResultInner::Poll(result) => result.data(),
398+
RecvMsgMultiResultInner::IoUring(result) => result.data(),
399+
}
400+
}
401+
402+
/// Get the ancillary data.
403+
pub fn ancillary(&self) -> &[u8] {
404+
match &self.inner {
405+
RecvMsgMultiResultInner::Poll(result) => result.ancillary(),
406+
RecvMsgMultiResultInner::IoUring(result) => result.ancillary(),
407+
}
408+
}
409+
410+
/// Get the source address if applicable.
411+
pub fn addr(&self) -> Option<SockAddr> {
412+
match &self.inner {
413+
RecvMsgMultiResultInner::Poll(result) => result.addr(),
414+
RecvMsgMultiResultInner::IoUring(result) => result.addr(),
415+
}
416+
}
417+
}
418+
419+
impl IntoInner for RecvMsgMultiResult {
420+
type Inner = BufferRef;
421+
422+
fn into_inner(self) -> Self::Inner {
423+
match self.inner {
424+
RecvMsgMultiResultInner::Poll(result) => result.into_inner(),
425+
RecvMsgMultiResultInner::IoUring(result) => result.into_inner(),
426+
}
427+
}
428+
}

0 commit comments

Comments
 (0)