Skip to content

Commit 4062b6a

Browse files
authored
Enable CPU profiling in metrics service and extract client binary (#12)
- Add standalone client binary (src/bin/client.rs) - Enable CPU profiling in main service - Update Cargo.toml dependencies - Export lib.rs for client binary - Add profiling routes
1 parent a77e8f2 commit 4062b6a

11 files changed

Lines changed: 377 additions & 66 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,6 @@ version = "0.1.0"
1212
[workspace.dependencies]
1313
libc = "0.2"
1414
dial9-perf-self-profile = { version = "0.1.0", path = "perf-self-profile" }
15+
16+
[profile.release]
17+
debug = 1
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use dial9_tokio_telemetry::telemetry::{SimpleBinaryWriter, TracedRuntime};
2+
use std::time::Duration;
3+
4+
async fn blocking_task(id: usize) {
5+
for _ in 0..5 {
6+
// This blocks the worker thread — should show up as a sched event
7+
std::thread::sleep(Duration::from_millis(10));
8+
tokio::task::yield_now().await;
9+
}
10+
println!("Task {} done", id);
11+
}
12+
13+
fn main() {
14+
let mut builder = tokio::runtime::Builder::new_multi_thread();
15+
builder.worker_threads(2).enable_all();
16+
17+
let writer = Box::new(SimpleBinaryWriter::new("blocking_sleep_trace.bin").unwrap());
18+
let (runtime, _guard) = TracedRuntime::builder()
19+
.with_task_tracking(true)
20+
.with_cpu_profiling(Default::default())
21+
.with_inline_callframe_symbols(true)
22+
.with_sched_events(Default::default())
23+
.build_and_start(builder, writer)
24+
.unwrap();
25+
26+
runtime.block_on(async {
27+
let tasks: Vec<_> = (0..4).map(|i| tokio::spawn(blocking_task(i))).collect();
28+
for t in tasks {
29+
let _ = t.await;
30+
}
31+
});
32+
33+
println!("Trace written to blocking_sleep_trace.bin");
34+
}

dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,11 @@ impl EventWriter {
112112
profiler.drain(|event, thread_name| {
113113
if let TelemetryEvent::CpuSample { tid, worker_id, .. } = &event
114114
&& *worker_id == UNKNOWN_WORKER
115+
&& let Some(ref mut cpu) = self.cpu_flush
116+
&& !cpu.thread_name_intern.contains_key(tid)
117+
&& let Some(name) = thread_name
115118
{
116-
if let Some(ref mut cpu) = self.cpu_flush {
117-
if !cpu.thread_name_intern.contains_key(&tid) {
118-
if let Some(name) = thread_name {
119-
cpu.thread_name_intern.insert(*tid, name.to_string());
120-
}
121-
}
122-
}
119+
cpu.thread_name_intern.insert(*tid, name.to_string());
123120
}
124121
self.write_cpu_event(&event);
125122
});

examples/metrics-service/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ version = "0.1.0"
55
publish = false
66

77
[dependencies]
8-
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "sync"] }
8+
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "sync", "process"] }
99
tokio-util = { version = "0.7", features = ["rt"] }
1010
axum = "0.8"
1111
axum-core = "0.5"
@@ -19,6 +19,6 @@ aws-config = "1"
1919
aws-sdk-dynamodb = "1"
2020
serde = { version = "1", features = ["derive"] }
2121
serde_json = "1"
22-
dial9-tokio-telemetry = { path = "../../dial9-tokio-telemetry" }
22+
dial9-tokio-telemetry = { path = "../../dial9-tokio-telemetry", features = ["cpu-profiling"] }
2323
reqwest = { version = "0.12", features = ["json"] }
2424
clap = { version = "4", features = ["derive"] }
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use std::time::Duration;
2+
3+
use clap::Parser;
4+
use reqwest::Client;
5+
use tokio_util::sync::CancellationToken;
6+
7+
#[derive(Parser)]
8+
#[command(about = "Load-test client for the metrics service")]
9+
struct Args {
10+
#[arg(long, help = "Base URL of the metrics server")]
11+
server_url: String,
12+
13+
#[arg(
14+
long,
15+
default_value = "55",
16+
help = "How long to run the load profile (seconds)"
17+
)]
18+
run_duration: u64,
19+
}
20+
21+
#[tokio::main]
22+
async fn main() {
23+
let args = Args::parse();
24+
25+
let shutdown = CancellationToken::new();
26+
27+
// Internal timer: cancel the load loop after run_duration seconds.
28+
let timer_shutdown = shutdown.clone();
29+
tokio::spawn(async move {
30+
tokio::time::sleep(Duration::from_secs(args.run_duration)).await;
31+
timer_shutdown.cancel();
32+
});
33+
34+
println!(
35+
"Client starting load profile against {}...",
36+
args.server_url
37+
);
38+
metrics_service::client::run(&args.server_url, shutdown).await;
39+
40+
// Load profile finished – tell the server to shut down gracefully.
41+
println!("Client load complete, sending /terminate to server...");
42+
let http = Client::new();
43+
match http
44+
.post(format!("{}/terminate", args.server_url))
45+
.send()
46+
.await
47+
{
48+
Ok(_) => println!("Server acknowledged termination."),
49+
Err(e) => eprintln!("Warning: could not reach /terminate: {e}"),
50+
}
51+
}

examples/metrics-service/src/buffer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ pub struct MetricsBuffer {
2929
inner: Mutex<HashMap<String, Aggregate>>,
3030
}
3131

32+
impl Default for MetricsBuffer {
33+
fn default() -> Self {
34+
Self::new()
35+
}
36+
}
37+
3238
impl MetricsBuffer {
3339
pub fn new() -> Self {
3440
Self {

0 commit comments

Comments
 (0)