Skip to content

Commit a269d09

Browse files
authored
Rate-limit all tracing/logging to prevent log spam (#209)
Wraps all warn/error/info tracing calls that could fire repeatedly in the `rate_limited!` macro, so each call site logs at most once per interval (60s everywhere except the S3 `uploaded {key}` info log, which uses 10s). One-time lifecycle events (worker started/stopped, drain complete, resolved bucket region, graceful_shutdown) are left un-rate-limited. Also fixes the `rate_limited!` macro to use the fully qualified `std::time::Duration::MAX`, so it works at call sites that do not have `Duration` in scope. Closes #139.
1 parent 6f4438b commit a269d09

6 files changed

Lines changed: 102 additions & 50 deletions

File tree

dial9-tokio-telemetry/src/background_task/mod.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod s3;
77
pub(crate) mod sealed;
88

99
use crate::metrics::{Operation, SegmentProcessMetrics, SegmentProcessMetricsGuard};
10+
use crate::rate_limit::rate_limited;
1011
use metrique::timers::Timer;
1112
use metrique_writer::BoxEntrySink;
1213
use pipeline_metrics::{MetriqueResult, PipelineMetrics, StageMetrics};
@@ -80,11 +81,13 @@ impl BackgroundTaskConfig {
8081
match stem {
8182
Some(s) if !s.is_empty() => s,
8283
_ => {
83-
tracing::error!(
84-
target: "dial9_worker",
85-
path = %self.trace_path.display(),
86-
"trace_path has no file stem — pass a path like /tmp/traces/trace.bin, not a directory"
87-
);
84+
rate_limited!(Duration::from_secs(60), {
85+
tracing::error!(
86+
target: "dial9_worker",
87+
path = %self.trace_path.display(),
88+
"trace_path has no file stem — pass a path like /tmp/traces/trace.bin, not a directory"
89+
);
90+
});
8891
"trace"
8992
}
9093
}
@@ -378,7 +381,9 @@ impl SegmentProcessor for SymbolizeProcessor {
378381
Ok(data)
379382
}
380383
Ok(Err(e)) => {
381-
tracing::warn!(target: "dial9_worker", error = %e, "symbolization failed, preserving original bytes");
384+
rate_limited!(Duration::from_secs(60), {
385+
tracing::warn!(target: "dial9_worker", error = %e, "symbolization failed, preserving original bytes");
386+
});
382387
Err(ProcessError {
383388
data,
384389
kind: ProcessErrorKind::Io(e),
@@ -434,10 +439,12 @@ impl SegmentProcessor for WriteBackProcessor {
434439
let _ = std::fs::remove_file(&dest_path);
435440
}
436441
Err(e) => {
437-
tracing::warn!(
438-
"failed to remove original segment {}: {e}",
439-
original_path.display()
440-
);
442+
rate_limited!(Duration::from_secs(60), {
443+
tracing::warn!(
444+
"failed to remove original segment {}: {e}",
445+
original_path.display()
446+
);
447+
});
441448
}
442449
}
443450
}
@@ -512,7 +519,9 @@ impl WorkerLoop {
512519
let segments = match sealed::find_sealed_segments(&self.dir, &self.stem) {
513520
Ok(s) => s,
514521
Err(e) => {
515-
tracing::warn!(target: "dial9_worker", "failed to scan for sealed segments: {e}");
522+
rate_limited!(Duration::from_secs(60), {
523+
tracing::warn!(target: "dial9_worker", "failed to scan for sealed segments: {e}");
524+
});
516525
return false;
517526
}
518527
};
@@ -540,7 +549,9 @@ impl WorkerLoop {
540549
continue;
541550
}
542551
Err(e) => {
543-
tracing::warn!(target: "dial9_worker", error = %e, "failed to read segment");
552+
rate_limited!(Duration::from_secs(60), {
553+
tracing::warn!(target: "dial9_worker", error = %e, "failed to read segment");
554+
});
544555
continue;
545556
}
546557
};
@@ -592,9 +603,13 @@ impl WorkerLoop {
592603
tracing::debug!(target: "dial9_worker", path = %segment.path.display(), "segment evicted during processing, skipping");
593604
} else {
594605
if let Err(remove_err) = std::fs::remove_file(&segment.path) {
595-
tracing::warn!(target: "dial9_worker", error = %remove_err, path = %segment.path.display(), "failed to remove corrupted segment");
606+
rate_limited!(Duration::from_secs(60), {
607+
tracing::warn!(target: "dial9_worker", error = %remove_err, path = %segment.path.display(), "failed to remove corrupted segment");
608+
});
596609
}
597-
tracing::warn!(target: "dial9_worker", error = %e.kind, cause = ?e.kind, path = %segment.path.display(), "processor failed, removing segment");
610+
rate_limited!(Duration::from_secs(60), {
611+
tracing::warn!(target: "dial9_worker", error = %e.kind, cause = ?e.kind, path = %segment.path.display(), "processor failed, removing segment");
612+
});
598613
}
599614
continue 'next_segment;
600615
}
@@ -684,7 +699,9 @@ impl SegmentProcessor for S3PipelineUploader {
684699
{
685700
Ok(key) => {
686701
self.circuit_breaker.on_success();
687-
tracing::info!(target: "dial9_worker", "uploaded {key}");
702+
rate_limited!(Duration::from_secs(10), {
703+
tracing::info!(target: "dial9_worker", "uploaded {key}");
704+
});
688705
Ok(data)
689706
}
690707
Err(kind) => {
@@ -693,7 +710,9 @@ impl SegmentProcessor for S3PipelineUploader {
693710
tracing::debug!(target: "dial9_worker", path = %data.segment.path.display(), "segment already evicted, skipping");
694711
} else {
695712
self.circuit_breaker.on_failure();
696-
tracing::warn!(target: "dial9_worker", error = %kind, "upload failed");
713+
rate_limited!(Duration::from_secs(60), {
714+
tracing::warn!(target: "dial9_worker", error = %kind, "upload failed");
715+
});
697716
}
698717
Err(ProcessError { data, kind })
699718
}

dial9-tokio-telemetry/src/background_task/sealed.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ pub(crate) fn creation_epoch_secs(data: &[u8], path: &Path) -> (u64, bool) {
1919
match parse_segment_timestamp(data) {
2020
Ok(ts) => return (ts / 1_000_000_000, true),
2121
Err(e) => {
22-
tracing::warn!(
23-
path = %path.display(),
24-
error = %e,
25-
"failed to parse segment timestamp, falling back to mtime"
26-
);
22+
crate::rate_limit::rate_limited!(std::time::Duration::from_secs(60), {
23+
tracing::warn!(
24+
path = %path.display(),
25+
error = %e,
26+
"failed to parse segment timestamp, falling back to mtime"
27+
);
28+
});
2729
}
2830
}
2931
let secs = std::fs::metadata(path)

dial9-tokio-telemetry/src/rate_limit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ macro_rules! rate_limited {
2222
if next <= time.as_secs() {
2323
let new_next = time
2424
.checked_add(interval)
25-
.unwrap_or(Duration::MAX)
25+
.unwrap_or(std::time::Duration::MAX)
2626
.as_secs();
2727
if NEXT_CALL
2828
.compare_exchange(next, new_next, Ordering::Relaxed, Ordering::Relaxed)

dial9-tokio-telemetry/src/telemetry/buffer.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,12 @@ impl Drop for ThreadLocalBuffer {
241241
if let Some(collector) = self.collector.take() {
242242
collector.accept_flush(self.flush());
243243
} else {
244-
tracing::warn!(
245-
"dial9-tokio-telemetry: dropping {} unflushed events (no collector registered on this thread)",
246-
self.event_count
247-
);
244+
crate::rate_limit::rate_limited!(Duration::from_secs(60), {
245+
tracing::warn!(
246+
"dial9-tokio-telemetry: dropping {} unflushed events (no collector registered on this thread)",
247+
self.event_count
248+
);
249+
});
248250
}
249251
}
250252
}

dial9-tokio-telemetry/src/telemetry/recorder/mod.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use event_writer::EventWriter;
99
use runtime_context::{make_poll_end, make_poll_start, make_worker_park, make_worker_unpark};
1010

1111
use crate::metrics::{FlushMetrics, Operation, TlDrainMetrics};
12+
use crate::rate_limit::rate_limited;
1213
use crate::telemetry::buffer;
1314
use crate::telemetry::events::RawEvent;
1415
use crate::telemetry::task_metadata::TaskId;
@@ -66,17 +67,21 @@ fn flush_once(
6667

6768
let dropped = shared.collector.take_dropped_batches();
6869
if dropped > 0 {
69-
tracing::warn!(
70-
dropped_batches = dropped,
71-
"telemetry flush fell behind, dropped batches"
72-
);
70+
rate_limited!(Duration::from_secs(60), {
71+
tracing::warn!(
72+
dropped_batches = dropped,
73+
"telemetry flush fell behind, dropped batches"
74+
);
75+
});
7376
}
7477

7578
while let Some(batch) = shared.collector.next() {
7679
if batch.event_count > 0
7780
&& let Err(e) = event_writer.write_encoded_batch(&batch)
7881
{
79-
tracing::warn!("failed to transcode batch: {e}");
82+
rate_limited!(Duration::from_secs(60), {
83+
tracing::warn!("failed to transcode batch: {e}");
84+
});
8085
shared.enabled.store(false, Ordering::Relaxed);
8186
return FlushStats {
8287
event_count: event_writer.events_written() - events_before,
@@ -86,7 +91,9 @@ fn flush_once(
8691
}
8792
}
8893
if let Err(e) = event_writer.flush() {
89-
tracing::warn!("failed to flush trace data: {e}");
94+
rate_limited!(Duration::from_secs(60), {
95+
tracing::warn!("failed to flush trace data: {e}");
96+
});
9097
}
9198
FlushStats {
9299
event_count: event_writer.events_written() - events_before,
@@ -222,9 +229,11 @@ fn attach_runtime(
222229
ctx.metrics_and_base
223230
.set((metrics, base))
224231
.unwrap_or_else(|_| {
225-
tracing::warn!(
226-
"metrics_and_base already set for runtime context; ignoring duplicate attach"
227-
);
232+
rate_limited!(Duration::from_secs(60), {
233+
tracing::warn!(
234+
"metrics_and_base already set for runtime context; ignoring duplicate attach"
235+
);
236+
});
228237
});
229238

230239
shared.contexts.lock().unwrap().push(ctx);
@@ -855,13 +864,17 @@ impl TelemetryCore {
855864
if let Some(ref config) = cpu_profiling {
856865
match crate::telemetry::cpu_profile::CpuProfiler::start(config.clone()) {
857866
Ok(sampler) => event_writer.cpu_profiler = Some(sampler),
858-
Err(e) => tracing::warn!("failed to start CPU profiler: {e}"),
867+
Err(e) => rate_limited!(Duration::from_secs(60), {
868+
tracing::warn!("failed to start CPU profiler: {e}");
869+
}),
859870
}
860871
}
861872
if let Some(sched_cfg) = sched_events {
862873
match crate::telemetry::cpu_profile::SchedProfiler::new(sched_cfg) {
863874
Ok(sched) => *shared.sched_profiler.lock().unwrap() = Some(sched),
864-
Err(e) => tracing::warn!("failed to start scheduler event profiler: {e}"),
875+
Err(e) => rate_limited!(Duration::from_secs(60), {
876+
tracing::warn!("failed to start scheduler event profiler: {e}");
877+
}),
865878
}
866879
}
867880
}
@@ -1059,13 +1072,17 @@ fn run_flush_loop(
10591072
// Write final metadata before sealing so single-segment
10601073
// traces contain runtime→worker mappings.
10611074
if let Err(e) = event_writer.write_current_segment_metadata() {
1062-
tracing::warn!("failed to write final segment metadata: {e}");
1075+
rate_limited!(Duration::from_secs(60), {
1076+
tracing::warn!("failed to write final segment metadata: {e}");
1077+
});
10631078
if let Some(g) = flush_guard.as_mut() {
10641079
g.write_metadata_failed = true;
10651080
}
10661081
}
10671082
if let Err(e) = event_writer.finalize() {
1068-
tracing::warn!("failed to finalize trace segment: {e}");
1083+
rate_limited!(Duration::from_secs(60), {
1084+
tracing::warn!("failed to finalize trace segment: {e}");
1085+
});
10691086
if let Some(g) = flush_guard.as_mut() {
10701087
g.finalize_failed = true;
10711088
}

dial9-tokio-telemetry/src/telemetry/writer.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use dial9_trace_format::encoder::{Encoder, RawEncoder};
22

3+
use crate::rate_limit::rate_limited;
34
use crate::telemetry::collector::Batch;
45
use crate::telemetry::format::SegmentMetadataEvent;
56
use std::collections::VecDeque;
@@ -414,16 +415,23 @@ impl RotatingWriter {
414415
&& name_str != file_name
415416
&& let Err(e2) = fs::remove_file(entry.path())
416417
{
417-
tracing::warn!(
418-
"failed to evict old trace segment {}: {e2}",
419-
entry.path().display()
420-
);
418+
rate_limited!(Duration::from_secs(60), {
419+
tracing::warn!(
420+
"failed to evict old trace segment {}: {e2}",
421+
entry.path().display()
422+
);
423+
});
421424
}
422425
}
423426
}
424427
}
425428
Err(e) => {
426-
tracing::warn!("failed to evict old trace segment {}: {e}", path.display());
429+
rate_limited!(Duration::from_secs(60), {
430+
tracing::warn!(
431+
"failed to evict old trace segment {}: {e}",
432+
path.display()
433+
);
434+
});
427435
}
428436
}
429437
}
@@ -478,7 +486,9 @@ impl TraceWriter for RotatingWriter {
478486

479487
fn finalize(&mut self) -> std::io::Result<()> {
480488
if matches!(self.state, WriterState::Finished) {
481-
tracing::warn!("writer is already closed.")
489+
rate_limited!(Duration::from_secs(60), {
490+
tracing::warn!("writer is already closed.");
491+
});
482492
}
483493
self.flush()?;
484494
// Rename .active → .bin for the current segment (if it has .active suffix)
@@ -536,11 +546,13 @@ impl TraceWriter for RotatingWriter {
536546
impl Drop for RotatingWriter {
537547
fn drop(&mut self) {
538548
if self.dropped_events > 0 {
539-
tracing::info!(
540-
target: "dial9_telemetry",
541-
dropped_events = self.dropped_events,
542-
"RotatingWriter dropped events after finalization"
543-
);
549+
rate_limited!(Duration::from_secs(60), {
550+
tracing::info!(
551+
target: "dial9_telemetry",
552+
dropped_events = self.dropped_events,
553+
"RotatingWriter dropped events after finalization"
554+
);
555+
});
544556
}
545557
}
546558
}

0 commit comments

Comments
 (0)