Skip to content

Commit 9429b27

Browse files
authored
feat(driver, net): support recv_send_poll_first (#894)
1 parent 9098c7f commit 9429b27

11 files changed

Lines changed: 191 additions & 73 deletions

File tree

compio-driver/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ io-uring = { version = "0.7.12", optional = true }
6060
once_cell = { workspace = true, optional = true }
6161
polling = { version = "3.3.0", optional = true }
6262
rustix = { workspace = true, features = ["linux_5_11"] }
63+
linux-raw-sys = { version = "0.12.1", optional = true }
6364

6465
# Other platform dependencies
6566
[target.'cfg(all(unix, not(target_os = "linux")))'.dependencies]
@@ -83,6 +84,7 @@ io-uring = [
8384
"rustix/mm",
8485
"rustix/event",
8586
"rustix/system",
87+
"linux-raw-sys/io_uring",
8688
"dep:io-uring",
8789
"dep:once_cell",
8890
]

compio-driver/src/sys/op/managed/fallback.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ impl<S> RecvManaged<S> {
6969
op: Recv::new(fd, pool.pop()?.with_capacity(len), flags),
7070
})
7171
}
72+
73+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
74+
/// of the SQE on the IO_URING driver.
75+
// This method has been added here for the sake of API compatibility.
76+
pub fn poll_first(&mut self) {}
7277
}
7378

7479
impl<S> TakeBuffer for RecvManaged<S> {
@@ -91,6 +96,11 @@ impl<S: AsFd> RecvFromManaged<S> {
9196
op: RecvFrom::new(fd, pool.pop()?.with_capacity(len), flags),
9297
})
9398
}
99+
100+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
101+
/// of the SQE on the IO_URING driver.
102+
// This method has been added here for the sake of API compatibility.
103+
pub fn poll_first(&mut self) {}
94104
}
95105

96106
impl<S: AsFd> TakeBuffer for RecvFromManaged<S> {
@@ -119,6 +129,11 @@ impl<C: IoBufMut, S: AsFd> RecvMsgManaged<C, S> {
119129
op: RecvMsg::new(fd, [pool.pop()?.with_capacity(len)], control, flags),
120130
})
121131
}
132+
133+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
134+
/// of the SQE on the IO_URING driver.
135+
// This method has been added here for the sake of API compatibility.
136+
pub fn poll_first(&mut self) {}
122137
}
123138

124139
impl<C: IoBufMut, S: AsFd> TakeBuffer for RecvMsgManaged<C, S> {

compio-driver/src/sys/op/managed/fusion.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,39 @@ mop!(<S: AsFd> RecvMulti(fd: S, pool: &BufferPool, len: usize, flags: RecvFlags)
131131
mop!(<S: AsFd> RecvFromMulti(fd: S, pool: &BufferPool, flags: RecvFlags) with pool; RecvFromMultiResult);
132132
mop!(<S: AsFd> RecvMsgMulti(fd: S, pool: &BufferPool, control_len: usize, flags: RecvFlags) with pool; RecvMsgMultiResult);
133133

134+
impl<S: AsFd> RecvManaged<S> {
135+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
136+
/// of the SQE on the IO_URING driver.
137+
pub fn poll_first(&mut self) {
138+
match self.inner {
139+
RecvManagedInner::Poll(ref mut i) => i.poll_first(),
140+
RecvManagedInner::IoUring(ref mut i) => i.poll_first(),
141+
}
142+
}
143+
}
144+
145+
impl<S: AsFd> RecvFromManaged<S> {
146+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
147+
/// of the SQE on the IO_URING driver.
148+
pub fn poll_first(&mut self) {
149+
match self.inner {
150+
RecvFromManagedInner::Poll(ref mut i) => i.poll_first(),
151+
RecvFromManagedInner::IoUring(ref mut i) => i.poll_first(),
152+
}
153+
}
154+
}
155+
156+
impl<C: IoBufMut, S: AsFd> RecvMsgManaged<C, S> {
157+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
158+
/// of the SQE on the IO_URING driver.
159+
pub fn poll_first(&mut self) {
160+
match self.inner {
161+
RecvMsgManagedInner::Poll(ref mut i) => i.poll_first(),
162+
RecvMsgManagedInner::IoUring(ref mut i) => i.poll_first(),
163+
}
164+
}
165+
}
166+
134167
enum RecvFromMultiResultInner {
135168
Poll(fallback::RecvFromMultiResult),
136169
IoUring(iour::RecvFromMultiResult),

compio-driver/src/sys/op/managed/iour.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use rustix::net::RecvFlags;
1212
use socket2::{SockAddr, SockAddrStorage, socklen_t};
1313

1414
use crate::{
15-
BufferPool, BufferRef, Extra, IourOpCode as OpCode, OpEntry, op::TakeBuffer,
16-
sys::pal::is_kernel_at_least,
15+
BufferPool, BufferRef, Extra, IourOpCode as OpCode, OpEntry,
16+
op::TakeBuffer,
17+
sys::pal::{is_kernel_at_least, set_poll_first},
1718
};
1819

1920
/// Read a file at specified position into specified buffer.
@@ -143,6 +144,7 @@ pub struct RecvManaged<S> {
143144
buffer_group: u16,
144145
buffer_pool: BufferPool,
145146
buffer: Option<BufferRef>,
147+
poll_first: bool,
146148
}
147149

148150
impl<S> RecvManaged<S> {
@@ -157,21 +159,29 @@ impl<S> RecvManaged<S> {
157159
flags,
158160
buffer_pool: buffer_pool.clone(),
159161
buffer: None,
162+
poll_first: false,
160163
})
161164
}
165+
166+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
167+
/// of the SQE on the IO_URING driver.
168+
pub fn poll_first(&mut self) {
169+
self.poll_first = true;
170+
}
162171
}
163172

164173
unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
165174
type Control = ();
166175

167176
fn create_entry(&mut self, _: &mut Self::Control) -> OpEntry {
168177
let fd = self.fd.as_fd().as_raw_fd();
169-
opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
178+
let entry = opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
170179
.flags(self.flags.bits() as _)
171180
.buf_group(self.buffer_group)
172181
.build()
173-
.flags(Flags::BUFFER_SELECT)
174-
.into()
182+
.flags(Flags::BUFFER_SELECT);
183+
let entry = set_poll_first(entry, self.poll_first);
184+
entry.into()
175185
}
176186

177187
unsafe fn set_result(&mut self, _: &mut Self::Control, _: &io::Result<usize>, extra: &Extra) {
@@ -205,6 +215,7 @@ pub struct RecvFromManaged<S> {
205215
buffer_group: u16,
206216
buffer_pool: BufferPool,
207217
buffer: Option<BufferRef>,
218+
poll_first: bool,
208219
}
209220

210221
#[doc(hidden)]
@@ -236,8 +247,15 @@ impl<S> RecvFromManaged<S> {
236247
addr,
237248
buffer_pool: buffer_pool.clone(),
238249
buffer: None,
250+
poll_first: false,
239251
})
240252
}
253+
254+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
255+
/// of the SQE on the IO_URING driver.
256+
pub fn poll_first(&mut self) {
257+
self.poll_first = true;
258+
}
241259
}
242260

243261
impl<S> TakeBuffer for RecvFromManaged<S> {
@@ -262,12 +280,13 @@ unsafe impl<S: AsFd> OpCode for RecvFromManaged<S> {
262280
}
263281

264282
fn create_entry(&mut self, control: &mut Self::Control) -> OpEntry {
265-
opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &raw mut control.msg)
283+
let entry = opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &raw mut control.msg)
266284
.flags(self.flags.bits() as _)
267285
.buf_group(self.buffer_group)
268286
.build()
269-
.flags(Flags::BUFFER_SELECT)
270-
.into()
287+
.flags(Flags::BUFFER_SELECT);
288+
let entry = set_poll_first(entry, self.poll_first);
289+
entry.into()
271290
}
272291

273292
unsafe fn set_result(
@@ -311,6 +330,12 @@ impl<C: IoBufMut, S: AsFd> RecvMsgManaged<C, S> {
311330
control_len: 0,
312331
})
313332
}
333+
334+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
335+
/// of the SQE on the IO_URING driver.
336+
pub fn poll_first(&mut self) {
337+
self.op.poll_first();
338+
}
314339
}
315340

316341
unsafe impl<C: IoBufMut, S: AsFd> OpCode for RecvMsgManaged<C, S> {

compio-driver/src/sys/op/socket/iour.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -238,14 +238,15 @@ unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
238238
let fd = self.fd.as_fd().as_raw_fd();
239239
let slice = self.buffer.sys_slice_mut();
240240

241-
opcode::Recv::new(
241+
let entry = opcode::Recv::new(
242242
Fd(fd),
243243
slice.ptr() as _,
244244
slice.len().try_into().unwrap_or(u32::MAX),
245245
)
246246
.flags(self.flags.bits() as _)
247-
.build()
248-
.into()
247+
.build();
248+
let entry = set_poll_first(entry, self.poll_first);
249+
entry.into()
249250
}
250251

251252
fn call_blocking(&mut self, _: &mut Self::Control) -> io::Result<usize> {
@@ -261,10 +262,11 @@ unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
261262
}
262263

263264
fn create_entry(&mut self, control: &mut Self::Control) -> OpEntry {
264-
opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut control.msg)
265+
let entry = opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut control.msg)
265266
.flags(self.flags.bits() as _)
266-
.build()
267-
.into()
267+
.build();
268+
let entry = set_poll_first(entry, self.poll_first);
269+
entry.into()
268270
}
269271

270272
fn call_blocking(&mut self, control: &mut Self::Control) -> io::Result<usize> {
@@ -286,10 +288,11 @@ impl<S: AsFd> RecvFromHeader<S> {
286288
}
287289

288290
pub fn create_entry(&mut self, control: &mut RecvMsgControl) -> OpEntry {
289-
opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut control.msg)
291+
let entry = opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut control.msg)
290292
.flags(self.flags.bits() as _)
291-
.build()
292-
.into()
293+
.build();
294+
let entry = set_poll_first(entry, self.poll_first);
295+
entry.into()
293296
}
294297

295298
pub fn set_result(&mut self, control: &mut RecvMsgControl) {
@@ -357,10 +360,11 @@ unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C,
357360
}
358361

359362
fn create_entry(&mut self, control: &mut Self::Control) -> OpEntry {
360-
opcode::RecvMsg::new(Fd(self.header.fd.as_fd().as_raw_fd()), &mut control.msg)
363+
let entry = opcode::RecvMsg::new(Fd(self.header.fd.as_fd().as_raw_fd()), &mut control.msg)
361364
.flags(self.header.flags.bits() as _)
362-
.build()
363-
.into()
365+
.build();
366+
let entry = set_poll_first(entry, self.poll_first);
367+
entry.into()
364368
}
365369

366370
unsafe fn set_result(

compio-driver/src/sys/op/socket/mod.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,23 @@ pub struct Recv<T: IoBufMut, S> {
8383
pub(crate) fd: S,
8484
pub(crate) buffer: T,
8585
pub(crate) flags: RecvFlags,
86+
poll_first: bool,
8687
}
8788

8889
/// Receive data from remote into vectored buffer.
8990
pub struct RecvVectored<T: IoVectoredBufMut, S> {
9091
pub(crate) fd: S,
9192
pub(crate) buffer: T,
9293
pub(crate) flags: RecvFlags,
94+
poll_first: bool,
9395
}
9496

9597
pub(crate) struct RecvFromHeader<S> {
9698
pub(crate) fd: S,
9799
pub(crate) flags: RecvFlags,
98100
pub(crate) addr: SockAddrStorage,
99101
pub(crate) addr_len: socklen_t,
102+
poll_first: bool,
100103
}
101104

102105
/// Receive data and source address.
@@ -118,6 +121,7 @@ pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
118121
pub(crate) buffer: T,
119122
pub(crate) control: C,
120123
pub(crate) control_len: usize,
124+
poll_first: bool,
121125
}
122126

123127
impl<S> Connect<S> {
@@ -254,8 +258,15 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
254258
buffer,
255259
control,
256260
control_len: 0,
261+
poll_first: false,
257262
}
258263
}
264+
265+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
266+
/// of the SQE on the IO_URING driver.
267+
pub fn poll_first(&mut self) {
268+
self.poll_first = true;
269+
}
259270
}
260271

261272
impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
@@ -273,7 +284,18 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
273284
impl<T: IoBufMut, S> Recv<T, S> {
274285
/// Create [`Recv`].
275286
pub fn new(fd: S, buffer: T, flags: RecvFlags) -> Self {
276-
Self { fd, buffer, flags }
287+
Self {
288+
fd,
289+
buffer,
290+
flags,
291+
poll_first: false,
292+
}
293+
}
294+
295+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
296+
/// of the SQE on the IO_URING driver.
297+
pub fn poll_first(&mut self) {
298+
self.poll_first = true;
277299
}
278300
}
279301

@@ -288,7 +310,18 @@ impl<T: IoBufMut, S> IntoInner for Recv<T, S> {
288310
impl<T: IoVectoredBufMut, S> RecvVectored<T, S> {
289311
/// Create [`RecvVectored`].
290312
pub fn new(fd: S, buffer: T, flags: RecvFlags) -> Self {
291-
Self { fd, buffer, flags }
313+
Self {
314+
fd,
315+
buffer,
316+
flags,
317+
poll_first: false,
318+
}
319+
}
320+
321+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
322+
/// of the SQE on the IO_URING driver.
323+
pub fn poll_first(&mut self) {
324+
self.poll_first = true;
292325
}
293326
}
294327

@@ -309,6 +342,7 @@ impl<S> RecvFromHeader<S> {
309342
addr,
310343
flags,
311344
addr_len: name_len,
345+
poll_first: false,
312346
}
313347
}
314348

@@ -325,6 +359,12 @@ impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
325359
buffer,
326360
}
327361
}
362+
363+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
364+
/// of the SQE on the IO_URING driver.
365+
pub fn poll_first(&mut self) {
366+
self.header.poll_first = true;
367+
}
328368
}
329369

330370
impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
@@ -344,6 +384,12 @@ impl<T: IoBufMut, S> RecvFrom<T, S> {
344384
buffer,
345385
}
346386
}
387+
388+
/// This method sets the `IORING_RECVSEND_POLL_FIRST` flag in the `ioprio`
389+
/// of the SQE on the IO_URING driver.
390+
pub fn poll_first(&mut self) {
391+
self.header.poll_first = true;
392+
}
347393
}
348394

349395
impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {

0 commit comments

Comments
 (0)