Skip to content

Commit 0eb8bb0

Browse files
committed
cache the timestamp on poll_start
1 parent b8dffa1 commit 0eb8bb0

3 files changed

Lines changed: 50 additions & 16 deletions

File tree

dial9-tokio-telemetry/src/task_dumped.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::telemetry::{Encodable, ThreadLocalEncoder};
3535
use pin_project_lite::pin_project;
3636
use smallvec::SmallVec;
3737
use std::future::Future;
38+
use std::num::NonZeroU64;
3839
use std::pin::Pin;
3940
use std::sync::Arc;
4041
use std::sync::atomic::Ordering;
@@ -54,7 +55,7 @@ pin_project! {
5455
frames: FrameBuf,
5556
// Monotonic nanoseconds when the frames in `frames` were captured.
5657
// Only meaningful when `frames.has_data()`.
57-
pending_capture_ts: u64,
58+
pending_capture_ts: Option<NonZeroU64>,
5859
// Snapshot of SharedState::task_dump_idle_threshold_ns at construction;
5960
// promotes the atomic load off the poll hot path.
6061
idle_threshold_ns: u64,
@@ -69,7 +70,7 @@ impl<F> TaskDumped<F> {
6970
shared,
7071
task_id,
7172
frames: FrameBuf::new(),
72-
pending_capture_ts: 0,
73+
pending_capture_ts: None,
7374
idle_threshold_ns,
7475
}
7576
}
@@ -91,38 +92,42 @@ impl<F: Future> Future for TaskDumped<F> {
9192
// after capture resumes.
9293
if this.frames.has_data() {
9394
this.frames.clear();
94-
*this.pending_capture_ts = 0;
95+
*this.pending_capture_ts = None;
9596
}
9697
return this.inner.poll(cx);
9798
}
9899

99100
// If we have captured frames from a previous idle, decide whether
100101
// that idle was long enough to emit.
101-
let should_emit = if this.frames.has_data() {
102-
let now = crate::telemetry::events::clock_monotonic_ns();
103-
now.saturating_sub(*this.pending_capture_ts) > *this.idle_threshold_ns
104-
} else {
105-
false
102+
let poll_start = crate::telemetry::recorder::poll_start_ts_or_now();
103+
let should_emit = match *this.pending_capture_ts {
104+
Some(ts) if this.frames.has_data() => {
105+
poll_start.saturating_sub(ts.get()) > *this.idle_threshold_ns
106+
}
107+
_ => false,
106108
};
107109

108110
let result = this.inner.as_mut().poll(cx);
109111

110112
if should_emit {
111-
this.frames
112-
.emit(this.shared, *this.task_id, *this.pending_capture_ts);
113+
this.frames.emit(
114+
this.shared,
115+
*this.task_id,
116+
this.pending_capture_ts.unwrap().get(),
117+
);
113118
}
114119

115120
match &result {
116121
Poll::Ready(_) => {
117122
// Terminal. Nothing more to capture; discard stale frames.
118123
this.frames.clear();
119-
*this.pending_capture_ts = 0;
124+
*this.pending_capture_ts = None;
120125
}
121126
Poll::Pending => {
122127
// Capture the yield point we just landed on, for the next
123128
// poll's threshold check.
124129
this.frames.capture(this.inner.as_mut());
125-
*this.pending_capture_ts = crate::telemetry::events::clock_monotonic_ns();
130+
*this.pending_capture_ts = NonZeroU64::new(poll_start);
126131
}
127132
}
128133

dial9-tokio-telemetry/src/telemetry/recorder/mod.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ mod shared_state;
44

55
pub(crate) use runtime_context::RuntimeContext;
66
pub use runtime_context::current_worker_id;
7+
#[cfg(feature = "taskdump")]
8+
pub(crate) use runtime_context::poll_start_ts_or_now;
79
pub(crate) use shared_state::SharedState;
810

911
use event_writer::EventWriter;
@@ -913,13 +915,18 @@ impl<P> TracedRuntimeBuilder<P> {
913915
}
914916

915917
/// Capture async backtraces at yield points for tasks that stay idle
916-
/// longer than the configured threshold. Requires the `taskdump` crate
917-
/// feature to actually record events; the builder accepts the config
918-
/// unconditionally so the API surface is stable.
918+
/// longer than the configured threshold.
919+
///
920+
/// Requires the `taskdump` crate feature to actually record events
919921
pub fn with_task_dumps(
920922
mut self,
921923
config: crate::telemetry::task_dump_config::TaskDumpConfig,
922924
) -> Self {
925+
if cfg!(not(feature = "taskdump")) {
926+
tracing::warn!(
927+
"taskdumps enabled but `taskdump` feature was not. No task dumps will be captured."
928+
)
929+
}
923930
self.task_dump_config = Some(config);
924931
self
925932
}

dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use crate::telemetry::format::{
77
use crate::telemetry::task_metadata::TaskId;
88
use std::cell::Cell;
99
use std::collections::HashMap;
10+
#[cfg(feature = "taskdump")]
11+
use std::num::NonZeroU64;
1012
use std::sync::OnceLock;
1113
use std::sync::RwLock;
1214
use tokio::runtime::RuntimeMetrics;
@@ -35,6 +37,21 @@ thread_local! {
3537
/// Whether we've registered this thread's OS tid for CPU profiling.
3638
#[cfg(feature = "cpu-profiling")]
3739
static TID_REGISTERED: Cell<bool> = const { Cell::new(false) };
40+
/// Monotonic timestamp captured in `on_before_task_poll`, cleared in
41+
/// `on_after_task_poll`. Allows code running inside a poll (e.g.
42+
/// `TaskDumped`) to reuse the timestamp without an extra clock read.
43+
#[cfg(feature = "taskdump")]
44+
static POLL_START_TS: Cell<Option<NonZeroU64>> = const { Cell::new(None) };
45+
}
46+
47+
/// Returns the poll-start timestamp if we're inside a poll, otherwise reads
48+
/// the clock.
49+
#[cfg(feature = "taskdump")]
50+
pub(crate) fn poll_start_ts_or_now() -> u64 {
51+
POLL_START_TS.with(|c| c.get()).map_or_else(
52+
crate::telemetry::events::clock_monotonic_ns,
53+
NonZeroU64::get,
54+
)
3855
}
3956

4057
impl RuntimeContext {
@@ -209,8 +226,11 @@ pub(super) fn make_poll_start(
209226
let worker_local_queue_depth = resolved
210227
.map(|(_, idx)| ctx.local_queue_depth(idx))
211228
.unwrap_or(0);
229+
let timestamp_ns = crate::telemetry::events::clock_monotonic_ns();
230+
#[cfg(feature = "taskdump")]
231+
POLL_START_TS.with(|c| c.set(NonZeroU64::new(timestamp_ns)));
212232
PollStart {
213-
timestamp_ns: crate::telemetry::events::clock_monotonic_ns(),
233+
timestamp_ns,
214234
worker_id: resolved.map(|(id, _)| id).unwrap_or(WorkerId::UNKNOWN),
215235
local_queue: worker_local_queue_depth as u8,
216236
task_id,
@@ -219,6 +239,8 @@ pub(super) fn make_poll_start(
219239
}
220240

221241
pub(super) fn make_poll_end(ctx: &RuntimeContext, shared: &SharedState) -> PollEndEvent {
242+
#[cfg(feature = "taskdump")]
243+
POLL_START_TS.with(|c| c.set(None));
222244
let resolved = ctx.resolve_worker(shared);
223245
PollEndEvent {
224246
timestamp_ns: crate::telemetry::events::clock_monotonic_ns(),

0 commit comments

Comments
 (0)