diff --git a/.github/workflows/ci_test.yml b/.github/workflows/ci_test.yml index fcc344a49..f32695787 100644 --- a/.github/workflows/ci_test.yml +++ b/.github/workflows/ci_test.yml @@ -37,6 +37,11 @@ jobs: - os: 'ubuntu-22.04-arm' - os: 'ubuntu-22.04' features: 'sync' + - os: 'ubuntu-22.04' + features: 'compat-all' + - os: 'ubuntu-22.04' + features: 'polling,ring,compat-all' + no_default_features: true - os: 'ubuntu-22.04' features: 'polling' # fusion - os: 'ubuntu-22.04' @@ -66,13 +71,13 @@ jobs: features: 'iocp-wait-packet' - os: 'windows-latest' target: 'x86_64-pc-windows-msvc' - features: 'native-tls,ring,py-dynamic-openssl' + features: 'native-tls,ring,py-dynamic-openssl,compat-all' - os: 'windows-11-arm' target: 'aarch64-pc-windows-msvc' - os: 'macos-14' - os: 'macos-15' - os: 'macos-15' - features: 'native-tls,ring,py-dynamic-openssl' + features: 'native-tls,ring,py-dynamic-openssl,compat-all' steps: - uses: actions/checkout@v6 - name: Setup Rust Toolchain diff --git a/Cargo.toml b/Cargo.toml index 4aeafeb29..17a5963c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "compio-signal", "compio-tls", "compio-ws", + "compio-compat", ] resolver = "3" @@ -40,7 +41,9 @@ compio-tls = { path = "./compio-tls", version = "0.10.0-rc.1", default-features compio-process = { path = "./compio-process", version = "0.9.0-rc.1" } compio-quic = { path = "./compio-quic", version = "0.8.0-rc.1", default-features = false } compio-ws = { path = "./compio-ws", version = "0.4.0-rc.1", default-features = false } +compio-compat = { path = "./compio-compat", version = "0.1.0-rc.1" } +async-io = "2.6.0" bytes = "1.7.1" bytemuck = "1.25.0" cfg_aliases = "0.2.1" @@ -54,6 +57,7 @@ futures-executor = "0.3.30" futures-rustls = { version = "0.26.0", default-features = false } futures-util = "0.3.29" libc = "0.2.175" +mod_use = "0.2.3" native-tls = "0.2.13" compio-py-dynamic-openssl = "0.5.0" nix = "0.31.1" diff --git a/compio-compat/Cargo.toml b/compio-compat/Cargo.toml new file mode 100644 index 000000000..f65cfcf87 --- /dev/null +++ b/compio-compat/Cargo.toml @@ -0,0 +1,57 @@ +[package] +name = "compio-compat" +version = "0.1.0-rc.1" +description = "Compatibility layer for compio to work with various async runtimes." +categories = ["asynchronous", "filesystem", "network-programming"] +keywords = ["async", "fs", "iocp", "io-uring", "net"] +readme = "README.md" +edition = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[dependencies] +compio-runtime = { workspace = true } +compio-log = { workspace = true } + +cfg-if = { workspace = true } +mod_use = { workspace = true } + +[target.'cfg(windows)'.dependencies] +compio-driver = { workspace = true } + +futures-channel = { workspace = true } +windows-sys = { workspace = true, features = ["Win32_System_Threading"] } +windows-threading = "0.2.1" + +[target.'cfg(target_os = "linux")'.dependencies] +rustix = { workspace = true, features = ["event", "io_uring"] } + +[target.'cfg(unix)'.dependencies] +tokio = { workspace = true, optional = true, features = ["net", "time"] } + +async-io = { workspace = true, optional = true } +futures-util = { workspace = true, optional = true } + +[dev-dependencies] +compio-fs = { workspace = true } +compio-net = { workspace = true } +compio-io = { workspace = true } + +tokio = { workspace = true, features = [ + "rt", + "macros", + "fs", + "net", + "io-util", +] } +futures-executor = { workspace = true } + +futures-util = { workspace = true } + +[features] +tokio = ["dep:tokio"] +futures = ["dep:async-io", "dep:futures-util"] + +[[test]] +name = "net" +required-features = ["tokio"] diff --git a/compio-compat/README.md b/compio-compat/README.md new file mode 100644 index 000000000..1a2216626 --- /dev/null +++ b/compio-compat/README.md @@ -0,0 +1,37 @@ +
+ + + +
+ +--- + +# compio-compat + +[![MIT licensed](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/compio-rs/compio/blob/master/LICENSE) +[![crates.io](https://img.shields.io/crates/v/compio-compat)](https://crates.io/crates/compio-compat) +[![docs.rs](https://img.shields.io/badge/docs.rs-compio--compat-latest)](https://docs.rs/compio-compat) +[![Check](https://github.com/compio-rs/compio/actions/workflows/ci_check.yml/badge.svg)](https://github.com/compio-rs/compio/actions/workflows/ci_check.yml) +[![Test](https://github.com/compio-rs/compio/actions/workflows/ci_test.yml/badge.svg)](https://github.com/compio-rs/compio/actions/workflows/ci_test.yml) + +Run compio in other async runtimes. + +## Usage + +Use `compio::compat` re-exported from `compio` crate. + +```rust +use compio::compat::{RuntimeCompat, TokioAdapter}; + +#[tokio::main] +async fn main() { + // Create a compio runtime: + let runtime = compio::runtime::Runtime::new().unwrap(); + // Create the compat layer: + let runtime = RuntimeCompat::::new(runtime).unwrap(); + // Execute your future: + runtime.execute(async { + // Run compio-specific code + }).await; +} +``` diff --git a/compio-compat/src/lib.rs b/compio-compat/src/lib.rs new file mode 100644 index 000000000..d37462ed3 --- /dev/null +++ b/compio-compat/src/lib.rs @@ -0,0 +1,89 @@ +//! Runtime-compatibility layers for compio. +//! +//! This crate provides a compatibility layer for compio's runtime, allowing it +//! to be used with different underlying event loop implementations, e.g., +//! `tokio` or `smol`. + +#![cfg_attr(docsrs, feature(doc_cfg))] +#![warn(missing_docs)] +#![deny(rustdoc::broken_intra_doc_links)] +#![doc( + html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg" +)] +#![doc( + html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg" +)] + +use std::{ + io, + ops::Deref, + task::{Context, Poll}, + time::Duration, +}; + +use compio_log::error; +use compio_runtime::Runtime; +use mod_use::mod_use; + +mod_use![sys]; + +/// A compatibility layer for [`Runtime`]. It is driven by the underlying +/// [`Adapter`]. +pub struct RuntimeCompat { + runtime: A, +} + +impl RuntimeCompat { + /// Creates a new [`RuntimeCompat`] with the given runtime. + pub fn new(runtime: Runtime) -> io::Result { + let runtime = A::new(runtime)?; + Ok(Self { runtime }) + } + + /// Executes the given future on the runtime, driving it to completion. + pub async fn execute(&self, f: F) -> F::Output { + let waker = self.runtime.waker(); + let mut context = Context::from_waker(&waker); + let mut future = std::pin::pin!(f); + loop { + if let Poll::Ready(result) = self.runtime.enter(|| future.as_mut().poll(&mut context)) { + self.runtime.enter(|| self.runtime.run()); + return result; + } + + let mut remaining_tasks = self.runtime.enter(|| self.runtime.run()); + + remaining_tasks |= self.runtime.flush(); + + let timeout = if remaining_tasks { + Some(Duration::ZERO) + } else { + self.runtime.current_timeout() + }; + + match self.runtime.wait(timeout).await { + Ok(_) => {} + Err(e) + if matches!( + e.kind(), + io::ErrorKind::TimedOut | io::ErrorKind::Interrupted + ) => {} + Err(e) => panic!("failed to wait for driver: {e:?}"), + } + + if let Err(_e) = self.runtime.clear() { + error!("failed to clear notifier: {_e:?}"); + } + + self.runtime.poll_with(Some(Duration::ZERO)); + } + } +} + +impl Deref for RuntimeCompat { + type Target = Runtime; + + fn deref(&self) -> &Self::Target { + &self.runtime + } +} diff --git a/compio-compat/src/sys/mod.rs b/compio-compat/src/sys/mod.rs new file mode 100644 index 000000000..b5f96df32 --- /dev/null +++ b/compio-compat/src/sys/mod.rs @@ -0,0 +1,27 @@ +use std::{io, ops::Deref, time::Duration}; + +use compio_runtime::Runtime; +use mod_use::mod_use; + +cfg_if::cfg_if! { + if #[cfg(windows)] { + mod_use![windows]; + } else if #[cfg(unix)] { + mod_use![unix]; + } else { + compile_error!("Unsupported platform"); + } +} + +/// Adapter trait for different runtimes. +#[allow(async_fn_in_trait)] +pub trait Adapter: Sized + Deref { + /// Creates a new adapter with the given runtime. + fn new(runtime: Runtime) -> io::Result; + + /// Waits for the runtime to be ready, with an optional timeout. + async fn wait(&self, timeout: Option) -> io::Result<()>; + + /// Clears the runtime's state after waiting. + fn clear(&self) -> io::Result<()>; +} diff --git a/compio-compat/src/sys/unix/futures.rs b/compio-compat/src/sys/unix/futures.rs new file mode 100644 index 000000000..11d88d077 --- /dev/null +++ b/compio-compat/src/sys/unix/futures.rs @@ -0,0 +1,41 @@ +use std::{io, ops::Deref, time::Duration}; + +use async_io::Async; +use compio_runtime::Runtime; +use futures_util::FutureExt; + +use crate::{Adapter, sys::unix::UnixAdapter}; + +/// Adapter for general runtime. It is driven by `async-io`. +pub struct FuturesAdapter(Async); + +impl Adapter for FuturesAdapter { + fn new(runtime: Runtime) -> io::Result { + Ok(Self(Async::new_nonblocking(UnixAdapter::new(runtime)?)?)) + } + + async fn wait(&self, timeout: Option) -> io::Result<()> { + let fut = self.0.readable(); + if let Some(timeout) = timeout { + let timer = async_io::Timer::after(timeout); + futures_util::select! { + res = fut.fuse() => res, + _ = timer.fuse() => Err(io::ErrorKind::TimedOut.into()), + } + } else { + fut.await + } + } + + fn clear(&self) -> io::Result<()> { + self.0.get_ref().clear() + } +} + +impl Deref for FuturesAdapter { + type Target = Runtime; + + fn deref(&self) -> &Self::Target { + self.0.get_ref() + } +} diff --git a/compio-compat/src/sys/unix/mod.rs b/compio-compat/src/sys/unix/mod.rs new file mode 100644 index 000000000..3a88e2027 --- /dev/null +++ b/compio-compat/src/sys/unix/mod.rs @@ -0,0 +1,108 @@ +#[cfg(target_os = "linux")] +use std::os::fd::OwnedFd; +use std::{ + io, + ops::Deref, + os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd}, +}; + +use compio_runtime::Runtime; +use mod_use::mod_use; + +#[cfg(feature = "tokio")] +mod_use![tokio]; + +#[cfg(feature = "futures")] +mod_use![futures]; + +struct UnixAdapter { + runtime: Runtime, + #[cfg(target_os = "linux")] + efd: Option, +} + +#[cfg(target_os = "linux")] +impl UnixAdapter { + fn new(runtime: Runtime) -> io::Result { + if runtime.driver_type().is_iouring() { + use rustix::{ + event::{EventfdFlags, eventfd}, + io_uring::{IoringRegisterOp, io_uring_register}, + }; + + let efd = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?; + let efd_raw = efd.as_raw_fd(); + unsafe { + io_uring_register( + BorrowedFd::borrow_raw(runtime.as_raw_fd()), + IoringRegisterOp::RegisterEventfd, + (&raw const efd_raw).cast(), + 1, + )?; + } + Ok(Self { + runtime, + efd: Some(efd), + }) + } else { + Ok(Self { runtime, efd: None }) + } + } + + fn clear(&self) -> io::Result<()> { + if let Some(efd) = &self.efd { + let mut buf = [0u8; 8]; + match rustix::io::read(efd, &mut buf) { + Ok(_) => {} + Err(e) + if matches!( + e.kind(), + io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted + ) => {} + Err(e) => return Err(io::Error::from(e)), + } + } + Ok(()) + } +} + +#[cfg(not(target_os = "linux"))] +impl UnixAdapter { + fn new(runtime: Runtime) -> io::Result { + Ok(Self { runtime }) + } + + fn clear(&self) -> io::Result<()> { + Ok(()) + } +} + +impl AsRawFd for UnixAdapter { + fn as_raw_fd(&self) -> RawFd { + #[cfg(target_os = "linux")] + { + self.efd + .as_ref() + .map(|f| f.as_raw_fd()) + .unwrap_or_else(|| self.runtime.as_raw_fd()) + } + #[cfg(not(target_os = "linux"))] + { + self.runtime.as_raw_fd() + } + } +} + +impl AsFd for UnixAdapter { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } +} + +impl Deref for UnixAdapter { + type Target = Runtime; + + fn deref(&self) -> &Self::Target { + &self.runtime + } +} diff --git a/compio-compat/src/sys/unix/tokio.rs b/compio-compat/src/sys/unix/tokio.rs new file mode 100644 index 000000000..29210474b --- /dev/null +++ b/compio-compat/src/sys/unix/tokio.rs @@ -0,0 +1,41 @@ +use std::{io, ops::Deref, time::Duration}; + +use compio_runtime::Runtime; +use tokio::io::{Interest, unix::AsyncFd}; + +use crate::{Adapter, sys::unix::UnixAdapter}; + +/// Adapter for `tokio` runtime. +pub struct TokioAdapter(AsyncFd); + +impl Adapter for TokioAdapter { + fn new(runtime: Runtime) -> io::Result { + Ok(Self(AsyncFd::with_interest( + UnixAdapter::new(runtime)?, + Interest::READABLE, + )?)) + } + + async fn wait(&self, timeout: Option) -> io::Result<()> { + let fut = self.0.readable(); + let mut guard = if let Some(timeout) = timeout { + tokio::time::timeout(timeout, fut).await?? + } else { + fut.await? + }; + guard.clear_ready(); + Ok(()) + } + + fn clear(&self) -> io::Result<()> { + self.0.get_ref().clear() + } +} + +impl Deref for TokioAdapter { + type Target = Runtime; + + fn deref(&self) -> &Self::Target { + self.0.get_ref() + } +} diff --git a/compio-compat/src/sys/windows.rs b/compio-compat/src/sys/windows.rs new file mode 100644 index 000000000..48ae2c766 --- /dev/null +++ b/compio-compat/src/sys/windows.rs @@ -0,0 +1,105 @@ +use std::{ + io, + os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle}, + time::Duration, +}; + +use compio_driver::AsRawFd; +use compio_runtime::Runtime; +use windows_sys::Win32::{ + Foundation::{WAIT_FAILED, WAIT_TIMEOUT}, + System::Threading::{CreateEventW, INFINITE, SetEvent, WaitForMultipleObjects}, +}; + +use crate::sys::Adapter; + +struct WindowsAdapter { + runtime: Runtime, +} + +impl WindowsAdapter { + fn new(runtime: Runtime) -> io::Result { + Ok(Self { runtime }) + } + + async fn wait(&self, timeout: Option) -> io::Result<()> { + let (sender, receiver) = futures_channel::oneshot::channel::>(); + let event = unsafe { CreateEventW(std::ptr::null(), 0, 0, std::ptr::null()) }; + if event.is_null() { + return Err(io::Error::last_os_error()); + } + let event_handle = unsafe { OwnedHandle::from_raw_handle(event as RawHandle) }; + + let timeout = match timeout { + Some(timeout) => timeout.as_millis() as u32, + None => INFINITE, + }; + + struct EventGuard(OwnedHandle); + + impl Drop for EventGuard { + fn drop(&mut self) { + unsafe { SetEvent(self.0.as_raw_handle()) }; + } + } + + let _event_handle = EventGuard(event_handle); + let event = event as usize; + let driver = self.runtime.as_raw_fd() as usize; + windows_threading::submit(move || { + let handles = [event as RawHandle, driver as RawHandle]; + let res = unsafe { WaitForMultipleObjects(2, handles.as_ptr(), 0, timeout) }; + let res = match res { + WAIT_FAILED => Err(io::Error::last_os_error()), + WAIT_TIMEOUT => Err(io::ErrorKind::TimedOut.into()), + _ => Ok(()), + }; + sender.send(res).ok(); + }); + receiver + .await + .map_err(|_| io::ErrorKind::Interrupted.into()) + .flatten() + } +} + +macro_rules! impl_adapter { + ($(#[$($attr:meta)*])? $name:ident) => { + $(#[$($attr)*])? + pub struct $name(WindowsAdapter); + + impl Adapter for $name { + fn new(runtime: Runtime) -> io::Result { + WindowsAdapter::new(runtime).map(Self) + } + + async fn wait(&self, timeout: Option) -> io::Result<()> { + self.0.wait(timeout).await + } + + fn clear(&self) -> io::Result<()> { + Ok(()) + } + } + + impl std::ops::Deref for $name { + type Target = Runtime; + + fn deref(&self) -> &Self::Target { + &self.0.runtime + } + } + }; +} + +#[cfg(feature = "tokio")] +impl_adapter! { + /// Adapter for `tokio` runtime. + TokioAdapter +} + +#[cfg(feature = "futures")] +impl_adapter! { + /// Adapter for general runtime. + FuturesAdapter +} diff --git a/compio-compat/tests/fs.rs b/compio-compat/tests/fs.rs new file mode 100644 index 000000000..bc144d807 --- /dev/null +++ b/compio-compat/tests/fs.rs @@ -0,0 +1,38 @@ +use std::io::Read; + +use compio_compat::{Adapter, RuntimeCompat}; +use compio_fs::File; +use compio_io::AsyncReadAtExt; +use compio_runtime::Runtime; + +async fn test_impl() { + let runtime = Runtime::new().unwrap(); + let runtime = RuntimeCompat::::new(runtime).unwrap(); + let buffer = runtime + .execute(async { + let file = File::open("Cargo.toml").await.unwrap(); + let (_, buffer) = file.read_to_string_at(String::new(), 0).await.unwrap(); + buffer + }) + .await; + + let mut file = std::fs::File::open("Cargo.toml").unwrap(); + let mut expected = String::new(); + file.read_to_string(&mut expected).unwrap(); + + assert_eq!(buffer, expected); +} + +#[cfg(feature = "tokio")] +#[tokio::test] +async fn tokio() { + test_impl::().await; +} + +#[cfg(feature = "futures")] +#[test] +fn futures() { + futures_executor::block_on(async { + test_impl::().await; + }) +} diff --git a/compio-compat/tests/net.rs b/compio-compat/tests/net.rs new file mode 100644 index 000000000..35c262cfd --- /dev/null +++ b/compio-compat/tests/net.rs @@ -0,0 +1,75 @@ +use compio_compat::{RuntimeCompat, TokioAdapter}; +use compio_runtime::Runtime; + +#[tokio::test] +async fn compio_client() { + use compio_net::TcpStream; + use tokio::net::TcpListener; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server = async { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let (mut stream, _) = listener.accept().await.unwrap(); + let mut buffer = [0u8; 12]; + stream.read_exact(&mut buffer).await.unwrap(); + stream.write_all(&buffer).await.unwrap(); + stream.shutdown().await.unwrap(); + }; + + let runtime = RuntimeCompat::::new(Runtime::new().unwrap()).unwrap(); + let client = runtime.execute(async { + use compio_io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; + + let mut stream = TcpStream::connect(addr).await.unwrap(); + stream.write_all(b"hello world!").await.unwrap(); + stream.shutdown().await.unwrap(); + let buffer = [0u8; 12]; + let (_, buffer) = stream.read_exact(buffer).await.unwrap(); + assert_eq!(&buffer, b"hello world!"); + stream.close().await.unwrap(); + }); + + tokio::join!(server, client); +} + +#[tokio::test] +async fn compio_server() { + use compio_net::TcpListener; + use tokio::net::TcpStream; + + let runtime = RuntimeCompat::::new(Runtime::new().unwrap()).unwrap(); + let listener = runtime + .execute(TcpListener::bind("127.0.0.1:0")) + .await + .unwrap(); + let addr = listener.local_addr().unwrap(); + let server = runtime.execute(async { + use compio_io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + + let (mut stream, _) = listener.accept().await.unwrap(); + let buffer = [0u8; 12]; + let (_, buffer) = stream.read_exact(buffer).await.unwrap(); + stream.write_all(buffer).await.unwrap(); + stream.shutdown().await.unwrap(); + // It's a good practice to read after shutdown to ensure that FIN is sent and + // received properly, especially when using compio. + stream.read([0u8]).await.unwrap(); + stream.close().await.unwrap(); + }); + + let client = async { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let mut stream = TcpStream::connect(addr).await.unwrap(); + stream.write_all(b"hello world!").await.unwrap(); + stream.shutdown().await.unwrap(); + let mut buffer = [0u8; 12]; + stream.read_exact(&mut buffer).await.unwrap(); + assert_eq!(&buffer, b"hello world!"); + stream.shutdown().await.unwrap(); + }; + + tokio::join!(server, client); +} diff --git a/compio-driver/Cargo.toml b/compio-driver/Cargo.toml index de5dea052..9889763ee 100644 --- a/compio-driver/Cargo.toml +++ b/compio-driver/Cargo.toml @@ -34,7 +34,7 @@ thin-cell = { workspace = true, features = ["weak"] } synchrony = { workspace = true, features = ["waker_slot"] } bitflags = { version = "2.11.0" } paste = { workspace = true } -mod_use = "0.2.3" +mod_use = { workspace = true } rustix = { workspace = true, features = ["net"] } # Windows specific dependencies diff --git a/compio/Cargo.toml b/compio/Cargo.toml index 41cc0c938..9e1c845b4 100644 --- a/compio/Cargo.toml +++ b/compio/Cargo.toml @@ -35,6 +35,7 @@ compio-tls = { workspace = true, optional = true } compio-process = { workspace = true, optional = true } compio-quic = { workspace = true, optional = true } compio-ws = { workspace = true, optional = true } +compio-compat = { workspace = true, optional = true } # Shared dev dependencies for all platforms [dev-dependencies] @@ -44,6 +45,7 @@ compio-macros = { workspace = true } criterion = { workspace = true, features = ["async_tokio"] } futures-channel = { workspace = true } +futures-executor = { workspace = true } futures-util = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } @@ -176,6 +178,12 @@ native-tls-vendored = [ "compio-ws?/native-tls-vendored", ] +# Runtime compatibility layers +compat = ["dep:compio-compat"] +compat-tokio = ["compat", "compio-compat/tokio"] +compat-futures = ["compat", "compio-compat/futures"] +compat-all = ["compat-tokio", "compat-futures"] + # Logging enable_log = ["compio-log/enable_log"] diff --git a/compio/benches/compat/in_futures.rs b/compio/benches/compat/in_futures.rs new file mode 100644 index 000000000..e94d59430 --- /dev/null +++ b/compio/benches/compat/in_futures.rs @@ -0,0 +1,26 @@ +use compio::compat::{FuturesAdapter, RuntimeCompat}; +use criterion::async_executor::AsyncExecutor; + +pub struct CompioInFutures { + runtime: RuntimeCompat, +} + +impl Default for CompioInFutures { + fn default() -> Self { + let runtime = + RuntimeCompat::::new(compio::runtime::Runtime::new().unwrap()).unwrap(); + Self { runtime } + } +} + +impl AsyncExecutor for CompioInFutures { + fn block_on(&self, future: impl Future) -> T { + (&self).block_on(future) + } +} + +impl AsyncExecutor for &CompioInFutures { + fn block_on(&self, future: impl Future) -> T { + futures_executor::block_on(self.runtime.execute(future)) + } +} diff --git a/compio/benches/compat/in_tokio.rs b/compio/benches/compat/in_tokio.rs new file mode 100644 index 000000000..dd07ec4a4 --- /dev/null +++ b/compio/benches/compat/in_tokio.rs @@ -0,0 +1,32 @@ +use compio::compat::{RuntimeCompat, TokioAdapter}; +use criterion::async_executor::AsyncExecutor; + +pub struct CompioInTokio { + truntime: tokio::runtime::Runtime, + cruntime: RuntimeCompat, +} + +impl Default for CompioInTokio { + fn default() -> Self { + let truntime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let _guard = truntime.enter(); + let cruntime = + RuntimeCompat::::new(compio::runtime::Runtime::new().unwrap()).unwrap(); + Self { truntime, cruntime } + } +} + +impl AsyncExecutor for CompioInTokio { + fn block_on(&self, future: impl Future) -> T { + (&self).block_on(future) + } +} + +impl AsyncExecutor for &CompioInTokio { + fn block_on(&self, future: impl Future) -> T { + self.truntime.block_on(self.cruntime.execute(future)) + } +} diff --git a/compio/benches/compat/mod.rs b/compio/benches/compat/mod.rs new file mode 100644 index 000000000..83810b95d --- /dev/null +++ b/compio/benches/compat/mod.rs @@ -0,0 +1,9 @@ +#[cfg(feature = "compat-tokio")] +mod in_tokio; +#[cfg(feature = "compat-tokio")] +pub use in_tokio::*; + +#[cfg(feature = "compat-futures")] +mod in_futures; +#[cfg(feature = "compat-futures")] +pub use in_futures::*; diff --git a/compio/benches/fs.rs b/compio/benches/fs.rs index edc6ab5da..921e0d342 100644 --- a/compio/benches/fs.rs +++ b/compio/benches/fs.rs @@ -1,7 +1,7 @@ use std::{ io::{Read, Seek, SeekFrom, Write}, path::Path, - time::Instant, + time::{Duration, Instant}, }; use compio_buf::{IntoInner, IoBuf}; @@ -16,6 +16,11 @@ mod monoio_wrap; #[cfg(all(target_os = "linux", target_env = "gnu"))] use monoio_wrap::MonoioRuntime; +#[cfg(feature = "compat")] +mod compat; +#[cfg(feature = "compat")] +use compat::*; + criterion_group!(fs, read, write); criterion_main!(fs); @@ -61,20 +66,37 @@ fn read_tokio(b: &mut Bencher, (path, offsets): &(&Path, &[u64])) { }) } +async fn read_compio_impl(iter: u64, path: &Path, offsets: &[u64]) -> Duration { + let file = compio::fs::File::open(path).await.unwrap(); + + let mut buffer = vec![0u8; BUFFER_SIZE]; + let start = Instant::now(); + for _i in 0..iter { + for &offset in offsets { + (_, buffer) = file.read_at(buffer, offset).await.unwrap(); + } + } + start.elapsed() +} + fn read_compio(b: &mut Bencher, (path, offsets): &(&Path, &[u64])) { let runtime = compio::runtime::Runtime::new().unwrap(); - b.to_async(&runtime).iter_custom(|iter| async move { - let file = compio::fs::File::open(path).await.unwrap(); + b.to_async(&runtime) + .iter_custom(|iter| read_compio_impl(iter, path, offsets)) +} - let mut buffer = vec![0u8; BUFFER_SIZE]; - let start = Instant::now(); - for _i in 0..iter { - for &offset in *offsets { - (_, buffer) = file.read_at(buffer, offset).await.unwrap(); - } - } - start.elapsed() - }) +#[cfg(feature = "compat-tokio")] +fn read_compio_in_tokio(b: &mut Bencher, (path, offsets): &(&Path, &[u64])) { + let runtime = CompioInTokio::default(); + b.to_async(&runtime) + .iter_custom(|iter| read_compio_impl(iter, path, offsets)) +} + +#[cfg(feature = "compat-futures")] +fn read_compio_in_futures(b: &mut Bencher, (path, offsets): &(&Path, &[u64])) { + let runtime = CompioInFutures::default(); + b.to_async(&runtime) + .iter_custom(|iter| read_compio_impl(iter, path, offsets)) } #[cfg(all(target_os = "linux", target_env = "gnu"))] @@ -135,23 +157,40 @@ fn read_all_tokio(b: &mut Bencher, (path, len): &(&Path, u64)) { }) } +async fn read_all_compio_impl(iter: u64, path: &Path, len: u64) -> Duration { + let file = compio::fs::File::open(path).await.unwrap(); + let mut buffer = vec![0u8; BUFFER_SIZE]; + + let start = Instant::now(); + for _i in 0..iter { + let mut read_len = 0; + while read_len < len { + let read; + (read, buffer) = file.read_at(buffer, read_len).await.unwrap(); + read_len += read as u64; + } + } + start.elapsed() +} + fn read_all_compio(b: &mut Bencher, (path, len): &(&Path, u64)) { let runtime = compio::runtime::Runtime::new().unwrap(); - b.to_async(&runtime).iter_custom(|iter| async move { - let file = compio::fs::File::open(path).await.unwrap(); - let mut buffer = vec![0u8; BUFFER_SIZE]; + b.to_async(&runtime) + .iter_custom(|iter| read_all_compio_impl(iter, path, *len)) +} - let start = Instant::now(); - for _i in 0..iter { - let mut read_len = 0; - while read_len < *len { - let read; - (read, buffer) = file.read_at(buffer, read_len).await.unwrap(); - read_len += read as u64; - } - } - start.elapsed() - }) +#[cfg(feature = "compat-tokio")] +fn read_all_compio_in_tokio(b: &mut Bencher, (path, len): &(&Path, u64)) { + let runtime = CompioInTokio::default(); + b.to_async(&runtime) + .iter_custom(|iter| read_all_compio_impl(iter, path, *len)) +} + +#[cfg(feature = "compat-futures")] +fn read_all_compio_in_futures(b: &mut Bencher, (path, len): &(&Path, u64)) { + let runtime = CompioInFutures::default(); + b.to_async(&runtime) + .iter_custom(|iter| read_all_compio_impl(iter, path, *len)) } #[cfg(all(target_os = "linux", target_env = "gnu"))] @@ -197,6 +236,18 @@ fn read(c: &mut Criterion) { group.bench_with_input::<_, _, (&Path, &[u64])>("std", &(&path, &offsets), read_std); group.bench_with_input::<_, _, (&Path, &[u64])>("tokio", &(&path, &offsets), read_tokio); group.bench_with_input::<_, _, (&Path, &[u64])>("compio", &(&path, &offsets), read_compio); + #[cfg(feature = "compat-tokio")] + group.bench_with_input::<_, _, (&Path, &[u64])>( + "compio-in-tokio", + &(&path, &offsets), + read_compio_in_tokio, + ); + #[cfg(feature = "compat-futures")] + group.bench_with_input::<_, _, (&Path, &[u64])>( + "compio-in-futures", + &(&path, &offsets), + read_compio_in_futures, + ); #[cfg(all(target_os = "linux", target_env = "gnu"))] group.bench_with_input::<_, _, (&Path, &[u64])>("monoio", &(&path, &offsets), read_monoio); @@ -210,6 +261,18 @@ fn read(c: &mut Criterion) { group.bench_with_input::<_, _, (&Path, u64)>("std", &(&path, TOTAL_SIZE), read_all_std); group.bench_with_input::<_, _, (&Path, u64)>("tokio", &(&path, TOTAL_SIZE), read_all_tokio); group.bench_with_input::<_, _, (&Path, u64)>("compio", &(&path, TOTAL_SIZE), read_all_compio); + #[cfg(feature = "compat-tokio")] + group.bench_with_input::<_, _, (&Path, u64)>( + "compio-in-tokio", + &(&path, TOTAL_SIZE), + read_all_compio_in_tokio, + ); + #[cfg(feature = "compat-futures")] + group.bench_with_input::<_, _, (&Path, u64)>( + "compio-in-futures", + &(&path, TOTAL_SIZE), + read_all_compio_in_futures, + ); #[cfg(all(target_os = "linux", target_env = "gnu"))] group.bench_with_input::<_, _, (&Path, u64)>("monoio", &(&path, TOTAL_SIZE), read_all_monoio); @@ -257,27 +320,48 @@ fn write_tokio(b: &mut Bencher, (path, offsets, content): &(&Path, &[u64], &[u8] }) } +async fn write_compio_impl( + iter: u64, + path: &Path, + offsets: &[u64], + mut content: Vec, +) -> Duration { + let mut file = compio::fs::OpenOptions::new() + .write(true) + .open(path) + .await + .unwrap(); + + let start = Instant::now(); + for _i in 0..iter { + for &offset in offsets { + (_, content) = file.write_at(content, offset).await.unwrap(); + } + } + start.elapsed() +} + fn write_compio(b: &mut Bencher, (path, offsets, content): &(&Path, &[u64], &[u8])) { let runtime = compio::runtime::Runtime::new().unwrap(); let content = content.to_vec(); - b.to_async(&runtime).iter_custom(|iter| { - let mut content = content.clone(); - async move { - let mut file = compio::fs::OpenOptions::new() - .write(true) - .open(path) - .await - .unwrap(); + b.to_async(&runtime) + .iter_custom(|iter| write_compio_impl(iter, path, offsets, content.clone())) +} - let start = Instant::now(); - for _i in 0..iter { - for &offset in *offsets { - (_, content) = file.write_at(content, offset).await.unwrap(); - } - } - start.elapsed() - } - }) +#[cfg(feature = "compat-tokio")] +fn write_compio_in_tokio(b: &mut Bencher, (path, offsets, content): &(&Path, &[u64], &[u8])) { + let runtime = CompioInTokio::default(); + let content = content.to_vec(); + b.to_async(&runtime) + .iter_custom(|iter| write_compio_impl(iter, path, offsets, content.clone())) +} + +#[cfg(feature = "compat-futures")] +fn write_compio_in_futures(b: &mut Bencher, (path, offsets, content): &(&Path, &[u64], &[u8])) { + let runtime = CompioInFutures::default(); + let content = content.to_vec(); + b.to_async(&runtime) + .iter_custom(|iter| write_compio_impl(iter, path, offsets, content.clone())) } #[cfg(all(target_os = "linux", target_env = "gnu"))] @@ -346,36 +430,51 @@ fn write_all_tokio(b: &mut Bencher, (path, content): &(&Path, &[u8])) { }) } +async fn write_all_compio_impl(iter: u64, path: &Path, mut content: Vec) -> Duration { + let mut file = compio::fs::File::create(path).await.unwrap(); + + let start = Instant::now(); + for _i in 0..iter { + let mut write_len = 0; + let total_len = content.len(); + while write_len < total_len as u64 { + let (write, slice) = file + .write_at( + content.slice( + write_len as usize..(write_len as usize + BUFFER_SIZE).min(total_len), + ), + write_len, + ) + .await + .unwrap(); + write_len += write as u64; + content = slice.into_inner(); + } + } + start.elapsed() +} + fn write_all_compio(b: &mut Bencher, (path, content): &(&Path, &[u8])) { let runtime = compio::runtime::Runtime::new().unwrap(); let content = content.to_vec(); - b.to_async(&runtime).iter_custom(|iter| { - let mut content = content.clone(); - async move { - let mut file = compio::fs::File::create(path).await.unwrap(); + b.to_async(&runtime) + .iter_custom(|iter| write_all_compio_impl(iter, path, content.clone())) +} - let start = Instant::now(); - for _i in 0..iter { - let mut write_len = 0; - let total_len = content.len(); - while write_len < total_len as u64 { - let (write, slice) = file - .write_at( - content.slice( - write_len as usize - ..(write_len as usize + BUFFER_SIZE).min(total_len), - ), - write_len, - ) - .await - .unwrap(); - write_len += write as u64; - content = slice.into_inner(); - } - } - start.elapsed() - } - }) +#[cfg(feature = "compat-tokio")] +fn write_all_compio_in_tokio(b: &mut Bencher, (path, content): &(&Path, &[u8])) { + let runtime = CompioInTokio::default(); + let content = content.to_vec(); + b.to_async(&runtime) + .iter_custom(|iter| write_all_compio_impl(iter, path, content.clone())) +} + +#[cfg(feature = "compat-futures")] +fn write_all_compio_in_futures(b: &mut Bencher, (path, content): &(&Path, &[u8])) { + let runtime = CompioInFutures::default(); + let content = content.to_vec(); + b.to_async(&runtime) + .iter_custom(|iter| write_all_compio_impl(iter, path, content.clone())) } #[cfg(all(target_os = "linux", target_env = "gnu"))] @@ -458,6 +557,18 @@ fn write(c: &mut Criterion) { &(&path, &offsets, &single_content), write_compio, ); + #[cfg(feature = "compat-tokio")] + group.bench_with_input::<_, _, (&Path, &[u64], &[u8])>( + "compio-in-tokio", + &(&path, &offsets, &single_content), + write_compio_in_tokio, + ); + #[cfg(feature = "compat-futures")] + group.bench_with_input::<_, _, (&Path, &[u64], &[u8])>( + "compio-in-futures", + &(&path, &offsets, &single_content), + write_compio_in_futures, + ); #[cfg(all(target_os = "linux", target_env = "gnu"))] group.bench_with_input::<_, _, (&Path, &[u64], &[u8])>( "monoio", @@ -473,6 +584,18 @@ fn write(c: &mut Criterion) { group.bench_with_input::<_, _, (&Path, &[u8])>("std", &(&path, &content), write_all_std); group.bench_with_input::<_, _, (&Path, &[u8])>("tokio", &(&path, &content), write_all_tokio); group.bench_with_input::<_, _, (&Path, &[u8])>("compio", &(&path, &content), write_all_compio); + #[cfg(feature = "compat-tokio")] + group.bench_with_input::<_, _, (&Path, &[u8])>( + "compio-in-tokio", + &(&path, &content), + write_all_compio_in_tokio, + ); + #[cfg(feature = "compat-futures")] + group.bench_with_input::<_, _, (&Path, &[u8])>( + "compio-in-futures", + &(&path, &content), + write_all_compio_in_futures, + ); #[cfg(all(target_os = "linux", target_env = "gnu"))] group.bench_with_input::<_, _, (&Path, &[u8])>("monoio", &(&path, &content), write_all_monoio); diff --git a/compio/benches/net.rs b/compio/benches/net.rs index 2764782f6..99ea5e929 100644 --- a/compio/benches/net.rs +++ b/compio/benches/net.rs @@ -1,4 +1,7 @@ -use std::{net::Ipv4Addr, time::Instant}; +use std::{ + net::Ipv4Addr, + time::{Duration, Instant}, +}; use criterion::{Bencher, Criterion, Throughput, criterion_group, criterion_main}; use rand::{Rng, rng}; @@ -8,6 +11,11 @@ mod monoio_wrap; #[cfg(all(target_os = "linux", target_env = "gnu"))] use monoio_wrap::MonoioRuntime; +#[cfg(feature = "compat")] +mod compat; +#[cfg(feature = "compat")] +use compat::*; + criterion_group!(net, echo); criterion_main!(net); @@ -134,29 +142,43 @@ fn echo_tokio_tcp(b: &mut Bencher, content: &[u8]) { }) } -fn echo_compio_tcp(b: &mut Bencher, content: Vec) { +async fn echo_compio_tcp_impl(iter: u64, mut content: Vec) -> Duration { use compio::net::{TcpListener, TcpStream}; + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let mut client_buffer = vec![0u8; BUFFER_SIZE]; + let mut server_buffer = vec![0u8; BUFFER_SIZE]; + + let start = Instant::now(); + for _i in 0..iter { + let (tx, (rx, _)) = + futures_util::try_join!(TcpStream::connect(addr), listener.accept()).unwrap(); + (content, client_buffer, server_buffer) = + echo_compio_impl(tx, rx, content, client_buffer, server_buffer).await; + } + start.elapsed() +} + +fn echo_compio_tcp(b: &mut Bencher, content: Vec) { let runtime = compio::runtime::Runtime::new().unwrap(); - b.to_async(&runtime).iter_custom(|iter| { - let mut content = content.clone(); - async move { - let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap(); - let addr = listener.local_addr().unwrap(); + b.to_async(&runtime) + .iter_custom(|iter| echo_compio_tcp_impl(iter, content.clone())) +} - let mut client_buffer = vec![0u8; BUFFER_SIZE]; - let mut server_buffer = vec![0u8; BUFFER_SIZE]; +#[cfg(feature = "compat-tokio")] +fn echo_compio_in_tokio_tcp(b: &mut Bencher, content: Vec) { + let runtime = CompioInTokio::default(); + b.to_async(&runtime) + .iter_custom(|iter| echo_compio_tcp_impl(iter, content.clone())) +} - let start = Instant::now(); - for _i in 0..iter { - let (tx, (rx, _)) = - futures_util::try_join!(TcpStream::connect(addr), listener.accept()).unwrap(); - (content, client_buffer, server_buffer) = - echo_compio_impl(tx, rx, content, client_buffer, server_buffer).await; - } - start.elapsed() - } - }) +#[cfg(feature = "compat-futures")] +fn echo_compio_in_futures_tcp(b: &mut Bencher, content: Vec) { + let runtime = CompioInFutures::default(); + b.to_async(&runtime) + .iter_custom(|iter| echo_compio_tcp_impl(iter, content.clone())) } #[cfg(all(target_os = "linux", target_env = "gnu"))] @@ -268,34 +290,47 @@ fn echo_tokio_unix(b: &mut Bencher, content: &[u8]) { }) } -fn echo_compio_unix(b: &mut Bencher, content: Vec) { +async fn echo_compio_unix_impl(iter: u64, mut content: Vec) -> Duration { use compio::net::{UnixListener, UnixStream}; + let dir = tempfile::Builder::new() + .prefix("compio-uds") + .tempdir() + .unwrap(); + let sock_path = dir.path().join("connect.sock"); + let listener = UnixListener::bind(&sock_path).await.unwrap(); + + let mut client_buffer = vec![0u8; BUFFER_SIZE]; + let mut server_buffer = vec![0u8; BUFFER_SIZE]; + + let start = Instant::now(); + for _i in 0..iter { + let (tx, (rx, _)) = + futures_util::try_join!(UnixStream::connect(&sock_path), listener.accept()).unwrap(); + (content, client_buffer, server_buffer) = + echo_compio_impl(tx, rx, content, client_buffer, server_buffer).await; + } + start.elapsed() +} + +fn echo_compio_unix(b: &mut Bencher, content: Vec) { let runtime = compio::runtime::Runtime::new().unwrap(); - b.to_async(&runtime).iter_custom(|iter| { - let mut content = content.clone(); - async move { - let dir = tempfile::Builder::new() - .prefix("compio-uds") - .tempdir() - .unwrap(); - let sock_path = dir.path().join("connect.sock"); - let listener = UnixListener::bind(&sock_path).await.unwrap(); + b.to_async(&runtime) + .iter_custom(|iter| echo_compio_unix_impl(iter, content.clone())) +} - let mut client_buffer = vec![0u8; BUFFER_SIZE]; - let mut server_buffer = vec![0u8; BUFFER_SIZE]; +#[cfg(feature = "compat-tokio")] +fn echo_compio_in_tokio_unix(b: &mut Bencher, content: Vec) { + let runtime = CompioInTokio::default(); + b.to_async(&runtime) + .iter_custom(|iter| echo_compio_unix_impl(iter, content.clone())) +} - let start = Instant::now(); - for _i in 0..iter { - let (tx, (rx, _)) = - futures_util::try_join!(UnixStream::connect(&sock_path), listener.accept()) - .unwrap(); - (content, client_buffer, server_buffer) = - echo_compio_impl(tx, rx, content, client_buffer, server_buffer).await; - } - start.elapsed() - } - }) +#[cfg(feature = "compat-futures")] +fn echo_compio_in_futures_unix(b: &mut Bencher, content: Vec) { + let runtime = CompioInFutures::default(); + b.to_async(&runtime) + .iter_custom(|iter| echo_compio_unix_impl(iter, content.clone())) } #[cfg(all(target_os = "linux", target_env = "gnu"))] @@ -344,6 +379,14 @@ fn echo(c: &mut Criterion) { group.bench_function("tokio-tcp", |b| echo_tokio_tcp(b, &content)); group.bench_function("compio-tcp", |b| echo_compio_tcp(b, content.clone())); + #[cfg(feature = "compat-tokio")] + group.bench_function("compio-in-tokio-tcp", |b| { + echo_compio_in_tokio_tcp(b, content.clone()) + }); + #[cfg(feature = "compat-futures")] + group.bench_function("compio-in-futures-tcp", |b| { + echo_compio_in_futures_tcp(b, content.clone()) + }); #[cfg(all(target_os = "linux", target_env = "gnu"))] group.bench_function("monoio-tcp", |b| echo_monoio_tcp(b, content.clone())); @@ -357,6 +400,14 @@ fn echo(c: &mut Criterion) { #[cfg(unix)] group.bench_function("tokio-unix", |b| echo_tokio_unix(b, &content)); group.bench_function("compio-unix", |b| echo_compio_unix(b, content.clone())); + #[cfg(feature = "compat-tokio")] + group.bench_function("compio-in-tokio-unix", |b| { + echo_compio_in_tokio_unix(b, content.clone()) + }); + #[cfg(feature = "compat-futures")] + group.bench_function("compio-in-futures-unix", |b| { + echo_compio_in_futures_unix(b, content.clone()) + }); #[cfg(all(target_os = "linux", target_env = "gnu"))] group.bench_function("monoio-unix", |b| echo_monoio_unix(b, content.clone())); diff --git a/compio/src/lib.rs b/compio/src/lib.rs index 6c705b180..8da386a9d 100644 --- a/compio/src/lib.rs +++ b/compio/src/lib.rs @@ -40,6 +40,8 @@ pub use buf::bumpalo; pub use buf::bytes; #[cfg(feature = "smallvec")] pub use buf::smallvec; +#[cfg(feature = "compat")] +pub use compio_compat as compat; #[cfg(feature = "dispatcher")] #[doc(inline)] pub use compio_dispatcher as dispatcher;