Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dial9-tokio-telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ _shuttle = [
## is a hard compile error from tokio.
taskdump = ["tokio/taskdump"]
tracing-layer = ["dep:tracing-subscriber"]
## Sampled memory allocation profiling via ring buffers.
memory-profiling = []
worker-s3 = [
"dep:aws-sdk-s3-transfer-manager",
"dep:aws-sdk-s3",
Expand All @@ -92,13 +94,15 @@ unstable-events = []
__nonlinux_all_features = [
"analysis",
"cpu-profiling",
"memory-profiling",
"tracing-layer",
"worker-s3",
]

[dev-dependencies]
dial9-tokio-telemetry = { path = ".", features = [
"analysis",
"memory-profiling",
"tracing-layer",
"unstable-events",
"worker-s3",
Expand Down Expand Up @@ -134,6 +138,7 @@ iai-callgrind = "=0.16.1"
[target.'cfg(target_os = "linux")'.dev-dependencies]
dial9-tokio-telemetry = { path = ".", features = [
"cpu-profiling",
"memory-profiling",
"worker-s3",
"analysis",
"taskdump",
Expand Down
2 changes: 2 additions & 0 deletions dial9-tokio-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
pub mod analysis_unstable;
/// Background worker pipeline for processing sealed trace segments.
pub mod background_task;
#[cfg(feature = "memory-profiling")]
pub(crate) mod memory_profiling;
pub(crate) mod metrics;
pub(crate) mod primitives;
pub(crate) mod rate_limit;
Expand Down
44 changes: 44 additions & 0 deletions dial9-tokio-telemetry/src/memory_profiling/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//! Memory profiling — sampled allocation tracking via ring buffers.
//!
//! See `docs/design/memory-profiling.md` for the full design. The
//! architecture (design §5, §6, §9):
//!
//! 1. The allocator hook (later commit) does the bare minimum on the
//! allocating thread: sampling decision, stack capture, push a
//! fixed-size POD record into one of two process-global lock-free
//! queues.
//! 2. The flush thread (consolidator) drains both queues every flush cycle
//! via the `Source` trait, interns stacks, and emits `AllocEvent`s and
//! `FreeEvent`s into the central collector.
//!
//! ## Why two queues
//!
//! Allocs and frees have very different rates and record sizes:
//! - `RawAlloc` (~1 KiB at 128 frames) is pushed only on sampled
//! allocations, ~2K/sec at default sample rate.
//! - `RawFree` (~32 B) is pushed on every dealloc when liveset tracking is
//! on, potentially 15M/sec.
//!
//! A unified queue would either over-size the alloc queue or under-size
//! the free queue. Splitting the queues lets us size each independently:
//! at default capacities the alloc queue is ~4 MiB and the free queue is
//! ~1 MiB (8× the slot count of the alloc queue, but each slot is ~32× smaller).
//!
//! Gated behind the `memory-profiling` cargo feature.

mod ring;
mod source;

#[expect(
unused_imports,
reason = "wired up by allocator hook in a later commit"
)]
pub(crate) use ring::{
DEFAULT_ALLOC_QUEUE_CAPACITY, DEFAULT_FREE_QUEUE_CAPACITY, DEFAULT_MAX_FRAMES, RawAlloc,
RawFree, RingBuffers,
};
#[expect(
unused_imports,
reason = "wired up by allocator hook in a later commit"
)]
pub(crate) use source::MemoryProfileSource;
82 changes: 82 additions & 0 deletions dial9-tokio-telemetry/src/memory_profiling/ring.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! Two lock-free MPMC queues — one for sampled allocations, one for frees.

use crossbeam_queue::ArrayQueue;
use std::sync::atomic::AtomicU64;

/// Default maximum frames captured per allocation. 128 × 8 B = 1 KiB stack budget.
pub(crate) const DEFAULT_MAX_FRAMES: usize = 128;

/// Default number of `RawAlloc` slots. ~4 MiB total at 128 frames (design §5).
pub(crate) const DEFAULT_ALLOC_QUEUE_CAPACITY: usize = 4096;

/// Default number of `RawFree` slots. 8× the alloc queue (design §9).
#[expect(dead_code, reason = "wired up by allocator hook in a later commit")]
pub(crate) const DEFAULT_FREE_QUEUE_CAPACITY: usize = DEFAULT_ALLOC_QUEUE_CAPACITY * 8;

/// One sampled allocation captured on the producer thread.
///
/// `MAX_FRAMES` controls the size of the inline stack buffer. The default
/// (128) gives a 1 KiB stack budget for the frames field.
#[derive(Debug, Clone)]
pub(crate) struct RawAlloc<const MAX_FRAMES: usize = DEFAULT_MAX_FRAMES> {
pub(crate) tid: u32,
pub(crate) size: u64,
pub(crate) addr: u64,
pub(crate) ts_ns: u64,
pub(crate) frames: [u64; MAX_FRAMES],
pub(crate) frame_count: u8,
}

impl<const MAX_FRAMES: usize> RawAlloc<MAX_FRAMES> {
const _ASSERT_FRAMES_FIT_U8: () = assert!(
MAX_FRAMES <= u8::MAX as usize,
"MAX_FRAMES must fit in u8 (frame_count field)"
);

pub(crate) fn frames(&self) -> &[u64] {
let _ = Self::_ASSERT_FRAMES_FIT_U8;
&self.frames[..self.frame_count as usize]
}
}

/// One free captured on the producer thread when liveset tracking is on.
#[derive(Debug, Clone, Copy)]
pub(crate) struct RawFree {
pub(crate) tid: u32,
pub(crate) addr: u64,
#[expect(
dead_code,
reason = "consolidator uses size from the liveset entry, not from RawFree; the field is here so the producer doesn't have to do a separate lookup"
)]
pub(crate) size: u64,
pub(crate) ts_ns: u64,
}

/// Process-global pair of lock-free queues for the memory profiler.
///
/// Producers (allocator hook) and the consumer (`MemoryProfileSource`) both
/// hold `Arc<RingBuffers<N>>` and access the queues via `&self` — no inner
/// `Arc`s, so the `&Arc<...>` smell is contained to the outer borrow.
pub(crate) struct RingBuffers<const MAX_FRAMES: usize = DEFAULT_MAX_FRAMES> {
pub(crate) alloc_queue: ArrayQueue<RawAlloc<MAX_FRAMES>>,
pub(crate) free_queue: ArrayQueue<RawFree>,
#[expect(dead_code, reason = "incremented by allocator hook in a later commit")]
pub(crate) dropped_allocs: AtomicU64,
#[expect(dead_code, reason = "incremented by allocator hook in a later commit")]
pub(crate) dropped_frees: AtomicU64,
}

// `RingBuffers::new` is only called from tests in this commit. The allocator
// hook in a later commit will call it from a non-test path; at that point
// this `allow(dead_code)` becomes inert.
#[cfg_attr(not(test), allow(dead_code))]
impl<const MAX_FRAMES: usize> RingBuffers<MAX_FRAMES> {
pub(crate) fn new(alloc_capacity: usize, free_capacity: usize) -> Self {
Self {
alloc_queue: ArrayQueue::new(alloc_capacity),
free_queue: ArrayQueue::new(free_capacity),
dropped_allocs: AtomicU64::new(0),
dropped_frees: AtomicU64::new(0),
}
}
}
Loading
Loading