Skip to content

Commit 82de0df

Browse files
authored
fix: Don't register sched events on blocking pool threads (#316)
* fix: Don't register sched events on blocking pool threads * fix: use Drop-based guard for per-thread telemetry cleanup Replace manual on_thread_stop cleanup with a ThreadCleanupGuard stored in a thread-local. The guard's Drop impl handles all per-thread teardown: clearing CURRENT_HANDLE, removing thread roles, stopping sched profiling, and unregistering ctimer. This fixes a perf_event fd leak where the calling thread's CURRENT_HANDLE (set in attach_runtime) was never cleared, keeping Arc<SharedState> alive after drop(runtime) + drop(guard). The SchedProfiler inside SharedState still owned PerfEvents with open fds. The guard approach is more reliable because: - The calling thread's cleanup is now handled (was missing before) - Even if on_thread_stop doesn't fire, the guard runs on thread exit - Cleanup logic is centralized in one place * fix: serialize perf fd tests to prevent cargo test parallelism interference cargo test runs tests within the same binary in parallel (threads in the same process). Both sched_profiler_fd_leak tests inspect process-wide perf_event fd counts, so running concurrently causes one test to see the other's fds. Add a static Mutex to serialize them. * Revert "fix: use Drop-based guard for per-thread telemetry cleanup" This reverts commit 75706c8. * fix: add max_tracked_threads cap to PerfSamplerImpl Adds a configurable hard cap (default: 256) on the number of threads that can be tracked simultaneously in per-thread mode. If the cap is reached, track_current_thread returns an error instead of opening another fd. This prevents unbounded fd/mmap growth if cleanup fails, ensuring dial9 never breaks the calling application. * fix: add missing max_tracked_threads field in ctimer_sampler test * Cleanup comments
1 parent 8d61fb0 commit 82de0df

8 files changed

Lines changed: 189 additions & 9 deletions

File tree

dial9-tokio-telemetry/design/cpu-sampling-and-worker-id-design.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ The `SchedProfiler` captures context switches on each worker thread:
136136

137137
- Created during `TracedRuntimeBuilder::build()` if `with_sched_events()` was called.
138138
- Stored in `SharedState.sched_profiler` (behind `Mutex<Option<...>>`).
139-
- `on_thread_start` hook calls `profiler.track_current_thread()` → opens a perf fd for the calling thread.
139+
- `on_thread_start` registers the thread as `Blocking`; worker threads re-register as `Worker(i)` in `register_tid_if_needed()` on their first poll/park, which also calls `profiler.track_current_thread()` → opens a perf fd for the calling thread.
140140
- `on_thread_stop` hook calls `profiler.stop_tracking_current_thread()`.
141141
- Flush thread drains samples, maps tids, writes `CpuSample` events with `source: SchedEvent`.
142142

dial9-tokio-telemetry/src/telemetry/cpu_profile.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl CpuProfilingConfig {
5959
/// Configuration for per-worker sched event capture (context switches).
6060
///
6161
/// Uses `perf_event_open` with `SwContextSwitches` in per-thread mode,
62-
/// so each worker thread gets its own perf fd via `on_thread_start`.
62+
/// so each worker thread gets its own perf fd on first poll/park.
6363
#[derive(Debug, Clone, Default)]
6464
pub struct SchedEventConfig {
6565
sampling_interval: Option<u64>,

dial9-tokio-telemetry/src/telemetry/recorder/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,12 +241,10 @@ fn register_hooks(
241241
.lock()
242242
.unwrap()
243243
.insert(tid, crate::telemetry::events::ThreadRole::Blocking);
244-
if let Ok(mut prof) = s_start.sched_profiler.lock()
245-
&& let Some(ref mut p) = *prof
246-
{
247-
// Register the current thread for sched event sampling.
248-
let _ = p.track_current_thread();
249-
}
244+
// Sched event sampling is deferred to register_tid_if_needed(),
245+
// which runs only for worker threads on their first poll/park.
246+
// This avoids opening perf fds for blocking pool threads.
247+
250248
// Registers the current thread for the CPU-profiling fallback (ctimer).
251249
// No-op when perf is the active backend (perf uses inherit).
252250
let _ = dial9_perf_self_profile::register_current_thread();

dial9-tokio-telemetry/src/telemetry/recorder/runtime_context.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ fn register_worker_if_needed(ctx: &RuntimeContext, local_index: usize, global_id
110110
}
111111

112112
/// Register the current thread's OS tid for CPU profiling (once per thread).
113+
/// Also starts sched event sampling for this worker thread.
113114
#[cfg(feature = "cpu-profiling")]
114115
fn register_tid_if_needed(global_id: u64, shared: &SharedState) {
115116
TID_REGISTERED.with(|cell| {
@@ -119,6 +120,15 @@ fn register_tid_if_needed(global_id: u64, shared: &SharedState) {
119120
os_tid,
120121
crate::telemetry::events::ThreadRole::Worker(global_id as usize),
121122
);
123+
// Start sched event sampling for this worker thread. Deferred from
124+
// on_thread_start so that only worker threads (not blocking pool
125+
// threads) open perf fds.
126+
if let Ok(mut prof) = shared.sched_profiler.lock()
127+
&& let Some(ref mut p) = *prof
128+
&& let Err(e) = p.track_current_thread()
129+
{
130+
tracing::warn!("failed to start sched profiling for worker thread: {e}");
131+
}
122132
cell.set(true);
123133
}
124134
});
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
//! Integration test: sched profiler should only track worker threads, not blocking pool threads.
2+
//!
3+
//! Before the fix, every thread that started (including blocking pool threads) opened a
4+
//! perf_event_open fd + 2 MB mmap ring buffer for sched event sampling. With Tokio's default
5+
//! blocking pool limit of 512 threads, this could exhaust file descriptors.
6+
//!
7+
//! Additionally, `stop_tracking_current_thread` never cleaned up fds because `open_perf_event`
8+
//! stored `tid: 0` (the "current thread" sentinel) while the cleanup searched for `gettid()`.
9+
10+
#![cfg(all(feature = "cpu-profiling", target_os = "linux"))]
11+
12+
mod common;
13+
14+
use dial9_tokio_telemetry::telemetry::TracedRuntime;
15+
use dial9_tokio_telemetry::telemetry::cpu_profile::SchedEventConfig;
16+
use std::sync::Mutex;
17+
use std::time::Duration;
18+
19+
/// Serialize tests that inspect process-wide perf_event fd counts.
20+
/// `cargo test` runs tests in the same binary in parallel (threads),
21+
/// so concurrent tests would see each other's fds.
22+
static PERF_FD_TEST_LOCK: Mutex<()> = Mutex::new(());
23+
24+
/// Count open perf_event fds specifically.
25+
fn count_perf_fds() -> usize {
26+
std::fs::read_dir("/proc/self/fd")
27+
.expect("failed to read /proc/self/fd")
28+
.filter_map(|e| e.ok())
29+
.filter(|e| {
30+
std::fs::read_link(e.path())
31+
.map(|p| p.to_string_lossy().contains("perf_event"))
32+
.unwrap_or(false)
33+
})
34+
.count()
35+
}
36+
37+
/// Spawning many blocking threads should NOT cause fd count to grow proportionally.
38+
///
39+
/// With the bug, each `spawn_blocking` call opens a perf fd that is never reclaimed.
40+
/// After the fix, only worker threads (a fixed, small number) get perf fds.
41+
#[test]
42+
fn sched_profiler_fds_bounded_with_many_blocking_threads() {
43+
let _lock = PERF_FD_TEST_LOCK.lock().unwrap();
44+
let (writer, _events) = common::CapturingWriter::new();
45+
46+
let num_workers = 2;
47+
let num_blocking_tasks = 50;
48+
49+
let mut builder = tokio::runtime::Builder::new_multi_thread();
50+
builder.worker_threads(num_workers).enable_all();
51+
52+
let (runtime, guard) = TracedRuntime::builder()
53+
.with_sched_events(SchedEventConfig::default())
54+
.build_and_start(builder, writer)
55+
.unwrap();
56+
57+
// Let workers start and resolve their identity.
58+
runtime.block_on(async {
59+
tokio::time::sleep(Duration::from_millis(100)).await;
60+
});
61+
62+
let perf_fds_before = count_perf_fds();
63+
64+
// Spawn many blocking tasks. Each one creates a new blocking pool thread.
65+
// Use std::thread::sleep to ensure they actually block and force new threads.
66+
runtime.block_on(async {
67+
let mut handles = Vec::new();
68+
for _ in 0..num_blocking_tasks {
69+
handles.push(tokio::task::spawn_blocking(|| {
70+
std::thread::sleep(Duration::from_millis(50));
71+
}));
72+
}
73+
for h in handles {
74+
h.await.unwrap();
75+
}
76+
// Wait for threads to exit and on_thread_stop to fire.
77+
tokio::time::sleep(Duration::from_millis(500)).await;
78+
});
79+
80+
let perf_fds_after = count_perf_fds();
81+
82+
drop(runtime);
83+
drop(guard);
84+
85+
// Only worker threads should have perf fds. Before the fix, we'd see
86+
// ~50 new perf fds (one per blocking thread). After the fix, the count
87+
// should stay at exactly num_workers.
88+
assert_eq!(
89+
perf_fds_before, perf_fds_after,
90+
"perf fd count changed from {perf_fds_before} to {perf_fds_after} after \
91+
spawning {num_blocking_tasks} blocking tasks. \
92+
Sched profiler is likely opening fds for blocking pool threads."
93+
);
94+
}
95+
96+
/// Verify that sched profiler fds are properly cleaned up when the runtime shuts down.
97+
///
98+
/// This catches the tid=0 bug where `stop_tracking_current_thread` can never find
99+
/// the event to remove because `open_perf_event` stored tid=0 instead of the real tid.
100+
#[test]
101+
fn sched_profiler_fds_cleaned_up_on_shutdown() {
102+
let _lock = PERF_FD_TEST_LOCK.lock().unwrap();
103+
assert_eq!(count_perf_fds(), 0, "no perf fds should exist before test");
104+
105+
{
106+
let (writer, _events) = common::CapturingWriter::new();
107+
108+
let num_workers = 4;
109+
let mut builder = tokio::runtime::Builder::new_multi_thread();
110+
builder.worker_threads(num_workers).enable_all();
111+
112+
let (runtime, guard) = TracedRuntime::builder()
113+
.with_sched_events(SchedEventConfig::default())
114+
.build_and_start(builder, writer)
115+
.unwrap();
116+
117+
// Do some work so workers resolve their identity.
118+
runtime.block_on(async {
119+
for _ in 0..10 {
120+
tokio::spawn(async { tokio::task::yield_now().await })
121+
.await
122+
.unwrap();
123+
}
124+
tokio::time::sleep(Duration::from_millis(200)).await;
125+
});
126+
127+
// Workers should have perf fds while running.
128+
let perf_fds_during = count_perf_fds();
129+
assert!(
130+
perf_fds_during > 0,
131+
"expected perf fds while runtime is running, got 0"
132+
);
133+
134+
drop(runtime);
135+
drop(guard);
136+
}
137+
138+
let perf_fds_after = count_perf_fds();
139+
assert_eq!(
140+
perf_fds_after, 0,
141+
"leaked {perf_fds_after} perf_event fds after runtime shutdown. \
142+
stop_tracking_current_thread likely failed to find events due to tid=0 bug."
143+
);
144+
}

perf-self-profile/src/sampler.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ pub struct SamplerConfig {
4141
pub(crate) event_source: EventSource,
4242
pub(crate) sampling: SamplingMode,
4343
pub(crate) include_kernel: bool,
44+
/// Maximum number of threads that can be tracked simultaneously in
45+
/// per-thread mode. Prevents unbounded fd/mmap growth if cleanup fails.
46+
/// Default: 256.
47+
pub(crate) max_tracked_threads: usize,
4448
}
4549

4650
impl Default for SamplerConfig {
@@ -49,6 +53,7 @@ impl Default for SamplerConfig {
4953
sampling: SamplingMode::FrequencyHz(999),
5054
event_source: EventSource::SwCpuClock,
5155
include_kernel: false,
56+
max_tracked_threads: 256,
5257
}
5358
}
5459
}
@@ -72,6 +77,14 @@ impl SamplerConfig {
7277
self.include_kernel = yes;
7378
self
7479
}
80+
81+
/// Maximum number of threads tracked simultaneously in per-thread mode.
82+
/// If the cap is reached, `track_current_thread` returns an error instead
83+
/// of opening another fd. Default: 256.
84+
pub fn max_tracked_threads(mut self, max: usize) -> Self {
85+
self.max_tracked_threads = max;
86+
self
87+
}
7588
}
7689

7790
/// A single sample captured from perf events.

perf-self-profile/src/sys/linux/ctimer_sampler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ mod tests {
216216
sampling: SamplingMode::Period(1),
217217
event_source: EventSource::SwCpuClock,
218218
include_kernel: false,
219+
max_tracked_threads: 256,
219220
};
220221
let err = CtimerSampler::start(&config).unwrap_err();
221222
assert_eq!(err.kind(), io::ErrorKind::Unsupported);

perf-self-profile/src/sys/linux/perf_sampler.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub(super) struct PerfSamplerImpl {
9292
parse_config: ParseConfig<Little>,
9393
attr: perf_event_attr,
9494
include_kernel: bool,
95+
max_tracked_threads: usize,
9596
}
9697

9798
impl PerfSamplerImpl {
@@ -122,6 +123,7 @@ impl PerfSamplerImpl {
122123
parse_config: ParseConfig::from(attr),
123124
attr,
124125
include_kernel: config.include_kernel,
126+
max_tracked_threads: config.max_tracked_threads,
125127
})
126128
}
127129

@@ -150,6 +152,7 @@ impl PerfSamplerImpl {
150152
parse_config: ParseConfig::from(attr),
151153
attr,
152154
include_kernel: config.include_kernel,
155+
max_tracked_threads: config.max_tracked_threads,
153156
})
154157
}
155158

@@ -279,7 +282,18 @@ impl super::sampler::SamplerBackend for PerfSamplerImpl {
279282
// Per-thread mode: call from the thread you want to monitor.
280283
// This opens an event fd scoped to the calling tid with cpu=-1.
281284
fn track_current_thread(&mut self) -> io::Result<()> {
282-
let ev = open_perf_event(&mut self.attr, 0, -1)?;
285+
if self.events.len() >= self.max_tracked_threads {
286+
return Err(io::Error::other(format!(
287+
"perf sampler: max tracked threads ({}) reached, \
288+
refusing to open another fd",
289+
self.max_tracked_threads
290+
)));
291+
}
292+
let mut ev = open_perf_event(&mut self.attr, 0, -1)?;
293+
// open_perf_event stores tid=0 (the "current thread" sentinel passed to
294+
// perf_event_open). Resolve to the real tid so stop_tracking_current_thread
295+
// can find this event later.
296+
ev.tid = gettid() as i32;
283297
self.events.push(ev);
284298
Ok(())
285299
}

0 commit comments

Comments
 (0)