Skip to content

Commit 5dee5df

Browse files
committed
refactor(driver): get rid of pin
1 parent 31d234d commit 5dee5df

23 files changed

Lines changed: 2538 additions & 1774 deletions

File tree

compio-driver/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ cfg-if = { workspace = true }
2323
flume = { workspace = true, default-features = false }
2424
futures-util = { workspace = true }
2525
socket2 = { workspace = true, features = ["all"] }
26-
pin-project-lite = { workspace = true }
2726
thin-cell = { workspace = true }
2827
smallvec = { workspace = true, optional = true, features = ["union"] }
2928
synchrony = { workspace = true, features = ["waker_slot"] }
3029
bitflags = { version = "2.11.0" }
30+
paste = { workspace = true }
3131

3232
# Windows specific dependencies
3333
[target.'cfg(windows)'.dependencies]
@@ -52,7 +52,6 @@ io-uring = { version = "0.7.0", optional = true }
5252
io_uring_buf_ring = { version = "0.2.2", optional = true }
5353
once_cell = { workspace = true, optional = true }
5454
polling = { version = "3.3.0", optional = true }
55-
paste = { workspace = true }
5655
slab = { workspace = true, optional = true }
5756

5857
[dev-dependencies]

compio-driver/build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ fn main() {
1414
freebsd: { target_os = "freebsd" },
1515
solarish: { any(target_os = "illumos", target_os = "solaris") },
1616
aio: { any(freebsd, solarish) },
17+
polling: { all(unix, feature = "polling") },
1718
io_uring: { all(target_os = "linux", feature = "io-uring") },
1819
fusion: { all(target_os = "linux", feature = "io-uring", feature = "polling") },
1920

compio-driver/src/control.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
use std::mem::MaybeUninit;
2+
3+
use compio_buf::IntoInner;
4+
5+
use crate::{DriverType, OpCode};
6+
7+
cfg_if::cfg_if! {
8+
if #[cfg(fusion)] {
9+
use crate::{PollOpCode, IourOpCode};
10+
} else if #[cfg(io_uring)] {
11+
use crate::OpCode as IourOpCode;
12+
} else if #[cfg(polling)]{
13+
use crate::OpCode as PollOpCode;
14+
}
15+
}
16+
17+
#[cfg(not(fusion))]
18+
type ControlInner<T> = <T as OpCode>::Control;
19+
20+
#[cfg(fusion)]
21+
enum ControlInner<T: OpCode + ?Sized> {
22+
Poll(<T as PollOpCode>::Control),
23+
IoUring(<T as IourOpCode>::Control),
24+
}
25+
26+
#[cfg(fusion)]
27+
impl<T: OpCode> ControlInner<T> {
28+
pub fn iour(&mut self) -> &mut <T as IourOpCode>::Control {
29+
match self {
30+
ControlInner::Poll(_) => unreachable!("Current driver is not `io-uring`"),
31+
ControlInner::IoUring(control) => control,
32+
}
33+
}
34+
35+
pub fn poll(&mut self) -> &mut <T as PollOpCode>::Control {
36+
match self {
37+
ControlInner::Poll(control) => control,
38+
ControlInner::IoUring(_) => unreachable!("Current driver is not `polling`"),
39+
}
40+
}
41+
}
42+
43+
/// A utility type that put a [`OpCode`] and its [`OpCode::Control`] together.
44+
///
45+
/// The only way to access this type is through [`ThinCell`], which pins it on
46+
/// the heap and guarantees any self-referential pointers to be valid.
47+
pub(crate) struct Carrier<T: OpCode + ?Sized> {
48+
control: MaybeUninit<ControlInner<T>>,
49+
op: T,
50+
}
51+
52+
impl<T: OpCode> IntoInner for Carrier<T> {
53+
type Inner = T;
54+
55+
fn into_inner(self) -> Self::Inner {
56+
self.op
57+
}
58+
}
59+
60+
impl<T: OpCode> Carrier<T> {
61+
pub fn new(op: T) -> Self {
62+
Self {
63+
control: MaybeUninit::uninit(),
64+
op,
65+
}
66+
}
67+
68+
/// Init the Carrier
69+
///
70+
/// # Safety
71+
///
72+
/// self must have stable address until control is no longer needed, which
73+
/// include all function calls via `dyn Carry`.
74+
pub unsafe fn init(&mut self, driver_ty: DriverType) {
75+
#[cfg(fusion)]
76+
{
77+
let control = match driver_ty {
78+
DriverType::Poll => ControlInner::Poll(unsafe { PollOpCode::init(&mut self.op) }),
79+
DriverType::IoUring => {
80+
ControlInner::IoUring(unsafe { IourOpCode::init(&mut self.op) })
81+
}
82+
DriverType::IOCP => unreachable!("Cannot be windows"),
83+
};
84+
85+
self.control.write(control);
86+
}
87+
88+
#[cfg(not(fusion))]
89+
{
90+
_ = driver_ty;
91+
92+
let control = unsafe { OpCode::init(&mut self.op) };
93+
self.control.write(control);
94+
}
95+
}
96+
97+
#[cfg(io_uring)]
98+
pub fn as_iour(&mut self) -> (&mut T, &mut <T as IourOpCode>::Control) {
99+
let control = unsafe { self.control.assume_init_mut() };
100+
#[cfg(fusion)]
101+
{
102+
(&mut self.op, control.iour())
103+
}
104+
#[cfg(not(fusion))]
105+
{
106+
(&mut self.op, control)
107+
}
108+
}
109+
110+
#[cfg(polling)]
111+
pub fn as_poll(&mut self) -> (&mut T, &mut <T as PollOpCode>::Control) {
112+
let control = unsafe { self.control.assume_init_mut() };
113+
#[cfg(fusion)]
114+
{
115+
(&mut self.op, control.poll())
116+
}
117+
#[cfg(not(fusion))]
118+
{
119+
(&mut self.op, control)
120+
}
121+
}
122+
123+
#[cfg(windows)]
124+
pub fn as_iocp(&self) -> (&T, &<T as OpCode>::Control) {
125+
let control = unsafe { self.control.assume_init_ref() };
126+
127+
(&self.op, control)
128+
}
129+
130+
#[cfg(windows)]
131+
pub fn as_iocp_mut(&mut self) -> (&mut T, &mut <T as OpCode>::Control) {
132+
let control = unsafe { self.control.assume_init_mut() };
133+
134+
(&mut self.op, control)
135+
}
136+
}

compio-driver/src/key.rs

Lines changed: 38 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,20 @@ use std::{
66
io,
77
mem::{self, ManuallyDrop},
88
ops::{Deref, DerefMut},
9-
pin::Pin,
109
task::Waker,
1110
};
1211

13-
use compio_buf::BufResult;
14-
use thin_cell::unsync::{Ref, ThinCell};
12+
use compio_buf::{BufResult, IntoInner};
13+
use thin_cell::unsync::{Inner, Ref, ThinCell};
1514

16-
use crate::{Extra, OpCode, PushEntry};
15+
use crate::{Carry, DriverType, Extra, OpCode, PushEntry, control::Carrier};
1716

1817
/// An operation with other needed information.
1918
///
2019
/// You should not use `RawOp` directly. Instead, use [`Key`] to manage the
2120
/// reference-counted pointer to it.
2221
#[repr(C)]
23-
pub(crate) struct RawOp<T: ?Sized> {
22+
pub(crate) struct RawOp<M: ?Sized> {
2423
// Platform-specific extra data.
2524
//
2625
// - On Windows, it holds the `OVERLAPPED` buffer and a pointer to the driver.
@@ -33,10 +32,10 @@ pub(crate) struct RawOp<T: ?Sized> {
3332
// The cancelled flag indicates the op has been cancelled.
3433
cancelled: bool,
3534
result: PushEntry<Option<Waker>, io::Result<usize>>,
36-
pub(crate) op: T,
35+
pub(crate) Carrier: M,
3736
}
3837

39-
impl<T: ?Sized> RawOp<T> {
38+
impl<M: ?Sized> RawOp<M> {
4039
pub fn extra(&self) -> &Extra {
4140
&self.extra
4241
}
@@ -45,11 +44,6 @@ impl<T: ?Sized> RawOp<T> {
4544
&mut self.extra
4645
}
4746

48-
fn pinned_op(&mut self) -> Pin<&mut T> {
49-
// SAFETY: inner is always pinned with ThinCell.
50-
unsafe { Pin::new_unchecked(&mut self.op) }
51-
}
52-
5347
#[cfg(io_uring)]
5448
pub fn wake_by_ref(&mut self) {
5549
if let PushEntry::Pending(Some(w)) = &self.result {
@@ -59,7 +53,7 @@ impl<T: ?Sized> RawOp<T> {
5953
}
6054

6155
#[cfg(windows)]
62-
impl<T: OpCode + ?Sized> RawOp<T> {
56+
impl<M: crate::Carry + ?Sized> RawOp<M> {
6357
/// Call [`OpCode::operate`] and assume that it is not an overlapped op,
6458
/// which means it never returns [`Poll::Pending`].
6559
///
@@ -68,22 +62,21 @@ impl<T: OpCode + ?Sized> RawOp<T> {
6862
use std::task::Poll;
6963

7064
let optr = self.extra_mut().optr();
71-
let op = self.pinned_op();
72-
let res = unsafe { op.operate(optr.cast()) };
65+
let res = unsafe { self.Carrier.operate(optr.cast()) };
7366
match res {
7467
Poll::Pending => unreachable!("this operation is not overlapped"),
7568
Poll::Ready(res) => res,
7669
}
7770
}
7871
}
7972

80-
impl<T: ?Sized> Debug for RawOp<T> {
73+
impl<M: ?Sized> Debug for RawOp<M> {
8174
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8275
f.debug_struct("RawOp")
8376
.field("extra", &self.extra)
8477
.field("cancelled", &self.cancelled)
8578
.field("result", &self.result)
86-
.field("op", &"<...>")
79+
.field("Carrier", &"<...>")
8780
.finish()
8881
}
8982
}
@@ -120,7 +113,9 @@ impl<T> Key<T> {
120113
pub(crate) fn erase(self) -> ErasedKey {
121114
self.erased
122115
}
116+
}
123117

118+
impl<T: OpCode> Key<T> {
124119
/// Take the inner result if it is completed.
125120
///
126121
/// # Panics
@@ -135,8 +130,8 @@ impl<T> Key<T> {
135130

136131
impl<T: OpCode + 'static> Key<T> {
137132
/// Create [`RawOp`] and get the [`Key`] to it.
138-
pub(crate) fn new(op: T, extra: impl Into<Extra>) -> Self {
139-
let erased = ErasedKey::new(op, extra.into());
133+
pub(crate) fn new(op: T, extra: impl Into<Extra>, driver_ty: DriverType) -> Self {
134+
let erased = ErasedKey::new(op, extra.into(), driver_ty);
140135

141136
Self {
142137
erased,
@@ -171,7 +166,7 @@ impl<T> DerefMut for Key<T> {
171166
#[derive(Clone)]
172167
#[repr(transparent)]
173168
pub struct ErasedKey {
174-
inner: ThinCell<RawOp<dyn OpCode>>,
169+
inner: ThinCell<RawOp<dyn Carry>>,
175170
}
176171

177172
impl PartialEq for ErasedKey {
@@ -199,16 +194,22 @@ impl std::borrow::Borrow<usize> for ErasedKey {
199194

200195
impl ErasedKey {
201196
/// Create [`RawOp`] and get the [`ErasedKey`] to it.
202-
pub(crate) fn new<T: OpCode + 'static>(op: T, extra: Extra) -> Self {
197+
pub(crate) fn new<T: OpCode + 'static>(op: T, extra: Extra, driver_ty: DriverType) -> Self {
203198
let raw_op = RawOp {
204199
extra,
205200
cancelled: false,
206201
result: PushEntry::Pending(None),
207-
op,
202+
Carrier: Carrier::new(op),
208203
};
209-
// SAFETY: Unsize coersion from `RawOp<T>` to `RawOp<dyn OpCode>`
210-
let inner = unsafe { ThinCell::new_unsize(raw_op, |p| p as _) };
211-
Self { inner }
204+
let mut inner = ThinCell::new(raw_op);
205+
// SAFETY:
206+
// - ThinCell is just created, there will be no shared owner or borrower
207+
// - Carrier is being pinned by ThinCell, it will have a stable address until
208+
// move out
209+
unsafe { inner.borrow_unchecked().Carrier.init(driver_ty) };
210+
Self {
211+
inner: unsafe { inner.unsize(|p| p as *const Inner<RawOp<dyn Carry>>) },
212+
}
212213
}
213214

214215
/// Create from `user_data` pointer.
@@ -255,7 +256,7 @@ impl ErasedKey {
255256
}
256257

257258
#[inline]
258-
pub(crate) fn borrow(&self) -> Ref<'_, RawOp<dyn OpCode>> {
259+
pub(crate) fn borrow(&self) -> Ref<'_, RawOp<dyn Carry>> {
259260
self.inner.borrow()
260261
}
261262

@@ -283,7 +284,7 @@ impl ErasedKey {
283284
let this = &mut *this;
284285
if this.extra.is_iour() {
285286
unsafe {
286-
Pin::new_unchecked(&mut this.op).set_result(&res, &this.extra);
287+
this.Carrier.set_result(&res, &this.extra);
287288
}
288289
}
289290
}
@@ -323,12 +324,12 @@ impl ErasedKey {
323324
///
324325
/// Panics if the result is not ready or the `Key` is not unique (multiple
325326
/// references or borrowed).
326-
unsafe fn take_result<T>(self) -> BufResult<usize, T> {
327+
unsafe fn take_result<T: OpCode>(self) -> BufResult<usize, T> {
327328
// SAFETY: Caller guarantees that `T` is the actual concrete type.
328-
let this = unsafe { self.inner.downcast_unchecked::<RawOp<T>>() };
329+
let this = unsafe { self.inner.downcast_unchecked::<RawOp<Carrier<T>>>() };
329330
let op = this.try_unwrap().map_err(|_| ()).expect("Key not unique");
330331
let res = op.result.take_ready().expect("Result not ready");
331-
BufResult(res, op.op)
332+
BufResult(res, op.Carrier.into_inner())
332333
}
333334

334335
/// Unsafely freeze the `Key` by bypassing borrow flag of [`ThinCell`],
@@ -363,14 +364,10 @@ pub(crate) struct FrozenKey {
363364
}
364365

365366
impl FrozenKey {
366-
pub fn as_mut(&mut self) -> &mut RawOp<dyn OpCode> {
367+
pub fn as_mut(&mut self) -> &mut RawOp<dyn Carry> {
367368
unsafe { self.inner.inner.borrow_unchecked() }
368369
}
369370

370-
pub fn pinned_op(&mut self) -> Pin<&mut dyn OpCode> {
371-
self.as_mut().pinned_op()
372-
}
373-
374371
pub fn into_inner(self) -> ErasedKey {
375372
ManuallyDrop::into_inner(self.inner)
376373
}
@@ -405,29 +402,23 @@ impl Deref for BorrowedKey {
405402
}
406403
}
407404

408-
pub trait RefExt {
409-
fn pinned_op(&mut self) -> Pin<&mut dyn OpCode>;
410-
}
411-
412-
impl RefExt for Ref<'_, RawOp<dyn OpCode>> {
413-
fn pinned_op(&mut self) -> Pin<&mut dyn OpCode> {
414-
self.deref_mut().pinned_op()
415-
}
416-
}
417-
418405
#[cfg(test)]
419406
mod test {
420407
use std::borrow::Borrow;
421408

422409
use compio_buf::BufResult;
423410

424-
use crate::{Proactor, key::ErasedKey, op::Asyncify};
411+
use crate::{DriverType, Proactor, key::ErasedKey, op::Asyncify};
425412

426413
#[test]
427414
fn test_key_borrow() {
428415
let driver = Proactor::new().unwrap();
429416
let extra = driver.default_extra();
430-
let key = ErasedKey::new(Asyncify::new(|| BufResult(Ok(0), [0u8])), extra);
417+
let key = ErasedKey::new(
418+
Asyncify::new(|| BufResult(Ok(0), [0u8])),
419+
extra,
420+
DriverType::Poll,
421+
);
431422
assert_eq!(&key.as_raw(), Borrow::<usize>::borrow(&key));
432423
}
433424
}

0 commit comments

Comments
 (0)