Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dial9-tokio-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod background_task;
pub(crate) mod metrics;
pub(crate) mod primitives;
pub(crate) mod rate_limit;
pub(crate) mod sampling;
#[cfg(feature = "taskdump")]
pub(crate) mod task_dumped;
/// Core telemetry types, recording, and trace I/O.
Expand Down
90 changes: 90 additions & 0 deletions dial9-tokio-telemetry/src/sampling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//! Shared geometric/Poisson sampling primitives.
//!
//! Used by the task-dump idle sampler (sampling on nanoseconds) and by the
//! memory profiler (sampling on bytes). The unit is opaque to the math —
//! callers pass the mean and treat the returned u64 as a counter in their
//! native unit.

/// Tiny splitmix64 pseudo-random generator.
///
/// Wraps a single 64-bit state word. It has no external dependencies and is
/// meant for choosing sampling intervals, not for anything security-sensitive.
pub(crate) struct SplitMix64(u64);

impl SplitMix64 {
    /// Builds a generator whose state word starts at `seed`.
    pub(crate) fn new(seed: u64) -> Self {
        SplitMix64(seed)
    }

    /// Advances the state by one step and returns the next 64-bit output.
    ///
    /// The state is bumped by a fixed odd constant (wrapping on overflow) and
    /// the new state is then scrambled with two xor-shift/multiply rounds and
    /// a final xor-shift.
    pub(crate) fn next_u64(&mut self) -> u64 {
        let state = self.0.wrapping_add(0x9e37_79b9_7f4a_7c15);
        self.0 = state;

        let round_one = (state ^ (state >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
        let round_two = (round_one ^ (round_one >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
        round_two ^ (round_two >> 31)
    }

    /// Samples an exponentially distributed value with the given `mean`.
    ///
    /// The result is expressed in whatever unit the caller attaches to `mean`
    /// (nanoseconds, bytes, ...). It is never smaller than 1, so a counter
    /// that is reloaded with the result cannot fire again immediately.
    pub(crate) fn draw_exponential(&mut self, mean: u64) -> u64 {
        // 2^53: the number of distinct values the top 53 bits can take.
        const SCALE: f64 = (1u64 << 53) as f64;

        // The top 53 bits divided by 2^53 give a uniform value in [0, 1).
        // An all-zero draw is swapped for the smallest positive normal f64 so
        // that the logarithm below stays finite.
        let bits = self.next_u64() >> 11;
        let uniform = match bits {
            0 => f64::MIN_POSITIVE,
            _ => bits as f64 / SCALE,
        };

        // Inverse-CDF transform: -ln(U) * mean.
        let scale = mean as f64;
        let sample = -uniform.ln() * scale;

        // The float-to-integer `as` cast truncates toward zero and saturates,
        // so an oversized sample becomes `u64::MAX`; a zero result is lifted to 1.
        match sample as u64 {
            0 => 1,
            drawn => drawn,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::SplitMix64;

    /// Two generators built from the same seed must emit the same stream.
    #[test]
    fn splitmix_deterministic_with_fixed_seed() {
        let mut reference = SplitMix64::new(42);
        let expected = [reference.next_u64(), reference.next_u64()];

        let mut replay = SplitMix64::new(42);
        for want in expected {
            assert_eq!(want, replay.next_u64());
        }
    }

    /// Even with the smallest useful mean, no draw may come back as zero.
    #[test]
    fn draw_exponential_returns_at_least_1() {
        let mut rng = SplitMix64::new(0);
        let draws = std::iter::repeat_with(|| rng.draw_exponential(1)).take(1000);
        for drawn in draws {
            assert!(drawn >= 1);
        }
    }

    /// The empirical mean over many draws should land close to the requested one.
    #[test]
    fn draw_exponential_mean_approximates_target() {
        let mut rng = SplitMix64::new(123);
        let mean: u64 = 1024;
        let n = 100_000;

        let mut total = 0.0_f64;
        for _ in 0..n {
            total += rng.draw_exponential(mean) as f64;
        }
        let observed_mean = total / n as f64;

        // Within ±5% of the configured mean.
        let tolerance = mean as f64 * 0.05;
        let deviation = (observed_mean - mean as f64).abs();
        assert!(
            deviation < tolerance,
            "observed mean {observed_mean} too far from expected {mean}"
        );
    }

    /// A large mean must still yield valid draws, some of them far above 1.
    #[test]
    fn draw_exponential_handles_large_mean() {
        let mut rng = SplitMix64::new(999);
        let mean: u64 = 1_000_000_000;

        let draws: Vec<u64> = (0..1000).map(|_| rng.draw_exponential(mean)).collect();

        for &drawn in &draws {
            assert!(drawn >= 1);
        }
        let saw_large = draws.iter().any(|&drawn| drawn > 1_000_000);
        assert!(saw_large, "expected some values much larger than 1");
    }
}
Comment on lines +40 to +90
Copy link
Copy Markdown
Contributor

@Fluzko Fluzko May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are duplicated: they still live in task_dumped.rs. The two copies aren't exactly the same, but they are close enough that it's probably worth consolidating them in one place.

75 changes: 3 additions & 72 deletions dial9-tokio-telemetry/src/task_dumped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//! yield points hit during a capture, with offsets recording each callchain's
//! start. The buffers are reused across polls.

use crate::sampling::SplitMix64;
use crate::telemetry::format::TaskDumpEvent;
use crate::telemetry::recorder::SharedState;
use crate::telemetry::task_metadata::TaskId;
Expand All @@ -47,36 +48,6 @@ use std::task::{Context, Poll};
/// Initial heap reservation for the instruction-pointer buffer on first capture.
const FRAME_BUF_INITIAL_CAPACITY: usize = 256;

// ─── Minimal PRNG (splitmix64) ──────────────────────────────────────────────

/// Small splitmix64 pseudo-random generator backed by one 64-bit state word.
///
/// Dependency-free and cheap; used here only to pick sampling intervals.
struct SplitMix64(u64);

impl SplitMix64 {
    /// Starts a generator with `seed` as its state word.
    fn new(seed: u64) -> Self {
        SplitMix64(seed)
    }

    /// Steps the state forward and returns the next scrambled 64-bit value.
    fn next_u64(&mut self) -> u64 {
        // Additive step applied to the state on every call (wrapping).
        const INCREMENT: u64 = 0x9e3779b97f4a7c15;
        // Multipliers for the two xor-shift/multiply mixing rounds.
        const MIX_A: u64 = 0xbf58476d1ce4e5b9;
        const MIX_B: u64 = 0x94d049bb133111eb;

        self.0 = self.0.wrapping_add(INCREMENT);
        let seeded = self.0;
        let first = (seeded ^ (seeded >> 30)).wrapping_mul(MIX_A);
        let second = (first ^ (first >> 27)).wrapping_mul(MIX_B);
        second ^ (second >> 31)
    }

    /// Samples an exponentially distributed interval with mean `mean_ns`
    /// (in nanoseconds).
    ///
    /// The result is never below 1, so the sampler cannot re-trigger
    /// immediately after a draw.
    fn draw_exponential_ns(&mut self, mean_ns: u64) -> i64 {
        // Top 53 bits scaled by 2^-53: a uniform value in [0, 1).
        let raw = (self.next_u64() >> 11) as f64 / ((1u64 << 53) as f64);
        // Replace an exact zero with the smallest positive normal f64 so the
        // logarithm stays finite.
        let unit = if raw > 0.0 { raw } else { f64::MIN_POSITIVE };

        // Inverse-CDF transform; the `as i64` cast truncates toward zero and
        // saturates at `i64::MAX`.
        let nanos = (-unit.ln() * (mean_ns as f64)) as i64;

        // Clamp to at least 1ns.
        if nanos < 1 {
            1
        } else {
            nanos
        }
    }
}

// ─── TaskDumped future wrapper ──────────────────────────────────────────────

pin_project! {
Expand Down Expand Up @@ -121,7 +92,7 @@ impl<F> TaskDumped<F> {
}
};
let mut rng = SplitMix64::new(seed);
let next_sample_ns = rng.draw_exponential_ns(sample_mean_ns);
let next_sample_ns = rng.draw_exponential(sample_mean_ns) as i64;
Self {
inner,
shared,
Expand Down Expand Up @@ -170,7 +141,7 @@ impl<F: Future> Future for TaskDumped<F> {
.expect("checked in match above")
.get();
this.frames.emit(this.shared, *this.task_id, ts);
*this.next_sample_ns = this.rng.draw_exponential_ns(*this.sample_mean_ns);
*this.next_sample_ns = this.rng.draw_exponential(*this.sample_mean_ns) as i64;
}
match &result {
Poll::Ready(_) => {
Expand Down Expand Up @@ -316,43 +287,3 @@ impl Encodable for TaskDumpData<'_> {
});
}
}

#[cfg(test)]
mod tests {
    use super::SplitMix64;

    /// The same seed must reproduce the same output stream.
    #[test]
    fn splitmix64_deterministic() {
        let mut reference = SplitMix64::new(42);
        let expected = [reference.next_u64(), reference.next_u64()];

        let mut replay = SplitMix64::new(42);
        for want in expected {
            assert_eq!(want, replay.next_u64());
        }
    }

    /// The empirical mean over many draws should sit near the configured mean.
    #[test]
    fn draw_exponential_ns_mean_is_reasonable() {
        let mut rng = SplitMix64::new(123);
        let mean_ns: u64 = 10_000_000; // 10ms
        let n = 10_000;

        let mut total = 0.0_f64;
        for _ in 0..n {
            total += rng.draw_exponential_ns(mean_ns) as f64;
        }
        let observed_mean = total / n as f64;

        // Within 10% of the configured mean.
        let tolerance = mean_ns as f64 * 0.1;
        let deviation = (observed_mean - mean_ns as f64).abs();
        assert!(
            deviation < tolerance,
            "observed mean {observed_mean} too far from expected {mean_ns}"
        );
    }

    /// Every draw must be at least 1ns.
    #[test]
    fn draw_exponential_ns_always_positive() {
        let mut rng = SplitMix64::new(0);
        let draws = std::iter::repeat_with(|| rng.draw_exponential_ns(1_000_000)).take(10_000);
        for drawn in draws {
            assert!(drawn >= 1);
        }
    }
}
Loading