-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathoverhead_bench.rs
More file actions
291 lines (257 loc) · 10.2 KB
/
overhead_bench.rs
File metadata and controls
291 lines (257 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
//! Realistic telemetry overhead benchmark.
//!
//! The server runs on a traced runtime. The load generator runs on a separate
//! plain runtime so it doesn't pollute the trace or compete for workers.
//!
//! Usage:
//! cargo bench --bench overhead_bench -- <mode> [duration_secs] [--json]
//! cargo bench --bench overhead_bench -- --bmf [duration_secs]
//!
//! Modes:
//! baseline – plain tokio runtime, no hooks
//! telemetry – hooks installed, writing to a temp file
//! noop – hooks installed, NullWriter (measures pure hook overhead)
//!
//! Duration defaults to 30 seconds. A 3-second warmup precedes measurement.
//! --bmf runs all three modes and outputs Bencher Metric Format JSON.
mod bmf;
#[cfg(target_os = "linux")]
use dial9_tokio_telemetry::telemetry::cpu_profile::CpuProfilingConfig;
use dial9_tokio_telemetry::telemetry::{
NullWriter, RotatingWriter, TelemetryGuard, TelemetryHandle, TracedRuntime,
};
use hdrhistogram::Histogram;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
const NUM_CLIENTS: usize = 50;
const DEFAULT_DURATION_SECS: u64 = 30;
const WARMUP_SECS: u64 = 3;
// ── Echo server (runs on the traced runtime) ─────────────────────────────────
async fn echo_server(listener: TcpListener, handle: Option<TelemetryHandle>) {
loop {
let (mut sock, _) = match listener.accept().await {
Ok(c) => c,
Err(_) => break,
};
let conn = async move {
let mut buf = [0u8; 256];
loop {
let n = match sock.read(&mut buf).await {
Ok(0) | Err(_) => break,
Ok(n) => n,
};
if sock.write_all(&buf[..n]).await.is_err() {
break;
}
}
};
if let Some(h) = &handle {
h.spawn(conn);
} else {
tokio::spawn(conn);
}
}
}
// ── Load generator (runs on a separate plain runtime) ────────────────────────
async fn run_client(
port: u16,
running: Arc<AtomicBool>,
warmup: Arc<AtomicBool>,
) -> Histogram<u64> {
let mut stream = tokio::net::TcpStream::connect(("127.0.0.1", port))
.await
.unwrap();
let msg = b"hello echo benchmark!";
let mut buf = [0u8; 256];
// Warmup phase: send requests but don't record
while warmup.load(Ordering::Relaxed) {
stream.write_all(msg).await.unwrap();
let _ = stream.read(&mut buf).await.unwrap();
}
// Measured phase: record into a thread-local histogram
// Track latencies from 1µs to 60s with 3 significant figures
let mut hist = Histogram::<u64>::new_with_bounds(1_000, 60_000_000_000, 3).unwrap();
while running.load(Ordering::Relaxed) {
let start = Instant::now();
stream.write_all(msg).await.unwrap();
let _ = stream.read(&mut buf).await.unwrap();
let nanos = start.elapsed().as_nanos() as u64;
// Clamp to histogram bounds
let nanos = nanos.max(1_000);
hist.record(nanos).unwrap();
}
hist
}
// ── Benchmark runner ────────────────────────────────────────────────────────
struct BenchResult {
hist: Histogram<u64>, // latency values in nanoseconds
wall: Duration,
}
fn run_bench(mode: &str, duration_secs: u64) -> BenchResult {
let mut builder = tokio::runtime::Builder::new_multi_thread();
builder.worker_threads(4).enable_all();
let (server_rt, guard): (tokio::runtime::Runtime, Option<TelemetryGuard>) = match mode {
"telemetry" => {
let writer = RotatingWriter::single_file("/tmp/overhead_bench_trace.bin").unwrap();
let mut tb = TracedRuntime::builder().with_task_tracking(true);
#[cfg(target_os = "linux")]
{
tb = tb.with_cpu_profiling(CpuProfilingConfig::default());
}
let (rt, g) = tb.build_and_start(builder, writer).unwrap();
(rt, Some(g))
}
"noop" => {
let (rt, g) = TracedRuntime::builder()
.build_and_start(builder, NullWriter)
.unwrap();
(rt, Some(g))
}
"baseline" => (builder.build().unwrap(), None),
other => {
eprintln!("unknown mode: {other} (expected: baseline, telemetry, noop)");
std::process::exit(1);
}
};
let port = server_rt.block_on(async {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let handle = guard.as_ref().map(|g| g.handle());
tokio::spawn(echo_server(listener, handle));
port
});
let client_rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap();
let warmup_flag = Arc::new(AtomicBool::new(true));
let running_flag = Arc::new(AtomicBool::new(true));
let (hist, wall) = client_rt.block_on(async {
let mut handles = Vec::with_capacity(NUM_CLIENTS);
for _ in 0..NUM_CLIENTS {
handles.push(tokio::spawn(run_client(
port,
running_flag.clone(),
warmup_flag.clone(),
)));
}
tokio::time::sleep(Duration::from_secs(WARMUP_SECS)).await;
eprintln!("[{mode}] warmup done, measuring {duration_secs}s...");
warmup_flag.store(false, Ordering::Relaxed);
let t = Instant::now();
tokio::time::sleep(Duration::from_secs(duration_secs)).await;
running_flag.store(false, Ordering::Relaxed);
let wall = t.elapsed();
let mut merged = Histogram::<u64>::new_with_bounds(1_000, 60_000_000_000, 3).unwrap();
for h in handles {
merged.add(h.await.unwrap()).unwrap();
}
(merged, wall)
});
drop(client_rt);
drop(server_rt);
BenchResult { hist, wall }
}
// ── Stats ────────────────────────────────────────────────────────────────────
fn print_stats(hist: &Histogram<u64>, wall: Duration, json: bool) {
let n = hist.len();
let rps = n as f64 / wall.as_secs_f64();
if json {
println!("{{");
println!(" \"requests\": {},", n);
println!(" \"wall_time_secs\": {:.3},", wall.as_secs_f64());
println!(" \"throughput_rps\": {:.0},", rps);
println!(" \"mean_lat_ns\": {},", hist.mean() as u64);
println!(" \"min_lat_ns\": {},", hist.min());
println!(" \"p50_lat_ns\": {},", hist.value_at_percentile(50.0));
println!(" \"p90_lat_ns\": {},", hist.value_at_percentile(90.0));
println!(" \"p99_lat_ns\": {},", hist.value_at_percentile(99.0));
println!(" \"p99_9_lat_ns\": {},", hist.value_at_percentile(99.9));
println!(" \"max_lat_ns\": {}", hist.max());
println!("}}");
} else {
println!(" requests : {n}");
println!(" wall time: {wall:.2?}");
println!(" throughput: {rps:.0} req/s");
println!(
" mean lat : {:.1?}",
Duration::from_nanos(hist.mean() as u64)
);
println!(" min lat : {:.1?}", Duration::from_nanos(hist.min()));
println!(
" p50 lat : {:.1?}",
Duration::from_nanos(hist.value_at_percentile(50.0))
);
println!(
" p90 lat : {:.1?}",
Duration::from_nanos(hist.value_at_percentile(90.0))
);
println!(
" p99 lat : {:.1?}",
Duration::from_nanos(hist.value_at_percentile(99.0))
);
println!(
" p99.9 lat: {:.1?}",
Duration::from_nanos(hist.value_at_percentile(99.9))
);
println!(" max lat : {:.1?}", Duration::from_nanos(hist.max()));
}
}
// ── Main ─────────────────────────────────────────────────────────────────────
fn main() {
let args: Vec<String> = std::env::args().collect();
let json = args.iter().any(|a| a == "--json");
let is_bmf = args.iter().any(|a| a == "--bmf");
let positional: Vec<&str> = args
.iter()
.skip(1)
.filter(|a| !a.starts_with("--"))
.map(|s| s.as_str())
.collect();
let duration_secs: u64 = if is_bmf {
positional.first()
} else {
positional.get(1)
}
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_DURATION_SECS);
if is_bmf {
let mut report = bmf::Report::new();
for mode in ["baseline", "telemetry", "noop"] {
let r = run_bench(mode, duration_secs);
let rps = r.hist.len() as f64 / r.wall.as_secs_f64();
let p = format!("overhead::{mode}");
report.insert(format!("{p}::throughput_rps"), bmf::Metric::throughput(rps));
report.insert(
format!("{p}::mean_lat_ns"),
bmf::Metric::latency(r.hist.mean()),
);
report.insert(
format!("{p}::p99_lat_ns"),
bmf::Metric::latency(r.hist.value_at_percentile(99.0) as f64),
);
report.insert(
format!("{p}::p99_9_lat_ns"),
bmf::Metric::latency(r.hist.value_at_percentile(99.9) as f64),
);
}
println!("{}", serde_json::to_string_pretty(&report).unwrap());
return;
}
let mode = positional.first().copied().unwrap_or("baseline");
if !json {
println!("mode: {mode}");
println!(
"config: {NUM_CLIENTS} clients, {WARMUP_SECS}s warmup, {duration_secs}s measurement"
);
}
let r = run_bench(mode, duration_secs);
if !json {
println!("\n── results ({mode}) ──");
}
print_stats(&r.hist, r.wall, json);
}