Skip to content

Commit 4c820c2

Browse files
authored
feat: instrumented joinsets and other custom spawns (#392)
* `spawn_with` public api + `TracedFuture` * spawn_with related tests * correct test * rename and update docs
1 parent 5e905e8 commit 4c820c2

8 files changed

Lines changed: 605 additions & 79 deletions

File tree

dial9-tokio-telemetry/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,4 @@ pub use current_config::{
4343
Dial9Config, Dial9ConfigBuilder, Dial9ConfigBuilderError, ValidationError,
4444
};
4545
pub use dial9_macro::main;
46-
pub use telemetry::{TelemetryRuntimeError, TracedRuntime, spawn};
46+
pub use telemetry::{TelemetryRuntimeError, TracedFuture, TracedRuntime, spawn};

dial9-tokio-telemetry/src/task_dumped.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
//! `TaskDumped<F>` wraps a future and captures async backtraces at yield
22
//! points using Poisson sampling keyed on idle duration.
33
//!
4-
//! This wrapper is intentionally separate from [`crate::traced::Traced`]: the
5-
//! wake-event capture in `Traced` runs on every instrumented spawn regardless
6-
//! of the `taskdump` feature, while task-dump capture is gated behind the
7-
//! `taskdump` feature and its own runtime toggle. Typical stacking is
8-
//! `Traced<TaskDumped<F>>`.
4+
//! This wrapper is intentionally separate from the wake-event wrapper: wake
5+
//! capture runs on every instrumented spawn regardless of the `taskdump`
6+
//! feature, while task-dump capture is gated behind the `taskdump` feature and
7+
//! its own runtime toggle. Typical stacking is `WakeTraced<TaskDumped<F>>`.
98
//!
109
//! # Sampling model
1110
//!

dial9-tokio-telemetry/src/telemetry/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod task_dump_config;
1717
pub(crate) mod task_metadata;
1818
pub(crate) mod writer;
1919

20+
pub use crate::traced::TracedFuture;
2021
pub use buffer::{Encodable, ThreadLocalEncoder};
2122
pub use events::{CpuSampleSource, TelemetryEvent, clock_monotonic_ns};
2223
pub use format::{

dial9-tokio-telemetry/src/telemetry/recorder/mod.rs

Lines changed: 145 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ crate::primitives::thread_local! {
3333
/// cleared in `on_thread_stop`. Enables [`TelemetryHandle::current`].
3434
static CURRENT_HANDLE: RefCell<Option<TelemetryHandle>> = const { RefCell::new(None) };
3535

36-
/// Set by `TelemetryHandle::spawn()` before calling `tokio::spawn()`,
37-
/// so the `on_task_spawn` hook can distinguish instrumented from raw spawns.
38-
static INSTRUMENTED_SPAWN: Cell<bool> = const { Cell::new(false) };
36+
/// Nest count for [`InstrumentedSpawnGuard`]. `on_task_spawn` treats
37+
/// any value `> 0` as an instrumented spawn.
38+
static INSTRUMENTED_SPAWN: Cell<u32> = const { Cell::new(0) };
3939
}
4040

4141
// ---------------------------------------------------------------------------
@@ -204,7 +204,7 @@ fn register_hooks(
204204
s5.if_enabled(|buf| {
205205
let task_id = TaskId::from(meta.id());
206206
let location = meta.spawned_at();
207-
let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get());
207+
let instrumented = INSTRUMENTED_SPAWN.with(|f| f.get()) > 0;
208208
let timestamp_ns = crate::telemetry::events::clock_monotonic_ns();
209209
buf.record_encodable_event(&runtime_context::TaskSpawn {
210210
timestamp_ns,
@@ -506,42 +506,75 @@ impl TelemetryHandle {
506506
{
507507
match self.traced_handle() {
508508
Some(traced_handle) => {
509-
let _guard = InstrumentedSpawnGuard::set();
510-
tokio::spawn(async move {
511-
let task_id = tokio::task::try_id().map(TaskId::from).unwrap_or_default();
512-
let inner = wrap_task_dumped(future, traced_handle.shared.clone(), task_id);
513-
crate::traced::Traced::new(inner, traced_handle, task_id).await
514-
})
509+
let _guard = InstrumentedSpawnGuard::enter();
510+
tokio::spawn(crate::telemetry::TracedFuture::new(
511+
future,
512+
Some(traced_handle),
513+
))
515514
}
516515
None => tokio::spawn(future),
517516
}
518517
}
519-
}
520-
521-
/// If the `taskdump` feature is on, wrap `future` in `TaskDumped<F>`; otherwise
522-
/// pass through unchanged. Factored so `TelemetryHandle::spawn` stays readable.
523-
#[cfg(feature = "taskdump")]
524-
fn wrap_task_dumped<F>(
525-
future: F,
526-
shared: Arc<crate::telemetry::recorder::SharedState>,
527-
task_id: TaskId,
528-
) -> crate::task_dumped::TaskDumped<F>
529-
where
530-
F: std::future::Future,
531-
{
532-
crate::task_dumped::TaskDumped::new(future, shared, task_id)
533-
}
534518

535-
#[cfg(not(feature = "taskdump"))]
536-
fn wrap_task_dumped<F>(
537-
future: F,
538-
_shared: Arc<crate::telemetry::recorder::SharedState>,
539-
_task_id: TaskId,
540-
) -> F
541-
where
542-
F: std::future::Future,
543-
{
544-
future
519+
/// Spawn an instrumented future through a user-supplied spawn function.
520+
///
521+
/// `spawn_fn` must synchronously perform a real Tokio spawn (or an
522+
/// equivalent operation) before returning; do not defer the future or run
523+
/// it with `block_on`. To record the resulting task as instrumented, spawn
524+
/// on a dial9-traced runtime with task tracking enabled. The closure's
525+
/// return value is forwarded back to the caller, so you can keep the
526+
/// [`tokio::task::JoinHandle`], [`tokio::task::AbortHandle`], or whatever
527+
/// the spawn function returns.
528+
///
529+
/// # Examples
530+
///
531+
/// Spawn into a [`tokio::task::JoinSet`]:
532+
///
533+
/// ```rust,no_run
534+
/// # use dial9_tokio_telemetry::telemetry::TelemetryHandle;
535+
/// # use tokio::task::JoinSet;
536+
/// # async fn work() {}
537+
/// # async fn demo() {
538+
/// let handle = TelemetryHandle::current();
539+
/// let mut set: JoinSet<()> = JoinSet::new();
540+
/// handle.spawn_with(work(), |f| set.spawn(f));
541+
/// # }
542+
/// ```
543+
///
544+
/// Spawn into a [`tokio::task::JoinSet`] on a specific runtime:
545+
///
546+
/// ```rust,no_run
547+
/// # use dial9_tokio_telemetry::telemetry::TelemetryHandle;
548+
/// # use tokio::runtime::Runtime;
549+
/// # use tokio::task::JoinSet;
550+
/// # async fn work() {}
551+
/// # fn demo(runtime: &Runtime) {
552+
/// let handle = TelemetryHandle::current();
553+
/// let mut set: JoinSet<()> = JoinSet::new();
554+
/// handle.spawn_with(work(), |f| set.spawn_on(f, runtime.handle()));
555+
/// # }
556+
/// ```
557+
///
558+
/// [`TracedFuture<F>`]: crate::telemetry::TracedFuture
559+
pub fn spawn_with<F, S>(
560+
&self,
561+
future: F,
562+
spawn_fn: impl FnOnce(crate::telemetry::TracedFuture<F>) -> S,
563+
) -> S
564+
where
565+
F: std::future::Future + Send + 'static,
566+
F::Output: Send + 'static,
567+
{
568+
let traced_handle = self.traced_handle();
569+
let future = crate::telemetry::TracedFuture::new(future, traced_handle.clone());
570+
match traced_handle {
571+
Some(_) => {
572+
let _guard = InstrumentedSpawnGuard::enter();
573+
spawn_fn(future)
574+
}
575+
None => spawn_fn(future),
576+
}
577+
}
545578
}
546579

547580
/// Spawn a traced task on the current tokio runtime.
@@ -565,24 +598,24 @@ where
565598
TelemetryHandle::current().spawn(future)
566599
}
567600

568-
/// RAII guard that sets `INSTRUMENTED_SPAWN` to `true` on creation and
569-
/// resets it to `false` on drop, even if `tokio::spawn` panics.
601+
/// RAII guard that increments `INSTRUMENTED_SPAWN` on creation and
602+
/// decrements it on drop, even if the protected closure panics.
570603
struct InstrumentedSpawnGuard;
571604

572605
impl InstrumentedSpawnGuard {
573-
fn set() -> Self {
574-
INSTRUMENTED_SPAWN.with(|c| c.set(true));
606+
fn enter() -> Self {
607+
INSTRUMENTED_SPAWN.with(|c| c.set(c.get().saturating_add(1)));
575608
Self
576609
}
577610
}
578611

579612
impl Drop for InstrumentedSpawnGuard {
580613
fn drop(&mut self) {
581-
INSTRUMENTED_SPAWN.with(|c| c.set(false));
614+
INSTRUMENTED_SPAWN.with(|c| c.set(c.get().saturating_sub(1)));
582615
}
583616
}
584617

585-
/// Handle for spawning wake-tracked futures on a specific runtime.
618+
/// Handle for spawning instrumented futures on a specific runtime.
586619
///
587620
/// Returned by [`TraceRuntimeCoreBuilder::build`]. Unlike [`TelemetryHandle::spawn`]
588621
/// which uses `tokio::spawn()` (requiring an ambient runtime context), this type
@@ -606,18 +639,61 @@ impl RuntimeTelemetryHandle {
606639
F: std::future::Future + Send + 'static,
607640
F::Output: Send + 'static,
608641
{
609-
match &self.traced {
610-
Some(traced) => {
611-
let traced = traced.clone();
612-
let _guard = InstrumentedSpawnGuard::set();
613-
self.runtime.spawn(async move {
614-
let task_id = tokio::task::try_id().map(TaskId::from).unwrap_or_default();
615-
crate::traced::Traced::new(future, traced, task_id).await
616-
})
642+
match self.traced.clone() {
643+
Some(traced_handle) => {
644+
let _guard = InstrumentedSpawnGuard::enter();
645+
self.runtime.spawn(crate::telemetry::TracedFuture::new(
646+
future,
647+
Some(traced_handle),
648+
))
617649
}
618650
None => self.runtime.spawn(future),
619651
}
620652
}
653+
654+
/// Spawn an instrumented future through a user-supplied spawn function.
655+
///
656+
/// Mirrors [`TelemetryHandle::spawn_with`] for callers that already hold a
657+
/// [`RuntimeTelemetryHandle`]. `spawn_fn` must synchronously perform a real
658+
/// Tokio spawn (or an equivalent operation) before returning; do not defer
659+
/// the future or run it with `block_on`. To record the resulting task as
660+
/// instrumented, target a dial9-traced runtime with task tracking enabled,
661+
/// typically via [`tokio::task::JoinSet::spawn_on`] with the appropriate
662+
/// [`tokio::runtime::Handle`].
663+
///
664+
/// # Examples
665+
///
666+
/// Spawn into a [`tokio::task::JoinSet`] on a specific runtime:
667+
///
668+
/// ```rust,no_run
669+
/// # use dial9_tokio_telemetry::telemetry::RuntimeTelemetryHandle;
670+
/// # use tokio::runtime::Runtime;
671+
/// # use tokio::task::JoinSet;
672+
/// # async fn work() {}
673+
/// # fn demo(runtime: &Runtime, handle: RuntimeTelemetryHandle, set: &mut JoinSet<()>) {
674+
/// handle.spawn_with(work(), |f| set.spawn_on(f, runtime.handle()));
675+
/// # }
676+
/// ```
677+
///
678+
/// [`TracedFuture<F>`]: crate::telemetry::TracedFuture
679+
pub fn spawn_with<F, S>(
680+
&self,
681+
future: F,
682+
spawn_fn: impl FnOnce(crate::telemetry::TracedFuture<F>) -> S,
683+
) -> S
684+
where
685+
F: std::future::Future + Send + 'static,
686+
F::Output: Send + 'static,
687+
{
688+
let future = crate::telemetry::TracedFuture::new(future, self.traced.clone());
689+
match self.traced {
690+
Some(_) => {
691+
let _guard = InstrumentedSpawnGuard::enter();
692+
spawn_fn(future)
693+
}
694+
None => spawn_fn(future),
695+
}
696+
}
621697
}
622698

623699
/// Holds the background worker thread and its stop signal.
@@ -1378,7 +1454,7 @@ impl<'a> TraceRuntimeCoreBuilder<'a> {
13781454
/// Install telemetry hooks, build the runtime, and reserve worker IDs.
13791455
///
13801456
/// Returns the runtime and a [`RuntimeTelemetryHandle`] for spawning
1381-
/// wake-tracked futures via [`RuntimeTelemetryHandle::spawn`].
1457+
/// instrumented futures via [`RuntimeTelemetryHandle::spawn`].
13821458
pub fn build(
13831459
self,
13841460
mut builder: tokio::runtime::Builder,
@@ -2081,6 +2157,23 @@ mod tests {
20812157
Ok(())
20822158
}
20832159
}
2160+
2161+
/// Nested `InstrumentedSpawnGuard`s must compose: inner drop must not
2162+
/// clear the outer scope. Counter, not flag.
2163+
#[test]
2164+
fn instrumented_spawn_guard_nests() {
2165+
assert_eq!(INSTRUMENTED_SPAWN.with(|c| c.get()), 0);
2166+
let outer = InstrumentedSpawnGuard::enter();
2167+
assert_eq!(INSTRUMENTED_SPAWN.with(|c| c.get()), 1);
2168+
{
2169+
let _inner = InstrumentedSpawnGuard::enter();
2170+
assert_eq!(INSTRUMENTED_SPAWN.with(|c| c.get()), 2);
2171+
}
2172+
assert_eq!(INSTRUMENTED_SPAWN.with(|c| c.get()), 1);
2173+
drop(outer);
2174+
assert_eq!(INSTRUMENTED_SPAWN.with(|c| c.get()), 0);
2175+
}
2176+
20842177
#[test]
20852178
fn current_thread_runtime_resolves_worker_ids() {
20862179
let data = Arc::new(std::sync::Mutex::new(Vec::<u8>::new()));
@@ -2491,7 +2584,7 @@ mod tests {
24912584
.build_and_attach_to_telemetry(builder_b, &guard)
24922585
.unwrap();
24932586

2494-
// Use handle.spawn on runtime B to get Traced waker wrapping → wake events.
2587+
// Use handle.spawn on runtime B to get wake-tracked wrapping → wake events.
24952588
let handle = guard.handle();
24962589
runtime_b.block_on(async {
24972590
let mut handles = Vec::new();
@@ -2909,7 +3002,7 @@ mod tests {
29093002
drop(runtime);
29103003
drop(guard);
29113004

2912-
// Verify wake events were recorded (handle.spawn wraps with Traced)
3005+
// Verify wake events were recorded (handle.spawn wraps with wake tracking)
29133006
let raw = data.lock().unwrap();
29143007
let events = crate::telemetry::format::decode_events(&raw).unwrap();
29153008
let wake_count = events

dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub(crate) struct SharedState {
3434
pub(crate) task_dump_idle_threshold_ns: AtomicU64,
3535
/// Fixed RNG seed for deterministic task dump sampling. Set once at
3636
/// construction before the `Arc` is shared; read-only thereafter.
37+
#[cfg_attr(not(feature = "taskdump"), allow(dead_code))]
3738
pub(crate) task_dump_rng_seed: Option<u64>,
3839
pub(crate) collector: Arc<CentralCollector>,
3940
/// Absolute `CLOCK_MONOTONIC` nanosecond timestamp captured at trace start.
@@ -85,7 +86,7 @@ impl SharedState {
8586
}
8687

8788
/// Create a wake event. Pragmatic exception: calls `tokio::task::try_id()`
88-
/// because `Traced` is inherently tokio-specific.
89+
/// because the wake-tracking future is inherently tokio-specific.
8990
pub(crate) fn create_wake_event(
9091
&self,
9192
woken_task_id: TaskId,
@@ -106,7 +107,7 @@ impl SharedState {
106107
/// provides an [`EventBuffer`] that makes it structurally impossible to
107108
/// record without checking first. Use `is_enabled()` only for
108109
/// control-flow decisions that don't directly record events (e.g.
109-
/// deciding whether to wrap a waker in `Traced::poll`).
110+
/// deciding whether to wrap a waker in wake-tracking polls).
110111
pub(crate) fn is_enabled(&self) -> bool {
111112
self.enabled.load(Ordering::Relaxed)
112113
}

0 commit comments

Comments
 (0)