Skip to content

Commit ba0e57b

Browse files
authored
feat: add AllocEvent and FreeEvent to trace format (#420)
Adds wire-format definitions for sampled allocation and matching free events that the upcoming memory profiler will emit. Per design §3: - AllocEvent { timestamp_ns, tid, size, addr, callchain } - FreeEvent { timestamp_ns, tid, addr, size, alloc_timestamp_ns } `size` and `alloc_timestamp_ns` are denormalized onto FreeEvent so leak analysis stays useful when the matching AllocEvent has been evicted by trace rotation. This is the schema only — no producer (allocator hook) and no UI flamegraph yet. Both will land in later commits per the rollout in design §15. Demo trace not regenerated: there is no producer of these events yet.
1 parent d422140 commit ba0e57b

11 files changed

Lines changed: 333 additions & 2 deletions

File tree

dial9-tokio-telemetry/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ worker-s3 = [
8585
"dep:aws-config",
8686
"dep:time",
8787
]
88+
## Removes #[non_exhaustive] from event structs so downstream crates can
89+
## construct them directly. Unstable: no semver guarantees on new fields.
90+
unstable-events = []
8891
## All features except platform-specific ones (taskdump). Used in CI for non-Linux targets.
8992
__nonlinux_all_features = [
9093
"analysis",
@@ -97,6 +100,7 @@ __nonlinux_all_features = [
97100
dial9-tokio-telemetry = { path = ".", features = [
98101
"analysis",
99102
"tracing-layer",
103+
"unstable-events",
100104
"worker-s3",
101105
] }
102106
assert2 = { workspace = true }

dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ fn to_fat_event(event: &TelemetryEvent, reader: &TraceReader) -> Option<FatEvent
187187
TelemetryEvent::TaskSpawn { .. }
188188
| TelemetryEvent::TaskTerminate { .. }
189189
| TelemetryEvent::TaskDump { .. }
190+
| TelemetryEvent::Alloc { .. }
191+
| TelemetryEvent::Free { .. }
190192
| TelemetryEvent::ThreadNameDef { .. }
191193
| TelemetryEvent::SegmentMetadata { .. }
192194
| TelemetryEvent::ClockSync { .. } => None,

dial9-tokio-telemetry/src/telemetry/analysis.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ pub fn analyze_trace(events: &[TelemetryEvent]) -> TraceAnalysis {
334334
| TelemetryEvent::TaskTerminate { .. }
335335
| TelemetryEvent::CpuSample { .. }
336336
| TelemetryEvent::TaskDump { .. }
337+
| TelemetryEvent::Alloc { .. }
338+
| TelemetryEvent::Free { .. }
337339
| TelemetryEvent::ThreadNameDef { .. }
338340
| TelemetryEvent::WakeEvent { .. }
339341
| TelemetryEvent::SegmentMetadata { .. }

dial9-tokio-telemetry/src/telemetry/events.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,35 @@ pub enum TelemetryEvent {
236236
/// Named field values in schema order.
237237
fields: Vec<(String, FieldValue)>,
238238
},
239+
/// A sampled memory allocation event.
240+
Alloc {
241+
/// Wall-clock timestamp in nanoseconds (monotonic).
242+
#[serde(rename = "timestamp_ns")]
243+
timestamp_nanos: u64,
244+
/// OS thread ID of the allocating thread.
245+
tid: u32,
246+
/// Allocation size in bytes.
247+
size: u64,
248+
/// Returned pointer address.
249+
addr: u64,
250+
/// Raw instruction pointer addresses (leaf first).
251+
callchain: Vec<u64>,
252+
},
253+
/// A deallocation paired with a previously-sampled allocation.
254+
Free {
255+
/// Wall-clock timestamp in nanoseconds (monotonic) of the free.
256+
#[serde(rename = "timestamp_ns")]
257+
timestamp_nanos: u64,
258+
/// OS thread ID of the freeing thread.
259+
tid: u32,
260+
/// Pointer that was freed.
261+
addr: u64,
262+
/// Size of the allocation being freed (denormalized).
263+
size: u64,
264+
/// Monotonic-ns timestamp of the original allocation.
265+
#[serde(rename = "alloc_timestamp_ns")]
266+
alloc_timestamp_nanos: u64,
267+
},
239268
}
240269

241270
impl TelemetryEvent {
@@ -263,6 +292,12 @@ impl TelemetryEvent {
263292
| TelemetryEvent::TaskDump {
264293
timestamp_nanos, ..
265294
}
295+
| TelemetryEvent::Alloc {
296+
timestamp_nanos, ..
297+
}
298+
| TelemetryEvent::Free {
299+
timestamp_nanos, ..
300+
}
266301
| TelemetryEvent::WakeEvent {
267302
timestamp_nanos, ..
268303
}
@@ -297,6 +332,8 @@ impl TelemetryEvent {
297332
| TelemetryEvent::TaskSpawn { .. }
298333
| TelemetryEvent::TaskTerminate { .. }
299334
| TelemetryEvent::TaskDump { .. }
335+
| TelemetryEvent::Alloc { .. }
336+
| TelemetryEvent::Free { .. }
300337
| TelemetryEvent::ThreadNameDef { .. }
301338
| TelemetryEvent::WakeEvent { .. }
302339
| TelemetryEvent::SegmentMetadata { .. }

dial9-tokio-telemetry/src/telemetry/format.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,59 @@ pub(crate) struct TaskDumpEvent {
219219
pub callchain: InternedStackFrames,
220220
}
221221

222+
/// Wire-format event for a sampled memory allocation.
223+
///
224+
/// Emitted from the consolidator (flush thread) for allocations that tripped
225+
/// the geometric sampling counter. The sampling rate that produced this event
226+
/// lives in the segment metadata, not on each event.
227+
#[derive(Debug, TraceEvent)]
228+
#[cfg_attr(not(feature = "unstable-events"), non_exhaustive)]
229+
pub struct AllocEvent {
230+
/// Wall-clock timestamp in nanoseconds (monotonic).
231+
#[traceevent(timestamp)]
232+
pub timestamp_ns: u64,
233+
/// OS thread ID of the allocating thread. Same source as `WorkerParkEvent.tid`
234+
/// and `CpuSampleEvent.tid`. Use this to join against worker park/unpark
235+
/// history to recover worker_id when the allocation happened on a tokio
236+
/// worker thread.
237+
pub tid: u32,
238+
/// Allocation size in bytes. The actual size requested by the allocating
239+
/// code; the underlying allocator may have rounded up, but that's not
240+
/// recorded here.
241+
pub size: u64,
242+
/// Returned pointer. Only meaningful when liveset tracking is on; otherwise 0.
243+
/// Always present so the schema is stable across track_liveset on/off.
244+
pub addr: u64,
245+
/// Stack at the allocation site. Frame 0 is the most-recent caller.
246+
pub callchain: InternedStackFrames,
247+
}
248+
249+
/// Wire-format event for a deallocation paired with a previously-sampled
250+
/// `AllocEvent`. Only emitted when liveset tracking is on.
251+
///
252+
/// `size` and `alloc_timestamp_ns` are denormalized from the matching
253+
/// `AllocEvent` so the free stays analytically useful when the corresponding
254+
/// `AllocEvent` has been evicted by trace rotation. See design §3
255+
/// "Why denormalize size and alloc_timestamp_ns?" for the rationale.
256+
#[derive(Debug, TraceEvent)]
257+
#[cfg_attr(not(feature = "unstable-events"), non_exhaustive)]
258+
pub struct FreeEvent {
259+
/// Wall-clock timestamp in nanoseconds (monotonic) of the free.
260+
#[traceevent(timestamp)]
261+
pub timestamp_ns: u64,
262+
/// OS thread ID of the freeing thread.
263+
pub tid: u32,
264+
/// Pointer that was freed. Matches a previously-seen `AllocEvent.addr`.
265+
pub addr: u64,
266+
/// Size of the allocation being freed. Denormalized from the matching
267+
/// `AllocEvent` for rotation robustness.
268+
pub size: u64,
269+
/// Monotonic-ns timestamp of the original `AllocEvent`. Allows leak
270+
/// analysis to bucket frees by generation without needing the
271+
/// `AllocEvent` in the same (unrotated) trace.
272+
pub alloc_timestamp_ns: u64,
273+
}
274+
222275
/// Wire-format event for a wake notification.
223276
#[derive(Debug, TraceEvent)]
224277
pub struct WakeEventEvent {
@@ -291,6 +344,8 @@ pub(crate) enum TelemetryEventRef<'a> {
291344
TaskTerminate(TaskTerminateEventRef<'a>),
292345
CpuSample(CpuSampleEventRef<'a>),
293346
TaskDump(TaskDumpEventRef<'a>),
347+
Alloc(AllocEventRef<'a>),
348+
Free(FreeEventRef<'a>),
294349
WakeEvent(WakeEventEventRef<'a>),
295350
SegmentMetadata(SegmentMetadataEventRef<'a>),
296351
ClockSync(ClockSyncEventRef<'a>),
@@ -311,6 +366,8 @@ impl<'a> TelemetryEventRef<'a> {
311366
Self::TaskTerminate(e) => Some(e.timestamp_ns),
312367
Self::CpuSample(e) => Some(e.timestamp_ns),
313368
Self::TaskDump(e) => Some(e.timestamp_ns),
369+
Self::Alloc(e) => Some(e.timestamp_ns),
370+
Self::Free(e) => Some(e.timestamp_ns),
314371
Self::WakeEvent(e) => Some(e.timestamp_ns),
315372
Self::SegmentMetadata(e) => Some(e.timestamp_ns),
316373
Self::ClockSync(e) => Some(e.timestamp_ns),
@@ -365,6 +422,12 @@ pub(crate) fn decode_ref<'a>(
365422
"TaskDumpEvent" => {
366423
TelemetryEventRef::TaskDump(TaskDumpEvent::decode(timestamp_ns, fields, field_defs)?)
367424
}
425+
"AllocEvent" => {
426+
TelemetryEventRef::Alloc(AllocEvent::decode(timestamp_ns, fields, field_defs)?)
427+
}
428+
"FreeEvent" => {
429+
TelemetryEventRef::Free(FreeEvent::decode(timestamp_ns, fields, field_defs)?)
430+
}
368431
"WakeEventEvent" => {
369432
TelemetryEventRef::WakeEvent(WakeEventEvent::decode(timestamp_ns, fields, field_defs)?)
370433
}
@@ -453,6 +516,23 @@ pub(crate) fn to_owned_event(
453516
.expect("stack pool entry must exist for TaskDump callchain")
454517
.to_vec(),
455518
},
519+
TelemetryEventRef::Alloc(e) => TelemetryEvent::Alloc {
520+
timestamp_nanos: e.timestamp_ns,
521+
tid: e.tid,
522+
size: e.size,
523+
addr: e.addr,
524+
callchain: stack_pool
525+
.get(e.callchain)
526+
.expect("stack pool entry must exist for AllocEvent callchain")
527+
.to_vec(),
528+
},
529+
TelemetryEventRef::Free(e) => TelemetryEvent::Free {
530+
timestamp_nanos: e.timestamp_ns,
531+
tid: e.tid,
532+
addr: e.addr,
533+
size: e.size,
534+
alloc_timestamp_nanos: e.alloc_timestamp_ns,
535+
},
456536
TelemetryEventRef::WakeEvent(e) => TelemetryEvent::WakeEvent {
457537
timestamp_nanos: e.timestamp_ns,
458538
waker_task_id: e.waker_task_id,
@@ -473,3 +553,74 @@ pub(crate) fn to_owned_event(
473553
},
474554
}
475555
}
556+
557+
#[cfg(test)]
558+
mod tests {
559+
use super::*;
560+
use dial9_trace_format::encoder::Encoder;
561+
562+
#[test]
563+
fn alloc_event_round_trip() {
564+
let mut enc = Encoder::new_to(Vec::new()).unwrap();
565+
let callchain = enc.intern_stack_frames(&[0x1000, 0x2000, 0x3000]).unwrap();
566+
enc.write_infallible(&AllocEvent {
567+
timestamp_ns: 123_456_789,
568+
tid: 42,
569+
size: 4096,
570+
addr: 0xDEAD_BEEF_CAFE,
571+
callchain,
572+
});
573+
let buf = enc.into_inner();
574+
575+
let events = decode_events(&buf).unwrap();
576+
assert_eq!(events.len(), 1);
577+
match &events[0] {
578+
TelemetryEvent::Alloc {
579+
timestamp_nanos,
580+
tid,
581+
size,
582+
addr,
583+
callchain,
584+
} => {
585+
assert_eq!(*timestamp_nanos, 123_456_789);
586+
assert_eq!(*tid, 42);
587+
assert_eq!(*size, 4096);
588+
assert_eq!(*addr, 0xDEAD_BEEF_CAFE);
589+
assert_eq!(callchain, &[0x1000, 0x2000, 0x3000]);
590+
}
591+
other => panic!("expected Alloc event, got {other:?}"),
592+
}
593+
}
594+
595+
#[test]
596+
fn free_event_round_trip() {
597+
let mut enc = Encoder::new_to(Vec::new()).unwrap();
598+
enc.write_infallible(&FreeEvent {
599+
timestamp_ns: 999_000_000,
600+
tid: 7,
601+
addr: 0xCAFE_BABE,
602+
size: 2048,
603+
alloc_timestamp_ns: 100_000_000,
604+
});
605+
let buf = enc.into_inner();
606+
607+
let events = decode_events(&buf).unwrap();
608+
assert_eq!(events.len(), 1);
609+
match &events[0] {
610+
TelemetryEvent::Free {
611+
timestamp_nanos,
612+
tid,
613+
addr,
614+
size,
615+
alloc_timestamp_nanos,
616+
} => {
617+
assert_eq!(*timestamp_nanos, 999_000_000);
618+
assert_eq!(*tid, 7);
619+
assert_eq!(*addr, 0xCAFE_BABE);
620+
assert_eq!(*size, 2048);
621+
assert_eq!(*alloc_timestamp_nanos, 100_000_000);
622+
}
623+
other => panic!("expected Free event, got {other:?}"),
624+
}
625+
}
626+
}

dial9-tokio-telemetry/src/telemetry/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ pub use crate::traced::TracedFuture;
2121
pub use buffer::{Encodable, ThreadLocalEncoder};
2222
pub use events::{CpuSampleSource, TelemetryEvent, clock_monotonic_ns};
2323
pub use format::{
24-
PollEndEvent, PollStartEvent, TaskSpawnEvent, WakeEventEvent, WorkerId, WorkerParkEvent,
25-
WorkerUnparkEvent,
24+
AllocEvent, FreeEvent, PollEndEvent, PollStartEvent, TaskSpawnEvent, WakeEventEvent, WorkerId,
25+
WorkerParkEvent, WorkerUnparkEvent,
2626
};
2727
pub use recorder::{
2828
HasTracePath, NoTracePath, PipelineCustom, PipelineS3, PipelineUnset, RuntimeTelemetryHandle,

dial9-tokio-telemetry/tests/js_parser.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,86 @@ main().catch((e) => {{ console.error(e); process.exit(1); }});
183183
String::from_utf8_lossy(&output.stderr)
184184
);
185185
}
186+
187+
#[test]
188+
fn test_js_parser_alloc_free_events() {
189+
use dial9_tokio_telemetry::telemetry::{AllocEvent, FreeEvent};
190+
use dial9_trace_format::encoder::Encoder;
191+
192+
let temp_dir = TempDir::new().unwrap();
193+
let trace_path = temp_dir.path().join("alloc_trace.bin");
194+
195+
{
196+
let mut enc = Encoder::new();
197+
let stack = enc.intern_stack_frames(&[0xAAAA, 0xBBBB, 0xCCCC]).unwrap();
198+
enc.write(&AllocEvent {
199+
timestamp_ns: 5_000_000,
200+
tid: 42,
201+
size: 4096,
202+
addr: 0xDEAD_BEEF,
203+
callchain: stack,
204+
})
205+
.unwrap();
206+
enc.write(&FreeEvent {
207+
timestamp_ns: 10_000_000,
208+
tid: 7,
209+
addr: 0xDEAD_BEEF,
210+
size: 4096,
211+
alloc_timestamp_ns: 5_000_000,
212+
})
213+
.unwrap();
214+
std::fs::write(&trace_path, enc.finish()).unwrap();
215+
}
216+
217+
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
218+
let script = format!(
219+
r#"
220+
const {{ parseTrace }} = require("{viewer}/trace_parser.js");
221+
const fs = require("fs");
222+
async function main() {{
223+
const result = await parseTrace(fs.readFileSync("{trace}"));
224+
if (result.allocEvents.length !== 1) {{
225+
console.error("expected 1 allocEvent, got " + result.allocEvents.length);
226+
process.exit(1);
227+
}}
228+
const a = result.allocEvents[0];
229+
if (a.tid !== 42) {{ console.error("bad tid: " + a.tid); process.exit(1); }}
230+
if (a.size !== 4096) {{ console.error("bad size: " + a.size); process.exit(1); }}
231+
if (a.callchain.length !== 3) {{ console.error("bad callchain len: " + a.callchain.length); process.exit(1); }}
232+
233+
if (result.freeEvents.length !== 1) {{
234+
console.error("expected 1 freeEvent, got " + result.freeEvents.length);
235+
process.exit(1);
236+
}}
237+
const f = result.freeEvents[0];
238+
if (f.tid !== 7) {{ console.error("bad free tid: " + f.tid); process.exit(1); }}
239+
if (f.addr !== "3735928559") {{ console.error("bad free addr: " + f.addr); process.exit(1); }}
240+
if (f.size !== 4096) {{ console.error("bad free size: " + f.size); process.exit(1); }}
241+
if (f.allocTimestampNs !== 5000000) {{ console.error("bad allocTimestampNs: " + f.allocTimestampNs); process.exit(1); }}
242+
243+
console.log("OK: allocEvents=" + result.allocEvents.length + " freeEvents=" + result.freeEvents.length);
244+
}}
245+
main().catch((e) => {{ console.error(e); process.exit(1); }});
246+
"#,
247+
viewer = std::path::Path::new(&manifest_dir)
248+
.parent()
249+
.unwrap()
250+
.join("dial9-viewer")
251+
.join("ui")
252+
.display(),
253+
trace = trace_path.display(),
254+
);
255+
256+
let output = Command::new("node")
257+
.args(["-e", &script])
258+
.output()
259+
.expect("Failed to run node");
260+
261+
eprintln!("{}", String::from_utf8_lossy(&output.stdout));
262+
assert!(
263+
output.status.success(),
264+
"JS parser alloc/free test failed:\nstdout: {}\nstderr: {}",
265+
String::from_utf8_lossy(&output.stdout),
266+
String::from_utf8_lossy(&output.stderr)
267+
);
268+
}

dial9-tokio-telemetry/tests/rotation_time_alignment.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ fn event_type_name(event: &TelemetryEvent) -> &'static str {
247247
TelemetryEvent::TaskTerminate { .. } => "TaskTerminate",
248248
TelemetryEvent::CpuSample { .. } => "CpuSample",
249249
TelemetryEvent::TaskDump { .. } => "TaskDump",
250+
TelemetryEvent::Alloc { .. } => "Alloc",
251+
TelemetryEvent::Free { .. } => "Free",
250252
TelemetryEvent::ThreadNameDef { .. } => "ThreadNameDef",
251253
TelemetryEvent::WakeEvent { .. } => "WakeEvent",
252254
TelemetryEvent::SegmentMetadata { .. } => "SegmentMetadata",

0 commit comments

Comments
 (0)