Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Unguarded logging in loops causes log spam that degrades observability and can i
## Running tests

- Always run `cargo nextest run` to run tests
- Shuttle tests are NOT included in `cargo nextest run`. They require a separate invocation: `./scripts/test-shuttle.sh`. Always run this when modifying code under `#[cfg(all(test, shuttle))]` or the flush/source paths.

## Ownership

Expand Down Expand Up @@ -125,3 +126,13 @@ Failing to update it will cause the viewer to fail when loading the demo.
## Ways of working
- After finishing your work, use showboat to demonstrate what you have done. Include key code, tests, and what was changed.
- Use a progress doc to keep track of what you are doing. Progress docs are just for you. When we are ready for a PR, we unstage the progress docs.

## Agent skills

### Issue tracker

GitHub Issues on `dial9-rs/dial9-tokio-telemetry`. See `docs/agents/issue-tracker.md`.

### Domain docs

Single-context layout. See `docs/agents/domain.md`.
74 changes: 73 additions & 1 deletion dial9-tokio-telemetry/src/telemetry/cpu_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
//! configurable frequency. The flush thread drains raw samples; the caller
//! (EventWriter) maps OS thread IDs to worker IDs via SharedState.thread_roles.

use crate::telemetry::events::{CpuSampleSource, ThreadName};
use crate::telemetry::buffer::record_encodable_event;
use crate::telemetry::events::{CpuSampleData, CpuSampleSource, ThreadName};
use crate::telemetry::format::WorkerId;
use crate::telemetry::recorder::source::{FlushContext, Source};
use dial9_perf_self_profile::{EventSource, PerfSampler, SamplerConfig, SamplingMode};
use std::collections::HashMap;
use std::io;
Expand Down Expand Up @@ -179,3 +182,72 @@ impl SchedProfiler {
});
}
}

// ── Source trait implementations ────────────────────────────────────────────

/// Exposes the CPU profiler as a [`Source`] so it can be drained through a
/// [`FlushContext`].
impl Source for CpuProfiler {
    /// Drains all pending samples and records each one as a [`CpuSampleData`]
    /// event via `record_encodable_event`.
    ///
    /// Each sample's thread id is looked up in `ctx.thread_roles`: a worker
    /// role maps to its worker id, a blocking role to `WorkerId::BLOCKING`,
    /// and a tid that is absent from the map to `WorkerId::UNKNOWN`.
    fn flush(&mut self, ctx: &FlushContext<'_>) {
        use crate::telemetry::events::ThreadRole;

        self.drain(|sample, name| {
            let tid: u32 = sample.tid;
            let worker_id = match ctx.thread_roles.get(&tid) {
                None => WorkerId::UNKNOWN,
                Some(ThreadRole::Blocking) => WorkerId::BLOCKING,
                Some(ThreadRole::Worker(index)) => WorkerId::from(*index),
            };
            let event = CpuSampleData {
                timestamp_nanos: sample.timestamp_nanos,
                worker_id,
                tid,
                source: sample.source,
                callchain: sample.callchain,
                thread_name: name.cloned(),
                cpu: sample.cpu,
            };
            record_encodable_event(&event, ctx.collector, ctx.drain_epoch);
        });
    }

    /// Identifier reported for this source.
    fn name(&self) -> &'static str {
        "cpu_profile"
    }
}

/// Exposes the scheduler-event profiler as a [`Source`], including the
/// per-thread start/stop hooks it needs for tracking.
impl Source for SchedProfiler {
    /// Drains all pending samples and records each one as a [`CpuSampleData`]
    /// event via `record_encodable_event`.
    ///
    /// The worker id is derived from `ctx.thread_roles`; a tid that is absent
    /// from the map yields `WorkerId::UNKNOWN`. No thread name is attached to
    /// these events.
    fn flush(&mut self, ctx: &FlushContext<'_>) {
        use crate::telemetry::events::ThreadRole;

        let worker_for = |tid: u32| -> WorkerId {
            ctx.thread_roles
                .get(&tid)
                .map_or(WorkerId::UNKNOWN, |role| match role {
                    ThreadRole::Worker(index) => WorkerId::from(*index),
                    ThreadRole::Blocking => WorkerId::BLOCKING,
                })
        };

        self.drain(|sample| {
            let event = CpuSampleData {
                timestamp_nanos: sample.timestamp_nanos,
                worker_id: worker_for(sample.tid),
                tid: sample.tid,
                source: sample.source,
                callchain: sample.callchain,
                thread_name: None,
                cpu: sample.cpu,
            };
            record_encodable_event(&event, ctx.collector, ctx.drain_epoch);
        });
    }

    /// Begins tracking the calling thread; any error from
    /// `track_current_thread` is returned to the caller unchanged.
    fn on_worker_thread_start(&mut self) -> io::Result<()> {
        self.track_current_thread()
    }

    /// Stops tracking the calling thread.
    fn on_thread_stop(&mut self) {
        self.stop_tracking_current_thread();
    }

    /// Identifier reported for this source.
    fn name(&self) -> &'static str {
        "sched"
    }
}
1 change: 0 additions & 1 deletion dial9-tokio-telemetry/src/telemetry/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use serde::Serialize;
use std::sync::Arc;

/// Role of a thread known to the telemetry system.
#[cfg(feature = "cpu-profiling")]
#[derive(Debug, Clone, Copy)]
pub(crate) enum ThreadRole {
/// A tokio worker thread with the given index.
Expand Down
73 changes: 13 additions & 60 deletions dial9-tokio-telemetry/src/telemetry/recorder/event_writer.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,24 @@
#[cfg(feature = "cpu-profiling")]
use super::shared_state::SharedState;
use crate::telemetry::collector::Batch;
#[cfg(feature = "cpu-profiling")]
use crate::telemetry::events::{CpuSampleData, ThreadRole};
#[cfg(feature = "cpu-profiling")]
use crate::telemetry::format::WorkerId;
use crate::telemetry::writer::TraceWriter;

/// Intermediate layer between the recorder and the raw `TraceWriter`.
///
/// Owns the writer and the CPU profiler. Its API is roughly:
/// Owns the writer. Its API is roughly:
///
/// - `write_raw_event(event)` — encode and write a single event (test only)
/// - `flush_cpu(shared)` — drain CPU/sched profilers into the trace
/// - `flush_sources(shared)` — drain data sources into the trace
/// - `flush()` — flush the underlying writer
pub(crate) struct EventWriter {
pub(super) writer: Box<dyn TraceWriter>,
events_written: u64,
#[cfg(feature = "cpu-profiling")]
pub(super) cpu_profiler: Option<crate::telemetry::cpu_profile::CpuProfiler>,
}

impl EventWriter {
pub(crate) fn new(writer: Box<dyn TraceWriter>) -> Self {
Self {
writer,
events_written: 0,
#[cfg(feature = "cpu-profiling")]
cpu_profiler: None,
}
}

Expand Down Expand Up @@ -61,59 +52,21 @@ impl EventWriter {
Ok(())
}

/// Drain CPU/sched profilers and write their events into the trace.
#[cfg(feature = "cpu-profiling")]
pub(crate) fn flush_cpu(&mut self, shared: &SharedState) {
// Snapshot thread_roles once per flush cycle.
/// Drain data sources and write their events into the trace.
pub(crate) fn flush_sources(&mut self, shared: &SharedState) {
use super::source::FlushContext;

let roles = shared.thread_roles.lock().unwrap().clone();

let resolve = |tid: u32| -> WorkerId {
match roles.get(&tid) {
Some(ThreadRole::Worker(id)) => WorkerId::from(*id),
Some(ThreadRole::Blocking) => WorkerId::BLOCKING,
None => WorkerId::UNKNOWN,
}
let ctx = FlushContext {
collector: &shared.collector,
drain_epoch: &shared.drain_epoch,
thread_roles: &roles,
};

if let Some(mut profiler) = self.cpu_profiler.take() {
profiler.drain(|raw, thread_name| {
use crate::telemetry::buffer::record_encodable_event;

let worker_id = resolve(raw.tid);
let data = CpuSampleData {
timestamp_nanos: raw.timestamp_nanos,
worker_id,
tid: raw.tid,
source: raw.source,
callchain: raw.callchain,
thread_name: thread_name.cloned(),
cpu: raw.cpu,
};
record_encodable_event(&data, &shared.collector, &shared.drain_epoch);
});
self.cpu_profiler = Some(profiler);
}

{
let mut shared_profiler = shared.sched_profiler.lock().unwrap();
if let Some(ref mut profiler) = *shared_profiler {
profiler.drain(|raw| {
use crate::telemetry::buffer::record_encodable_event;

let data = CpuSampleData {
timestamp_nanos: raw.timestamp_nanos,
worker_id: resolve(raw.tid),
tid: raw.tid,
source: raw.source,
callchain: raw.callchain,
// TODO: we should be able to also track thread name here.
// sampler is running on worker threads so no thread name
thread_name: None,
cpu: raw.cpu,
};
record_encodable_event(&data, &shared.collector, &shared.drain_epoch);
});
}
let mut sources = shared.sources.lock().unwrap();
for source in sources.iter_mut() {
source.flush(&ctx);
}
}

Expand Down
20 changes: 9 additions & 11 deletions dial9-tokio-telemetry/src/telemetry/recorder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod event_writer;
mod runtime_context;
mod shared_state;
pub(crate) mod source;

pub(crate) use runtime_context::RuntimeContext;
pub use runtime_context::current_worker_id;
Expand Down Expand Up @@ -89,11 +90,8 @@ fn flush_once(
) -> FlushStats {
let events_before = event_writer.events_written();
let cpu_events_time = std::time::Instant::now();
#[cfg(feature = "cpu-profiling")]
{
if shared.is_enabled() {
event_writer.flush_cpu(shared);
}
if shared.is_enabled() {
event_writer.flush_sources(shared);
}
let cpu_flush_duration = cpu_events_time.elapsed();

Expand Down Expand Up @@ -273,10 +271,10 @@ fn register_hooks(
{
let tid = crate::telemetry::events::current_tid();
s_stop.thread_roles.lock().unwrap().remove(&tid);
if let Ok(mut prof) = s_stop.sched_profiler.lock()
&& let Some(ref mut p) = *prof
{
p.stop_tracking_current_thread();
if let Ok(mut sources) = s_stop.sources.lock() {
for source in sources.iter_mut() {
source.on_thread_stop();
}
}
dial9_perf_self_profile::unregister_current_thread();
}
Expand Down Expand Up @@ -1611,15 +1609,15 @@ impl TelemetryCore {
{
if let Some(ref config) = cpu_profiling {
match crate::telemetry::cpu_profile::CpuProfiler::start(config.clone()) {
Ok(sampler) => event_writer.cpu_profiler = Some(sampler),
Ok(sampler) => shared.push_source(Box::new(sampler)),
Err(e) => rate_limited!(Duration::from_secs(60), {
tracing::warn!("failed to start CPU profiler: {e}");
}),
}
}
if let Some(sched_cfg) = sched_events {
match crate::telemetry::cpu_profile::SchedProfiler::new(sched_cfg) {
Ok(sched) => *shared.sched_profiler.lock().unwrap() = Some(sched),
Ok(sched) => shared.push_source(Box::new(sched)),
Err(e) => rate_limited!(Duration::from_secs(60), {
tracing::warn!("failed to start scheduler event profiler: {e}");
}),
Expand Down
14 changes: 9 additions & 5 deletions dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,15 @@ fn register_tid_if_needed(global_id: u64, shared: &SharedState) {
// Start sched event sampling for this worker thread. Deferred from
// on_thread_start so that only worker threads (not blocking pool
// threads) open perf fds.
if let Ok(mut prof) = shared.sched_profiler.lock()
&& let Some(ref mut p) = *prof
&& let Err(e) = p.track_current_thread()
{
tracing::warn!("failed to start sched profiling for worker thread: {e}");
if let Ok(mut sources) = shared.sources.lock() {
for source in sources.iter_mut() {
if let Err(e) = source.on_worker_thread_start() {
tracing::warn!(
"failed to start source {} for worker thread: {e}",
source.name()
);
}
}
}
cell.set(true);
}
Expand Down
Loading
Loading