diff --git a/AGENTS.md b/AGENTS.md index 8ef96204..7ea420e9 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -77,6 +77,7 @@ Unguarded logging in loops causes log spam that degrades observability and can i ## Running tests - Always run `cargo nextest run` to run tests +- Shuttle tests are NOT included in `cargo nextest run`. They require a separate invocation: `./scripts/test-shuttle.sh`. Always run this when modifying code under `#[cfg(all(test, shuttle))]` or the flush/source paths. ## Ownership @@ -125,3 +126,13 @@ Failing to update it will cause the viewer to fail when loading the demo. ## Ways of working - After finishing your work use showboat to demonstrate what you have done. include key code, tests, and what was changed. - Use a progress doc to keep track of what you are doing. progress docs are just for you. when we are ready for PR, we unstage the progress docs + +## Agent skills + +### Issue tracker + +GitHub Issues on `dial9-rs/dial9-tokio-telemetry`. See `docs/agents/issue-tracker.md`. + +### Domain docs + +Single-context layout. See `docs/agents/domain.md`. diff --git a/dial9-tokio-telemetry/src/telemetry/cpu_profile.rs b/dial9-tokio-telemetry/src/telemetry/cpu_profile.rs index 1e56f245..22c50bba 100644 --- a/dial9-tokio-telemetry/src/telemetry/cpu_profile.rs +++ b/dial9-tokio-telemetry/src/telemetry/cpu_profile.rs @@ -4,7 +4,10 @@ //! configurable frequency. The flush thread drains raw samples; the caller //! (EventWriter) maps OS thread IDs to worker IDs via SharedState.thread_roles. 
-use crate::telemetry::events::{CpuSampleSource, ThreadName}; +use crate::telemetry::buffer::record_encodable_event; +use crate::telemetry::events::{CpuSampleData, CpuSampleSource, ThreadName}; +use crate::telemetry::format::WorkerId; +use crate::telemetry::recorder::source::{FlushContext, Source}; use dial9_perf_self_profile::{EventSource, PerfSampler, SamplerConfig, SamplingMode}; use std::collections::HashMap; use std::io; @@ -179,3 +182,72 @@ impl SchedProfiler { }); } } + +// ── Source trait implementations ──────────────────────────────────────────── + +impl Source for CpuProfiler { + fn flush(&mut self, ctx: &FlushContext<'_>) { + let resolve = |tid: u32| -> WorkerId { + match ctx.thread_roles.get(&tid) { + Some(crate::telemetry::events::ThreadRole::Worker(id)) => WorkerId::from(*id), + Some(crate::telemetry::events::ThreadRole::Blocking) => WorkerId::BLOCKING, + None => WorkerId::UNKNOWN, + } + }; + + self.drain(|raw, thread_name| { + let worker_id = resolve(raw.tid); + let data = CpuSampleData { + timestamp_nanos: raw.timestamp_nanos, + worker_id, + tid: raw.tid, + source: raw.source, + callchain: raw.callchain, + thread_name: thread_name.cloned(), + cpu: raw.cpu, + }; + record_encodable_event(&data, ctx.collector, ctx.drain_epoch); + }); + } + + fn name(&self) -> &'static str { + "cpu_profile" + } +} + +impl Source for SchedProfiler { + fn flush(&mut self, ctx: &FlushContext<'_>) { + let resolve = |tid: u32| -> WorkerId { + match ctx.thread_roles.get(&tid) { + Some(crate::telemetry::events::ThreadRole::Worker(id)) => WorkerId::from(*id), + Some(crate::telemetry::events::ThreadRole::Blocking) => WorkerId::BLOCKING, + None => WorkerId::UNKNOWN, + } + }; + + self.drain(|raw| { + let data = CpuSampleData { + timestamp_nanos: raw.timestamp_nanos, + worker_id: resolve(raw.tid), + tid: raw.tid, + source: raw.source, + callchain: raw.callchain, + thread_name: None, + cpu: raw.cpu, + }; + record_encodable_event(&data, ctx.collector, ctx.drain_epoch); + }); + } + + 
fn on_worker_thread_start(&mut self) -> io::Result<()> { + self.track_current_thread() + } + + fn on_thread_stop(&mut self) { + self.stop_tracking_current_thread(); + } + + fn name(&self) -> &'static str { + "sched" + } +} diff --git a/dial9-tokio-telemetry/src/telemetry/events.rs b/dial9-tokio-telemetry/src/telemetry/events.rs index 5e940a03..d67b95bd 100644 --- a/dial9-tokio-telemetry/src/telemetry/events.rs +++ b/dial9-tokio-telemetry/src/telemetry/events.rs @@ -5,7 +5,6 @@ use serde::Serialize; use std::sync::Arc; /// Role of a thread known to the telemetry system. -#[cfg(feature = "cpu-profiling")] #[derive(Debug, Clone, Copy)] pub(crate) enum ThreadRole { /// A tokio worker thread with the given index. diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs b/dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs index 0d27bf51..f07e3671 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs @@ -1,24 +1,17 @@ -#[cfg(feature = "cpu-profiling")] use super::shared_state::SharedState; use crate::telemetry::collector::Batch; -#[cfg(feature = "cpu-profiling")] -use crate::telemetry::events::{CpuSampleData, ThreadRole}; -#[cfg(feature = "cpu-profiling")] -use crate::telemetry::format::WorkerId; use crate::telemetry::writer::TraceWriter; /// Intermediate layer between the recorder and the raw `TraceWriter`. /// -/// Owns the writer and the CPU profiler. Its API is roughly: +/// Owns the writer. 
Its API is roughly: /// /// - `write_raw_event(event)` — encode and write a single event (test only) -/// - `flush_cpu(shared)` — drain CPU/sched profilers into the trace +/// - `flush_sources(shared)` — drain data sources into the trace /// - `flush()` — flush the underlying writer pub(crate) struct EventWriter { pub(super) writer: Box, events_written: u64, - #[cfg(feature = "cpu-profiling")] - pub(super) cpu_profiler: Option, } impl EventWriter { @@ -26,8 +19,6 @@ impl EventWriter { Self { writer, events_written: 0, - #[cfg(feature = "cpu-profiling")] - cpu_profiler: None, } } @@ -61,59 +52,21 @@ impl EventWriter { Ok(()) } - /// Drain CPU/sched profilers and write their events into the trace. - #[cfg(feature = "cpu-profiling")] - pub(crate) fn flush_cpu(&mut self, shared: &SharedState) { - // Snapshot thread_roles once per flush cycle. + /// Drain data sources and write their events into the trace. + pub(crate) fn flush_sources(&mut self, shared: &SharedState) { + use super::source::FlushContext; + let roles = shared.thread_roles.lock().unwrap().clone(); - let resolve = |tid: u32| -> WorkerId { - match roles.get(&tid) { - Some(ThreadRole::Worker(id)) => WorkerId::from(*id), - Some(ThreadRole::Blocking) => WorkerId::BLOCKING, - None => WorkerId::UNKNOWN, - } + let ctx = FlushContext { + collector: &shared.collector, + drain_epoch: &shared.drain_epoch, + thread_roles: &roles, }; - if let Some(mut profiler) = self.cpu_profiler.take() { - profiler.drain(|raw, thread_name| { - use crate::telemetry::buffer::record_encodable_event; - - let worker_id = resolve(raw.tid); - let data = CpuSampleData { - timestamp_nanos: raw.timestamp_nanos, - worker_id, - tid: raw.tid, - source: raw.source, - callchain: raw.callchain, - thread_name: thread_name.cloned(), - cpu: raw.cpu, - }; - record_encodable_event(&data, &shared.collector, &shared.drain_epoch); - }); - self.cpu_profiler = Some(profiler); - } - - { - let mut shared_profiler = shared.sched_profiler.lock().unwrap(); - if 
let Some(ref mut profiler) = *shared_profiler { - profiler.drain(|raw| { - use crate::telemetry::buffer::record_encodable_event; - - let data = CpuSampleData { - timestamp_nanos: raw.timestamp_nanos, - worker_id: resolve(raw.tid), - tid: raw.tid, - source: raw.source, - callchain: raw.callchain, - // TODO: we should be able to also track thread name here. - // sampler is running on worker threads so no thread name - thread_name: None, - cpu: raw.cpu, - }; - record_encodable_event(&data, &shared.collector, &shared.drain_epoch); - }); - } + let mut sources = shared.sources.lock().unwrap(); + for source in sources.iter_mut() { + source.flush(&ctx); } } diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs index e433ff78..df8b6610 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs @@ -1,6 +1,7 @@ mod event_writer; mod runtime_context; mod shared_state; +pub(crate) mod source; pub(crate) use runtime_context::RuntimeContext; pub use runtime_context::current_worker_id; @@ -89,11 +90,8 @@ fn flush_once( ) -> FlushStats { let events_before = event_writer.events_written(); let cpu_events_time = std::time::Instant::now(); - #[cfg(feature = "cpu-profiling")] - { - if shared.is_enabled() { - event_writer.flush_cpu(shared); - } + if shared.is_enabled() { + event_writer.flush_sources(shared); } let cpu_flush_duration = cpu_events_time.elapsed(); @@ -273,10 +271,10 @@ fn register_hooks( { let tid = crate::telemetry::events::current_tid(); s_stop.thread_roles.lock().unwrap().remove(&tid); - if let Ok(mut prof) = s_stop.sched_profiler.lock() - && let Some(ref mut p) = *prof - { - p.stop_tracking_current_thread(); + if let Ok(mut sources) = s_stop.sources.lock() { + for source in sources.iter_mut() { + source.on_thread_stop(); + } } dial9_perf_self_profile::unregister_current_thread(); } @@ -1611,7 +1609,7 @@ impl TelemetryCore { { if let Some(ref 
config) = cpu_profiling { match crate::telemetry::cpu_profile::CpuProfiler::start(config.clone()) { - Ok(sampler) => event_writer.cpu_profiler = Some(sampler), + Ok(sampler) => shared.push_source(Box::new(sampler)), Err(e) => rate_limited!(Duration::from_secs(60), { tracing::warn!("failed to start CPU profiler: {e}"); }), @@ -1619,7 +1617,7 @@ impl TelemetryCore { } if let Some(sched_cfg) = sched_events { match crate::telemetry::cpu_profile::SchedProfiler::new(sched_cfg) { - Ok(sched) => *shared.sched_profiler.lock().unwrap() = Some(sched), + Ok(sched) => shared.push_source(Box::new(sched)), Err(e) => rate_limited!(Duration::from_secs(60), { tracing::warn!("failed to start scheduler event profiler: {e}"); }), diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs index 6b97a785..03455d8d 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs @@ -143,11 +143,15 @@ fn register_tid_if_needed(global_id: u64, shared: &SharedState) { // Start sched event sampling for this worker thread. Deferred from // on_thread_start so that only worker threads (not blocking pool // threads) open perf fds. 
- if let Ok(mut prof) = shared.sched_profiler.lock() - && let Some(ref mut p) = *prof - && let Err(e) = p.track_current_thread() - { - tracing::warn!("failed to start sched profiling for worker thread: {e}"); + if let Ok(mut sources) = shared.sources.lock() { + for source in sources.iter_mut() { + if let Err(e) = source.on_worker_thread_start() { + tracing::warn!( + "failed to start source {} for worker thread: {e}", + source.name() + ); + } + } } cell.set(true); } diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs index b6e7bc3b..f55a0ddc 100644 --- a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs +++ b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs @@ -4,12 +4,10 @@ use crate::primitives::sync::{Arc, Mutex}; use crate::telemetry::buffer; use crate::telemetry::buffer::TlBufferHandle; use crate::telemetry::collector::CentralCollector; -#[cfg(feature = "cpu-profiling")] use crate::telemetry::events::ThreadRole; use crate::telemetry::format::{QueueSampleEvent, WakeEventEvent}; use crate::telemetry::task_metadata::TaskId; use std::cell::Cell; -#[cfg(feature = "cpu-profiling")] use std::collections::HashMap; use std::time::Duration; @@ -55,10 +53,9 @@ pub(crate) struct SharedState { pub(crate) contexts: Mutex>>, /// Maps OS tid → thread role so that CPU samples returned from perf can be /// attributed to the correct worker or blocking-pool bucket at flush time. - #[cfg(feature = "cpu-profiling")] pub(crate) thread_roles: Mutex>, - #[cfg(feature = "cpu-profiling")] - pub(crate) sched_profiler: Mutex>, + /// Data sources (CPU profiler, sched profiler, etc.) that the flush thread drains. 
+ pub(crate) sources: Mutex>>, } impl SharedState { @@ -74,13 +71,16 @@ impl SharedState { drain_epoch: AtomicU64::new(0), tl_buffers: Mutex::new(Vec::new()), contexts: Mutex::new(Vec::new()), - #[cfg(feature = "cpu-profiling")] thread_roles: Mutex::new(HashMap::new()), - #[cfg(feature = "cpu-profiling")] - sched_profiler: Mutex::new(None), + sources: Mutex::new(Vec::new()), } } + /// Register a data source to be drained by the flush thread each cycle. + pub(crate) fn push_source(&self, source: Box) { + self.sources.lock().unwrap().push(source); + } + fn timestamp_nanos(&self) -> u64 { crate::telemetry::events::clock_monotonic_ns() } @@ -602,6 +602,40 @@ mod shuttle_tests { } } + // ── Mock Source ──────────────────────────────────────────────────── + + /// A Source that accumulates events from worker threads and emits them + /// during flush. Exercises the Source flush path under shuttle. + struct MockSource { + pending: Arc>>, + } + + impl MockSource { + fn new(pending: Arc>>) -> Self { + Self { pending } + } + } + + impl crate::telemetry::recorder::source::Source for MockSource { + fn flush(&mut self, ctx: &crate::telemetry::recorder::source::FlushContext<'_>) { + let events: Vec<_> = self.pending.lock().unwrap().drain(..).collect(); + for ev in &events { + crate::telemetry::buffer::record_encodable_event( + ev, + ctx.collector, + ctx.drain_epoch, + ); + } + } + + fn name(&self) -> &'static str { + "mock" + } + + // TODO: exercise on_worker_thread_start/on_thread_stop once shuttle + // tests include a Tokio runtime. + } + // ── Test body ─────────────────────────────────────────────────────── fn test_telemetry_core_pipeline() { @@ -614,10 +648,17 @@ mod shuttle_tests { let next_id = Arc::new(AtomicU64::new(0)); let segments: Arc>>> = Arc::new(Mutex::new(Vec::new())); + // Mock source: worker threads push events here, flush thread drains them. 
+ let source_pending: Arc>> = Arc::new(Mutex::new(Vec::new())); + let guard = TelemetryCore::builder() .writer(InvariantCheckingWriter::new(segments.clone())) .build() .unwrap(); + guard + .shared() + .unwrap() + .push_source(Box::new(MockSource::new(source_pending.clone()))); guard.enable(); let handle = guard.handle(); @@ -627,6 +668,7 @@ mod shuttle_tests { let h = handle.clone(); let next_id = next_id.clone(); let expected = expected.clone(); + let source_pending = source_pending.clone(); let thread_id = thread_id as u64; crate::primitives::thread::spawn(move || { let mut rng = shuttle::rand::thread_rng(); @@ -642,7 +684,13 @@ mod shuttle_tests { id, }; expected.lock().unwrap().push(ev.clone()); - h.record_encodable_event(&ev); + // Randomly choose: emit via handle (TL buffer path) + // or via mock source (flush-thread path). + if rng.gen_range(0u32..2) == 0 { + h.record_encodable_event(&ev); + } else { + source_pending.lock().unwrap().push(ev); + } } }) }) diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/source.rs b/dial9-tokio-telemetry/src/telemetry/recorder/source.rs new file mode 100644 index 00000000..b7653a58 --- /dev/null +++ b/dial9-tokio-telemetry/src/telemetry/recorder/source.rs @@ -0,0 +1,37 @@ +//! Source trait for abstracting flush-thread data sources. + +use crate::primitives::sync::Arc; +use crate::primitives::sync::atomic::AtomicU64; +use crate::telemetry::collector::CentralCollector; +use crate::telemetry::events::ThreadRole; +use std::collections::HashMap; + +/// Context passed to [`Source::flush`] containing shared state needed for draining. +pub(crate) struct FlushContext<'a> { + pub collector: &'a Arc, + pub drain_epoch: &'a AtomicU64, + pub thread_roles: &'a HashMap, +} + +/// A data source that the flush thread drains into the central collector. +/// +/// Implementors (e.g. `CpuProfiler`, `SchedProfiler`) provide a `flush` method +/// that drains pending data and records it via `record_encodable_event`. 
+pub(crate) trait Source: Send { + /// Drain pending data into the dial9 trace. Called once per flush cycle + /// from the flush thread. + fn flush(&mut self, ctx: &FlushContext<'_>); + + /// Diagnostic name (e.g. "cpu_profile", "sched"). + fn name(&self) -> &'static str; + + /// Called when a worker thread starts. Used by per-thread sources like SchedProfiler + /// to start tracking the current thread. Returns an error if tracking fails. + fn on_worker_thread_start(&mut self) -> std::io::Result<()> { + Ok(()) + } + + /// Called when a thread stops. Used by per-thread sources like SchedProfiler + /// to stop tracking the current thread. + fn on_thread_stop(&mut self) {} +} diff --git a/docs/agents/domain.md b/docs/agents/domain.md new file mode 100644 index 00000000..2f763291 --- /dev/null +++ b/docs/agents/domain.md @@ -0,0 +1,35 @@ +# Domain Docs + +How the engineering skills should consume this repo's domain documentation when exploring the codebase. + +## Before exploring, read these + +- **`CONTEXT.md`** at the repo root. +- **`docs/adr/`** — read ADRs that touch the area you're about to work in. + +If any of these files don't exist, **proceed silently**. Don't flag their absence; don't suggest creating them upfront. The producer skill (`/grill-with-docs`) creates them lazily when terms or decisions actually get resolved. + +## File structure + +Single-context repo: + +``` +/ +├── CONTEXT.md +├── docs/adr/ +│ ├── 0001-*.md +│ └── ... +└── src/ +``` + +## Use the glossary's vocabulary + +When your output names a domain concept (in an issue title, a refactor proposal, a hypothesis, a test name), use the term as defined in `CONTEXT.md`. Don't drift to synonyms the glossary explicitly avoids. + +If the concept you need isn't in the glossary yet, that's a signal — either you're inventing language the project doesn't use (reconsider) or there's a real gap (note it for `/grill-with-docs`). 
+ +## Flag ADR conflicts + +If your output contradicts an existing ADR, surface it explicitly rather than silently overriding: + +> _Contradicts ADR-0007 — but worth reopening because…_ diff --git a/docs/agents/issue-tracker.md b/docs/agents/issue-tracker.md new file mode 100644 index 00000000..cce77ecb --- /dev/null +++ b/docs/agents/issue-tracker.md @@ -0,0 +1,22 @@ +# Issue tracker: GitHub + +Issues and PRDs for this repo live as GitHub issues. Use the `gh` CLI for all operations. + +## Conventions + +- **Create an issue**: `gh issue create --title "..." --body "..."`. Use a heredoc for multi-line bodies. +- **Read an issue**: `gh issue view <number> --comments`, filtering comments by `jq` and also fetching labels. +- **List issues**: `gh issue list --state open --json number,title,body,labels,comments --jq '[.[] | {number, title, body, labels: [.labels[].name], comments: [.comments[].body]}]'` with appropriate `--label` and `--state` filters. +- **Comment on an issue**: `gh issue comment <number> --body "..."` +- **Apply / remove labels**: `gh issue edit <number> --add-label "..."` / `--remove-label "..."` +- **Close**: `gh issue close <number> --comment "..."` + +Infer the repo from `git remote -v` — `gh` does this automatically when run inside a clone. + +## When a skill says "publish to the issue tracker" + +Create a GitHub issue. + +## When a skill says "fetch the relevant ticket" + +Run `gh issue view <number> --comments`. 
diff --git a/dial9-tokio-telemetry/design/cpu-sampling-and-worker-id-design.md b/docs/design/cpu-sampling-and-worker-id-design.md similarity index 100% rename from dial9-tokio-telemetry/design/cpu-sampling-and-worker-id-design.md rename to docs/design/cpu-sampling-and-worker-id-design.md diff --git a/dial9-tokio-telemetry/design/memory-profiling.md b/docs/design/memory-profiling.md similarity index 100% rename from dial9-tokio-telemetry/design/memory-profiling.md rename to docs/design/memory-profiling.md diff --git a/dial9-tokio-telemetry/design/metrique-integration.md b/docs/design/metrique-integration.md similarity index 100% rename from dial9-tokio-telemetry/design/metrique-integration.md rename to docs/design/metrique-integration.md diff --git a/dial9-tokio-telemetry/design/s3-worker-design.md b/docs/design/s3-worker-design.md similarity index 100% rename from dial9-tokio-telemetry/design/s3-worker-design.md rename to docs/design/s3-worker-design.md diff --git a/dial9-tokio-telemetry/design/tl-buffer-drain.md b/docs/design/tl-buffer-drain.md similarity index 100% rename from dial9-tokio-telemetry/design/tl-buffer-drain.md rename to docs/design/tl-buffer-drain.md