Skip to content

Commit 350145b

Browse files
authored
fix: rate_limit log when drain is failing (#385)
1 parent 4b878dd commit 350145b

3 files changed

Lines changed: 181 additions & 1 deletion

File tree

AGENTS.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@ Avoid dropping an error without logging it. Use `tracing` for logging.
2828
let _ = ...
2929
```
3030

31+
**ALWAYS rate-limit logging that can fire from a loop or repeated error path.** Any `warn!`/`error!` reachable from a background task loop, retry loop, or per-request hot path MUST be wrapped in `rate_limited!`:
32+
```rust
33+
rate_limited!(Duration::from_secs(60), {
34+
tracing::warn!("...: {e}");
35+
});
36+
```
37+
Unguarded logging in loops causes log spam that degrades observability and can itself become a performance problem. One-time paths (startup, shutdown, per-thread init) are exempt.
38+
3139
## Running tests
3240

3341
- Always run `cargo nextest run` to run tests

dial9-tokio-telemetry/src/telemetry/recorder/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1704,7 +1704,9 @@ fn run_flush_loop(
17041704
&& !exit
17051705
&& let Err(e) = event_writer.drained()
17061706
{
1707-
tracing::warn!("failed to complete post-drain action: {e}");
1707+
rate_limited!(Duration::from_secs(60), {
1708+
tracing::warn!("failed to complete post-drain action: {e}");
1709+
});
17081710
}
17091711

17101712
// Create the metrics guard up front; mutate on the exit path,

dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ mod shuttle_tests {
472472
use dial9_trace_format::TraceEvent;
473473
use shuttle::rand::Rng;
474474
use std::collections::HashMap;
475+
use std::io;
475476

476477
// ── Event definition ────────────────────────────────────────────────
477478

@@ -673,4 +674,173 @@ mod shuttle_tests {
673674
fn pct_real_pipeline() {
674675
shuttle::check_pct(test_telemetry_core_pipeline, 10000, 3);
675676
}
677+
678+
// ── Always-erroring writer ──────────────────────────────────────────
679+
//
680+
// The companion to `InvariantCheckingWriter`: every fallible method
681+
// returns an `io::Error`. This exercises the error paths in the flush
682+
// loop and lets us assert that rate limiting bounds log output even
683+
// when every operation fails.
684+
685+
use std::sync::Arc as StdArc;
686+
use std::sync::atomic::{AtomicU64 as StdAtomicU64, Ordering as StdOrdering};
687+
688+
struct AlwaysErroringWriter;
689+
690+
impl AlwaysErroringWriter {
691+
fn err() -> io::Error {
692+
io::Error::from(io::ErrorKind::PermissionDenied)
693+
}
694+
}
695+
696+
impl TraceWriter for AlwaysErroringWriter {
697+
fn write_encoded_batch(&mut self, _batch: &Batch) -> io::Result<()> {
698+
Err(Self::err())
699+
}
700+
701+
fn flush(&mut self) -> io::Result<()> {
702+
Err(Self::err())
703+
}
704+
705+
fn finalize(&mut self) -> io::Result<()> {
706+
Err(Self::err())
707+
}
708+
709+
fn write_current_segment_metadata(&mut self) -> io::Result<()> {
710+
Err(Self::err())
711+
}
712+
713+
fn should_drain(&self) -> bool {
714+
// Always want to drain so the post-drain error path is exercised
715+
// every flush tick.
716+
true
717+
}
718+
719+
fn drained(&mut self) -> io::Result<bool> {
720+
Err(Self::err())
721+
}
722+
}
723+
724+
// ── Counting subscriber ─────────────────────────────────────────────
725+
//
726+
// A minimal `tracing::Subscriber` that increments a shared counter
727+
// on every WARN or ERROR event. We use `tracing::subscriber::with_default`
728+
// to scope it to a single test invocation. We deliberately avoid
729+
// depending on `tracing-subscriber` since that crate is gated behind
730+
// the `tracing-layer` feature and isn't enabled under `_shuttle`.
731+
732+
struct CountingSubscriber {
733+
warn_or_error_count: StdArc<StdAtomicU64>,
734+
}
735+
736+
impl tracing::Subscriber for CountingSubscriber {
737+
fn enabled(&self, metadata: &tracing::Metadata<'_>) -> bool {
738+
matches!(
739+
*metadata.level(),
740+
tracing::Level::WARN | tracing::Level::ERROR
741+
)
742+
}
743+
744+
fn new_span(&self, _span: &tracing::span::Attributes<'_>) -> tracing::span::Id {
745+
// We never actually use spans; return a fixed non-zero id.
746+
tracing::span::Id::from_u64(1)
747+
}
748+
749+
fn record(&self, _span: &tracing::span::Id, _values: &tracing::span::Record<'_>) {}
750+
751+
fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}
752+
753+
fn event(&self, event: &tracing::Event<'_>) {
754+
let level = *event.metadata().level();
755+
if level == tracing::Level::WARN || level == tracing::Level::ERROR {
756+
self.warn_or_error_count.fetch_add(1, StdOrdering::Relaxed);
757+
}
758+
}
759+
760+
fn enter(&self, _span: &tracing::span::Id) {}
761+
762+
fn exit(&self, _span: &tracing::span::Id) {}
763+
}
764+
765+
// ── Erroring-pipeline test body ─────────────────────────────────────
766+
767+
fn test_telemetry_core_erroring_pipeline() {
768+
let _ts_guard =
769+
metrique_timesource::set_time_source(metrique_timesource::TimeSource::custom(
770+
metrique_timesource::fakes::StaticTimeSource::at_time(std::time::UNIX_EPOCH),
771+
));
772+
773+
let warn_count = StdArc::new(StdAtomicU64::new(0));
774+
let subscriber = CountingSubscriber {
775+
warn_or_error_count: warn_count.clone(),
776+
};
777+
778+
tracing::subscriber::with_default(subscriber, || {
779+
let num_threads = 3;
780+
let next_id = Arc::new(AtomicU64::new(0));
781+
782+
let guard = TelemetryCore::builder()
783+
.writer(AlwaysErroringWriter)
784+
.build()
785+
.unwrap();
786+
guard.enable();
787+
let handle = guard.handle();
788+
789+
let writers: Vec<_> = (0..num_threads)
790+
.map(|thread_id| {
791+
let h = handle.clone();
792+
let next_id = next_id.clone();
793+
let thread_id = thread_id as u64;
794+
crate::primitives::thread::spawn(move || {
795+
let mut rng = shuttle::rand::thread_rng();
796+
let count = rng.gen_range(3u64..=10);
797+
let mut ts = rng.gen_range(1000u64..2000);
798+
for seq in 0..count {
799+
let id = next_id.fetch_add(1, Ordering::Relaxed);
800+
let timestamp_ns = next_timestamp(&mut ts);
801+
let ev = ValidationEvent {
802+
timestamp_ns,
803+
thread_id,
804+
seq,
805+
id,
806+
};
807+
// Errors are expected; whether the event is
808+
// recorded or dropped is not asserted here.
809+
h.record_encodable_event(&ev);
810+
}
811+
})
812+
})
813+
.collect();
814+
815+
for w in writers {
816+
w.join().unwrap();
817+
}
818+
// Dropping the guard triggers the shutdown/finalize path,
819+
// which should also be rate-limited if it logs on error.
820+
drop(guard);
821+
});
822+
823+
// Bound the warn+error count by the number of distinct rate-limited
824+
// call sites. There are roughly 6 sites that might fire on every
825+
// shuttle run; allow some slack but cap firmly below "one per loop
826+
// iteration". If a `rate_limited!` wrapper is removed from the flush
827+
// loop, this number explodes.
828+
let total = warn_count.load(StdOrdering::Relaxed);
829+
assert!(
830+
total <= 10,
831+
"rate limiting failed under persistent writer errors: \
832+
observed {total} WARN/ERROR events, expected <= 10. \
833+
A `rate_limited!` wrapper has likely been removed from a tight loop."
834+
);
835+
}
836+
837+
#[test]
838+
fn determinism_check_erroring() {
839+
shuttle::check_uncontrolled_nondeterminism(test_telemetry_core_erroring_pipeline, 10000);
840+
}
841+
842+
#[test]
843+
fn pct_erroring_pipeline() {
844+
shuttle::check_pct(test_telemetry_core_erroring_pipeline, 10000, 3);
845+
}
676846
}

0 commit comments

Comments
 (0)