Skip to content

Commit a3b5d95

Browse files
authored
chore(e2e): switch the client from JS to rust
1 parent 3c47e4e commit a3b5d95

File tree

7 files changed

+64
-273
lines changed

7 files changed

+64
-273
lines changed

.github/workflows/github-ci.yml

+3-8
Original file line numberDiff line numberDiff line change
@@ -268,18 +268,13 @@ jobs:
268268
~/.cargo/registry/cache/
269269
~/.cargo/git/db/
270270
target/
271-
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
271+
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}-heaptrack
272272
- name: Install heaptrack
273273
run: sudo apt-get install -y heaptrack
274-
- uses: actions/setup-node@v4
275-
with:
276-
node-version: 16
277-
- name: Install client deps
278-
run: cd e2e/heaptrack/client && npm install && cd ../../..
279-
- name: Build server
274+
- name: Build server && client
280275
run: cargo build -r -p heaptrack
281276
- name: Run memory benchmark
282-
run: heaptrack target/release/heaptrack & npm --prefix e2e/heaptrack/client start
277+
run: heaptrack target/release/heaptrack & sleep 1 && ./target/release/heaptrack-client && kill -SIGINT $!
283278
- name: Publish memory benchmark
284279
uses: actions/upload-artifact@v2
285280
with:

e2e/heaptrack/Cargo.toml

+6-5
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ socketioxide = { path = "../../socketioxide" }
1010
hyper = { workspace = true, features = ["server", "http1", "http2"] }
1111
hyper-util = { workspace = true, features = ["tokio"] }
1212
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
13-
tracing-subscriber = { workspace = true, optional = true }
14-
tracing = { workspace = true, optional = true }
15-
charts-rs = { version = "0.3.3", optional = true }
13+
rust_socketio = { version = "0.4.2", features = ["async"] }
14+
serde_json = "1.0.68"
15+
rand = "0.8.4"
1616

17-
[features]
18-
custom-report = ["socketioxide/tracing", "tracing", "tracing-subscriber"]
17+
[[bin]]
18+
name = "heaptrack-client"
19+
path = "src/client.rs"

e2e/heaptrack/client/main.js

-41
This file was deleted.

e2e/heaptrack/client/package-lock.json

-114
This file was deleted.

e2e/heaptrack/client/package.json

-16
This file was deleted.

e2e/heaptrack/src/client.rs

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use std::{pin::Pin, time::Duration};
2+
3+
use rust_socketio::{
4+
asynchronous::{Client, ClientBuilder},
5+
Payload,
6+
};
7+
8+
const PING_INTERVAL: Duration = Duration::from_millis(1000);
9+
const POLLING_PERCENTAGE: f32 = 0.05;
10+
const MAX_CLIENT: usize = 200;
11+
12+
fn cb(_: Payload, socket: Client) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
13+
Box::pin(async move {
14+
tokio::spawn(async move {
15+
let mut inter = tokio::time::interval(PING_INTERVAL);
16+
loop {
17+
inter.tick().await;
18+
let _ = socket.emit("ping", serde_json::Value::Null).await;
19+
let _ = socket
20+
.emit("ping", (0..u8::MAX).into_iter().collect::<Vec<u8>>())
21+
.await;
22+
}
23+
});
24+
})
25+
}
26+
#[tokio::main]
27+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
28+
tokio::spawn(async move {
29+
for _ in 0..MAX_CLIENT {
30+
let random: f32 = rand::random();
31+
let transport_type = if POLLING_PERCENTAGE > random {
32+
rust_socketio::TransportType::Polling
33+
} else {
34+
rust_socketio::TransportType::WebsocketUpgrade
35+
};
36+
// get a socket that is connected to the admin namespace
37+
ClientBuilder::new("http://localhost:3000/")
38+
.transport_type(transport_type)
39+
.namespace("/")
40+
.on("open", cb)
41+
.on("error", |err, _| {
42+
Box::pin(async move { eprintln!("Error: {:#?}", err) })
43+
})
44+
.connect()
45+
.await
46+
.expect("Connection failed");
47+
}
48+
});
49+
tokio::time::sleep(Duration::from_secs(60)).await;
50+
51+
Ok(())
52+
}

e2e/heaptrack/src/main.rs

+3-89
Original file line numberDiff line numberDiff line change
@@ -1,110 +1,24 @@
1-
#[cfg(feature = "custom-report")]
2-
use charts_rs::LineChart;
31
use hyper::server::conn::http1;
42
use hyper_util::rt::TokioIo;
5-
use socketioxide::{
6-
extract::{AckSender, SocketRef},
7-
SocketIo,
8-
};
3+
use socketioxide::{extract::SocketRef, SocketIo};
94
use std::net::SocketAddr;
10-
#[cfg(feature = "custom-report")]
11-
use std::sync::{atomic::AtomicUsize, Mutex};
12-
#[cfg(feature = "custom-report")]
13-
use std::time::Instant;
145
use tokio::net::TcpListener;
15-
#[cfg(feature = "custom-report")]
16-
use tracing::Level;
17-
#[cfg(feature = "custom-report")]
18-
use tracing_subscriber::FmtSubscriber;
19-
20-
use std::alloc::{GlobalAlloc, Layout, System};
21-
22-
struct MyAllocator;
23-
24-
#[cfg(feature = "custom-report")]
25-
static CURRENT: AtomicUsize = AtomicUsize::new(0);
26-
#[cfg(feature = "custom-report")]
27-
static MEMORY: Mutex<Vec<(Instant, usize, usize)>> = Mutex::new(Vec::new());
28-
#[cfg(feature = "custom-report")]
29-
static SOCKETS_CNT: AtomicUsize = AtomicUsize::new(0);
30-
31-
unsafe impl GlobalAlloc for MyAllocator {
32-
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
33-
#[cfg(feature = "custom-report")]
34-
CURRENT.fetch_add(layout.size(), std::sync::atomic::Ordering::Relaxed);
35-
System.alloc(layout)
36-
}
37-
38-
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
39-
#[cfg(feature = "custom-report")]
40-
CURRENT.fetch_sub(layout.size(), std::sync::atomic::Ordering::Relaxed);
41-
System.dealloc(ptr, layout)
42-
}
43-
}
44-
45-
#[global_allocator]
46-
static GLOBAL: MyAllocator = MyAllocator;
476

487
fn on_connect(socket: SocketRef) {
49-
#[cfg(feature = "custom-report")]
50-
SOCKETS_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
51-
socket.on("ping", |ack: AckSender| {
52-
ack.send("pong").ok();
53-
});
54-
socket.on_disconnect(|| {
55-
#[cfg(feature = "custom-report")]
56-
SOCKETS_CNT.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
8+
socket.on("ping", |s: SocketRef| {
9+
s.emit("pong", ()).ok();
5710
});
5811
}
5912

6013
#[tokio::main]
6114
async fn main() -> Result<(), Box<dyn std::error::Error>> {
6215
let (svc, io) = SocketIo::new_svc();
63-
#[cfg(feature = "custom-report")]
64-
{
65-
let subscriber = FmtSubscriber::builder()
66-
.with_line_number(true)
67-
.with_max_level(Level::TRACE)
68-
.finish();
69-
tracing::subscriber::set_global_default(subscriber)?;
70-
}
7116

7217
io.ns("/", on_connect);
7318

7419
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
7520
let listener = TcpListener::bind(addr).await?;
7621

77-
#[cfg(feature = "custom-report")]
78-
tokio::task::spawn(async move {
79-
loop {
80-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
81-
MEMORY.lock().unwrap().push((
82-
Instant::now(),
83-
SOCKETS_CNT.load(std::sync::atomic::Ordering::Relaxed),
84-
CURRENT.load(std::sync::atomic::Ordering::Relaxed),
85-
));
86-
}
87-
});
88-
89-
#[cfg(feature = "custom-report")]
90-
tokio::task::spawn(async move {
91-
tokio::signal::ctrl_c().await.unwrap();
92-
println!("saving charts...");
93-
let points = MEMORY.lock().unwrap();
94-
let mut time: Vec<String> = Vec::with_capacity(points.len());
95-
let mut sockets: Vec<String> = Vec::with_capacity(points.len());
96-
let mut memory: Vec<f32> = Vec::with_capacity(points.len());
97-
for (t, s, m) in points.iter() {
98-
time.push(t.elapsed().as_secs_f64().to_string());
99-
sockets.push(s.to_string());
100-
memory.push(*m as f32);
101-
}
102-
let line_chart = LineChart::new(vec![("Memory", memory).into()], sockets);
103-
let svg = line_chart.svg().unwrap();
104-
std::fs::write("memory_usage.svg", svg).unwrap();
105-
std::process::exit(0);
106-
});
107-
10822
loop {
10923
let (stream, _) = listener.accept().await?;
11024

0 commit comments

Comments
 (0)