Skip to content

Commit 75ae5bc

Browse files
rcoh and jlizen authored
fix: Avoid constructing events when telemetry is disabled (#332)
* fix: Avoid constructing events when telemetry is disabled

* Remove EVENT_CONSTRUCTION_COUNT test infrastructure

  The `enabled()` callback pattern now structurally prevents event construction when disabled, making the counter-based test redundant. Removes the static counter, the two helper functions, the four `#[cfg(test)] fetch_add` calls in make_* helpers, and the `hooks_skip_event_construction_when_disabled` test.

* fix: correct doc links and use is_enabled for control-flow guard

  - Fix 3 broken rustdoc links: Self::enabled → Self::if_enabled, SharedState::enabled → SharedState::if_enabled
  - Use is_enabled() instead of if_enabled(|_buf|) in flush_once CPU profiling path, since the EventBuffer token is unused and this is a control-flow decision, not an event-recording path

---------

Co-authored-by: jlizen <jlizen@amazon.com>
1 parent 5e984b9 commit 75ae5bc

4 files changed

Lines changed: 151 additions & 93 deletions

File tree

dial9-tokio-telemetry/src/telemetry/recorder/mod.rs

Lines changed: 45 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,7 @@ fn flush_once(
8989
let cpu_events_time = std::time::Instant::now();
9090
#[cfg(feature = "cpu-profiling")]
9191
{
92-
if shared.enabled.load(Ordering::Relaxed) {
92+
if shared.is_enabled() {
9393
event_writer.flush_cpu(shared);
9494
}
9595
}
@@ -168,44 +168,56 @@ fn register_hooks(
168168

169169
builder
170170
.on_thread_park(move || {
171-
let event = make_worker_park(&c1, &s1);
172-
s1.record_event(event);
171+
s1.if_enabled(|buf| {
172+
let event = make_worker_park(&c1, &s1);
173+
buf.record_event(event);
174+
});
173175
})
174176
.on_thread_unpark(move || {
175-
let event = make_worker_unpark(&c2, &s2);
176-
s2.record_event(event);
177+
s2.if_enabled(|buf| {
178+
let event = make_worker_unpark(&c2, &s2);
179+
buf.record_event(event);
180+
});
177181
})
178182
.on_before_task_poll(move |meta| {
179-
let task_id = TaskId::from(meta.id());
180-
let location = meta.spawned_at();
181-
let event = make_poll_start(&c3, &s3, location, task_id);
182-
s3.record_event(event);
183+
s3.if_enabled(|buf| {
184+
let task_id = TaskId::from(meta.id());
185+
let location = meta.spawned_at();
186+
let event = make_poll_start(&c3, &s3, location, task_id);
187+
buf.record_event(event);
188+
});
183189
})
184190
.on_after_task_poll(move |_meta| {
185-
let event = make_poll_end(&c4, &s4);
186-
s4.record_event(event);
191+
s4.if_enabled(|buf| {
192+
let event = make_poll_end(&c4, &s4);
193+
buf.record_event(event);
194+
});
187195
});
188196

189197
if task_tracking_enabled {
190198
let s5 = shared.clone();
191199
builder.on_task_spawn(move |meta| {
192-
let task_id = TaskId::from(meta.id());
193-
let location = meta.spawned_at();
194-
let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get());
195-
196-
s5.record_event(RawEvent::TaskSpawn {
197-
timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(),
198-
task_id,
199-
location,
200-
instrumented,
200+
s5.if_enabled(|buf| {
201+
let task_id = TaskId::from(meta.id());
202+
let location = meta.spawned_at();
203+
let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get());
204+
205+
buf.record_event(RawEvent::TaskSpawn {
206+
timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(),
207+
task_id,
208+
location,
209+
instrumented,
210+
});
201211
});
202212
});
203213
let s6 = shared.clone();
204214
builder.on_task_terminate(move |meta| {
205-
let task_id = TaskId::from(meta.id());
206-
s6.record_event(RawEvent::TaskTerminate {
207-
timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(),
208-
task_id,
215+
s6.if_enabled(|buf| {
216+
let task_id = TaskId::from(meta.id());
217+
buf.record_event(RawEvent::TaskTerminate {
218+
timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(),
219+
task_id,
220+
});
209221
});
210222
});
211223
}
@@ -447,23 +459,28 @@ impl TelemetryHandle {
447459
}
448460

449461
/// Record a user-defined [`Encodable`](crate::telemetry::buffer::Encodable) event.
462+
///
463+
/// No-op on a disabled handle or when recording is paused.
450464
pub(crate) fn record_encodable_event(&self, event: &dyn crate::telemetry::buffer::Encodable) {
451465
if let Some(inner) = &self.inner {
452-
inner.shared.record_encodable_event(event);
466+
inner
467+
.shared
468+
.if_enabled(|buf| buf.record_encodable_event(event));
453469
}
454470
}
455471

456472
/// Run a closure with direct access to the thread-local encoder.
457473
///
458-
/// No-op on a disabled handle.
474+
/// The closure is only invoked if telemetry is enabled.
475+
/// No-op on a disabled handle or when recording is paused.
459476
// TODO(GH-XXX): consider making this public as an alternative to record_event
460477
// for zero-copy dynamic schema encoding
461478
pub(crate) fn with_encoder(
462479
&self,
463480
f: impl FnOnce(&mut crate::telemetry::buffer::ThreadLocalEncoder<'_>),
464481
) {
465482
if let Some(inner) = &self.inner {
466-
inner.shared.with_encoder(f);
483+
inner.shared.if_enabled(|buf| buf.with_encoder(f));
467484
}
468485
}
469486

@@ -1369,7 +1386,7 @@ fn run_flush_loop(
13691386
// When disabled, skip all recording work (queue sampling, metadata
13701387
// merging, drain coordination, flush). The loop still wakes every
13711388
// 5ms to check for control commands and the exit signal.
1372-
if !exit && !shared.enabled.load(Ordering::Relaxed) {
1389+
if !exit && !shared.is_enabled() {
13731390
continue;
13741391
}
13751392

dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs

Lines changed: 51 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -83,39 +83,50 @@ impl SharedState {
8383
}
8484
}
8585

86+
/// Check whether recording is currently enabled.
87+
///
88+
/// Prefer [`if_enabled`](Self::if_enabled) for event-recording paths — it
89+
/// provides an [`EventBuffer`] that makes it structurally impossible to
90+
/// record without checking first. Use `is_enabled()` only for
91+
/// control-flow decisions that don't directly record events (e.g.
92+
/// deciding whether to wrap a waker in `Traced::poll`).
93+
pub(crate) fn is_enabled(&self) -> bool {
94+
self.enabled.load(Ordering::Relaxed)
95+
}
96+
97+
/// Run `f` only when recording is enabled, passing an [`EventBuffer`]
98+
/// that provides `record_event` / `record_encodable_event`. Returns
99+
/// `None` when disabled (no work is done).
100+
pub(crate) fn if_enabled<R>(&self, f: impl FnOnce(&EventBuffer<'_>) -> R) -> Option<R> {
101+
if !self.enabled.load(Ordering::Relaxed) {
102+
return None;
103+
}
104+
Some(f(&EventBuffer(self)))
105+
}
106+
86107
pub(crate) fn record_queue_sample(&self, global_queue_depth: usize) {
87108
self.record_event(RawEvent::QueueSample {
88109
timestamp_nanos: self.timestamp_nanos(),
89110
global_queue_depth,
90111
});
91112
}
92113

93-
pub(crate) fn record_event(&self, event: RawEvent) {
114+
fn record_event(&self, event: RawEvent) {
94115
self.record_encodable_event(&event);
95116
}
96117

97118
/// Record a user-defined [`Encodable`](crate::telemetry::buffer::Encodable) event.
98-
pub(crate) fn record_encodable_event(&self, event: &dyn buffer::Encodable) {
99-
if !self.enabled.load(Ordering::Relaxed) {
100-
return;
101-
}
119+
///
120+
/// Callers must ensure recording is enabled (via [`if_enabled`](Self::if_enabled)
121+
/// or [`is_enabled`](Self::is_enabled)) before calling this method.
122+
fn record_encodable_event(&self, event: &dyn buffer::Encodable) {
102123
if let Some(handle) =
103124
buffer::record_encodable_event(event, &self.collector, &self.drain_epoch)
104125
{
105126
self.tl_buffers.lock().unwrap().push(handle);
106127
}
107128
}
108129

109-
/// Run a closure with direct access to the thread-local encoder.
110-
pub(crate) fn with_encoder(&self, f: impl FnOnce(&mut buffer::ThreadLocalEncoder<'_>)) {
111-
if !self.enabled.load(Ordering::Relaxed) {
112-
return;
113-
}
114-
if let Some(handle) = buffer::with_encoder(f, &self.collector, &self.drain_epoch) {
115-
self.tl_buffers.lock().unwrap().push(handle);
116-
}
117-
}
118-
119130
/// Bump the drain epoch and flush all idle/silent thread-local buffers.
120131
///
121132
/// Buffers whose `FlushEpoch` matches the current epoch are skipped
@@ -194,6 +205,31 @@ impl SharedState {
194205
}
195206
}
196207

208+
/// Handle provided by [`SharedState::if_enabled`] that proves recording is
209+
/// active. All event-recording calls should go through this type so that
210+
/// callers cannot accidentally emit events without an enabled check.
211+
pub(crate) struct EventBuffer<'a>(&'a SharedState);
212+
213+
impl EventBuffer<'_> {
214+
pub(crate) fn record_event(&self, event: RawEvent) {
215+
self.record_encodable_event(&event);
216+
}
217+
218+
pub(crate) fn record_encodable_event(&self, event: &dyn buffer::Encodable) {
219+
if let Some(handle) =
220+
buffer::record_encodable_event(event, &self.0.collector, &self.0.drain_epoch)
221+
{
222+
self.0.tl_buffers.lock().unwrap().push(handle);
223+
}
224+
}
225+
226+
pub(crate) fn with_encoder(&self, f: impl FnOnce(&mut buffer::ThreadLocalEncoder<'_>)) {
227+
if let Some(handle) = buffer::with_encoder(f, &self.0.collector, &self.0.drain_epoch) {
228+
self.0.tl_buffers.lock().unwrap().push(handle);
229+
}
230+
}
231+
}
232+
197233
#[cfg(test)]
198234
mod tests {
199235
use super::*;

dial9-tokio-telemetry/src/traced.rs

Lines changed: 17 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@ use pin_project_lite::pin_project;
77
use std::future::Future;
88
use std::pin::Pin;
99
use std::sync::Arc;
10-
use std::sync::atomic::Ordering;
1110
use std::task::{Context, Poll, Waker};
1211

1312
/// Handle used by `Traced<F>` to emit events into the telemetry system.
@@ -70,20 +69,22 @@ impl ArcWake for TracedWakerData {
7069
}
7170

7271
fn record_wake_event(data: &TracedWakerData) {
73-
// The worker issuing the wake — not the worker that will execute the woken task
74-
// (which is unknowable at wake time). Stored in the event as `target_worker`.
75-
let waking_worker_id = crate::telemetry::recorder::current_worker_id();
76-
// TODO: cleanly handle more than 255 global workers in the wake event wire format.
77-
// Wake event wire format uses u8; clamp large worker IDs to UNKNOWN (255).
78-
let waking_worker_u8 = if waking_worker_id.as_u64() <= 254 {
79-
waking_worker_id.as_u64() as u8
80-
} else {
81-
255
82-
};
83-
let event = data
84-
.shared
85-
.create_wake_event(data.woken_task_id, waking_worker_u8);
86-
data.shared.record_event(event);
72+
data.shared.if_enabled(|buf| {
73+
// The worker issuing the wake — not the worker that will execute the woken task
74+
// (which is unknowable at wake time). Stored in the event as `target_worker`.
75+
let waking_worker_id = crate::telemetry::recorder::current_worker_id();
76+
// TODO: cleanly handle more than 255 global workers in the wake event wire format.
77+
// Wake event wire format uses u8; clamp large worker IDs to UNKNOWN (255).
78+
let waking_worker_u8 = if waking_worker_id.as_u64() <= 254 {
79+
waking_worker_id.as_u64() as u8
80+
} else {
81+
255
82+
};
83+
let event = data
84+
.shared
85+
.create_wake_event(data.woken_task_id, waking_worker_u8);
86+
buf.record_event(event);
87+
});
8788
}
8889

8990
fn make_traced_waker(data: Arc<TracedWakerData>) -> Waker {
@@ -95,7 +96,7 @@ impl<F: Future> Future for Traced<F> {
9596

9697
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
9798
let this = self.project();
98-
if !this.handle.shared.enabled.load(Ordering::Relaxed) {
99+
if !this.handle.shared.is_enabled() {
99100
return this.inner.poll(cx);
100101
}
101102

dial9-tokio-telemetry/src/tracing_layer.rs

Lines changed: 38 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -259,6 +259,9 @@ where
259259
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
260260
{
261261
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
262+
if !TelemetryHandle::current().is_enabled() {
263+
return;
264+
}
262265
let mut field_values = Vec::new();
263266
attrs.record(&mut FieldVisitor {
264267
values: &mut field_values,
@@ -276,6 +279,9 @@ where
276279
}
277280

278281
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
282+
if !TelemetryHandle::current().is_enabled() {
283+
return;
284+
}
279285
if let Some(span) = ctx.span(id) {
280286
let mut extensions = span.extensions_mut();
281287
if let Some(data) = extensions.get_mut::<SpanData>() {
@@ -291,29 +297,28 @@ where
291297
if !handle.is_enabled() {
292298
return;
293299
}
294-
295-
let worker_id = current_worker_id();
296-
let span_id = id.into_u64();
297-
let ts = clock_monotonic_ns();
298-
299-
let Some(span_ref) = ctx.span(id) else { return };
300-
let ext = span_ref.extensions();
301-
let Some(data) = ext.get::<SpanData>() else {
302-
return;
303-
};
304-
305-
let schemas = self.get_schemas(data.meta);
306-
307-
// We only use explicit parents (span!(parent: &x, ..)), not contextual
308-
// parents (ctx.current_span()), because contextual parenting is
309-
// unreliable across tasks on the same worker thread. See:
310-
// https://chesedo.me/blog/rust-tracing-incorrect-parent-spans-async-futures-joinset/
311-
// The viewer infers nesting from timestamp containment instead.
312-
let parent_span_id = data.parent_id.as_ref().map(|id| id.into_u64());
313-
let span_name = data.meta.name();
314-
315-
// Encode directly into the thread-local buffer (no clone needed)
316300
handle.with_encoder(|enc| {
301+
let worker_id = current_worker_id();
302+
let span_id = id.into_u64();
303+
let ts = clock_monotonic_ns();
304+
305+
let Some(span_ref) = ctx.span(id) else { return };
306+
let ext = span_ref.extensions();
307+
let Some(data) = ext.get::<SpanData>() else {
308+
return;
309+
};
310+
311+
let schemas = self.get_schemas(data.meta);
312+
313+
// We only use explicit parents (span!(parent: &x, ..)), not contextual
314+
// parents (ctx.current_span()), because contextual parenting is
315+
// unreliable across tasks on the same worker thread. See:
316+
// https://chesedo.me/blog/rust-tracing-incorrect-parent-spans-async-futures-joinset/
317+
// The viewer infers nesting from timestamp containment instead.
318+
let parent_span_id = data.parent_id.as_ref().map(|id| id.into_u64());
319+
let span_name = data.meta.name();
320+
321+
// Encode directly into the thread-local buffer (no clone needed)
317322
let mut values = Vec::with_capacity(5 + schemas.field_names.len());
318323
values.push(FieldValue::Varint(ts));
319324
values.push(FieldValue::Varint(worker_id.as_u64()));
@@ -338,21 +343,20 @@ where
338343
if !handle.is_enabled() {
339344
return;
340345
}
346+
handle.with_encoder(|enc| {
347+
let worker_id = current_worker_id();
348+
let span_id = id.into_u64();
349+
let ts = clock_monotonic_ns();
341350

342-
let worker_id = current_worker_id();
343-
let span_id = id.into_u64();
344-
let ts = clock_monotonic_ns();
345-
346-
let Some(span_ref) = ctx.span(id) else { return };
347-
let ext = span_ref.extensions();
348-
let Some(data) = ext.get::<SpanData>() else {
349-
return;
350-
};
351+
let Some(span_ref) = ctx.span(id) else { return };
352+
let ext = span_ref.extensions();
353+
let Some(data) = ext.get::<SpanData>() else {
354+
return;
355+
};
351356

352-
let schemas = self.get_schemas(data.meta);
353-
let span_name = data.meta.name();
357+
let schemas = self.get_schemas(data.meta);
358+
let span_name = data.meta.name();
354359

355-
handle.with_encoder(|enc| {
356360
let mut values = Vec::with_capacity(4 + schemas.field_names.len());
357361
values.push(FieldValue::Varint(ts));
358362
values.push(FieldValue::Varint(worker_id.as_u64()));

0 commit comments

Comments
 (0)