Skip to content

Commit 14fb827

Browse files
committed
Updated benchmark
added but disabled heartbeat ping update readme
1 parent 282a3d0 commit 14fb827

File tree

3 files changed

+57
-21
lines changed

3 files changed

+57
-21
lines changed

README.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,12 @@ This is derived from the sse in https://github.com/actix/examples which is under
55
## Usage
66

77
```
8-
cargo build --release
8+
cargo run --release
99
```
1010

11-
The executable will be in ``target/release``
12-
13-
14-
To benchmark, run
11+
To benchmark, run multiple instances of
1512

1613
```
17-
node benchmark.js
14+
python3 benchmark.py
1815
```
1916

benchmark.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,48 @@
1+
import asyncio
12
from sseclient import SSEClient
3+
4+
import logging
5+
from itertools import count
26
from time import sleep
37
from threading import Lock, Thread
48

5-
class Counter(Thread):
9+
logging.basicConfig(filename="sse.log", level=logging.DEBUG)
10+
11+
class Counter:
612
def __init__(self):
7-
super().__init__()
13+
# super().__init__()
814
self.daemon = True
915
self.counter = 0
10-
self.connections = []
16+
self._number_of_read = 0
17+
self._counter = count()
1118
self.lock = Lock()
1219

13-
def increment(self, connection):
20+
def increment(self):
1421
with self.lock:
1522
self.counter += 1
16-
# self.connections.append(connection)
17-
print(self.counter, end='\r')
23+
print(self.counter, end='\r', flush=True)
24+
1825

1926
counter = Counter()
20-
counter.start()
27+
# counter.start()
2128

22-
def task(counter):
2329

30+
def task(counter):
2431
try:
2532
messages = SSEClient("http://localhost:8080/events/channel1")
26-
print("yee")
27-
except OSError:
33+
counter.increment()
34+
except OSError as e:
35+
logging.error(str(e))
2836
return
2937
for msg in messages:
30-
if str(msg) == "connected":
31-
counter.increment(messages)
3238
if "secret" in str(msg):
33-
counter.increment(messages)
39+
counter.increment()
3440

3541

3642
threads = []
37-
for _ in range(10):
43+
for i in range(5000):
44+
# if i > 4000 and i % 1000 == 0:
45+
# sleep(1)
3846
t = Thread(target=task, args=(counter,), daemon=True)
3947
t.start()
4048
threads.append(t)

src/main.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::collections::HashMap;
1212
use std::pin::Pin;
1313
use std::sync::Mutex;
1414
use std::task::{Context, Poll};
15+
use std::time::Duration;
1516

1617
use actix_cors::Cors;
1718
use actix_web::web::{Bytes, Data, Path};
@@ -20,6 +21,7 @@ use clap::{crate_version, clap_app};
2021
use futures::Stream;
2122
use serde::{Deserialize, Serialize};
2223
use tokio::sync::broadcast::{channel, Receiver, Sender};
24+
use tokio::time::{interval_at, Instant};
2325

2426

2527
#[actix_rt::main]
@@ -52,6 +54,7 @@ async fn main() -> std::io::Result<()> {
5254
.bind(format!("{}:{}",
5355
matches.value_of("HOST").unwrap_or("127.0.0.1"),
5456
matches.value_of("PORT").unwrap_or("8080")))?
57+
.maxconn(500000)
5558
.run()
5659
.await
5760
}
@@ -96,6 +99,24 @@ struct SpotifyData {
9699
msg: String,
97100
}
98101

102+
struct Broadcaster {
103+
senders: Vec<Sender<Bytes>>,
104+
num_clients: u32,
105+
}
106+
107+
impl Broadcaster {
108+
fn new() -> Broadcaster {
109+
Broadcaster {
110+
senders: Vec::new(),
111+
num_clients: 1,
112+
}
113+
}
114+
115+
// fn send(&msg) {
116+
//
117+
// }
118+
}
119+
99120
struct BroadcasterMap {
100121
broadcasters: HashMap<String, Sender<Bytes>>,
101122
channel_num: usize,
@@ -105,20 +126,30 @@ impl BroadcasterMap {
105126
fn new() -> Self {
106127
BroadcasterMap {
107128
broadcasters: HashMap::<String, Sender<Bytes>>::new(),
108-
channel_num: 50000
129+
channel_num: 500000
109130
}
110131
}
111132

112133
fn create() -> Data<Mutex<Self>> {
113134
Data::new(Mutex::new(BroadcasterMap::new()))
114135
}
115136

137+
fn spawn_ping(tx: Sender<Bytes>) {
138+
actix_rt::spawn(async move {
139+
let mut task = interval_at(Instant::now(), Duration::from_secs(10));
140+
while let _ = task.tick().await {
141+
tx.send(Bytes::from("data: ping\n\n"));
142+
}
143+
});
144+
}
145+
116146
fn new_client(&mut self, room: &str) -> Client {
117147
let s = (&room).to_string();
118148
match self.broadcasters.get_mut(&s) {
119149
Some(broadcaster) => Client(broadcaster.subscribe()),
120150
None => {
121151
let (tx, rx) = channel(self.channel_num);
152+
// BroadcasterMap::spawn_ping(tx.clone());
122153
self.broadcasters.insert(s.to_string(), tx);
123154
Client(rx)
124155
}

0 commit comments

Comments
 (0)