Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,7 @@ mod tests {
thread_name: None,
source: CpuSampleSource::CpuProfile,
callchain: vec![],
cpu: None,
},
TelemetryEvent::CpuSample {
timestamp_nanos: 1_800_000,
Expand All @@ -1279,6 +1280,7 @@ mod tests {
thread_name: None,
source: CpuSampleSource::SchedEvent,
callchain: vec![],
cpu: None,
},
TelemetryEvent::PollEnd {
timestamp_nanos: 2_000_000,
Expand Down Expand Up @@ -1332,6 +1334,7 @@ mod tests {
thread_name: None,
source: CpuSampleSource::CpuProfile,
callchain: vec![],
cpu: None,
},
];
let sampled = detect_sampled_polls(&events);
Expand Down
52 changes: 52 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl Encodable for RawEvent {
source: data.source,
thread_name,
callchain,
cpu: data.cpu.map(u64::from),
});
}
}
Expand Down Expand Up @@ -607,4 +608,55 @@ mod tests {
let guard = buf.lock().unwrap();
assert_eq!(guard.event_count, 1);
}

/// Encode a single `RawEvent::CpuSample` through a real thread-local buffer
/// and decode it back via the public `decode_events` path, asserting that
/// the `cpu` field round-trips.
fn cpu_sample_round_trip(cpu: Option<u32>) -> crate::telemetry::events::TelemetryEvent {
use crate::telemetry::events::{CpuSampleData, CpuSampleSource, RawEvent};
use crate::telemetry::format::{WorkerId, decode_events};

let data = CpuSampleData {
timestamp_nanos: 12_345,
worker_id: WorkerId::from(0usize),
tid: 4242,
thread_name: None,
source: CpuSampleSource::CpuProfile,
callchain: vec![0xdead_beef, 0xcafe_babe],
cpu,
};
let encoded = ThreadLocalBuffer::encode_single(&RawEvent::CpuSample(Box::new(data)));
let events = decode_events(&encoded).expect("decode");
assert_eq!(events.len(), 1);
events.into_iter().next().unwrap()
}

#[test]
fn cpu_sample_event_round_trips_with_cpu() {
use crate::telemetry::events::TelemetryEvent;
match cpu_sample_round_trip(Some(7)) {
TelemetryEvent::CpuSample {
tid,
cpu,
callchain,
..
} => {
assert_eq!(tid, 4242);
assert_eq!(cpu, Some(7));
assert_eq!(callchain, vec![0xdead_beef, 0xcafe_babe]);
}
other => panic!("expected CpuSample, got {other:?}"),
}
}

#[test]
fn cpu_sample_event_round_trips_without_cpu() {
use crate::telemetry::events::TelemetryEvent;
match cpu_sample_round_trip(None) {
TelemetryEvent::CpuSample { cpu, .. } => {
assert_eq!(cpu, None);
}
other => panic!("expected CpuSample, got {other:?}"),
}
}
}
3 changes: 3 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/cpu_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub(crate) struct RawCpuSample {
pub timestamp_nanos: u64,
pub callchain: Vec<u64>,
pub source: CpuSampleSource,
pub cpu: Option<u32>,
}

/// Manages the process-wide perf sampler. Yields raw samples without worker IDs.
Expand Down Expand Up @@ -134,6 +135,7 @@ impl CpuProfiler {
timestamp_nanos: sample.time,
callchain: sample.callchain.clone(),
source: CpuSampleSource::CpuProfile,
cpu: sample.cpu,
},
thread_name,
);
Expand Down Expand Up @@ -172,6 +174,7 @@ impl SchedProfiler {
timestamp_nanos: sample.time,
callchain: sample.callchain.clone(),
source: CpuSampleSource::SchedEvent,
cpu: sample.cpu,
});
});
}
Expand Down
10 changes: 8 additions & 2 deletions dial9-tokio-telemetry/src/telemetry/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ pub enum TelemetryEvent {
source: CpuSampleSource,
/// Raw instruction pointer addresses (leaf first). Symbolized offline.
callchain: Vec<u64>,
/// CPU the sample was taken on, if the backend could determine it.
/// Perf sampling fills this in; ctimer may report `None` if `getcpu`
/// fails. Older traces recorded before this field existed decode as `None`.
cpu: Option<u32>,
},
/// Maps an OS thread ID to its name (from `/proc/self/task/<tid>/comm`).
/// Emitted before the first CpuSample referencing this tid in each file.
Expand Down Expand Up @@ -349,6 +353,8 @@ pub(crate) struct CpuSampleData {
pub thread_name: Option<ThreadName>,
pub source: CpuSampleSource,
pub callchain: Vec<u64>,
/// CPU the sample was taken on, if the backend could determine it.
pub cpu: Option<u32>,
}

/// Get the OS thread ID (tid) of the calling thread via `gettid()`.
Expand Down Expand Up @@ -450,7 +456,7 @@ pub(crate) fn clock_pair() -> (u64, u64) {
pub(crate) struct SchedStat {
pub wait_time_ns: u64,
/// Raw fd backing this read, exposed for FD-lifecycle tests. Not used in production.
#[cfg(test)]
#[cfg(all(test, target_os = "linux"))]
fd: std::os::fd::RawFd,
}

Expand Down Expand Up @@ -509,7 +515,7 @@ impl SchedStat {
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "bad schedstat"))?;
Ok(Self {
wait_time_ns,
#[cfg(test)]
#[cfg(all(test, target_os = "linux"))]
fd,
})
}
Expand Down
7 changes: 7 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ pub(crate) struct CpuSampleEvent {
pub source: CpuSampleSource,
pub thread_name: Option<InternedString>,
pub callchain: InternedStackFrames,
/// CPU the sample was taken on, if the backend could determine it.
///
/// Widened to `u64` on the wire so the field encodes as `OptionalVarint`:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good approach

/// 1 byte when absent, typically 2 bytes (tag + small-varint) when present.
pub cpu: Option<u64>,
}

/// Wire-format event for a wake notification.
Expand Down Expand Up @@ -414,6 +419,8 @@ pub(crate) fn to_owned_event(
.get(e.callchain)
.expect("stack pool entry must exist for CpuSample callchain")
.to_vec(),
// CPU id is varint-encoded as u64 on the wire; real CPU ids fit in u32.
cpu: e.cpu.map(|v| v as u32),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: better to be clearer about bad wire with u32::try_from()::ok

},
TelemetryEventRef::WakeEvent(e) => TelemetryEvent::WakeEvent {
timestamp_nanos: e.timestamp_ns,
Expand Down
2 changes: 2 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl EventWriter {
source: raw.source,
callchain: raw.callchain,
thread_name: thread_name.cloned(),
cpu: raw.cpu,
};
record_event(
RawEvent::CpuSample(Box::new(data)),
Expand All @@ -112,6 +113,7 @@ impl EventWriter {
// TODO: we should be able to also track thread name here.
// sampler is running on worker threads so no thread name
thread_name: None,
cpu: raw.cpu,
};
record_event(
RawEvent::CpuSample(Box::new(data)),
Expand Down
1 change: 1 addition & 0 deletions dial9-tokio-telemetry/src/telemetry/recorder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2372,6 +2372,7 @@ mod tests {
source: CpuSampleSource::CpuProfile,
thread_name: None,
callchain: callchain.clone(),
cpu: None,
};
*timestamp += 1;
ew.write_raw_event(RawEvent::CpuSample(Box::new(data)))
Expand Down
Binary file modified dial9-viewer/ui/demo-trace.bin
Binary file not shown.
6 changes: 6 additions & 0 deletions dial9-viewer/ui/trace_parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* tid: number,
* source: number,
* callchain: string[],
* cpu: number|null,
* }} CpuSample
*/

Expand Down Expand Up @@ -356,12 +357,17 @@
const chain = (v.callchain || []).map(
(addr) => "0x" + BigInt(addr).toString(16)
);
// `cpu` is encoded as OptionalVarint: null when the backend could
// not determine the CPU. Varints decode as strings for BigInt safety;
// CPU ids always fit in a Number.
const cpu = v.cpu == null ? null : Number(v.cpu);
cpuSamples.push({
timestamp: ts,
workerId: num(v.worker_id),
tid: num(v.tid),
source: num(v.source),
callchain: chain,
cpu,
});
const tn = v.thread_name;
if (tn) {
Expand Down
8 changes: 6 additions & 2 deletions perf-self-profile/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ fn main() {
// --- Print a few raw samples ---
eprintln!("=== First 3 samples ===");
for (i, sample) in samples.iter().take(3).enumerate() {
let cpu = sample
.cpu
.map(|c| c.to_string())
.unwrap_or_else(|| "?".into());
eprintln!(
"Sample {i}: ip={:#x}, tid={}, cpu={}, frames:",
sample.ip, sample.tid, sample.cpu
"Sample {i}: ip={:#x}, tid={}, cpu={cpu}, frames:",
sample.ip, sample.tid
);
for (j, &addr) in sample.callchain.iter().enumerate() {
let sym = resolve_symbol(addr);
Expand Down
7 changes: 5 additions & 2 deletions perf-self-profile/src/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ pub struct Sample {
pub tid: u32,
/// Timestamp in nanoseconds from `CLOCK_MONOTONIC` (set via `use_clockid`).
pub time: u64,
/// CPU the sample was taken on.
pub cpu: u32,
/// CPU the sample was taken on, if the backend could determine it.
///
/// Perf-based sampling always fills this in (via `PERF_SAMPLE_CPU`).
/// The ctimer fallback sets it to `None` when `getcpu` fails.
pub cpu: Option<u32>,
/// The actual period for this sample.
pub period: u64,
/// Stack frames from the callchain.
Expand Down
20 changes: 11 additions & 9 deletions perf-self-profile/src/sys/linux/ctimer_sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl SamplerBackend for CtimerSampler {
pid: 0,
tid: 0,
time: 0,
cpu: 0,
cpu: None,
period: 0,
callchain: Vec::with_capacity(MAX_FRAMES),
raw: None,
Expand Down Expand Up @@ -166,17 +166,19 @@ extern "C" fn sigprof_handler(
let pid = libc::getpid() as u32;
let tid = gettid() as u32;

let mut cpu = 0u32;
// SAFETY: `getcpu` writes one `u32` through `&mut cpu`, node/cache pointers are null (allowed).
let ok = libc::syscall(
let mut cpu_raw = 0u32;
// SAFETY: `getcpu` writes one `u32` through `&mut cpu_raw`, node/cache pointers are null (allowed).
let cpu = if libc::syscall(
libc::SYS_getcpu,
&mut cpu,
&mut cpu_raw,
ptr::null_mut::<libc::c_void>(),
ptr::null_mut::<libc::c_void>(),
) == 0;
if !ok {
cpu = 0; // fallback if getcpu fails
}
) == 0
{
Some(cpu_raw)
} else {
None
};

let mut ts: libc::timespec = mem::zeroed();
libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
Expand Down
Loading
Loading