Skip to content

Commit 7dd7a1b

Browse files
authored
Merge branch 'main' into devel-socketref-clone
2 parents 3cf6791 + fc599c3 commit 7dd7a1b

File tree

28 files changed

+940
-226
lines changed

28 files changed

+940
-226
lines changed

.github/workflows/bench.yml

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
name: Bench
2+
3+
on:
4+
workflow_dispatch:
5+
inputs:
6+
heaptrack:
7+
description: 'Run heaptrack memory benchmark'
8+
required: true
9+
default: false
10+
type: boolean
11+
12+
jobs:
13+
heaptrack:
14+
if: ${{ github.event.inputs.heaptrack == 'true' }}
15+
runs-on: ubuntu-latest
16+
steps:
17+
- uses: actions/checkout@v4
18+
- uses: dtolnay/rust-toolchain@master
19+
with:
20+
toolchain: stable
21+
- uses: actions/cache@v4
22+
with:
23+
path: |
24+
~/.cargo/bin/
25+
~/.cargo/registry/index/
26+
~/.cargo/registry/cache/
27+
~/.cargo/git/db/
28+
target/
29+
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}-heaptrack
30+
- name: Install heaptrack
31+
run: sudo apt-get install -y heaptrack
32+
- name: Build server && client
33+
run: cargo build -r -p heaptrack && cargo build -r -p heaptrack --bin heaptrack-client
34+
- name: Run memory benchmark
35+
run: heaptrack target/release/heaptrack > server.txt & ./target/release/heaptrack-client > client.txt
36+
- name: Server output
37+
if: always()
38+
run: cat server.txt
39+
- name: Client output
40+
if: always()
41+
run: cat client.txt
42+
- name: Publish memory benchmark
43+
uses: actions/upload-artifact@v4
44+
with:
45+
name: heaptrack-${{ github.head_ref }}.${{ github.sha }}
46+
path: heaptrack.heaptrack.*

.github/workflows/github-ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -251,4 +251,4 @@ jobs:
251251
run: cat server.txt
252252
- name: Client output
253253
if: always()
254-
run: cat client.txt
254+
run: cat client.txt

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
target
22
Cargo.lock
33
.env
4-
.vscode
4+
.vscode
5+
*.gz

CHANGELOG.md

+10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
# 0.11.0
2+
## socketioxide
3+
* fix: a panic was raised sometimes under heavy traffic with socketio v5 when the connect timeout handler is destroyed but that the chan sender is still alive.
4+
* **(Breaking)**: Emit errors now contains the provided data if there is an issue with the internal channel (for example if it is full) or if the socket closed.
5+
* **(Breaking)**: Operators are now splitted between `Operators` and `BroadcastOperators` in order to split logic and fn signatures between broadcast and non-broadcast operators.
6+
7+
## engineioxide
8+
* fix #277: with engine.io v3, the message byte prefix `0x4` was not added to the binary payload with `ws` transport.
9+
* bump dependency `base64` to 0.22.0.
10+
111
# 0.10.2
212
## socketioxide
313
* New [`rooms`](https://docs.rs/socketioxide/latest/socketioxide/struct.SocketIo.html#method.rooms) fn to get all the rooms of a namespace.

Cargo.toml

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
[workspace.package]
2-
version = "0.10.2"
2+
version = "0.11.0"
33
edition = "2021"
44
rust-version = "1.67.0"
55
authors = ["Théodore Prévot <"]
66
repository = "https://github.com/totodore/socketioxide"
77
homepage = "https://github.com/totodore/socketioxide"
88
keywords = ["socketio", "tower", "axum", "hyper", "websocket"]
99
categories = [
10-
"asynchronous",
11-
"network-programming",
12-
"web-programming::websocket",
10+
"asynchronous",
11+
"network-programming",
12+
"web-programming::websocket",
1313
]
1414
license = "MIT"
1515

@@ -39,5 +39,5 @@ pin-project-lite = "0.2.13"
3939
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
4040
criterion = { version = "0.5.1", features = ["html_reports"] }
4141
axum = "0.7.2"
42-
salvo = { version = "0.65.0", features = ["tower-compat"] }
42+
salvo = { version = "0.66.0", features = ["tower-compat"] }
4343
rust_socketio = { version = "0.4.2", features = ["async"] }

e2e/heaptrack/.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
*.gz
2+
client/node_modules
3+
memory_usage.svg

e2e/heaptrack/Cargo.toml

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[package]
2+
name = "heaptrack"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
socketioxide = { path = "../../socketioxide" }
10+
hyper = { workspace = true, features = ["server", "http1", "http2"] }
11+
hyper-util = { workspace = true, features = ["tokio"] }
12+
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
13+
rust_socketio = { version = "0.4.2", features = ["async"] }
14+
serde_json = "1.0.68"
15+
rand = "0.8.4"
16+
17+
[[bin]]
18+
name = "heaptrack-client"
19+
path = "src/client.rs"

e2e/heaptrack/README.md

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Memory usage benchmark
2+
## Based on the official implementation : https://socket.io/docs/v4/memory-usage/
3+
4+
The goal of this program is to benchmark the memory usage of the socket.io server under different conditions.
5+
The server can be configured to generated its own reports with the `custom-report` feature flag.
6+
7+
The best way is still to run the program with (heaptrack)[https://github.com/KDE/heaptrack] and analyze the results with the `heaptrack_gui` tool.

e2e/heaptrack/src/client.rs

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use std::{pin::Pin, time::Duration};
2+
3+
use rust_socketio::{
4+
asynchronous::{Client, ClientBuilder},
5+
Payload,
6+
};
7+
8+
const PING_INTERVAL: Duration = Duration::from_millis(1000);
9+
const POLLING_PERCENTAGE: f32 = 0.05;
10+
const MAX_CLIENT: usize = 200;
11+
12+
fn cb(_: Payload, socket: Client) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
13+
Box::pin(async move {
14+
tokio::spawn(async move {
15+
let mut inter = tokio::time::interval(PING_INTERVAL);
16+
loop {
17+
inter.tick().await;
18+
let _ = socket.emit("ping", serde_json::Value::Null).await;
19+
let _ = socket
20+
.emit("ping", (0..u8::MAX).into_iter().collect::<Vec<u8>>())
21+
.await;
22+
}
23+
});
24+
})
25+
}
26+
#[tokio::main]
27+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
28+
tokio::spawn(async move {
29+
for _ in 0..MAX_CLIENT {
30+
let random: f32 = rand::random();
31+
let transport_type = if POLLING_PERCENTAGE > random {
32+
rust_socketio::TransportType::Polling
33+
} else {
34+
rust_socketio::TransportType::WebsocketUpgrade
35+
};
36+
// get a socket that is connected to the admin namespace
37+
ClientBuilder::new("http://localhost:3000/")
38+
.transport_type(transport_type)
39+
.namespace("/")
40+
.on("open", cb)
41+
.on("error", |err, _| {
42+
Box::pin(async move { eprintln!("Error: {:#?}", err) })
43+
})
44+
.connect()
45+
.await
46+
.expect("Connection failed");
47+
}
48+
});
49+
tokio::time::sleep(Duration::from_secs(60)).await;
50+
51+
Ok(())
52+
}

e2e/heaptrack/src/main.rs

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use hyper::server::conn::http1;
2+
use hyper_util::rt::TokioIo;
3+
use socketioxide::{extract::SocketRef, SocketIo};
4+
use std::{net::SocketAddr, time::Duration};
5+
use tokio::net::TcpListener;
6+
7+
fn on_connect(socket: SocketRef) {
8+
socket.on("ping", |s: SocketRef| {
9+
s.emit("pong", ()).ok();
10+
});
11+
}
12+
13+
#[tokio::main]
14+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
15+
let (svc, io) = SocketIo::new_svc();
16+
17+
io.ns("/", on_connect);
18+
19+
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
20+
let listener = TcpListener::bind(addr).await?;
21+
22+
tokio::spawn(async move {
23+
tokio::time::sleep(Duration::from_secs(60)).await;
24+
std::process::exit(0);
25+
});
26+
27+
loop {
28+
let (stream, _) = listener.accept().await?;
29+
30+
let io = TokioIo::new(stream);
31+
let svc = svc.clone();
32+
33+
tokio::task::spawn(async move {
34+
http1::Builder::new()
35+
.serve_connection(io, svc)
36+
.with_upgrades()
37+
.await
38+
.ok()
39+
});
40+
}
41+
}

e2e/socketioxide/socketioxide.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<Value>) {
3939
|s: SocketRef, Data::<Value>(data), Bin(bin)| async move {
4040
let ack = s
4141
.bin(bin)
42-
.emit_with_ack::<Value>("emit-with-ack", data)
42+
.emit_with_ack::<_, Value>("emit-with-ack", data)
4343
.unwrap()
4444
.await
4545
.unwrap();

engineioxide/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ http-body-util.workspace = true
3333
pin-project-lite.workspace = true
3434
hyper-util = { workspace = true, features = ["tokio"] }
3535

36-
base64 = "0.21.0"
36+
base64 = "0.22.0"
3737
bytes = "1.4.0"
3838
rand = "0.8.5"
3939

engineioxide/src/socket.rs

+52
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,47 @@ impl From<&Error> for Option<DisconnectReason> {
115115
}
116116
}
117117

118+
/// A permit to emit a message to the client.
119+
/// A permit holds a place in the internal channel to send one packet to the client.
120+
pub struct Permit<'a> {
121+
inner: mpsc::Permit<'a, Packet>,
122+
}
123+
impl Permit<'_> {
124+
/// Consume the permit and emit a message to the client.
125+
#[inline]
126+
pub fn emit(self, msg: String) {
127+
self.inner.send(Packet::Message(msg));
128+
}
129+
/// Consume the permit and emit a binary message to the client.
130+
#[inline]
131+
pub fn emit_binary(self, data: Vec<u8>) {
132+
self.inner.send(Packet::Binary(data));
133+
}
134+
}
135+
136+
/// An [`Iterator`] over the permits returned by the [`reserve`](Socket::reserve) function
137+
#[derive(Debug)]
138+
pub struct PermitIterator<'a> {
139+
inner: mpsc::PermitIterator<'a, Packet>,
140+
}
141+
142+
impl<'a> Iterator for PermitIterator<'a> {
143+
type Item = Permit<'a>;
144+
145+
#[inline]
146+
fn next(&mut self) -> Option<Self::Item> {
147+
let inner = self.inner.next()?;
148+
Some(Permit { inner })
149+
}
150+
}
151+
impl ExactSizeIterator for PermitIterator<'_> {
152+
#[inline]
153+
fn len(&self) -> usize {
154+
self.inner.len()
155+
}
156+
}
157+
impl std::iter::FusedIterator for PermitIterator<'_> {}
158+
118159
/// A [`Socket`] represents a client connection to the server.
119160
/// It is agnostic to the [`TransportType`].
120161
///
@@ -348,6 +389,17 @@ where
348389
TransportType::from(self.transport.load(Ordering::Relaxed))
349390
}
350391

392+
/// Reserve `n` permits to emit multiple messages and ensure that there is enough
393+
/// space in the internal chan.
394+
///
395+
/// If the internal chan is full, the function will return a [`TrySendError::Full`] error.
396+
/// If the socket is closed, the function will return a [`TrySendError::Closed`] error.
397+
#[inline]
398+
pub fn reserve(&self, n: usize) -> Result<PermitIterator<'_>, TrySendError<()>> {
399+
let inner = self.internal_tx.try_reserve_many(n)?;
400+
Ok(PermitIterator { inner })
401+
}
402+
351403
/// Emits a message to the client.
352404
///
353405
/// If the transport is in websocket mode, the message is directly sent as a text frame.

engineioxide/src/transport/ws.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,11 @@ where
182182
}
183183
p => return Err(Error::BadPacket(p)),
184184
},
185-
Message::Binary(data) => {
185+
Message::Binary(mut data) => {
186+
if socket.protocol == ProtocolVersion::V3 && !data.is_empty() {
187+
// The first byte is the message type, which we don't need.
188+
let _ = data.remove(0);
189+
}
186190
engine.handler.on_binary(data, socket.clone());
187191
Ok(())
188192
}
@@ -212,7 +216,11 @@ where
212216
macro_rules! map_fn {
213217
($item:ident) => {
214218
let res = match $item {
215-
Packet::Binary(bin) | Packet::BinaryV3(bin) => {
219+
Packet::Binary(mut bin) | Packet::BinaryV3(mut bin) => {
220+
if socket.protocol == ProtocolVersion::V3 {
221+
// v3 protocol requires packet type as the first byte
222+
bin.insert(0, 0x04);
223+
}
216224
tx.feed(Message::Binary(bin)).await
217225
}
218226
Packet::Close => {

examples/loco-rooms-chat/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ rust-version = "1.70" # required by loco
66

77
[dependencies]
88

9-
loco-rs = { version = "0.2.3", default-features = false, features = [
9+
loco-rs = { version = "0.3.1", default-features = false, features = [
1010
"cli",
1111
"channels",
1212
] }

examples/private-messaging/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ tracing-subscriber.workspace = true
1818
tracing.workspace = true
1919
serde.workspace = true
2020
serde_json.workspace = true
21+
anyhow = "1.0"

0 commit comments

Comments
 (0)