Skip to content

Commit 7dea98c

Browse files
authored
feat: detect uninstrumented task spawns and surface in viewer (#293)
* feat: detect uninstrumented task spawns and surface in viewer
  Track whether each task was spawned via TelemetryHandle::spawn (with wake tracking) or raw tokio::spawn. Carry an `instrumented` flag through the wire format, log a per-location summary on shutdown via tracing::debug, and show a "no wake data" badge in the viewer's task detail panel.
* Add instrumented field to bench
* Add summary popup and filter for uninstrumented tasks
* Drop collecting spawn locations in-app
* Make TaskSpawnEvent.instrumented bool, move Option to TelemetryEvent
1 parent 800e9b7 commit 7dea98c

9 files changed

Lines changed: 291 additions & 4 deletions

File tree

dial9-tokio-telemetry/benches/writer_encode.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ fn make_encoded_batch(worker: usize) -> Batch {
5656
timestamp_ns: base + 2000,
5757
task_id: task,
5858
spawn_loc: loc,
59+
instrumented: true,
5960
});
6061
}
6162
for _ in 0..5 {

dial9-tokio-telemetry/src/telemetry/buffer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,12 +217,14 @@ impl Encodable for RawEvent {
217217
timestamp_nanos,
218218
task_id,
219219
location,
220+
instrumented,
220221
} => {
221222
let spawn_loc = enc.intern_location(location);
222223
enc.encode(&TaskSpawnEvent {
223224
timestamp_ns: *timestamp_nanos,
224225
task_id: *task_id,
225226
spawn_loc,
227+
instrumented: *instrumented,
226228
});
227229
}
228230
RawEvent::TaskTerminate {

dial9-tokio-telemetry/src/telemetry/events.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ pub enum TelemetryEvent {
133133
task_id: TaskId,
134134
/// Interned spawn location of the task.
135135
spawn_loc: InternedString,
136+
/// Whether this task was spawned via [`TelemetryHandle::spawn`].
137+
/// `None` for traces recorded before this field existed.
138+
instrumented: Option<bool>,
136139
},
137140
/// A task terminated (completed or was cancelled).
138141
TaskTerminate {
@@ -319,6 +322,7 @@ pub(crate) enum RawEvent {
319322
timestamp_nanos: u64,
320323
task_id: crate::telemetry::task_metadata::TaskId,
321324
location: &'static std::panic::Location<'static>,
325+
instrumented: bool,
322326
},
323327
TaskTerminate {
324328
timestamp_nanos: u64,
@@ -555,6 +559,7 @@ mod tests {
555559
timestamp_nanos: 5_000_000,
556560
task_id: TaskId::from_u32(1),
557561
spawn_loc: InternedString::from_raw(1),
562+
instrumented: Some(true),
558563
};
559564
assert_eq!(task_spawn.timestamp_nanos(), Some(5_000_000));
560565
}
@@ -592,6 +597,7 @@ mod tests {
592597
timestamp_nanos: 5_000_000,
593598
task_id: TaskId::from_u32(1),
594599
spawn_loc: InternedString::from_raw(1),
600+
instrumented: Some(true),
595601
};
596602
assert!(task_spawn.is_runtime_event());
597603
}

dial9-tokio-telemetry/src/telemetry/format.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ pub struct TaskSpawnEvent {
176176
pub task_id: TaskId,
177177
/// Interned spawn location.
178178
pub spawn_loc: InternedString,
179+
/// Whether this spawn was instrumented (via `TelemetryHandle::spawn`).
180+
pub instrumented: bool,
179181
}
180182

181183
#[derive(TraceEvent)]
@@ -390,6 +392,7 @@ pub(crate) fn to_owned_event(r: TelemetryEventRef<'_>, pool: &StringPool) -> Tel
390392
timestamp_nanos: e.timestamp_ns,
391393
task_id: e.task_id,
392394
spawn_loc: e.spawn_loc,
395+
instrumented: Some(e.instrumented),
393396
},
394397
TelemetryEventRef::TaskTerminate(e) => TelemetryEvent::TaskTerminate {
395398
timestamp_nanos: e.timestamp_ns,

dial9-tokio-telemetry/src/telemetry/recorder/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::telemetry::writer::{RotatingWriter, TraceWriter};
1818
use metrique::timers::Timer;
1919
use metrique::unit::Microsecond;
2020
use metrique::unit_of_work::metrics;
21+
use std::cell::Cell;
2122
use std::cell::RefCell;
2223
use std::path::PathBuf;
2324
use std::sync::Arc;
@@ -28,6 +29,10 @@ thread_local! {
2829
/// Per-thread [`TelemetryHandle`], populated in `on_thread_start` and
2930
/// cleared in `on_thread_stop`. Enables [`TelemetryHandle::current`].
3031
static CURRENT_HANDLE: RefCell<Option<TelemetryHandle>> = const { RefCell::new(None) };
32+
33+
/// Set (via `InstrumentedSpawnGuard`) by the `TelemetryHandle` and `RuntimeTelemetryHandle` spawn paths just before they call into tokio's spawn,
34+
/// so the `on_task_spawn` hook can distinguish instrumented from raw spawns.
35+
static INSTRUMENTED_SPAWN: Cell<bool> = const { Cell::new(false) };
3136
}
3237

3338
// ---------------------------------------------------------------------------
@@ -185,10 +190,13 @@ fn register_hooks(
185190
builder.on_task_spawn(move |meta| {
186191
let task_id = TaskId::from(meta.id());
187192
let location = meta.spawned_at();
193+
let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get());
194+
188195
s5.record_event(RawEvent::TaskSpawn {
189196
timestamp_nanos: crate::telemetry::events::clock_monotonic_ns(),
190197
task_id,
191198
location,
199+
instrumented,
192200
});
193201
});
194202
let s6 = shared.clone();
@@ -399,13 +407,31 @@ impl TelemetryHandle {
399407
F::Output: Send + 'static,
400408
{
401409
let traced_handle = self.traced_handle();
410+
let _guard = InstrumentedSpawnGuard::set();
402411
tokio::spawn(async move {
403412
let task_id = tokio::task::try_id().map(TaskId::from).unwrap_or_default();
404413
crate::traced::Traced::new(future, traced_handle, task_id).await
405414
})
406415
}
407416
}
408417

418+
/// RAII guard that sets `INSTRUMENTED_SPAWN` to `true` on creation and
419+
/// resets it to `false` on drop, even if `tokio::spawn` panics.
420+
struct InstrumentedSpawnGuard;
421+
422+
impl InstrumentedSpawnGuard {
423+
fn set() -> Self {
424+
INSTRUMENTED_SPAWN.set(true);
425+
Self
426+
}
427+
}
428+
429+
impl Drop for InstrumentedSpawnGuard {
430+
fn drop(&mut self) {
431+
INSTRUMENTED_SPAWN.set(false);
432+
}
433+
}
434+
409435
/// Handle for spawning wake-tracked futures on a specific runtime.
410436
///
411437
/// Returned by [`TraceRuntimeCoreBuilder::build`]. Unlike [`TelemetryHandle::spawn`]
@@ -428,6 +454,7 @@ impl RuntimeTelemetryHandle {
428454
F::Output: Send + 'static,
429455
{
430456
let traced = self.traced.clone();
457+
let _guard = InstrumentedSpawnGuard::set();
431458
self.runtime.spawn(async move {
432459
let task_id = tokio::task::try_id().map(TaskId::from).unwrap_or_default();
433460
crate::traced::Traced::new(future, traced, task_id).await
@@ -581,6 +608,7 @@ impl Drop for TelemetryGuard {
581608
fn drop(&mut self) {
582609
// 1. Stop the flush thread (flushes + finalizes)
583610
self.stop_flush_thread();
611+
584612
// 2. Hard shutdown: drop the sender without sending — worker sees
585613
// RecvError and exits without draining. No need to join the thread.
586614
// For graceful drain, use graceful_shutdown() instead.
@@ -1484,6 +1512,7 @@ mod tests {
14841512
timestamp_nanos: (i as u64 + 1) * 1000,
14851513
task_id,
14861514
location: loc,
1515+
instrumented: true,
14871516
},
14881517
&collector,
14891518
&drain_epoch,

dial9-tokio-telemetry/tests/end_to_end.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,3 +196,105 @@ fn custom_event_appears_in_trace() {
196196
.unwrap();
197197
assert_eq!(custom_count, 5, "expected 5 MyCustomEvent events in trace");
198198
}
199+
200+
#[test]
201+
fn spawn_audit_detects_uninstrumented_spawns() {
202+
let dir = tempfile::tempdir().unwrap();
203+
let trace_path = dir.path().join("trace.bin");
204+
205+
const RAW: usize = 5;
206+
const INSTRUMENTED: usize = 3;
207+
208+
let mut builder = tokio::runtime::Builder::new_multi_thread();
209+
builder.worker_threads(2).enable_all();
210+
211+
let writer = RotatingWriter::single_file(&trace_path).unwrap();
212+
let (runtime, guard) = TracedRuntime::builder()
213+
.with_task_tracking(true)
214+
.build_and_start(builder, writer)
215+
.unwrap();
216+
217+
let handle = guard.handle();
218+
219+
runtime.block_on(async {
220+
let mut joins = Vec::new();
221+
222+
// These go through TelemetryHandle::spawn, should NOT be flagged.
223+
for _ in 0..INSTRUMENTED {
224+
joins.push(handle.spawn(async {}));
225+
}
226+
227+
// These are raw tokio::spawn, SHOULD be flagged, all at the same line.
228+
for _ in 0..RAW {
229+
joins.push(tokio::spawn(async {}));
230+
}
231+
232+
for j in joins {
233+
j.await.unwrap();
234+
}
235+
236+
// Wait for flush cycle to drain thread-local buffers.
237+
tokio::time::sleep(Duration::from_millis(200)).await;
238+
});
239+
240+
drop(runtime);
241+
drop(guard);
242+
243+
// Read the trace back from disk and check the instrumented flag.
244+
let sealed_path = dir.path().join("trace.0.bin");
245+
let reader = TraceReader::new(sealed_path.to_str().unwrap()).unwrap();
246+
let events = &reader.all_events;
247+
248+
let instrumented_count = events
249+
.iter()
250+
.filter(|e| {
251+
matches!(
252+
e,
253+
TelemetryEvent::TaskSpawn {
254+
instrumented: Some(true),
255+
..
256+
}
257+
)
258+
})
259+
.count();
260+
let uninstrumented_count = events
261+
.iter()
262+
.filter(|e| {
263+
matches!(
264+
e,
265+
TelemetryEvent::TaskSpawn {
266+
instrumented: Some(false),
267+
..
268+
}
269+
)
270+
})
271+
.count();
272+
273+
assert_eq!(
274+
instrumented_count, INSTRUMENTED,
275+
"expected {INSTRUMENTED} instrumented spawns, got {instrumented_count}"
276+
);
277+
assert_eq!(
278+
uninstrumented_count, RAW,
279+
"expected {RAW} uninstrumented spawns, got {uninstrumented_count}"
280+
);
281+
282+
// Verify spawn locations resolve and point to this test file.
283+
for event in events {
284+
if let TelemetryEvent::TaskSpawn {
285+
spawn_loc,
286+
instrumented: Some(false),
287+
..
288+
} = event
289+
{
290+
let loc = reader
291+
.spawn_locations
292+
.get(spawn_loc)
293+
.expect("uninstrumented spawn_loc should resolve");
294+
assert!(
295+
loc.contains("end_to_end.rs"),
296+
"uninstrumented spawn should point to this test file, got: {loc}"
297+
);
298+
}
299+
}
300+
}

dial9-viewer/ui/trace_analysis.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,22 @@
393393
}
394394
}
395395

396+
if (filterType === "uninstrumented" && opts && opts.taskInstrumented) {
397+
for (const w of workerIds) {
398+
for (const s of workerSpans[w].polls) {
399+
if (s.taskId && opts.taskInstrumented.get(s.taskId) === false) {
400+
points.push({
401+
time: s.start,
402+
worker: w,
403+
type: "uninstrumented",
404+
value: (s.end - s.start) / 1e6,
405+
span: s,
406+
});
407+
}
408+
}
409+
}
410+
}
411+
396412
if (opts && opts.sortByWorst) {
397413
points.sort((a, b) => b.value - a.value);
398414
} else {

dial9-viewer/ui/trace_parser.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
* taskSpawnLocs: Map<number, string|null>,
9494
* taskSpawnTimes: Map<number, number>,
9595
* taskTerminateTimes: Map<number, number>,
96+
* taskInstrumented: Map<number, boolean>,
9697
* cpuSamples: CpuSample[],
9798
* callframeSymbols: Map<string, SymbolFrame|SymbolFrame[]>,
9899
* threadNames: Map<number, string>,
@@ -186,6 +187,7 @@
186187
const taskSpawnLocs = new Map();
187188
const taskSpawnTimes = new Map();
188189
const taskTerminateTimes = new Map();
190+
const taskInstrumented = new Map(); // taskId -> bool (true if spawned via TelemetryHandle::spawn)
189191
const callframeSymbols = new Map();
190192
const cpuSamples = [];
191193
const threadNames = new Map();
@@ -323,9 +325,11 @@
323325
case "TaskSpawnEvent": {
324326
const taskId = num(v.task_id);
325327
const spawnLoc = v.spawn_loc || null;
328+
const instrumented = v.instrumented ?? true;
326329
if (spawnLoc) spawnLocations.set(spawnLoc, spawnLoc);
327330
taskSpawnLocs.set(taskId, spawnLoc);
328331
taskSpawnTimes.set(taskId, ts);
332+
taskInstrumented.set(taskId, !!instrumented);
329333
break;
330334
}
331335
case "TaskTerminateEvent":
@@ -474,6 +478,7 @@
474478
spawnLocations,
475479
taskSpawnLocs,
476480
taskSpawnTimes,
481+
taskInstrumented,
477482
cpuSamples,
478483
callframeSymbols,
479484
threadNames,

0 commit comments

Comments
 (0)