Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dial9-tokio-telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ worker-s3 = [
"dep:aws-config",
"dep:time",
]
## Removes #[non_exhaustive] from event structs so downstream crates can
## construct them directly. Unstable: no semver guarantees on new fields.
unstable-events = []
## All features except platform-specific ones (taskdump). Used in CI for non-Linux targets.
__nonlinux_all_features = [
"analysis",
Expand All @@ -97,6 +100,7 @@ __nonlinux_all_features = [
dial9-tokio-telemetry = { path = ".", features = [
"analysis",
"tracing-layer",
"unstable-events",
"worker-s3",
] }
assert2 = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ fn to_fat_event(event: &TelemetryEvent, reader: &TraceReader) -> Option<FatEvent
TelemetryEvent::TaskSpawn { .. }
| TelemetryEvent::TaskTerminate { .. }
| TelemetryEvent::TaskDump { .. }
| TelemetryEvent::Alloc { .. }
| TelemetryEvent::Free { .. }
| TelemetryEvent::ThreadNameDef { .. }
| TelemetryEvent::SegmentMetadata { .. }
| TelemetryEvent::ClockSync { .. } => None,
Expand Down
2 changes: 2 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ pub fn analyze_trace(events: &[TelemetryEvent]) -> TraceAnalysis {
| TelemetryEvent::TaskTerminate { .. }
| TelemetryEvent::CpuSample { .. }
| TelemetryEvent::TaskDump { .. }
| TelemetryEvent::Alloc { .. }
| TelemetryEvent::Free { .. }
| TelemetryEvent::ThreadNameDef { .. }
| TelemetryEvent::WakeEvent { .. }
| TelemetryEvent::SegmentMetadata { .. }
Expand Down
37 changes: 37 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,35 @@ pub enum TelemetryEvent {
/// Named field values in schema order.
fields: Vec<(String, FieldValue)>,
},
/// A sampled memory allocation event.
///
/// Owned counterpart of the wire-format `AllocEvent`: `to_owned_event` copies
/// the scalar fields across and resolves the interned callchain through the
/// stack pool into a plain `Vec<u64>`.
Alloc {
// NOTE(review): this field was described as both "wall-clock" and
// "monotonic"; confirm which clock actually produces the value.
/// Timestamp of the allocation in nanoseconds. Serialized by serde under
/// the name `timestamp_ns`.
#[serde(rename = "timestamp_ns")]
timestamp_nanos: u64,
/// OS thread ID of the allocating thread.
tid: u32,
/// Allocation size in bytes.
size: u64,
/// Returned pointer address.
addr: u64,
/// Raw instruction pointer addresses (leaf first).
callchain: Vec<u64>,
},
/// A deallocation paired with a previously-sampled allocation.
///
/// Owned counterpart of the wire-format `FreeEvent`; `to_owned_event` copies
/// every field across unchanged.
Free {
// NOTE(review): this field was described as both "wall-clock" and
// "monotonic"; confirm which clock actually produces the value.
/// Timestamp of the free in nanoseconds. Serialized by serde under the
/// name `timestamp_ns`.
#[serde(rename = "timestamp_ns")]
timestamp_nanos: u64,
/// OS thread ID of the freeing thread.
tid: u32,
/// Pointer that was freed.
addr: u64,
/// Size of the allocation being freed (denormalized).
size: u64,
/// Monotonic-ns timestamp of the original allocation. Serialized by serde
/// under the name `alloc_timestamp_ns`.
#[serde(rename = "alloc_timestamp_ns")]
alloc_timestamp_nanos: u64,
},
}

impl TelemetryEvent {
Expand Down Expand Up @@ -263,6 +292,12 @@ impl TelemetryEvent {
| TelemetryEvent::TaskDump {
timestamp_nanos, ..
}
| TelemetryEvent::Alloc {
timestamp_nanos, ..
}
| TelemetryEvent::Free {
timestamp_nanos, ..
}
| TelemetryEvent::WakeEvent {
timestamp_nanos, ..
}
Expand Down Expand Up @@ -297,6 +332,8 @@ impl TelemetryEvent {
| TelemetryEvent::TaskSpawn { .. }
| TelemetryEvent::TaskTerminate { .. }
| TelemetryEvent::TaskDump { .. }
| TelemetryEvent::Alloc { .. }
| TelemetryEvent::Free { .. }
| TelemetryEvent::ThreadNameDef { .. }
| TelemetryEvent::WakeEvent { .. }
| TelemetryEvent::SegmentMetadata { .. }
Expand Down
151 changes: 151 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,59 @@ pub(crate) struct TaskDumpEvent {
pub callchain: InternedStackFrames,
}

/// Wire-format event for a sampled memory allocation.
///
/// Emitted from the consolidator (flush thread) for allocations that tripped
/// the geometric sampling counter. The sampling rate that produced this event
/// lives in the segment metadata, not on each event.
///
/// On the read side, `decode_ref` maps the `"AllocEvent"` record to
/// `TelemetryEventRef::Alloc`, and `to_owned_event` turns that into
/// `TelemetryEvent::Alloc`.
#[derive(Debug, TraceEvent)]
// `#[non_exhaustive]` is applied unless the `unstable-events` feature is on;
// enabling that feature lets downstream crates construct this struct directly,
// without semver guarantees on new fields.
#[cfg_attr(not(feature = "unstable-events"), non_exhaustive)]
pub struct AllocEvent {
// NOTE(review): this field was described as both "wall-clock" and
// "monotonic"; `FreeEvent::alloc_timestamp_ns` calls the same value
// "monotonic-ns" — confirm the clock source.
/// Timestamp of the allocation in nanoseconds. Marked as the event's
/// timestamp field via `#[traceevent(timestamp)]`.
#[traceevent(timestamp)]
pub timestamp_ns: u64,
/// OS thread ID of the allocating thread. Same source as `WorkerParkEvent.tid`
/// and `CpuSampleEvent.tid`. Use this to join against worker park/unpark
/// history to recover worker_id when the allocation happened on a tokio
/// worker thread.
pub tid: u32,
/// Allocation size in bytes. The actual size requested by the allocating
/// code; the underlying allocator may have rounded up, but that's not
/// recorded here.
pub size: u64,
/// Returned pointer. Only meaningful when liveset tracking is on; otherwise 0.
/// Always present so the schema is stable across track_liveset on/off.
pub addr: u64,
/// Stack at the allocation site, stored as an interned stack-frame handle
/// rather than inline addresses. Frame 0 is the most-recent caller.
pub callchain: InternedStackFrames,
}

/// Wire-format event for a deallocation paired with a previously-sampled
/// `AllocEvent`. Only emitted when liveset tracking is on.
///
/// `size` and `alloc_timestamp_ns` are denormalized from the matching
/// `AllocEvent` so the free stays analytically useful when the corresponding
/// `AllocEvent` has been evicted by trace rotation. See design §3
/// "Why denormalize size and alloc_timestamp_ns?" for the rationale.
///
/// On the read side, `decode_ref` maps the `"FreeEvent"` record to
/// `TelemetryEventRef::Free`, and `to_owned_event` turns that into
/// `TelemetryEvent::Free`.
#[derive(Debug, TraceEvent)]
// `#[non_exhaustive]` is applied unless the `unstable-events` feature is on;
// enabling that feature lets downstream crates construct this struct directly,
// without semver guarantees on new fields.
#[cfg_attr(not(feature = "unstable-events"), non_exhaustive)]
pub struct FreeEvent {
// NOTE(review): this field was described as both "wall-clock" and
// "monotonic"; `alloc_timestamp_ns` below is described as "monotonic-ns" —
// confirm the clock source.
/// Timestamp of the free in nanoseconds. Marked as the event's timestamp
/// field via `#[traceevent(timestamp)]`.
#[traceevent(timestamp)]
pub timestamp_ns: u64,
/// OS thread ID of the freeing thread.
pub tid: u32,
/// Pointer that was freed. Matches a previously-seen `AllocEvent.addr`.
pub addr: u64,
/// Size of the allocation being freed. Denormalized from the matching
/// `AllocEvent` for rotation robustness.
pub size: u64,
/// Monotonic-ns timestamp of the original `AllocEvent`. Allows leak
/// analysis to bucket frees by generation without needing the
/// `AllocEvent` in the same (unrotated) trace.
pub alloc_timestamp_ns: u64,
}

/// Wire-format event for a wake notification.
#[derive(Debug, TraceEvent)]
pub struct WakeEventEvent {
Expand Down Expand Up @@ -291,6 +344,8 @@ pub(crate) enum TelemetryEventRef<'a> {
TaskTerminate(TaskTerminateEventRef<'a>),
CpuSample(CpuSampleEventRef<'a>),
TaskDump(TaskDumpEventRef<'a>),
Alloc(AllocEventRef<'a>),
Free(FreeEventRef<'a>),
WakeEvent(WakeEventEventRef<'a>),
SegmentMetadata(SegmentMetadataEventRef<'a>),
ClockSync(ClockSyncEventRef<'a>),
Expand All @@ -311,6 +366,8 @@ impl<'a> TelemetryEventRef<'a> {
Self::TaskTerminate(e) => Some(e.timestamp_ns),
Self::CpuSample(e) => Some(e.timestamp_ns),
Self::TaskDump(e) => Some(e.timestamp_ns),
Self::Alloc(e) => Some(e.timestamp_ns),
Self::Free(e) => Some(e.timestamp_ns),
Self::WakeEvent(e) => Some(e.timestamp_ns),
Self::SegmentMetadata(e) => Some(e.timestamp_ns),
Self::ClockSync(e) => Some(e.timestamp_ns),
Expand Down Expand Up @@ -365,6 +422,12 @@ pub(crate) fn decode_ref<'a>(
"TaskDumpEvent" => {
TelemetryEventRef::TaskDump(TaskDumpEvent::decode(timestamp_ns, fields, field_defs)?)
}
"AllocEvent" => {
TelemetryEventRef::Alloc(AllocEvent::decode(timestamp_ns, fields, field_defs)?)
}
"FreeEvent" => {
TelemetryEventRef::Free(FreeEvent::decode(timestamp_ns, fields, field_defs)?)
}
"WakeEventEvent" => {
TelemetryEventRef::WakeEvent(WakeEventEvent::decode(timestamp_ns, fields, field_defs)?)
}
Expand Down Expand Up @@ -453,6 +516,23 @@ pub(crate) fn to_owned_event(
.expect("stack pool entry must exist for TaskDump callchain")
.to_vec(),
},
TelemetryEventRef::Alloc(e) => TelemetryEvent::Alloc {
timestamp_nanos: e.timestamp_ns,
tid: e.tid,
size: e.size,
addr: e.addr,
callchain: stack_pool
.get(e.callchain)
.expect("stack pool entry must exist for AllocEvent callchain")
.to_vec(),
},
TelemetryEventRef::Free(e) => TelemetryEvent::Free {
timestamp_nanos: e.timestamp_ns,
tid: e.tid,
addr: e.addr,
size: e.size,
alloc_timestamp_nanos: e.alloc_timestamp_ns,
},
TelemetryEventRef::WakeEvent(e) => TelemetryEvent::WakeEvent {
timestamp_nanos: e.timestamp_ns,
waker_task_id: e.waker_task_id,
Expand All @@ -473,3 +553,74 @@ pub(crate) fn to_owned_event(
},
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use dial9_trace_format::encoder::Encoder;

    /// Encodes one `AllocEvent`, decodes the buffer, and checks that every
    /// field (including the resolved callchain) comes back unchanged.
    #[test]
    fn alloc_event_round_trip() {
        let mut encoder = Encoder::new_to(Vec::new()).unwrap();
        let frames = encoder
            .intern_stack_frames(&[0x1000, 0x2000, 0x3000])
            .unwrap();
        let written = AllocEvent {
            timestamp_ns: 123_456_789,
            tid: 42,
            size: 4096,
            addr: 0xDEAD_BEEF_CAFE,
            callchain: frames,
        };
        encoder.write_infallible(&written);
        let bytes = encoder.into_inner();

        let decoded = decode_events(&bytes).unwrap();
        assert_eq!(decoded.len(), 1);

        // Anything other than an `Alloc` variant is a test failure.
        let other = &decoded[0];
        let TelemetryEvent::Alloc {
            timestamp_nanos,
            tid,
            size,
            addr,
            callchain,
        } = other
        else {
            panic!("expected Alloc event, got {other:?}");
        };
        assert_eq!(*timestamp_nanos, 123_456_789);
        assert_eq!(*tid, 42);
        assert_eq!(*size, 4096);
        assert_eq!(*addr, 0xDEAD_BEEF_CAFE);
        assert_eq!(callchain, &[0x1000, 0x2000, 0x3000]);
    }

    /// Encodes one `FreeEvent`, decodes the buffer, and checks that every
    /// field comes back unchanged.
    #[test]
    fn free_event_round_trip() {
        let mut encoder = Encoder::new_to(Vec::new()).unwrap();
        let written = FreeEvent {
            timestamp_ns: 999_000_000,
            tid: 7,
            addr: 0xCAFE_BABE,
            size: 2048,
            alloc_timestamp_ns: 100_000_000,
        };
        encoder.write_infallible(&written);
        let bytes = encoder.into_inner();

        let decoded = decode_events(&bytes).unwrap();
        assert_eq!(decoded.len(), 1);

        // Anything other than a `Free` variant is a test failure.
        let other = &decoded[0];
        let TelemetryEvent::Free {
            timestamp_nanos,
            tid,
            addr,
            size,
            alloc_timestamp_nanos,
        } = other
        else {
            panic!("expected Free event, got {other:?}");
        };
        assert_eq!(*timestamp_nanos, 999_000_000);
        assert_eq!(*tid, 7);
        assert_eq!(*addr, 0xCAFE_BABE);
        assert_eq!(*size, 2048);
        assert_eq!(*alloc_timestamp_nanos, 100_000_000);
    }
}
4 changes: 2 additions & 2 deletions dial9-tokio-telemetry/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub use crate::traced::TracedFuture;
pub use buffer::{Encodable, ThreadLocalEncoder};
pub use events::{CpuSampleSource, TelemetryEvent, clock_monotonic_ns};
pub use format::{
PollEndEvent, PollStartEvent, TaskSpawnEvent, WakeEventEvent, WorkerId, WorkerParkEvent,
WorkerUnparkEvent,
AllocEvent, FreeEvent, PollEndEvent, PollStartEvent, TaskSpawnEvent, WakeEventEvent, WorkerId,
WorkerParkEvent, WorkerUnparkEvent,
};
pub use recorder::{
HasTracePath, NoTracePath, PipelineCustom, PipelineS3, PipelineUnset, RuntimeTelemetryHandle,
Expand Down
83 changes: 83 additions & 0 deletions dial9-tokio-telemetry/tests/js_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,86 @@ main().catch((e) => {{ console.error(e); process.exit(1); }});
String::from_utf8_lossy(&output.stderr)
);
}

/// Writes a trace holding one `AllocEvent` and one `FreeEvent`, then runs the
/// viewer's `trace_parser.js` under `node` and fails unless the script exits
/// successfully (the script itself checks the parsed alloc/free fields).
#[test]
fn test_js_parser_alloc_free_events() {
    use dial9_tokio_telemetry::telemetry::{AllocEvent, FreeEvent};
    use dial9_trace_format::encoder::Encoder;

    let temp_dir = TempDir::new().unwrap();
    let trace_path = temp_dir.path().join("alloc_trace.bin");

    // Encode both events and flush them to disk inside their own scope.
    {
        let mut encoder = Encoder::new();
        let frames = encoder
            .intern_stack_frames(&[0xAAAA, 0xBBBB, 0xCCCC])
            .unwrap();
        let alloc = AllocEvent {
            timestamp_ns: 5_000_000,
            tid: 42,
            size: 4096,
            addr: 0xDEAD_BEEF,
            callchain: frames,
        };
        encoder.write(&alloc).unwrap();
        let free = FreeEvent {
            timestamp_ns: 10_000_000,
            tid: 7,
            addr: 0xDEAD_BEEF,
            size: 4096,
            alloc_timestamp_ns: 5_000_000,
        };
        encoder.write(&free).unwrap();
        std::fs::write(&trace_path, encoder.finish()).unwrap();
    }

    let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
    // The parser lives in the sibling `dial9-viewer/ui` directory.
    let viewer_dir = std::path::Path::new(&manifest_dir)
        .parent()
        .unwrap()
        .join("dial9-viewer")
        .join("ui");
    let viewer = viewer_dir.display();
    let trace = trace_path.display();
    // `{viewer}` and `{trace}` are captured from the locals above.
    let script = format!(
        r#"
const {{ parseTrace }} = require("{viewer}/trace_parser.js");
const fs = require("fs");
async function main() {{
const result = await parseTrace(fs.readFileSync("{trace}"));
if (result.allocEvents.length !== 1) {{
console.error("expected 1 allocEvent, got " + result.allocEvents.length);
process.exit(1);
}}
const a = result.allocEvents[0];
if (a.tid !== 42) {{ console.error("bad tid: " + a.tid); process.exit(1); }}
if (a.size !== 4096) {{ console.error("bad size: " + a.size); process.exit(1); }}
if (a.callchain.length !== 3) {{ console.error("bad callchain len: " + a.callchain.length); process.exit(1); }}

if (result.freeEvents.length !== 1) {{
console.error("expected 1 freeEvent, got " + result.freeEvents.length);
process.exit(1);
}}
const f = result.freeEvents[0];
if (f.tid !== 7) {{ console.error("bad free tid: " + f.tid); process.exit(1); }}
if (f.addr !== "3735928559") {{ console.error("bad free addr: " + f.addr); process.exit(1); }}
if (f.size !== 4096) {{ console.error("bad free size: " + f.size); process.exit(1); }}
if (f.allocTimestampNs !== 5000000) {{ console.error("bad allocTimestampNs: " + f.allocTimestampNs); process.exit(1); }}

console.log("OK: allocEvents=" + result.allocEvents.length + " freeEvents=" + result.freeEvents.length);
}}
main().catch((e) => {{ console.error(e); process.exit(1); }});
"#
    );

    let output = Command::new("node")
        .args(["-e", &script])
        .output()
        .expect("Failed to run node");

    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);
    eprintln!("{}", stdout);
    assert!(
        output.status.success(),
        "JS parser alloc/free test failed:\nstdout: {}\nstderr: {}",
        stdout,
        stderr
    );
}
2 changes: 2 additions & 0 deletions dial9-tokio-telemetry/tests/rotation_time_alignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ fn event_type_name(event: &TelemetryEvent) -> &'static str {
TelemetryEvent::TaskTerminate { .. } => "TaskTerminate",
TelemetryEvent::CpuSample { .. } => "CpuSample",
TelemetryEvent::TaskDump { .. } => "TaskDump",
TelemetryEvent::Alloc { .. } => "Alloc",
TelemetryEvent::Free { .. } => "Free",
TelemetryEvent::ThreadNameDef { .. } => "ThreadNameDef",
TelemetryEvent::WakeEvent { .. } => "WakeEvent",
TelemetryEvent::SegmentMetadata { .. } => "SegmentMetadata",
Expand Down
Loading
Loading