From d57c456884eeaf598144f219dd38d29c4a3b4135 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Wed, 29 Apr 2026 20:32:53 +0000 Subject: [PATCH 1/3] fix: Avoid constructing events when telemetry is disabled --- .../src/telemetry/recorder/mod.rs | 115 +++++++++++++----- .../src/telemetry/recorder/runtime_context.rs | 26 ++++ .../src/telemetry/recorder/shared_state.rs | 66 +++++++--- dial9-tokio-telemetry/src/traced.rs | 33 ++--- dial9-tokio-telemetry/src/tracing_layer.rs | 72 +++++------ 5 files changed, 219 insertions(+), 93 deletions(-) diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs index 0f644dbd..543f341c 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs @@ -4,6 +4,8 @@ mod shared_state; pub(crate) use runtime_context::RuntimeContext; pub use runtime_context::current_worker_id; +#[cfg(test)] +pub(crate) use runtime_context::{event_construction_count, reset_event_construction_count}; pub(crate) use shared_state::SharedState; use event_writer::EventWriter; @@ -89,9 +91,9 @@ fn flush_once( let cpu_events_time = std::time::Instant::now(); #[cfg(feature = "cpu-profiling")] { - if shared.enabled.load(Ordering::Relaxed) { + shared.if_enabled(|_buf| { event_writer.flush_cpu(shared); - } + }); } let cpu_flush_duration = cpu_events_time.elapsed(); @@ -168,44 +170,56 @@ fn register_hooks( builder .on_thread_park(move || { - let event = make_worker_park(&c1, &s1); - s1.record_event(event); + s1.if_enabled(|buf| { + let event = make_worker_park(&c1, &s1); + buf.record_event(event); + }); }) .on_thread_unpark(move || { - let event = make_worker_unpark(&c2, &s2); - s2.record_event(event); + s2.if_enabled(|buf| { + let event = make_worker_unpark(&c2, &s2); + buf.record_event(event); + }); }) .on_before_task_poll(move |meta| { - let task_id = TaskId::from(meta.id()); - let location = meta.spawned_at(); - let event = make_poll_start(&c3, &s3, location, task_id); - s3.record_event(event); + s3.if_enabled(|buf| { + let task_id = TaskId::from(meta.id()); + let location = meta.spawned_at(); + let event = make_poll_start(&c3, &s3, location, task_id); + buf.record_event(event); + }); }) .on_after_task_poll(move |_meta| { - let event = make_poll_end(&c4, &s4); - s4.record_event(event); + s4.if_enabled(|buf| { + let event = make_poll_end(&c4, &s4); + buf.record_event(event); + }); }); if task_tracking_enabled { let s5 = shared.clone(); builder.on_task_spawn(move |meta| { - let task_id = TaskId::from(meta.id()); - let location = meta.spawned_at(); - let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get()); - - s5.record_event(RawEvent::TaskSpawn { - timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), - task_id, - location, - instrumented, + s5.if_enabled(|buf| { + let task_id = TaskId::from(meta.id()); + let location = meta.spawned_at(); + let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get()); + + buf.record_event(RawEvent::TaskSpawn { + timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), + task_id, + location, + instrumented, + }); }); }); let s6 = shared.clone(); builder.on_task_terminate(move |meta| { - let task_id = TaskId::from(meta.id()); - s6.record_event(RawEvent::TaskTerminate { - timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), - task_id, + s6.if_enabled(|buf| { + let task_id = TaskId::from(meta.id()); + buf.record_event(RawEvent::TaskTerminate { + timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), + task_id, + }); }); }); } @@ -383,6 +397,11 @@ impl TelemetryHandle { self.shared.enabled.store(false, Ordering::Relaxed); } + /// Returns whether telemetry is enabled + pub fn enabled() -> bool { + TelemetryHandle::try_current().is_some_and(|handle| handle.shared.is_enabled()) + } + /// Get a [`TracedHandle`](crate::traced::TracedHandle) for wrapping futures with wake tracking. pub fn traced_handle(&self) -> crate::traced::TracedHandle { crate::traced::TracedHandle { @@ -392,20 +411,23 @@ impl TelemetryHandle { /// Record a user-defined [`Encodable`](crate::telemetry::buffer::Encodable) event. pub(crate) fn record_encodable_event(&self, event: &dyn crate::telemetry::buffer::Encodable) { - self.shared.record_encodable_event(event); + self.shared + .if_enabled(|buf| buf.record_encodable_event(event)); } /// Run a closure with direct access to the thread-local encoder. /// /// Use this for dynamic schema encoding where you need to intern strings /// and write events without an intermediate [`Encodable`] struct. + /// + /// This closure will only be invoked if telemetry is enabled // TODO(GH-XXX): consider making this public as an alternative to record_event // for zero-copy dynamic schema encoding pub(crate) fn with_encoder( &self, f: impl FnOnce(&mut crate::telemetry::buffer::ThreadLocalEncoder<'_>), ) { - self.shared.with_encoder(f); + self.shared.if_enabled(|buf| buf.with_encoder(f)); } /// Spawn a future wrapped with wake-event tracking. @@ -1181,7 +1203,7 @@ fn run_flush_loop( // When disabled, skip all recording work (queue sampling, metadata // merging, drain coordination, flush). The loop still wakes every // 5ms to check for control commands and the exit signal. - if !exit && !shared.enabled.load(Ordering::Relaxed) { + if !exit && !shared.is_enabled() { continue; } @@ -2055,6 +2077,43 @@ mod tests { } } + /// When telemetry is disabled at runtime, hook closures must skip event + /// construction entirely to avoid paying for expensive syscalls + /// (thread_cpu_time_nanos, SchedStat::read_current) on every park/unpark. + #[test] + fn hooks_skip_event_construction_when_disabled() { + reset_event_construction_count(); + + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.worker_threads(2); + // Build with telemetry hooks installed but recording DISABLED. + let (runtime, guard) = TracedRuntime::builder() + .build_with_writer(builder, NullWriter) + .unwrap(); + // Do NOT call guard.enable() — telemetry stays disabled. + + runtime.block_on(async { + let mut handles = Vec::new(); + for _ in 0..50 { + handles.push(tokio::spawn(async { + tokio::task::yield_now().await; + })); + } + for h in handles { + h.await.unwrap(); + } + }); + + drop(runtime); + drop(guard); + + let count = event_construction_count(); + assert_eq!( + count, 0, + "expected zero event constructions when telemetry is disabled, got {count}" + ); + } + #[test] fn telemetry_core_builds_guard_without_runtime() { let guard = TelemetryCore::builder().writer(NullWriter).build().unwrap(); diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs index 6b41a19b..f75913ed 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs @@ -147,12 +147,32 @@ pub fn current_worker_id() -> WorkerId { // ── Event construction helpers ─────────────────────────────────────────────── +#[cfg(test)] +static EVENT_CONSTRUCTION_COUNT: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(0); + +/// Return the number of times a `make_*` event-construction helper has been +/// called. Test-only; used to verify that the early-disable check in hook +/// closures prevents expensive syscalls when telemetry is disabled. +#[cfg(test)] +pub(crate) fn event_construction_count() -> usize { + EVENT_CONSTRUCTION_COUNT.load(std::sync::atomic::Ordering::Relaxed) +} + +/// Reset the event-construction counter to zero. +#[cfg(test)] +pub(crate) fn reset_event_construction_count() { + EVENT_CONSTRUCTION_COUNT.store(0, std::sync::atomic::Ordering::Relaxed); +} + pub(super) fn make_poll_start( ctx: &RuntimeContext, shared: &SharedState, location: &'static std::panic::Location<'static>, task_id: TaskId, ) -> RawEvent { + #[cfg(test)] + EVENT_CONSTRUCTION_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let resolved = ctx.resolve_worker(shared); let worker_local_queue_depth = resolved .map(|(_, idx)| ctx.local_queue_depth(idx)) @@ -167,6 +187,8 @@ pub(super) fn make_poll_start( } pub(super) fn make_poll_end(ctx: &RuntimeContext, shared: &SharedState) -> RawEvent { + #[cfg(test)] + EVENT_CONSTRUCTION_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let resolved = ctx.resolve_worker(shared); RawEvent::PollEnd { timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), @@ -175,6 +197,8 @@ pub(super) fn make_poll_end(ctx: &RuntimeContext, shared: &SharedState) -> RawEv } pub(super) fn make_worker_park(ctx: &RuntimeContext, shared: &SharedState) -> RawEvent { + #[cfg(test)] + EVENT_CONSTRUCTION_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let resolved = ctx.resolve_worker(shared); let worker_local_queue_depth = resolved .map(|(_, idx)| ctx.local_queue_depth(idx)) @@ -192,6 +216,8 @@ pub(super) fn make_worker_park(ctx: &RuntimeContext, shared: &SharedState) -> Ra } pub(super) fn make_worker_unpark(ctx: &RuntimeContext, shared: &SharedState) -> RawEvent { + #[cfg(test)] + EVENT_CONSTRUCTION_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let resolved = ctx.resolve_worker(shared); let worker_local_queue_depth = resolved .map(|(_, idx)| ctx.local_queue_depth(idx)) diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs index 64c10ef6..031872fc 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs @@ -83,6 +83,27 @@ impl SharedState { } } + /// Check whether recording is currently enabled. + /// + /// Prefer [`enabled`](Self::enabled) for event-recording paths — it + /// provides an [`EventBuffer`] that makes it structurally impossible to + /// record without checking first. Use `is_enabled()` only for + /// control-flow decisions that don't directly record events (e.g. + /// deciding whether to wrap a waker in `Traced::poll`). + pub(crate) fn is_enabled(&self) -> bool { + self.enabled.load(Ordering::Relaxed) + } + + /// Run `f` only when recording is enabled, passing an [`EventBuffer`] + /// that provides `record_event` / `record_encodable_event`. Returns + /// `None` when disabled (no work is done). + pub(crate) fn if_enabled(&self, f: impl FnOnce(&EventBuffer<'_>) -> R) -> Option { + if !self.enabled.load(Ordering::Relaxed) { + return None; + } + Some(f(&EventBuffer(self))) + } + pub(crate) fn record_queue_sample(&self, global_queue_depth: usize) { self.record_event(RawEvent::QueueSample { timestamp_nanos: self.timestamp_nanos(), @@ -90,15 +111,15 @@ impl SharedState { }); } - pub(crate) fn record_event(&self, event: RawEvent) { + fn record_event(&self, event: RawEvent) { self.record_encodable_event(&event); } /// Record a user-defined [`Encodable`](crate::telemetry::buffer::Encodable) event. - pub(crate) fn record_encodable_event(&self, event: &dyn buffer::Encodable) { - if !self.enabled.load(Ordering::Relaxed) { - return; - } + /// + /// Callers must ensure recording is enabled (via [`enabled`](Self::enabled) + /// or [`is_enabled`](Self::is_enabled)) before calling this method. + fn record_encodable_event(&self, event: &dyn buffer::Encodable) { if let Some(handle) = buffer::record_encodable_event(event, &self.collector, &self.drain_epoch) { @@ -106,16 +127,6 @@ impl SharedState { } } - /// Run a closure with direct access to the thread-local encoder. - pub(crate) fn with_encoder(&self, f: impl FnOnce(&mut buffer::ThreadLocalEncoder<'_>)) { - if !self.enabled.load(Ordering::Relaxed) { - return; - } - if let Some(handle) = buffer::with_encoder(f, &self.collector, &self.drain_epoch) { - self.tl_buffers.lock().unwrap().push(handle); - } - } - /// Bump the drain epoch and flush all idle/silent thread-local buffers. /// /// Buffers whose `FlushEpoch` matches the current epoch are skipped @@ -194,6 +205,31 @@ impl SharedState { } } +/// Handle provided by [`SharedState::enabled`] that proves recording is +/// active. All event-recording calls should go through this type so that +/// callers cannot accidentally emit events without an enabled check. +pub(crate) struct EventBuffer<'a>(&'a SharedState); + +impl EventBuffer<'_> { + pub(crate) fn record_event(&self, event: RawEvent) { + self.record_encodable_event(&event); + } + + pub(crate) fn record_encodable_event(&self, event: &dyn buffer::Encodable) { + if let Some(handle) = + buffer::record_encodable_event(event, &self.0.collector, &self.0.drain_epoch) + { + self.0.tl_buffers.lock().unwrap().push(handle); + } + } + + pub(crate) fn with_encoder(&self, f: impl FnOnce(&mut buffer::ThreadLocalEncoder<'_>)) { + if let Some(handle) = buffer::with_encoder(f, &self.0.collector, &self.0.drain_epoch) { + self.0.tl_buffers.lock().unwrap().push(handle); + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/dial9-tokio-telemetry/src/traced.rs b/dial9-tokio-telemetry/src/traced.rs index c7c84431..7d3316f6 100644 --- a/dial9-tokio-telemetry/src/traced.rs +++ b/dial9-tokio-telemetry/src/traced.rs @@ -7,7 +7,6 @@ use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::sync::atomic::Ordering; use std::task::{Context, Poll, Waker}; /// Handle used by `Traced` to emit events into the telemetry system. @@ -71,20 +70,22 @@ impl ArcWake for TracedWakerData { } fn record_wake_event(data: &TracedWakerData) { - // The worker issuing the wake — not the worker that will execute the woken task - // (which is unknowable at wake time). Stored in the event as `target_worker`. - let waking_worker_id = crate::telemetry::recorder::current_worker_id(); - // TODO: cleanly handle more than 255 global workers in the wake event wire format. - // Wake event wire format uses u8; clamp large worker IDs to UNKNOWN (255). - let waking_worker_u8 = if waking_worker_id.as_u64() <= 254 { - waking_worker_id.as_u64() as u8 - } else { - 255 - }; - let event = data - .shared - .create_wake_event(data.woken_task_id, waking_worker_u8); - data.shared.record_event(event); + data.shared.if_enabled(|buf| { + // The worker issuing the wake — not the worker that will execute the woken task + // (which is unknowable at wake time). Stored in the event as `target_worker`. + let waking_worker_id = crate::telemetry::recorder::current_worker_id(); + // TODO: cleanly handle more than 255 global workers in the wake event wire format. + // Wake event wire format uses u8; clamp large worker IDs to UNKNOWN (255). + let waking_worker_u8 = if waking_worker_id.as_u64() <= 254 { + waking_worker_id.as_u64() as u8 + } else { + 255 + }; + let event = data + .shared + .create_wake_event(data.woken_task_id, waking_worker_u8); + buf.record_event(event); + }); } fn make_traced_waker(data: Arc) -> Waker { @@ -96,7 +97,7 @@ impl Future for Traced { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - if !this.handle.shared.enabled.load(Ordering::Relaxed) { + if !this.handle.shared.is_enabled() { return this.inner.poll(cx); } diff --git a/dial9-tokio-telemetry/src/tracing_layer.rs b/dial9-tokio-telemetry/src/tracing_layer.rs index 38d084f6..1ed0c989 100644 --- a/dial9-tokio-telemetry/src/tracing_layer.rs +++ b/dial9-tokio-telemetry/src/tracing_layer.rs @@ -259,6 +259,9 @@ where S: tracing::Subscriber + for<'a> LookupSpan<'a>, { fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { + if !TelemetryHandle::enabled() { + return; + } let mut field_values = Vec::new(); attrs.record(&mut FieldVisitor { values: &mut field_values, @@ -276,6 +279,9 @@ where } fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) { + if !TelemetryHandle::enabled() { + return; + } if let Some(span) = ctx.span(id) { let mut extensions = span.extensions_mut(); if let Some(data) = extensions.get_mut::() { @@ -290,29 +296,28 @@ where let Some(handle) = TelemetryHandle::try_current() else { return; }; - - let worker_id = current_worker_id(); - let span_id = id.into_u64(); - let ts = clock_monotonic_ns(); - - let Some(span_ref) = ctx.span(id) else { return }; - let ext = span_ref.extensions(); - let Some(data) = ext.get::() else { - return; - }; - - let schemas = self.get_schemas(data.meta); - - // We only use explicit parents (span!(parent: &x, ..)), not contextual - // parents (ctx.current_span()), because contextual parenting is - // unreliable across tasks on the same worker thread. See: - // https://chesedo.me/blog/rust-tracing-incorrect-parent-spans-async-futures-joinset/ - // The viewer infers nesting from timestamp containment instead. - let parent_span_id = data.parent_id.as_ref().map(|id| id.into_u64()); - let span_name = data.meta.name(); - - // Encode directly into the thread-local buffer (no clone needed) handle.with_encoder(|enc| { + let worker_id = current_worker_id(); + let span_id = id.into_u64(); + let ts = clock_monotonic_ns(); + + let Some(span_ref) = ctx.span(id) else { return }; + let ext = span_ref.extensions(); + let Some(data) = ext.get::() else { + return; + }; + + let schemas = self.get_schemas(data.meta); + + // We only use explicit parents (span!(parent: &x, ..)), not contextual + // parents (ctx.current_span()), because contextual parenting is + // unreliable across tasks on the same worker thread. See: + // https://chesedo.me/blog/rust-tracing-incorrect-parent-spans-async-futures-joinset/ + // The viewer infers nesting from timestamp containment instead. + let parent_span_id = data.parent_id.as_ref().map(|id| id.into_u64()); + let span_name = data.meta.name(); + + // Encode directly into the thread-local buffer (no clone needed) let mut values = Vec::with_capacity(5 + schemas.field_names.len()); values.push(FieldValue::Varint(ts)); values.push(FieldValue::Varint(worker_id.as_u64())); @@ -336,21 +341,20 @@ where let Some(handle) = TelemetryHandle::try_current() else { return; }; + handle.with_encoder(|enc| { + let worker_id = current_worker_id(); + let span_id = id.into_u64(); + let ts = clock_monotonic_ns(); - let worker_id = current_worker_id(); - let span_id = id.into_u64(); - let ts = clock_monotonic_ns(); - - let Some(span_ref) = ctx.span(id) else { return }; - let ext = span_ref.extensions(); - let Some(data) = ext.get::() else { - return; - }; + let Some(span_ref) = ctx.span(id) else { return }; + let ext = span_ref.extensions(); + let Some(data) = ext.get::() else { + return; + }; - let schemas = self.get_schemas(data.meta); - let span_name = data.meta.name(); + let schemas = self.get_schemas(data.meta); + let span_name = data.meta.name(); - handle.with_encoder(|enc| { let mut values = Vec::with_capacity(4 + schemas.field_names.len()); values.push(FieldValue::Varint(ts)); values.push(FieldValue::Varint(worker_id.as_u64())); From c44562c87dcb07220c377d8e63851995cedbb4e8 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Thu, 30 Apr 2026 18:48:40 +0000 Subject: [PATCH 2/3] Remove EVENT_CONSTRUCTION_COUNT test infrastructure The `enabled()` callback pattern now structurally prevents event construction when disabled, making the counter-based test redundant. Removes the static counter, the two helper functions, the four `#[cfg(test)] fetch_add` calls in make_* helpers, and the `hooks_skip_event_construction_when_disabled` test. --- .../src/telemetry/recorder/mod.rs | 39 ------------------- .../src/telemetry/recorder/runtime_context.rs | 26 ------------- 2 files changed, 65 deletions(-) diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs index 543f341c..4bc2f1db 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs @@ -4,8 +4,6 @@ mod shared_state; pub(crate) use runtime_context::RuntimeContext; pub use runtime_context::current_worker_id; -#[cfg(test)] -pub(crate) use runtime_context::{event_construction_count, reset_event_construction_count}; pub(crate) use shared_state::SharedState; use event_writer::EventWriter; @@ -2077,43 +2075,6 @@ mod tests { } } - /// When telemetry is disabled at runtime, hook closures must skip event - /// construction entirely to avoid paying for expensive syscalls - /// (thread_cpu_time_nanos, SchedStat::read_current) on every park/unpark. - #[test] - fn hooks_skip_event_construction_when_disabled() { - reset_event_construction_count(); - - let mut builder = tokio::runtime::Builder::new_multi_thread(); - builder.worker_threads(2); - // Build with telemetry hooks installed but recording DISABLED. - let (runtime, guard) = TracedRuntime::builder() - .build_with_writer(builder, NullWriter) - .unwrap(); - // Do NOT call guard.enable() — telemetry stays disabled. - - runtime.block_on(async { - let mut handles = Vec::new(); - for _ in 0..50 { - handles.push(tokio::spawn(async { - tokio::task::yield_now().await; - })); - } - for h in handles { - h.await.unwrap(); - } - }); - - drop(runtime); - drop(guard); - - let count = event_construction_count(); - assert_eq!( - count, 0, - "expected zero event constructions when telemetry is disabled, got {count}" - ); - } - #[test] fn telemetry_core_builds_guard_without_runtime() { let guard = TelemetryCore::builder().writer(NullWriter).build().unwrap(); diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs index f75913ed..6b41a19b 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs @@ -147,32 +147,12 @@ pub fn current_worker_id() -> WorkerId { // ── Event construction helpers ─────────────────────────────────────────────── -#[cfg(test)] -static EVENT_CONSTRUCTION_COUNT: std::sync::atomic::AtomicUsize = - std::sync::atomic::AtomicUsize::new(0); - -/// Return the number of times a `make_*` event-construction helper has been -/// called. Test-only; used to verify that the early-disable check in hook -/// closures prevents expensive syscalls when telemetry is disabled. -#[cfg(test)] -pub(crate) fn event_construction_count() -> usize { - EVENT_CONSTRUCTION_COUNT.load(std::sync::atomic::Ordering::Relaxed) -} - -/// Reset the event-construction counter to zero. -#[cfg(test)] -pub(crate) fn reset_event_construction_count() { - EVENT_CONSTRUCTION_COUNT.store(0, std::sync::atomic::Ordering::Relaxed); -} - pub(super) fn make_poll_start( ctx: &RuntimeContext, shared: &SharedState, location: &'static std::panic::Location<'static>, task_id: TaskId, ) -> RawEvent { - #[cfg(test)] - EVENT_CONSTRUCTION_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let resolved = ctx.resolve_worker(shared); let worker_local_queue_depth = resolved .map(|(_, idx)| ctx.local_queue_depth(idx)) @@ -187,8 +167,6 @@ pub(super) fn make_poll_start( } pub(super) fn make_poll_end(ctx: &RuntimeContext, shared: &SharedState) -> RawEvent { - #[cfg(test)] - EVENT_CONSTRUCTION_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let resolved = ctx.resolve_worker(shared); RawEvent::PollEnd { timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), @@ -197,8 +175,6 @@ pub(super) fn make_poll_end(ctx: &RuntimeContext, shared: &SharedState) -> RawEv } pub(super) fn make_worker_park(ctx: &RuntimeContext, shared: &SharedState) -> RawEvent { - #[cfg(test)] - EVENT_CONSTRUCTION_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let resolved = ctx.resolve_worker(shared); let worker_local_queue_depth = resolved .map(|(_, idx)| ctx.local_queue_depth(idx)) @@ -216,8 +192,6 @@ pub(super) fn make_worker_park(ctx: &RuntimeContext, shared: &SharedState) -> Ra } pub(super) fn make_worker_unpark(ctx: &RuntimeContext, shared: &SharedState) -> RawEvent { - #[cfg(test)] - EVENT_CONSTRUCTION_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let resolved = ctx.resolve_worker(shared); let worker_local_queue_depth = resolved .map(|(_, idx)| ctx.local_queue_depth(idx)) From e43c49fada81358c489a0f01f9c69975ba1b9fde Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Thu, 30 Apr 2026 23:06:40 +0000 Subject: [PATCH 3/3] fix: correct doc links and use is_enabled for control-flow guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix 3 broken rustdoc links: Self::enabled → Self::if_enabled, SharedState::enabled → SharedState::if_enabled - Use is_enabled() instead of if_enabled(|_buf|) in flush_once CPU profiling path, since the EventBuffer token is unused and this is a control-flow decision, not an event-recording path --- dial9-tokio-telemetry/src/telemetry/recorder/mod.rs | 4 ++-- .../src/telemetry/recorder/shared_state.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs index 4bc2f1db..40880ff1 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs @@ -89,9 +89,9 @@ fn flush_once( let cpu_events_time = std::time::Instant::now(); #[cfg(feature = "cpu-profiling")] { - shared.if_enabled(|_buf| { + if shared.is_enabled() { event_writer.flush_cpu(shared); - }); + } } let cpu_flush_duration = cpu_events_time.elapsed(); diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs index 031872fc..9e344160 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs @@ -85,7 +85,7 @@ impl SharedState { /// Check whether recording is currently enabled. /// - /// Prefer [`enabled`](Self::enabled) for event-recording paths — it + /// Prefer [`if_enabled`](Self::if_enabled) for event-recording paths — it /// provides an [`EventBuffer`] that makes it structurally impossible to /// record without checking first. Use `is_enabled()` only for /// control-flow decisions that don't directly record events (e.g. @@ -117,7 +117,7 @@ impl SharedState { /// Record a user-defined [`Encodable`](crate::telemetry::buffer::Encodable) event. /// - /// Callers must ensure recording is enabled (via [`enabled`](Self::enabled) + /// Callers must ensure recording is enabled (via [`if_enabled`](Self::if_enabled) /// or [`is_enabled`](Self::is_enabled)) before calling this method. fn record_encodable_event(&self, event: &dyn buffer::Encodable) { if let Some(handle) = @@ -205,7 +205,7 @@ impl SharedState { } } -/// Handle provided by [`SharedState::enabled`] that proves recording is +/// Handle provided by [`SharedState::if_enabled`] that proves recording is /// active. All event-recording calls should go through this type so that /// callers cannot accidentally emit events without an enabled check. pub(crate) struct EventBuffer<'a>(&'a SharedState);