diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs index c20e63da..41985439 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs @@ -89,7 +89,7 @@ fn flush_once( let cpu_events_time = std::time::Instant::now(); #[cfg(feature = "cpu-profiling")] { - if shared.enabled.load(Ordering::Relaxed) { + if shared.is_enabled() { event_writer.flush_cpu(shared); } } @@ -168,44 +168,56 @@ fn register_hooks( builder .on_thread_park(move || { - let event = make_worker_park(&c1, &s1); - s1.record_event(event); + s1.if_enabled(|buf| { + let event = make_worker_park(&c1, &s1); + buf.record_event(event); + }); }) .on_thread_unpark(move || { - let event = make_worker_unpark(&c2, &s2); - s2.record_event(event); + s2.if_enabled(|buf| { + let event = make_worker_unpark(&c2, &s2); + buf.record_event(event); + }); }) .on_before_task_poll(move |meta| { - let task_id = TaskId::from(meta.id()); - let location = meta.spawned_at(); - let event = make_poll_start(&c3, &s3, location, task_id); - s3.record_event(event); + s3.if_enabled(|buf| { + let task_id = TaskId::from(meta.id()); + let location = meta.spawned_at(); + let event = make_poll_start(&c3, &s3, location, task_id); + buf.record_event(event); + }); }) .on_after_task_poll(move |_meta| { - let event = make_poll_end(&c4, &s4); - s4.record_event(event); + s4.if_enabled(|buf| { + let event = make_poll_end(&c4, &s4); + buf.record_event(event); + }); }); if task_tracking_enabled { let s5 = shared.clone(); builder.on_task_spawn(move |meta| { - let task_id = TaskId::from(meta.id()); - let location = meta.spawned_at(); - let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get()); - - s5.record_event(RawEvent::TaskSpawn { - timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), - task_id, - location, - instrumented, + s5.if_enabled(|buf| { + let task_id = TaskId::from(meta.id()); + let location = meta.spawned_at(); + let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get()); + + buf.record_event(RawEvent::TaskSpawn { + timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), + task_id, + location, + instrumented, + }); }); }); let s6 = shared.clone(); builder.on_task_terminate(move |meta| { - let task_id = TaskId::from(meta.id()); - s6.record_event(RawEvent::TaskTerminate { - timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), - task_id, + s6.if_enabled(|buf| { + let task_id = TaskId::from(meta.id()); + buf.record_event(RawEvent::TaskTerminate { + timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(), + task_id, + }); }); }); } @@ -447,15 +459,20 @@ impl TelemetryHandle { } /// Record a user-defined [`Encodable`](crate::telemetry::buffer::Encodable) event. + /// + /// No-op on a disabled handle or when recording is paused. pub(crate) fn record_encodable_event(&self, event: &dyn crate::telemetry::buffer::Encodable) { if let Some(inner) = &self.inner { - inner.shared.record_encodable_event(event); + inner + .shared + .if_enabled(|buf| buf.record_encodable_event(event)); } } /// Run a closure with direct access to the thread-local encoder. /// - /// No-op on a disabled handle. + /// The closure is only invoked if telemetry is enabled. + /// No-op on a disabled handle or when recording is paused. // TODO(GH-XXX): consider making this public as an alternative to record_event // for zero-copy dynamic schema encoding pub(crate) fn with_encoder( @@ -463,7 +480,7 @@ impl TelemetryHandle { f: impl FnOnce(&mut crate::telemetry::buffer::ThreadLocalEncoder<'_>), ) { if let Some(inner) = &self.inner { - inner.shared.with_encoder(f); + inner.shared.if_enabled(|buf| buf.with_encoder(f)); } } @@ -1369,7 +1386,7 @@ fn run_flush_loop( // When disabled, skip all recording work (queue sampling, metadata // merging, drain coordination, flush). The loop still wakes every // 5ms to check for control commands and the exit signal. - if !exit && !shared.enabled.load(Ordering::Relaxed) { + if !exit && !shared.is_enabled() { continue; } diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs index 64c10ef6..9e344160 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs @@ -83,6 +83,27 @@ impl SharedState { } } + /// Check whether recording is currently enabled. + /// + /// Prefer [`if_enabled`](Self::if_enabled) for event-recording paths — it + /// provides an [`EventBuffer`] that makes it structurally impossible to + /// record without checking first. Use `is_enabled()` only for + /// control-flow decisions that don't directly record events (e.g. + /// deciding whether to wrap a waker in `Traced::poll`). + pub(crate) fn is_enabled(&self) -> bool { + self.enabled.load(Ordering::Relaxed) + } + + /// Run `f` only when recording is enabled, passing an [`EventBuffer`] + /// that provides `record_event` / `record_encodable_event`. Returns + /// `None` when disabled (no work is done). + pub(crate) fn if_enabled(&self, f: impl FnOnce(&EventBuffer<'_>) -> R) -> Option { + if !self.enabled.load(Ordering::Relaxed) { + return None; + } + Some(f(&EventBuffer(self))) + } + pub(crate) fn record_queue_sample(&self, global_queue_depth: usize) { self.record_event(RawEvent::QueueSample { timestamp_nanos: self.timestamp_nanos(), @@ -90,15 +111,15 @@ impl SharedState { }); } - pub(crate) fn record_event(&self, event: RawEvent) { + fn record_event(&self, event: RawEvent) { self.record_encodable_event(&event); } /// Record a user-defined [`Encodable`](crate::telemetry::buffer::Encodable) event. - pub(crate) fn record_encodable_event(&self, event: &dyn buffer::Encodable) { - if !self.enabled.load(Ordering::Relaxed) { - return; - } + /// + /// Callers must ensure recording is enabled (via [`if_enabled`](Self::if_enabled) + /// or [`is_enabled`](Self::is_enabled)) before calling this method. + fn record_encodable_event(&self, event: &dyn buffer::Encodable) { if let Some(handle) = buffer::record_encodable_event(event, &self.collector, &self.drain_epoch) { @@ -106,16 +127,6 @@ impl SharedState { } } - /// Run a closure with direct access to the thread-local encoder. - pub(crate) fn with_encoder(&self, f: impl FnOnce(&mut buffer::ThreadLocalEncoder<'_>)) { - if !self.enabled.load(Ordering::Relaxed) { - return; - } - if let Some(handle) = buffer::with_encoder(f, &self.collector, &self.drain_epoch) { - self.tl_buffers.lock().unwrap().push(handle); - } - } - /// Bump the drain epoch and flush all idle/silent thread-local buffers. /// /// Buffers whose `FlushEpoch` matches the current epoch are skipped @@ -194,6 +205,31 @@ impl SharedState { } } +/// Handle provided by [`SharedState::if_enabled`] that proves recording is +/// active. All event-recording calls should go through this type so that +/// callers cannot accidentally emit events without an enabled check. +pub(crate) struct EventBuffer<'a>(&'a SharedState); + +impl EventBuffer<'_> { + pub(crate) fn record_event(&self, event: RawEvent) { + self.record_encodable_event(&event); + } + + pub(crate) fn record_encodable_event(&self, event: &dyn buffer::Encodable) { + if let Some(handle) = + buffer::record_encodable_event(event, &self.0.collector, &self.0.drain_epoch) + { + self.0.tl_buffers.lock().unwrap().push(handle); + } + } + + pub(crate) fn with_encoder(&self, f: impl FnOnce(&mut buffer::ThreadLocalEncoder<'_>)) { + if let Some(handle) = buffer::with_encoder(f, &self.0.collector, &self.0.drain_epoch) { + self.0.tl_buffers.lock().unwrap().push(handle); + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/dial9-tokio-telemetry/src/traced.rs b/dial9-tokio-telemetry/src/traced.rs index 8e67bfbd..00dc1689 100644 --- a/dial9-tokio-telemetry/src/traced.rs +++ b/dial9-tokio-telemetry/src/traced.rs @@ -7,7 +7,6 @@ use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::sync::atomic::Ordering; use std::task::{Context, Poll, Waker}; /// Handle used by `Traced` to emit events into the telemetry system. @@ -70,20 +69,22 @@ impl ArcWake for TracedWakerData { } fn record_wake_event(data: &TracedWakerData) { - // The worker issuing the wake — not the worker that will execute the woken task - // (which is unknowable at wake time). Stored in the event as `target_worker`. - let waking_worker_id = crate::telemetry::recorder::current_worker_id(); - // TODO: cleanly handle more than 255 global workers in the wake event wire format. - // Wake event wire format uses u8; clamp large worker IDs to UNKNOWN (255). - let waking_worker_u8 = if waking_worker_id.as_u64() <= 254 { - waking_worker_id.as_u64() as u8 - } else { - 255 - }; - let event = data - .shared - .create_wake_event(data.woken_task_id, waking_worker_u8); - data.shared.record_event(event); + data.shared.if_enabled(|buf| { + // The worker issuing the wake — not the worker that will execute the woken task + // (which is unknowable at wake time). Stored in the event as `target_worker`. + let waking_worker_id = crate::telemetry::recorder::current_worker_id(); + // TODO: cleanly handle more than 255 global workers in the wake event wire format. + // Wake event wire format uses u8; clamp large worker IDs to UNKNOWN (255). + let waking_worker_u8 = if waking_worker_id.as_u64() <= 254 { + waking_worker_id.as_u64() as u8 + } else { + 255 + }; + let event = data + .shared + .create_wake_event(data.woken_task_id, waking_worker_u8); + buf.record_event(event); + }); } fn make_traced_waker(data: Arc) -> Waker { @@ -95,7 +96,7 @@ impl Future for Traced { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - if !this.handle.shared.enabled.load(Ordering::Relaxed) { + if !this.handle.shared.is_enabled() { return this.inner.poll(cx); } diff --git a/dial9-tokio-telemetry/src/tracing_layer.rs b/dial9-tokio-telemetry/src/tracing_layer.rs index 9c981c35..996ca10d 100644 --- a/dial9-tokio-telemetry/src/tracing_layer.rs +++ b/dial9-tokio-telemetry/src/tracing_layer.rs @@ -259,6 +259,9 @@ where S: tracing::Subscriber + for<'a> LookupSpan<'a>, { fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { + if !TelemetryHandle::current().is_enabled() { + return; + } let mut field_values = Vec::new(); attrs.record(&mut FieldVisitor { values: &mut field_values, @@ -276,6 +279,9 @@ where } fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) { + if !TelemetryHandle::current().is_enabled() { + return; + } if let Some(span) = ctx.span(id) { let mut extensions = span.extensions_mut(); if let Some(data) = extensions.get_mut::() { @@ -291,29 +297,28 @@ where if !handle.is_enabled() { return; } - - let worker_id = current_worker_id(); - let span_id = id.into_u64(); - let ts = clock_monotonic_ns(); - - let Some(span_ref) = ctx.span(id) else { return }; - let ext = span_ref.extensions(); - let Some(data) = ext.get::() else { - return; - }; - - let schemas = self.get_schemas(data.meta); - - // We only use explicit parents (span!(parent: &x, ..)), not contextual - // parents (ctx.current_span()), because contextual parenting is - // unreliable across tasks on the same worker thread. See: - // https://chesedo.me/blog/rust-tracing-incorrect-parent-spans-async-futures-joinset/ - // The viewer infers nesting from timestamp containment instead. - let parent_span_id = data.parent_id.as_ref().map(|id| id.into_u64()); - let span_name = data.meta.name(); - - // Encode directly into the thread-local buffer (no clone needed) handle.with_encoder(|enc| { + let worker_id = current_worker_id(); + let span_id = id.into_u64(); + let ts = clock_monotonic_ns(); + + let Some(span_ref) = ctx.span(id) else { return }; + let ext = span_ref.extensions(); + let Some(data) = ext.get::() else { + return; + }; + + let schemas = self.get_schemas(data.meta); + + // We only use explicit parents (span!(parent: &x, ..)), not contextual + // parents (ctx.current_span()), because contextual parenting is + // unreliable across tasks on the same worker thread. See: + // https://chesedo.me/blog/rust-tracing-incorrect-parent-spans-async-futures-joinset/ + // The viewer infers nesting from timestamp containment instead. + let parent_span_id = data.parent_id.as_ref().map(|id| id.into_u64()); + let span_name = data.meta.name(); + + // Encode directly into the thread-local buffer (no clone needed) let mut values = Vec::with_capacity(5 + schemas.field_names.len()); values.push(FieldValue::Varint(ts)); values.push(FieldValue::Varint(worker_id.as_u64())); @@ -338,21 +343,20 @@ where if !handle.is_enabled() { return; } + handle.with_encoder(|enc| { + let worker_id = current_worker_id(); + let span_id = id.into_u64(); + let ts = clock_monotonic_ns(); - let worker_id = current_worker_id(); - let span_id = id.into_u64(); - let ts = clock_monotonic_ns(); - - let Some(span_ref) = ctx.span(id) else { return }; - let ext = span_ref.extensions(); - let Some(data) = ext.get::() else { - return; - }; + let Some(span_ref) = ctx.span(id) else { return }; + let ext = span_ref.extensions(); + let Some(data) = ext.get::() else { + return; + }; - let schemas = self.get_schemas(data.meta); - let span_name = data.meta.name(); + let schemas = self.get_schemas(data.meta); + let span_name = data.meta.name(); - handle.with_encoder(|enc| { let mut values = Vec::with_capacity(4 + schemas.field_names.len()); values.push(FieldValue::Varint(ts)); values.push(FieldValue::Varint(worker_id.as_u64()));