Skip to content

Commit 0332b03

Browse files
committed
refactor(driver): handle panicking
1 parent 50470ef commit 0332b03

11 files changed

Lines changed: 121 additions & 24 deletions

File tree

compio-driver/src/asyncify.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fn worker(
7676
counter.fetch_add(1, Ordering::AcqRel);
7777
let _guard = CounterGuard(counter);
7878
while let Ok(f) = receiver.recv_timeout(timeout) {
79-
f.run();
79+
f.run()
8080
}
8181
}
8282
}

compio-driver/src/key.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,17 @@ impl<C: crate::Carry + ?Sized> RawOp<C> {
6060
///
6161
/// [`Poll::Pending`]: std::task::Poll::Pending
6262
pub fn operate_blocking(&mut self) -> io::Result<usize> {
63-
use std::task::Poll;
63+
use std::{panic::AssertUnwindSafe, task::Poll};
64+
65+
use crate::panic::catch_unwind_io;
6466

6567
let optr = self.extra_mut().optr();
66-
let res = unsafe { self.carrier.operate(optr.cast()) };
67-
match res {
68-
Poll::Pending => unreachable!("this operation is not overlapped"),
69-
Poll::Ready(res) => res,
70-
}
68+
catch_unwind_io(AssertUnwindSafe(|| unsafe {
69+
match self.carrier.operate(optr.cast()) {
70+
Poll::Pending => unreachable!("this operation is not overlapped"),
71+
Poll::Ready(res) => res,
72+
}
73+
}))
7174
}
7275
}
7376

compio-driver/src/lib.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use compio_log::instrument;
2626

2727
mod control;
2828
mod macros;
29+
mod panic;
2930

3031
mod key;
3132
pub use key::Key;
@@ -55,6 +56,7 @@ use crate::{
5556
buffer_pool::{BufferAlloc, BufferPoolRoot},
5657
key::ErasedKey,
5758
op::OpCodeFlag,
59+
panic::resume_unwind_io,
5860
};
5961

6062
mod sys_slice;
@@ -212,7 +214,8 @@ impl Proactor {
212214
}
213215
self.cancel.remove(&key);
214216
if key.is_unique() && key.has_result() {
215-
Some(key.take_result())
217+
let (res, buf) = key.take_result().into_parts();
218+
Some(BufResult(resume_unwind_io(res), buf))
216219
} else {
217220
self.driver.cancel(key.erase());
218221
None
@@ -286,12 +289,14 @@ impl Proactor {
286289
///
287290
/// # Panics
288291
///
289-
/// This function will panic if the [`Key`] is not unique.
292+
/// This function will panic if the [`Key`] is not unique or if the
293+
/// operation is blocking and it panicked in the thread pool.
290294
pub fn pop<T: OpCode>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
291295
instrument!(compio_log::Level::DEBUG, "pop", ?key);
292296
if key.has_result() {
293297
self.cancel.remove(&key);
294-
PushEntry::Ready(key.take_result())
298+
let (res, buf) = key.take_result().into_parts();
299+
PushEntry::Ready(BufResult(resume_unwind_io(res), buf))
295300
} else {
296301
PushEntry::Pending(key)
297302
}
@@ -302,7 +307,8 @@ impl Proactor {
302307
///
303308
/// # Panics
304309
///
305-
/// This function will panic if the [`Key`] is not unique.
310+
/// This function will panic if the [`Key`] is not unique or if the
311+
/// operation is blocking and it panicked in the thread pool.
306312
pub fn pop_with_extra<T: OpCode>(
307313
&mut self,
308314
key: Key<T>,
@@ -311,8 +317,8 @@ impl Proactor {
311317
if key.has_result() {
312318
self.cancel.remove(&key);
313319
let extra = key.swap_extra(self.default_extra());
314-
let res = key.take_result();
315-
PushEntry::Ready((res, extra))
320+
let (res, buf) = key.take_result().into_parts();
321+
PushEntry::Ready((BufResult(resume_unwind_io(res), buf), extra))
316322
} else {
317323
PushEntry::Pending(key)
318324
}

compio-driver/src/panic.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::{
2+
any::Any,
3+
error::Error,
4+
fmt::{Debug, Display},
5+
io,
6+
panic::{UnwindSafe, catch_unwind, resume_unwind},
7+
};
8+
9+
pub(crate) struct Panic(Box<dyn Any + Send>);
10+
11+
// Panic is unconditionally `Sync`
12+
unsafe impl Sync for Panic {}
13+
14+
impl Display for Panic {
15+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16+
Debug::fmt(self, f)
17+
}
18+
}
19+
20+
impl Debug for Panic {
21+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22+
f.debug_tuple("Panic").finish_non_exhaustive()
23+
}
24+
}
25+
26+
impl Error for Panic {}
27+
28+
impl From<Panic> for io::Error {
29+
fn from(value: Panic) -> Self {
30+
Self::other(value)
31+
}
32+
}
33+
34+
pub(crate) fn catch_unwind_io<F, R>(f: F) -> io::Result<R>
35+
where
36+
F: FnOnce() -> io::Result<R> + UnwindSafe,
37+
{
38+
catch_unwind(f).map_err(|err| io::Error::from(Panic(err)))?
39+
}
40+
41+
pub(crate) fn resume_unwind_io<T>(res: io::Result<T>) -> io::Result<T> {
42+
let Err(e) = res else { return res };
43+
match e.downcast::<Panic>() {
44+
Ok(p) => resume_unwind(p.0),
45+
Err(e) => Err(e),
46+
}
47+
}

compio-driver/src/sys/iocp/cp/global.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl Port {
103103
self.global_port.attach(fd)
104104
}
105105

106+
#[allow(dead_code)]
106107
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
107108
self.port.post(res, optr)
108109
}

compio-driver/src/sys/iocp/cp/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl CompletionPort {
8383
Ok(())
8484
}
8585

86+
#[allow(dead_code)]
8687
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
8788
if let Some(overlapped) = unsafe { optr.as_mut() } {
8889
match &res {

compio-driver/src/sys/iocp/cp/multi.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ impl Port {
2222
self.port.attach(fd)
2323
}
2424

25+
#[allow(dead_code)]
2526
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
2627
self.port.post(res, optr)
2728
}

compio-driver/src/sys/iocp/mod.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::{
1313

1414
use compio_buf::BufResult;
1515
use compio_log::{instrument, trace};
16+
use flume::{Receiver, Sender};
1617
use windows_sys::Win32::{
1718
Foundation::{ERROR_CANCELLED, HANDLE},
1819
System::IO::OVERLAPPED,
@@ -396,6 +397,8 @@ pub(crate) struct Driver {
396397
notify: Arc<Notify>,
397398
waits: HashMap<usize, wait::Wait>,
398399
pool: AsyncifyPool,
400+
completed_tx: Sender<Entry>,
401+
completed_rx: Receiver<Entry>,
399402
_local_marker: PhantomData<ErasedKey>,
400403
}
401404

@@ -407,8 +410,12 @@ impl Driver {
407410
let driver = port.as_raw_handle() as _;
408411
let overlapped = Overlapped::new(driver);
409412
let notify = Arc::new(Notify::new(port, overlapped));
413+
let (completed_tx, completed_rx) = flume::unbounded();
414+
410415
Ok(Self {
411416
notify,
417+
completed_tx,
418+
completed_rx,
412419
waits: HashMap::default(),
413420
pool: builder.create_or_get_thread_pool(),
414421
_local_marker: PhantomData,
@@ -477,16 +484,14 @@ impl Driver {
477484
}
478485

479486
fn push_blocking(&mut self, key: ErasedKey) {
480-
let notify = self.notify.clone();
481487
// SAFETY: we're submitting into the driver, so it's safe to freeze here.
482488
let mut key = unsafe { key.freeze() };
489+
let tx = self.completed_tx.clone();
483490

484491
let mut closure = move || {
485-
let op = key.as_mut();
486-
let res = op.operate_blocking();
487-
let optr = op.extra_mut().optr();
488-
// key will be unfronzen in `create_entry` when the result is ready
489-
notify.port.post(res, optr).ok();
492+
let res = key.as_mut().operate_blocking();
493+
let entry = Entry::new(key.into_inner(), res);
494+
_ = tx.send(entry);
490495
};
491496

492497
while let Err(e) = self.pool.dispatch(closure) {
@@ -536,6 +541,10 @@ impl Driver {
536541

537542
let notify = &self.notify.overlapped as *const Overlapped;
538543

544+
while let Ok(entry) = self.completed_rx.try_recv() {
545+
entry.notify();
546+
}
547+
539548
for e in self.notify.port.poll(timeout)? {
540549
if let Some(e) = Self::create_entry(notify, &mut self.waits, e) {
541550
e.notify()

compio-driver/src/sys/iour/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
io,
77
marker::PhantomData,
88
os::fd::FromRawFd,
9+
panic::AssertUnwindSafe,
910
sync::Arc,
1011
task::{Poll, Wake, Waker},
1112
time::Duration,
@@ -39,6 +40,7 @@ use crate::{
3940
AsyncifyPool, DriverType, Entry, ProactorBuilder,
4041
control::Carrier,
4142
key::{BorrowedKey, ErasedKey},
43+
panic::catch_unwind_io,
4244
syscall,
4345
};
4446

@@ -544,7 +546,7 @@ impl Driver {
544546
// SAFETY: we're submitting into the driver, so it's safe to freeze here.
545547
let mut key = unsafe { key.freeze() };
546548
let mut closure = move || {
547-
let res = key.as_mut().carrier.call_blocking();
549+
let res = catch_unwind_io(AssertUnwindSafe(|| key.as_mut().carrier.call_blocking()));
548550
let _ = completed.send(Entry::new(key.into_inner(), res));
549551
waker.wake();
550552
};

compio-driver/src/sys/poll/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
collections::{HashMap, VecDeque},
88
io,
99
num::NonZeroUsize,
10+
panic::AssertUnwindSafe,
1011
sync::Arc,
1112
task::{Poll, Wake, Waker},
1213
time::Duration,
@@ -20,7 +21,7 @@ use smallvec::SmallVec;
2021

2122
use crate::{
2223
AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder, control::Carrier,
23-
key::BorrowedKey, sys::op::Interest, syscall,
24+
key::BorrowedKey, panic::catch_unwind_io, sys::op::Interest, syscall,
2425
};
2526

2627
mod extra;
@@ -588,11 +589,11 @@ impl Driver {
588589
let mut key = unsafe { key.freeze() };
589590

590591
let mut closure = move || {
591-
let poll = key.as_mut().carrier.operate();
592-
let res = match poll {
592+
let operate = || match key.as_mut().carrier.operate() {
593593
Poll::Pending => unreachable!("this operation is not non-blocking"),
594594
Poll::Ready(res) => res,
595595
};
596+
let res = catch_unwind_io(AssertUnwindSafe(operate));
596597
let _ = completed.send(Entry::new(key.into_inner(), res));
597598
waker.wake();
598599
};

0 commit comments

Comments
 (0)