diff --git a/dial9-tokio-telemetry/src/telemetry/analysis.rs b/dial9-tokio-telemetry/src/telemetry/analysis.rs index 9acdc661..ac4cdfc0 100644 --- a/dial9-tokio-telemetry/src/telemetry/analysis.rs +++ b/dial9-tokio-telemetry/src/telemetry/analysis.rs @@ -1271,6 +1271,7 @@ mod tests { thread_name: None, source: CpuSampleSource::CpuProfile, callchain: vec![], + cpu: None, }, TelemetryEvent::CpuSample { timestamp_nanos: 1_800_000, @@ -1279,6 +1280,7 @@ mod tests { thread_name: None, source: CpuSampleSource::SchedEvent, callchain: vec![], + cpu: None, }, TelemetryEvent::PollEnd { timestamp_nanos: 2_000_000, @@ -1332,6 +1334,7 @@ mod tests { thread_name: None, source: CpuSampleSource::CpuProfile, callchain: vec![], + cpu: None, }, ]; let sampled = detect_sampled_polls(&events); diff --git a/dial9-tokio-telemetry/src/telemetry/buffer.rs b/dial9-tokio-telemetry/src/telemetry/buffer.rs index 35e28362..539bf0f3 100644 --- a/dial9-tokio-telemetry/src/telemetry/buffer.rs +++ b/dial9-tokio-telemetry/src/telemetry/buffer.rs @@ -267,6 +267,7 @@ impl Encodable for RawEvent { source: data.source, thread_name, callchain, + cpu: data.cpu.map(u64::from), }); } } @@ -607,4 +608,55 @@ mod tests { let guard = buf.lock().unwrap(); assert_eq!(guard.event_count, 1); } + + /// Encode a single `RawEvent::CpuSample` through a real thread-local buffer + /// and decode it back via the public `decode_events` path, asserting that + /// the `cpu` field round-trips. + fn cpu_sample_round_trip(cpu: Option) -> crate::telemetry::events::TelemetryEvent { + use crate::telemetry::events::{CpuSampleData, CpuSampleSource, RawEvent}; + use crate::telemetry::format::{WorkerId, decode_events}; + + let data = CpuSampleData { + timestamp_nanos: 12_345, + worker_id: WorkerId::from(0usize), + tid: 4242, + thread_name: None, + source: CpuSampleSource::CpuProfile, + callchain: vec![0xdead_beef, 0xcafe_babe], + cpu, + }; + let encoded = ThreadLocalBuffer::encode_single(&RawEvent::CpuSample(Box::new(data))); + let events = decode_events(&encoded).expect("decode"); + assert_eq!(events.len(), 1); + events.into_iter().next().unwrap() + } + + #[test] + fn cpu_sample_event_round_trips_with_cpu() { + use crate::telemetry::events::TelemetryEvent; + match cpu_sample_round_trip(Some(7)) { + TelemetryEvent::CpuSample { + tid, + cpu, + callchain, + .. + } => { + assert_eq!(tid, 4242); + assert_eq!(cpu, Some(7)); + assert_eq!(callchain, vec![0xdead_beef, 0xcafe_babe]); + } + other => panic!("expected CpuSample, got {other:?}"), + } + } + + #[test] + fn cpu_sample_event_round_trips_without_cpu() { + use crate::telemetry::events::TelemetryEvent; + match cpu_sample_round_trip(None) { + TelemetryEvent::CpuSample { cpu, .. } => { + assert_eq!(cpu, None); + } + other => panic!("expected CpuSample, got {other:?}"), + } + } } diff --git a/dial9-tokio-telemetry/src/telemetry/cpu_profile.rs b/dial9-tokio-telemetry/src/telemetry/cpu_profile.rs index 5068759b..1e56f245 100644 --- a/dial9-tokio-telemetry/src/telemetry/cpu_profile.rs +++ b/dial9-tokio-telemetry/src/telemetry/cpu_profile.rs @@ -86,6 +86,7 @@ pub(crate) struct RawCpuSample { pub timestamp_nanos: u64, pub callchain: Vec, pub source: CpuSampleSource, + pub cpu: Option, } /// Manages the process-wide perf sampler. Yields raw samples without worker IDs. @@ -134,6 +135,7 @@ impl CpuProfiler { timestamp_nanos: sample.time, callchain: sample.callchain.clone(), source: CpuSampleSource::CpuProfile, + cpu: sample.cpu, }, thread_name, ); @@ -172,6 +174,7 @@ impl SchedProfiler { timestamp_nanos: sample.time, callchain: sample.callchain.clone(), source: CpuSampleSource::SchedEvent, + cpu: sample.cpu, }); }); } diff --git a/dial9-tokio-telemetry/src/telemetry/events.rs b/dial9-tokio-telemetry/src/telemetry/events.rs index 5f433bed..8aa1adc9 100644 --- a/dial9-tokio-telemetry/src/telemetry/events.rs +++ b/dial9-tokio-telemetry/src/telemetry/events.rs @@ -161,6 +161,10 @@ pub enum TelemetryEvent { source: CpuSampleSource, /// Raw instruction pointer addresses (leaf first). Symbolized offline. callchain: Vec, + /// CPU the sample was taken on, if the backend could determine it. + /// Perf sampling fills this in; ctimer may report `None` if `getcpu` + /// fails. Older traces recorded before this field existed decode as `None`. + cpu: Option, }, /// Maps an OS thread ID to its name (from `/proc/self/task//comm`). /// Emitted before the first CpuSample referencing this tid in each file. @@ -349,6 +353,8 @@ pub(crate) struct CpuSampleData { pub thread_name: Option, pub source: CpuSampleSource, pub callchain: Vec, + /// CPU the sample was taken on, if the backend could determine it. + pub cpu: Option, } /// Get the OS thread ID (tid) of the calling thread via `gettid()`. @@ -450,7 +456,7 @@ pub(crate) fn clock_pair() -> (u64, u64) { pub(crate) struct SchedStat { pub wait_time_ns: u64, /// Raw fd backing this read, exposed for FD-lifecycle tests. Not used in production. - #[cfg(test)] + #[cfg(all(test, target_os = "linux"))] fd: std::os::fd::RawFd, } @@ -509,7 +515,7 @@ impl SchedStat { .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "bad schedstat"))?; Ok(Self { wait_time_ns, - #[cfg(test)] + #[cfg(all(test, target_os = "linux"))] fd, }) } diff --git a/dial9-tokio-telemetry/src/telemetry/format.rs b/dial9-tokio-telemetry/src/telemetry/format.rs index c5342769..2976a647 100644 --- a/dial9-tokio-telemetry/src/telemetry/format.rs +++ b/dial9-tokio-telemetry/src/telemetry/format.rs @@ -196,6 +196,11 @@ pub(crate) struct CpuSampleEvent { pub source: CpuSampleSource, pub thread_name: Option, pub callchain: InternedStackFrames, + /// CPU the sample was taken on, if the backend could determine it. + /// + /// Widened to `u64` on the wire so the field encodes as `OptionalVarint`: + /// 1 byte when absent, typically 2 bytes (tag + small-varint) when present. + pub cpu: Option, } /// Wire-format event for a wake notification. @@ -414,6 +419,8 @@ pub(crate) fn to_owned_event( .get(e.callchain) .expect("stack pool entry must exist for CpuSample callchain") .to_vec(), + // CPU id is varint-encoded as u64 on the wire; real CPU ids fit in u32. + cpu: e.cpu.map(|v| v as u32), }, TelemetryEventRef::WakeEvent(e) => TelemetryEvent::WakeEvent { timestamp_nanos: e.timestamp_ns, diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs b/dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs index 640fe751..958401a0 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs @@ -87,6 +87,7 @@ impl EventWriter { source: raw.source, callchain: raw.callchain, thread_name: thread_name.cloned(), + cpu: raw.cpu, }; record_event( RawEvent::CpuSample(Box::new(data)), @@ -112,6 +113,7 @@ impl EventWriter { // TODO: we should be able to also track thread name here. // sampler is running on worker threads so no thread name thread_name: None, + cpu: raw.cpu, }; record_event( RawEvent::CpuSample(Box::new(data)), diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs index 41985439..3b66c1fb 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs @@ -2372,6 +2372,7 @@ mod tests { source: CpuSampleSource::CpuProfile, thread_name: None, callchain: callchain.clone(), + cpu: None, }; *timestamp += 1; ew.write_raw_event(RawEvent::CpuSample(Box::new(data))) diff --git a/dial9-viewer/ui/demo-trace.bin b/dial9-viewer/ui/demo-trace.bin index 9597866e..f3e34571 100644 Binary files a/dial9-viewer/ui/demo-trace.bin and b/dial9-viewer/ui/demo-trace.bin differ diff --git a/dial9-viewer/ui/trace_parser.js b/dial9-viewer/ui/trace_parser.js index c014e914..c2f7e352 100644 --- a/dial9-viewer/ui/trace_parser.js +++ b/dial9-viewer/ui/trace_parser.js @@ -73,6 +73,7 @@ * tid: number, * source: number, * callchain: string[], + * cpu: number|null, * }} CpuSample */ @@ -356,12 +357,17 @@ const chain = (v.callchain || []).map( (addr) => "0x" + BigInt(addr).toString(16) ); + // `cpu` is encoded as OptionalVarint: null when the backend could + // not determine the CPU. Varints decode as strings for BigInt safety; + // CPU ids always fit in a Number. + const cpu = v.cpu == null ? null : Number(v.cpu); cpuSamples.push({ timestamp: ts, workerId: num(v.worker_id), tid: num(v.tid), source: num(v.source), callchain: chain, + cpu, }); const tn = v.thread_name; if (tn) { diff --git a/perf-self-profile/examples/basic.rs b/perf-self-profile/examples/basic.rs index 1d627ce8..8dfb884f 100644 --- a/perf-self-profile/examples/basic.rs +++ b/perf-self-profile/examples/basic.rs @@ -48,9 +48,13 @@ fn main() { // --- Print a few raw samples --- eprintln!("=== First 3 samples ==="); for (i, sample) in samples.iter().take(3).enumerate() { + let cpu = sample + .cpu + .map(|c| c.to_string()) + .unwrap_or_else(|| "?".into()); eprintln!( - "Sample {i}: ip={:#x}, tid={}, cpu={}, frames:", - sample.ip, sample.tid, sample.cpu + "Sample {i}: ip={:#x}, tid={}, cpu={cpu}, frames:", + sample.ip, sample.tid ); for (j, &addr) in sample.callchain.iter().enumerate() { let sym = resolve_symbol(addr); diff --git a/perf-self-profile/src/sampler.rs b/perf-self-profile/src/sampler.rs index ac39c1f6..f738e6bf 100644 --- a/perf-self-profile/src/sampler.rs +++ b/perf-self-profile/src/sampler.rs @@ -99,8 +99,11 @@ pub struct Sample { pub tid: u32, /// Timestamp in nanoseconds from `CLOCK_MONOTONIC` (set via `use_clockid`). pub time: u64, - /// CPU the sample was taken on. - pub cpu: u32, + /// CPU the sample was taken on, if the backend could determine it. + /// + /// Perf-based sampling always fills this in (via `PERF_SAMPLE_CPU`). + /// The ctimer fallback sets it to `None` when `getcpu` fails. + pub cpu: Option, /// The actual period for this sample. pub period: u64, /// Stack frames from the callchain. diff --git a/perf-self-profile/src/sys/linux/ctimer_sampler.rs b/perf-self-profile/src/sys/linux/ctimer_sampler.rs index b2306711..10deba62 100644 --- a/perf-self-profile/src/sys/linux/ctimer_sampler.rs +++ b/perf-self-profile/src/sys/linux/ctimer_sampler.rs @@ -93,7 +93,7 @@ impl SamplerBackend for CtimerSampler { pid: 0, tid: 0, time: 0, - cpu: 0, + cpu: None, period: 0, callchain: Vec::with_capacity(MAX_FRAMES), raw: None, @@ -166,17 +166,19 @@ extern "C" fn sigprof_handler( let pid = libc::getpid() as u32; let tid = gettid() as u32; - let mut cpu = 0u32; - // SAFETY: `getcpu` writes one `u32` through `&mut cpu`, node/cache pointers are null (allowed). - let ok = libc::syscall( + let mut cpu_raw = 0u32; + // SAFETY: `getcpu` writes one `u32` through `&mut cpu_raw`, node/cache pointers are null (allowed). + let cpu = if libc::syscall( libc::SYS_getcpu, - &mut cpu, + &mut cpu_raw, ptr::null_mut::(), ptr::null_mut::(), - ) == 0; - if !ok { - cpu = 0; // fallback if getcpu fails - } + ) == 0 + { + Some(cpu_raw) + } else { + None + }; let mut ts: libc::timespec = mem::zeroed(); libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts); diff --git a/perf-self-profile/src/sys/linux/fp_profiler/sample_buffer.rs b/perf-self-profile/src/sys/linux/fp_profiler/sample_buffer.rs index f8ff0795..78a9959c 100644 --- a/perf-self-profile/src/sys/linux/fp_profiler/sample_buffer.rs +++ b/perf-self-profile/src/sys/linux/fp_profiler/sample_buffer.rs @@ -27,7 +27,7 @@ struct SampleData { pid: u32, tid: u32, time: u64, - cpu: u32, + cpu: Option, period: u64, num_frames: u32, frames: [u64; MAX_FRAMES], @@ -51,7 +51,7 @@ impl SampleSlot { pid: 0, tid: 0, time: 0, - cpu: 0, + cpu: None, period: 0, num_frames: 0, frames: [0u64; MAX_FRAMES], @@ -101,7 +101,7 @@ pub(crate) struct DrainedSample { pub pid: u32, pub tid: u32, pub time: u64, - pub cpu: u32, + pub cpu: Option, /// Effective sampling period in nanoseconds, accounting for timer overruns. /// `interval_ns * (1 + overrun_count)`, used as sample weight in flamegraphs. pub period: u64, @@ -183,7 +183,14 @@ pub(crate) struct SlotWriter { impl SlotWriter { #[inline] - pub(crate) unsafe fn write(&mut self, pid: u32, tid: u32, time: u64, cpu: u32, period: u64) { + pub(crate) unsafe fn write( + &mut self, + pid: u32, + tid: u32, + time: u64, + cpu: Option, + period: u64, + ) { // SAFETY: slot lives in the 'static BUFFER, and the tail CAS in // claim_slot hands us exclusive access until commit. unsafe { @@ -238,7 +245,7 @@ impl Drop for SlotWriter { (*data).pid = 0; (*data).tid = 0; (*data).time = 0; - (*data).cpu = 0; + (*data).cpu = None; (*data).period = 0; (*data).num_frames = 0; (*self.slot) @@ -352,7 +359,7 @@ mod tests { unsafe { let mut slot = claim_slot().expect("should claim slot"); - slot.write(1000, 42, 999_000_000, 3, 10_000_000); + slot.write(1000, 42, 999_000_000, Some(3), 10_000_000); let frames = [0x1234u64, 0x5678, 0x9abc]; write_frames(&mut slot, &frames, 3); slot.commit(); @@ -366,7 +373,7 @@ mod tests { assert_eq!(got[0].pid, 1000); assert_eq!(got[0].tid, 42); assert_eq!(got[0].time, 999_000_000); - assert_eq!(got[0].cpu, 3); + assert_eq!(got[0].cpu, Some(3)); assert_eq!(got[0].period, 10_000_000); assert_eq!(got[0].num_frames, 3); assert_eq!(got[0].frames[0], 0x1234); @@ -383,21 +390,21 @@ mod tests { unsafe { let mut slot = claim_slot().unwrap(); - slot.write(1, 1, 100, 0, 0); + slot.write(1, 1, 100, None, 0); write_frames(&mut slot, &[0xAA], 1); slot.commit(); } unsafe { let mut slot = claim_slot().unwrap(); - slot.write(2, 2, 200, 0, 0); + slot.write(2, 2, 200, None, 0); write_frames(&mut slot, &[0xBB], 1); drop(slot); } unsafe { let mut slot = claim_slot().unwrap(); - slot.write(3, 3, 300, 0, 0); + slot.write(3, 3, 300, None, 0); write_frames(&mut slot, &[0xCC], 1); slot.commit(); } @@ -449,20 +456,20 @@ mod tests { unsafe { // Slot 0: committed. let mut s0 = claim_slot().unwrap(); - s0.write(10, 10, 10, 0, 1); + s0.write(10, 10, 10, None, 1); write_frames(&mut s0, &[0x10], 1); s0.commit(); // Slot 1: left uncommitted to block ordered drain. let mut s1 = claim_slot().unwrap(); - s1.write(20, 20, 20, 0, 1); + s1.write(20, 20, 20, None, 1); write_frames(&mut s1, &[0x20], 1); blocked_writer = s1; // Slot 2: committed, but should not be drained yet because slot 1 // is not ready. let mut s2 = claim_slot().unwrap(); - s2.write(30, 30, 30, 0, 1); + s2.write(30, 30, 30, None, 1); write_frames(&mut s2, &[0x30], 1); s2.commit(); } @@ -487,7 +494,7 @@ mod tests { for i in 0..BUFFER_CAP { unsafe { let mut s = claim_slot().expect("should claim during fill"); - s.write(i as u32, 0, 0, 0, 1); + s.write(i as u32, 0, 0, None, 1); write_frames(&mut s, &[], 0); s.commit(); } @@ -514,7 +521,7 @@ mod tests { for i in 0..50u32 { unsafe { let mut s = claim_slot().expect("should claim after drain"); - s.write(10000 + i, 0, 0, 0, 1); + s.write(10000 + i, 0, 0, None, 1); write_frames(&mut s, &[], 0); s.commit(); } @@ -552,7 +559,7 @@ mod tests { for i in 0..CLAIMS_PER_PRODUCER { unsafe { if let Some(mut s) = claim_slot() { - s.write(pid as u32, i as u32, 0, 0, 1); + s.write(pid as u32, i as u32, 0, None, 1); write_frames(&mut s, &[], 0); s.commit(); claimed.fetch_add(1, Ordering::Relaxed); @@ -615,12 +622,12 @@ mod tests { unsafe { let mut a = claim_slot().unwrap(); - a.write(111, 1, 1, 0, 1); + a.write(111, 1, 1, None, 1); write_frames(&mut a, &[0xAA], 1); a.commit(); let mut b = claim_slot().unwrap(); - b.write(222, 2, 2, 0, 1); + b.write(222, 2, 2, None, 1); write_frames(&mut b, &[0xBB], 1); b.commit(); } diff --git a/perf-self-profile/src/sys/linux/perf_sampler.rs b/perf-self-profile/src/sys/linux/perf_sampler.rs index 2336ccda..37903a0e 100644 --- a/perf-self-profile/src/sys/linux/perf_sampler.rs +++ b/perf-self-profile/src/sys/linux/perf_sampler.rs @@ -344,7 +344,7 @@ impl super::sampler::SamplerBackend for PerfSamplerImpl { pid: s.pid().unwrap_or(0), tid: s.tid().unwrap_or(0), time: s.time().unwrap_or(0), - cpu: s.cpu().unwrap_or(0), + cpu: s.cpu(), period: s.period().unwrap_or(0), callchain, raw: s.raw().map(|r| r.to_vec()), diff --git a/perf-self-profile/tests/multithread.rs b/perf-self-profile/tests/multithread.rs index 684a7c46..dd01e14f 100644 --- a/perf-self-profile/tests/multithread.rs +++ b/perf-self-profile/tests/multithread.rs @@ -54,4 +54,14 @@ fn profiles_spawned_threads() { tids, samples.len() ); + + // Perf events carry a CPU id via PERF_SAMPLE_CPU. Expect every sample to + // have Some(cpu) — the perf path never reports an unknown cpu. + let without_cpu: usize = samples.iter().filter(|s| s.cpu.is_none()).count(); + assert_eq!( + without_cpu, + 0, + "expected every perf sample to carry a cpu id, got {without_cpu}/{} without", + samples.len(), + ); }