diff --git a/dial9-tokio-telemetry/Cargo.toml b/dial9-tokio-telemetry/Cargo.toml index 95560e9e..0b3f03b2 100644 --- a/dial9-tokio-telemetry/Cargo.toml +++ b/dial9-tokio-telemetry/Cargo.toml @@ -85,6 +85,9 @@ worker-s3 = [ "dep:aws-config", "dep:time", ] +## Removes #[non_exhaustive] from event structs so downstream crates can +## construct them directly. Unstable: no semver guarantees on new fields. +unstable-events = [] ## All features except platform-specific ones (taskdump). Used in CI for non-Linux targets. __nonlinux_all_features = [ "analysis", @@ -97,6 +100,7 @@ __nonlinux_all_features = [ dial9-tokio-telemetry = { path = ".", features = [ "analysis", "tracing-layer", + "unstable-events", "worker-s3", ] } assert2 = { workspace = true } diff --git a/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs b/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs index 9dd81930..76b30a3f 100644 --- a/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs +++ b/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs @@ -187,6 +187,8 @@ fn to_fat_event(event: &TelemetryEvent, reader: &TraceReader) -> Option None, diff --git a/dial9-tokio-telemetry/src/telemetry/analysis.rs b/dial9-tokio-telemetry/src/telemetry/analysis.rs index edbffd2c..67c59641 100644 --- a/dial9-tokio-telemetry/src/telemetry/analysis.rs +++ b/dial9-tokio-telemetry/src/telemetry/analysis.rs @@ -334,6 +334,8 @@ pub fn analyze_trace(events: &[TelemetryEvent]) -> TraceAnalysis { | TelemetryEvent::TaskTerminate { .. } | TelemetryEvent::CpuSample { .. } | TelemetryEvent::TaskDump { .. } + | TelemetryEvent::Alloc { .. } + | TelemetryEvent::Free { .. } | TelemetryEvent::ThreadNameDef { .. } | TelemetryEvent::WakeEvent { .. } | TelemetryEvent::SegmentMetadata { .. 
} diff --git a/dial9-tokio-telemetry/src/telemetry/events.rs b/dial9-tokio-telemetry/src/telemetry/events.rs index d67b95bd..4db1890a 100644 --- a/dial9-tokio-telemetry/src/telemetry/events.rs +++ b/dial9-tokio-telemetry/src/telemetry/events.rs @@ -236,6 +236,35 @@ pub enum TelemetryEvent { /// Named field values in schema order. fields: Vec<(String, FieldValue)>, }, + /// A sampled memory allocation event. + Alloc { + /// Monotonic-clock timestamp in nanoseconds. + #[serde(rename = "timestamp_ns")] + timestamp_nanos: u64, + /// OS thread ID of the allocating thread. + tid: u32, + /// Allocation size in bytes. + size: u64, + /// Returned pointer address (0 when liveset tracking is off). + addr: u64, + /// Raw instruction pointer addresses (leaf first). + callchain: Vec, + }, + /// A deallocation paired with a previously-sampled allocation. + Free { + /// Monotonic-clock timestamp in nanoseconds of the free. + #[serde(rename = "timestamp_ns")] + timestamp_nanos: u64, + /// OS thread ID of the freeing thread. + tid: u32, + /// Pointer that was freed. + addr: u64, + /// Size of the allocation being freed (denormalized from the matching alloc). + size: u64, + /// Monotonic-ns timestamp of the original allocation. + #[serde(rename = "alloc_timestamp_ns")] + alloc_timestamp_nanos: u64, + }, } impl TelemetryEvent { @@ -263,6 +292,12 @@ impl TelemetryEvent { | TelemetryEvent::TaskDump { timestamp_nanos, .. } + | TelemetryEvent::Alloc { + timestamp_nanos, .. + } + | TelemetryEvent::Free { + timestamp_nanos, .. + } | TelemetryEvent::WakeEvent { timestamp_nanos, .. } @@ -297,6 +332,8 @@ impl TelemetryEvent { | TelemetryEvent::TaskSpawn { .. } | TelemetryEvent::TaskTerminate { .. } | TelemetryEvent::TaskDump { .. } + | TelemetryEvent::Alloc { .. } + | TelemetryEvent::Free { .. } | TelemetryEvent::ThreadNameDef { .. } | TelemetryEvent::WakeEvent { .. } | TelemetryEvent::SegmentMetadata { .. 
} diff --git a/dial9-tokio-telemetry/src/telemetry/format.rs b/dial9-tokio-telemetry/src/telemetry/format.rs index f1a3547b..abb1e4ae 100644 --- a/dial9-tokio-telemetry/src/telemetry/format.rs +++ b/dial9-tokio-telemetry/src/telemetry/format.rs @@ -219,6 +219,59 @@ pub(crate) struct TaskDumpEvent { pub callchain: InternedStackFrames, } +/// Wire-format event for a sampled memory allocation. +/// +/// Emitted from the consolidator (flush thread) for allocations that tripped +/// the geometric sampling counter. The sampling rate that produced this event +/// lives in the segment metadata, not on each event. +#[derive(Debug, TraceEvent)] +#[cfg_attr(not(feature = "unstable-events"), non_exhaustive)] +pub struct AllocEvent { + /// Monotonic-clock timestamp in nanoseconds. + #[traceevent(timestamp)] + pub timestamp_ns: u64, + /// OS thread ID of the allocating thread. Same source as `WorkerParkEvent.tid` + /// and `CpuSampleEvent.tid`. Use this to join against worker park/unpark + /// history to recover worker_id when the allocation happened on a tokio + /// worker thread. + pub tid: u32, + /// Allocation size in bytes. The actual size requested by the allocating + /// code; the underlying allocator may have rounded up, but that's not + /// recorded here. + pub size: u64, + /// Returned pointer. Only meaningful when liveset tracking is on; otherwise 0. + /// Always present so the schema is stable across track_liveset on/off. + pub addr: u64, + /// Stack at the allocation site. Frame 0 is the most-recent caller. + pub callchain: InternedStackFrames, +} + +/// Wire-format event for a deallocation paired with a previously-sampled +/// `AllocEvent`. Only emitted when liveset tracking is on. +/// +/// `size` and `alloc_timestamp_ns` are denormalized from the matching +/// `AllocEvent` so the free stays analytically useful when the corresponding +/// `AllocEvent` has been evicted by trace rotation. See design §3 +/// "Why denormalize size and alloc_timestamp_ns?" 
for the rationale. +#[derive(Debug, TraceEvent)] +#[cfg_attr(not(feature = "unstable-events"), non_exhaustive)] +pub struct FreeEvent { + /// Monotonic-clock timestamp in nanoseconds of the free. + #[traceevent(timestamp)] + pub timestamp_ns: u64, + /// OS thread ID of the freeing thread. + pub tid: u32, + /// Pointer that was freed. Matches a previously-seen `AllocEvent.addr`. + pub addr: u64, + /// Size of the allocation being freed. Denormalized from the matching + /// `AllocEvent` for rotation robustness. + pub size: u64, + /// Monotonic-ns timestamp of the original `AllocEvent`. Allows leak + /// analysis to bucket frees by generation without needing the + /// `AllocEvent` in the same (unrotated) trace. + pub alloc_timestamp_ns: u64, +} + /// Wire-format event for a wake notification. #[derive(Debug, TraceEvent)] pub struct WakeEventEvent { @@ -291,6 +344,8 @@ pub(crate) enum TelemetryEventRef<'a> { TaskTerminate(TaskTerminateEventRef<'a>), CpuSample(CpuSampleEventRef<'a>), TaskDump(TaskDumpEventRef<'a>), + Alloc(AllocEventRef<'a>), + Free(FreeEventRef<'a>), WakeEvent(WakeEventEventRef<'a>), SegmentMetadata(SegmentMetadataEventRef<'a>), ClockSync(ClockSyncEventRef<'a>), @@ -311,6 +366,8 @@ impl<'a> TelemetryEventRef<'a> { Self::TaskTerminate(e) => Some(e.timestamp_ns), Self::CpuSample(e) => Some(e.timestamp_ns), Self::TaskDump(e) => Some(e.timestamp_ns), + Self::Alloc(e) => Some(e.timestamp_ns), + Self::Free(e) => Some(e.timestamp_ns), Self::WakeEvent(e) => Some(e.timestamp_ns), Self::SegmentMetadata(e) => Some(e.timestamp_ns), Self::ClockSync(e) => Some(e.timestamp_ns), @@ -365,6 +422,12 @@ pub(crate) fn decode_ref<'a>( "TaskDumpEvent" => { TelemetryEventRef::TaskDump(TaskDumpEvent::decode(timestamp_ns, fields, field_defs)?) } + "AllocEvent" => { + TelemetryEventRef::Alloc(AllocEvent::decode(timestamp_ns, fields, field_defs)?) + } + "FreeEvent" => { + TelemetryEventRef::Free(FreeEvent::decode(timestamp_ns, fields, field_defs)?) 
+ } "WakeEventEvent" => { TelemetryEventRef::WakeEvent(WakeEventEvent::decode(timestamp_ns, fields, field_defs)?) } @@ -453,6 +516,23 @@ pub(crate) fn to_owned_event( .expect("stack pool entry must exist for TaskDump callchain") .to_vec(), }, + TelemetryEventRef::Alloc(e) => TelemetryEvent::Alloc { + timestamp_nanos: e.timestamp_ns, + tid: e.tid, + size: e.size, + addr: e.addr, + callchain: stack_pool + .get(e.callchain) + .expect("stack pool entry must exist for AllocEvent callchain") + .to_vec(), + }, + TelemetryEventRef::Free(e) => TelemetryEvent::Free { + timestamp_nanos: e.timestamp_ns, + tid: e.tid, + addr: e.addr, + size: e.size, + alloc_timestamp_nanos: e.alloc_timestamp_ns, + }, TelemetryEventRef::WakeEvent(e) => TelemetryEvent::WakeEvent { timestamp_nanos: e.timestamp_ns, waker_task_id: e.waker_task_id, @@ -473,3 +553,74 @@ pub(crate) fn to_owned_event( }, } } + +#[cfg(test)] +mod tests { + use super::*; + use dial9_trace_format::encoder::Encoder; + + #[test] + fn alloc_event_round_trip() { + let mut enc = Encoder::new_to(Vec::new()).unwrap(); + let callchain = enc.intern_stack_frames(&[0x1000, 0x2000, 0x3000]).unwrap(); + enc.write_infallible(&AllocEvent { + timestamp_ns: 123_456_789, + tid: 42, + size: 4096, + addr: 0xDEAD_BEEF_CAFE, + callchain, + }); + let buf = enc.into_inner(); + + let events = decode_events(&buf).unwrap(); + assert_eq!(events.len(), 1); + match &events[0] { + TelemetryEvent::Alloc { + timestamp_nanos, + tid, + size, + addr, + callchain, + } => { + assert_eq!(*timestamp_nanos, 123_456_789); + assert_eq!(*tid, 42); + assert_eq!(*size, 4096); + assert_eq!(*addr, 0xDEAD_BEEF_CAFE); + assert_eq!(callchain, &[0x1000, 0x2000, 0x3000]); + } + other => panic!("expected Alloc event, got {other:?}"), + } + } + + #[test] + fn free_event_round_trip() { + let mut enc = Encoder::new_to(Vec::new()).unwrap(); + enc.write_infallible(&FreeEvent { + timestamp_ns: 999_000_000, + tid: 7, + addr: 0xCAFE_BABE, + size: 2048, + alloc_timestamp_ns: 
100_000_000, + }); + let buf = enc.into_inner(); + + let events = decode_events(&buf).unwrap(); + assert_eq!(events.len(), 1); + match &events[0] { + TelemetryEvent::Free { + timestamp_nanos, + tid, + addr, + size, + alloc_timestamp_nanos, + } => { + assert_eq!(*timestamp_nanos, 999_000_000); + assert_eq!(*tid, 7); + assert_eq!(*addr, 0xCAFE_BABE); + assert_eq!(*size, 2048); + assert_eq!(*alloc_timestamp_nanos, 100_000_000); + } + other => panic!("expected Free event, got {other:?}"), + } + } +} diff --git a/dial9-tokio-telemetry/src/telemetry/mod.rs b/dial9-tokio-telemetry/src/telemetry/mod.rs index 41a6d026..5f0c257e 100644 --- a/dial9-tokio-telemetry/src/telemetry/mod.rs +++ b/dial9-tokio-telemetry/src/telemetry/mod.rs @@ -21,8 +21,8 @@ pub use crate::traced::TracedFuture; pub use buffer::{Encodable, ThreadLocalEncoder}; pub use events::{CpuSampleSource, TelemetryEvent, clock_monotonic_ns}; pub use format::{ - PollEndEvent, PollStartEvent, TaskSpawnEvent, WakeEventEvent, WorkerId, WorkerParkEvent, - WorkerUnparkEvent, + AllocEvent, FreeEvent, PollEndEvent, PollStartEvent, TaskSpawnEvent, WakeEventEvent, WorkerId, + WorkerParkEvent, WorkerUnparkEvent, }; pub use recorder::{ HasTracePath, NoTracePath, PipelineCustom, PipelineS3, PipelineUnset, RuntimeTelemetryHandle, diff --git a/dial9-tokio-telemetry/tests/js_parser.rs b/dial9-tokio-telemetry/tests/js_parser.rs index c4def7bf..6121dab0 100644 --- a/dial9-tokio-telemetry/tests/js_parser.rs +++ b/dial9-tokio-telemetry/tests/js_parser.rs @@ -183,3 +183,86 @@ main().catch((e) => {{ console.error(e); process.exit(1); }}); String::from_utf8_lossy(&output.stderr) ); } + +#[test] +fn test_js_parser_alloc_free_events() { + use dial9_tokio_telemetry::telemetry::{AllocEvent, FreeEvent}; + use dial9_trace_format::encoder::Encoder; + + let temp_dir = TempDir::new().unwrap(); + let trace_path = temp_dir.path().join("alloc_trace.bin"); + + { + let mut enc = Encoder::new(); + let stack = enc.intern_stack_frames(&[0xAAAA, 0xBBBB, 
0xCCCC]).unwrap(); + enc.write(&AllocEvent { + timestamp_ns: 5_000_000, + tid: 42, + size: 4096, + addr: 0xDEAD_BEEF, + callchain: stack, + }) + .unwrap(); + enc.write(&FreeEvent { + timestamp_ns: 10_000_000, + tid: 7, + addr: 0xDEAD_BEEF, + size: 4096, + alloc_timestamp_ns: 5_000_000, + }) + .unwrap(); + std::fs::write(&trace_path, enc.finish()).unwrap(); + } + + let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap(); + let script = format!( + r#" +const {{ parseTrace }} = require("{viewer}/trace_parser.js"); +const fs = require("fs"); +async function main() {{ + const result = await parseTrace(fs.readFileSync("{trace}")); + if (result.allocEvents.length !== 1) {{ + console.error("expected 1 allocEvent, got " + result.allocEvents.length); + process.exit(1); + }} + const a = result.allocEvents[0]; + if (a.tid !== 42) {{ console.error("bad tid: " + a.tid); process.exit(1); }} + if (a.size !== 4096) {{ console.error("bad size: " + a.size); process.exit(1); }} + if (a.callchain.length !== 3) {{ console.error("bad callchain len: " + a.callchain.length); process.exit(1); }} + + if (result.freeEvents.length !== 1) {{ + console.error("expected 1 freeEvent, got " + result.freeEvents.length); + process.exit(1); + }} + const f = result.freeEvents[0]; + if (f.tid !== 7) {{ console.error("bad free tid: " + f.tid); process.exit(1); }} + if (f.addr !== "3735928559") {{ console.error("bad free addr: " + f.addr); process.exit(1); }} + if (f.size !== 4096) {{ console.error("bad free size: " + f.size); process.exit(1); }} + if (f.allocTimestampNs !== 5000000) {{ console.error("bad allocTimestampNs: " + f.allocTimestampNs); process.exit(1); }} + + console.log("OK: allocEvents=" + result.allocEvents.length + " freeEvents=" + result.freeEvents.length); +}} +main().catch((e) => {{ console.error(e); process.exit(1); }}); +"#, + viewer = std::path::Path::new(&manifest_dir) + .parent() + .unwrap() + .join("dial9-viewer") + .join("ui") + .display(), + trace = trace_path.display(), 
+ ); + + let output = Command::new("node") + .args(["-e", &script]) + .output() + .expect("Failed to run node"); + + eprintln!("{}", String::from_utf8_lossy(&output.stdout)); + assert!( + output.status.success(), + "JS parser alloc/free test failed:\nstdout: {}\nstderr: {}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); +} diff --git a/dial9-tokio-telemetry/tests/rotation_time_alignment.rs b/dial9-tokio-telemetry/tests/rotation_time_alignment.rs index 916e78cf..5d87344f 100644 --- a/dial9-tokio-telemetry/tests/rotation_time_alignment.rs +++ b/dial9-tokio-telemetry/tests/rotation_time_alignment.rs @@ -247,6 +247,8 @@ fn event_type_name(event: &TelemetryEvent) -> &'static str { TelemetryEvent::TaskTerminate { .. } => "TaskTerminate", TelemetryEvent::CpuSample { .. } => "CpuSample", TelemetryEvent::TaskDump { .. } => "TaskDump", + TelemetryEvent::Alloc { .. } => "Alloc", + TelemetryEvent::Free { .. } => "Free", TelemetryEvent::ThreadNameDef { .. } => "ThreadNameDef", TelemetryEvent::WakeEvent { .. } => "WakeEvent", TelemetryEvent::SegmentMetadata { .. } => "SegmentMetadata", diff --git a/dial9-viewer/skills/dial9-trace-loading/SKILL.md b/dial9-viewer/skills/dial9-trace-loading/SKILL.md index e55b59f4..fa2a79f3 100644 --- a/dial9-viewer/skills/dial9-trace-loading/SKILL.md +++ b/dial9-viewer/skills/dial9-trace-loading/SKILL.md @@ -36,6 +36,8 @@ description: Parse and load dial9 Tokio runtime trace files. 
Covers the ParsedTr hasTaskTracking: boolean, // trace includes task spawn/terminate events taskInstrumented: Map, // task ID → whether task has tracing instrumentation taskDumps: Map, // task ID → async stack captures (sorted by timestamp); see dial9-tokio-telemetry `taskdump` feature + allocEvents: AllocEvent[], // Sampled memory allocations (requires dial9-tokio-telemetry memory-profiling feature) + freeEvents: FreeEvent[], // Deallocations paired with sampled allocs (requires `track_liveset`) } ``` @@ -78,6 +80,26 @@ const EVENT_TYPES = { | `source` | 0 = CPU profiling sample, 1 = scheduling (off-CPU) sample | | `callchain` | Array of address strings like `"0x55cc6d053893"` | +## AllocEvent fields + +| Field | Description | +|-------|-------------| +| `timestamp` | Monotonic nanoseconds at the allocation | +| `tid` | OS thread ID of the allocating thread | +| `size` | Allocation size in bytes | +| `addr` | Returned pointer address as a decimal string (BigInt-safe) | +| `callchain` | Array of address strings like `"0x55cc6d053893"` | + +## FreeEvent fields + +| Field | Description | +|-------|-------------| +| `timestamp` | Monotonic nanoseconds at the free | +| `tid` | OS thread ID of the freeing thread | +| `addr` | Pointer that was freed, as a decimal string (BigInt-safe) | +| `size` | Size of the allocation being freed (denormalized from the matching `AllocEvent`) | +| `allocTimestampNs` | Monotonic-ns timestamp of the original allocation (denormalized) | + ## Parse options ```javascript diff --git a/dial9-viewer/ui/test_all_skills_snippets.js b/dial9-viewer/ui/test_all_skills_snippets.js index 33a01871..d1d9ebac 100644 --- a/dial9-viewer/ui/test_all_skills_snippets.js +++ b/dial9-viewer/ui/test_all_skills_snippets.js @@ -286,6 +286,7 @@ async function main() { if (actual === '_empty_') return; if (doc.endsWith('|null') && (actual === doc.replace('|null', '') || actual === '_null_')) return; if (doc === 'number[]' && actual === '[]') return; + if (doc === 
'unknown[]' && actual === '[]') return; deepErrors.push(`${p}: type mismatch (documented: ${doc}, actual: ${actual})`); return; } diff --git a/dial9-viewer/ui/trace_parser.js b/dial9-viewer/ui/trace_parser.js index 008cfde1..7d0760cd 100644 --- a/dial9-viewer/ui/trace_parser.js +++ b/dial9-viewer/ui/trace_parser.js @@ -196,6 +196,8 @@ const taskInstrumented = new Map(); // taskId -> bool (true if spawned via TelemetryHandle::spawn) const callframeSymbols = new Map(); const cpuSamples = []; + const allocEvents = []; + const freeEvents = []; const threadNames = new Map(); const runtimeWorkers = new Map(); // runtime name → [workerId, ...] const taskDumps = new Map(); // taskId → [{timestamp, callchain}] sorted by timestamp @@ -391,6 +393,29 @@ taskDumps.get(taskId).push({ timestamp: ts, callchain: chain }); break; } + case "AllocEvent": { + const chain = (v.callchain || []).map( + (addr) => "0x" + BigInt(addr).toString(16) + ); + allocEvents.push({ + timestamp: ts, + tid: num(v.tid), + size: num(v.size), + addr: BigInt(v.addr || 0).toString(), + callchain: chain, + }); + break; + } + case "FreeEvent": { + freeEvents.push({ + timestamp: ts, + tid: num(v.tid), + addr: BigInt(v.addr || 0).toString(), + size: num(v.size), + allocTimestampNs: num(v.alloc_timestamp_ns), + }); + break; + } case "ClockSyncEvent": { const real = num(v.realtime_ns); if (real > 0) { @@ -517,6 +542,8 @@ taskSpawnTimes, taskInstrumented, cpuSamples, + allocEvents, + freeEvents, callframeSymbols, threadNames, taskTerminateTimes,