-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathtask_dumped.rs
More file actions
289 lines (271 loc) · 11.4 KB
/
task_dumped.rs
File metadata and controls
289 lines (271 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
//! `TaskDumped<F>` wraps a future and captures async backtraces at yield
//! points using Poisson sampling keyed on idle duration.
//!
//! This wrapper is intentionally separate from the wake-event wrapper: wake
//! capture runs on every instrumented spawn regardless of the `taskdump`
//! feature, while task-dump capture is gated behind the `taskdump` feature and
//! its own runtime toggle. Typical stacking is `WakeTraced<TaskDumped<F>>`.
//!
//! # Sampling model
//!
//! Instead of a hard time cutoff, each task maintains a byte-counter-style
//! countdown, `next_sample_ns`, measured in nanoseconds of idle time. Its
//! initial value and every refill are drawn from an exponential distribution
//! with mean equal to the configured `idle_threshold`. On each poll, the
//! preceding idle duration is subtracted from the countdown. When the
//! countdown reaches zero or below, the captured frames are emitted and a new
//! gap is drawn. This gives unbiased Poisson sampling: longer idles are more
//! likely to trigger a dump, but even short idles have a non-zero (if small)
//! probability.
//!
//! # Capture mechanics
//!
//! If the current poll returns `Pending`, a fresh capture is taken via
//! [`tokio::runtime::dump::trace_with`] so that the next poll's sampling
//! decision has fresh data. The capture runs a second `poll` of the inner
//! future under the real waker inside `trace_with`. This may produce a
//! spurious wake (the inner future re-registers the waker, which fires
//! immediately), but avoids lost wakes that would cause task hangs.
//!
//! # Allocation
//!
//! Captured instruction pointers are stored flat in [`FrameBuf`] across all
//! yield points hit during a capture, with offsets recording each callchain's
//! start. The buffers are reused across polls.
use crate::sampling::SplitMix64;
use crate::telemetry::format::TaskDumpEvent;
use crate::telemetry::recorder::SharedState;
use crate::telemetry::task_metadata::TaskId;
use crate::telemetry::{Encodable, ThreadLocalEncoder};
use pin_project_lite::pin_project;
use smallvec::SmallVec;
use std::future::Future;
use std::num::NonZeroU64;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
/// Initial heap reservation for the instruction-pointer buffer on first capture.
///
/// Counted in `u64` instruction-pointer slots, not bytes: `FrameBuf::capture`
/// passes this value to `Vec::reserve` on `FrameBuf::ips` when that vector
/// has no capacity yet.
const FRAME_BUF_INITIAL_CAPACITY: usize = 256;
// ─── TaskDumped future wrapper ──────────────────────────────────────────────
pin_project! {
/// Future wrapper that captures async backtraces at yield points using
/// Poisson sampling keyed on idle duration.
///
/// Every poll is forwarded to `inner`. While task dumps and telemetry are
/// both enabled, a poll that returns `Pending` may also record the task's
/// yield-point backtraces into `frames`; a later poll decides whether to
/// emit them, based on how much idle time has elapsed since the capture.
pub(crate) struct TaskDumped<F> {
// The wrapped future. Marked `#[pin]` so it is projected as `Pin<&mut F>`
// and can be polled in place.
#[pin]
inner: F,
// Shared telemetry state. Read for the enable flags on every poll, for the
// sampling mean and optional RNG seed at construction, and used as the
// handle through which task-dump events are recorded.
shared: Arc<SharedState>,
// Identifier attached to every task-dump event emitted by this wrapper.
task_id: TaskId,
// Reusable storage for the most recent capture (empty when there is none).
frames: FrameBuf,
// Monotonic nanoseconds when the frames in `frames` were captured.
// Only meaningful when `frames.has_data()`.
pending_capture_ts: Option<NonZeroU64>,
// Sampling state: remaining nanoseconds of idle time before
// the next sample triggers. Signed so subtracting a large idle from a
// small remaining value goes negative rather than wrapping.
next_sample_ns: i64,
// Mean of the exponential distribution (nanoseconds). Loaded once in
// `new` and not refreshed afterwards.
sample_mean_ns: u64,
// Per-task PRNG for drawing exponential gaps.
rng: SplitMix64,
// Set after `capture()` re-polls the inner future with the real waker.
// The re-poll causes a spurious immediate wake; this flag suppresses
// the next capture to break the busy loop (capture → wake → poll →
// capture → …). Cleared on the next poll so subsequent real wakes
// proceed normally.
just_captured: bool,
}
}
impl<F> TaskDumped<F> {
    /// Wraps `inner` so that its yield points can be sampled for task dumps.
    ///
    /// The sampling mean is read once from `shared.task_dump_idle_threshold_ns`
    /// and the first sampling gap is drawn immediately, so the returned wrapper
    /// starts with a fully initialised countdown and an empty frame buffer.
    pub(crate) fn new(inner: F, shared: Arc<SharedState>, task_id: TaskId) -> Self {
        let sample_mean_ns = shared.task_dump_idle_threshold_ns.load(Ordering::Relaxed);

        // A configured seed is used verbatim so tests are deterministic.
        // Otherwise the task id is scrambled and mixed with the monotonic
        // clock so that each task gets its own PRNG stream.
        let seed = shared.task_dump_rng_seed.unwrap_or_else(|| {
            let scrambled_id = task_id.to_u64().wrapping_mul(0x517cc1b727220a95);
            scrambled_id ^ crate::telemetry::events::clock_monotonic_ns()
        });

        let mut rng = SplitMix64::new(seed);
        let first_gap_ns = rng.draw_exponential(sample_mean_ns) as i64;

        Self {
            inner,
            shared,
            task_id,
            frames: FrameBuf::new(),
            pending_capture_ts: None,
            next_sample_ns: first_gap_ns,
            sample_mean_ns,
            rng,
            just_captured: false,
        }
    }
}
impl<F: Future> Future for TaskDumped<F> {
    type Output = F::Output;

    /// Polls the wrapped future and drives the capture/emit state machine.
    ///
    /// When capture is active the steps run in this order:
    /// 1. charge the idle time since the stored capture against the sampling
    ///    countdown and note whether it expired;
    /// 2. poll the inner future;
    /// 3. if the countdown expired, emit the stored capture and draw a new gap;
    /// 4. on `Ready`, drop any stored capture; on `Pending`, take a fresh
    ///    capture unless the previous poll already took one.
    ///
    /// The inner future's result is always returned unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let mut me = self.project();

        // Fast path: when task dumps are switched off, or telemetry as a
        // whole is paused, just forward the poll. Any leftover capture is
        // discarded so it cannot be emitted later with a stale timestamp.
        let dumps_on = me.shared.task_dumps_enabled.load(Ordering::Relaxed);
        if !(dumps_on && me.shared.is_enabled()) {
            if me.frames.has_data() {
                me.frames.clear();
                *me.pending_capture_ts = None;
            }
            return me.inner.poll(cx);
        }

        // Poisson sampling over idle time: the idle duration since the stored
        // capture is deducted from the countdown. Reaching zero or below
        // selects that capture for emission, so short idles are rarely
        // sampled and long idles almost always are. At most one emission can
        // result from a single poll.
        let now = crate::telemetry::recorder::poll_start_ts_or_now();
        let emit_ts = match *me.pending_capture_ts {
            Some(captured_at) if me.frames.has_data() => {
                let idle_ns = now.saturating_sub(captured_at.get()) as i64;
                *me.next_sample_ns -= idle_ns;
                if *me.next_sample_ns <= 0 {
                    Some(captured_at.get())
                } else {
                    None
                }
            }
            _ => None,
        };

        let outcome = me.inner.as_mut().poll(cx);

        // Emission happens after the inner poll; the frames and timestamp
        // used are the ones selected before it.
        if let Some(ts) = emit_ts {
            me.frames.emit(me.shared, *me.task_id, ts);
            *me.next_sample_ns = me.rng.draw_exponential(*me.sample_mean_ns) as i64;
        }

        if outcome.is_ready() {
            // The task is finished; nothing captured so far can be relevant.
            me.frames.clear();
            *me.pending_capture_ts = None;
        } else if *me.just_captured {
            // This poll follows a capture, whose re-poll may have produced a
            // spurious wake. Skipping the capture once breaks the busy loop
            // capture → wake → poll → capture → …
            //
            // NOTE(review): if nothing was emitted above, `frames` and
            // `pending_capture_ts` are kept as-is, so the next poll charges
            // idle time measured from the original capture again. The span
            // already charged on this poll looks double-counted; confirm
            // whether that is intended.
            *me.just_captured = false;
        } else {
            // Still pending: record where the task is parked so the next
            // poll has fresh frames to make its sampling decision on.
            me.frames.capture(me.inner.as_mut(), cx);
            *me.just_captured = true;
            let captured_at = crate::telemetry::recorder::poll_start_ts_or_now();
            *me.pending_capture_ts = NonZeroU64::new(captured_at);
        }

        outcome
    }
}
/// Metadata for one captured callchain stored in [`FrameBuf`].
///
/// Only the start offset is stored; `FrameBuf::emit` derives a chain's end
/// from the next chain's `ip_start`, or from `ips.len()` for the last chain.
struct ChainMeta {
/// Index into `FrameBuf::ips` where this chain's frames start.
ip_start: usize,
/// Address of the root function (upper trim boundary). `None` means trim
/// to the end of the buffer.
root_addr: Option<*const core::ffi::c_void>,
/// Address of the leaf function (lower trim boundary). `None` means no
/// leaf boundary was available; the chain will be skipped at emit time.
/// `FrameBuf::capture` currently always stores `Some`.
leaf_addr: Option<*const core::ffi::c_void>,
}
// SAFETY: raw pointers are only used for address comparison, never dereferenced
// across threads. Within this file they are stored by `FrameBuf::capture` and
// handed to `crate::unwind::trim_frames` by `FrameBuf::emit`; nothing here
// reads through them.
// NOTE(review): soundness also depends on `trim_frames` not dereferencing
// them — confirm against its implementation.
unsafe impl Send for ChainMeta {}
/// Reusable storage for one or more callchains captured during a single
/// `trace_with` sub-poll. Frames are appended flat to `ips`; each new chain's
/// metadata is pushed onto `chains`.
///
/// The buffer counts as holding data whenever `chains` is non-empty.
struct FrameBuf {
/// Raw instruction pointers of every captured chain, back to back in
/// capture order.
ips: Vec<u64>,
/// One entry per captured chain, in the same order as their frames appear
/// in `ips`.
chains: SmallVec<[ChainMeta; 4]>,
}
impl FrameBuf {
    /// Creates an empty buffer; `ips` gets its first reservation in `capture`.
    fn new() -> Self {
        Self {
            chains: SmallVec::new(),
            ips: Vec::new(),
        }
    }

    /// Forgets every recorded chain and instruction pointer.
    fn clear(&mut self) {
        self.chains.clear();
        self.ips.clear();
    }

    /// Returns `true` when at least one callchain is currently recorded.
    fn has_data(&self) -> bool {
        !self.chains.is_empty()
    }

    /// Emit one `TaskDumpEvent` per recorded callchain, then clear.
    ///
    /// Each chain's raw frames are trimmed to its root/leaf boundaries via
    /// `crate::unwind::trim_frames` here, on the emit path, rather than while
    /// capturing. Chains without a leaf address, and chains that are empty
    /// after trimming, produce no event. The buffer is cleared afterwards
    /// regardless of what was recorded.
    fn emit(&mut self, shared: &SharedState, task_id: TaskId, capture_ts: u64) {
        shared.if_enabled(|sink| {
            // A chain ends where the following chain starts; the last chain
            // runs to the end of `ips`.
            let chain_ends = self
                .chains
                .iter()
                .skip(1)
                .map(|following| following.ip_start)
                .chain(std::iter::once(self.ips.len()));

            for (meta, ip_end) in self.chains.iter().zip(chain_ends) {
                let raw = &self.ips[meta.ip_start..ip_end];
                let trimmed = match meta.leaf_addr {
                    Some(leaf) => crate::unwind::trim_frames(raw, meta.root_addr, leaf),
                    None => &[],
                };
                if trimmed.is_empty() {
                    continue;
                }
                sink.record_encodable_event(&TaskDumpData {
                    timestamp_ns: capture_ts,
                    task_id,
                    callchain: trimmed,
                });
            }
        });
        self.clear();
    }

    /// Capture backtraces at yield points by re-polling `inner` under the
    /// real waker inside `trace_with`.
    ///
    /// Previous contents are discarded first. The result of the re-poll is
    /// ignored; only the frames recorded by the trace callback are kept.
    fn capture<F: Future>(&mut self, inner: Pin<&mut F>, cx: &mut Context<'_>) {
        // Reserve up front on first use so early captures do not grow the
        // vector step by step.
        if self.ips.capacity() == 0 {
            self.ips.reserve(FRAME_BUF_INITIAL_CAPACITY);
        }
        self.clear();

        let Self { ips, chains } = self;

        tokio::runtime::dump::trace_with(
            // The polling closure takes ownership of the pinned reference and
            // polls it exactly once.
            || {
                let _ = inner.poll(cx);
            },
            |trace_meta| {
                let ip_start = ips.len();
                // Hot path: collect raw IPs only — no _Unwind_FindEnclosingFunction,
                // no dl_iterate_phdr, no global locks. Trimming to root/leaf
                // boundaries is deferred to emit().
                crate::unwind::collect_frames_raw(ips);
                // Keep the boundary addresses next to the start offset so
                // the chain can be trimmed at emit time.
                chains.push(ChainMeta {
                    ip_start,
                    root_addr: trace_meta.root_addr,
                    leaf_addr: Some(trace_meta.trace_leaf_addr),
                });
            },
        );
    }
}
/// Borrowed-callchain view of a task-dump event that implements [`Encodable`]
/// by interning its ips into the batch's stack pool.
///
/// `FrameBuf::emit` builds one of these per non-empty callchain.
pub(crate) struct TaskDumpData<'a> {
/// Timestamp written into the encoded event; `FrameBuf::emit` supplies the
/// time at which the frames were captured.
pub(crate) timestamp_ns: u64,
/// Task the callchain belongs to.
pub(crate) task_id: TaskId,
/// Instruction pointers of one callchain, borrowed from the caller and
/// interned when the value is encoded.
pub(crate) callchain: &'a [u64],
}
impl Encodable for TaskDumpData<'_> {
    /// Interns the borrowed callchain through `enc`, then encodes a
    /// `TaskDumpEvent` that carries the interned handle together with this
    /// value's timestamp and task id.
    fn encode(&self, enc: &mut ThreadLocalEncoder<'_>) {
        // Interning comes first: the event stores the interned form, not the
        // raw slice.
        let callchain = enc.intern_stack_frames(self.callchain);
        let event = TaskDumpEvent {
            timestamp_ns: self.timestamp_ns,
            task_id: self.task_id,
            callchain,
        };
        enc.encode(&event);
    }
}