Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .github/workflows/ci_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ jobs:
- os: 'ubuntu-22.04-arm'
- os: 'ubuntu-22.04'
features: 'sync'
- os: 'ubuntu-22.04'
features: 'compat-all'
- os: 'ubuntu-22.04'
features: 'polling,ring,compat-all'
no_default_features: true
- os: 'ubuntu-22.04'
features: 'polling' # fusion
- os: 'ubuntu-22.04'
Expand Down Expand Up @@ -66,13 +71,13 @@ jobs:
features: 'iocp-wait-packet'
- os: 'windows-latest'
target: 'x86_64-pc-windows-msvc'
features: 'native-tls,ring,py-dynamic-openssl'
features: 'native-tls,ring,py-dynamic-openssl,compat-all'
- os: 'windows-11-arm'
target: 'aarch64-pc-windows-msvc'
- os: 'macos-14'
- os: 'macos-15'
- os: 'macos-15'
features: 'native-tls,ring,py-dynamic-openssl'
features: 'native-tls,ring,py-dynamic-openssl,compat-all'
steps:
- uses: actions/checkout@v6
- name: Setup Rust Toolchain
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"compio-signal",
"compio-tls",
"compio-ws",
"compio-compat",
]
resolver = "3"

Expand All @@ -40,7 +41,9 @@ compio-tls = { path = "./compio-tls", version = "0.10.0-rc.1", default-features
compio-process = { path = "./compio-process", version = "0.9.0-rc.1" }
compio-quic = { path = "./compio-quic", version = "0.8.0-rc.1", default-features = false }
compio-ws = { path = "./compio-ws", version = "0.4.0-rc.1", default-features = false }
compio-compat = { path = "./compio-compat", version = "0.1.0-rc.1" }

async-io = "2.6.0"
bytes = "1.7.1"
bytemuck = "1.25.0"
cfg_aliases = "0.2.1"
Expand All @@ -54,6 +57,7 @@ futures-executor = "0.3.30"
futures-rustls = { version = "0.26.0", default-features = false }
futures-util = "0.3.29"
libc = "0.2.175"
mod_use = "0.2.3"
native-tls = "0.2.13"
compio-py-dynamic-openssl = "0.5.0"
nix = "0.31.1"
Expand Down
57 changes: 57 additions & 0 deletions compio-compat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
[package]
name = "compio-compat"
version = "0.1.0-rc.1"
description = "Compatibility layer for compio to work with various async runtimes."
categories = ["asynchronous", "filesystem", "network-programming"]
keywords = ["async", "fs", "iocp", "io-uring", "net"]
readme = "README.md"
edition = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[dependencies]
compio-runtime = { workspace = true }
compio-log = { workspace = true }

cfg-if = { workspace = true }
mod_use = { workspace = true }

[target.'cfg(windows)'.dependencies]
compio-driver = { workspace = true }

futures-channel = { workspace = true }
windows-sys = { workspace = true, features = ["Win32_System_Threading"] }
windows-threading = "0.2.1"

[target.'cfg(target_os = "linux")'.dependencies]
rustix = { workspace = true, features = ["event", "io_uring"] }

[target.'cfg(unix)'.dependencies]
tokio = { workspace = true, optional = true, features = ["net", "time"] }

async-io = { workspace = true, optional = true }
futures-util = { workspace = true, optional = true }

[dev-dependencies]
compio-fs = { workspace = true }
compio-net = { workspace = true }
compio-io = { workspace = true }

tokio = { workspace = true, features = [
"rt",
"macros",
"fs",
"net",
"io-util",
] }
futures-executor = { workspace = true }

futures-util = { workspace = true }

[features]
tokio = ["dep:tokio"]
futures = ["dep:async-io", "dep:futures-util"]

[[test]]
name = "net"
required-features = ["tokio"]
37 changes: 37 additions & 0 deletions compio-compat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<div align="center">
<a href='https://compio.rs'>
<img height="150" src="https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-with-text.svg">
</a>
</div>

---

# compio-compat

[![MIT licensed](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/compio-rs/compio/blob/master/LICENSE)
[![crates.io](https://img.shields.io/crates/v/compio-compat)](https://crates.io/crates/compio-compat)
[![docs.rs](https://img.shields.io/badge/docs.rs-compio--compat-latest)](https://docs.rs/compio-compat)
[![Check](https://github.com/compio-rs/compio/actions/workflows/ci_check.yml/badge.svg)](https://github.com/compio-rs/compio/actions/workflows/ci_check.yml)
[![Test](https://github.com/compio-rs/compio/actions/workflows/ci_test.yml/badge.svg)](https://github.com/compio-rs/compio/actions/workflows/ci_test.yml)

Run compio in other async runtimes.

## Usage

Use `compio::compat` re-exported from `compio` crate.

```rust
use compio::compat::{RuntimeCompat, TokioAdapter};

#[tokio::main]
async fn main() {
// Create a compio runtime:
let runtime = compio::runtime::Runtime::new().unwrap();
// Create the compat layer:
let runtime = RuntimeCompat::<TokioAdapter>::new(runtime).unwrap();
// Execute your future:
runtime.execute(async {
// Run compio-specific code
}).await;
}
```
89 changes: 89 additions & 0 deletions compio-compat/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! Runtime-compatibility layers for compio.
//!
//! This crate provides a compatibility layer for compio's runtime, allowing it
//! to be used with different underlying event loop implementations, e.g.,
//! `tokio` or `smol`.

#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
#![doc(
html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
#![doc(
html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]

use std::{
io,
ops::Deref,
task::{Context, Poll},
time::Duration,
};

use compio_log::error;
use compio_runtime::Runtime;
use mod_use::mod_use;

mod_use![sys];

/// A compatibility layer for [`Runtime`]. It is driven by the underlying
/// [`Adapter`].
pub struct RuntimeCompat<A> {
runtime: A,
}

impl<A: Adapter> RuntimeCompat<A> {
/// Creates a new [`RuntimeCompat`] with the given runtime.
pub fn new(runtime: Runtime) -> io::Result<Self> {
let runtime = A::new(runtime)?;
Ok(Self { runtime })
}

/// Executes the given future on the runtime, driving it to completion.
pub async fn execute<F: Future>(&self, f: F) -> F::Output {
let waker = self.runtime.waker();
Comment thread
Berrysoft marked this conversation as resolved.
let mut context = Context::from_waker(&waker);
let mut future = std::pin::pin!(f);
loop {
if let Poll::Ready(result) = self.runtime.enter(|| future.as_mut().poll(&mut context)) {
self.runtime.enter(|| self.runtime.run());
return result;
}

let mut remaining_tasks = self.runtime.enter(|| self.runtime.run());

remaining_tasks |= self.runtime.flush();

let timeout = if remaining_tasks {
Some(Duration::ZERO)
} else {
self.runtime.current_timeout()
};

match self.runtime.wait(timeout).await {
Ok(_) => {}
Err(e)
if matches!(
e.kind(),
io::ErrorKind::TimedOut | io::ErrorKind::Interrupted
) => {}
Err(e) => panic!("failed to wait for driver: {e:?}"),
}

if let Err(_e) = self.runtime.clear() {
error!("failed to clear notifier: {_e:?}");
}

self.runtime.poll_with(Some(Duration::ZERO));
}
}
}

impl<A: Adapter> Deref for RuntimeCompat<A> {
type Target = Runtime;

fn deref(&self) -> &Self::Target {
&self.runtime
}
}
27 changes: 27 additions & 0 deletions compio-compat/src/sys/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::{io, ops::Deref, time::Duration};

use compio_runtime::Runtime;
use mod_use::mod_use;

cfg_if::cfg_if! {
if #[cfg(windows)] {
mod_use![windows];
} else if #[cfg(unix)] {
mod_use![unix];
} else {
compile_error!("Unsupported platform");
}
}

/// Adapter trait for different runtimes.
#[allow(async_fn_in_trait)]
pub trait Adapter: Sized + Deref<Target = Runtime> {
/// Creates a new adapter with the given runtime.
fn new(runtime: Runtime) -> io::Result<Self>;

/// Waits for the runtime to be ready, with an optional timeout.
async fn wait(&self, timeout: Option<Duration>) -> io::Result<()>;

/// Clears the runtime's state after waiting.
fn clear(&self) -> io::Result<()>;
}
41 changes: 41 additions & 0 deletions compio-compat/src/sys/unix/futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::{io, ops::Deref, time::Duration};

use async_io::Async;
use compio_runtime::Runtime;
use futures_util::FutureExt;

use crate::{Adapter, sys::unix::UnixAdapter};

/// Adapter for general runtime. It is driven by `async-io`.
pub struct FuturesAdapter(Async<UnixAdapter>);

impl Adapter for FuturesAdapter {
fn new(runtime: Runtime) -> io::Result<Self> {
Ok(Self(Async::new_nonblocking(UnixAdapter::new(runtime)?)?))
}

async fn wait(&self, timeout: Option<Duration>) -> io::Result<()> {
let fut = self.0.readable();
if let Some(timeout) = timeout {
let timer = async_io::Timer::after(timeout);
futures_util::select! {
res = fut.fuse() => res,
_ = timer.fuse() => Err(io::ErrorKind::TimedOut.into()),
}
} else {
fut.await
}
}

fn clear(&self) -> io::Result<()> {
self.0.get_ref().clear()
}
}

impl Deref for FuturesAdapter {
type Target = Runtime;

fn deref(&self) -> &Self::Target {
self.0.get_ref()
}
}
Loading
Loading