diff --git a/dial9-tokio-telemetry/benches/threadlocal_encode_iai.rs b/dial9-tokio-telemetry/benches/threadlocal_encode_iai.rs index c8435f69..cb189165 100644 --- a/dial9-tokio-telemetry/benches/threadlocal_encode_iai.rs +++ b/dial9-tokio-telemetry/benches/threadlocal_encode_iai.rs @@ -66,6 +66,7 @@ fn encode_batch(encoder: &mut Encoder>, batch: &[(u64, WorkerId, TaskId) worker_id: batch[0].1, local_queue: 0, cpu_time_ns: 600_000, + tid: 0, }); encoder.write_infallible(&WorkerUnparkEvent { timestamp_ns: batch[0].0, @@ -73,6 +74,7 @@ fn encode_batch(encoder: &mut Encoder>, batch: &[(u64, WorkerId, TaskId) local_queue: 5, cpu_time_ns: 500_000, sched_wait_ns: 1_000, + tid: 0, }); } diff --git a/dial9-tokio-telemetry/benches/writer_encode_iai.rs b/dial9-tokio-telemetry/benches/writer_encode_iai.rs index 373e3592..cb17a2a4 100644 --- a/dial9-tokio-telemetry/benches/writer_encode_iai.rs +++ b/dial9-tokio-telemetry/benches/writer_encode_iai.rs @@ -44,6 +44,7 @@ fn encode(workers: Vec) -> Vec { local_queue: 5, cpu_time_ns: 500_000, sched_wait_ns: 1_000, + tid: 0, }); for i in 0..170u64 { @@ -82,6 +83,7 @@ fn encode(workers: Vec) -> Vec { worker_id: wid, local_queue: 0, cpu_time_ns: 600_000, + tid: 0, }); } } diff --git a/dial9-tokio-telemetry/benches/writer_write_encoded_iai.rs b/dial9-tokio-telemetry/benches/writer_write_encoded_iai.rs index 806163e8..afca5b94 100644 --- a/dial9-tokio-telemetry/benches/writer_write_encoded_iai.rs +++ b/dial9-tokio-telemetry/benches/writer_write_encoded_iai.rs @@ -40,6 +40,7 @@ fn make_encoded_batch(worker: usize) -> Batch { local_queue: 5, cpu_time_ns: 500_000, sched_wait_ns: 1_000, + tid: 0, }); for i in 0..170u64 { @@ -78,6 +79,7 @@ fn make_encoded_batch(worker: usize) -> Batch { worker_id: wid, local_queue: 0, cpu_time_ns: 600_000, + tid: 0, }); } diff --git a/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs b/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs index 7f40cd56..9dd81930 100644 --- 
a/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs +++ b/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs @@ -28,6 +28,7 @@ enum FatEvent { worker: u64, local_q: usize, cpu_ns: u64, + tid: u32, }, WorkerUnpark { timestamp_ns: u64, @@ -35,6 +36,7 @@ enum FatEvent { local_q: usize, cpu_ns: u64, sched_wait_ns: u64, + tid: u32, }, QueueSample { timestamp_ns: u64, @@ -117,11 +119,13 @@ fn to_fat_event(event: &TelemetryEvent, reader: &TraceReader) -> Option Some(FatEvent::WorkerPark { timestamp_ns: *timestamp_nanos, worker: worker_id.as_u64(), local_q: *worker_local_queue_depth, cpu_ns: *cpu_time_nanos, + tid: *tid, }), TelemetryEvent::WorkerUnpark { timestamp_nanos, @@ -129,12 +133,14 @@ fn to_fat_event(event: &TelemetryEvent, reader: &TraceReader) -> Option Some(FatEvent::WorkerUnpark { timestamp_ns: *timestamp_nanos, worker: worker_id.as_u64(), local_q: *worker_local_queue_depth, cpu_ns: *cpu_time_nanos, sched_wait_ns: *sched_wait_delta_nanos, + tid: *tid, }), TelemetryEvent::QueueSample { timestamp_nanos, diff --git a/dial9-tokio-telemetry/src/background_task/sealed.rs b/dial9-tokio-telemetry/src/background_task/sealed.rs index b6c19c67..199c06e4 100644 --- a/dial9-tokio-telemetry/src/background_task/sealed.rs +++ b/dial9-tokio-telemetry/src/background_task/sealed.rs @@ -253,6 +253,7 @@ mod tests { worker_id: crate::telemetry::format::WorkerId::from(0usize), local_queue: 0, cpu_time_ns: 0, + tid: 0, }); writer .write_encoded_batch(&crate::telemetry::collector::Batch { @@ -292,6 +293,7 @@ mod tests { worker_id: crate::telemetry::format::WorkerId::from(0usize), local_queue: 0, cpu_time_ns: 0, + tid: 0, }); writer .write_encoded_batch(&crate::telemetry::collector::Batch { @@ -349,6 +351,7 @@ mod tests { worker_id: crate::telemetry::format::WorkerId::from(0usize), local_queue: 0, cpu_time_ns: 0, + tid: 0, }); writer .write_encoded_batch(&crate::telemetry::collector::Batch { @@ -382,6 +385,7 @@ mod tests { worker_id: 
crate::telemetry::format::WorkerId::from(0usize), local_queue: 0, cpu_time_ns: 0, + tid: 0, }); enc.into_inner() } diff --git a/dial9-tokio-telemetry/src/telemetry/analysis.rs b/dial9-tokio-telemetry/src/telemetry/analysis.rs index 8d058a54..edbffd2c 100644 --- a/dial9-tokio-telemetry/src/telemetry/analysis.rs +++ b/dial9-tokio-telemetry/src/telemetry/analysis.rs @@ -877,6 +877,7 @@ mod tests { worker_id: WorkerId::from(7usize), local_queue: 3, cpu_time_ns: 11, + tid: 0, }), 1, ); @@ -903,6 +904,7 @@ mod tests { worker_id, worker_local_queue_depth: 3, cpu_time_nanos: 11, + .. } if worker_id == WorkerId::from(7usize) )); } @@ -987,6 +989,7 @@ mod tests { worker_id: WorkerId::from(0usize), worker_local_queue_depth: 0, cpu_time_nanos: 0, + tid: 0, }, TelemetryEvent::QueueSample { timestamp_nanos: 5_000_000, @@ -998,6 +1001,7 @@ mod tests { worker_local_queue_depth: 0, cpu_time_nanos: 0, sched_wait_delta_nanos: 0, + tid: 0, }, ]; let idle = detect_idle_workers(&events); @@ -1101,6 +1105,7 @@ mod tests { worker_id: WorkerId::from(0usize), worker_local_queue_depth: 0, cpu_time_nanos: 0, + tid: 0, }, TelemetryEvent::WorkerUnpark { timestamp_nanos: 5_000_000, @@ -1108,6 +1113,7 @@ mod tests { worker_local_queue_depth: 0, cpu_time_nanos: 0, sched_wait_delta_nanos: 200_000, // 200us + tid: 0, }, ]; let delays = detect_sched_delays(&events, 100_000); // 100us threshold @@ -1126,6 +1132,7 @@ mod tests { worker_id: WorkerId::from(0usize), worker_local_queue_depth: 0, cpu_time_nanos: 0, + tid: 0, }, TelemetryEvent::WorkerUnpark { timestamp_nanos: 2_000_000, @@ -1133,6 +1140,7 @@ mod tests { worker_local_queue_depth: 0, cpu_time_nanos: 0, sched_wait_delta_nanos: 50_000, // 50us + tid: 0, }, ]; let delays = detect_sched_delays(&events, 100_000); @@ -1147,12 +1155,14 @@ mod tests { worker_id: WorkerId::from(0usize), worker_local_queue_depth: 0, cpu_time_nanos: 0, + tid: 0, }, TelemetryEvent::WorkerPark { timestamp_nanos: 1_000_000, worker_id: WorkerId::from(1usize), 
worker_local_queue_depth: 0, cpu_time_nanos: 0, + tid: 0, }, TelemetryEvent::WorkerUnpark { timestamp_nanos: 3_000_000, @@ -1160,6 +1170,7 @@ mod tests { worker_local_queue_depth: 0, cpu_time_nanos: 0, sched_wait_delta_nanos: 500_000, // 500us + tid: 0, }, TelemetryEvent::WorkerUnpark { timestamp_nanos: 4_000_000, @@ -1167,6 +1178,7 @@ mod tests { worker_local_queue_depth: 0, cpu_time_nanos: 0, sched_wait_delta_nanos: 10_000, // 10us - below threshold + tid: 0, }, ]; let delays = detect_sched_delays(&events, 100_000); @@ -1354,6 +1366,7 @@ mod tests { worker_id: WorkerId::from(0usize), worker_local_queue_depth: 0, cpu_time_nanos: 0, + tid: 0, }, TelemetryEvent::WorkerUnpark { timestamp_nanos: 6_000_000, @@ -1361,6 +1374,7 @@ mod tests { worker_local_queue_depth: 0, cpu_time_nanos: 0, sched_wait_delta_nanos: 0, + tid: 0, }, ]; let idle = detect_idle_workers(&events); diff --git a/dial9-tokio-telemetry/src/telemetry/events.rs b/dial9-tokio-telemetry/src/telemetry/events.rs index 63cc7dfb..5e940a03 100644 --- a/dial9-tokio-telemetry/src/telemetry/events.rs +++ b/dial9-tokio-telemetry/src/telemetry/events.rs @@ -99,6 +99,8 @@ pub enum TelemetryEvent { /// Thread CPU time (nanos) from CLOCK_THREAD_CPUTIME_ID. #[serde(rename = "cpu_ns")] cpu_time_nanos: u64, + /// OS thread ID of the parking thread. + tid: u32, }, /// A worker thread unparked (resumed). WorkerUnpark { @@ -117,6 +119,8 @@ pub enum TelemetryEvent { /// Scheduling wait delta (nanos) from schedstat. #[serde(rename = "sched_wait_ns")] sched_wait_delta_nanos: u64, + /// OS thread ID of the unparking thread. + tid: u32, }, /// Periodic sample of the global task queue depth. 
QueueSample { diff --git a/dial9-tokio-telemetry/src/telemetry/format.rs b/dial9-tokio-telemetry/src/telemetry/format.rs index 32372fb9..f1a3547b 100644 --- a/dial9-tokio-telemetry/src/telemetry/format.rs +++ b/dial9-tokio-telemetry/src/telemetry/format.rs @@ -141,6 +141,9 @@ pub struct WorkerParkEvent { pub local_queue: u8, /// Thread CPU time in nanoseconds. pub cpu_time_ns: u64, + /// OS thread ID of the parking thread. On Linux/Android, the result of gettid(); + /// on other platforms, a synthetic per-process counter — see `events::current_tid`. + pub tid: u32, } /// Wire-format event for a worker unpark. @@ -157,6 +160,9 @@ pub struct WorkerUnparkEvent { pub cpu_time_ns: u64, /// Scheduling wait delta in nanoseconds. pub sched_wait_ns: u64, + /// OS thread ID of the unparking thread. On Linux/Android, the result of gettid(); + /// on other platforms, a synthetic per-process counter — see `events::current_tid`. + pub tid: u32, } #[derive(TraceEvent)] @@ -400,6 +406,7 @@ pub(crate) fn to_owned_event( worker_id: e.worker_id, worker_local_queue_depth: e.local_queue as usize, cpu_time_nanos: e.cpu_time_ns, + tid: e.tid, }, TelemetryEventRef::WorkerUnpark(e) => TelemetryEvent::WorkerUnpark { timestamp_nanos: e.timestamp_ns, @@ -407,6 +414,7 @@ pub(crate) fn to_owned_event( worker_local_queue_depth: e.local_queue as usize, cpu_time_nanos: e.cpu_time_ns, sched_wait_delta_nanos: e.sched_wait_ns, + tid: e.tid, }, TelemetryEventRef::QueueSample(e) => TelemetryEvent::QueueSample { timestamp_nanos: e.timestamp_ns, diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs index 1e515510..6b97a785 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs @@ -262,6 +262,7 @@ pub(super) fn make_worker_park(ctx: &RuntimeContext, shared: &SharedState) -> Wo worker_id: resolved.map(|(id, _)| 
id).unwrap_or(WorkerId::UNKNOWN), local_queue: worker_local_queue_depth as u8, cpu_time_ns: cpu_time_nanos, + tid: crate::telemetry::events::current_tid(), } } @@ -283,5 +284,6 @@ pub(super) fn make_worker_unpark(ctx: &RuntimeContext, shared: &SharedState) -> local_queue: worker_local_queue_depth as u8, cpu_time_ns: cpu_time_nanos, sched_wait_ns: sched_wait_delta_nanos, + tid: crate::telemetry::events::current_tid(), } } diff --git a/dial9-tokio-telemetry/src/telemetry/writer.rs b/dial9-tokio-telemetry/src/telemetry/writer.rs index 590a21db..59422108 100644 --- a/dial9-tokio-telemetry/src/telemetry/writer.rs +++ b/dial9-tokio-telemetry/src/telemetry/writer.rs @@ -751,6 +751,7 @@ mod tests { worker_id: crate::telemetry::format::WorkerId::from(0usize), local_queue: 2, cpu_time_ns: 0, + tid: 0, }); Batch { encoded_bytes: enc.into_inner(), @@ -1885,6 +1886,7 @@ mod tests { worker_id: crate::telemetry::format::WorkerId::from(0usize), local_queue: 0, cpu_time_ns: 0, + tid: 0, }); let buf = enc.into_inner(); @@ -1922,6 +1924,7 @@ mod tests { worker_id: crate::telemetry::format::WorkerId::from(0usize), local_queue: 0, cpu_time_ns: 0, + tid: 0, }); writer .write_encoded_batch(&Batch { diff --git a/dial9-tokio-telemetry/tests/park_unpark_tid.rs b/dial9-tokio-telemetry/tests/park_unpark_tid.rs new file mode 100644 index 00000000..fd2ef9fd --- /dev/null +++ b/dial9-tokio-telemetry/tests/park_unpark_tid.rs @@ -0,0 +1,69 @@ +//! Test that WorkerParkEvent and WorkerUnparkEvent include a non-zero tid field. 
+#![cfg(feature = "analysis")]
+
+mod common;
+
+use dial9_tokio_telemetry::telemetry::{TelemetryEvent, TracedRuntime};
+
+/// Runs a small multi-thread runtime under tracing and checks that every captured
+/// `WorkerPark` / `WorkerUnpark` event reports a non-zero `tid`.
+#[test]
+fn worker_park_unpark_events_carry_nonzero_tid() {
+    let (writer, events) = common::CapturingWriter::new();
+
+    let mut builder = tokio::runtime::Builder::new_multi_thread();
+    builder.worker_threads(2).enable_all();
+    let (runtime, guard) = TracedRuntime::builder()
+        .build_and_start_with_writer(builder, writer)
+        .unwrap();
+
+    // Generate park/unpark cycles by spawning work that yields
+    runtime.block_on(async {
+        let mut handles = Vec::new();
+        for _ in 0..20 {
+            handles.push(tokio::spawn(async {
+                tokio::task::yield_now().await;
+            }));
+        }
+        for h in handles {
+            h.await.unwrap();
+        }
+        // Sleep briefly to ensure workers park
+        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+    });
+
+    // Shut down the runtime and the telemetry guard before inspecting the captured events.
+    drop(runtime);
+    drop(guard);
+
+    let events = events.lock().unwrap();
+
+    let park_tids: Vec<u32> = events
+        .iter()
+        .filter_map(|e| match e {
+            TelemetryEvent::WorkerPark { tid, .. } => Some(*tid),
+            _ => None,
+        })
+        .collect();
+
+    let unpark_tids: Vec<u32> = events
+        .iter()
+        .filter_map(|e| match e {
+            TelemetryEvent::WorkerUnpark { tid, .. } => Some(*tid),
+            _ => None,
+        })
+        .collect();
+
+    assert!(
+        !park_tids.is_empty(),
+        "expected at least one WorkerPark event"
+    );
+    assert!(
+        !unpark_tids.is_empty(),
+        "expected at least one WorkerUnpark event"
+    );
+
+    // NOTE(review): off Linux/Android the tid is synthetic (see format.rs); confirm it is never 0.
+    assert!(!park_tids.contains(&0), "WorkerPark tid must be non-zero");
+    assert!(!unpark_tids.contains(&0), "WorkerUnpark tid must be non-zero");
+}
diff --git a/dial9-viewer/ui/demo-trace.bin b/dial9-viewer/ui/demo-trace.bin
index 156c1545..95d621b4 100644
Binary files a/dial9-viewer/ui/demo-trace.bin and b/dial9-viewer/ui/demo-trace.bin differ