Skip to content

Commit 02ebfdc

Browse files
committed
Added Transmog compatability.
This commit adds support for specifying the serialization format at the time of accepting a connection, which means that ALPN protocol negotiation can be used to control which serialization format is used on an incoming connection. The next feature is the ability to switch serialization formats on a per-stream basis, after the r#type negotation.
1 parent 04336dd commit 02ebfdc

File tree

13 files changed

+330
-134
lines changed

13 files changed

+330
-134
lines changed

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ trust-dns = ["trust-dns-resolver"]
1616

1717
[dependencies]
1818
async-trait = "0.1"
19-
bincode = "1"
19+
transmog = "0.1.0-dev.2"
2020
bytes = "1"
2121
ct-logs = "0.9"
2222
flume = "0.10"
@@ -54,6 +54,7 @@ fabruic = { path = "", features = ["rcgen", "test"] }
5454
quinn-proto = { version = "0.8", default-features = false }
5555
tokio = { version = "1", features = ["macros"] }
5656
trust-dns-proto = "0.21.0-alpha.4"
57+
transmog-bincode = { version = "0.1.0-dev.2" }
5758

5859
[profile.release]
5960
codegen-units = 1

examples/basic.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::{Error, Result};
22
use fabruic::{Endpoint, KeyPair};
33
use futures_util::{future, StreamExt, TryFutureExt};
4+
use transmog_bincode::Bincode;
45

56
const SERVER_NAME: &str = "test";
67
/// Some random port.
@@ -38,7 +39,7 @@ async fn main() -> Result<()> {
3839
index,
3940
connecting.remote_address()
4041
);
41-
let connection = connecting.accept::<()>().await?;
42+
let connection = connecting.accept::<(), _>(Bincode::default()).await?;
4243
println!(
4344
"[client:{}] Successfully connected to {}",
4445
index,
@@ -107,7 +108,7 @@ async fn main() -> Result<()> {
107108
// every new incoming connections is handled in it's own task
108109
connections.push(
109110
tokio::spawn(async move {
110-
let mut connection = connecting.accept::<()>().await?;
111+
let mut connection = connecting.accept::<(), _>(Bincode::default()).await?;
111112
println!("[server] New Connection: {}", connection.remote_address());
112113

113114
// start listening to new incoming streams

src/error.rs

+17-8
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@
88
// TODO: error type is becoming too big, split it up
99

1010
use std::{
11-
fmt::{self, Debug, Formatter},
11+
fmt::{self, Debug, Display, Formatter},
1212
io,
1313
};
1414

15-
pub use bincode::ErrorKind;
1615
use quinn::ConnectionClose;
1716
pub use quinn::{ConnectError, ConnectionError, ReadError, WriteError};
1817
use thiserror::Error;
@@ -207,7 +206,9 @@ impl From<ConnectionError> for Connecting {
207206
reason,
208207
}) if reason.as_ref() == b"peer doesn't support any known protocol"
209208
&& error_code.to_string() == "the cryptographic handshake failed: error 120" =>
210-
Self::ProtocolMismatch,
209+
{
210+
Self::ProtocolMismatch
211+
}
211212
other => Self::Connection(other),
212213
}
213214
}
@@ -241,23 +242,25 @@ pub enum Incoming {
241242

242243
/// Error receiving a message from a [`Receiver`](crate::Receiver).
243244
#[derive(Debug, Error)]
245+
#[allow(variant_size_differences)]
244246
pub enum Receiver {
245247
/// Failed to read from a [`Receiver`](crate::Receiver).
246248
#[error("Error reading from `Receiver`: {0}")]
247249
Read(#[from] ReadError),
248250
/// Failed to [`Deserialize`](serde::Deserialize) a message from a
249251
/// [`Receiver`](crate::Receiver).
250252
#[error("Error deserializing a message from `Receiver`: {0}")]
251-
Deserialize(#[from] ErrorKind),
253+
Deserialize(Box<dyn SerializationError>),
252254
}
253255

254256
/// Error sending a message to a [`Sender`](crate::Sender).
255257
#[derive(Debug, Error)]
258+
#[allow(variant_size_differences)]
256259
pub enum Sender {
257260
/// Failed to [`Serialize`](serde::Serialize) a message for a
258261
/// [`Sender`](crate::Sender).
259262
#[error("Error serializing a message to `Sender`: {0}")]
260-
Serialize(ErrorKind),
263+
Serialize(Box<dyn SerializationError>),
261264
/// Failed to write to a [`Sender`](crate::Sender).
262265
#[error("Error writing to `Sender`: {0}")]
263266
Write(#[from] WriteError),
@@ -266,8 +269,14 @@ pub enum Sender {
266269
Closed(#[from] AlreadyClosed),
267270
}
268271

269-
impl From<Box<ErrorKind>> for Sender {
270-
fn from(error: Box<ErrorKind>) -> Self {
271-
Self::Serialize(*error)
272+
impl Sender {
273+
/// Returns a new instance after boxing `err`.
274+
pub(crate) fn from_serialization<E: SerializationError>(err: E) -> Self {
275+
Self::Serialize(Box::new(err))
272276
}
273277
}
278+
279+
/// An error raised from serialization.
280+
pub trait SerializationError: Display + Debug + Send + Sync + 'static {}
281+
282+
impl<T> SerializationError for T where T: Display + Debug + Send + Sync + 'static {}

src/quic/connection/connecting.rs

+12-6
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44
use std::net::SocketAddr;
55

66
use quinn::{crypto::rustls::HandshakeData, NewConnection};
7-
use serde::{de::DeserializeOwned, Serialize};
7+
use transmog::OwnedDeserializer;
88

9-
use crate::{error, Connection};
9+
use crate::{
10+
error::{self, SerializationError},
11+
Connection,
12+
};
1013

1114
/// Represent's an intermediate state to build a [`Connection`].
1215
#[must_use = "`Connecting` does nothing unless accepted with `Connecting::accept`"]
@@ -47,17 +50,20 @@ impl Connecting {
4750
///
4851
/// # Errors
4952
/// [`error::Connecting`] if the [`Connection`] failed to be established.
50-
pub async fn accept<T: DeserializeOwned + Serialize + Send + 'static>(
51-
self,
52-
) -> Result<Connection<T>, error::Connecting> {
53+
pub async fn accept<T, F>(self, format: F) -> Result<Connection<T, F>, error::Connecting>
54+
where
55+
T: Send + 'static,
56+
F: OwnedDeserializer<T> + Clone,
57+
F::Error: SerializationError,
58+
{
5359
self.0
5460
.await
5561
.map(
5662
|NewConnection {
5763
connection,
5864
bi_streams,
5965
..
60-
}| Connection::new(connection, bi_streams),
66+
}| Connection::new(connection, bi_streams, format),
6167
)
6268
.map_err(error::Connecting::from)
6369
}

src/quic/connection/incoming.rs

+70-15
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,30 @@ use std::fmt::{self, Debug, Formatter};
44

55
use futures_util::StreamExt;
66
use quinn::{RecvStream, SendStream};
7-
use serde::{de::DeserializeOwned, Serialize};
7+
use transmog::{Format, OwnedDeserializer};
88

99
use super::ReceiverStream;
10-
use crate::{error, Receiver, Sender};
10+
use crate::{
11+
error::{self, SerializationError},
12+
Receiver, Sender,
13+
};
1114

1215
/// An intermediate state to define which type to accept in this stream. See
1316
/// [`accept_stream`](Self::accept).
1417
#[must_use = "`Incoming` does nothing unless accepted with `Incoming::accept`"]
15-
pub struct Incoming<T: DeserializeOwned> {
18+
pub struct Incoming<T, F: OwnedDeserializer<T>> {
1619
/// [`SendStream`] to build [`Sender`].
1720
sender: SendStream,
1821
/// [`RecvStream`] to build [`Receiver`].
19-
receiver: ReceiverStream<T>,
22+
receiver: ReceiverStream<T, F>,
2023
/// Requested type.
2124
r#type: Option<Result<T, error::Incoming>>,
2225
}
2326

24-
impl<T: DeserializeOwned> Debug for Incoming<T> {
27+
impl<T, F> Debug for Incoming<T, F>
28+
where
29+
F: OwnedDeserializer<T>,
30+
{
2531
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
2632
f.debug_struct("Incoming")
2733
.field("sender", &self.sender)
@@ -31,12 +37,16 @@ impl<T: DeserializeOwned> Debug for Incoming<T> {
3137
}
3238
}
3339

34-
impl<T: DeserializeOwned> Incoming<T> {
40+
impl<T, F> Incoming<T, F>
41+
where
42+
F: OwnedDeserializer<T> + Clone,
43+
F::Error: SerializationError,
44+
{
3545
/// Builds a new [`Incoming`] from raw [`quinn`] types.
36-
pub(super) fn new(sender: SendStream, receiver: RecvStream) -> Self {
46+
pub(super) fn new(sender: SendStream, receiver: RecvStream, format: F) -> Self {
3747
Self {
3848
sender,
39-
receiver: ReceiverStream::new(receiver),
49+
receiver: ReceiverStream::new(receiver, format),
4050
r#type: None,
4151
}
4252
}
@@ -80,12 +90,57 @@ impl<T: DeserializeOwned> Incoming<T> {
8090
/// - [`error::Incoming::Receiver`] if receiving the type information to the
8191
/// peer failed, see [`error::Receiver`] for more details
8292
/// - [`error::Incoming::Closed`] if the stream was closed
83-
pub async fn accept<
84-
S: DeserializeOwned + Serialize + Send + 'static,
85-
R: DeserializeOwned + Serialize + Send + 'static,
86-
>(
93+
pub async fn accept<S: Send + 'static, R: Send + 'static>(
94+
self,
95+
) -> Result<(Sender<S, F>, Receiver<R>), error::Incoming>
96+
where
97+
F: OwnedDeserializer<R> + Format<'static, S> + 'static,
98+
<F as Format<'static, S>>::Error: SerializationError,
99+
<F as Format<'static, R>>::Error: SerializationError,
100+
{
101+
let format = self.receiver.format.clone();
102+
self.accept_with_format(format).await
103+
}
104+
105+
/// Accept the incoming stream with the given types.
106+
///
107+
/// Use `S` and `R` to define which type this stream is sending and
108+
/// receiving.
109+
///
110+
/// # Errors
111+
/// - [`error::Incoming::Receiver`] if receiving the type information to the
112+
/// peer failed, see [`error::Receiver`] for more details
113+
/// - [`error::Incoming::Closed`] if the stream was closed
114+
pub async fn accept_raw<S: Send + 'static, R: Send + 'static>(
115+
self,
116+
) -> Result<(Sender<S, F>, Receiver<R>), error::Incoming>
117+
where
118+
F: OwnedDeserializer<R> + Format<'static, S> + 'static,
119+
<F as Format<'static, S>>::Error: SerializationError,
120+
<F as Format<'static, R>>::Error: SerializationError,
121+
{
122+
let format = self.receiver.format.clone();
123+
self.accept_with_format(format).await
124+
}
125+
126+
/// Accept the incoming stream with the given types.
127+
///
128+
/// Use `S` and `R` to define which type this stream is sending and
129+
/// receiving.
130+
///
131+
/// # Errors
132+
/// - [`error::Incoming::Receiver`] if receiving the type information to the
133+
/// peer failed, see [`error::Receiver`] for more details
134+
/// - [`error::Incoming::Closed`] if the stream was closed
135+
pub async fn accept_with_format<S: Send + 'static, R: Send + 'static, NewFormat>(
87136
mut self,
88-
) -> Result<(Sender<S>, Receiver<R>), error::Incoming> {
137+
format: NewFormat,
138+
) -> Result<(Sender<S, NewFormat>, Receiver<R>), error::Incoming>
139+
where
140+
NewFormat: OwnedDeserializer<R> + Format<'static, S> + Clone + 'static,
141+
<NewFormat as Format<'static, S>>::Error: SerializationError,
142+
<NewFormat as Format<'static, R>>::Error: SerializationError,
143+
{
89144
match self.r#type {
90145
Some(Ok(_)) => (),
91146
Some(Err(error)) => return Err(error),
@@ -100,8 +155,8 @@ impl<T: DeserializeOwned> Incoming<T> {
100155
}
101156
}
102157

103-
let sender = Sender::new(self.sender);
104-
let receiver = Receiver::new(self.receiver.transmute());
158+
let sender = Sender::new(self.sender, format.clone());
159+
let receiver = Receiver::new(self.receiver.transmute(format));
105160

106161
Ok((sender, receiver))
107162
}

0 commit comments

Comments
 (0)