Skip to content

Commit 91e0da8

Browse files
authored
Initial implementation of tracking task wakes (#4)
* Add integration test & fix worker ID issue
* Initial implementation of tracking task wakes
* Fix merge issues: add WakeEvent handling and restore traced.rs dependencies
* Enable waker instrumentation for example service
* Rewrite waker with ArcWaker
* Add better tracing & add raw wake event
* Improvements to trace viewer for wakes
* Refactor code that produces wake events
* Improvements to trace viewer
* Clean up clippy warnings
1 parent 6aaf41c commit 91e0da8

18 files changed

Lines changed: 1299 additions & 111 deletions

File tree

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dial9-tokio-telemetry/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ publish = true
88
tokio = { version = "1", features = ["rt", "macros", "time", "taskdump", "rt-multi-thread", "sync", "net", "io-util"] }
99
arc-swap = "1"
1010
libc.workspace = true
11+
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
1112
pin-project-lite = "0.2"
1213
serde = { version = "1", features = ["derive"] }
1314
serde_json = "1"

dial9-tokio-telemetry/benches/overhead_bench.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
//! Duration defaults to 30 seconds. A 3-second warmup precedes measurement.
1515
1616
use dial9_tokio_telemetry::telemetry::{
17-
NullWriter, SimpleBinaryWriter, TelemetryGuard, TracedRuntime,
17+
NullWriter, SimpleBinaryWriter, TelemetryGuard, TelemetryHandle, TracedRuntime,
1818
};
1919
use hdrhistogram::Histogram;
2020
use std::sync::Arc;
@@ -29,13 +29,13 @@ const WARMUP_SECS: u64 = 3;
2929

3030
// ── Echo server (runs on the traced runtime) ─────────────────────────────────
3131

32-
async fn echo_server(listener: TcpListener) {
32+
async fn echo_server(listener: TcpListener, handle: Option<TelemetryHandle>) {
3333
loop {
3434
let (mut sock, _) = match listener.accept().await {
3535
Ok(c) => c,
3636
Err(_) => break,
3737
};
38-
tokio::spawn(async move {
38+
let conn = async move {
3939
let mut buf = [0u8; 256];
4040
loop {
4141
let n = match sock.read(&mut buf).await {
@@ -46,7 +46,12 @@ async fn echo_server(listener: TcpListener) {
4646
break;
4747
}
4848
}
49-
});
49+
};
50+
if let Some(h) = &handle {
51+
h.spawn(conn);
52+
} else {
53+
tokio::spawn(conn);
54+
}
5055
}
5156
}
5257

@@ -190,7 +195,8 @@ fn main() {
190195
let port = server_rt.block_on(async {
191196
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
192197
let port = listener.local_addr().unwrap().port();
193-
tokio::spawn(echo_server(listener));
198+
let handle = _guard.as_ref().map(|g| g.handle());
199+
tokio::spawn(echo_server(listener, handle));
194200
port
195201
});
196202

dial9-tokio-telemetry/examples/analyze_trace.rs

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use dial9_tokio_telemetry::telemetry::{
2-
TraceReader, analyze_trace, detect_idle_workers, print_analysis,
2+
TelemetryEvent, TraceReader, analyze_trace, compute_wake_to_poll_delays, detect_idle_workers,
3+
print_analysis,
34
};
5+
use std::collections::HashMap;
46
use std::env;
57

68
fn main() {
@@ -29,6 +31,127 @@ fn main() {
2931

3032
println!("\n=== Idle Worker Detection ===");
3133
let idle_periods = detect_idle_workers(&events);
34+
35+
let delays = compute_wake_to_poll_delays(&events);
36+
if !delays.is_empty() {
37+
let p50 = delays[delays.len() * 50 / 100];
38+
let p99 = delays[delays.len() * 99 / 100];
39+
let p999 = delays[delays.len() * 999 / 1000];
40+
let max = *delays.last().unwrap();
41+
println!("\n=== Wake→Poll Delays ({} samples) ===", delays.len());
42+
println!(
43+
" p50: {:.1}µs, p99: {:.1}µs, p99.9: {:.1}µs, max: {:.1}µs",
44+
p50 as f64 / 1000.0,
45+
p99 as f64 / 1000.0,
46+
p999 as f64 / 1000.0,
47+
max as f64 / 1000.0,
48+
);
49+
}
50+
51+
// Build task_id → spawn_loc_id from PollStart events (more complete than TaskSpawn alone)
52+
let mut task_locs: HashMap<u32, u16> = HashMap::new();
53+
for e in &events {
54+
if let TelemetryEvent::PollStart {
55+
task_id,
56+
spawn_loc_id,
57+
..
58+
} = e
59+
{
60+
task_locs
61+
.entry(task_id.to_u32())
62+
.or_insert(spawn_loc_id.as_u16());
63+
}
64+
}
65+
// Also include TaskSpawn mappings from the reader
66+
for (task_id, spawn_loc_id) in &reader.task_spawn_locs {
67+
task_locs
68+
.entry(task_id.to_u32())
69+
.or_insert(spawn_loc_id.as_u16());
70+
}
71+
72+
// Count wakes by waker spawn location
73+
let mut wakes_by_loc: HashMap<Option<&str>, usize> = HashMap::new();
74+
let mut resolved = 0usize;
75+
let mut unresolved = 0usize;
76+
for e in &events {
77+
if let TelemetryEvent::WakeEvent { waker_task_id, .. } = e {
78+
let id = waker_task_id.to_u32();
79+
if id == 0 {
80+
*wakes_by_loc.entry(Some("<non-task context>")).or_default() += 1;
81+
resolved += 1;
82+
} else if let Some(loc_id) = task_locs.get(&id) {
83+
let loc = reader
84+
.spawn_locations
85+
.get(&dial9_tokio_telemetry::telemetry::SpawnLocationId::from_u16(*loc_id));
86+
*wakes_by_loc.entry(loc.map(|s| s.as_str())).or_default() += 1;
87+
resolved += 1;
88+
} else {
89+
unresolved += 1;
90+
}
91+
}
92+
}
93+
if resolved + unresolved > 0 {
94+
println!(
95+
"\n=== Waker Identity ({} resolved, {} unresolved of {} total tasks in trace) ===",
96+
resolved,
97+
unresolved,
98+
task_locs.len()
99+
);
100+
101+
// Debug: show some unresolved waker task IDs vs known task IDs
102+
let mut unresolved_ids: HashMap<u32, usize> = HashMap::new();
103+
for e in &events {
104+
if let TelemetryEvent::WakeEvent { waker_task_id, .. } = e {
105+
let id = waker_task_id.to_u32();
106+
if id != 0 && !task_locs.contains_key(&id) {
107+
*unresolved_ids.entry(id).or_default() += 1;
108+
}
109+
}
110+
}
111+
let mut top_unresolved: Vec<_> = unresolved_ids.into_iter().collect();
112+
top_unresolved.sort_by(|a, b| b.1.cmp(&a.1));
113+
println!(" Top unresolved waker IDs:");
114+
for (id, count) in top_unresolved.iter().take(5) {
115+
println!(" 0x{:08x} ({}) — {} wakes", id, id, count);
116+
}
117+
println!(" Known task IDs (sample):");
118+
for (id, loc_id) in task_locs.iter().take(5) {
119+
let loc = reader
120+
.spawn_locations
121+
.get(&dial9_tokio_telemetry::telemetry::SpawnLocationId::from_u16(*loc_id));
122+
println!(
123+
" 0x{:08x} ({}) — {}",
124+
id,
125+
id,
126+
loc.map(|s| s.as_str()).unwrap_or("?")
127+
);
128+
}
129+
println!(
130+
" task_spawn_locs from reader: {} entries",
131+
reader.task_spawn_locs.len()
132+
);
133+
println!(" task_locs from PollStart: {} entries", task_locs.len());
134+
// Check: are the unresolved IDs in task_spawn_locs?
135+
for (id, count) in top_unresolved.iter().take(5) {
136+
let in_spawn = reader
137+
.task_spawn_locs
138+
.contains_key(&dial9_tokio_telemetry::telemetry::TaskId::from_u32(*id));
139+
println!(
140+
" 0x{:08x}: in task_spawn_locs={}, in task_locs={}, wakes={}",
141+
id,
142+
in_spawn,
143+
task_locs.contains_key(id),
144+
count
145+
);
146+
}
147+
let mut by_count: Vec<_> = wakes_by_loc.into_iter().collect();
148+
by_count.sort_by(|a, b| b.1.cmp(&a.1));
149+
for (loc, count) in &by_count {
150+
let name = loc.unwrap_or("<unknown>");
151+
println!(" {:>8} wakes from {}", count, name);
152+
}
153+
}
154+
32155
if idle_periods.is_empty() {
33156
println!("No significant idle periods detected with work in queue");
34157
} else {

dial9-tokio-telemetry/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod telemetry;
2+
pub mod traced;
23

34
use arc_swap::ArcSwap;
45
use pin_project_lite::pin_project;

dial9-tokio-telemetry/src/telemetry/analysis.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ pub fn analyze_trace(events: &[TelemetryEvent]) -> TraceAnalysis {
192192
stats.max_sched_wait_ns = stats.max_sched_wait_ns.max(*sched_wait_delta_nanos);
193193
}
194194
TelemetryEvent::SpawnLocationDef { .. } | TelemetryEvent::TaskSpawn { .. } => {}
195+
TelemetryEvent::WakeEvent { .. } => {}
195196
}
196197
}
197198

@@ -209,6 +210,51 @@ pub fn analyze_trace(events: &[TelemetryEvent]) -> TraceAnalysis {
209210
}
210211
}
211212

213+
/// Compute wake→poll scheduling delays from wake events and poll starts.
214+
/// Returns a sorted vec of delay durations in nanoseconds.
215+
pub fn compute_wake_to_poll_delays(events: &[TelemetryEvent]) -> Vec<u64> {
216+
// Index: task_id → sorted vec of wake timestamps
217+
let mut wakes_by_task: HashMap<TaskId, Vec<u64>> = HashMap::new();
218+
for e in events {
219+
if let TelemetryEvent::WakeEvent {
220+
timestamp_nanos,
221+
woken_task_id,
222+
..
223+
} = e
224+
{
225+
wakes_by_task
226+
.entry(*woken_task_id)
227+
.or_default()
228+
.push(*timestamp_nanos);
229+
}
230+
}
231+
for v in wakes_by_task.values_mut() {
232+
v.sort_unstable();
233+
}
234+
235+
let mut delays = Vec::new();
236+
for e in events {
237+
if let TelemetryEvent::PollStart {
238+
timestamp_nanos,
239+
task_id,
240+
..
241+
} = e
242+
&& let Some(wakes) = wakes_by_task.get(task_id)
243+
{
244+
// Binary search for last wake <= poll start
245+
let idx = wakes.partition_point(|&t| t <= *timestamp_nanos);
246+
if idx > 0 {
247+
let delay = timestamp_nanos - wakes[idx - 1];
248+
if delay > 0 && delay < 1_000_000_000 {
249+
delays.push(delay);
250+
}
251+
}
252+
}
253+
}
254+
delays.sort_unstable();
255+
delays
256+
}
257+
212258
/// An active period between WorkerUnpark and WorkerPark, with scheduling ratio.
213259
#[derive(Debug)]
214260
pub struct ActivePeriod {

dial9-tokio-telemetry/src/telemetry/events.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ pub enum TelemetryEvent {
6868
task_id: TaskId,
6969
spawn_loc_id: SpawnLocationId,
7070
},
71+
WakeEvent {
72+
#[serde(rename = "timestamp_ns")]
73+
timestamp_nanos: u64,
74+
waker_task_id: TaskId,
75+
woken_task_id: TaskId,
76+
target_worker: u8,
77+
},
7178
}
7279

7380
impl TelemetryEvent {
@@ -90,6 +97,9 @@ impl TelemetryEvent {
9097
}
9198
| TelemetryEvent::QueueSample {
9299
timestamp_nanos, ..
100+
}
101+
| TelemetryEvent::WakeEvent {
102+
timestamp_nanos, ..
93103
} => Some(*timestamp_nanos),
94104
TelemetryEvent::SpawnLocationDef { .. } | TelemetryEvent::TaskSpawn { .. } => None,
95105
}
@@ -104,7 +114,8 @@ impl TelemetryEvent {
104114
| TelemetryEvent::WorkerUnpark { worker_id, .. } => Some(*worker_id),
105115
TelemetryEvent::QueueSample { .. }
106116
| TelemetryEvent::SpawnLocationDef { .. }
107-
| TelemetryEvent::TaskSpawn { .. } => None,
117+
| TelemetryEvent::TaskSpawn { .. }
118+
| TelemetryEvent::WakeEvent { .. } => None,
108119
}
109120
}
110121

@@ -152,6 +163,12 @@ pub enum RawEvent {
152163
task_id: crate::telemetry::task_metadata::TaskId,
153164
location: &'static std::panic::Location<'static>,
154165
},
166+
WakeEvent {
167+
timestamp_nanos: u64,
168+
waker_task_id: crate::telemetry::task_metadata::TaskId,
169+
woken_task_id: crate::telemetry::task_metadata::TaskId,
170+
target_worker: u8,
171+
},
155172
}
156173

157174
/// Read the calling thread's CPU time via `CLOCK_THREAD_CPUTIME_ID`.

0 commit comments

Comments
 (0)