Skip to content

Commit 3e5adb4

Browse files
committed
feat(driver): make thread safety optional
1 parent 494b249 commit 3e5adb4

18 files changed

Lines changed: 504 additions & 82 deletions

File tree

compio-dispatcher/tests/listener.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::num::NonZeroUsize;
22

3-
use compio_buf::arrayvec::ArrayVec;
3+
use compio_buf::{IntoInner, arrayvec::ArrayVec};
44
use compio_dispatcher::Dispatcher;
55
use compio_io::{AsyncReadExt, AsyncWriteExt};
66
use compio_net::{TcpListener, TcpStream};
@@ -27,9 +27,11 @@ async fn listener_dispatch() {
2727
});
2828
let mut handles = FuturesUnordered::new();
2929
for _i in 0..CLIENT_NUM {
30-
let (mut srv, _) = listener.accept().await.unwrap();
30+
let (srv, _) = listener.accept().await.unwrap();
31+
let srv = srv.try_into_sync().unwrap();
3132
let handle = dispatcher
3233
.dispatch(move || async move {
34+
let mut srv = srv.into_inner();
3335
let (_, buf) = srv.read_exact(ArrayVec::<u8, 12>::new()).await.unwrap();
3436
assert_eq!(buf.as_slice(), b"Hello world!");
3537
})

compio-driver/src/fd.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,35 @@ use std::os::fd::FromRawFd;
33
#[cfg(windows)]
44
use std::os::windows::io::{FromRawHandle, FromRawSocket, RawHandle, RawSocket};
55
use std::{
6+
cell::{Cell, RefCell},
67
future::{Future, poll_fn},
78
mem::ManuallyDrop,
89
ops::Deref,
910
panic::RefUnwindSafe,
10-
sync::{
11-
Arc,
12-
atomic::{AtomicBool, Ordering},
13-
},
14-
task::Poll,
11+
rc::Rc,
12+
task::{Poll, Waker},
1513
};
1614

17-
use futures_util::task::AtomicWaker;
18-
1915
use crate::{AsFd, AsRawFd, BorrowedFd, RawFd};
2016

2117
#[derive(Debug)]
2218
struct Inner<T> {
2319
fd: T,
2420
// whether there is a future waiting
25-
waits: AtomicBool,
26-
waker: AtomicWaker,
21+
waits: Cell<bool>,
22+
waker: RefCell<Option<Waker>>,
2723
}
2824

2925
impl<T> RefUnwindSafe for Inner<T> {}
3026

3127
/// A shared fd. It is passed to the operations to make sure the fd won't be
3228
/// closed before the operations complete.
3329
#[derive(Debug)]
34-
pub struct SharedFd<T>(Arc<Inner<T>>);
30+
pub struct SharedFd<T>(Rc<Inner<T>>);
31+
// We use `Rc` internally to avoid the overhead of `Arc`. It is not `Send` or
32+
// `Sync`, but we have to access it in another thread when processing blocking
33+
// operations. We ensure that the access is safe because there will be only
34+
// one thread accessing it at a time.
3535

3636
impl<T: AsFd> SharedFd<T> {
3737
/// Create the shared fd from an owned fd.
@@ -46,10 +46,10 @@ impl<T> SharedFd<T> {
4646
/// # Safety
4747
/// * T should own the fd.
4848
pub unsafe fn new_unchecked(fd: T) -> Self {
49-
Self(Arc::new(Inner {
49+
Self(Rc::new(Inner {
5050
fd,
51-
waits: AtomicBool::new(false),
52-
waker: AtomicWaker::new(),
51+
waits: Cell::new(false),
52+
waker: RefCell::new(None),
5353
}))
5454
}
5555

@@ -67,7 +67,7 @@ impl<T> SharedFd<T> {
6767
unsafe fn try_unwrap_inner(this: &ManuallyDrop<Self>) -> Option<T> {
6868
let ptr = ManuallyDrop::new(std::ptr::read(&this.0));
6969
// The ptr is duplicated without increasing the strong count, should forget.
70-
match Arc::try_unwrap(ManuallyDrop::into_inner(ptr)) {
70+
match Rc::try_unwrap(ManuallyDrop::into_inner(ptr)) {
7171
Ok(inner) => Some(inner.fd),
7272
Err(ptr) => {
7373
std::mem::forget(ptr);
@@ -80,13 +80,13 @@ impl<T> SharedFd<T> {
8080
pub fn take(self) -> impl Future<Output = Option<T>> {
8181
let this = ManuallyDrop::new(self);
8282
async move {
83-
if !this.0.waits.swap(true, Ordering::AcqRel) {
83+
if !this.0.waits.replace(true) {
8484
poll_fn(move |cx| {
8585
if let Some(fd) = unsafe { Self::try_unwrap_inner(&this) } {
8686
return Poll::Ready(Some(fd));
8787
}
8888

89-
this.0.waker.register(cx.waker());
89+
this.0.waker.borrow_mut().replace(cx.waker().clone());
9090

9191
if let Some(fd) = unsafe { Self::try_unwrap_inner(&this) } {
9292
Poll::Ready(Some(fd))
@@ -105,8 +105,10 @@ impl<T> SharedFd<T> {
105105
impl<T> Drop for SharedFd<T> {
106106
fn drop(&mut self) {
107107
// It's OK to wake multiple times.
108-
if Arc::strong_count(&self.0) == 2 && self.0.waits.load(Ordering::Acquire) {
109-
self.0.waker.wake()
108+
if Rc::strong_count(&self.0) == 2 && self.0.waits.get() {
109+
if let Some(waker) = self.0.waker.borrow_mut().take() {
110+
waker.wake()
111+
}
110112
}
111113
}
112114
}

compio-driver/src/iocp/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ pub enum OwnedFd {
4444
Socket(OwnedSocket),
4545
}
4646

47+
impl OwnedFd {
48+
/// Creates a new [`OwnedFd`] instance that shares the same underlying
49+
/// object as the existing [`OwnedFd`] instance.
50+
pub fn try_clone(&self) -> io::Result<Self> {
51+
match self {
52+
Self::File(fd) => fd.try_clone().map(OwnedFd::File),
53+
Self::Socket(s) => s.try_clone().map(OwnedFd::Socket),
54+
}
55+
}
56+
}
57+
4758
impl AsRawFd for OwnedFd {
4859
fn as_raw_fd(&self) -> RawFd {
4960
match self {
@@ -152,6 +163,17 @@ pub enum BorrowedFd<'a> {
152163
Socket(BorrowedSocket<'a>),
153164
}
154165

166+
impl<'a> BorrowedFd<'a> {
167+
/// Creates a new [`OwnedFd`] instance that shares the same underlying
168+
/// object as the existing [`BorrowedFd`] instance.
169+
pub fn try_clone_to_owned(&self) -> io::Result<OwnedFd> {
170+
match self {
171+
Self::File(fd) => fd.try_clone_to_owned().map(OwnedFd::File),
172+
Self::Socket(s) => s.try_clone_to_owned().map(OwnedFd::Socket),
173+
}
174+
}
175+
}
176+
155177
impl AsRawFd for BorrowedFd<'_> {
156178
fn as_raw_fd(&self) -> RawFd {
157179
match self {

compio-fs/src/async_fd.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,18 @@ impl<T: AsFd> AsyncFd<T> {
4444
inner: Attacher::new_unchecked(source),
4545
}
4646
}
47+
48+
/// Create [`AsyncFd`] from a shared file descriptor without attaching
49+
/// the source.
50+
///
51+
/// # Safety
52+
///
53+
/// See [`AsyncFd::new_unchecked`].
54+
pub unsafe fn from_shared_fd_unchecked(fd: SharedFd<T>) -> Self {
55+
Self {
56+
inner: Attacher::from_shared_fd_unchecked(fd),
57+
}
58+
}
4759
}
4860

4961
impl<T: AsFd + 'static> AsyncRead for AsyncFd<T> {

compio-fs/src/file.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ impl File {
5959
})
6060
}
6161

62+
pub(crate) unsafe fn from_std_unchecked(file: std::fs::File) -> Self {
63+
Self {
64+
inner: Attacher::new_unchecked(file),
65+
}
66+
}
67+
6268
/// Attempts to open a file in read-only mode.
6369
///
6470
/// See the [`OpenOptions::open`] method for more details.
@@ -105,7 +111,7 @@ impl File {
105111
/// Queries metadata about the underlying file.
106112
#[cfg(windows)]
107113
pub async fn metadata(&self) -> io::Result<Metadata> {
108-
let file = self.inner.clone();
114+
let file = self.inner.try_clone()?;
109115
compio_runtime::spawn_blocking(move || file.metadata().map(Metadata::from_std))
110116
.await
111117
.unwrap_or_else(|e| resume_unwind(e))
@@ -122,7 +128,7 @@ impl File {
122128
/// Changes the permissions on the underlying file.
123129
#[cfg(windows)]
124130
pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
125-
let file = self.inner.clone();
131+
let file = self.inner.try_clone()?;
126132
compio_runtime::spawn_blocking(move || file.set_permissions(perm.0))
127133
.await
128134
.unwrap_or_else(|e| resume_unwind(e))
@@ -172,6 +178,21 @@ impl File {
172178
pub async fn sync_data(&self) -> io::Result<()> {
173179
self.sync_impl(true).await
174180
}
181+
182+
/// Attempts to clone the file handle.
183+
pub fn try_clone(&self) -> io::Result<SyncFile> {
184+
self.inner.try_clone().map(SyncFile::new)
185+
}
186+
187+
/// Try to convert into a thread safe file handle.
188+
pub fn try_into_sync(self) -> Result<SyncFile, Self> {
189+
match self.inner.into_inner().try_unwrap() {
190+
Ok(inner) => Ok(SyncFile::new(inner)),
191+
Err(fd) => Err(Self {
192+
inner: unsafe { Attacher::from_shared_fd_unchecked(fd) },
193+
}),
194+
}
195+
}
175196
}
176197

177198
impl AsyncReadAt for File {
@@ -267,3 +288,29 @@ impl Splittable for &File {
267288
}
268289

269290
impl_raw_fd!(File, std::fs::File, inner, file);
291+
292+
/// A thread safe file handle.
293+
#[derive(Debug)]
294+
pub struct SyncFile {
295+
file: std::fs::File,
296+
}
297+
298+
impl SyncFile {
299+
pub(crate) fn new(file: std::fs::File) -> Self {
300+
Self { file }
301+
}
302+
303+
/// Attempts to clone the file handle.
304+
pub fn try_clone(&self) -> io::Result<Self> {
305+
self.file.try_clone().map(Self::new)
306+
}
307+
}
308+
309+
impl IntoInner for SyncFile {
310+
type Inner = File;
311+
312+
fn into_inner(self) -> Self::Inner {
313+
// Safety: the handle is cloned from an existing File.
314+
unsafe { File::from_std_unchecked(self.file) }
315+
}
316+
}

compio-fs/src/named_pipe.rs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use std::ptr::null_mut;
77
use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null};
88

9-
use compio_buf::{BufResult, IoBuf, IoBufMut};
9+
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
1010
use compio_driver::{AsRawFd, RawFd, ToSharedFd, impl_raw_fd, op::ConnectNamedPipe, syscall};
1111
use compio_io::{
1212
AsyncRead, AsyncReadAt, AsyncReadManaged, AsyncReadManagedAt, AsyncWrite, AsyncWriteAt,
@@ -29,7 +29,7 @@ use windows_sys::Win32::{
2929
},
3030
};
3131

32-
use crate::{AsyncFd, File, OpenOptions};
32+
use crate::{AsyncFd, File, OpenOptions, SyncFile};
3333

3434
/// A [Windows named pipe] server.
3535
///
@@ -180,6 +180,21 @@ impl NamedPipeServer {
180180
syscall!(BOOL, DisconnectNamedPipe(self.as_raw_fd() as _))?;
181181
Ok(())
182182
}
183+
184+
/// Attempts to clone the named pipe server.
185+
pub fn try_clone(&self) -> io::Result<SyncNamedPipeServer> {
186+
self.handle.try_clone().map(SyncNamedPipeServer::new)
187+
}
188+
189+
/// Try to convert the named pipe server into a thread safe handle.
190+
pub fn try_into_sync(self) -> Result<SyncNamedPipeServer, Self> {
191+
match self.handle.into_inner().try_unwrap() {
192+
Ok(file) => Ok(SyncNamedPipeServer::new(file)),
193+
Err(fd) => Err(Self {
194+
handle: unsafe { AsyncFd::from_shared_fd_unchecked(fd) },
195+
}),
196+
}
197+
}
183198
}
184199

185200
impl AsyncRead for NamedPipeServer {
@@ -277,6 +292,33 @@ impl Splittable for &NamedPipeServer {
277292

278293
impl_raw_fd!(NamedPipeServer, std::fs::File, handle, file);
279294

295+
/// A thread safe named pipe server.
296+
#[derive(Debug)]
297+
pub struct SyncNamedPipeServer {
298+
inner: std::fs::File,
299+
}
300+
301+
impl SyncNamedPipeServer {
302+
pub(crate) fn new(inner: std::fs::File) -> Self {
303+
Self { inner }
304+
}
305+
306+
/// Attempts to clone the named pipe server.
307+
pub fn try_clone(&self) -> io::Result<Self> {
308+
self.inner.try_clone().map(Self::new)
309+
}
310+
}
311+
312+
impl IntoInner for SyncNamedPipeServer {
313+
type Inner = NamedPipeServer;
314+
315+
fn into_inner(self) -> Self::Inner {
316+
NamedPipeServer {
317+
handle: unsafe { AsyncFd::new_unchecked(self.inner) },
318+
}
319+
}
320+
}
321+
280322
/// A [Windows named pipe] client.
281323
///
282324
/// Constructed using [`ClientOptions::open`].
@@ -344,6 +386,19 @@ impl NamedPipeClient {
344386
// Safety: we're ensuring the lifetime of the named pipe.
345387
unsafe { named_pipe_info(self.as_raw_fd()) }
346388
}
389+
390+
/// Attempts to clone the named pipe client.
391+
pub fn try_clone(&self) -> io::Result<SyncNamedPipeClient> {
392+
self.handle.try_clone().map(SyncNamedPipeClient::new)
393+
}
394+
395+
/// Try to convert the named pipe client into a thread safe handle.
396+
pub fn try_into_sync(self) -> Result<SyncNamedPipeClient, Self> {
397+
match self.handle.try_into_sync() {
398+
Ok(file) => Ok(SyncNamedPipeClient::new(file)),
399+
Err(handle) => Err(Self { handle }),
400+
}
401+
}
347402
}
348403

349404
impl AsyncRead for NamedPipeClient {
@@ -443,6 +498,33 @@ impl Splittable for &NamedPipeClient {
443498

444499
impl_raw_fd!(NamedPipeClient, std::fs::File, handle, file);
445500

501+
/// A thread safe named pipe client.
502+
#[derive(Debug)]
503+
pub struct SyncNamedPipeClient {
504+
inner: SyncFile,
505+
}
506+
507+
impl SyncNamedPipeClient {
508+
pub(crate) fn new(inner: SyncFile) -> Self {
509+
Self { inner }
510+
}
511+
512+
/// Attempts to clone the named pipe client.
513+
pub fn try_clone(&self) -> io::Result<Self> {
514+
self.inner.try_clone().map(Self::new)
515+
}
516+
}
517+
518+
impl IntoInner for SyncNamedPipeClient {
519+
type Inner = NamedPipeClient;
520+
521+
fn into_inner(self) -> Self::Inner {
522+
NamedPipeClient {
523+
handle: self.inner.into_inner(),
524+
}
525+
}
526+
}
527+
446528
/// A builder structure for construct a named pipe with named pipe-specific
447529
/// options. This is required to use for named pipe servers who wants to modify
448530
/// pipe-related options.

0 commit comments

Comments
 (0)