Skip to content

Commit 441e9f3

Browse files
committed
refactor(driver): handle panicking
1 parent 18af3a4 commit 441e9f3

7 files changed

Lines changed: 98 additions & 15 deletions

File tree

compio-driver/src/asyncify.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fn worker(
7676
counter.fetch_add(1, Ordering::AcqRel);
7777
let _guard = CounterGuard(counter);
7878
while let Ok(f) = receiver.recv_timeout(timeout) {
79-
f.run();
79+
f.run()
8080
}
8181
}
8282
}

compio-driver/src/lib.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use compio_buf::BufResult;
2525
use compio_log::instrument;
2626

2727
mod macros;
28+
mod panic;
2829

2930
mod key;
3031
pub use key::Key;
@@ -49,7 +50,7 @@ pub use cancel::*;
4950

5051
mod control;
5152

52-
use crate::{key::ErasedKey, op::OpCodeFlag};
53+
use crate::{key::ErasedKey, op::OpCodeFlag, panic::resume_unwind_io};
5354

5455
mod sys_slice;
5556

@@ -276,12 +277,15 @@ impl Proactor {
276277
///
277278
/// # Panics
278279
///
279-
/// This function will panic if the [`Key`] is not unique.
280+
/// This function will panic if the [`Key`] is not unique or if the
281+
/// operation is blocking and it panicked in the thread pool.
280282
pub fn pop<T: OpCode>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
281283
instrument!(compio_log::Level::DEBUG, "pop", ?key);
282284
if key.has_result() {
283285
self.cancel.remove(&key);
284-
PushEntry::Ready(key.take_result())
286+
let (res, buf) = key.take_result().into_parts();
287+
288+
PushEntry::Ready(BufResult(resume_unwind_io(res), buf))
285289
} else {
286290
PushEntry::Pending(key)
287291
}
@@ -292,7 +296,8 @@ impl Proactor {
292296
///
293297
/// # Panics
294298
///
295-
/// This function will panic if the [`Key`] is not unique.
299+
/// This function will panic if the [`Key`] is not unique or if the
300+
/// operation is blocking and it panicked in the thread pool.
296301
pub fn pop_with_extra<T: OpCode>(
297302
&mut self,
298303
key: Key<T>,
@@ -301,8 +306,8 @@ impl Proactor {
301306
if key.has_result() {
302307
self.cancel.remove(&key);
303308
let extra = key.swap_extra(self.default_extra());
304-
let res = key.take_result();
305-
PushEntry::Ready((res, extra))
309+
let (res, buf) = key.take_result().into_parts();
310+
PushEntry::Ready((BufResult(resume_unwind_io(res), buf), extra))
306311
} else {
307312
PushEntry::Pending(key)
308313
}

compio-driver/src/panic.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::{
2+
any::Any,
3+
error::Error,
4+
fmt::{Debug, Display},
5+
io,
6+
panic::{UnwindSafe, catch_unwind, resume_unwind},
7+
};
8+
9+
pub(crate) struct Panic(Box<dyn Any + Send>);
10+
11+
// Panic is unconditionally `Sync`
12+
unsafe impl Sync for Panic {}
13+
14+
impl Display for Panic {
15+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16+
Debug::fmt(self, f)
17+
}
18+
}
19+
20+
impl Debug for Panic {
21+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22+
f.debug_tuple("Panic").finish_non_exhaustive()
23+
}
24+
}
25+
26+
impl Error for Panic {}
27+
28+
impl From<Panic> for io::Error {
29+
fn from(value: Panic) -> Self {
30+
Self::other(value)
31+
}
32+
}
33+
34+
pub(crate) fn catch_unwind_io<F, R>(f: F) -> io::Result<R>
35+
where
36+
F: FnOnce() -> io::Result<R> + UnwindSafe,
37+
{
38+
catch_unwind(f).map_err(|err| io::Error::from(Panic(err)))?
39+
}
40+
41+
pub(crate) fn resume_unwind_io<T>(res: io::Result<T>) -> io::Result<T> {
42+
let Err(e) = res else { return res };
43+
match e.downcast::<Panic>() {
44+
Ok(p) => resume_unwind(p.0),
45+
Err(e) => Err(e),
46+
}
47+
}

compio-driver/src/sys/iocp/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
AsHandle, AsRawHandle, AsRawSocket, AsSocket, BorrowedHandle, BorrowedSocket, OwnedHandle,
77
OwnedSocket,
88
},
9+
panic::AssertUnwindSafe,
910
sync::Arc,
1011
task::{Poll, Wake, Waker},
1112
time::Duration,
@@ -18,7 +19,10 @@ use windows_sys::Win32::{
1819
System::IO::OVERLAPPED,
1920
};
2021

21-
use crate::{AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder, control::Carrier};
22+
use crate::{
23+
AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder, control::Carrier,
24+
panic::catch_unwind_io,
25+
};
2226

2327
pub(crate) mod op;
2428

@@ -483,7 +487,7 @@ impl Driver {
483487

484488
let mut closure = move || {
485489
let op = key.as_mut();
486-
let res = op.operate_blocking();
490+
let res = catch_unwind_io(AssertUnwindSafe(|| op.operate_blocking()));
487491
let optr = op.extra_mut().optr();
488492
// key will be unfronzen in `create_entry` when the result is ready
489493
notify.port.post(res, optr).ok();
@@ -524,7 +528,7 @@ impl Driver {
524528
entry
525529
} else {
526530
let key = entry.into_key();
527-
let result = key.borrow().operate_blocking();
531+
let result = catch_unwind_io(AssertUnwindSafe(|| key.borrow().operate_blocking()));
528532
Entry::new(key, result)
529533
};
530534

compio-driver/src/sys/iour/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
io,
77
marker::PhantomData,
88
os::fd::FromRawFd,
9+
panic::AssertUnwindSafe,
910
sync::Arc,
1011
task::{Poll, Wake, Waker},
1112
time::Duration,
@@ -39,6 +40,7 @@ use crate::{
3940
AsyncifyPool, DriverType, Entry, ProactorBuilder,
4041
control::Carrier,
4142
key::{BorrowedKey, ErasedKey},
43+
panic::catch_unwind_io,
4244
syscall,
4345
};
4446

@@ -544,7 +546,7 @@ impl Driver {
544546
// SAFETY: we're submitting into the driver, so it's safe to freeze here.
545547
let mut key = unsafe { key.freeze() };
546548
let mut closure = move || {
547-
let res = key.as_mut().carrier.call_blocking();
549+
let res = catch_unwind_io(AssertUnwindSafe(|| key.as_mut().carrier.call_blocking()));
548550
let _ = completed.send(Entry::new(key.into_inner(), res));
549551
waker.wake();
550552
};

compio-driver/src/sys/poll/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
collections::{HashMap, VecDeque},
88
io,
99
num::NonZeroUsize,
10+
panic::AssertUnwindSafe,
1011
sync::Arc,
1112
task::{Poll, Wake, Waker},
1213
time::Duration,
@@ -20,7 +21,7 @@ use smallvec::SmallVec;
2021

2122
use crate::{
2223
AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder, control::Carrier,
23-
key::BorrowedKey, sys::op::Interest, syscall,
24+
key::BorrowedKey, panic::catch_unwind_io, sys::op::Interest, syscall,
2425
};
2526

2627
mod extra;
@@ -588,11 +589,11 @@ impl Driver {
588589
let mut key = unsafe { key.freeze() };
589590

590591
let mut closure = move || {
591-
let poll = key.as_mut().carrier.operate();
592-
let res = match poll {
592+
let operate = || match key.as_mut().carrier.operate() {
593593
Poll::Pending => unreachable!("this operation is not non-blocking"),
594594
Poll::Ready(res) => res,
595595
};
596+
let res = catch_unwind_io(AssertUnwindSafe(operate));
596597
let _ = completed.send(Entry::new(key.into_inner(), res));
597598
waker.wake();
598599
};

compio-driver/tests/asyncify.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
use std::{
2+
panic::{AssertUnwindSafe, catch_unwind},
3+
time::Duration,
4+
};
5+
16
use compio_buf::BufResult;
27
use compio_driver::{Key, OpCode, Proactor, PushEntry, op::Asyncify};
38

@@ -12,14 +17,33 @@ fn take_key<T: OpCode, R>(res: PushEntry<Key<T>, R>) -> Key<T> {
1217

1318
fn wait_for<T: OpCode>(driver: &mut Proactor, mut key: Key<T>) -> BufResult<usize, T> {
1419
loop {
15-
driver.poll(None).unwrap();
20+
_ = driver.poll(Some(Duration::from_millis(1)));
1621
match driver.pop(key) {
1722
PushEntry::Pending(k) => key = k,
1823
PushEntry::Ready(res) => break res,
1924
}
2025
}
2126
}
2227

28+
#[test]
29+
fn panic() {
30+
let mut driver = Proactor::builder().thread_pool_limit(1).build().unwrap();
31+
32+
// make panicking less noisy
33+
std::panic::set_hook(Box::new(|_| {}));
34+
35+
let a = take_key(driver.push(Asyncify::new(|| -> BufResult<usize, ()> {
36+
panic!("this should not blow up driver's thread pool");
37+
})));
38+
let b = take_key(driver.push(Asyncify::new(|| BufResult(Ok(1), ()))));
39+
40+
let res = wait_for(&mut driver, b);
41+
assert!(res.0.is_ok_and(|res| res == 1));
42+
43+
let res = catch_unwind(AssertUnwindSafe(|| wait_for(&mut driver, a)));
44+
assert!(res.is_err());
45+
}
46+
2347
#[test]
2448
#[should_panic(expected = "the thread pool is needed but no worker thread is running")]
2549
fn disable() {

0 commit comments

Comments
 (0)