Skip to content

Commit 0e92075

Browse files
committed
refactor(driver): handle panicking
1 parent 50470ef commit 0e92075

12 files changed

Lines changed: 352 additions & 245 deletions

File tree

compio-driver/src/asyncify.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fn worker(
7676
counter.fetch_add(1, Ordering::AcqRel);
7777
let _guard = CounterGuard(counter);
7878
while let Ok(f) = receiver.recv_timeout(timeout) {
79-
f.run();
79+
f.run()
8080
}
8181
}
8282
}

compio-driver/src/key.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,17 @@ impl<C: crate::Carry + ?Sized> RawOp<C> {
6060
///
6161
/// [`Poll::Pending`]: std::task::Poll::Pending
6262
pub fn operate_blocking(&mut self) -> io::Result<usize> {
63-
use std::task::Poll;
63+
use std::{panic::AssertUnwindSafe, task::Poll};
64+
65+
use crate::panic::catch_unwind_io;
6466

6567
let optr = self.extra_mut().optr();
66-
let res = unsafe { self.carrier.operate(optr.cast()) };
67-
match res {
68-
Poll::Pending => unreachable!("this operation is not overlapped"),
69-
Poll::Ready(res) => res,
70-
}
68+
catch_unwind_io(AssertUnwindSafe(|| unsafe {
69+
match self.carrier.operate(optr.cast()) {
70+
Poll::Pending => unreachable!("this operation is not overlapped"),
71+
Poll::Ready(res) => res,
72+
}
73+
}))
7174
}
7275
}
7376

compio-driver/src/lib.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use compio_log::instrument;
2626

2727
mod control;
2828
mod macros;
29+
mod panic;
2930

3031
mod key;
3132
pub use key::Key;
@@ -55,6 +56,7 @@ use crate::{
5556
buffer_pool::{BufferAlloc, BufferPoolRoot},
5657
key::ErasedKey,
5758
op::OpCodeFlag,
59+
panic::resume_unwind_io,
5860
};
5961

6062
mod sys_slice;
@@ -212,7 +214,8 @@ impl Proactor {
212214
}
213215
self.cancel.remove(&key);
214216
if key.is_unique() && key.has_result() {
215-
Some(key.take_result())
217+
let (res, buf) = key.take_result().into_parts();
218+
Some(BufResult(resume_unwind_io(res), buf))
216219
} else {
217220
self.driver.cancel(key.erase());
218221
None
@@ -286,12 +289,14 @@ impl Proactor {
286289
///
287290
/// # Panics
288291
///
289-
/// This function will panic if the [`Key`] is not unique.
292+
/// This function will panic if the [`Key`] is not unique or if the
293+
/// operation is blocking and it panicked in the thread pool.
290294
pub fn pop<T: OpCode>(&mut self, key: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
291295
instrument!(compio_log::Level::DEBUG, "pop", ?key);
292296
if key.has_result() {
293297
self.cancel.remove(&key);
294-
PushEntry::Ready(key.take_result())
298+
let (res, buf) = key.take_result().into_parts();
299+
PushEntry::Ready(BufResult(resume_unwind_io(res), buf))
295300
} else {
296301
PushEntry::Pending(key)
297302
}
@@ -302,7 +307,8 @@ impl Proactor {
302307
///
303308
/// # Panics
304309
///
305-
/// This function will panic if the [`Key`] is not unique.
310+
/// This function will panic if the [`Key`] is not unique or if the
311+
/// operation is blocking and it panicked in the thread pool.
306312
pub fn pop_with_extra<T: OpCode>(
307313
&mut self,
308314
key: Key<T>,
@@ -311,8 +317,8 @@ impl Proactor {
311317
if key.has_result() {
312318
self.cancel.remove(&key);
313319
let extra = key.swap_extra(self.default_extra());
314-
let res = key.take_result();
315-
PushEntry::Ready((res, extra))
320+
let (res, buf) = key.take_result().into_parts();
321+
PushEntry::Ready((BufResult(resume_unwind_io(res), buf), extra))
316322
} else {
317323
PushEntry::Pending(key)
318324
}

compio-driver/src/panic.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::{
2+
any::Any,
3+
error::Error,
4+
fmt::{Debug, Display},
5+
io,
6+
panic::{UnwindSafe, catch_unwind, resume_unwind},
7+
};
8+
9+
pub(crate) struct Panic(Box<dyn Any + Send>);
10+
11+
// Panic is unconditionally `Sync`
12+
unsafe impl Sync for Panic {}
13+
14+
impl Display for Panic {
15+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16+
Debug::fmt(self, f)
17+
}
18+
}
19+
20+
impl Debug for Panic {
21+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22+
f.debug_tuple("Panic").finish_non_exhaustive()
23+
}
24+
}
25+
26+
impl Error for Panic {}
27+
28+
impl From<Panic> for io::Error {
29+
fn from(value: Panic) -> Self {
30+
Self::other(value)
31+
}
32+
}
33+
34+
pub(crate) fn catch_unwind_io<F, R>(f: F) -> io::Result<R>
35+
where
36+
F: FnOnce() -> io::Result<R> + UnwindSafe,
37+
{
38+
catch_unwind(f).map_err(|err| io::Error::from(Panic(err)))?
39+
}
40+
41+
pub(crate) fn resume_unwind_io<T>(res: io::Result<T>) -> io::Result<T> {
42+
let Err(e) = res else { return res };
43+
match e.downcast::<Panic>() {
44+
Ok(p) => resume_unwind(p.0),
45+
Err(e) => Err(e),
46+
}
47+
}

compio-driver/src/sys/iocp/cp/global.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl Port {
103103
self.global_port.attach(fd)
104104
}
105105

106+
#[allow(dead_code)]
106107
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
107108
self.port.post(res, optr)
108109
}

compio-driver/src/sys/iocp/cp/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl CompletionPort {
8383
Ok(())
8484
}
8585

86+
#[allow(dead_code)]
8687
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
8788
if let Some(overlapped) = unsafe { optr.as_mut() } {
8889
match &res {

compio-driver/src/sys/iocp/cp/multi.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ impl Port {
2222
self.port.attach(fd)
2323
}
2424

25+
#[allow(dead_code)]
2526
pub fn post(&self, res: io::Result<usize>, optr: *mut Overlapped) -> io::Result<()> {
2627
self.port.post(res, optr)
2728
}

compio-driver/src/sys/iocp/fd.rs

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
use super::*;
2+
3+
/// On windows, handle and socket are in the same size.
4+
/// Both of them could be attached to an IOCP.
5+
/// Therefore, both could be seen as fd.
6+
pub type RawFd = HANDLE;
7+
8+
/// Extracts raw fds.
9+
pub trait AsRawFd {
10+
/// Extracts the raw fd.
11+
fn as_raw_fd(&self) -> RawFd;
12+
}
13+
14+
/// Owned handle or socket on Windows.
15+
#[derive(Debug)]
16+
pub enum OwnedFd {
17+
/// Win32 handle.
18+
File(OwnedHandle),
19+
/// Windows socket handle.
20+
Socket(OwnedSocket),
21+
}
22+
23+
impl AsRawFd for OwnedFd {
24+
fn as_raw_fd(&self) -> RawFd {
25+
match self {
26+
Self::File(fd) => fd.as_raw_handle() as _,
27+
Self::Socket(s) => s.as_raw_socket() as _,
28+
}
29+
}
30+
}
31+
32+
impl AsRawFd for RawFd {
33+
fn as_raw_fd(&self) -> RawFd {
34+
*self
35+
}
36+
}
37+
38+
impl AsRawFd for std::fs::File {
39+
fn as_raw_fd(&self) -> RawFd {
40+
self.as_raw_handle() as _
41+
}
42+
}
43+
44+
impl AsRawFd for OwnedHandle {
45+
fn as_raw_fd(&self) -> RawFd {
46+
self.as_raw_handle() as _
47+
}
48+
}
49+
50+
impl AsRawFd for socket2::Socket {
51+
fn as_raw_fd(&self) -> RawFd {
52+
self.as_raw_socket() as _
53+
}
54+
}
55+
56+
impl AsRawFd for OwnedSocket {
57+
fn as_raw_fd(&self) -> RawFd {
58+
self.as_raw_socket() as _
59+
}
60+
}
61+
62+
impl AsRawFd for std::process::ChildStdin {
63+
fn as_raw_fd(&self) -> RawFd {
64+
self.as_raw_handle() as _
65+
}
66+
}
67+
68+
impl AsRawFd for std::process::ChildStdout {
69+
fn as_raw_fd(&self) -> RawFd {
70+
self.as_raw_handle() as _
71+
}
72+
}
73+
74+
impl AsRawFd for std::process::ChildStderr {
75+
fn as_raw_fd(&self) -> RawFd {
76+
self.as_raw_handle() as _
77+
}
78+
}
79+
80+
impl From<OwnedHandle> for OwnedFd {
81+
fn from(value: OwnedHandle) -> Self {
82+
Self::File(value)
83+
}
84+
}
85+
86+
impl From<std::fs::File> for OwnedFd {
87+
fn from(value: std::fs::File) -> Self {
88+
Self::File(OwnedHandle::from(value))
89+
}
90+
}
91+
92+
impl From<std::process::ChildStdin> for OwnedFd {
93+
fn from(value: std::process::ChildStdin) -> Self {
94+
Self::File(OwnedHandle::from(value))
95+
}
96+
}
97+
98+
impl From<std::process::ChildStdout> for OwnedFd {
99+
fn from(value: std::process::ChildStdout) -> Self {
100+
Self::File(OwnedHandle::from(value))
101+
}
102+
}
103+
104+
impl From<std::process::ChildStderr> for OwnedFd {
105+
fn from(value: std::process::ChildStderr) -> Self {
106+
Self::File(OwnedHandle::from(value))
107+
}
108+
}
109+
110+
impl From<OwnedSocket> for OwnedFd {
111+
fn from(value: OwnedSocket) -> Self {
112+
Self::Socket(value)
113+
}
114+
}
115+
116+
impl From<socket2::Socket> for OwnedFd {
117+
fn from(value: socket2::Socket) -> Self {
118+
Self::Socket(OwnedSocket::from(value))
119+
}
120+
}
121+
122+
/// Borrowed handle or socket on Windows.
123+
#[derive(Debug)]
124+
pub enum BorrowedFd<'a> {
125+
/// Win32 handle.
126+
File(BorrowedHandle<'a>),
127+
/// Windows socket handle.
128+
Socket(BorrowedSocket<'a>),
129+
}
130+
131+
impl AsRawFd for BorrowedFd<'_> {
132+
fn as_raw_fd(&self) -> RawFd {
133+
match self {
134+
Self::File(fd) => fd.as_raw_handle() as RawFd,
135+
Self::Socket(s) => s.as_raw_socket() as RawFd,
136+
}
137+
}
138+
}
139+
140+
impl<'a> From<BorrowedHandle<'a>> for BorrowedFd<'a> {
141+
fn from(value: BorrowedHandle<'a>) -> Self {
142+
Self::File(value)
143+
}
144+
}
145+
146+
impl<'a> From<BorrowedSocket<'a>> for BorrowedFd<'a> {
147+
fn from(value: BorrowedSocket<'a>) -> Self {
148+
Self::Socket(value)
149+
}
150+
}
151+
152+
/// Extracts fds.
153+
pub trait AsFd {
154+
/// Extracts the borrowed fd.
155+
fn as_fd(&self) -> BorrowedFd<'_>;
156+
}
157+
158+
impl AsFd for OwnedFd {
159+
fn as_fd(&self) -> BorrowedFd<'_> {
160+
match self {
161+
Self::File(fd) => fd.as_fd(),
162+
Self::Socket(s) => s.as_fd(),
163+
}
164+
}
165+
}
166+
167+
impl AsFd for std::fs::File {
168+
fn as_fd(&self) -> BorrowedFd<'_> {
169+
self.as_handle().into()
170+
}
171+
}
172+
173+
impl AsFd for OwnedHandle {
174+
fn as_fd(&self) -> BorrowedFd<'_> {
175+
self.as_handle().into()
176+
}
177+
}
178+
179+
impl AsFd for BorrowedHandle<'_> {
180+
fn as_fd(&self) -> BorrowedFd<'_> {
181+
(*self).into()
182+
}
183+
}
184+
185+
impl AsFd for socket2::Socket {
186+
fn as_fd(&self) -> BorrowedFd<'_> {
187+
self.as_socket().into()
188+
}
189+
}
190+
191+
impl AsFd for OwnedSocket {
192+
fn as_fd(&self) -> BorrowedFd<'_> {
193+
self.as_socket().into()
194+
}
195+
}
196+
197+
impl AsFd for BorrowedSocket<'_> {
198+
fn as_fd(&self) -> BorrowedFd<'_> {
199+
(*self).into()
200+
}
201+
}
202+
203+
impl AsFd for std::process::ChildStdin {
204+
fn as_fd(&self) -> BorrowedFd<'_> {
205+
self.as_handle().into()
206+
}
207+
}
208+
209+
impl AsFd for std::process::ChildStdout {
210+
fn as_fd(&self) -> BorrowedFd<'_> {
211+
self.as_handle().into()
212+
}
213+
}
214+
215+
impl AsFd for std::process::ChildStderr {
216+
fn as_fd(&self) -> BorrowedFd<'_> {
217+
self.as_handle().into()
218+
}
219+
}

0 commit comments

Comments
 (0)