Skip to content

Commit 065b448

Browse files
authored
chore(iroh-bench): Allow configuring the number of worker threads for each endpoint in iroh-bench (#4063)
(this is #3780 reopened, because that had the wrong target branch) ## Description When I look at qlog files of iroh-bench runs, I can see that PATH_ACK processing is severely delayed when using a single-threaded tokio runtime for each endpoint. This disappears when I give iroh-bench two worker threads. This allows the EndpointDriver and the ConnectionDriver to each run independently of each other, thus the endpoint driver won't be delayed when processing PATH_ACKs. When the processing was delayed, this had an effect on the apparent RTT and in turn on congestion control. ## Screenshots Before: Note all the blue lines going in from waaaaay back in history all into one place on the left (those are the PATH_ACKs that get handled way too late): <img width="1720" height="772" alt="image" src="https://github.com/user-attachments/assets/688e19e5-63bb-4d1c-9256-43ce0911a4c0" /> After: The PATH_ACKs get handled in time, they're way more interspersed with stream frames: <img width="1720" height="772" alt="image" src="https://github.com/user-attachments/assets/a33db88c-dde0-458d-8007-12c7f119165d" /> ## Change checklist <!-- Remove any that are not relevant. --> - [x] Self-review.
1 parent 5edc1bc commit 065b448

2 files changed

Lines changed: 31 additions & 6 deletions

File tree

iroh/bench/src/bin/bulk.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ fn main() {
3535

3636
pub fn run_iroh(opt: Opt) -> Result<()> {
3737
let server_span = tracing::error_span!("server");
38-
let runtime = rt();
38+
let runtime = rt(opt.workers_per_ep);
3939

4040
#[cfg(feature = "local-relay")]
4141
let (relay_url, relay_server) = if opt.only_relay {
@@ -70,7 +70,7 @@ pub fn run_iroh(opt: Opt) -> Result<()> {
7070
let relay_url = relay_url.clone();
7171
handles.push(std::thread::spawn(move || {
7272
let _guard = tracing::error_span!("client", id).entered();
73-
let runtime = rt();
73+
let runtime = rt(opt.workers_per_ep);
7474
match runtime.block_on(iroh::client(server_addr, relay_url.clone(), opt)) {
7575
Ok(stats) => Ok(stats),
7676
Err(e) => {
@@ -112,7 +112,7 @@ pub fn run_noq(opt: Opt) -> Result<()> {
112112
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
113113

114114
let server_span = tracing::error_span!("server");
115-
let runtime = rt();
115+
let runtime = rt(opt.workers_per_ep);
116116
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
117117
let key = PrivatePkcs8KeyDer::from(cert.signing_key.serialize_der());
118118
let cert = CertificateDer::from(cert.cert);
@@ -134,7 +134,7 @@ pub fn run_noq(opt: Opt) -> Result<()> {
134134
let cert = cert.clone();
135135
handles.push(std::thread::spawn(move || {
136136
let _guard = tracing::error_span!("client", id).entered();
137-
let runtime = rt();
137+
let runtime = rt(opt.workers_per_ep);
138138
match runtime.block_on(noq::client(server_addr, cert, opt)) {
139139
Ok(stats) => Ok(stats),
140140
Err(e) => {

iroh/bench/src/lib.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,21 @@ pub struct Opt {
7676
pub only_relay: bool,
7777
#[clap(long, default_value_t = false)]
7878
pub use_ipv6: bool,
79+
80+
/// How many tokio worker threads to use for each endpoint in the benchmark.
81+
///
82+
/// Defaults to 1, a single-threaded runtime.
83+
/// Set this to 0 to have each endpoint use a full multi-threaded tokio runtime with as many
84+
/// workers as CPU parallelism detected.
85+
///
86+
/// Setting this to 2 is a very reasonable value.
87+
/// When quinn runs a single connection, it can run one task (the EndpointDriver) for all
88+
/// receive-based work, and one task (the ConnectionDriver) for all send-based tasks.
89+
/// If quinn is only given a single worker, then work distribution may be unfair, resulting
90+
/// in acknowledgements not being processed timely enough, causing congestion control issues,
91+
/// although ideally that problem is fixed in some other way in quinn.
92+
#[clap(long, default_value_t = 1)]
93+
pub workers_per_ep: usize,
7994
}
8095

8196
pub enum EndpointSelector {
@@ -140,8 +155,18 @@ pub fn configure_tracing_subscriber() {
140155
.unwrap();
141156
}
142157

143-
pub fn rt() -> Runtime {
144-
Builder::new_current_thread().enable_all().build().unwrap()
158+
pub fn rt(workers: usize) -> Runtime {
159+
let mut builder = match workers {
160+
// 0 means "use as many threads as detected CPU parallelism" implicitly.
161+
0 => Builder::new_multi_thread(),
162+
1 => Builder::new_current_thread(),
163+
workers => {
164+
let mut b = Builder::new_multi_thread();
165+
b.worker_threads(workers);
166+
b
167+
}
168+
};
169+
builder.enable_all().build().unwrap()
145170
}
146171

147172
fn parse_byte_size(s: &str) -> Result<u64, ParseIntError> {

0 commit comments

Comments
 (0)