Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dial9-tokio-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod background_task;
pub(crate) mod metrics;
pub(crate) mod primitives;
pub(crate) mod rate_limit;
pub(crate) mod sampling;
#[cfg(feature = "taskdump")]
pub(crate) mod task_dumped;
/// Core telemetry types, recording, and trace I/O.
Expand Down
90 changes: 90 additions & 0 deletions dial9-tokio-telemetry/src/sampling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//! Shared geometric/Poisson sampling primitives.
//!
//! Used by the task-dump idle sampler (sampling on nanoseconds) and by the
//! memory profiler (sampling on bytes). The unit is opaque to the math —
//! callers pass the mean and treat the returned i64 as a counter in their
//! native unit.

/// Minimal splitmix64 PRNG. Fast, no dependencies, good enough for sampling.
///
/// The wrapped `u64` is the generator state: it starts out equal to the seed
/// and is advanced on every `next_u64` call.
pub(crate) struct SplitMix64(u64);

impl SplitMix64 {
pub(crate) fn new(seed: u64) -> Self {
Self(seed)
}

/// Advances the state and returns the next 64-bit output.
///
/// The state is bumped by a fixed odd increment (wrapping on overflow); the
/// new state is then scrambled by two xor-shift/multiply rounds followed by
/// a final xor-shift.
pub(crate) fn next_u64(&mut self) -> u64 {
    let state = self.0.wrapping_add(0x9e3779b97f4a7c15);
    self.0 = state;

    let round1 = (state ^ (state >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
    let round2 = (round1 ^ (round1 >> 27)).wrapping_mul(0x94d049bb133111eb);
    round2 ^ (round2 >> 31)
}

/// Draw from an exponential distribution with the given mean.
///
/// Consumes exactly one `next_u64` output per call. The result is clamped
/// to a minimum of 1 so that a caller counting down to zero never
/// re-triggers immediately; as a consequence `mean == 0` always yields 1.
///
/// The unit is whatever the caller treats `mean` as (nanoseconds,
/// bytes, etc.).
pub(crate) fn draw_exponential(&mut self, mean: u64) -> i64 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub(crate) fn draw_exponential(&mut self, mean: u64) -> i64 {
pub(crate) fn draw_exponential(&mut self, mean: u64) -> u64 {

this is guaranteed to return >1 so I would change it to a narrower and more correct type

// The top 53 bits of the draw, scaled by 2^-53, give a uniform float in [0, 1).
let u = (self.next_u64() >> 11) as f64 / ((1u64 << 53) as f64);
// Swap an exact 0 for the smallest positive normal f64 so ln() stays finite.
let u = if u == 0.0 { f64::MIN_POSITIVE } else { u };
// Inverse-transform sampling: -ln(u) is exponential with mean 1; scale by `mean`.
let sample = -u.ln() * (mean as f64);
// The float-to-int `as` cast truncates toward zero and saturates at the i64 range.
(sample as i64).max(1)
}
}

#[cfg(test)]
mod tests {
use super::SplitMix64;

/// Two generators seeded identically must emit identical sequences.
#[test]
fn splitmix_deterministic_with_fixed_seed() {
    let mut first = SplitMix64::new(42);
    let expected = [first.next_u64(), first.next_u64()];

    let mut second = SplitMix64::new(42);
    for want in expected {
        assert_eq!(want, second.next_u64());
    }
}

/// Even with the smallest useful mean, every draw must respect the floor of 1.
#[test]
fn draw_exponential_returns_at_least_1() {
    let mut rng = SplitMix64::new(0);
    (0..1000).for_each(|_| {
        assert!(rng.draw_exponential(1) >= 1);
    });
}

/// The empirical mean over many draws should land close to the requested mean.
#[test]
fn draw_exponential_mean_approximates_target() {
    let mut rng = SplitMix64::new(123);
    let mean: u64 = 1024;
    let n = 100_000;

    let mut sum = 0.0_f64;
    for _ in 0..n {
        sum += rng.draw_exponential(mean) as f64;
    }
    let observed_mean = sum / n as f64;

    // Within ±5% of the configured mean.
    let target = mean as f64;
    let deviation = (observed_mean - target).abs();
    assert!(
        deviation < target * 0.05,
        "observed mean {observed_mean} too far from expected {mean}"
    );
}

/// With a large mean, draws stay at or above 1 and at least some are far above it.
#[test]
fn draw_exponential_handles_large_mean() {
    let mut rng = SplitMix64::new(999);
    let mean: u64 = 1_000_000_000;

    // Take all 1000 draws up front, then check both properties on the batch.
    let draws: Vec<_> = (0..1000).map(|_| rng.draw_exponential(mean)).collect();
    for &v in &draws {
        assert!(v >= 1);
    }

    let saw_large = draws.iter().any(|&v| v > 1_000_000);
    assert!(saw_large, "expected some values much larger than 1");
}
}
Comment on lines +40 to +90
Copy link
Copy Markdown
Contributor

@Fluzko Fluzko May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are duplicated: near-identical versions still live in task_dumped.rs. They're not exactly the same, but close enough that it's probably worth consolidating them in one place.

47 changes: 8 additions & 39 deletions dial9-tokio-telemetry/src/task_dumped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//! yield points hit during a capture, with offsets recording each callchain's
//! start. The buffers are reused across polls.

use crate::sampling::SplitMix64;
use crate::telemetry::format::TaskDumpEvent;
use crate::telemetry::recorder::SharedState;
use crate::telemetry::task_metadata::TaskId;
Expand All @@ -47,36 +48,6 @@ use std::task::{Context, Poll};
/// Initial heap reservation for the instruction-pointer buffer on first capture.
///
/// NOTE(review): presumably counted in buffer elements rather than bytes —
/// confirm at the allocation site, which is outside this view.
const FRAME_BUF_INITIAL_CAPACITY: usize = 256;

// ─── Minimal PRNG (splitmix64) ──────────────────────────────────────────────

/// Minimal splitmix64 PRNG. Small, dependency-free, and good enough for sampling.
///
/// The wrapped `u64` is the generator state; it starts out equal to the seed.
struct SplitMix64(u64);

impl SplitMix64 {
    /// Creates a generator whose initial state is `seed`.
    fn new(seed: u64) -> Self {
        SplitMix64(seed)
    }

    /// Advances the state and returns the next 64-bit output.
    ///
    /// The state is bumped by a fixed odd increment (wrapping on overflow) and
    /// the new state is scrambled by two xor-shift/multiply rounds plus a
    /// final xor-shift.
    fn next_u64(&mut self) -> u64 {
        let state = self.0.wrapping_add(0x9e3779b97f4a7c15);
        self.0 = state;

        let round1 = (state ^ (state >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
        let round2 = (round1 ^ (round1 >> 27)).wrapping_mul(0x94d049bb133111eb);
        round2 ^ (round2 >> 31)
    }

    /// Draws from an exponential distribution with mean `mean_ns` (nanoseconds).
    ///
    /// Consumes one `next_u64` output. The result is never below 1, so a
    /// countdown armed with it cannot re-trigger immediately.
    fn draw_exponential_ns(&mut self, mean_ns: u64) -> i64 {
        // 2^53: the top 53 bits of a draw divided by this lie in [0, 1).
        const SCALE: f64 = (1u64 << 53) as f64;

        let fraction = (self.next_u64() >> 11) as f64 / SCALE;
        // ln(0) is -inf, so an exact zero is swapped for the smallest
        // positive normal f64 before taking the logarithm.
        let uniform = if fraction != 0.0 {
            fraction
        } else {
            f64::MIN_POSITIVE
        };

        // Inverse-transform sampling: -ln(u) has mean 1; scale it by the mean.
        let neg_log = -uniform.ln();
        let scaled = neg_log * (mean_ns as f64);

        // The cast truncates toward zero (saturating at the i64 range);
        // anything below 1ns is raised to 1.
        let whole = scaled as i64;
        if whole < 1 {
            1
        } else {
            whole
        }
    }
}

// ─── TaskDumped future wrapper ──────────────────────────────────────────────

pin_project! {
Expand Down Expand Up @@ -121,7 +92,7 @@ impl<F> TaskDumped<F> {
}
};
let mut rng = SplitMix64::new(seed);
let next_sample_ns = rng.draw_exponential_ns(sample_mean_ns);
let next_sample_ns = rng.draw_exponential(sample_mean_ns);
Self {
inner,
shared,
Expand Down Expand Up @@ -170,7 +141,7 @@ impl<F: Future> Future for TaskDumped<F> {
.expect("checked in match above")
.get();
this.frames.emit(this.shared, *this.task_id, ts);
*this.next_sample_ns = this.rng.draw_exponential_ns(*this.sample_mean_ns);
*this.next_sample_ns = this.rng.draw_exponential(*this.sample_mean_ns);
}
match &result {
Poll::Ready(_) => {
Expand Down Expand Up @@ -319,7 +290,7 @@ impl Encodable for TaskDumpData<'_> {

#[cfg(test)]
mod tests {
use super::SplitMix64;
use crate::sampling::SplitMix64;

#[test]
fn splitmix64_deterministic() {
Expand All @@ -333,13 +304,11 @@ mod tests {
}

#[test]
fn draw_exponential_ns_mean_is_reasonable() {
fn draw_exponential_mean_is_reasonable() {
let mut rng = SplitMix64::new(123);
let mean_ns: u64 = 10_000_000; // 10ms
let n = 10_000;
let sum: f64 = (0..n)
.map(|_| rng.draw_exponential_ns(mean_ns) as f64)
.sum();
let sum: f64 = (0..n).map(|_| rng.draw_exponential(mean_ns) as f64).sum();
let observed_mean = sum / n as f64;
// Within 10% of the configured mean.
assert!(
Expand All @@ -349,10 +318,10 @@ mod tests {
}

#[test]
fn draw_exponential_ns_always_positive() {
fn draw_exponential_always_positive() {
let mut rng = SplitMix64::new(0);
for _ in 0..10_000 {
assert!(rng.draw_exponential_ns(1_000_000) >= 1);
assert!(rng.draw_exponential(1_000_000) >= 1);
}
}
}
Loading