Skip to content

Commit 1fa8180

Browse files
committed
feat(io_uring): generic access to context and push for Ring and Completion
1 parent 5e62a01 commit 1fa8180

File tree

4 files changed

+93
-78
lines changed

fs/src/io_uring/dir_remover.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
agave_io_uring::{Completion, Ring, RingOp},
2+
agave_io_uring::{Completion, Ring, RingAccess as _, RingOp},
33
io_uring::{opcode, squeue, types, IoUring},
44
slab::Slab,
55
std::{
@@ -196,7 +196,7 @@ impl UnlinkOp {
196196
//
197197
// Safety: the entry doesn't hold any pointers
198198
if let Some(fd) = dir.fd.take() {
199-
comp.push(Op::Close(CloseOp::new(self.dir_key, fd.into_raw_fd())));
199+
comp.push(Op::Close(CloseOp::new(self.dir_key, fd.into_raw_fd())))?;
200200
}
201201
}
202202

fs/src/io_uring/file_creator.rs

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use {
99
},
1010
FileInfo, FileSize, IoSize,
1111
},
12-
agave_io_uring::{Completion, FixedSlab, Ring, RingOp},
12+
agave_io_uring::{Completion, FixedSlab, Ring, RingAccess, RingOp},
1313
core::slice,
1414
io_uring::{opcode, squeue, types, IoUring},
1515
libc::{O_CREAT, O_NOATIME, O_NOFOLLOW, O_RDWR, O_TRUNC},
@@ -276,22 +276,15 @@ impl IoUringFileCreator<'_> {
276276
if let Some(file) = &file_state.open_file {
277277
if write_len == 0 {
278278
// File size was aligned with previously used buffers, so return the unused `buf`
279-
state.buffers.push_front(buf);
280-
file_state.writes_completed += 1;
279+
// state.buffers.push_front(buf);
280+
// file_state.writes_completed += 1;
281281

282282
// In case no operation is in progress (i.e. completions were run for all buffers)
283283
// and EOF was reached just now, `file_complete` needs to be called, since
284284
// no other operation will run it in its completion handler.
285285
// This is not necessary if `write_len > 0`, since completion of the write to be
286286
// added will handle the EOF case properly.
287-
if let Some(file_info) = file_state.try_take_completed_file_info() {
288-
match (state.file_complete)(file_info) {
289-
Some(unconsumed_file) => self.ring.push(FileCreatorOp::Close(
290-
CloseOp::new(file_key, unconsumed_file),
291-
))?,
292-
None => state.mark_file_complete(file_key),
293-
}
294-
}
287+
FileCreatorState::mark_write_completed(&mut self.ring, file_key, 0, buf)?;
295288
// Skip issuing empty write
296289
break;
297290
}
@@ -373,11 +366,11 @@ impl<'a> FileCreatorState<'a> {
373366

374367
/// Accounts for a finished write and returns `buf` to `buffers`; when that completes the file, calls the `file_complete` callback and optionally schedules a close
375368
fn mark_write_completed(
376-
ring: &mut Completion<'_, Self, FileCreatorOp>,
369+
ring: &mut impl RingAccess<Context = Self, Operation = FileCreatorOp>,
377370
file_key: usize,
378371
write_len: IoSize,
379372
buf: IoBufferChunk,
380-
) {
373+
) -> io::Result<()> {
381374
let this = ring.context_mut();
382375
this.submitted_writes_size -= write_len as usize;
383376
this.buffers.push_front(buf);
@@ -389,10 +382,11 @@ impl<'a> FileCreatorState<'a> {
389382
Some(unconsumed_file) => ring.push(FileCreatorOp::Close(CloseOp::new(
390383
file_key,
391384
unconsumed_file,
392-
))),
385+
)))?,
393386
None => this.mark_file_complete(file_key),
394387
};
395388
}
389+
Ok(())
396390
}
397391

398392
fn mark_file_complete(&mut self, file_key: usize) {
@@ -462,7 +456,7 @@ impl OpenOp {
462456
let backlog = ring.context_mut().mark_file_opened(self.file_key, fd);
463457
for (buf, offset, len) in backlog {
464458
if len == 0 {
465-
FileCreatorState::mark_write_completed(ring, self.file_key, 0, buf);
459+
FileCreatorState::mark_write_completed(ring, self.file_key, 0, buf)?;
466460
break;
467461
}
468462
let op = WriteOp {
@@ -474,7 +468,7 @@ impl OpenOp {
474468
write_len: len,
475469
};
476470
ring.context_mut().submitted_writes_size += len as usize;
477-
ring.push(FileCreatorOp::Write(op));
471+
ring.push(FileCreatorOp::Write(op))?;
478472
}
479473

480474
Ok(())
@@ -580,20 +574,17 @@ impl<'a> WriteOp {
580574

581575
if written < *write_len {
582576
log::warn!("short write ({written}/{}), file={}", *write_len, *file_key);
583-
ring.push(FileCreatorOp::Write(WriteOp {
577+
return ring.push(FileCreatorOp::Write(WriteOp {
584578
file_key: *file_key,
585579
fd: *fd,
586580
offset: *offset + written as FileSize,
587581
buf,
588582
buf_offset: total_written,
589583
write_len: *write_len - written,
590584
}));
591-
return Ok(());
592585
}
593586

594-
FileCreatorState::mark_write_completed(ring, *file_key, total_written, buf);
595-
596-
Ok(())
587+
FileCreatorState::mark_write_completed(ring, *file_key, total_written, buf)
597588
}
598589
}
599590

fs/src/io_uring/sequential_file_reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use {
66
IO_PRIO_BE_HIGHEST,
77
},
88
crate::{buffered_reader::FileBufRead, io_uring::sqpoll, FileSize, IoSize},
9-
agave_io_uring::{Completion, Ring, RingOp},
9+
agave_io_uring::{Completion, Ring, RingAccess as _, RingOp},
1010
io_uring::{opcode, squeue, types, IoUring},
1111
std::{
1212
collections::VecDeque,
@@ -825,7 +825,7 @@ impl RingOp<BuffersState> for ReadOp {
825825
// Safety:
826826
// The op points to a buffer which is guaranteed to be valid for the
827827
// lifetime of the operation
828-
completion.push(op);
828+
completion.push(op)?;
829829
} else {
830830
buffers[*reader_buf_index as usize] = ReadBufState::Full {
831831
buf,

io-uring/src/ring.rs

Lines changed: 77 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,28 @@ use {
99
std::{io, os::fd::RawFd, time::Duration},
1010
};
1111

12+
/// Trait for accessing the context and pushing operations to the [Ring].
13+
///
14+
/// Enables generic operations on [Ring] or [Completion].
15+
pub trait RingAccess {
16+
type Context;
17+
type Operation;
18+
19+
/// Returns a reference to the context value stored in a [Ring].
20+
fn context(&self) -> &Self::Context;
21+
22+
/// Returns a mutable reference to the context value stored in a [Ring].
23+
fn context_mut(&mut self) -> &mut Self::Context;
24+
25+
/// Pushes an operation for execution in io_uring.
26+
///
27+
/// Once completed, [RingOp::complete] will be called with the result.
28+
///
29+
/// Note that the exact moment the operation is submitted to the kernel is implementation
30+
/// specific.
31+
fn push(&mut self, op: Self::Operation) -> io::Result<()>;
32+
}
33+
1234
/// An io_uring instance.
1335
pub struct Ring<T, E: RingOp<T>> {
1436
ring: IoUring,
@@ -30,16 +52,6 @@ impl<T, E: RingOp<T>> Ring<T, E> {
3052
}
3153
}
3254

33-
/// Returns a reference to the context value.
34-
pub fn context(&self) -> &T {
35-
&self.context
36-
}
37-
38-
/// Returns a mutable reference to the context value.
39-
pub fn context_mut(&mut self) -> &mut T {
40-
&mut self.context
41-
}
42-
4355
/// Registers in-memory fixed buffers for I/O with the kernel.
4456
///
4557
/// # Safety
@@ -64,40 +76,6 @@ impl<T, E: RingOp<T>> Ring<T, E> {
6476
self.ring.submitter().register_files(fds)
6577
}
6678

67-
/// Pushes an operation to the submission queue.
68-
///
69-
/// Once completed, [RingOp::complete] will be called with the result.
70-
///
71-
/// Note that the operation is not submitted to the kernel until [Ring::submit] is called. If
72-
/// the submission queue is full, submit will be called internally to make room for the new
73-
/// operation.
74-
///
75-
/// See also [Ring::submit].
76-
pub fn push(&mut self, op: E) -> io::Result<()> {
77-
loop {
78-
self.process_completions()?;
79-
80-
if !self.entries.is_full() {
81-
break;
82-
}
83-
// if the entries slab is full, we need to submit and poll
84-
// completions to make room
85-
self.submit_and_wait(1, None)?;
86-
}
87-
let key = self.entries.insert(op);
88-
let entry = self.entries.get_mut(key).unwrap().entry();
89-
let entry = entry.user_data(key as u64);
90-
// Safety: the entry is stored in self.entries and guaranteed to be valid for the lifetime
91-
// of the operation. E implementations must still ensure that the entry
92-
// remains valid until the last E::complete call.
93-
while unsafe { self.ring.submission().push(&entry) }.is_err() {
94-
self.submit()?;
95-
self.process_completions()?;
96-
}
97-
98-
Ok(())
99-
}
100-
10179
/// Submits all pending operations to the kernel.
10280
///
10381
/// If the ring can't accept any more submissions because the completion
@@ -226,23 +204,69 @@ pub struct Completion<'a, T, E: RingOp<T>> {
226204
context: &'a mut T,
227205
}
228206

229-
impl<T, E: RingOp<T>> Completion<'_, T, E> {
230-
/// Returns a reference to the context value stored in a [Ring].
231-
pub fn context(&self) -> &T {
232-
self.context
207+
impl<T, E: RingOp<T>> RingAccess for Ring<T, E> {
208+
type Context = T;
209+
type Operation = E;
210+
211+
fn context(&self) -> &T {
212+
&self.context
233213
}
234214

235-
/// Returns a mutable reference to the context value stored in a [Ring].
236-
pub fn context_mut(&mut self) -> &mut T {
237-
self.context
215+
fn context_mut(&mut self) -> &mut T {
216+
&mut self.context
238217
}
239218

240219
/// Pushes an operation to the submission queue.
241220
///
221+
/// Note that the operation is not submitted to the kernel until [Ring::submit] is called. If
222+
/// the submission queue is full, submit will be called internally to make room for the new
223+
/// operation.
224+
///
225+
/// See also [Ring::submit].
226+
fn push(&mut self, op: E) -> io::Result<()> {
227+
loop {
228+
self.process_completions()?;
229+
230+
if !self.entries.is_full() {
231+
break;
232+
}
233+
// if the entries slab is full, we need to submit and poll
234+
// completions to make room
235+
self.submit_and_wait(1, None)?;
236+
}
237+
let key = self.entries.insert(op);
238+
let entry = self.entries.get_mut(key).unwrap().entry();
239+
let entry = entry.user_data(key as u64);
240+
// Safety: the entry is stored in self.entries and guaranteed to be valid for the lifetime
241+
// of the operation. E implementations must still ensure that the entry
242+
// remains valid until the last E::complete call.
243+
while unsafe { self.ring.submission().push(&entry) }.is_err() {
244+
self.submit()?;
245+
self.process_completions()?;
246+
}
247+
248+
Ok(())
249+
}
250+
}
251+
252+
impl<T, E: RingOp<T>> RingAccess for Completion<'_, T, E> {
253+
type Context = T;
254+
type Operation = E;
255+
256+
fn context(&self) -> &T {
257+
self.context
258+
}
259+
260+
fn context_mut(&mut self) -> &mut T {
261+
self.context
262+
}
263+
242264
/// This can be used to push new operations from within [RingOp::complete].
243265
///
244-
/// See also [Ring::push].
245-
pub fn push(&mut self, op: E) {
266+
/// Note that the operations are buffered until completion is finished and then pushed
267+
/// to the parent [Ring].
268+
fn push(&mut self, op: E) -> io::Result<()> {
246269
self.new_entries.push(op);
270+
Ok(())
247271
}
248272
}

0 commit comments

Comments
 (0)