Skip to content

Commit 1b5b334

Browse files
committed
add flume performance test compare
1 parent 24149de commit 1b5b334

File tree

6 files changed

+283
-1
lines changed

6 files changed

+283
-1
lines changed

Cargo.lock

Lines changed: 118 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ crossbeam-queue = "0.3"
2020
[dev-dependencies]
2121
tokio-test = "0.4"
2222
tracing-subscriber = "0.3"
23+
flume = "0.11.1"
2324
cargo-husky = { version = "1", features = ["precommit-hook", "run-cargo-test", "run-cargo-clippy", "run-cargo-fmt"] }

examples/performance-test.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
mod tests;
22

3+
use tests::flume_bounded_io_test::run_flume_bounded_io_test;
4+
use tests::flume_bounded_test::run_flume_bounded_test;
35
use tests::tokio_mpmc_io_test::run_tokio_mpmc_io_test;
46
use tests::tokio_mpmc_test::run_tokio_mpmc_test;
57
use tests::tokio_mpsc_io_test::run_tokio_mpsc_io_test;
@@ -20,6 +22,7 @@ async fn main() {
2022
)
2123
.await;
2224
run_tokio_mpsc_test(queue_size as u32, num_producers as u32).await;
25+
run_flume_bounded_test(queue_size as u32, num_producers as u32).await;
2326

2427
println!("============================================");
2528
println!("Starting IO tests");
@@ -30,4 +33,5 @@ async fn main() {
3033
)
3134
.await;
3235
run_tokio_mpsc_io_test(queue_size as u32, num_producers as u32).await;
36+
run_flume_bounded_io_test(queue_size as u32, num_producers as u32).await;
3337
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use std::fs::OpenOptions;
2+
use std::io::Write;
3+
use std::time::Instant;
4+
5+
pub async fn run_flume_bounded_io_test(queue_size: u32, num_producers: u32) -> std::time::Duration {
6+
let num_consumers = 1;
7+
println!(
8+
"Starting flume::bounded io performance test with queue size: {}, producers: {}, consumers: {}",
9+
queue_size, num_producers, num_consumers
10+
);
11+
12+
let (tx, rx) = flume::bounded(queue_size as usize);
13+
let start_time = Instant::now();
14+
15+
// Spawn producer tasks
16+
let mut producer_handles = vec![];
17+
for i in 0..num_producers {
18+
let tx_clone = tx.clone();
19+
let handle = tokio::spawn(async move {
20+
let messages_per_producer = queue_size / num_producers;
21+
let start_message = i * messages_per_producer;
22+
let end_message = start_message + messages_per_producer;
23+
for msg in start_message..end_message {
24+
if tx_clone.send(msg).is_err() {
25+
eprintln!("mpsc Producer {} failed to send message {}", i, msg);
26+
break;
27+
}
28+
}
29+
});
30+
producer_handles.push(handle);
31+
}
32+
33+
// Spawn consumer tasks
34+
let mut consumer_handles = vec![];
35+
for i in 0..num_consumers {
36+
let rx_clone = rx.clone();
37+
let handle = tokio::spawn(async move {
38+
let messages_per_consumer = queue_size / num_consumers;
39+
let mut received_count = 0;
40+
while received_count < messages_per_consumer {
41+
match rx_clone.recv() {
42+
Ok(msg) => {
43+
received_count += 1;
44+
let mut file = OpenOptions::new()
45+
.append(true)
46+
.create(true)
47+
.open("output.txt")
48+
.expect("Failed to open file");
49+
writeln!(file, "Received message: {}", msg)
50+
.expect("Failed to write to file");
51+
}
52+
Err(e) => {
53+
eprintln!("Consumer {} failed to receive message: {:?}", i, e);
54+
break;
55+
}
56+
}
57+
}
58+
});
59+
consumer_handles.push(handle);
60+
}
61+
62+
// Wait for all producers to finish
63+
for handle in producer_handles {
64+
let _ = handle.await;
65+
}
66+
67+
// Wait for all consumers to finish
68+
for handle in consumer_handles {
69+
let _ = handle.await;
70+
}
71+
72+
let duration = start_time.elapsed();
73+
74+
let _ = tokio::fs::remove_file("output.txt").await;
75+
println!(
76+
"flume::bounded performance test finished in: {:?}",
77+
duration
78+
);
79+
80+
// Drop the original sender to signal consumers that no more messages will be sent
81+
drop(tx);
82+
83+
duration
84+
}

examples/tests/flume_bounded_test.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::time::Instant;
2+
3+
pub async fn run_flume_bounded_test(queue_size: u32, num_producers: u32) -> std::time::Duration {
4+
let num_consumers = 1;
5+
println!(
6+
"Starting flume::bounded performance test with queue size: {}, producers: {}, consumers: {}",
7+
queue_size, num_producers, num_consumers
8+
);
9+
10+
let (tx, rx) = flume::bounded(queue_size as usize);
11+
let start_time = Instant::now();
12+
13+
// Spawn producer tasks
14+
let mut producer_handles = vec![];
15+
for i in 0..num_producers {
16+
let tx_clone = tx.clone();
17+
let handle = tokio::spawn(async move {
18+
let messages_per_producer = queue_size / num_producers;
19+
let start_message = i * messages_per_producer;
20+
let end_message = start_message + messages_per_producer;
21+
for msg in start_message..end_message {
22+
if tx_clone.send(msg).is_err() {
23+
eprintln!("mpsc Producer {} failed to send message {}", i, msg);
24+
break;
25+
}
26+
}
27+
});
28+
producer_handles.push(handle);
29+
}
30+
31+
// Spawn consumer tasks
32+
let mut consumer_handles = vec![];
33+
for i in 0..num_consumers {
34+
let rx_clone = rx.clone();
35+
let handle = tokio::spawn(async move {
36+
let messages_per_consumer = queue_size / num_consumers;
37+
let mut received_count = 0;
38+
while received_count < messages_per_consumer {
39+
match rx_clone.recv() {
40+
Ok(_) => {
41+
received_count += 1;
42+
}
43+
Err(e) => {
44+
eprintln!("Consumer {} failed to receive message: {:?}", i, e);
45+
break;
46+
}
47+
}
48+
}
49+
});
50+
consumer_handles.push(handle);
51+
}
52+
53+
// Wait for all producers to finish
54+
for handle in producer_handles {
55+
let _ = handle.await;
56+
}
57+
58+
// Wait for all consumers to finish
59+
for handle in consumer_handles {
60+
let _ = handle.await;
61+
}
62+
63+
let duration = start_time.elapsed();
64+
65+
println!(
66+
"flume::bounded performance test finished in: {:?}",
67+
duration
68+
);
69+
70+
// Drop the original sender to signal consumers that no more messages will be sent
71+
drop(tx);
72+
73+
duration
74+
}

examples/tests/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod flume_bounded_io_test;
2+
pub mod flume_bounded_test;
13
pub mod tokio_mpmc_io_test;
24
pub mod tokio_mpmc_test;
35
pub mod tokio_mpsc_io_test;

0 commit comments

Comments
 (0)