Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 45 additions & 28 deletions dial9-tokio-telemetry/src/telemetry/recorder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn flush_once(
let cpu_events_time = std::time::Instant::now();
#[cfg(feature = "cpu-profiling")]
{
if shared.enabled.load(Ordering::Relaxed) {
if shared.is_enabled() {
event_writer.flush_cpu(shared);
}
}
Expand Down Expand Up @@ -168,44 +168,56 @@ fn register_hooks(

builder
.on_thread_park(move || {
let event = make_worker_park(&c1, &s1);
s1.record_event(event);
s1.if_enabled(|buf| {
let event = make_worker_park(&c1, &s1);
buf.record_event(event);
});
})
.on_thread_unpark(move || {
let event = make_worker_unpark(&c2, &s2);
s2.record_event(event);
s2.if_enabled(|buf| {
let event = make_worker_unpark(&c2, &s2);
buf.record_event(event);
});
})
.on_before_task_poll(move |meta| {
let task_id = TaskId::from(meta.id());
let location = meta.spawned_at();
let event = make_poll_start(&c3, &s3, location, task_id);
s3.record_event(event);
s3.if_enabled(|buf| {
let task_id = TaskId::from(meta.id());
let location = meta.spawned_at();
let event = make_poll_start(&c3, &s3, location, task_id);
buf.record_event(event);
});
})
.on_after_task_poll(move |_meta| {
let event = make_poll_end(&c4, &s4);
s4.record_event(event);
s4.if_enabled(|buf| {
let event = make_poll_end(&c4, &s4);
buf.record_event(event);
});
});

if task_tracking_enabled {
let s5 = shared.clone();
builder.on_task_spawn(move |meta| {
let task_id = TaskId::from(meta.id());
let location = meta.spawned_at();
let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get());

s5.record_event(RawEvent::TaskSpawn {
timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(),
task_id,
location,
instrumented,
s5.if_enabled(|buf| {
let task_id = TaskId::from(meta.id());
let location = meta.spawned_at();
let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get());

buf.record_event(RawEvent::TaskSpawn {
timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(),
task_id,
location,
instrumented,
});
});
});
let s6 = shared.clone();
builder.on_task_terminate(move |meta| {
let task_id = TaskId::from(meta.id());
s6.record_event(RawEvent::TaskTerminate {
timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(),
task_id,
s6.if_enabled(|buf| {
let task_id = TaskId::from(meta.id());
buf.record_event(RawEvent::TaskTerminate {
timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(),
task_id,
});
});
});
}
Expand Down Expand Up @@ -447,23 +459,28 @@ impl TelemetryHandle {
}

/// Record a user-defined [`Encodable`](crate::telemetry::buffer::Encodable) event.
///
/// No-op on a disabled handle or when recording is paused.
pub(crate) fn record_encodable_event(&self, event: &dyn crate::telemetry::buffer::Encodable) {
if let Some(inner) = &self.inner {
inner.shared.record_encodable_event(event);
inner
.shared
.if_enabled(|buf| buf.record_encodable_event(event));
}
}

/// Run a closure with direct access to the thread-local encoder.
///
/// No-op on a disabled handle.
/// The closure is only invoked if telemetry is enabled.
/// No-op on a disabled handle or when recording is paused.
// TODO(GH-XXX): consider making this public as an alternative to record_event
// for zero-copy dynamic schema encoding
pub(crate) fn with_encoder(
&self,
f: impl FnOnce(&mut crate::telemetry::buffer::ThreadLocalEncoder<'_>),
) {
if let Some(inner) = &self.inner {
inner.shared.with_encoder(f);
inner.shared.if_enabled(|buf| buf.with_encoder(f));
}
}

Expand Down Expand Up @@ -1369,7 +1386,7 @@ fn run_flush_loop(
// When disabled, skip all recording work (queue sampling, metadata
// merging, drain coordination, flush). The loop still wakes every
// 5ms to check for control commands and the exit signal.
if !exit && !shared.enabled.load(Ordering::Relaxed) {
if !exit && !shared.is_enabled() {
continue;
}

Expand Down
66 changes: 51 additions & 15 deletions dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,39 +83,50 @@ impl SharedState {
}
}

/// Check whether recording is currently enabled.
///
/// Prefer [`if_enabled`](Self::if_enabled) for event-recording paths — it
/// provides an [`EventBuffer`] that makes it structurally impossible to
/// record without checking first. Use `is_enabled()` only for
/// control-flow decisions that don't directly record events (e.g.
/// deciding whether to wrap a waker in `Traced::poll`).
pub(crate) fn is_enabled(&self) -> bool {
self.enabled.load(Ordering::Relaxed)
}

/// Run `f` only when recording is enabled, passing an [`EventBuffer`]
/// that provides `record_event` / `record_encodable_event`. Returns
/// `None` when disabled (no work is done).
pub(crate) fn if_enabled<R>(&self, f: impl FnOnce(&EventBuffer<'_>) -> R) -> Option<R> {
if !self.enabled.load(Ordering::Relaxed) {
return None;
}
Some(f(&EventBuffer(self)))
}

pub(crate) fn record_queue_sample(&self, global_queue_depth: usize) {
self.record_event(RawEvent::QueueSample {
timestamp_nanos: self.timestamp_nanos(),
global_queue_depth,
});
}

pub(crate) fn record_event(&self, event: RawEvent) {
fn record_event(&self, event: RawEvent) {
self.record_encodable_event(&event);
}

/// Record a user-defined [`Encodable`](crate::telemetry::buffer::Encodable) event.
pub(crate) fn record_encodable_event(&self, event: &dyn buffer::Encodable) {
if !self.enabled.load(Ordering::Relaxed) {
return;
}
///
/// Callers must ensure recording is enabled (via [`if_enabled`](Self::if_enabled)
/// or [`is_enabled`](Self::is_enabled)) before calling this method.
fn record_encodable_event(&self, event: &dyn buffer::Encodable) {
if let Some(handle) =
buffer::record_encodable_event(event, &self.collector, &self.drain_epoch)
{
self.tl_buffers.lock().unwrap().push(handle);
}
}

/// Run a closure with direct access to the thread-local encoder.
pub(crate) fn with_encoder(&self, f: impl FnOnce(&mut buffer::ThreadLocalEncoder<'_>)) {
if !self.enabled.load(Ordering::Relaxed) {
return;
}
if let Some(handle) = buffer::with_encoder(f, &self.collector, &self.drain_epoch) {
self.tl_buffers.lock().unwrap().push(handle);
}
}

/// Bump the drain epoch and flush all idle/silent thread-local buffers.
///
/// Buffers whose `FlushEpoch` matches the current epoch are skipped
Expand Down Expand Up @@ -194,6 +205,31 @@ impl SharedState {
}
}

/// Handle provided by [`SharedState::if_enabled`] that proves recording is
/// active. All event-recording calls should go through this type so that
/// callers cannot accidentally emit events without an enabled check.
pub(crate) struct EventBuffer<'a>(&'a SharedState);

impl EventBuffer<'_> {
pub(crate) fn record_event(&self, event: RawEvent) {
self.record_encodable_event(&event);
}

pub(crate) fn record_encodable_event(&self, event: &dyn buffer::Encodable) {
if let Some(handle) =
buffer::record_encodable_event(event, &self.0.collector, &self.0.drain_epoch)
{
self.0.tl_buffers.lock().unwrap().push(handle);
}
}

pub(crate) fn with_encoder(&self, f: impl FnOnce(&mut buffer::ThreadLocalEncoder<'_>)) {
if let Some(handle) = buffer::with_encoder(f, &self.0.collector, &self.0.drain_epoch) {
self.0.tl_buffers.lock().unwrap().push(handle);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
33 changes: 17 additions & 16 deletions dial9-tokio-telemetry/src/traced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll, Waker};

/// Handle used by `Traced<F>` to emit events into the telemetry system.
Expand Down Expand Up @@ -70,20 +69,22 @@ impl ArcWake for TracedWakerData {
}

fn record_wake_event(data: &TracedWakerData) {
// The worker issuing the wake — not the worker that will execute the woken task
// (which is unknowable at wake time). Stored in the event as `target_worker`.
let waking_worker_id = crate::telemetry::recorder::current_worker_id();
// TODO: cleanly handle more than 255 global workers in the wake event wire format.
// Wake event wire format uses u8; clamp large worker IDs to UNKNOWN (255).
let waking_worker_u8 = if waking_worker_id.as_u64() <= 254 {
waking_worker_id.as_u64() as u8
} else {
255
};
let event = data
.shared
.create_wake_event(data.woken_task_id, waking_worker_u8);
data.shared.record_event(event);
data.shared.if_enabled(|buf| {
// The worker issuing the wake — not the worker that will execute the woken task
// (which is unknowable at wake time). Stored in the event as `target_worker`.
let waking_worker_id = crate::telemetry::recorder::current_worker_id();
// TODO: cleanly handle more than 255 global workers in the wake event wire format.
// Wake event wire format uses u8; clamp large worker IDs to UNKNOWN (255).
let waking_worker_u8 = if waking_worker_id.as_u64() <= 254 {
waking_worker_id.as_u64() as u8
} else {
255
};
let event = data
.shared
.create_wake_event(data.woken_task_id, waking_worker_u8);
buf.record_event(event);
});
}

fn make_traced_waker(data: Arc<TracedWakerData>) -> Waker {
Expand All @@ -95,7 +96,7 @@ impl<F: Future> Future for Traced<F> {

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if !this.handle.shared.enabled.load(Ordering::Relaxed) {
if !this.handle.shared.is_enabled() {
return this.inner.poll(cx);
}

Expand Down
72 changes: 38 additions & 34 deletions dial9-tokio-telemetry/src/tracing_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ where
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
if !TelemetryHandle::current().is_enabled() {
return;
}
let mut field_values = Vec::new();
attrs.record(&mut FieldVisitor {
values: &mut field_values,
Expand All @@ -276,6 +279,9 @@ where
}

fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
if !TelemetryHandle::current().is_enabled() {
return;
}
if let Some(span) = ctx.span(id) {
let mut extensions = span.extensions_mut();
if let Some(data) = extensions.get_mut::<SpanData>() {
Expand All @@ -291,29 +297,28 @@ where
if !handle.is_enabled() {
return;
}

let worker_id = current_worker_id();
let span_id = id.into_u64();
let ts = clock_monotonic_ns();

let Some(span_ref) = ctx.span(id) else { return };
let ext = span_ref.extensions();
let Some(data) = ext.get::<SpanData>() else {
return;
};

let schemas = self.get_schemas(data.meta);

// We only use explicit parents (span!(parent: &x, ..)), not contextual
// parents (ctx.current_span()), because contextual parenting is
// unreliable across tasks on the same worker thread. See:
// https://chesedo.me/blog/rust-tracing-incorrect-parent-spans-async-futures-joinset/
// The viewer infers nesting from timestamp containment instead.
let parent_span_id = data.parent_id.as_ref().map(|id| id.into_u64());
let span_name = data.meta.name();

// Encode directly into the thread-local buffer (no clone needed)
handle.with_encoder(|enc| {
let worker_id = current_worker_id();
let span_id = id.into_u64();
let ts = clock_monotonic_ns();

let Some(span_ref) = ctx.span(id) else { return };
let ext = span_ref.extensions();
let Some(data) = ext.get::<SpanData>() else {
return;
};

let schemas = self.get_schemas(data.meta);

// We only use explicit parents (span!(parent: &x, ..)), not contextual
// parents (ctx.current_span()), because contextual parenting is
// unreliable across tasks on the same worker thread. See:
// https://chesedo.me/blog/rust-tracing-incorrect-parent-spans-async-futures-joinset/
// The viewer infers nesting from timestamp containment instead.
let parent_span_id = data.parent_id.as_ref().map(|id| id.into_u64());
let span_name = data.meta.name();

// Encode directly into the thread-local buffer (no clone needed)
let mut values = Vec::with_capacity(5 + schemas.field_names.len());
values.push(FieldValue::Varint(ts));
values.push(FieldValue::Varint(worker_id.as_u64()));
Expand All @@ -338,21 +343,20 @@ where
if !handle.is_enabled() {
return;
}
handle.with_encoder(|enc| {
let worker_id = current_worker_id();
let span_id = id.into_u64();
let ts = clock_monotonic_ns();

let worker_id = current_worker_id();
let span_id = id.into_u64();
let ts = clock_monotonic_ns();

let Some(span_ref) = ctx.span(id) else { return };
let ext = span_ref.extensions();
let Some(data) = ext.get::<SpanData>() else {
return;
};
let Some(span_ref) = ctx.span(id) else { return };
let ext = span_ref.extensions();
let Some(data) = ext.get::<SpanData>() else {
return;
};

let schemas = self.get_schemas(data.meta);
let span_name = data.meta.name();
let schemas = self.get_schemas(data.meta);
let span_name = data.meta.name();

handle.with_encoder(|enc| {
let mut values = Vec::with_capacity(4 + schemas.field_names.len());
values.push(FieldValue::Varint(ts));
values.push(FieldValue::Varint(worker_id.as_u64()));
Expand Down
Loading