-
Notifications
You must be signed in to change notification settings - Fork 220
Expand file tree
/
Copy pathmod.rs
More file actions
3194 lines (2897 loc) · 130 KB
/
mod.rs
File metadata and controls
3194 lines (2897 loc) · 130 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//! io_uring event loop implementation.
//!
//! This module provides a high-level interface for submitting logical requests to Linux's io_uring
//! subsystem and receiving their results. The design centers around a single event loop that
//! manages the submission queue (SQ) and completion queue (CQ) of an io_uring instance.
//!
//! Work is submitted via [Handle], which pushes [Request]s into an MPSC queue and signals
//! an internal wake source. The event loop blocks either in userspace futex wait
//! (when the ring is truly idle) or in `io_uring_enter` (when the ring has active
//! waiters), and is woken by:
//! - normal CQE progress in the ring
//! - futex wake when new work is queued while fully idle
//! - `eventfd` readiness when new work is queued or all submitters are dropped while
//! blocked in `submit_and_wait`
//!
//! # Kernel Requirements
//!
//! - Baseline: Linux kernel 5.13 or newer (required for io_uring multishot poll
//! used by the internal `eventfd` wake path).
//! - With [`Config::single_issuer`] enabled: Linux kernel 6.1 or newer, because
//! this implementation also enables `IORING_SETUP_DEFER_TASKRUN`.
//! - Effective requirement for runtime io_uring network/storage backends: 6.1+,
//! since those backends enable [`Config::single_issuer`].
//!
//! # Architecture
//!
//! ## Event Loop
//!
//! The core of this implementation is [IoUringLoop::run], which blocks its calling thread while
//! operating an event loop that:
//! 1. Drains logical requests from a bounded MPSC channel fed by [Handle]
//! 2. Admits requests into the waiter table and submits their first SQE
//! 3. Processes io_uring completion queue entries (CQEs), including internal wake CQEs
//! 4. Handles partial progress and retryable errors by requeuing requests
//! 5. Routes typed completion results back to the original requesters via oneshot channels
//!
//! ## Request Flow
//!
//! ```text
//! Data path:
//! Client task -> Handle -> bounded MPSC -> IoUringLoop -> SQE -> io_uring
//! Client task <- typed oneshot <- IoUringLoop <- CQE <- io_uring
//!
//! Wake paths:
//! Handle --futex wake--> packed wake state --> IoUringLoop
//! Handle --write(eventfd)--> wake_fd --POLLIN CQE (WAKE_USER_DATA)--> IoUringLoop
//!
//! Loop behavior:
//! 1) Drain CQEs.
//! 2) Advance timeouts.
//! 3) Rarely rearm wake polling, then stage cancels, ready-queue requests,
//! and new inbound requests into SQ.
//! 4) If work is pending or active waiters remain, submit and possibly block in
//! io_uring_enter until a CQE (data or wake) arrives.
//! 5) If the ring is fully idle, arm the shared wake word and sleep in futex
//! wait until a producer publishes work or latches an out-of-band wake.
//! ```
//!
//! ## Work Tracking
//!
//! Each admitted request is assigned a waiter id that serves as the `user_data` field in its
//! SQEs. The event loop maintains a flat `Waiters` store where each slot maps to an
//! [Request] that owns all resources (buffers, FDs, progress state, completion sender)
//! needed for the request's lifetime.
//!
//! ## Timeout Handling
//!
//! Requests can optionally carry an absolute deadline. When present:
//! - The loop tracks deadline ticks in a userspace timing wheel
//! - Already-expired requests complete immediately with timeout before SQE submission
//! - Requests that still have an SQE in flight submit an async-cancel SQE on expiry
//! - Requests parked only in the ready queue time out locally without staging cancel SQEs
//! - Timeouts apply to the whole logical request, not individual SQEs
//! - If the original op CQE completes the whole request, the caller sees success
//! - If the original op CQE only makes partial/retryable progress after timeout, the caller
//! sees timeout and no follow-up SQE is issued
//!
//! ## Submission Policy
//!
//! A logical request may need multiple SQEs before it completes. The loop keeps
//! such requests on a FIFO ready queue and stages work in this order:
//! 1. Rarely, a wake poll rearm SQE when a prior multishot wake CQE ended the
//! existing poll registration.
//! 2. Cancellation SQEs for timed-out requests.
//! 3. Ready-queue requests that were already admitted and need another SQE.
//! 4. Fresh requests drained from the channel, until waiter or SQ capacity is hit.
//!
//! After the channel has closed and buffered channel work has been drained,
//! there is no new channel work, so the drain phase continues servicing
//! cancellations and the ready queue until requests complete, time out, or are
//! abandoned by `shutdown_timeout`.
//!
//! ## Wake Handling
//!
//! The wake path uses one shared atomic state word plus an internal `eventfd`.
//! - [Handle::enqueue] increments an atomic submission sequence
//! - When the loop has no waiters, it sleeps in futex wait on that shared word
//! - When the loop blocks in `submit_and_wait`, it keeps a multishot `PollAdd`
//! on the internal `eventfd`
//! - Wake CQEs drain `eventfd` readiness and re-install poll when `IORING_CQE_F_MORE`
//! is not set
//! - The loop uses an arm-and-recheck sleep handshake (`submitted_seq` vs `processed_seq`)
//! - The rounded ring/channel size stays below half the packed submission-sequence
//! domain so modular sequence deltas remain directional
//! - A dedicated signalled bit coalesces repeated wake attempts while a wait is armed
//!
//! ## Shutdown Process
//!
//! When the request channel is closed and no buffered requests remain, the event
//! loop enters a drain phase:
//! 1. Stops accepting new requests
//! 2. Waits for all in-flight requests to complete or be cancelled
//! 3. If `shutdown_timeout` is configured, abandons remaining requests after the timeout
//! 4. Cleans up and exits. Dropping the last submitter latches one wake and, if a
//! target is currently armed, signals it immediately so shutdown is observed
//! promptly whether the loop is already blocked or about to sleep.
//!
//! If waiter capacity is full when the last submitter disconnects, buffered
//! channel work is still drained as capacity becomes available before drain
//! begins. `shutdown_timeout` bounds only the drain phase after that buffered
//! work has been drained.
//!
//! ## Liveness Model
//!
//! This loop enforces a configured upper bound on in-flight requests. New submissions arrive
//! through a FIFO MPSC queue, but already-admitted requests may be restaged ahead of that queue
//! according to the submission policy above.
//!
//! This implies a bounded-liveness caveat: if all in-flight requests are waiting on operations
//! that are still queued behind the capacity limit, the loop cannot make progress until some
//! in-flight request completes or is canceled.
//!
//! Concrete example with `cfg.size = 2`:
//!
//! 1. Queue `read(fd1)`, `read(fd2)`, `write(fd1)`, `write(fd2)` in that order.
//! 2. The loop stages the first two reads and reaches waiter capacity.
//! 3. If each read depends on its corresponding write being submitted through the same loop, both
//! reads remain blocked.
//! 4. The writes stay queued behind the capacity limit, so no completion is produced and the loop
//! cannot free capacity on its own.
//!
//! The runtime cannot infer dependency relationships between arbitrary queued and in-flight
//! requests, so it cannot implement dependency-aware admission (and doing so generically would
//! add substantial overhead).
//!
//! The practical way to recover from this condition is cancellation via per-request timeouts.
//! When timed-out in-flight requests are canceled, waiter capacity is eventually released and
//! queued requests can be staged. Without cancellation, liveness depends on workload structure:
//! callers must avoid submission patterns where in-flight requests require later queued requests
//! to run.
//!
//! Operational guidance:
//! - Workloads that may create causal dependencies across queued and in-flight requests must use
//! per-request timeouts.
//! - If cancellation is disabled, callers must guarantee that in-flight requests never depend on
//! later queued requests, otherwise the loop can deadlock.
use crate::{
telemetry::metrics::{raw, Gauge, Register},
Error, IoBufMut, IoBufs,
};
use commonware_utils::channel::{
mpsc::{self, error::TryRecvError},
oneshot,
};
use io_uring::{
cqueue::Entry as CqueueEntry,
opcode::AsyncCancel,
squeue::SubmissionQueue,
types::{SubmitArgs, Timespec},
IoUring,
};
use request::{ReadAtRequest, RecvRequest, Request, SendRequest, SyncRequest, WriteAtRequest};
use std::{
collections::VecDeque,
fs::File,
os::fd::OwnedFd,
sync::Arc,
time::{Duration, Instant},
};
mod request;
mod timeout;
use timeout::{Tick, TimeoutWheel};
mod waiter;
use waiter::{CompletionOutcome, StageOutcome, WaiterId, Waiters};
mod waker;
use waker::{Waker, HALF_SUBMISSION_SEQUENCE_DOMAIN, SUBMISSION_SEQ_MASK, WAKE_USER_DATA};
mod spinner;
pub use spinner::Config as SpinnerConfig;
use spinner::Spinner;
/// Maximum rounded ring size accepted by [`Config::size`].
///
/// Requested sizes are rounded up to the next power of two before validation;
/// `IoUringLoop::new` panics if the rounded value exceeds this limit.
///
/// The limit is half of `HALF_SUBMISSION_SEQUENCE_DOMAIN`, which keeps the
/// rounded ring/channel size below half of the packed submission-sequence
/// domain so that modular sequence deltas remain directional.
pub const MAX_RING_SIZE: u32 = HALF_SUBMISSION_SEQUENCE_DOMAIN / 2;
/// Packed `io_uring` `user_data` value.
///
/// NOTE(review): the packing layout is not defined in this part of the file;
/// see the waiter/waker modules for how ids are encoded.
type UserData = u64;
/// Tracks io_uring metrics.
///
/// Constructed once per loop via [`Metrics::new`] and shared behind an `Arc`
/// by the event loop.
#[derive(Debug)]
pub struct Metrics {
/// Number of active logical requests whose CQEs haven't yet been fully
/// processed. Note this metric doesn't include timeouts, which are
/// generated internally by the io_uring event loop.
/// This is updated in the main loop and at shutdown drain exit, so it may
/// temporarily vary from the exact in-flight count between update points.
///
/// The main loop sets this gauge to the current waiter-table length after
/// each staging pass.
pending_operations: Gauge,
}
impl Metrics {
    /// Build the loop's metrics and register each of them with `registry`.
    ///
    /// Currently this registers a single `pending_operations` gauge that
    /// starts from `raw::Gauge::default()`.
    pub fn new(registry: &mut impl Register) -> Self {
        let pending_operations = registry.register(
            "pending_operations",
            "Number of active logical requests in the io_uring loop",
            raw::Gauge::default(),
        );
        Self { pending_operations }
    }
}
/// Configuration for an io_uring instance.
/// See `man io_uring`.
#[derive(Clone, Debug)]
pub struct Config {
/// Requested size of the ring.
///
/// This value is rounded up to the next power of two when constructing
/// [IoUringLoop], so the configured in-flight waiter capacity matches the
/// effective ring sizing behavior. After rounding, the maximum allowed size
/// is [`MAX_RING_SIZE`]; larger rounded sizes panic during construction.
///
/// The rounded value is also used as the capacity of the bounded request
/// channel and of the waiter table.
pub size: u32,
/// If true, use IOPOLL mode.
pub io_poll: bool,
/// If true, use single issuer mode.
/// Warning: when enabled, the same thread that creates the ring must be
/// the only thread that submits work to it.
///
/// This loop creates the ring inside [IoUringLoop::run] and performs all
/// ring submissions from that same thread, so it is compatible with
/// `single_issuer` when `run` is executed on a dedicated thread.
/// See IORING_SETUP_SINGLE_ISSUER in <https://man7.org/linux/man-pages/man2/io_uring_setup.2.html>.
///
/// Per the module-level kernel requirements, enabling this also enables
/// `IORING_SETUP_DEFER_TASKRUN` and therefore requires Linux 6.1 or newer.
pub single_issuer: bool,
/// Maximum request timeout supported by the userspace timeout wheel.
///
/// Deadlines are clamped to this horizon. This value should be set to the
/// largest expected per-request deadline budget.
///
/// Must be non-zero; construction asserts this.
pub max_request_timeout: Duration,
/// The maximum time the io_uring event loop will wait during the drain phase
/// after producer disconnect has been fully observed and buffered channel
/// work has been drained.
///
/// If None, the event loop will wait indefinitely for in-flight requests
/// to complete during that drain phase. In this case, the caller should be
/// careful to ensure that submitted requests will eventually complete.
pub shutdown_timeout: Option<Duration>,
/// Tick granularity used by the userspace timeout wheel.
///
/// Smaller values increase timing precision but increase wakeup and wheel
/// processing frequency.
///
/// Must be non-zero; construction asserts this.
pub timeout_wheel_tick: Duration,
/// Adaptive idle spinner configuration.
pub idle_spinner: SpinnerConfig,
}
impl Default for Config {
    /// Default configuration: a 128-entry ring, no IOPOLL, no single-issuer
    /// mode, a 60 second timeout horizon tracked at 5 ms granularity, no
    /// shutdown timeout, and the default idle spinner settings.
    fn default() -> Self {
        // Timeout wheel parameters: horizon and tick granularity.
        let max_request_timeout = Duration::from_secs(60);
        let timeout_wheel_tick = Duration::from_millis(5);

        Self {
            size: 128,
            io_poll: false,
            single_issuer: false,
            max_request_timeout,
            shutdown_timeout: None,
            timeout_wheel_tick,
            idle_spinner: SpinnerConfig::default(),
        }
    }
}
/// Shared state behind every clone of a [Handle].
///
/// Its `Drop` implementation disconnects the request channel and then wakes
/// the loop, so it runs once when the last [Handle] clone goes away.
struct HandleInner {
/// Sending half of the bounded request channel. Wrapped in `Option` so
/// `Drop` can take and drop it before issuing the shutdown wake.
sender: Option<mpsc::Sender<Request>>,
/// Wake source shared with the event loop; used to publish submissions and
/// to signal the out-of-band shutdown wake.
waker: Waker,
}
impl Drop for HandleInner {
    /// Close the request channel, then wake the loop so it notices.
    fn drop(&mut self) {
        // Ordering matters: the channel must be disconnected *before* the
        // wake, because `Waker::wake` is the publishing edge for this
        // out-of-band shutdown signal.
        let sender = self.sender.take();
        drop(sender);

        // Nudge the loop so shutdown is observed promptly. Channel closure is
        // not a submission, so no synthetic submission-sequence increment is
        // published here.
        self.waker.wake();
    }
}
/// Handle for submitting requests to an [IoUringLoop].
///
/// Cloning is cheap: all clones share one reference-counted inner state.
/// When the last clone is dropped, the inner state's `Drop` disconnects the
/// request channel and wakes the loop so it can begin shutting down.
#[derive(Clone)]
pub struct Handle {
/// Shared sender and waker state.
inner: Arc<HandleInner>,
}
impl Handle {
/// Push `request` onto the loop's bounded channel and announce it.
///
/// Waits for channel capacity if necessary. Once the request is queued, one
/// submission is published through the waker, which wakes the loop if a
/// futex or eventfd wait target is currently armed.
///
/// Returns the send error (carrying the rejected request) if the channel
/// refuses the request; nothing is published in that case.
async fn enqueue(&self, request: Request) -> Result<(), mpsc::error::SendError<Request>> {
    let inner = &self.inner;
    let sender = inner
        .sender
        .as_ref()
        .expect("handle sender is only taken on drop");
    sender.send(request).await?;

    // The request is in the channel: publish it and wake any armed waiter.
    inner.waker.publish();
    Ok(())
}
/// Submit a logical send request and wait for its completion.
#[cfg_attr(not(feature = "iouring-network"), allow(dead_code))]
pub async fn send(
&self,
fd: Arc<OwnedFd>,
bufs: IoBufs,
deadline: Instant,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.enqueue(Request::Send(SendRequest {
fd,
write: bufs.into(),
deadline: Some(deadline),
result: None,
sender: tx,
}))
.await
.map_err(|_| Error::SendFailed)?;
rx.await.map_err(|_| Error::SendFailed)?
}
/// Submit a logical recv request and wait for its completion.
#[allow(clippy::result_large_err)]
#[cfg_attr(not(feature = "iouring-network"), allow(dead_code))]
pub async fn recv(
&self,
fd: Arc<OwnedFd>,
buf: IoBufMut,
offset: usize,
len: usize,
exact: bool,
deadline: Instant,
) -> Result<(IoBufMut, usize), (IoBufMut, Error)> {
assert!(
offset <= len && len <= buf.capacity(),
"recv invariant violated: need offset <= len <= capacity"
);
let (tx, rx) = oneshot::channel();
let request = Request::Recv(RecvRequest {
fd,
buf,
offset,
len,
exact,
deadline: Some(deadline),
result: None,
sender: tx,
});
if let Err(err) = self.enqueue(request).await {
let Request::Recv(request) = err.0 else {
unreachable!("recv enqueue returned wrong request variant");
};
return Err((request.buf, Error::RecvFailed));
}
rx.await.unwrap_or_else(|_| {
// Once the request is admitted, ownership of `buf` moves into the
// loop. If the loop dies before replying, there is no owned buffer
// left to recover here, so return an empty placeholder.
Err((IoBufMut::default(), Error::RecvFailed))
})
}
/// Submit a logical positioned read request and wait for its completion.
#[allow(clippy::result_large_err)]
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn read_at(
&self,
file: Arc<File>,
offset: u64,
len: usize,
buf: IoBufMut,
) -> Result<IoBufMut, (IoBufMut, Error)> {
assert!(len <= buf.capacity(), "read_at len exceeds buffer capacity");
let (tx, rx) = oneshot::channel();
let request = Request::ReadAt(ReadAtRequest {
file,
offset,
len,
read: 0,
buf,
result: None,
sender: tx,
});
if let Err(err) = self.enqueue(request).await {
let Request::ReadAt(request) = err.0 else {
unreachable!("read_at enqueue returned wrong request variant");
};
return Err((request.buf, Error::ReadFailed));
}
rx.await.unwrap_or_else(|_| {
// Once the request is admitted, ownership of `buf` moves into the
// loop. If the loop dies before replying, there is no owned buffer
// left to recover here, so return an empty placeholder.
Err((IoBufMut::default(), Error::ReadFailed))
})
}
/// Submit a logical positioned write request and wait for its completion.
///
/// The request is issued with its `sync` flag cleared; use
/// `write_at_sync` for a write with per-write sync.
///
/// # Errors
///
/// Returns [`Error::WriteFailed`] if the request cannot be enqueued or if the
/// loop drops the reply channel without answering; otherwise returns whatever
/// result the loop reports for the request.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn write_at(&self, file: Arc<File>, offset: u64, bufs: IoBufs) -> Result<(), Error> {
    self.submit_write(file, offset, bufs, false).await
}

/// Submit a logical positioned write with per-write sync and wait for its completion.
///
/// Identical to `write_at` except that the request is issued with its
/// `sync` flag set.
///
/// # Errors
///
/// Returns [`Error::WriteFailed`] if the request cannot be enqueued or if the
/// loop drops the reply channel without answering; otherwise returns whatever
/// result the loop reports for the request.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn write_at_sync(
    &self,
    file: Arc<File>,
    offset: u64,
    bufs: IoBufs,
) -> Result<(), Error> {
    self.submit_write(file, offset, bufs, true).await
}

/// Shared implementation of `write_at` and `write_at_sync`.
///
/// Builds a `WriteAtRequest` starting at `offset` with zero bytes written so
/// far and the given `sync` flag, enqueues it, and awaits the loop's reply.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
async fn submit_write(
    &self,
    file: Arc<File>,
    offset: u64,
    bufs: IoBufs,
    sync: bool,
) -> Result<(), Error> {
    let (tx, rx) = oneshot::channel();
    self.enqueue(Request::WriteAt(WriteAtRequest {
        file,
        offset,
        written: 0,
        write: bufs.into(),
        sync,
        result: None,
        sender: tx,
    }))
    .await
    .map_err(|_| Error::WriteFailed)?;
    // The outer error means the loop dropped the reply channel; the inner
    // result is the loop's verdict for the write itself.
    rx.await.map_err(|_| Error::WriteFailed)?
}
/// Queue a logical fsync of `file` and await its outcome.
///
/// # Errors
///
/// Returns an `std::io::Error` of kind `Other` if the request cannot be
/// enqueued or if the loop drops the reply channel without answering;
/// otherwise returns whatever result the loop reports for the request.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn sync(&self, file: Arc<File>) -> std::io::Result<()> {
    let (tx, rx) = oneshot::channel();
    let request = Request::Sync(SyncRequest {
        file,
        result: None,
        sender: tx,
    });
    if self.enqueue(request).await.is_err() {
        return Err(std::io::Error::other("failed to send work"));
    }
    match rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(std::io::Error::other("failed to read result")),
    }
}
}
/// io_uring event loop state.
///
/// Built by `IoUringLoop::new` together with the [Handle] that feeds it, and
/// consumed by `IoUringLoop::run`, which creates the ring and drives the loop.
pub(crate) struct IoUringLoop {
/// Loop configuration; `size` has already been rounded up to a power of two.
cfg: Config,
/// Metrics updated by the loop (pending operation gauge).
metrics: Arc<Metrics>,
/// Receiving half of the bounded request channel fed by [Handle].
receiver: mpsc::Receiver<Request>,
/// Table of admitted requests, sized to the rounded ring size.
waiters: Waiters,
/// FIFO of admitted waiters that need another SQE staged.
ready_queue: VecDeque<WaiterId>,
/// Waiters awaiting a cancel SQE.
/// NOTE(review): presumably consumed by `stage_cancellations`, which is
/// outside this view — confirm.
pending_cancels: VecDeque<WaiterId>,
/// Userspace timing wheel tracking request deadlines.
timeout_wheel: TimeoutWheel,
/// Adaptive spinner consulted before parking when the ring is idle.
idle_spinner: Spinner,
/// Wake source shared with every [Handle].
waker: Waker,
/// When true, the next staging pass must reinstall the wake poll before
/// anything else is staged. Starts out true.
wake_rearm_needed: bool,
/// Count of requests drained from the channel so far; compared against the
/// published submission sequence held by the waker.
processed_seq: u32,
}
/// Outcome of one `fill_submission_queue()` staging pass.
///
/// This tells the outer loop whether staging drained all currently visible
/// work, hit submission-queue pressure, hit waiter-capacity pressure, or
/// discovered producer disconnect.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum FillResult {
/// The producer side disconnected while draining the request channel.
///
/// Also reported when the waiter table is full and the request channel is
/// both closed and empty, so a saturated loop still enters shutdown drain.
Disconnected,
/// Staging drained all currently visible work without hitting a hard limit.
Drained,
/// The submission queue filled before waiter capacity was exhausted.
AtSubmissionQueueCapacity,
/// The waiter table filled, regardless of whether the submission queue also filled.
AtWaiterCapacity,
}
impl FillResult {
    /// Classify why a staging pass stopped, based on the current fill state.
    ///
    /// Waiter pressure takes precedence over submission-queue pressure: with
    /// a full waiter table no further work can be admitted until completions
    /// arrive, however much SQ space is left. The waiter-full case is also
    /// the only one normalized for shutdown: if the request channel is closed
    /// and empty, the result is `Disconnected` rather than
    /// `AtWaiterCapacity`, so the loop enters shutdown drain instead of
    /// waiting on waiter pressure alone.
    #[inline]
    fn from_fill_state(
        waiters: &Waiters,
        submission_queue: &SubmissionQueue<'_>,
        receiver: &mpsc::Receiver<Request>,
    ) -> Self {
        // Without waiter pressure, only SQ saturation distinguishes outcomes.
        if !waiters.is_full() {
            return if submission_queue.is_full() {
                Self::AtSubmissionQueueCapacity
            } else {
                Self::Drained
            };
        }

        // Waiter table is full. If there are no producers left and nothing is
        // buffered, no new work can ever arrive: report disconnect so the
        // loop drains instead of sleeping for completions forever.
        let producers_gone = receiver.is_closed() && receiver.is_empty();
        if producers_gone {
            Self::Disconnected
        } else {
            Self::AtWaiterCapacity
        }
    }
}
impl IoUringLoop {
/// Build an io_uring loop together with the [Handle] used to feed it.
///
/// `cfg.size` is rounded up to the next power of two; the rounded value
/// sizes the bounded request channel, the waiter table, and the ready and
/// cancel queues. The loop also allocates its own metrics and its internal
/// `eventfd` wake source. The ring itself is not created here.
///
/// # Panics
///
/// Panics if `max_request_timeout` or `timeout_wheel_tick` is zero, if the
/// requested size cannot be rounded to a power of two within `u32`, if the
/// rounded size exceeds [`MAX_RING_SIZE`], or if the wake source cannot be
/// created.
pub(crate) fn new(mut cfg: Config, registry: &mut impl Register) -> (Handle, Self) {
    assert!(
        !cfg.max_request_timeout.is_zero(),
        "max_request_timeout must be non-zero for timeout wheel"
    );
    assert!(
        !cfg.timeout_wheel_tick.is_zero(),
        "timeout_wheel_tick must be non-zero for timeout wheel"
    );

    let rounded = cfg
        .size
        .checked_next_power_of_two()
        .expect("ring size exceeds u32::MAX");
    cfg.size = rounded;

    // Packed submission-sequence deltas are interpreted by `pending()` with
    // half-range modular ordering, which caps the rounded ring size at
    // `MAX_RING_SIZE`.
    assert!(
        rounded <= MAX_RING_SIZE,
        "rounded ring size must be at most {}",
        MAX_RING_SIZE
    );
    let capacity = rounded as usize;

    let metrics = Arc::new(Metrics::new(registry));
    let (sender, receiver) = mpsc::channel(capacity);
    let waker = Waker::new().expect("unable to create wake eventfd");
    let timeout_wheel = TimeoutWheel::new(
        cfg.max_request_timeout,
        cfg.timeout_wheel_tick,
        Instant::now(),
    );
    let idle_spinner = Spinner::new(&cfg.idle_spinner, || waker.pending(0));
    let waiters = Waiters::new(capacity);

    // The handle shares the loop's waker so producers can publish and wake.
    let handle = Handle {
        inner: Arc::new(HandleInner {
            sender: Some(sender),
            waker: waker.clone(),
        }),
    };
    let event_loop = Self {
        cfg,
        metrics,
        receiver,
        waiters,
        ready_queue: VecDeque::with_capacity(capacity),
        pending_cancels: VecDeque::with_capacity(capacity),
        timeout_wheel,
        idle_spinner,
        waker,
        wake_rearm_needed: true,
        processed_seq: 0,
    };
    (handle, event_loop)
}
/// Runs the io_uring event loop until all submitters are dropped and in-flight work drains.
///
/// This method blocks the current thread. The ring is created here, on the
/// calling thread, from the stored configuration.
///
/// Each iteration drains available CQEs, advances timeouts, stages pending
/// work into the submission queue, updates the pending-operations gauge, and
/// then decides whether to drain and exit, flush the SQ, block in the kernel,
/// or spin/park while fully idle.
///
/// # Panics
///
/// Panics if the ring cannot be created or if submitting to the ring fails.
pub(crate) fn run(mut self) {
let mut ring = new_ring(&self.cfg).expect("unable to create io_uring instance");
loop {
// Process available completions.
for cqe in ring.completion() {
self.handle_cqe(cqe);
}
// Process due deadlines before staging new submissions so timed-out
// requests move to cancellation promptly and free capacity sooner.
self.advance_timeouts();
// Stage as much inbound work as capacity allows.
let fill_result = self.fill_submission_queue(&mut ring);
// Update pending operations metric.
self.metrics.pending_operations.set(self.waiters.len() as _);
match fill_result {
FillResult::Disconnected => {
// Producer side disconnected. Drain in-flight requests and exit.
self.drain(&mut ring);
return;
}
FillResult::AtWaiterCapacity => {
// Waiter pressure means completions are required before the
// loop can admit more work, so the default behavior is to
// submit pending SQEs and block for progress with the
// eventfd-backed wake path armed.
//
// The only exception is an already-latched out-of-band
// wake, such as final-handle disconnect. In that case the
// loop must recheck shutdown instead of sleeping, but a
// published-ahead submission sequence alone is not a reason
// to skip blocking here because waiter pressure still
// prevents admitting more work.
let arm = self.waker.arm(self.processed_seq);
if !arm.wake_latched() {
self.submit_and_wait(&mut ring, 1, self.timeout_wheel.next_deadline())
.expect("unable to submit to ring");
}
continue;
}
FillResult::AtSubmissionQueueCapacity => {
// SQ pressure alone only means the staged batch must be
// flushed into the kernel. Do that without waiting and then
// re-enter the loop to drain completions or stage more work.
self.submit(&mut ring).expect("unable to submit to ring");
continue;
}
FillResult::Drained => {
// The staging pass drained all currently visible work from
// the request channel. Fall through to the normal
// idle-vs-blocking decision below.
}
}
// If the ring is truly idle, avoid `io_uring_enter` entirely and
// wait on the shared wake state via futex until a producer changes
// it. This bypasses the eventfd wake path when there are no active
// waiters. Before parking, spin briefly to avoid the futex
// round-trip when work is imminent.
if self.waiters.is_empty() {
// A successful spin means work showed up while spinning, so go
// straight back to the top of the loop without parking.
if self
.idle_spinner
.spin(|| self.waker.pending(self.processed_seq))
{
continue;
}
// Park until woken; when a park duration is reported, feed it back
// to the spinner.
if let Some(park_duration) = self.waker.park_idle(self.processed_seq) {
self.idle_spinner.on_wake(park_duration);
}
continue;
}
// Otherwise, active waiters remain in the ring and no forced
// submit is needed. Arm the eventfd-backed blocking path and block
// only if the post-arm snapshot still looks idle.
let arm = self.waker.arm(self.processed_seq);
if arm.still_idle() {
self.submit_and_wait(&mut ring, 1, self.timeout_wheel.next_deadline())
.expect("unable to submit to ring");
}
}
}
/// Place `request` in the waiter table and register its deadline, if any.
///
/// A request whose deadline cannot be mapped to a wheel tick is treated as
/// already expired: it is completed on the spot with a timeout and `None` is
/// returned. Otherwise the new waiter id is returned, and requests with a
/// deadline are also scheduled on the timeout wheel.
fn admit_request(&mut self, request: Request) -> Option<WaiterId> {
    let deadline = request.deadline();
    let target_tick = match deadline {
        None => None,
        Some(deadline) => {
            let Some(tick) = self.timeout_wheel.target_tick(deadline) else {
                // Already expired: fail fast without taking a waiter slot.
                request.timeout();
                return None;
            };
            Some(tick)
        }
    };

    let waiter_id = self.waiters.insert(request, target_tick);
    if let Some(tick) = target_tick {
        self.timeout_wheel.schedule(waiter_id, tick);
    }
    Some(waiter_id)
}
/// Stage the next SQE for an admitted waiter, or retire it locally.
///
/// Depending on what the waiter table reports for `waiter_id`:
/// - the request timed out while parked in the ready queue: it is completed
///   with a timeout error and no follow-up SQE is issued;
/// - the caller dropped its wait handle before staging: the request is
///   retired without another SQE and only its deadline tracking is released;
/// - otherwise the SQE is pushed onto `submission_queue`.
///
/// The caller must have verified that the submission queue has room.
fn stage_request(&mut self, waiter_id: WaiterId, submission_queue: &mut SubmissionQueue<'_>) {
    let sqe = match self.waiters.stage(waiter_id) {
        StageOutcome::Submit(sqe) => sqe,
        StageOutcome::Timeout(request) => {
            request.timeout();
            return;
        }
        StageOutcome::Orphaned { target_tick } => {
            // `Waiters` already dropped the waiter and its resources; the
            // only thing left to clean up is the deadline entry, if any.
            if let Some(tick) = target_tick {
                self.timeout_wheel.remove(tick);
            }
            return;
        }
    };

    // SAFETY:
    // - All resources are stored in `self.waiters` until CQE processing, so
    //   SQE pointers remain valid and FD numbers cannot be reused early.
    // - SQ capacity was checked by caller.
    let pushed = unsafe { submission_queue.push(&sqe) };
    pushed.expect("unable to push to queue");
}
/// Drain `ready_queue` front-to-back into the SQ.
///
/// Staging stops as soon as the queue is empty or the SQ has no room left.
/// The return value is `true` only when the SQ filled up while at least one
/// ready request is still waiting; an emptied queue always yields `false`.
fn stage_ready_requests(&mut self, submission_queue: &mut SubmissionQueue<'_>) -> bool {
    loop {
        if submission_queue.is_full() {
            // Out of SQ space: report whether anything was left behind.
            return !self.ready_queue.is_empty();
        }
        match self.ready_queue.pop_front() {
            Some(waiter_id) => self.stage_request(waiter_id, submission_queue),
            None => return false,
        }
    }
}
/// Stage pending submission work into the SQ.
///
/// In one pass, this may rearm wake polling, stage cancellations, restage
/// ready-queue requests, and admit new requests.
///
/// Advances `processed_seq` by exactly the number of drained submissions.
///
/// Returns why staging stopped.
fn fill_submission_queue(&mut self, ring: &mut IoUring) -> FillResult {
let mut submission_queue = ring.submission();
let mut wheel_aligned = self.timeout_wheel.next_deadline().is_some();
// Reinstall wake poll only when a prior wake CQE indicated multishot
// termination. Otherwise keep the existing poll registration.
//
// This check runs before every possible transition into the eventfd-backed
// blocking path. The fully idle futex path does not need the poll to be
// live, so an iteration that parks in futex may skip kernel entry
// entirely. If multishot termination was observed earlier, the next
// iteration that might block in `submit_and_wait` stages the rearm SQE
// here before entering the kernel again.
if self.wake_rearm_needed {
// If the SQ is still full of SQEs staged in a previous iteration, flush those first.
if !self.waker.reinstall(&mut submission_queue) {
// Even if waiter capacity is also exhausted, we must not take
// the blocking path yet: the wake poll is not rearmed, so
// `submit_and_wait` would sleep without the eventfd wake path
// being live. Flush staged SQEs first, then retry rearm in the
// next pass.
return FillResult::AtSubmissionQueueCapacity;
}
self.wake_rearm_needed = false;
}
// Stage pending cancel SQEs first so timed-out requests are canceled promptly.
if self.stage_cancellations(&mut submission_queue) {
return FillResult::from_fill_state(&self.waiters, &submission_queue, &self.receiver);
}
// Requeued work already owns waiter capacity, so restage it before
// admitting fresh channel requests.
if self.stage_ready_requests(&mut submission_queue) {
return FillResult::from_fill_state(&self.waiters, &submission_queue, &self.receiver);
}
// Stage operations until the channel is empty, waiter capacity is hit,
// or the SQ is full. Waiter capacity is bounded by `cfg.size`.
while !self.waiters.is_full() && !submission_queue.is_full() {
// Try to drain one operation from the channel. If the first
// `try_recv()` reports `Empty`, do one acquire-guided recheck
// before concluding there is nothing more to drain in this pass.
let request = match self.receiver.try_recv() {
Ok(request) => request,
Err(TryRecvError::Disconnected) => return FillResult::Disconnected,
Err(TryRecvError::Empty) => {
// Catch the race where a producer submitted after the
// empty observation but before the loop decided to stop
// draining in this pass.
if !self.waker.pending(self.processed_seq) {
break;
}
// `pending()`'s acquire load observed a published-ahead
// sequence delta after the empty observation. Producers
// execute `send().await` before `publish()`, and that
// release/acquire edge makes the corresponding enqueue
// visible here before the second `try_recv()`.
//
// The rounded ring size stays below half the packed
// sequence domain, so the observed delta is directional:
// producers are genuinely ahead. Tokio's bounded MPSC also
// guarantees `try_recv()` returns `Disconnected` only when
// the channel is closed AND empty (we keep canaries for
// both assumptions below).
self.receiver.try_recv().expect(
"published-ahead sequence observed after acquire, but channel had no request",
)
}
};
// `processed_seq` counts items drained from the channel. Once a
// request is removed from the queue by either receive path above,
// it is considered processed for the wake protocol.
self.processed_seq = self.processed_seq.wrapping_add(1) & SUBMISSION_SEQ_MASK;
// Avoid per-loop clock reads when no deadlines are active. When the
// first deadline arrives after an idle period, align wheel time once
// before converting deadlines to ticks.
if !wheel_aligned && request.has_deadline() {
assert!(self.timeout_wheel.advance(Instant::now()).is_none());
wheel_aligned = true;
}
if let Some(waiter_id) = self.admit_request(request) {
self.stage_request(waiter_id, &mut submission_queue);
}
}
FillResult::from_fill_state(&self.waiters, &submission_queue, &self.receiver)
}
/// Turn queued entries of `pending_cancels` into AsyncCancel SQEs, oldest first.
///
/// Entries whose waiter is no longer in flight are dropped without consuming
/// an SQ slot. Staging ends when the queue is drained or the SQ has no room.
///
/// Returns `true` only when the SQ filled up while at least one cancellation
/// was still queued; returns `false` once the queue is drained.
fn stage_cancellations(&mut self, submission_queue: &mut SubmissionQueue<'_>) -> bool {
    loop {
        if submission_queue.is_full() {
            // No room left: tell the caller whether cancels are still waiting.
            return !self.pending_cancels.is_empty();
        }
        let Some(expired_id) = self.pending_cancels.pop_front() else {
            return false;
        };
        // A queued cancel can go stale: the waiter timed out earlier, but the
        // original op's CQE may have retired the outstanding SQE before this
        // cancel was staged. Only waiters still in flight have something for
        // the kernel to cancel.
        if self.waiters.is_in_flight(expired_id) {
            let builder = AsyncCancel::new(expired_id.user_data());
            let cancel_sqe = builder.build().user_data(expired_id.cancel_user_data());
            // SAFETY: the AsyncCancel SQE references only stable user_data values.
            let pushed = unsafe { submission_queue.push(&cancel_sqe) };
            pushed.expect("unable to push cancel to queue");
        }
    }
}
/// Process one completion queue entry.
///
/// CQEs tagged with `WAKE_USER_DATA` belong to the internal wake poll and are
/// consumed here. Every other CQE is handed to the waiter state machine, whose
/// outcome decides whether the request is finished, needs another SQE, or
/// requires no further action.
fn handle_cqe(&mut self, cqe: CqueueEntry) {
    let tag = cqe.user_data();
    let result = cqe.result();

    if tag != WAKE_USER_DATA {
        match self.waiters.on_completion(tag, result) {
            CompletionOutcome::Complete {
                request,
                target_tick,
            } => {
                // Remove the request's timeout wheel entry, if it had one,
                // before completing it.
                if let Some(tick) = target_tick {
                    self.timeout_wheel.remove(tick);
                }
                request.complete();
            }
            CompletionOutcome::Requeue(waiter_id) => {
                // The request needs another SQE; park it for restaging.
                self.ready_queue.push_back(waiter_id);
            }
            CompletionOutcome::Cancel => {
                // Async-cancel CQEs are handled entirely inside `Waiters`;
                // they neither complete nor requeue a logical request here.
            }
        }
        return;
    }

    // Internal wake CQE.
    assert!(
        result >= 0,
        "wake poll CQE failed: requires multishot poll (Linux 5.13+)"
    );
    // Drain wake readiness from the eventfd for this wake CQE.
    self.waker.acknowledge();
    // A multishot poll can terminate; without the "more" flag the poll must be
    // re-armed to keep the wake path live.
    let multishot_alive = io_uring::cqueue::more(cqe.flags());
    if !multishot_alive {
        self.wake_rearm_needed = true;
    }
}
/// Move the timeout wheel forward and queue cancellations for requests that
/// have just expired.
///
/// Does nothing when the wheel tracks no deadline. Wheel entries for which
/// `Waiters::cancel` reports `false` are skipped.
fn advance_timeouts(&mut self) {
    // Fast path: with no active deadline there is no need to read the clock
    // or scan the wheel.
    let has_deadline = self.timeout_wheel.next_deadline().is_some();
    if !has_deadline {
        return;
    }

    let Some(expired) = self.timeout_wheel.advance(Instant::now()) else {
        // Nothing newly expired at this tick.
        return;
    };

    // Flag each expired waiter as cancel-requested and remember the ones that
    // need a cancel SQE staged later.
    for entry in expired {
        let waiter_id = entry.waiter_id;

        // `false` means the wheel entry is stale (slot reused) or the waiter
        // has already moved to cancel-requested/completed.
        if !self.waiters.cancel(waiter_id) {
            continue;
        }

        // After a cancel request the waiter no longer counts as deadline-active.
        self.timeout_wheel.remove(entry.target_tick);

        // AsyncCancel is only needed when an op SQE is outstanding. Waiters
        // parked in the ready queue have no kernel op to cancel and will time
        // out locally when restaged.
        if !self.waiters.is_in_flight(waiter_id) {
            continue;
        }
        self.pending_cancels.push_back(waiter_id);
    }
}
/// Drain in-flight requests during shutdown.
///
/// Keeps draining CQEs until all waiters complete or shutdown budget is
/// exhausted.
///
/// If `shutdown_timeout` is `None`, this waits until all waiters complete or are cancelled.
/// If `shutdown_timeout` is `Some`, this waits until completion or timeout,
/// then abandons any remaining waiters.
fn drain(&mut self, ring: &mut IoUring) {
let mut remaining = self.cfg.shutdown_timeout;
// Keep driving completions until all in-flight waiters finish or the
// shutdown budget is exhausted.
loop {
// Always drain CQEs first, even after a timed wait: completions can
// race with timeout expiry and still be pending in the queue.
for cqe in ring.completion() {
self.handle_cqe(cqe);