Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compio-quic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ rustls = { workspace = true }
rustls-platform-verifier = { version = "0.5.0", optional = true }
rustls-native-certs = { workspace = true, optional = true }
webpki-roots = { version = "0.26.3", optional = true }
h3 = { version = "0.0.7", optional = true }
h3-datagram = { version = "0.0.1", optional = true }
h3 = { version = "0.0.8", optional = true }
h3-datagram = { version = "0.0.2", optional = true }

# Utils
flume = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion compio-quic/examples/http3-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn main() {

drop(send_req);

handle.await.unwrap().unwrap();
handle.await.unwrap();
}

endpoint.shutdown().await.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion compio-quic/examples/http3-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ async fn main() {
.await
.unwrap();

while let Ok(Some((req, mut stream))) = conn.accept().await {
while let Ok(Some(resolver)) = conn.accept().await {
let (req, mut stream) = resolver.resolve_request().await.unwrap();
println!("Received request: {req:?}");
stream
.send_response(
Expand Down
129 changes: 70 additions & 59 deletions compio-quic/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -983,73 +983,90 @@ pub enum OpenStreamError {

#[cfg(feature = "h3")]
pub(crate) mod h3_impl {
use compio_buf::bytes::{Buf, BytesMut};
use compio_buf::bytes::Buf;
use futures_util::ready;
use h3::{
error::Code,
quic::{self, Error, WriteBuf},
quic::{self, ConnectionErrorIncoming, StreamErrorIncoming, WriteBuf},
};
use h3_datagram::{
datagram::Datagram,
quic_traits::{RecvDatagramExt, SendDatagramExt},
datagram::EncodedDatagram,
quic_traits::{
DatagramConnectionExt, RecvDatagram, SendDatagram, SendDatagramErrorIncoming,
},
};

use super::*;
use crate::{ReadError, WriteError, send_stream::h3_impl::SendStream};

impl Error for ConnectionError {
fn is_timeout(&self) -> bool {
matches!(self, ConnectionError::TimedOut)
}
use crate::send_stream::h3_impl::SendStream;

impl From<ConnectionError> for ConnectionErrorIncoming {
fn from(e: ConnectionError) -> Self {
use ConnectionError::*;
match e {
ApplicationClosed(e) => Self::ApplicationClose {
error_code: e.error_code.into_inner(),
},
TimedOut => Self::Timeout,

fn err_code(&self) -> Option<u64> {
match &self {
ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose {
error_code,
..
}) => Some(error_code.into_inner()),
_ => None,
e => Self::Undefined(Arc::new(e)),
}
}
}

impl Error for SendDatagramError {
fn is_timeout(&self) -> bool {
false
impl From<ConnectionError> for StreamErrorIncoming {
fn from(e: ConnectionError) -> Self {
Self::ConnectionErrorIncoming {
connection_error: e.into(),
}
}
}

fn err_code(&self) -> Option<u64> {
match self {
SendDatagramError::ConnectionLost(ConnectionError::ApplicationClosed(
quinn_proto::ApplicationClose { error_code, .. },
)) => Some(error_code.into_inner()),
_ => None,
impl From<SendDatagramError> for SendDatagramErrorIncoming {
fn from(e: SendDatagramError) -> Self {
use SendDatagramError::*;
match e {
UnsupportedByPeer | Disabled => Self::NotAvailable,
TooLarge => Self::TooLarge,
ConnectionLost(e) => Self::ConnectionError(e.into()),
}
}
}

impl<B> SendDatagramExt<B> for Connection
impl<B> SendDatagram<B> for Connection
where
B: Buf,
{
type Error = SendDatagramError;

fn send_datagram(&mut self, data: Datagram<B>) -> Result<(), Self::Error> {
let mut buf = BytesMut::new();
data.encode(&mut buf);
Connection::send_datagram(self, buf.freeze())
fn send_datagram<T: Into<EncodedDatagram<B>>>(
&mut self,
data: T,
) -> Result<(), SendDatagramErrorIncoming> {
let mut buf: EncodedDatagram<B> = data.into();
let buf = buf.copy_to_bytes(buf.remaining());
Ok(Connection::send_datagram(self, buf)?)
}
}

impl RecvDatagramExt for Connection {
type Buf = Bytes;
type Error = ConnectionError;
impl RecvDatagram for Connection {
type Buffer = Bytes;

fn poll_accept_datagram(
fn poll_incoming_datagram(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
Poll::Ready(Ok(Some(ready!(self.poll_recv_datagram(cx))?)))
cx: &mut core::task::Context<'_>,
) -> Poll<Result<Self::Buffer, ConnectionErrorIncoming>> {
Poll::Ready(Ok(ready!(self.poll_recv_datagram(cx))?))
}
}

impl<B: Buf> DatagramConnectionExt<B> for Connection {
type RecvDatagramHandler = Self;
type SendDatagramHandler = Self;

fn send_datagram_handler(&self) -> Self::SendDatagramHandler {
self.clone()
}

fn recv_datagram_handler(&self) -> Self::RecvDatagramHandler {
self.clone()
}
}

Expand Down Expand Up @@ -1085,12 +1102,11 @@ pub(crate) mod h3_impl {
B: Buf,
{
type Buf = Bytes;
type Error = ReadError;

fn poll_data(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
self.recv.poll_data(cx)
}

Expand All @@ -1107,17 +1123,15 @@ pub(crate) mod h3_impl {
where
B: Buf,
{
type Error = WriteError;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
self.send.poll_ready(cx)
}

fn send_data<T: Into<WriteBuf<B>>>(&mut self, data: T) -> Result<(), Self::Error> {
fn send_data<T: Into<WriteBuf<B>>>(&mut self, data: T) -> Result<(), StreamErrorIncoming> {
self.send.send_data(data)
}

fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
self.send.poll_finish(cx)
}

Expand All @@ -1138,7 +1152,7 @@ pub(crate) mod h3_impl {
&mut self,
cx: &mut Context<'_>,
buf: &mut D,
) -> Poll<Result<usize, Self::Error>> {
) -> Poll<Result<usize, StreamErrorIncoming>> {
self.send.poll_send(cx, buf)
}
}
Expand All @@ -1152,21 +1166,20 @@ pub(crate) mod h3_impl {
B: Buf,
{
type BidiStream = BidiStream<B>;
type OpenError = ConnectionError;
type SendStream = SendStream<B>;

fn poll_open_bidi(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
) -> Poll<Result<Self::BidiStream, StreamErrorIncoming>> {
let (stream, is_0rtt) = ready!(self.0.poll_open_stream(Some(cx), Dir::Bi))?;
Poll::Ready(Ok(BidiStream::new(self.0.0.clone(), stream, is_0rtt)))
}

fn poll_open_send(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
) -> Poll<Result<Self::SendStream, StreamErrorIncoming>> {
let (stream, is_0rtt) = ready!(self.0.poll_open_stream(Some(cx), Dir::Uni))?;
Poll::Ready(Ok(SendStream::new(self.0.0.clone(), stream, is_0rtt)))
}
Expand All @@ -1182,21 +1195,20 @@ pub(crate) mod h3_impl {
B: Buf,
{
type BidiStream = BidiStream<B>;
type OpenError = ConnectionError;
type SendStream = SendStream<B>;

fn poll_open_bidi(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
) -> Poll<Result<Self::BidiStream, StreamErrorIncoming>> {
let (stream, is_0rtt) = ready!(self.poll_open_stream(Some(cx), Dir::Bi))?;
Poll::Ready(Ok(BidiStream::new(self.0.clone(), stream, is_0rtt)))
}

fn poll_open_send(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
) -> Poll<Result<Self::SendStream, StreamErrorIncoming>> {
let (stream, is_0rtt) = ready!(self.poll_open_stream(Some(cx), Dir::Uni))?;
Poll::Ready(Ok(SendStream::new(self.0.clone(), stream, is_0rtt)))
}
Expand All @@ -1210,24 +1222,23 @@ pub(crate) mod h3_impl {
where
B: Buf,
{
type AcceptError = ConnectionError;
type OpenStreams = OpenStreams;
type RecvStream = RecvStream;

fn poll_accept_recv(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<Option<Self::RecvStream>, Self::AcceptError>> {
) -> Poll<Result<Self::RecvStream, ConnectionErrorIncoming>> {
let (stream, is_0rtt) = ready!(self.poll_accept_stream(cx, Dir::Uni))?;
Poll::Ready(Ok(Some(RecvStream::new(self.0.clone(), stream, is_0rtt))))
Poll::Ready(Ok(RecvStream::new(self.0.clone(), stream, is_0rtt)))
}

fn poll_accept_bidi(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<Option<Self::BidiStream>, Self::AcceptError>> {
) -> Poll<Result<Self::BidiStream, ConnectionErrorIncoming>> {
let (stream, is_0rtt) = ready!(self.poll_accept_stream(cx, Dir::Bi))?;
Poll::Ready(Ok(Some(BidiStream::new(self.0.clone(), stream, is_0rtt))))
Poll::Ready(Ok(BidiStream::new(self.0.clone(), stream, is_0rtt)))
}

fn opener(&self) -> Self::OpenStreams {
Expand Down
30 changes: 15 additions & 15 deletions compio-quic/src/recv_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,38 +529,38 @@ impl futures_util::AsyncRead for RecvStream {

#[cfg(feature = "h3")]
pub(crate) mod h3_impl {
use h3::quic::{self, Error};
use h3::quic::{self, StreamErrorIncoming};

use super::*;

impl Error for ReadError {
fn is_timeout(&self) -> bool {
matches!(self, Self::ConnectionLost(ConnectionError::TimedOut))
}

fn err_code(&self) -> Option<u64> {
match self {
Self::ConnectionLost(ConnectionError::ApplicationClosed(
quinn_proto::ApplicationClose { error_code, .. },
))
| Self::Reset(error_code) => Some(error_code.into_inner()),
_ => None,
impl From<ReadError> for StreamErrorIncoming {
fn from(e: ReadError) -> Self {
use ReadError::*;
match e {
Reset(code) => Self::StreamTerminated {
error_code: code.into_inner(),
},
ConnectionLost(e) => Self::ConnectionErrorIncoming {
connection_error: e.into(),
},
IllegalOrderedRead => unreachable!("illegal ordered read"),
e => Self::Unknown(Box::new(e)),
}
}
}

impl quic::RecvStream for RecvStream {
type Buf = Bytes;
type Error = ReadError;

fn poll_data(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
self.execute_poll_read(cx, true, |chunks| match chunks.next(usize::MAX) {
Ok(Some(chunk)) => ReadStatus::Readable(chunk.bytes),
res => (None, res.err()).into(),
})
.map_err(Into::into)
}

fn stop_sending(&mut self, error_code: u64) {
Expand Down
Loading
Loading