Skip to content
This repository was archived by the owner on May 17, 2026. It is now read-only.

Commit 102a4d7

Browse files
committed
feat(unix): remove thread-based impl
1 parent f9474b3 commit 102a4d7

9 files changed

Lines changed: 49 additions & 159 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@ resolver = "3"
44
[workspace.dependencies]
55
async-io = "2.6.0"
66
compio = { version = "0.19.0-rc.1", default-features = false }
7-
flume = "0.12.0"
87
futures-executor = "0.3"
98
futures-util = "0.3"
109
rustix = "1"
11-
smol = "2"
1210
tokio = "1"
1311

1412
[package]
@@ -41,25 +39,20 @@ rustix = { workspace = true, features = ["event", "io_uring"] }
4139
[target.'cfg(unix)'.dependencies]
4240
tokio = { workspace = true, optional = true, features = ["net", "time"] }
4341

44-
smol = { workspace = true, optional = true }
42+
async-io = { workspace = true, optional = true }
4543
futures-util = { workspace = true, optional = true }
4644

47-
flume = { workspace = true, optional = true }
48-
rustix = { workspace = true, optional = true, features = ["pipe"] }
49-
5045
[dev-dependencies]
5146
compio = { workspace = true, default-features = false, features = ["fs"] }
5247

5348
tokio = { workspace = true, features = ["rt", "macros", "fs", "io-util"] }
5449
futures-executor = { workspace = true }
55-
smol = { workspace = true }
5650

5751
[features]
5852
io-uring = ["compio/io-uring"]
5953
polling = ["compio/polling"]
60-
futures = ["dep:flume", "dep:rustix", "dep:futures-util"]
6154
tokio = ["dep:tokio"]
62-
smol = ["dep:smol", "dep:futures-util"]
55+
smol = ["dep:async-io", "dep:futures-util"]
6356
enable_log = ["compio-log/enable_log"]
6457

6558
[patch.crates-io]

src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,7 @@ impl<A: sys::Adapter> RuntimeCompat<A> {
4141
self.runtime.current_timeout()
4242
};
4343

44-
// io-uring needs to be submitted before waiting.
45-
if self.runtime.driver_type().is_iouring() {
46-
self.runtime.poll_with(Some(Duration::ZERO));
47-
}
44+
self.runtime.submit();
4845

4946
match self.runtime.wait(timeout).await {
5047
Ok(_) => {}

src/sys/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ cfg_if::cfg_if! {
1717
pub trait Adapter: Sized + Deref<Target = Runtime> {
1818
fn new(runtime: Runtime) -> io::Result<Self>;
1919

20+
fn submit(&self) {
21+
// io-uring needs to be submitted before waiting.
22+
if self.driver_type().is_iouring() {
23+
self.poll_with(Some(Duration::ZERO));
24+
}
25+
}
26+
2027
async fn wait(&self, timeout: Option<Duration>) -> io::Result<()>;
2128

2229
fn clear(&self) -> io::Result<()>;

src/sys/unix/futures.rs

Lines changed: 0 additions & 117 deletions
This file was deleted.

src/sys/unix/mod.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
use std::os::fd::OwnedFd;
33
use std::{
44
io,
5+
ops::Deref,
56
os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd},
67
};
78

@@ -11,22 +12,18 @@ use mod_use::mod_use;
1112
#[cfg(feature = "tokio")]
1213
mod_use![tokio];
1314

14-
#[cfg(feature = "futures")]
15-
mod_use![futures];
16-
1715
#[cfg(feature = "smol")]
1816
mod_use![smol];
1917

2018
struct UnixAdapter {
21-
driver: RawFd,
19+
runtime: Runtime,
2220
#[cfg(target_os = "linux")]
2321
efd: Option<OwnedFd>,
2422
}
2523

2624
#[cfg(target_os = "linux")]
2725
impl UnixAdapter {
28-
fn new(runtime: &Runtime) -> io::Result<Self> {
29-
let driver = runtime.as_raw_fd();
26+
fn new(runtime: Runtime) -> io::Result<Self> {
3027
if runtime.driver_type().is_iouring() {
3128
use rustix::{
3229
event::{EventfdFlags, eventfd},
@@ -37,18 +34,18 @@ impl UnixAdapter {
3734
let efd_raw = efd.as_raw_fd();
3835
unsafe {
3936
io_uring_register(
40-
BorrowedFd::borrow_raw(driver),
37+
BorrowedFd::borrow_raw(runtime.as_raw_fd()),
4138
IoringRegisterOp::RegisterEventfd,
4239
(&raw const efd_raw).cast(),
4340
1,
4441
)?;
4542
}
4643
Ok(Self {
47-
driver,
44+
runtime,
4845
efd: Some(efd),
4946
})
5047
} else {
51-
Ok(Self { driver, efd: None })
48+
Ok(Self { runtime, efd: None })
5249
}
5350
}
5451

@@ -80,11 +77,11 @@ impl AsRawFd for UnixAdapter {
8077
self.efd
8178
.as_ref()
8279
.map(|f| f.as_raw_fd())
83-
.unwrap_or(self.driver)
80+
.unwrap_or_else(|| self.runtime.as_raw_fd())
8481
}
8582
#[cfg(not(target_os = "linux"))]
8683
{
87-
self.driver
84+
self.runtime.as_raw_fd()
8885
}
8986
}
9087
}
@@ -94,3 +91,11 @@ impl AsFd for UnixAdapter {
9491
unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
9592
}
9693
}
94+
95+
impl Deref for UnixAdapter {
96+
type Target = Runtime;
97+
98+
fn deref(&self) -> &Self::Target {
99+
&self.runtime
100+
}
101+
}

src/sys/unix/smol.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
1-
use std::{io, time::Duration};
1+
use std::{io, ops::Deref, time::Duration};
22

3+
use async_io::Async;
34
use compio::runtime::Runtime;
45
use futures_util::FutureExt;
5-
use smol::Async;
66

77
use crate::{Adapter, sys::unix::UnixAdapter};
88

99
pub struct SmolAdapter(Async<UnixAdapter>);
1010

1111
impl Adapter for SmolAdapter {
12-
fn new(runtime: &Runtime) -> io::Result<Self> {
12+
fn new(runtime: Runtime) -> io::Result<Self> {
1313
Ok(Self(Async::new_nonblocking(UnixAdapter::new(runtime)?)?))
1414
}
1515

1616
async fn wait(&self, timeout: Option<Duration>) -> io::Result<()> {
1717
let fut = self.0.readable();
1818
if let Some(timeout) = timeout {
19-
let timer = smol::Timer::after(timeout);
19+
let timer = async_io::Timer::after(timeout);
2020
futures_util::select! {
2121
res = fut.fuse() => res,
2222
_ = timer.fuse() => Err(io::ErrorKind::TimedOut.into()),
@@ -30,3 +30,11 @@ impl Adapter for SmolAdapter {
3030
self.0.get_ref().clear()
3131
}
3232
}
33+
34+
impl Deref for SmolAdapter {
35+
type Target = Runtime;
36+
37+
fn deref(&self) -> &Self::Target {
38+
self.0.get_ref()
39+
}
40+
}

src/sys/unix/tokio.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{io, time::Duration};
1+
use std::{io, ops::Deref, time::Duration};
22

33
use compio::runtime::Runtime;
44
use tokio::io::{Interest, unix::AsyncFd};
@@ -8,7 +8,7 @@ use crate::{Adapter, sys::unix::UnixAdapter};
88
pub struct TokioAdapter(AsyncFd<UnixAdapter>);
99

1010
impl Adapter for TokioAdapter {
11-
fn new(runtime: &Runtime) -> io::Result<Self> {
11+
fn new(runtime: Runtime) -> io::Result<Self> {
1212
Ok(Self(AsyncFd::with_interest(
1313
UnixAdapter::new(runtime)?,
1414
Interest::READABLE,
@@ -30,3 +30,11 @@ impl Adapter for TokioAdapter {
3030
self.0.get_ref().clear()
3131
}
3232
}
33+
34+
impl Deref for TokioAdapter {
35+
type Target = Runtime;
36+
37+
fn deref(&self) -> &Self::Target {
38+
self.0.get_ref()
39+
}
40+
}

src/sys/windows.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,6 @@ macro_rules! impl_adapter {
9090
};
9191
}
9292

93-
#[cfg(feature = "futures")]
94-
impl_adapter!(FuturesAdapter);
95-
9693
#[cfg(feature = "tokio")]
9794
impl_adapter!(TokioAdapter);
9895

tests/fs.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,10 @@ async fn tokio() {
2727
test_impl::<compio_compat::TokioAdapter>().await;
2828
}
2929

30-
#[cfg(feature = "futures")]
31-
#[test]
32-
fn futures() {
33-
futures_executor::block_on(async {
34-
test_impl::<compio_compat::FuturesAdapter>().await;
35-
})
36-
}
37-
3830
#[cfg(feature = "smol")]
3931
#[test]
4032
fn smol() {
41-
smol::block_on(async {
33+
futures_executor::block_on(async {
4234
test_impl::<compio_compat::SmolAdapter>().await;
4335
})
4436
}

0 commit comments

Comments
 (0)