Skip to content

Commit a92a32b

Browse files
committed
Cleanups
1 parent 4629cc0 commit a92a32b

5 files changed

Lines changed: 66 additions & 28 deletions

File tree

dial9-tokio-telemetry/src/task_dumped.rs

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//! `TaskDumped<F>` wraps a future and captures async backtraces at yield
2-
//! points using geometric (Poisson) sampling keyed on idle duration.
2+
//! points using Poisson sampling keyed on idle duration.
33
//!
44
//! This wrapper is intentionally separate from [`crate::traced::Traced`]: the
55
//! wake-event capture in `Traced` runs on every instrumented spawn regardless
@@ -83,7 +83,7 @@ impl SplitMix64 {
8383

8484
pin_project! {
8585
/// Future wrapper that captures async backtraces at yield points using
86-
/// geometric (Poisson) sampling keyed on idle duration.
86+
/// Poisson sampling keyed on idle duration.
8787
pub(crate) struct TaskDumped<F> {
8888
#[pin]
8989
inner: F,
@@ -93,7 +93,7 @@ pin_project! {
9393
// Monotonic nanoseconds when the frames in `frames` were captured.
9494
// Only meaningful when `frames.has_data()`.
9595
pending_capture_ts: Option<NonZeroU64>,
96-
// Geometric sampling state: remaining nanoseconds of idle time before
96+
// Sampling state: remaining nanoseconds of idle time before
9797
// the next sample triggers. Signed so subtracting a large idle from a
9898
// small remaining value goes negative rather than wrapping.
9999
next_sample_ns: i64,
@@ -107,15 +107,14 @@ pin_project! {
107107
impl<F> TaskDumped<F> {
108108
pub(crate) fn new(inner: F, shared: Arc<SharedState>, task_id: TaskId) -> Self {
109109
let sample_mean_ns = shared.task_dump_idle_threshold_ns.load(Ordering::Relaxed);
110-
let base_seed = shared.task_dump_rng_seed.load(Ordering::Relaxed);
111-
// When a fixed seed is configured (non-zero), use it directly for
112-
// deterministic tests. Otherwise use task_id + timestamp for
113-
// production uniqueness across tasks.
114-
let seed = if base_seed != 0 {
115-
base_seed
116-
} else {
117-
(task_id.to_u64()).wrapping_mul(0x517cc1b727220a95)
118-
^ crate::telemetry::events::clock_monotonic_ns()
110+
// When a fixed seed is configured, use it directly so tests are deterministic.
111+
// Otherwise mix the task ID with the monotonic clock so seeds differ per task.
112+
let seed = match shared.task_dump_rng_seed {
113+
Some(s) => s,
114+
None => {
115+
(task_id.to_u64()).wrapping_mul(0x517cc1b727220a95)
116+
^ crate::telemetry::events::clock_monotonic_ns()
117+
}
119118
};
120119
let mut rng = SplitMix64::new(seed);
121120
let next_sample_ns = rng.draw_exponential_ns(sample_mean_ns);
@@ -148,7 +147,7 @@ impl<F: Future> Future for TaskDumped<F> {
148147
return this.inner.poll(cx);
149148
}
150149

151-
// Geometric sampling: subtract the idle duration from the counter.
150+
// Poisson sampling: subtract the idle duration from `next_sample_ns`.
152151
// If it reaches zero or below, we should emit.
153152
let poll_start = crate::telemetry::recorder::poll_start_ts_or_now();
154153
let should_emit = match *this.pending_capture_ts {
@@ -289,3 +288,43 @@ impl Encodable for TaskDumpData<'_> {
289288
});
290289
}
291290
}
291+
292+
#[cfg(test)]
293+
mod tests {
294+
use super::SplitMix64;
295+
296+
#[test]
297+
fn splitmix64_deterministic() {
298+
let mut rng = SplitMix64::new(42);
299+
let a = rng.next_u64();
300+
let b = rng.next_u64();
301+
302+
let mut rng2 = SplitMix64::new(42);
303+
assert_eq!(a, rng2.next_u64());
304+
assert_eq!(b, rng2.next_u64());
305+
}
306+
307+
#[test]
308+
fn draw_exponential_ns_mean_is_reasonable() {
309+
let mut rng = SplitMix64::new(123);
310+
let mean_ns: u64 = 10_000_000; // 10ms
311+
let n = 10_000;
312+
let sum: f64 = (0..n)
313+
.map(|_| rng.draw_exponential_ns(mean_ns) as f64)
314+
.sum();
315+
let observed_mean = sum / n as f64;
316+
// Within 10% of the configured mean.
317+
assert!(
318+
(observed_mean - mean_ns as f64).abs() < mean_ns as f64 * 0.1,
319+
"observed mean {observed_mean} too far from expected {mean_ns}"
320+
);
321+
}
322+
323+
#[test]
324+
fn draw_exponential_ns_always_positive() {
325+
let mut rng = SplitMix64::new(0);
326+
for _ in 0..10_000 {
327+
assert!(rng.draw_exponential_ns(1_000_000) >= 1);
328+
}
329+
}
330+
}

dial9-tokio-telemetry/src/telemetry/recorder/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,15 +1473,13 @@ impl TelemetryCore {
14731473
worker_metrics_sink: Option<metrique_writer::BoxEntrySink>,
14741474
) -> std::io::Result<TelemetryGuard> {
14751475
let start_mono_ns = crate::telemetry::events::clock_monotonic_ns();
1476-
let shared = Arc::new(SharedState::new(start_mono_ns));
1476+
let rng_seed = task_dump_config.as_ref().and_then(|cfg| cfg.rng_seed());
1477+
let shared = Arc::new(SharedState::new(start_mono_ns, rng_seed));
14771478
if let Some(cfg) = task_dump_config.as_ref() {
14781479
shared.task_dumps_enabled.store(true, Ordering::Relaxed);
14791480
shared
14801481
.task_dump_idle_threshold_ns
14811482
.store(cfg.idle_threshold().as_nanos() as u64, Ordering::Relaxed);
1482-
if let Some(seed) = cfg.rng_seed() {
1483-
shared.task_dump_rng_seed.store(seed, Ordering::Relaxed);
1484-
}
14851483
}
14861484
#[allow(unused_mut)]
14871485
let mut event_writer = EventWriter::new(Box::new(writer));
@@ -2130,7 +2128,7 @@ mod tests {
21302128

21312129
#[test]
21322130
fn test_shared_state_no_spawn_location_fields() {
2133-
let _shared = SharedState::new(crate::telemetry::events::clock_monotonic_ns());
2131+
let _shared = SharedState::new(crate::telemetry::events::clock_monotonic_ns(), None);
21342132
}
21352133

21362134
#[test]

dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ pub(crate) struct SharedState {
3232
/// each `TaskDumped` instance at construction time so the hot poll path
3333
/// does not need an atomic load.
3434
pub(crate) task_dump_idle_threshold_ns: AtomicU64,
35-
/// Fixed RNG seed for deterministic task dump sampling (0 = use timestamp).
36-
pub(crate) task_dump_rng_seed: AtomicU64,
35+
/// Fixed RNG seed for deterministic task dump sampling (`None` = derive a
36+
/// per-task seed). Set once at construction before the `Arc` is shared; read-only thereafter.
37+
pub(crate) task_dump_rng_seed: Option<u64>,
3738
pub(crate) collector: Arc<CentralCollector>,
3839
/// Absolute `CLOCK_MONOTONIC` nanosecond timestamp captured at trace start.
3940
pub(crate) start_time_ns: u64,
@@ -60,12 +61,12 @@ pub(crate) struct SharedState {
6061
}
6162

6263
impl SharedState {
63-
pub(super) fn new(start_time_ns: u64) -> Self {
64+
pub(super) fn new(start_time_ns: u64, task_dump_rng_seed: Option<u64>) -> Self {
6465
Self {
6566
enabled: AtomicBool::new(false),
6667
task_dumps_enabled: AtomicBool::new(false),
6768
task_dump_idle_threshold_ns: AtomicU64::new(0),
68-
task_dump_rng_seed: AtomicU64::new(0),
69+
task_dump_rng_seed,
6970
collector: Arc::new(CentralCollector::new()),
7071
start_time_ns,
7172
next_worker_id: AtomicU64::new(0),
@@ -252,7 +253,7 @@ mod tests {
252253

253254
/// Helper: create a SharedState with recording enabled.
254255
fn enabled_shared_state() -> SharedState {
255-
let ss = SharedState::new(0);
256+
let ss = SharedState::new(0, None);
256257
ss.enabled.store(true, Ordering::Relaxed);
257258
ss
258259
}

dial9-tokio-telemetry/src/telemetry/task_dump_config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Configuration for task dump capture.
22
//!
33
//! Task dumps capture async backtraces at yield points for tasks that have
4-
//! been idle, using geometric (Poisson) sampling keyed on idle duration.
4+
//! been idle, using Poisson sampling keyed on idle duration.
55
//! Use [`TaskDumpConfig`] with
66
//! [`TracedRuntimeBuilder::with_task_dumps`](crate::telemetry::TracedRuntimeBuilder::with_task_dumps)
77
//! or [`TelemetryCoreBuilder::task_dump_config`](crate::telemetry::TelemetryCoreBuilder::task_dump_config).
@@ -12,13 +12,13 @@
1212
1313
use std::time::Duration;
1414

15-
/// Default mean idle duration for geometric sampling.
15+
/// Default mean idle duration for Poisson sampling.
1616
const DEFAULT_IDLE_THRESHOLD: Duration = Duration::from_millis(10);
1717

1818
/// Configuration for task dump capture.
1919
#[derive(Debug, Clone, bon::Builder)]
2020
pub struct TaskDumpConfig {
21-
/// Mean idle duration for geometric (Poisson) sampling. On average, one
21+
/// Mean idle duration for Poisson sampling. On average, one
2222
/// task dump is emitted per this amount of cumulative idle time. Shorter
2323
/// idles have a lower (but non-zero) probability of triggering a dump;
2424
/// longer idles are very likely to trigger. Defaults to 10ms.
@@ -39,7 +39,7 @@ impl Default for TaskDumpConfig {
3939
}
4040

4141
impl TaskDumpConfig {
42-
/// Mean idle duration for geometric sampling.
42+
/// Mean idle duration for Poisson sampling.
4343
pub fn idle_threshold(&self) -> Duration {
4444
self.idle_threshold
4545
}

dial9-tokio-telemetry/src/unwind.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,6 @@ pub(crate) fn collect_frames(
6868
leaf_addr,
6969
};
7070
unsafe {
71-
_Unwind_Backtrace(trace_fn, core::ptr::addr_of_mut!(data).cast());
71+
_Unwind_Backtrace(trace_fn, (&raw mut data).cast());
7272
}
7373
}

0 commit comments

Comments
 (0)