Skip to content

Commit c0b0733

Browse files
committed
refactor(driver): handle panicking
1 parent 18af3a4 commit c0b0733

7 files changed

Lines changed: 103 additions & 19 deletions

File tree

compio-driver/src/asyncify.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fn worker(
7676
counter.fetch_add(1, Ordering::AcqRel);
7777
let _guard = CounterGuard(counter);
7878
while let Ok(f) = receiver.recv_timeout(timeout) {
79-
f.run();
79+
f.run()
8080
}
8181
}
8282
}

compio-driver/src/key.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,17 @@ impl<C: crate::Carry + ?Sized> RawOp<C> {
6060
///
6161
/// [`Poll::Pending`]: std::task::Poll::Pending
6262
pub fn operate_blocking(&mut self) -> io::Result<usize> {
63-
use std::task::Poll;
63+
use std::{panic::AssertUnwindSafe, task::Poll};
64+
65+
use crate::panic::catch_unwind_io;
6466

6567
let optr = self.extra_mut().optr();
66-
let res = unsafe { self.carrier.operate(optr.cast()) };
67-
match res {
68-
Poll::Pending => unreachable!("this operation is not overlapped"),
69-
Poll::Ready(res) => res,
70-
}
68+
catch_unwind_io(AssertUnwindSafe(|| unsafe {
69+
match self.carrier.operate(optr.cast()) {
70+
Poll::Pending => unreachable!("this operation is not overlapped"),
71+
Poll::Ready(res) => res,
72+
}
73+
}))
7174
}
7275
}
7376

compio-driver/src/lib.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use compio_buf::BufResult;
2525
use compio_log::instrument;
2626

2727
mod macros;
28+
mod panic;
2829

2930
mod key;
3031
pub use key::Key;
@@ -49,7 +50,7 @@ pub use cancel::*;
4950

5051
mod control;
5152

52-
use crate::{key::ErasedKey, op::OpCodeFlag};
53+
use crate::{key::ErasedKey, op::OpCodeFlag, panic::resume_unwind_io};
5354

5455
mod sys_slice;
5556

@@ -202,7 +203,8 @@ impl Proactor {
202203
}
203204
self.cancel.remove(&key);
204205
if key.is_unique() && key.has_result() {
205-
Some(key.take_result())
206+
let (res, buf) = key.take_result().into_parts();
207+
Some(BufResult(resume_unwind_io(res), buf))
206208
} else {
207209
self.driver.cancel(key.erase());
208210
None
@@ -276,12 +278,14 @@ impl Proactor {
276278
///
277279
/// # Panics
278280
///
279-
/// This function will panic if the [`Key`] is not unique.
281+
/// This function will panic if the [`Key`] is not unique or if the
282+
/// operation is blocking and it panicked in the thread pool.
280283
pub fn pop<T: OpCode>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
281284
instrument!(compio_log::Level::DEBUG, "pop", ?key);
282285
if key.has_result() {
283286
self.cancel.remove(&key);
284-
PushEntry::Ready(key.take_result())
287+
let (res, buf) = key.take_result().into_parts();
288+
PushEntry::Ready(BufResult(resume_unwind_io(res), buf))
285289
} else {
286290
PushEntry::Pending(key)
287291
}
@@ -292,7 +296,8 @@ impl Proactor {
292296
///
293297
/// # Panics
294298
///
295-
/// This function will panic if the [`Key`] is not unique.
299+
/// This function will panic if the [`Key`] is not unique or if the
300+
/// operation is blocking and it panicked in the thread pool.
296301
pub fn pop_with_extra<T: OpCode>(
297302
&mut self,
298303
key: Key<T>,
@@ -301,8 +306,8 @@ impl Proactor {
301306
if key.has_result() {
302307
self.cancel.remove(&key);
303308
let extra = key.swap_extra(self.default_extra());
304-
let res = key.take_result();
305-
PushEntry::Ready((res, extra))
309+
let (res, buf) = key.take_result().into_parts();
310+
PushEntry::Ready((BufResult(resume_unwind_io(res), buf), extra))
306311
} else {
307312
PushEntry::Pending(key)
308313
}

compio-driver/src/panic.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::{
2+
any::Any,
3+
error::Error,
4+
fmt::{Debug, Display},
5+
io,
6+
panic::{UnwindSafe, catch_unwind, resume_unwind},
7+
};
8+
9+
pub(crate) struct Panic(Box<dyn Any + Send>);
10+
11+
// Panic is unconditionally `Sync`
12+
unsafe impl Sync for Panic {}
13+
14+
impl Display for Panic {
15+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16+
Debug::fmt(self, f)
17+
}
18+
}
19+
20+
impl Debug for Panic {
21+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22+
f.debug_tuple("Panic").finish_non_exhaustive()
23+
}
24+
}
25+
26+
impl Error for Panic {}
27+
28+
impl From<Panic> for io::Error {
29+
fn from(value: Panic) -> Self {
30+
Self::other(value)
31+
}
32+
}
33+
34+
pub(crate) fn catch_unwind_io<F, R>(f: F) -> io::Result<R>
35+
where
36+
F: FnOnce() -> io::Result<R> + UnwindSafe,
37+
{
38+
catch_unwind(f).map_err(|err| io::Error::from(Panic(err)))?
39+
}
40+
41+
pub(crate) fn resume_unwind_io<T>(res: io::Result<T>) -> io::Result<T> {
42+
let Err(e) = res else { return res };
43+
match e.downcast::<Panic>() {
44+
Ok(p) => resume_unwind(p.0),
45+
Err(e) => Err(e),
46+
}
47+
}

compio-driver/src/sys/iour/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
io,
77
marker::PhantomData,
88
os::fd::FromRawFd,
9+
panic::AssertUnwindSafe,
910
sync::Arc,
1011
task::{Poll, Wake, Waker},
1112
time::Duration,
@@ -39,6 +40,7 @@ use crate::{
3940
AsyncifyPool, DriverType, Entry, ProactorBuilder,
4041
control::Carrier,
4142
key::{BorrowedKey, ErasedKey},
43+
panic::catch_unwind_io,
4244
syscall,
4345
};
4446

@@ -544,7 +546,7 @@ impl Driver {
544546
// SAFETY: we're submitting into the driver, so it's safe to freeze here.
545547
let mut key = unsafe { key.freeze() };
546548
let mut closure = move || {
547-
let res = key.as_mut().carrier.call_blocking();
549+
let res = catch_unwind_io(AssertUnwindSafe(|| key.as_mut().carrier.call_blocking()));
548550
let _ = completed.send(Entry::new(key.into_inner(), res));
549551
waker.wake();
550552
};

compio-driver/src/sys/poll/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
collections::{HashMap, VecDeque},
88
io,
99
num::NonZeroUsize,
10+
panic::AssertUnwindSafe,
1011
sync::Arc,
1112
task::{Poll, Wake, Waker},
1213
time::Duration,
@@ -20,7 +21,7 @@ use smallvec::SmallVec;
2021

2122
use crate::{
2223
AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder, control::Carrier,
23-
key::BorrowedKey, sys::op::Interest, syscall,
24+
key::BorrowedKey, panic::catch_unwind_io, sys::op::Interest, syscall,
2425
};
2526

2627
mod extra;
@@ -588,11 +589,11 @@ impl Driver {
588589
let mut key = unsafe { key.freeze() };
589590

590591
let mut closure = move || {
591-
let poll = key.as_mut().carrier.operate();
592-
let res = match poll {
592+
let operate = || match key.as_mut().carrier.operate() {
593593
Poll::Pending => unreachable!("this operation is not non-blocking"),
594594
Poll::Ready(res) => res,
595595
};
596+
let res = catch_unwind_io(AssertUnwindSafe(operate));
596597
let _ = completed.send(Entry::new(key.into_inner(), res));
597598
waker.wake();
598599
};

compio-driver/tests/asyncify.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
use std::{
2+
panic::{AssertUnwindSafe, catch_unwind},
3+
time::Duration,
4+
};
5+
16
use compio_buf::BufResult;
27
use compio_driver::{Key, OpCode, Proactor, PushEntry, op::Asyncify};
38

@@ -12,14 +17,35 @@ fn take_key<T: OpCode, R>(res: PushEntry<Key<T>, R>) -> Key<T> {
1217

1318
fn wait_for<T: OpCode>(driver: &mut Proactor, mut key: Key<T>) -> BufResult<usize, T> {
1419
loop {
15-
driver.poll(None).unwrap();
20+
_ = driver.poll(Some(Duration::from_millis(1)));
1621
match driver.pop(key) {
1722
PushEntry::Pending(k) => key = k,
1823
PushEntry::Ready(res) => break res,
1924
}
2025
}
2126
}
2227

28+
#[test]
29+
fn panic() {
30+
let mut driver = Proactor::builder().thread_pool_limit(1).build().unwrap();
31+
32+
// make panicking less noisy
33+
std::panic::set_hook(Box::new(|_| {}));
34+
35+
let a = take_key(driver.push(Asyncify::new(|| -> BufResult<usize, ()> {
36+
panic!("this should not blow up driver's thread pool");
37+
})));
38+
let b = take_key(driver.push(Asyncify::new(|| BufResult(Ok(1), ()))));
39+
40+
let res_b = wait_for(&mut driver, b);
41+
let res_a = catch_unwind(AssertUnwindSafe(|| wait_for(&mut driver, a)));
42+
43+
_ = std::panic::take_hook(); // restore to default hook
44+
45+
assert!(res_b.0.is_ok_and(|res| res == 1));
46+
assert!(res_a.is_err());
47+
}
48+
2349
#[test]
2450
#[should_panic(expected = "the thread pool is needed but no worker thread is running")]
2551
fn disable() {

0 commit comments

Comments
 (0)