Skip to content

Commit c1777c9

Browse files
authored
fix(engineio/socket): Send bufferred adjacent packets
1 parent 4ffd473 commit c1777c9

File tree

5 files changed

+155
-89
lines changed

5 files changed

+155
-89
lines changed

engineioxide/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ hyper-util = { workspace = true, features = ["tokio"] }
3636
base64 = "0.22.0"
3737
bytes = "1.4.0"
3838
rand = "0.8.5"
39+
smallvec = { version = "1.13.1", features = ["union"] }
3940

4041
# Tracing
4142
tracing = { workspace = true, optional = true }

engineioxide/src/socket.rs

+33-11
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use std::{
6262
};
6363

6464
use http::request::Parts;
65+
use smallvec::{smallvec, SmallVec};
6566
use tokio::{
6667
sync::{
6768
mpsc::{self},
@@ -118,25 +119,37 @@ impl From<&Error> for Option<DisconnectReason> {
118119
/// A permit to emit a message to the client.
119120
/// A permit holds a place in the internal channel to send one packet to the client.
120121
pub struct Permit<'a> {
121-
inner: mpsc::Permit<'a, Packet>,
122+
inner: mpsc::Permit<'a, PacketBuf>,
122123
}
123124
impl Permit<'_> {
124125
/// Consume the permit and emit a message to the client.
125126
#[inline]
126127
pub fn emit(self, msg: String) {
127-
self.inner.send(Packet::Message(msg));
128+
self.inner.send(smallvec![Packet::Message(msg)]);
128129
}
129130
/// Consume the permit and emit a binary message to the client.
130131
#[inline]
131132
pub fn emit_binary(self, data: Vec<u8>) {
132-
self.inner.send(Packet::Binary(data));
133+
self.inner.send(smallvec![Packet::Binary(data)]);
134+
}
135+
136+
/// Consume the permit and emit a message with multiple binary data to the client.
137+
///
138+
/// It can be used to ensure atomicity when sending a string packet with adjacent binary packets.
139+
pub fn emit_many(self, msg: String, data: Vec<Vec<u8>>) {
140+
let mut packets = SmallVec::with_capacity(data.len() + 1);
141+
packets.push(Packet::Message(msg));
142+
for d in data {
143+
packets.push(Packet::Binary(d));
144+
}
145+
self.inner.send(packets);
133146
}
134147
}
135148

136149
/// An [`Iterator`] over the permits returned by the [`reserve`](Socket::reserve) function
137150
#[derive(Debug)]
138151
pub struct PermitIterator<'a> {
139-
inner: mpsc::PermitIterator<'a, Packet>,
152+
inner: mpsc::PermitIterator<'a, PacketBuf>,
140153
}
141154

142155
impl<'a> Iterator for PermitIterator<'a> {
@@ -156,6 +169,8 @@ impl ExactSizeIterator for PermitIterator<'_> {
156169
}
157170
impl std::iter::FusedIterator for PermitIterator<'_> {}
158171

172+
/// Buffered packets to send to the client
173+
pub(crate) type PacketBuf = SmallVec<[Packet; 10]>;
159174
/// A [`Socket`] represents a client connection to the server.
160175
/// It is agnostic to the [`TransportType`].
161176
///
@@ -179,7 +194,7 @@ where
179194
/// without any mutex
180195
transport: AtomicU8,
181196

182-
/// Channel to receive [`Packet`] from the connection
197+
/// Channel to send [`PacketBuf`] to the connection
183198
///
184199
/// It is used and managed by the [`EngineIo`](crate::engine) struct depending on the transport type
185200
///
@@ -192,10 +207,12 @@ where
192207
/// * From the fn [`on_ws_req_init`](crate::engine::EngineIo) if the transport is websocket
193208
/// * Automatically via the [`close_session fn`](crate::engine::EngineIo::close_session) as a fallback.
194209
/// Because with polling transport, if the client is not currently polling then the encoder will never be able to close the channel
195-
pub(crate) internal_rx: Mutex<PeekableReceiver<Packet>>,
210+
///
211+
/// The channel is made of a [`SmallVec`] of [`Packet`]s so that adjacent packets can be sent atomically.
212+
pub(crate) internal_rx: Mutex<PeekableReceiver<PacketBuf>>,
196213

197-
/// Channel to send [Packet] to the internal connection
198-
internal_tx: mpsc::Sender<Packet>,
214+
/// Channel to send [PacketBuf] to the internal connection
215+
internal_tx: mpsc::Sender<PacketBuf>,
199216

200217
/// Internal channel to receive Pong [`Packets`](Packet) (v4 protocol) or Ping (v3 protocol) in the heartbeat job
201218
/// which is running in a separate task
@@ -266,7 +283,12 @@ where
266283
pub(crate) fn send(&self, packet: Packet) -> Result<(), TrySendError<Packet>> {
267284
#[cfg(feature = "tracing")]
268285
tracing::debug!("[sid={}] sending packet: {:?}", self.id, packet);
269-
self.internal_tx.try_send(packet)?;
286+
self.internal_tx
287+
.try_send(smallvec![packet])
288+
.map_err(|p| match p {
289+
TrySendError::Full(mut p) => TrySendError::Full(p.pop().unwrap()),
290+
TrySendError::Closed(mut p) => TrySendError::Closed(p.pop().unwrap()),
291+
})?;
270292
Ok(())
271293
}
272294

@@ -334,7 +356,7 @@ where
334356
heartbeat_rx.try_recv().ok();
335357

336358
self.internal_tx
337-
.try_send(Packet::Ping)
359+
.try_send(smallvec![Packet::Ping])
338360
.map_err(|_| Error::HeartbeatTimeout)?;
339361
tokio::time::timeout(timeout, heartbeat_rx.recv())
340362
.await
@@ -363,7 +385,7 @@ where
363385
#[cfg(feature = "tracing")]
364386
tracing::debug!("[sid={}] ping received, sending pong", self.id);
365387
self.internal_tx
366-
.try_send(Packet::Pong)
388+
.try_send(smallvec![Packet::Pong])
367389
.map_err(|_| Error::HeartbeatTimeout)?;
368390
}
369391
}

0 commit comments

Comments
 (0)