Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ criterion = "0.7.0"
crossbeam-queue = "0.3.8"
flume = { version = "0.11.0", default-features = false }
futures-channel = "0.3.29"
futures-rustls = { version = "0.26.0", default-features = false }
futures-util = "0.3.29"
libc = "0.2.164"
nix = "0.30.1"
Expand Down
5 changes: 2 additions & 3 deletions compio-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "compio-io"
version = "0.8.1"
version = "0.8.2"
description = "IO traits for completion based async IO"
categories = ["asynchronous"]
keywords = ["async", "io"]
Expand All @@ -15,7 +15,6 @@ compio-buf = { workspace = true, features = ["arrayvec", "bytes"] }
futures-util = { workspace = true, features = ["sink"] }
paste = { workspace = true }
thiserror = { workspace = true, optional = true }
pin-project-lite = { workspace = true, optional = true }
serde = { version = "1.0.219", optional = true }
serde_json = { version = "1.0.140", optional = true }
Comment thread
Berrysoft marked this conversation as resolved.

Expand All @@ -29,7 +28,7 @@ futures-executor = "0.3.30"

[features]
default = []
compat = ["dep:pin-project-lite", "futures-util/io"]
compat = ["futures-util/io"]

# Codecs
# Serde json codec
Expand Down
99 changes: 45 additions & 54 deletions compio-io/src/compat.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Compat wrappers for interop with other crates.

use std::{
fmt::Debug,
io::{self, BufRead, Read, Write},
mem::MaybeUninit,
pin::Pin,
task::{Context, Poll},
};

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
use pin_project_lite::pin_project;

use crate::{PinBoxFuture, buffer::Buffer, util::DEFAULT_BUF_SIZE};

Expand Down Expand Up @@ -176,15 +176,14 @@ impl<S: crate::AsyncWrite> SyncStream<S> {
}
}

pin_project! {
/// A stream wrapper for [`futures_util::io`] traits.
pub struct AsyncStream<S> {
#[pin]
inner: SyncStream<S>,
read_future: Option<PinBoxFuture<io::Result<usize>>>,
write_future: Option<PinBoxFuture<io::Result<usize>>>,
shutdown_future: Option<PinBoxFuture<io::Result<()>>>,
}
/// A stream wrapper for [`futures_util::io`] traits.
pub struct AsyncStream<S> {
// The futures keep the reference to the inner stream, so we need to pin
// the inner stream to make sure the reference is valid.
inner: Pin<Box<SyncStream<S>>>,
Comment thread
Berrysoft marked this conversation as resolved.
read_future: Option<PinBoxFuture<io::Result<usize>>>,
write_future: Option<PinBoxFuture<io::Result<usize>>>,
shutdown_future: Option<PinBoxFuture<io::Result<()>>>,
}

impl<S> AsyncStream<S> {
Expand All @@ -200,7 +199,7 @@ impl<S> AsyncStream<S> {

fn new_impl(inner: SyncStream<S>) -> Self {
Self {
inner,
inner: Box::pin(inner),
read_future: None,
write_future: None,
shutdown_future: None,
Expand Down Expand Up @@ -253,20 +252,18 @@ macro_rules! poll_future_would_block {

impl<S: crate::AsyncRead + 'static> futures_util::AsyncRead for AsyncStream<S> {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.project();
// Safety:
// - The futures won't live longer than the stream.
// - `self` is pinned.
// - The inner stream won't be moved.
// - The inner stream is pinned.
let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };

poll_future_would_block!(
this.read_future,
self.read_future,
cx,
inner.fill_read_buf(),
io::Read::read(inner, buf)
Expand All @@ -279,16 +276,14 @@ impl<S: crate::AsyncRead + 'static> AsyncStream<S> {
///
/// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
pub fn poll_read_uninit(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [MaybeUninit<u8>],
) -> Poll<io::Result<usize>> {
let this = self.project();

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
poll_future_would_block!(
this.read_future,
self.read_future,
cx,
inner.fill_read_buf(),
inner.read_buf_uninit(buf)
Expand All @@ -297,79 +292,75 @@ impl<S: crate::AsyncRead + 'static> AsyncStream<S> {
}

impl<S: crate::AsyncRead + 'static> futures_util::AsyncBufRead for AsyncStream<S> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let this = self.project();

fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
poll_future_would_block!(
this.read_future,
self.read_future,
cx,
inner.fill_read_buf(),
// Safety: anyway the slice won't be used after free.
io::BufRead::fill_buf(inner).map(|slice| unsafe { &*(slice as *const _) })
)
}

fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.project();

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
inner.consume(amt)
fn consume(mut self: Pin<&mut Self>, amt: usize) {
unsafe { self.inner.as_mut().get_unchecked_mut().consume(amt) }
}
}

impl<S: crate::AsyncWrite + 'static> futures_util::AsyncWrite for AsyncStream<S> {
fn poll_write(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.project();

if this.shutdown_future.is_some() {
debug_assert!(this.write_future.is_none());
if self.shutdown_future.is_some() {
debug_assert!(self.write_future.is_none());
return Poll::Pending;
}

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
poll_future_would_block!(
this.write_future,
self.write_future,
cx,
inner.flush_write_buf(),
io::Write::write(inner, buf)
)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.project();

if this.shutdown_future.is_some() {
debug_assert!(this.write_future.is_none());
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.shutdown_future.is_some() {
debug_assert!(self.write_future.is_none());
return Poll::Pending;
}

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
let res = poll_future!(this.write_future, cx, inner.flush_write_buf());
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
let res = poll_future!(self.write_future, cx, inner.flush_write_buf());
Poll::Ready(res.map(|_| ()))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.project();

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// Avoid shutdown on flush because the inner buffer might be passed to the
// driver.
if this.write_future.is_some() {
debug_assert!(this.shutdown_future.is_none());
if self.write_future.is_some() {
debug_assert!(self.shutdown_future.is_none());
return Poll::Pending;
}

let inner: &'static mut SyncStream<S> =
unsafe { &mut *(this.inner.get_unchecked_mut() as *mut _) };
let res = poll_future!(this.shutdown_future, cx, inner.get_mut().shutdown());
unsafe { &mut *(self.inner.as_mut().get_unchecked_mut() as *mut _) };
let res = poll_future!(self.shutdown_future, cx, inner.get_mut().shutdown());
Poll::Ready(res)
}
}

impl<S: Debug> Debug for AsyncStream<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncStream")
.field("inner", &self.inner)
.finish_non_exhaustive()
}
}
20 changes: 15 additions & 5 deletions compio-tls/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "compio-tls"
version = "0.7.0"
version = "0.7.1"
description = "TLS adaptor with compio"
categories = ["asynchronous", "network-programming"]
keywords = ["async", "net", "tls"]
Expand All @@ -25,6 +25,12 @@ rustls = { workspace = true, default-features = false, optional = true, features
"tls12",
] }

futures-rustls = { workspace = true, default-features = false, optional = true, features = [
"logging",
"tls12",
] }
futures-util = { workspace = true, optional = true }

[dev-dependencies]
compio-net = { workspace = true }
compio-runtime = { workspace = true }
Expand All @@ -33,14 +39,18 @@ compio-macros = { workspace = true }
rustls = { workspace = true, default-features = false, features = ["ring"] }
rustls-native-certs = { workspace = true }

futures-rustls = { workspace = true, default-features = false, features = [
"ring",
] }

[features]
default = ["native-tls"]
all = ["native-tls", "rustls"]
rustls = ["dep:rustls"]
rustls = ["dep:rustls", "dep:futures-rustls", "dep:futures-util"]

ring = ["rustls", "rustls/ring"]
aws-lc-rs = ["rustls", "rustls/aws-lc-rs"]
aws-lc-rs-fips = ["aws-lc-rs", "rustls/fips"]
ring = ["rustls", "rustls/ring", "futures-rustls/ring"]
aws-lc-rs = ["rustls", "rustls/aws-lc-rs", "futures-rustls/aws-lc-rs"]
aws-lc-rs-fips = ["aws-lc-rs", "rustls/fips", "futures-rustls/fips"]

read_buf = ["compio-buf/read_buf", "compio-io/read_buf", "rustls?/read_buf"]
nightly = ["read_buf"]
Loading
Loading