Skip to content

Commit ed1293d

Browse files
committed
wip: integrate binary data into payloads
Previously, the non-binary part of a message and the binary payloads in a message were represented separately: the non-binary portion was represented by a serde_json::Value, and could be converted to an arbitrary data structure. That data structure would not include the binary data or any indication that there is any binary data at all. The binary data would be provided in a Vec<Vec<u8>>. There were a few problems with this: 1. The original code only supported cases where the payload was a flat array with some binary payloads in the root of the array, or a flat object where the root of the object was a binary payload. Objects with more complicated structure and binary data embedded in various places in the structure were not supported. 2. Adding support for the above turned out to not be possible in a useful way, because the ordering of the Vec<Vec<u8>> matters, and it could never be clear where exactly in the possibly-complex structure each binary payload belonged. 3. One of the useful features of the socket.io protocol is that it lets users efficiently transmit binary data in addition to textual/numeric data, and have that handled transparently by the protocol, with either end of the connection believing that they just sent or received a single mixed textual/numeric/binary payload. Separating the non-binary from the binary negates that benefit. This introduces a new type, PayloadValue, that behaves similarly to serde_json::Value. The main difference is that it has a Binary variant, which holds a numeric index and a Vec<u8>. This allows us to include the binary data where the sender of that data intended it to be. There is currently one wrinkle: serde_json does not appear to consistently handle binary data; when serializing a struct with Vec<u8>, I believe it will serialize it as an array of numbers, rather than recognize that it's binary data. For now, I've included a Binary struct that wraps a Vec<u8>, which can be included as the type of a binary member, instead of using a Vec<u8> directly. Hopefully I'll be able to figure out a better way to do this. Unfinished tasks: * Testing: I have no idea if this even works yet. All I've done is get it to compile. * Benchmarking: I've tried to ensure that I don't copy data any more than the existing library does, but it's possible I've introduced some performance regressions, so I'll have to look into that. * Documentation: the documentation still references the old way of doing things and needs to be updated. Closes #276.
1 parent c243f6c commit ed1293d

File tree

8 files changed

+374
-333
lines changed

8 files changed

+374
-333
lines changed

socketioxide/src/ack.rs

+26-33
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,22 @@ use futures::{
1818
Future, Stream,
1919
};
2020
use serde::de::DeserializeOwned;
21-
use serde_json::Value;
2221
use tokio::{sync::oneshot::Receiver, time::Timeout};
2322

24-
use crate::{adapter::Adapter, errors::AckError, extract::SocketRef, packet::Packet, SocketError};
23+
use crate::{
24+
adapter::Adapter, errors::AckError, extract::SocketRef, packet::Packet,
25+
payload_value::PayloadValue, SocketError,
26+
};
2527

2628
/// An acknowledgement sent by the client.
2729
/// It contains the data sent by the client and the binary payloads if there are any.
2830
#[derive(Debug)]
2931
pub struct AckResponse<T> {
3032
/// The data returned by the client
3133
pub data: T,
32-
/// Optional binary payloads.
33-
/// If there is no binary payload, the `Vec` will be empty
34-
pub binary: Vec<Vec<u8>>,
3534
}
3635

37-
pub(crate) type AckResult<T = Value> = Result<AckResponse<T>, AckError<()>>;
36+
pub(crate) type AckResult<T = PayloadValue> = Result<AckResponse<T>, AckError<()>>;
3837

3938
pin_project_lite::pin_project! {
4039
/// A [`Future`] of [`AckResponse`] received from the client with its corresponding [`Sid`].
@@ -127,12 +126,12 @@ pin_project_lite::pin_project! {
127126
pub enum AckInnerStream {
128127
Stream {
129128
#[pin]
130-
rxs: FuturesUnordered<AckResultWithId<Value>>,
129+
rxs: FuturesUnordered<AckResultWithId<PayloadValue>>,
131130
},
132131

133132
Fut {
134133
#[pin]
135-
rx: AckResultWithId<Value>,
134+
rx: AckResultWithId<PayloadValue>,
136135
polled: bool,
137136
},
138137
}
@@ -171,7 +170,7 @@ impl AckInnerStream {
171170

172171
/// Creates a new [`AckInnerStream`] from a [`oneshot::Receiver`](tokio) corresponding to the acknowledgement
173172
/// of a single socket.
174-
pub fn send(rx: Receiver<AckResult<Value>>, duration: Duration, id: Sid) -> Self {
173+
pub fn send(rx: Receiver<AckResult<PayloadValue>>, duration: Duration, id: Sid) -> Self {
175174
AckInnerStream::Fut {
176175
polled: false,
177176
rx: AckResultWithId {
@@ -183,7 +182,7 @@ impl AckInnerStream {
183182
}
184183

185184
impl Stream for AckInnerStream {
186-
type Item = (Sid, AckResult<Value>);
185+
type Item = (Sid, AckResult<PayloadValue>);
187186

188187
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
189188
use InnerProj::*;
@@ -221,7 +220,7 @@ impl FusedStream for AckInnerStream {
221220
}
222221

223222
impl Future for AckInnerStream {
224-
type Output = AckResult<Value>;
223+
type Output = AckResult<PayloadValue>;
225224

226225
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
227226
match self.as_mut().poll_next(cx) {
@@ -295,13 +294,11 @@ impl<T> From<AckInnerStream> for AckStream<T> {
295294
}
296295
}
297296

298-
fn map_ack_response<T: DeserializeOwned>(ack: AckResult<Value>) -> AckResult<T> {
297+
fn map_ack_response<T: DeserializeOwned>(ack: AckResult<PayloadValue>) -> AckResult<T> {
299298
ack.and_then(|v| {
300-
serde_json::from_value(v.data)
301-
.map(|data| AckResponse {
302-
data,
303-
binary: v.binary,
304-
})
299+
v.data
300+
.into_data::<T>()
301+
.map(|data| AckResponse { data })
305302
.map_err(|e| e.into())
306303
})
307304
}
@@ -328,12 +325,12 @@ mod test {
328325
async fn broadcast_ack() {
329326
let socket = create_socket();
330327
let socket2 = create_socket();
331-
let mut packet = Packet::event("/", "test", "test".into());
328+
let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap());
332329
packet.inner.set_ack_id(1);
333330
let socks = vec![socket.clone().into(), socket2.clone().into()];
334331
let stream: AckStream<String> = AckInnerStream::broadcast(packet, socks, None).into();
335332

336-
let res_packet = Packet::ack("test", "test".into(), 1);
333+
let res_packet = Packet::ack("test", PayloadValue::from_data("test").unwrap(), 1);
337334
socket.recv(res_packet.inner.clone()).unwrap();
338335
socket2.recv(res_packet.inner).unwrap();
339336

@@ -351,8 +348,7 @@ mod test {
351348
let stream: AckStream<String> =
352349
AckInnerStream::send(rx, Duration::from_secs(1), sid).into();
353350
tx.send(Ok(AckResponse {
354-
data: Value::String("test".into()),
355-
binary: vec![],
351+
data: PayloadValue::String("test".into()),
356352
}))
357353
.unwrap();
358354

@@ -372,8 +368,7 @@ mod test {
372368
let stream: AckStream<String> =
373369
AckInnerStream::send(rx, Duration::from_secs(1), sid).into();
374370
tx.send(Ok(AckResponse {
375-
data: Value::String("test".into()),
376-
binary: vec![],
371+
data: PayloadValue::String("test".into()),
377372
}))
378373
.unwrap();
379374

@@ -384,12 +379,12 @@ mod test {
384379
async fn broadcast_ack_with_deserialize_error() {
385380
let socket = create_socket();
386381
let socket2 = create_socket();
387-
let mut packet = Packet::event("/", "test", "test".into());
382+
let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap());
388383
packet.inner.set_ack_id(1);
389384
let socks = vec![socket.clone().into(), socket2.clone().into()];
390385
let stream: AckStream<String> = AckInnerStream::broadcast(packet, socks, None).into();
391386

392-
let res_packet = Packet::ack("test", 132.into(), 1);
387+
let res_packet = Packet::ack("test", PayloadValue::from_data(132).unwrap(), 1);
393388
socket.recv(res_packet.inner.clone()).unwrap();
394389
socket2.recv(res_packet.inner).unwrap();
395390

@@ -413,8 +408,7 @@ mod test {
413408
let stream: AckStream<String> =
414409
AckInnerStream::send(rx, Duration::from_secs(1), sid).into();
415410
tx.send(Ok(AckResponse {
416-
data: Value::Bool(true),
417-
binary: vec![],
411+
data: PayloadValue::Bool(true),
418412
}))
419413
.unwrap();
420414
assert_eq!(stream.size_hint().0, 1);
@@ -436,8 +430,7 @@ mod test {
436430
let stream: AckStream<String> =
437431
AckInnerStream::send(rx, Duration::from_secs(1), sid).into();
438432
tx.send(Ok(AckResponse {
439-
data: Value::Bool(true),
440-
binary: vec![],
433+
data: PayloadValue::Bool(true),
441434
}))
442435
.unwrap();
443436

@@ -448,12 +441,12 @@ mod test {
448441
async fn broadcast_ack_with_closed_socket() {
449442
let socket = create_socket();
450443
let socket2 = create_socket();
451-
let mut packet = Packet::event("/", "test", "test".into());
444+
let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap());
452445
packet.inner.set_ack_id(1);
453446
let socks = vec![socket.clone().into(), socket2.clone().into()];
454447
let stream: AckStream<String> = AckInnerStream::broadcast(packet, socks, None).into();
455448

456-
let res_packet = Packet::ack("test", "test".into(), 1);
449+
let res_packet = Packet::ack("test", PayloadValue::from_data("test").unwrap(), 1);
457450
socket.clone().recv(res_packet.inner.clone()).unwrap();
458451

459452
futures::pin_mut!(stream);
@@ -503,14 +496,14 @@ mod test {
503496
async fn broadcast_ack_with_timeout() {
504497
let socket = create_socket();
505498
let socket2 = create_socket();
506-
let mut packet = Packet::event("/", "test", "test".into());
499+
let mut packet = Packet::event("/", "test", PayloadValue::from_data("test").unwrap());
507500
packet.inner.set_ack_id(1);
508501
let socks = vec![socket.clone().into(), socket2.clone().into()];
509502
let stream: AckStream<String> =
510503
AckInnerStream::broadcast(packet, socks, Some(Duration::from_millis(10))).into();
511504

512505
socket
513-
.recv(Packet::ack("test", "test".into(), 1).inner)
506+
.recv(Packet::ack("test", PayloadValue::from_data("test").unwrap(), 1).inner)
514507
.unwrap();
515508

516509
futures::pin_mut!(stream);

socketioxide/src/errors.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ pub enum Error {
1111
#[error("invalid packet type")]
1212
InvalidPacketType,
1313

14+
#[error("invalid binary payload count")]
15+
InvalidPayloadCount,
16+
1417
#[error("invalid event name")]
1518
InvalidEventName,
1619

@@ -165,9 +168,10 @@ impl From<&Error> for Option<EIoDisconnectReason> {
165168
use EIoDisconnectReason::*;
166169
match value {
167170
Error::SocketGone(_) => Some(TransportClose),
168-
Error::Serialize(_) | Error::InvalidPacketType | Error::InvalidEventName => {
169-
Some(PacketParsingError)
170-
}
171+
Error::Serialize(_)
172+
| Error::InvalidPacketType
173+
| Error::InvalidEventName
174+
| Error::InvalidPayloadCount => Some(PacketParsingError),
171175
Error::Adapter(_) | Error::InvalidNamespace => None,
172176
}
173177
}

0 commit comments

Comments
 (0)