Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dial9-tokio-telemetry/benches/threadlocal_encode_iai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ fn encode_batch(encoder: &mut Encoder<Vec<u8>>, batch: &[(u64, WorkerId, TaskId)
worker_id: batch[0].1,
local_queue: 0,
cpu_time_ns: 600_000,
tid: 0,
});
encoder.write_infallible(&WorkerUnparkEvent {
timestamp_ns: batch[0].0,
worker_id: batch[0].1,
local_queue: 5,
cpu_time_ns: 500_000,
sched_wait_ns: 1_000,
tid: 0,
});
}

Expand Down
2 changes: 2 additions & 0 deletions dial9-tokio-telemetry/benches/writer_encode_iai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ fn encode(workers: Vec<usize>) -> Vec<u8> {
local_queue: 5,
cpu_time_ns: 500_000,
sched_wait_ns: 1_000,
tid: 0,
});

for i in 0..170u64 {
Expand Down Expand Up @@ -82,6 +83,7 @@ fn encode(workers: Vec<usize>) -> Vec<u8> {
worker_id: wid,
local_queue: 0,
cpu_time_ns: 600_000,
tid: 0,
});
}
}
Expand Down
2 changes: 2 additions & 0 deletions dial9-tokio-telemetry/benches/writer_write_encoded_iai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ fn make_encoded_batch(worker: usize) -> Batch {
local_queue: 5,
cpu_time_ns: 500_000,
sched_wait_ns: 1_000,
tid: 0,
});

for i in 0..170u64 {
Expand Down Expand Up @@ -78,6 +79,7 @@ fn make_encoded_batch(worker: usize) -> Batch {
worker_id: wid,
local_queue: 0,
cpu_time_ns: 600_000,
tid: 0,
});
}

Expand Down
6 changes: 6 additions & 0 deletions dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ enum FatEvent {
worker: u64,
local_q: usize,
cpu_ns: u64,
tid: u32,
},
WorkerUnpark {
timestamp_ns: u64,
worker: u64,
local_q: usize,
cpu_ns: u64,
sched_wait_ns: u64,
tid: u32,
},
QueueSample {
timestamp_ns: u64,
Expand Down Expand Up @@ -117,24 +119,28 @@ fn to_fat_event(event: &TelemetryEvent, reader: &TraceReader) -> Option<FatEvent
worker_id,
worker_local_queue_depth,
cpu_time_nanos,
tid,
} => Some(FatEvent::WorkerPark {
timestamp_ns: *timestamp_nanos,
worker: worker_id.as_u64(),
local_q: *worker_local_queue_depth,
cpu_ns: *cpu_time_nanos,
tid: *tid,
}),
TelemetryEvent::WorkerUnpark {
timestamp_nanos,
worker_id,
worker_local_queue_depth,
cpu_time_nanos,
sched_wait_delta_nanos,
tid,
} => Some(FatEvent::WorkerUnpark {
timestamp_ns: *timestamp_nanos,
worker: worker_id.as_u64(),
local_q: *worker_local_queue_depth,
cpu_ns: *cpu_time_nanos,
sched_wait_ns: *sched_wait_delta_nanos,
tid: *tid,
}),
TelemetryEvent::QueueSample {
timestamp_nanos,
Expand Down
4 changes: 4 additions & 0 deletions dial9-tokio-telemetry/src/background_task/sealed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ mod tests {
worker_id: crate::telemetry::format::WorkerId::from(0usize),
local_queue: 0,
cpu_time_ns: 0,
tid: 0,
});
writer
.write_encoded_batch(&crate::telemetry::collector::Batch {
Expand Down Expand Up @@ -292,6 +293,7 @@ mod tests {
worker_id: crate::telemetry::format::WorkerId::from(0usize),
local_queue: 0,
cpu_time_ns: 0,
tid: 0,
});
writer
.write_encoded_batch(&crate::telemetry::collector::Batch {
Expand Down Expand Up @@ -349,6 +351,7 @@ mod tests {
worker_id: crate::telemetry::format::WorkerId::from(0usize),
local_queue: 0,
cpu_time_ns: 0,
tid: 0,
});
writer
.write_encoded_batch(&crate::telemetry::collector::Batch {
Expand Down Expand Up @@ -382,6 +385,7 @@ mod tests {
worker_id: crate::telemetry::format::WorkerId::from(0usize),
local_queue: 0,
cpu_time_ns: 0,
tid: 0,
});
enc.into_inner()
}
Expand Down
14 changes: 14 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,7 @@ mod tests {
worker_id: WorkerId::from(7usize),
local_queue: 3,
cpu_time_ns: 11,
tid: 0,
}),
1,
);
Expand All @@ -903,6 +904,7 @@ mod tests {
worker_id,
worker_local_queue_depth: 3,
cpu_time_nanos: 11,
..
} if worker_id == WorkerId::from(7usize)
));
}
Expand Down Expand Up @@ -987,6 +989,7 @@ mod tests {
worker_id: WorkerId::from(0usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
tid: 0,
},
TelemetryEvent::QueueSample {
timestamp_nanos: 5_000_000,
Expand All @@ -998,6 +1001,7 @@ mod tests {
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
sched_wait_delta_nanos: 0,
tid: 0,
},
];
let idle = detect_idle_workers(&events);
Expand Down Expand Up @@ -1101,13 +1105,15 @@ mod tests {
worker_id: WorkerId::from(0usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
tid: 0,
},
TelemetryEvent::WorkerUnpark {
timestamp_nanos: 5_000_000,
worker_id: WorkerId::from(0usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
sched_wait_delta_nanos: 200_000, // 200us
tid: 0,
},
];
let delays = detect_sched_delays(&events, 100_000); // 100us threshold
Expand All @@ -1126,13 +1132,15 @@ mod tests {
worker_id: WorkerId::from(0usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
tid: 0,
},
TelemetryEvent::WorkerUnpark {
timestamp_nanos: 2_000_000,
worker_id: WorkerId::from(0usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
sched_wait_delta_nanos: 50_000, // 50us
tid: 0,
},
];
let delays = detect_sched_delays(&events, 100_000);
Expand All @@ -1147,26 +1155,30 @@ mod tests {
worker_id: WorkerId::from(0usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
tid: 0,
},
TelemetryEvent::WorkerPark {
timestamp_nanos: 1_000_000,
worker_id: WorkerId::from(1usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
tid: 0,
},
TelemetryEvent::WorkerUnpark {
timestamp_nanos: 3_000_000,
worker_id: WorkerId::from(0usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
sched_wait_delta_nanos: 500_000, // 500us
tid: 0,
},
TelemetryEvent::WorkerUnpark {
timestamp_nanos: 4_000_000,
worker_id: WorkerId::from(1usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
sched_wait_delta_nanos: 10_000, // 10us - below threshold
tid: 0,
},
];
let delays = detect_sched_delays(&events, 100_000);
Expand Down Expand Up @@ -1354,13 +1366,15 @@ mod tests {
worker_id: WorkerId::from(0usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
tid: 0,
},
TelemetryEvent::WorkerUnpark {
timestamp_nanos: 6_000_000,
worker_id: WorkerId::from(0usize),
worker_local_queue_depth: 0,
cpu_time_nanos: 0,
sched_wait_delta_nanos: 0,
tid: 0,
},
];
let idle = detect_idle_workers(&events);
Expand Down
4 changes: 4 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ pub enum TelemetryEvent {
/// Thread CPU time (nanos) from CLOCK_THREAD_CPUTIME_ID.
#[serde(rename = "cpu_ns")]
cpu_time_nanos: u64,
/// OS thread ID of the parking thread.
tid: u32,
},
/// A worker thread unparked (resumed).
WorkerUnpark {
Expand All @@ -117,6 +119,8 @@ pub enum TelemetryEvent {
/// Scheduling wait delta (nanos) from schedstat.
#[serde(rename = "sched_wait_ns")]
sched_wait_delta_nanos: u64,
/// OS thread ID of the unparking thread.
tid: u32,
},
/// Periodic sample of the global task queue depth.
QueueSample {
Expand Down
8 changes: 8 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ pub struct WorkerParkEvent {
pub local_queue: u8,
/// Thread CPU time in nanoseconds.
pub cpu_time_ns: u64,
/// OS thread ID of the parking thread. On Linux/Android, the result of gettid();
/// on other platforms, a synthetic per-process counter — see `events::current_tid`.
pub tid: u32,
}

/// Wire-format event for a worker unpark.
Expand All @@ -157,6 +160,9 @@ pub struct WorkerUnparkEvent {
pub cpu_time_ns: u64,
/// Scheduling wait delta in nanoseconds.
pub sched_wait_ns: u64,
/// OS thread ID of the unparking thread. On Linux/Android, the result of gettid();
/// on other platforms, a synthetic per-process counter — see `events::current_tid`.
pub tid: u32,
}

#[derive(TraceEvent)]
Expand Down Expand Up @@ -400,13 +406,15 @@ pub(crate) fn to_owned_event(
worker_id: e.worker_id,
worker_local_queue_depth: e.local_queue as usize,
cpu_time_nanos: e.cpu_time_ns,
tid: e.tid,
},
TelemetryEventRef::WorkerUnpark(e) => TelemetryEvent::WorkerUnpark {
timestamp_nanos: e.timestamp_ns,
worker_id: e.worker_id,
worker_local_queue_depth: e.local_queue as usize,
cpu_time_nanos: e.cpu_time_ns,
sched_wait_delta_nanos: e.sched_wait_ns,
tid: e.tid,
},
TelemetryEventRef::QueueSample(e) => TelemetryEvent::QueueSample {
timestamp_nanos: e.timestamp_ns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ pub(super) fn make_worker_park(ctx: &RuntimeContext, shared: &SharedState) -> Wo
worker_id: resolved.map(|(id, _)| id).unwrap_or(WorkerId::UNKNOWN),
local_queue: worker_local_queue_depth as u8,
cpu_time_ns: cpu_time_nanos,
tid: crate::telemetry::events::current_tid(),
}
}

Expand All @@ -283,5 +284,6 @@ pub(super) fn make_worker_unpark(ctx: &RuntimeContext, shared: &SharedState) ->
local_queue: worker_local_queue_depth as u8,
cpu_time_ns: cpu_time_nanos,
sched_wait_ns: sched_wait_delta_nanos,
tid: crate::telemetry::events::current_tid(),
}
}
3 changes: 3 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,7 @@ mod tests {
worker_id: crate::telemetry::format::WorkerId::from(0usize),
local_queue: 2,
cpu_time_ns: 0,
tid: 0,
});
Batch {
encoded_bytes: enc.into_inner(),
Expand Down Expand Up @@ -1885,6 +1886,7 @@ mod tests {
worker_id: crate::telemetry::format::WorkerId::from(0usize),
local_queue: 0,
cpu_time_ns: 0,
tid: 0,
});
let buf = enc.into_inner();

Expand Down Expand Up @@ -1922,6 +1924,7 @@ mod tests {
worker_id: crate::telemetry::format::WorkerId::from(0usize),
local_queue: 0,
cpu_time_ns: 0,
tid: 0,
});
writer
.write_encoded_batch(&Batch {
Expand Down
69 changes: 69 additions & 0 deletions dial9-tokio-telemetry/tests/park_unpark_tid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
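//! Test that worker park/unpark events (`WorkerParkEvent` / `WorkerUnparkEvent`, observed here as
//! `TelemetryEvent::WorkerPark` / `TelemetryEvent::WorkerUnpark`) carry a non-zero `tid` field.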

mod common;

use dial9_tokio_telemetry::telemetry::{TelemetryEvent, TracedRuntime};

/// Runs a two-worker traced Tokio runtime, drives a short burst of yielding
/// tasks followed by a brief sleep, and then checks that every captured
/// `WorkerPark` / `WorkerUnpark` event reports a `tid` other than zero.
///
/// The test also requires that at least one event of each kind was captured,
/// so an empty capture cannot make the non-zero checks pass vacuously.
#[test]
#[cfg(feature = "analysis")]
fn worker_park_unpark_events_carry_nonzero_tid() {
    let (writer, events) = common::CapturingWriter::new();

    let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
    rt_builder.worker_threads(2).enable_all();
    let (runtime, guard) = TracedRuntime::builder()
        .build_and_start_with_writer(rt_builder, writer)
        .unwrap();

    // Produce park/unpark cycles: spawn 20 tasks that each yield once, wait
    // for all of them, then idle briefly so the workers have a chance to park.
    runtime.block_on(async {
        let tasks: Vec<_> = (0..20)
            .map(|_| {
                tokio::spawn(async {
                    tokio::task::yield_now().await;
                })
            })
            .collect();
        for task in tasks {
            task.await.unwrap();
        }
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    });

    // Tear down the runtime first and the telemetry guard second before
    // inspecting what was captured.
    // NOTE(review): presumably this ordering flushes pending events to the writer — confirm.
    drop(runtime);
    drop(guard);

    let captured = events.lock().unwrap();

    // Single pass over the captured events, bucketing tids by event kind.
    let mut park_tids: Vec<u32> = Vec::new();
    let mut unpark_tids: Vec<u32> = Vec::new();
    for event in captured.iter() {
        match event {
            TelemetryEvent::WorkerPark { tid, .. } => park_tids.push(*tid),
            TelemetryEvent::WorkerUnpark { tid, .. } => unpark_tids.push(*tid),
            _ => {}
        }
    }

    assert!(
        !park_tids.is_empty(),
        "expected at least one WorkerPark event"
    );
    assert!(
        !unpark_tids.is_empty(),
        "expected at least one WorkerUnpark event"
    );

    for tid in &park_tids {
        assert_ne!(*tid, 0, "WorkerPark tid must be non-zero");
    }
    for tid in &unpark_tids {
        assert_ne!(*tid, 0, "WorkerUnpark tid must be non-zero");
    }
}
Binary file modified dial9-viewer/ui/demo-trace.bin
Binary file not shown.
Loading