diff --git a/.github/actions/rust-build/action.yml b/.github/actions/rust-build/action.yml index e82134c1..717a5a95 100644 --- a/.github/actions/rust-build/action.yml +++ b/.github/actions/rust-build/action.yml @@ -32,8 +32,12 @@ runs: if [ "${{ inputs.toolchain }}" = nightly ]; then rm -fv Cargo.lock fi + # tokio's `taskdump` feature only compiles on linux aarch64/x86/x86_64. + # On macOS drop it from --all-features; leave everything else intact. if [ "$RUNNER_OS" = "Linux" ]; then # Raise locked-memory limit for large perf ring buffers sudo prlimit --pid $$ --memlock=unlimited:unlimited + cargo test --all-targets --all-features + else + cargo test --all-targets --features __nonlinux_all_features fi - cargo test --all-targets --all-features diff --git a/Cargo.lock b/Cargo.lock index 58dde43d..aaa67b47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,15 @@ dependencies = [ "regex", ] +[[package]] +name = "addr2line" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" +dependencies = [ + "gimli", +] + [[package]] name = "adler2" version = "2.0.1" @@ -794,6 +803,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "backtrace" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide 0.8.9", + "object", + "rustc-demangle", + "windows-link", +] + [[package]] name = "base16ct" version = "0.1.1" @@ -1466,6 +1490,7 @@ dependencies = [ "aws-config", "aws-sdk-s3", "aws-sdk-s3-transfer-manager", + "backtrace", "bon", "clap", "criterion", @@ -2943,6 +2968,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3c00a0c9600379bd32f8972de90676a7672cba3bf4886986bc05902afc1e093" +[[package]] +name = "object" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -4248,10 +4282,11 @@ dependencies = [ [[package]] name = "tokio" -version = "1.51.1" +version = "1.52.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c" +checksum = "110a78583f19d5cdb2c5ccf321d1290344e71313c6c37d43520d386027d18386" dependencies = [ + "backtrace", "bytes", "libc", "mio", diff --git a/dial9-tokio-telemetry/Cargo.toml b/dial9-tokio-telemetry/Cargo.toml index 57481011..ab53f500 100644 --- a/dial9-tokio-telemetry/Cargo.toml +++ b/dial9-tokio-telemetry/Cargo.toml @@ -19,7 +19,7 @@ all-features = true unexpected_cfgs = { level = "warn", check-cfg = ["cfg(shuttle)"] } [dependencies] -tokio = { version = "1.51.0", features = ["rt", "macros", "time", "rt-multi-thread", "sync", "net", "io-util"] } +tokio = { version = "1.52.0", features = ["rt", "macros", "time", "rt-multi-thread", "sync", "net", "io-util"] } tokio-util = "0.7" arc-swap = "1" crossbeam-queue = "0.3" @@ -29,6 +29,7 @@ pin-project-lite = "0.2" serde = { version = "1", features = ["derive"] } serde_json = "1" smallvec = "1" +backtrace = { version = "0.3", optional = true } dial9-perf-self-profile = { workspace = true, optional = true } dial9-trace-format = { workspace = true, features = ["serde"] } tracing = "0.1.44" @@ -50,8 +51,15 @@ shuttle = { version = "0.9.1", optional = true } analysis = [] cpu-profiling = ["dep:dial9-perf-self-profile"] _shuttle = ["dep:shuttle", "metrique-timesource/custom-timesource", "metrique-timesource/test-util"] +## Capture async backtraces at tokio yield points. Linux-only +## (aarch64/x86/x86_64): tokio's upstream `taskdump` feature refuses to +## compile on other targets. Enabling this feature on an unsupported target +## is a hard compile error from tokio. +taskdump = ["tokio/taskdump", "dep:backtrace"] tracing-layer = ["dep:tracing-subscriber"] worker-s3 = ["dep:aws-sdk-s3-transfer-manager", "dep:aws-sdk-s3", "dep:aws-config", "dep:time"] +## All features except platform-specific ones (taskdump). Used in CI for non-Linux targets. +__nonlinux_all_features = ["analysis", "cpu-profiling", "tracing-layer", "worker-s3"] [dev-dependencies] dial9-tokio-telemetry = { path = ".", features = ["analysis", "tracing-layer", "worker-s3"] } @@ -61,7 +69,7 @@ clap = { version = "4", features = ["derive", "env"] } hdrhistogram = "7" metrique-timesource = { version = "0.1", features = ["custom-timesource", "tokio"] } metrique-writer = { version = "0.1", features = ["test-util"] } -tokio = { version = "1.51.0", features = ["test-util"] } +tokio = { version = "1.52.0", features = ["test-util"] } proptest = "1" tempfile = "3" tracing-subscriber = { version = "0.3", features = ["env-filter"] } @@ -74,7 +82,7 @@ async-trait = "0.1.89" uuid = { version = "1", features = ["v4"] } [target.'cfg(target_os = "linux")'.dev-dependencies] -dial9-tokio-telemetry = { path = ".", features = ["cpu-profiling", "worker-s3", "analysis", "tracing-layer"] } +dial9-tokio-telemetry = { path = ".", features = ["cpu-profiling", "worker-s3", "analysis", "taskdump", "tracing-layer"] } nix = { version = "0.29", features = ["process"] } [[bench]] diff --git a/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs b/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs index 3090f2c4..7f40cd56 100644 --- a/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs +++ b/dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs @@ -180,6 +180,7 @@ fn to_fat_event(event: &TelemetryEvent, reader: &TraceReader) -> Option None, diff --git a/dial9-tokio-telemetry/src/lib.rs b/dial9-tokio-telemetry/src/lib.rs index 796b9b4a..6ad68c68 100644 --- a/dial9-tokio-telemetry/src/lib.rs +++ b/dial9-tokio-telemetry/src/lib.rs @@ -16,6 +16,8 @@ pub mod background_task; pub(crate) mod metrics; pub(crate) mod primitives; pub(crate) mod rate_limit; +#[cfg(feature = "taskdump")] +pub(crate) mod task_dumped; /// Core telemetry types, recording, and trace I/O. pub mod telemetry; pub(crate) mod traced; diff --git a/dial9-tokio-telemetry/src/task_dumped.rs b/dial9-tokio-telemetry/src/task_dumped.rs new file mode 100644 index 00000000..202d7989 --- /dev/null +++ b/dial9-tokio-telemetry/src/task_dumped.rs @@ -0,0 +1,260 @@ +//! `TaskDumped` wraps a future and captures async backtraces at yield +//! points when the task stays idle longer than the configured threshold. +//! +//! This wrapper is intentionally separate from [`crate::traced::Traced`]: the +//! wake-event capture in `Traced` runs on every instrumented spawn regardless +//! of the `taskdump` feature, while task-dump capture is gated behind the +//! `taskdump` feature and its own runtime toggle. Typical stacking is +//! `Traced>`. +//! +//! # Capture model +//! +//! On each poll, the wrapper inspects the elapsed time since the last capture +//! point (i.e., the task's idle duration leading up to this poll). If that +//! idle exceeded the configured threshold, frames captured at the *previous* +//! yield point are emitted as `TaskDump` events. If the current poll returns +//! `Pending`, a fresh capture is taken via [`tokio::runtime::dump::trace_with`] +//! so that the next poll's decision has fresh data. +//! +//! The capture runs a second `poll` of the inner future under a no-op waker +//! inside `trace_with`. Tokio yield points use the *inner* context's waker +//! (noop) rather than the real executor waker, so this does not produce a +//! duplicate `WakeEvent`, and the `PollStart`/`PollEnd` hooks run only on the +//! outer scheduler call, not on the trace_with sub-poll. +//! +//! # Allocation +//! +//! Captured instruction pointers are stored flat in [`FrameBuf`] across all +//! yield points hit during a capture, with offsets recording each callchain's +//! start. The buffers are reused across polls. + +use crate::telemetry::format::TaskDumpEvent; +use crate::telemetry::recorder::SharedState; +use crate::telemetry::task_metadata::TaskId; +use crate::telemetry::{Encodable, ThreadLocalEncoder}; +use pin_project_lite::pin_project; +use smallvec::SmallVec; +use std::future::Future; +use std::num::NonZeroU64; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::task::{Context, Poll, Waker}; + +/// Initial heap reservation for the instruction-pointer buffer on first capture. +const FRAME_BUF_INITIAL_CAPACITY: usize = 256; + +pin_project! { + /// Future wrapper that captures async backtraces at yield points when + /// the enclosing task has been idle longer than the configured threshold. + pub(crate) struct TaskDumped { + #[pin] + inner: F, + shared: Arc, + task_id: TaskId, + frames: FrameBuf, + // Monotonic nanoseconds when the frames in `frames` were captured. + // Only meaningful when `frames.has_data()`. + pending_capture_ts: Option, + // Snapshot of SharedState::task_dump_idle_threshold_ns at construction; + // promotes the atomic load off the poll hot path. + idle_threshold_ns: u64, + } +} + +impl TaskDumped { + pub(crate) fn new(inner: F, shared: Arc, task_id: TaskId) -> Self { + let idle_threshold_ns = shared.task_dump_idle_threshold_ns.load(Ordering::Relaxed); + Self { + inner, + shared, + task_id, + frames: FrameBuf::new(), + pending_capture_ts: None, + idle_threshold_ns, + } + } +} + +impl Future for TaskDumped { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + // Fast path: forward without any capture work when either task dumps + // are disabled, or telemetry as a whole is paused. Checking both here + // skips the re-poll under `trace_with` (not just the emit), so a + // paused guard imposes no overhead beyond two relaxed atomic loads. + if !this.shared.task_dumps_enabled.load(Ordering::Relaxed) || !this.shared.is_enabled() { + // Stale pending frames become meaningless if we stopped capturing — + // drop them so we don't emit a dump attributed to an old idle gap + // after capture resumes. + if this.frames.has_data() { + this.frames.clear(); + *this.pending_capture_ts = None; + } + return this.inner.poll(cx); + } + + // If we have captured frames from a previous idle, decide whether + // that idle was long enough to emit. + let poll_start = crate::telemetry::recorder::poll_start_ts_or_now(); + let should_emit = match *this.pending_capture_ts { + Some(ts) if this.frames.has_data() => { + poll_start.saturating_sub(ts.get()) > *this.idle_threshold_ns + } + _ => false, + }; + + let result = this.inner.as_mut().poll(cx); + + if should_emit { + this.frames.emit( + this.shared, + *this.task_id, + this.pending_capture_ts.unwrap().get(), + ); + } + + match &result { + Poll::Ready(_) => { + // Terminal. Nothing more to capture; discard stale frames. + this.frames.clear(); + *this.pending_capture_ts = None; + } + Poll::Pending => { + // Capture the yield point we just landed on, for the next + // poll's threshold check. + this.frames.capture(this.inner.as_mut()); + *this.pending_capture_ts = NonZeroU64::new(poll_start); + } + } + + result + } +} + +/// Reusable storage for one or more callchains captured during a single +/// `trace_with` sub-poll. Frames are appended flat to `ips`; each new chain's +/// start index is pushed onto `offsets`. +struct FrameBuf { + ips: Vec, + offsets: SmallVec<[usize; 8]>, +} + +impl FrameBuf { + fn new() -> Self { + Self { + ips: Vec::new(), + offsets: SmallVec::new(), + } + } + + fn clear(&mut self) { + self.ips.clear(); + self.offsets.clear(); + } + + fn has_data(&self) -> bool { + !self.offsets.is_empty() + } + + /// Emit one `TaskDumpEvent` per recorded callchain, then clear. + fn emit(&mut self, shared: &SharedState, task_id: TaskId, capture_ts: u64) { + shared.if_enabled(|buf| { + for i in 0..self.offsets.len() { + let start = self.offsets[i]; + let end = self.offsets.get(i + 1).copied().unwrap_or(self.ips.len()); + buf.record_encodable_event(&TaskDumpData { + timestamp_ns: capture_ts, + task_id, + callchain: &self.ips[start..end], + }); + } + }); + self.clear(); + } + + /// Capture backtraces at yield points by re-polling `inner` under a no-op + /// waker inside `trace_with`. + fn capture(&mut self, inner: Pin<&mut F>) { + if self.ips.capacity() == 0 { + self.ips.reserve(FRAME_BUF_INITIAL_CAPACITY); + } + self.clear(); + + // Noop waker so any waker registration performed during this + // diagnostic re-poll is discarded, avoiding duplicate wake events. + let noop = Waker::noop(); + let mut noop_cx = Context::from_waker(noop); + let ips = &mut self.ips; + let offsets = &mut self.offsets; + + // `trace_with`'s outer closure is `FnOnce`; `Option::take` moves the + // pinned reference in without requiring a `Copy` bound or unsafe. + tokio::runtime::dump::trace_with( + || { + let _ = inner.poll(&mut noop_cx); + }, + |meta| { + offsets.push(ips.len()); + capture_frames(ips, meta.root_addr, meta.trace_leaf_addr); + }, + ); + } +} + +/// Walk the stack via [`backtrace::trace`], skipping frames at or below +/// `leaf_addr` (tokio's `trace_leaf` function — any frames at or below it +/// are internal plumbing) and stopping at `root_addr` (tokio's `Root::poll` +/// boundary). `root_addr` is always `None` today because we don't wrap +/// spawned tasks in `Root`, but we keep the check defensively in case that +/// changes — unwrapping the whole stack otherwise walks through scheduler +/// internals that the caller has no use for. +/// +/// Held behind the `backtrace` crate's process-wide mutex. This will be optimized in +/// https://github.com/dial9-rs/dial9-tokio-telemetry/issues/357. This is not a +/// complete blocker for the MvP because no code is utilizing the symbolizing from +/// `backtrace` so the mutex is not actually held for long periods of time. Nevertheless, +/// this will be a fast follow. +fn capture_frames( + ips: &mut Vec, + root_addr: Option<*const core::ffi::c_void>, + leaf_addr: *const core::ffi::c_void, +) { + let mut above_leaf = false; + backtrace::trace(|frame| { + let sym = frame.symbol_address(); + let below_root = root_addr.is_none_or(|root| !std::ptr::eq(sym, root)); + + if above_leaf && below_root { + ips.push(frame.ip() as u64); + } + + if std::ptr::eq(sym, leaf_addr) { + above_leaf = true; + } + + below_root + }); +} + +/// Borrowed-callchain view of a task-dump event that implements [`Encodable`] +/// by interning its ips into the batch's stack pool. +pub(crate) struct TaskDumpData<'a> { + pub(crate) timestamp_ns: u64, + pub(crate) task_id: TaskId, + pub(crate) callchain: &'a [u64], +} + +impl Encodable for TaskDumpData<'_> { + fn encode(&self, enc: &mut ThreadLocalEncoder<'_>) { + let interned_callchain = enc.intern_stack_frames(self.callchain); + enc.encode(&TaskDumpEvent { + timestamp_ns: self.timestamp_ns, + task_id: self.task_id, + callchain: interned_callchain, + }); + } +} diff --git a/dial9-tokio-telemetry/src/telemetry/analysis.rs b/dial9-tokio-telemetry/src/telemetry/analysis.rs index 8fb75ab3..8d058a54 100644 --- a/dial9-tokio-telemetry/src/telemetry/analysis.rs +++ b/dial9-tokio-telemetry/src/telemetry/analysis.rs @@ -333,6 +333,7 @@ pub fn analyze_trace(events: &[TelemetryEvent]) -> TraceAnalysis { TelemetryEvent::TaskSpawn { .. } | TelemetryEvent::TaskTerminate { .. } | TelemetryEvent::CpuSample { .. } + | TelemetryEvent::TaskDump { .. } | TelemetryEvent::ThreadNameDef { .. } | TelemetryEvent::WakeEvent { .. } | TelemetryEvent::SegmentMetadata { .. } diff --git a/dial9-tokio-telemetry/src/telemetry/buffer.rs b/dial9-tokio-telemetry/src/telemetry/buffer.rs index 756ceec6..8a8c5653 100644 --- a/dial9-tokio-telemetry/src/telemetry/buffer.rs +++ b/dial9-tokio-telemetry/src/telemetry/buffer.rs @@ -509,6 +509,39 @@ mod tests { assert_eq!(guard.event_count, 1); } + #[cfg(feature = "taskdump")] + mod task_dump_tests { + use super::ThreadLocalBuffer; + use crate::task_dumped::TaskDumpData; + use crate::telemetry::events::TelemetryEvent; + use crate::telemetry::format::decode_events; + use crate::telemetry::task_metadata::TaskId; + + #[test] + fn task_dump_event_round_trips() { + let dump = TaskDumpData { + timestamp_ns: 42_000, + task_id: TaskId::from_u32(17), + callchain: &[0x1111_2222, 0x3333_4444, 0x5555_6666], + }; + let encoded = ThreadLocalBuffer::encode_single(&dump); + let events = decode_events(&encoded).expect("decode"); + assert_eq!(events.len(), 1); + match events.into_iter().next().unwrap() { + TelemetryEvent::TaskDump { + timestamp_nanos, + task_id, + callchain, + } => { + assert_eq!(timestamp_nanos, 42_000); + assert_eq!(task_id, TaskId::from_u32(17)); + assert_eq!(callchain, vec![0x1111_2222, 0x3333_4444, 0x5555_6666]); + } + other => panic!("expected TaskDump, got {other:?}"), + } + } + } + #[cfg(feature = "cpu-profiling")] mod cpu_tests { use super::ThreadLocalBuffer; diff --git a/dial9-tokio-telemetry/src/telemetry/events.rs b/dial9-tokio-telemetry/src/telemetry/events.rs index c43ddc84..a2c8bbfd 100644 --- a/dial9-tokio-telemetry/src/telemetry/events.rs +++ b/dial9-tokio-telemetry/src/telemetry/events.rs @@ -179,6 +179,18 @@ pub enum TelemetryEvent { /// Human-readable thread name. name: String, }, + /// Async backtrace captured at a yield point after the task was idle + /// longer than the configured threshold. Instruction pointers are + /// symbolized offline. + TaskDump { + /// Wall-clock timestamp in nanoseconds (monotonic) — capture time. + #[serde(rename = "timestamp_ns")] + timestamp_nanos: u64, + /// Task that was idle. + task_id: TaskId, + /// Raw instruction pointer addresses (leaf first). + callchain: Vec, + }, /// One task woke another task. WakeEvent { /// Wall-clock timestamp in nanoseconds (monotonic). @@ -245,6 +257,9 @@ impl TelemetryEvent { | TelemetryEvent::CpuSample { timestamp_nanos, .. } + | TelemetryEvent::TaskDump { + timestamp_nanos, .. + } | TelemetryEvent::WakeEvent { timestamp_nanos, .. } @@ -278,6 +293,7 @@ impl TelemetryEvent { TelemetryEvent::QueueSample { .. } | TelemetryEvent::TaskSpawn { .. } | TelemetryEvent::TaskTerminate { .. } + | TelemetryEvent::TaskDump { .. } | TelemetryEvent::ThreadNameDef { .. } | TelemetryEvent::WakeEvent { .. } | TelemetryEvent::SegmentMetadata { .. } diff --git a/dial9-tokio-telemetry/src/telemetry/format.rs b/dial9-tokio-telemetry/src/telemetry/format.rs index 2976a647..853255ec 100644 --- a/dial9-tokio-telemetry/src/telemetry/format.rs +++ b/dial9-tokio-telemetry/src/telemetry/format.rs @@ -203,6 +203,16 @@ pub(crate) struct CpuSampleEvent { pub cpu: Option, } +/// Wire-format event for a task dump: async backtrace captured at a yield point +/// after the task stayed idle past the configured threshold. +#[derive(TraceEvent)] +pub(crate) struct TaskDumpEvent { + #[traceevent(timestamp)] + pub timestamp_ns: u64, + pub task_id: TaskId, + pub callchain: InternedStackFrames, +} + /// Wire-format event for a wake notification. #[derive(Debug, TraceEvent)] pub struct WakeEventEvent { @@ -274,6 +284,7 @@ pub(crate) enum TelemetryEventRef<'a> { TaskSpawn(TaskSpawnEventRef<'a>), TaskTerminate(TaskTerminateEventRef<'a>), CpuSample(CpuSampleEventRef<'a>), + TaskDump(TaskDumpEventRef<'a>), WakeEvent(WakeEventEventRef<'a>), SegmentMetadata(SegmentMetadataEventRef<'a>), ClockSync(ClockSyncEventRef<'a>), @@ -293,6 +304,7 @@ impl<'a> TelemetryEventRef<'a> { Self::TaskSpawn(e) => Some(e.timestamp_ns), Self::TaskTerminate(e) => Some(e.timestamp_ns), Self::CpuSample(e) => Some(e.timestamp_ns), + Self::TaskDump(e) => Some(e.timestamp_ns), Self::WakeEvent(e) => Some(e.timestamp_ns), Self::SegmentMetadata(e) => Some(e.timestamp_ns), Self::ClockSync(e) => Some(e.timestamp_ns), @@ -344,6 +356,9 @@ pub(crate) fn decode_ref<'a>( "CpuSampleEvent" => { TelemetryEventRef::CpuSample(CpuSampleEvent::decode(timestamp_ns, fields, field_defs)?) } + "TaskDumpEvent" => { + TelemetryEventRef::TaskDump(TaskDumpEvent::decode(timestamp_ns, fields, field_defs)?) + } "WakeEventEvent" => { TelemetryEventRef::WakeEvent(WakeEventEvent::decode(timestamp_ns, fields, field_defs)?) } @@ -422,6 +437,14 @@ pub(crate) fn to_owned_event( // CPU id is varint-encoded as u64 on the wire; real CPU ids fit in u32. cpu: e.cpu.map(|v| v as u32), }, + TelemetryEventRef::TaskDump(e) => TelemetryEvent::TaskDump { + timestamp_nanos: e.timestamp_ns, + task_id: e.task_id, + callchain: stack_pool + .get(e.callchain) + .expect("stack pool entry must exist for TaskDump callchain") + .to_vec(), + }, TelemetryEventRef::WakeEvent(e) => TelemetryEvent::WakeEvent { timestamp_nanos: e.timestamp_ns, waker_task_id: e.waker_task_id, diff --git a/dial9-tokio-telemetry/src/telemetry/mod.rs b/dial9-tokio-telemetry/src/telemetry/mod.rs index 1e4a4b44..3040d186 100644 --- a/dial9-tokio-telemetry/src/telemetry/mod.rs +++ b/dial9-tokio-telemetry/src/telemetry/mod.rs @@ -13,6 +13,7 @@ pub mod cpu_profile; pub(crate) mod events; pub(crate) mod format; pub(crate) mod recorder; +pub mod task_dump_config; pub(crate) mod task_metadata; pub(crate) mod writer; @@ -27,6 +28,7 @@ pub use recorder::{ TelemetryGuard, TelemetryHandle, TelemetryRuntimeError, TraceRuntimeCoreBuilder, TracedRuntime, TracedRuntimeBuilder, current_worker_id, spawn, }; +pub use task_dump_config::TaskDumpConfig; pub use task_metadata::{TaskId, UNKNOWN_TASK_ID}; pub use writer::{NullWriter, RotatingWriter, TraceWriter}; diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs index fd7555f4..1cf60d93 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs @@ -4,6 +4,8 @@ mod shared_state; pub(crate) use runtime_context::RuntimeContext; pub use runtime_context::current_worker_id; +#[cfg(feature = "taskdump")] +pub(crate) use runtime_context::poll_start_ts_or_now; pub(crate) use shared_state::SharedState; use event_writer::EventWriter; @@ -507,7 +509,8 @@ impl TelemetryHandle { let _guard = InstrumentedSpawnGuard::set(); tokio::spawn(async move { let task_id = tokio::task::try_id().map(TaskId::from).unwrap_or_default(); - crate::traced::Traced::new(future, traced_handle, task_id).await + let inner = wrap_task_dumped(future, traced_handle.shared.clone(), task_id); + crate::traced::Traced::new(inner, traced_handle, task_id).await }) } None => tokio::spawn(future), @@ -515,6 +518,32 @@ impl TelemetryHandle { } } +/// If the `taskdump` feature is on, wrap `future` in `TaskDumped`; otherwise +/// pass through unchanged. Factored so `TelemetryHandle::spawn` stays readable. +#[cfg(feature = "taskdump")] +fn wrap_task_dumped( + future: F, + shared: Arc, + task_id: TaskId, +) -> crate::task_dumped::TaskDumped +where + F: std::future::Future, +{ + crate::task_dumped::TaskDumped::new(future, shared, task_id) +} + +#[cfg(not(feature = "taskdump"))] +fn wrap_task_dumped( + future: F, + _shared: Arc, + _task_id: TaskId, +) -> F +where + F: std::future::Future, +{ + future +} + /// Spawn a traced task on the current tokio runtime. /// /// Like [`tokio::spawn`], but wraps the future with wake-event tracking @@ -843,6 +872,7 @@ pub struct HasTracePath; pub struct TracedRuntimeBuilder

{ enabled: bool, task_tracking_enabled: bool, + task_dump_config: Option, trace_path: Option, runtime_name: Option, #[cfg(feature = "cpu-profiling")] @@ -884,6 +914,23 @@ impl

TracedRuntimeBuilder

{ self } + /// Capture async backtraces at yield points for tasks that stay idle + /// longer than the configured threshold. + /// + /// Requires the `taskdump` crate feature to actually record events + pub fn with_task_dumps( + mut self, + config: crate::telemetry::task_dump_config::TaskDumpConfig, + ) -> Self { + if cfg!(not(feature = "taskdump")) { + tracing::warn!( + "taskdumps enabled but `taskdump` feature was not. No task dumps will be captured." + ) + } + self.task_dump_config = Some(config); + self + } + /// Set a human-readable name for this runtime. Used in segment metadata /// to map runtime indices to names for the trace viewer. pub fn with_runtime_name(mut self, name: impl Into) -> Self { @@ -966,6 +1013,7 @@ impl

TracedRuntimeBuilder

{ TracedRuntimeBuilder { enabled: self.enabled, task_tracking_enabled: self.task_tracking_enabled, + task_dump_config: self.task_dump_config, trace_path: self.trace_path, runtime_name: self.runtime_name, #[cfg(feature = "cpu-profiling")] @@ -1101,6 +1149,7 @@ impl TracedRuntimeBuilder { let core_builder = TelemetryCore::builder() .writer(writer) .maybe_trace_path(self.trace_path) + .maybe_task_dump_config(self.task_dump_config) .maybe_worker_poll_interval(self.worker_poll_interval) .maybe_worker_metrics_sink(self.worker_metrics_sink); @@ -1226,6 +1275,9 @@ impl TelemetryCore { /// cpu-profiling or S3 is configured. #[builder(into)] trace_path: Option, + /// Capture async backtraces at yield points. Requires the `taskdump` + /// crate feature to actually record events. + task_dump_config: Option, /// Enable CPU profiling (Linux only). #[cfg(feature = "cpu-profiling")] cpu_profiling: Option, @@ -1245,6 +1297,12 @@ impl TelemetryCore { ) -> std::io::Result { let start_mono_ns = crate::telemetry::events::clock_monotonic_ns(); let shared = Arc::new(SharedState::new(start_mono_ns)); + if let Some(cfg) = task_dump_config.as_ref() { + shared.task_dumps_enabled.store(true, Ordering::Relaxed); + shared + .task_dump_idle_threshold_ns + .store(cfg.idle_threshold().as_nanos() as u64, Ordering::Relaxed); + } #[allow(unused_mut)] let mut event_writer = EventWriter::new(Box::new(writer)); #[cfg(feature = "worker-s3")] @@ -1566,6 +1624,7 @@ impl TracedRuntime { TracedRuntimeBuilder { enabled: true, task_tracking_enabled: false, + task_dump_config: None, trace_path: None, runtime_name: None, #[cfg(feature = "cpu-profiling")] diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs index 89b89794..1e515510 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs @@ -7,6 +7,8 @@ use crate::telemetry::format::{ use crate::telemetry::task_metadata::TaskId; use std::cell::Cell; use std::collections::HashMap; +#[cfg(feature = "taskdump")] +use std::num::NonZeroU64; use std::sync::OnceLock; use std::sync::RwLock; use tokio::runtime::RuntimeMetrics; @@ -35,6 +37,21 @@ thread_local! { /// Whether we've registered this thread's OS tid for CPU profiling. #[cfg(feature = "cpu-profiling")] static TID_REGISTERED: Cell = const { Cell::new(false) }; + /// Monotonic timestamp captured in `on_before_task_poll`, cleared in + /// `on_after_task_poll`. Allows code running inside a poll (e.g. + /// `TaskDumped`) to reuse the timestamp without an extra clock read. + #[cfg(feature = "taskdump")] + static POLL_START_TS: Cell> = const { Cell::new(None) }; +} + +/// Returns the poll-start timestamp if we're inside a poll, otherwise reads +/// the clock. +#[cfg(feature = "taskdump")] +pub(crate) fn poll_start_ts_or_now() -> u64 { + POLL_START_TS.with(|c| c.get()).map_or_else( + crate::telemetry::events::clock_monotonic_ns, + NonZeroU64::get, + ) } impl RuntimeContext { @@ -209,8 +226,11 @@ pub(super) fn make_poll_start( let worker_local_queue_depth = resolved .map(|(_, idx)| ctx.local_queue_depth(idx)) .unwrap_or(0); + let timestamp_ns = crate::telemetry::events::clock_monotonic_ns(); + #[cfg(feature = "taskdump")] + POLL_START_TS.with(|c| c.set(NonZeroU64::new(timestamp_ns))); PollStart { - timestamp_ns: crate::telemetry::events::clock_monotonic_ns(), + timestamp_ns, worker_id: resolved.map(|(id, _)| id).unwrap_or(WorkerId::UNKNOWN), local_queue: worker_local_queue_depth as u8, task_id, @@ -219,6 +239,8 @@ pub(super) fn make_poll_start( } pub(super) fn make_poll_end(ctx: &RuntimeContext, shared: &SharedState) -> PollEndEvent { + #[cfg(feature = "taskdump")] + POLL_START_TS.with(|c| c.set(None)); let resolved = ctx.resolve_worker(shared); PollEndEvent { timestamp_ns: crate::telemetry::events::clock_monotonic_ns(), diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs index 421a21f8..2f4ab1a2 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs @@ -25,6 +25,13 @@ crate::primitives::thread_local! { /// No tokio imports. All runtime-specific logic lives in `RuntimeContext`. pub(crate) struct SharedState { pub(crate) enabled: AtomicBool, + /// Set when `TaskDumpConfig` is provided at build time. When `true`, + /// wrapping futures capture async backtraces at yield points. + pub(crate) task_dumps_enabled: AtomicBool, + /// Snapshot of `TaskDumpConfig::idle_threshold` in nanos. Copied onto + /// each `TaskDumped` instance at construction time so the hot poll path + /// does not need an atomic load. + pub(crate) task_dump_idle_threshold_ns: AtomicU64, pub(crate) collector: Arc, /// Absolute `CLOCK_MONOTONIC` nanosecond timestamp captured at trace start. pub(crate) start_time_ns: u64, @@ -54,6 +61,8 @@ impl SharedState { pub(super) fn new(start_time_ns: u64) -> Self { Self { enabled: AtomicBool::new(false), + task_dumps_enabled: AtomicBool::new(false), + task_dump_idle_threshold_ns: AtomicU64::new(0), collector: Arc::new(CentralCollector::new()), start_time_ns, next_worker_id: AtomicU64::new(0), diff --git a/dial9-tokio-telemetry/src/telemetry/task_dump_config.rs b/dial9-tokio-telemetry/src/telemetry/task_dump_config.rs new file mode 100644 index 00000000..fa8079ab --- /dev/null +++ b/dial9-tokio-telemetry/src/telemetry/task_dump_config.rs @@ -0,0 +1,37 @@ +//! Configuration for task dump capture. +//! +//! Task dumps capture async backtraces at yield points for tasks that have +//! been idle longer than the configured threshold. Use [`TaskDumpConfig`] with +//! [`TracedRuntimeBuilder::with_task_dumps`](crate::telemetry::TracedRuntimeBuilder::with_task_dumps) +//! or [`TelemetryCoreBuilder::task_dump_config`](crate::telemetry::TelemetryCoreBuilder::task_dump_config). +//! +//! Requires the `taskdump` crate feature. With that feature off, this module +//! is still compiled so the configuration API surface stays the same, but no +//! dumps are captured. + +use std::time::Duration; + +/// Default idle threshold after which a task dump is emitted. +const DEFAULT_IDLE_THRESHOLD: Duration = Duration::from_millis(10); + +/// Configuration for task dump capture. +#[derive(Debug, Clone, bon::Builder)] +pub struct TaskDumpConfig { + /// Minimum time a task must have been idle between polls before a task + /// dump is emitted on the next poll. Defaults to 10ms. + #[builder(default = DEFAULT_IDLE_THRESHOLD)] + idle_threshold: Duration, +} + +impl Default for TaskDumpConfig { + fn default() -> Self { + Self::builder().build() + } +} + +impl TaskDumpConfig { + /// Minimum idle duration between polls before a dump is emitted. + pub fn idle_threshold(&self) -> Duration { + self.idle_threshold + } +} diff --git a/dial9-tokio-telemetry/tests/rotation_time_alignment.rs b/dial9-tokio-telemetry/tests/rotation_time_alignment.rs index 5d0a5758..916e78cf 100644 --- a/dial9-tokio-telemetry/tests/rotation_time_alignment.rs +++ b/dial9-tokio-telemetry/tests/rotation_time_alignment.rs @@ -246,6 +246,7 @@ fn event_type_name(event: &TelemetryEvent) -> &'static str { TelemetryEvent::TaskSpawn { .. } => "TaskSpawn", TelemetryEvent::TaskTerminate { .. } => "TaskTerminate", TelemetryEvent::CpuSample { .. } => "CpuSample", + TelemetryEvent::TaskDump { .. } => "TaskDump", TelemetryEvent::ThreadNameDef { .. } => "ThreadNameDef", TelemetryEvent::WakeEvent { .. } => "WakeEvent", TelemetryEvent::SegmentMetadata { .. } => "SegmentMetadata", diff --git a/dial9-tokio-telemetry/tests/task_dump.rs b/dial9-tokio-telemetry/tests/task_dump.rs new file mode 100644 index 00000000..16e5a75c --- /dev/null +++ b/dial9-tokio-telemetry/tests/task_dump.rs @@ -0,0 +1,144 @@ +#![cfg(feature = "taskdump")] + +mod common; + +use dial9_tokio_telemetry::telemetry::{TaskDumpConfig, TelemetryEvent, TracedRuntime}; +use std::time::Duration; + +/// A task that stays idle longer than the threshold between polls should +/// produce at least one `TaskDump` event. +#[test] +fn task_dump_emitted_for_long_sleep() { + let (writer, events) = common::CapturingWriter::new(); + + let mut builder = tokio::runtime::Builder::new_current_thread(); + builder.enable_all(); + let (runtime, guard) = TracedRuntime::builder() + .with_task_tracking(true) + .with_task_dumps(TaskDumpConfig::default()) + .build_and_start(builder, writer) + .unwrap(); + + let handle = guard.handle(); + runtime.block_on(async { + let join = handle.spawn(async { + // Well above the 10ms default threshold. + tokio::time::sleep(Duration::from_millis(50)).await; + }); + join.await.unwrap(); + }); + + drop(runtime); + drop(guard); + + let events = events.lock().unwrap(); + let dumps: Vec<_> = events + .iter() + .filter(|e| matches!(e, TelemetryEvent::TaskDump { .. })) + .collect(); + + assert!( + !dumps.is_empty(), + "expected TaskDump events; got: {:?}", + events + .iter() + .map(std::mem::discriminant) + .collect::>() + ); + for dump in &dumps { + if let TelemetryEvent::TaskDump { callchain, .. } = dump { + assert!(!callchain.is_empty(), "callchain must be non-empty"); + } + } +} + +/// A task whose idles are all below threshold should produce zero dumps. +#[test] +fn no_task_dump_for_short_sleep() { + let (writer, events) = common::CapturingWriter::new(); + + let mut builder = tokio::runtime::Builder::new_current_thread(); + builder.enable_all(); + let (runtime, guard) = TracedRuntime::builder() + .with_task_tracking(true) + .with_task_dumps( + TaskDumpConfig::builder() + .idle_threshold(Duration::from_secs(1)) + .build(), + ) + .build_and_start(builder, writer) + .unwrap(); + + let handle = guard.handle(); + runtime.block_on(async { + let join = handle.spawn(async { + tokio::time::sleep(Duration::from_millis(1)).await; + }); + join.await.unwrap(); + }); + + drop(runtime); + guard + .graceful_shutdown(Duration::from_secs(1)) + .expect("clean shutdown"); + + let events = events.lock().unwrap(); + let dump_count = events + .iter() + .filter(|e| matches!(e, TelemetryEvent::TaskDump { .. })) + .count(); + assert_eq!(dump_count, 0, "expected no TaskDump events"); +} + +/// Wrapping with `TaskDumped` must not produce duplicate wake or poll events: +/// the same workload with and without dumps enabled must produce the same +/// number of `PollStart`/`PollEnd`/`WakeEvent` entries. +#[test] +fn task_dump_does_not_produce_extra_events() { + fn run(enable: bool) -> (usize, usize, usize) { + let (writer, events) = common::CapturingWriter::new(); + + let mut builder = tokio::runtime::Builder::new_current_thread(); + builder.enable_all(); + let mut tb = TracedRuntime::builder().with_task_tracking(true); + if enable { + tb = tb.with_task_dumps(TaskDumpConfig::default()); + } + let (runtime, guard) = tb.build_and_start(builder, writer).unwrap(); + + let handle = guard.handle(); + runtime.block_on(async { + let join = handle.spawn(async { + tokio::task::yield_now().await; + tokio::task::yield_now().await; + tokio::task::yield_now().await; + }); + join.await.unwrap(); + }); + drop(runtime); + guard + .graceful_shutdown(Duration::from_secs(1)) + .expect("clean shutdown"); + + let events = events.lock().unwrap(); + let mut starts = 0usize; + let mut ends = 0usize; + let mut wakes = 0usize; + for e in events.iter() { + match e { + TelemetryEvent::PollStart { .. } => starts += 1, + TelemetryEvent::PollEnd { .. } => ends += 1, + TelemetryEvent::WakeEvent { .. } => wakes += 1, + _ => {} + } + } + (starts, ends, wakes) + } + + let baseline = run(false); + let with_dumps = run(true); + assert_eq!( + baseline, with_dumps, + "enabling task dumps changed PollStart/PollEnd/WakeEvent counts: {baseline:?} vs {with_dumps:?}" + ); +} diff --git a/dial9-trace-format/src/encoder.rs b/dial9-trace-format/src/encoder.rs index aa91bca3..6cef2a89 100644 --- a/dial9-trace-format/src/encoder.rs +++ b/dial9-trace-format/src/encoder.rs @@ -439,6 +439,8 @@ impl Encoder { codec::encode_stack_pool( &[StackPoolEntry { pool_id: id, + // TODO: allow `StackPoolEntry` to have borrowed frames avoiding the unecessary clone here + // https://github.com/dial9-rs/dial9-tokio-telemetry/issues/358 frames: frames.to_vec(), }], &mut self.state.writer,