Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,7 @@ mod tests {
thread_name: None,
source: CpuSampleSource::CpuProfile,
callchain: vec![],
cpu: None,
},
TelemetryEvent::CpuSample {
timestamp_nanos: 1_800_000,
Expand All @@ -1279,6 +1280,7 @@ mod tests {
thread_name: None,
source: CpuSampleSource::SchedEvent,
callchain: vec![],
cpu: None,
},
TelemetryEvent::PollEnd {
timestamp_nanos: 2_000_000,
Expand Down Expand Up @@ -1332,6 +1334,7 @@ mod tests {
thread_name: None,
source: CpuSampleSource::CpuProfile,
callchain: vec![],
cpu: None,
},
];
let sampled = detect_sampled_polls(&events);
Expand Down
52 changes: 52 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl Encodable for RawEvent {
source: data.source,
thread_name,
callchain,
cpu: data.cpu.map(u64::from),
});
}
}
Expand Down Expand Up @@ -607,4 +608,55 @@ mod tests {
let guard = buf.lock().unwrap();
assert_eq!(guard.event_count, 1);
}

/// Encode a single `RawEvent::CpuSample` through a real thread-local buffer
/// and decode it back via the public `decode_events` path, asserting that
/// the `cpu` field round-trips.
fn cpu_sample_round_trip(cpu: Option<u32>) -> crate::telemetry::events::TelemetryEvent {
use crate::telemetry::events::{CpuSampleData, CpuSampleSource, RawEvent};
use crate::telemetry::format::{WorkerId, decode_events};

let data = CpuSampleData {
timestamp_nanos: 12_345,
worker_id: WorkerId::from(0usize),
tid: 4242,
thread_name: None,
source: CpuSampleSource::CpuProfile,
callchain: vec![0xdead_beef, 0xcafe_babe],
cpu,
};
let encoded = ThreadLocalBuffer::encode_single(&RawEvent::CpuSample(Box::new(data)));
let events = decode_events(&encoded).expect("decode");
assert_eq!(events.len(), 1);
events.into_iter().next().unwrap()
}

#[test]
fn cpu_sample_event_round_trips_with_cpu() {
use crate::telemetry::events::TelemetryEvent;
match cpu_sample_round_trip(Some(7)) {
TelemetryEvent::CpuSample {
tid,
cpu,
callchain,
..
} => {
assert_eq!(tid, 4242);
assert_eq!(cpu, Some(7));
assert_eq!(callchain, vec![0xdead_beef, 0xcafe_babe]);
}
other => panic!("expected CpuSample, got {other:?}"),
}
}

#[test]
fn cpu_sample_event_round_trips_without_cpu() {
use crate::telemetry::events::TelemetryEvent;
match cpu_sample_round_trip(None) {
TelemetryEvent::CpuSample { cpu, .. } => {
assert_eq!(cpu, None);
}
other => panic!("expected CpuSample, got {other:?}"),
}
}
}
3 changes: 3 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/cpu_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub(crate) struct RawCpuSample {
pub timestamp_nanos: u64,
pub callchain: Vec<u64>,
pub source: CpuSampleSource,
pub cpu: Option<u32>,
}

/// Manages the process-wide perf sampler. Yields raw samples without worker IDs.
Expand Down Expand Up @@ -134,6 +135,7 @@ impl CpuProfiler {
timestamp_nanos: sample.time,
callchain: sample.callchain.clone(),
source: CpuSampleSource::CpuProfile,
cpu: sample.cpu,
},
thread_name,
);
Expand Down Expand Up @@ -172,6 +174,7 @@ impl SchedProfiler {
timestamp_nanos: sample.time,
callchain: sample.callchain.clone(),
source: CpuSampleSource::SchedEvent,
cpu: sample.cpu,
});
});
}
Expand Down
10 changes: 8 additions & 2 deletions dial9-tokio-telemetry/src/telemetry/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ pub enum TelemetryEvent {
source: CpuSampleSource,
/// Raw instruction pointer addresses (leaf first). Symbolized offline.
callchain: Vec<u64>,
/// CPU the sample was taken on, if the backend could determine it.
/// Perf sampling fills this in; ctimer may report `None` if `getcpu`
/// fails. Older traces recorded before this field existed decode as `None`.
cpu: Option<u32>,
},
/// Maps an OS thread ID to its name (from `/proc/self/task/<tid>/comm`).
/// Emitted before the first CpuSample referencing this tid in each file.
Expand Down Expand Up @@ -349,6 +353,8 @@ pub(crate) struct CpuSampleData {
pub thread_name: Option<ThreadName>,
pub source: CpuSampleSource,
pub callchain: Vec<u64>,
/// CPU the sample was taken on, if the backend could determine it.
pub cpu: Option<u32>,
}

/// Get the OS thread ID (tid) of the calling thread via `gettid()`.
Expand Down Expand Up @@ -450,7 +456,7 @@ pub(crate) fn clock_pair() -> (u64, u64) {
pub(crate) struct SchedStat {
pub wait_time_ns: u64,
/// Raw fd backing this read, exposed for FD-lifecycle tests. Not used in production.
#[cfg(test)]
#[cfg(all(test, target_os = "linux"))]
fd: std::os::fd::RawFd,
}

Expand Down Expand Up @@ -509,7 +515,7 @@ impl SchedStat {
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "bad schedstat"))?;
Ok(Self {
wait_time_ns,
#[cfg(test)]
#[cfg(all(test, target_os = "linux"))]
fd,
})
}
Expand Down
7 changes: 7 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ pub(crate) struct CpuSampleEvent {
pub source: CpuSampleSource,
pub thread_name: Option<InternedString>,
pub callchain: InternedStackFrames,
/// CPU the sample was taken on, if the backend could determine it.
///
/// Widened to `u64` on the wire so the field encodes as `OptionalVarint`:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good approach

/// 1 byte when absent, typically 2 bytes (tag + small-varint) when present.
pub cpu: Option<u64>,
}

/// Wire-format event for a wake notification.
Expand Down Expand Up @@ -414,6 +419,8 @@ pub(crate) fn to_owned_event(
.get(e.callchain)
.expect("stack pool entry must exist for CpuSample callchain")
.to_vec(),
// CPU id is varint-encoded as u64 on the wire; real CPU ids fit in u32.
cpu: e.cpu.map(|v| v as u32),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: better to be clearer about bad wire with u32::try_from()::ok

},
TelemetryEventRef::WakeEvent(e) => TelemetryEvent::WakeEvent {
timestamp_nanos: e.timestamp_ns,
Expand Down
2 changes: 2 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl EventWriter {
source: raw.source,
callchain: raw.callchain,
thread_name: thread_name.cloned(),
cpu: raw.cpu,
};
record_event(
RawEvent::CpuSample(Box::new(data)),
Expand All @@ -112,6 +113,7 @@ impl EventWriter {
// TODO: we should be able to also track thread name here.
// sampler is running on worker threads so no thread name
thread_name: None,
cpu: raw.cpu,
};
record_event(
RawEvent::CpuSample(Box::new(data)),
Expand Down
1 change: 1 addition & 0 deletions dial9-tokio-telemetry/src/telemetry/recorder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1936,6 +1936,7 @@ mod tests {
source: CpuSampleSource::CpuProfile,
thread_name: None,
callchain: callchain.clone(),
cpu: None,
};
*timestamp += 1;
ew.write_raw_event(RawEvent::CpuSample(Box::new(data)))
Expand Down
Binary file modified dial9-viewer/ui/demo-trace.bin
Binary file not shown.
6 changes: 6 additions & 0 deletions dial9-viewer/ui/trace_parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* tid: number,
* source: number,
* callchain: string[],
* cpu: number|null,
* }} CpuSample
*/

Expand Down Expand Up @@ -356,12 +357,17 @@
const chain = (v.callchain || []).map(
(addr) => "0x" + BigInt(addr).toString(16)
);
// `cpu` is encoded as OptionalVarint: null when the backend could
// not determine the CPU. Varints decode as strings for BigInt safety;
// CPU ids always fit in a Number.
const cpu = v.cpu == null ? null : Number(v.cpu);
cpuSamples.push({
timestamp: ts,
workerId: num(v.worker_id),
tid: num(v.tid),
source: num(v.source),
callchain: chain,
cpu,
});
const tn = v.thread_name;
if (tn) {
Expand Down
8 changes: 6 additions & 2 deletions perf-self-profile/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ fn main() {
// --- Print a few raw samples ---
eprintln!("=== First 3 samples ===");
for (i, sample) in samples.iter().take(3).enumerate() {
let cpu = sample
.cpu
.map(|c| c.to_string())
.unwrap_or_else(|| "?".into());
eprintln!(
"Sample {i}: ip={:#x}, tid={}, cpu={}, frames:",
sample.ip, sample.tid, sample.cpu
"Sample {i}: ip={:#x}, tid={}, cpu={cpu}, frames:",
sample.ip, sample.tid
);
for (j, &addr) in sample.callchain.iter().enumerate() {
let sym = resolve_symbol(addr);
Expand Down
7 changes: 5 additions & 2 deletions perf-self-profile/src/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ pub struct Sample {
pub tid: u32,
/// Timestamp in nanoseconds from `CLOCK_MONOTONIC` (set via `use_clockid`).
pub time: u64,
/// CPU the sample was taken on.
pub cpu: u32,
/// CPU the sample was taken on, if the backend could determine it.
///
/// Perf-based sampling always fills this in (via `PERF_SAMPLE_CPU`).
/// The ctimer fallback sets it to `None` when `getcpu` fails.
pub cpu: Option<u32>,
/// The actual period for this sample.
pub period: u64,
/// Stack frames from the callchain.
Expand Down
20 changes: 11 additions & 9 deletions perf-self-profile/src/sys/linux/ctimer_sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl SamplerBackend for CtimerSampler {
pid: 0,
tid: 0,
time: 0,
cpu: 0,
cpu: None,
period: 0,
callchain: Vec::with_capacity(MAX_FRAMES),
raw: None,
Expand Down Expand Up @@ -166,17 +166,19 @@ extern "C" fn sigprof_handler(
let pid = libc::getpid() as u32;
let tid = gettid() as u32;

let mut cpu = 0u32;
// SAFETY: `getcpu` writes one `u32` through `&mut cpu`, node/cache pointers are null (allowed).
let ok = libc::syscall(
let mut cpu_raw = 0u32;
// SAFETY: `getcpu` writes one `u32` through `&mut cpu_raw`, node/cache pointers are null (allowed).
let cpu = if libc::syscall(
libc::SYS_getcpu,
&mut cpu,
&mut cpu_raw,
ptr::null_mut::<libc::c_void>(),
ptr::null_mut::<libc::c_void>(),
) == 0;
if !ok {
cpu = 0; // fallback if getcpu fails
}
) == 0
{
Some(cpu_raw)
} else {
None
};

let mut ts: libc::timespec = mem::zeroed();
libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
Expand Down
Loading
Loading