Skip to content

Commit fb24479

Browse files
authored
fix(engineio/socket): atomically adjacent packet requirement for binary payloads (#287)
* fix(engineio/socket): Send bufferred adjacent packets * fix(socketio/socket): use new `send_many` to ensure adjacent bin payload * feat(socketio/socket): use `reserve` rather than `reserve_many` and `Iterator` for `Permtis` * test(socketio/socket): add test for `concurrent_emit`
1 parent 4ffd473 commit fb24479

File tree

11 files changed

+249
-159
lines changed

11 files changed

+249
-159
lines changed

engineioxide/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ hyper-util = { workspace = true, features = ["tokio"] }
3636
base64 = "0.22.0"
3737
bytes = "1.4.0"
3838
rand = "0.8.5"
39+
smallvec = { version = "1.13.1", features = ["union"] }
3940

4041
# Tracing
4142
tracing = { workspace = true, optional = true }

engineioxide/src/socket.rs

+33-34
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use std::{
6262
};
6363

6464
use http::request::Parts;
65+
use smallvec::{smallvec, SmallVec};
6566
use tokio::{
6667
sync::{
6768
mpsc::{self},
@@ -118,44 +119,35 @@ impl From<&Error> for Option<DisconnectReason> {
118119
/// A permit to emit a message to the client.
119120
/// A permit holds a place in the internal channel to send one packet to the client.
120121
pub struct Permit<'a> {
121-
inner: mpsc::Permit<'a, Packet>,
122+
inner: mpsc::Permit<'a, PacketBuf>,
122123
}
123124
impl Permit<'_> {
124125
/// Consume the permit and emit a message to the client.
125126
#[inline]
126127
pub fn emit(self, msg: String) {
127-
self.inner.send(Packet::Message(msg));
128+
self.inner.send(smallvec![Packet::Message(msg)]);
128129
}
129130
/// Consume the permit and emit a binary message to the client.
130131
#[inline]
131132
pub fn emit_binary(self, data: Vec<u8>) {
132-
self.inner.send(Packet::Binary(data));
133+
self.inner.send(smallvec![Packet::Binary(data)]);
133134
}
134-
}
135-
136-
/// An [`Iterator`] over the permits returned by the [`reserve`](Socket::reserve) function
137-
#[derive(Debug)]
138-
pub struct PermitIterator<'a> {
139-
inner: mpsc::PermitIterator<'a, Packet>,
140-
}
141135

142-
impl<'a> Iterator for PermitIterator<'a> {
143-
type Item = Permit<'a>;
144-
145-
#[inline]
146-
fn next(&mut self) -> Option<Self::Item> {
147-
let inner = self.inner.next()?;
148-
Some(Permit { inner })
149-
}
150-
}
151-
impl ExactSizeIterator for PermitIterator<'_> {
152-
#[inline]
153-
fn len(&self) -> usize {
154-
self.inner.len()
136+
/// Consume the permit and emit a message with multiple binary data to the client.
137+
///
138+
/// It can be used to ensure atomicity when sending a string packet with adjacent binary packets.
139+
pub fn emit_many(self, msg: String, data: Vec<Vec<u8>>) {
140+
let mut packets = SmallVec::with_capacity(data.len() + 1);
141+
packets.push(Packet::Message(msg));
142+
for d in data {
143+
packets.push(Packet::Binary(d));
144+
}
145+
self.inner.send(packets);
155146
}
156147
}
157-
impl std::iter::FusedIterator for PermitIterator<'_> {}
158148

149+
/// Buffered packets to send to the client
150+
pub(crate) type PacketBuf = SmallVec<[Packet; 10]>;
159151
/// A [`Socket`] represents a client connection to the server.
160152
/// It is agnostic to the [`TransportType`].
161153
///
@@ -179,7 +171,7 @@ where
179171
/// without any mutex
180172
transport: AtomicU8,
181173

182-
/// Channel to receive [`Packet`] from the connection
174+
/// Channel to send [`PacketBuf`] to the connection
183175
///
184176
/// It is used and managed by the [`EngineIo`](crate::engine) struct depending on the transport type
185177
///
@@ -192,10 +184,12 @@ where
192184
/// * From the fn [`on_ws_req_init`](crate::engine::EngineIo) if the transport is websocket
193185
/// * Automatically via the [`close_session fn`](crate::engine::EngineIo::close_session) as a fallback.
194186
/// Because with polling transport, if the client is not currently polling then the encoder will never be able to close the channel
195-
pub(crate) internal_rx: Mutex<PeekableReceiver<Packet>>,
187+
///
188+
/// The channel is made of a [`SmallVec`] of [`Packet`]s so that adjacent packets can be sent atomically.
189+
pub(crate) internal_rx: Mutex<PeekableReceiver<PacketBuf>>,
196190

197-
/// Channel to send [Packet] to the internal connection
198-
internal_tx: mpsc::Sender<Packet>,
191+
/// Channel to send [PacketBuf] to the internal connection
192+
internal_tx: mpsc::Sender<PacketBuf>,
199193

200194
/// Internal channel to receive Pong [`Packets`](Packet) (v4 protocol) or Ping (v3 protocol) in the heartbeat job
201195
/// which is running in a separate task
@@ -266,7 +260,12 @@ where
266260
pub(crate) fn send(&self, packet: Packet) -> Result<(), TrySendError<Packet>> {
267261
#[cfg(feature = "tracing")]
268262
tracing::debug!("[sid={}] sending packet: {:?}", self.id, packet);
269-
self.internal_tx.try_send(packet)?;
263+
self.internal_tx
264+
.try_send(smallvec![packet])
265+
.map_err(|p| match p {
266+
TrySendError::Full(mut p) => TrySendError::Full(p.pop().unwrap()),
267+
TrySendError::Closed(mut p) => TrySendError::Closed(p.pop().unwrap()),
268+
})?;
270269
Ok(())
271270
}
272271

@@ -334,7 +333,7 @@ where
334333
heartbeat_rx.try_recv().ok();
335334

336335
self.internal_tx
337-
.try_send(Packet::Ping)
336+
.try_send(smallvec![Packet::Ping])
338337
.map_err(|_| Error::HeartbeatTimeout)?;
339338
tokio::time::timeout(timeout, heartbeat_rx.recv())
340339
.await
@@ -363,7 +362,7 @@ where
363362
#[cfg(feature = "tracing")]
364363
tracing::debug!("[sid={}] ping received, sending pong", self.id);
365364
self.internal_tx
366-
.try_send(Packet::Pong)
365+
.try_send(smallvec![Packet::Pong])
367366
.map_err(|_| Error::HeartbeatTimeout)?;
368367
}
369368
}
@@ -395,9 +394,9 @@ where
395394
/// If the internal chan is full, the function will return a [`TrySendError::Full`] error.
396395
/// If the socket is closed, the function will return a [`TrySendError::Closed`] error.
397396
#[inline]
398-
pub fn reserve(&self, n: usize) -> Result<PermitIterator<'_>, TrySendError<()>> {
399-
let inner = self.internal_tx.try_reserve_many(n)?;
400-
Ok(PermitIterator { inner })
397+
pub fn reserve(&self) -> Result<Permit<'_>, TrySendError<()>> {
398+
let permit = self.internal_tx.try_reserve()?;
399+
Ok(Permit { inner: permit })
401400
}
402401

403402
/// Emits a message to the client.

0 commit comments

Comments
 (0)