diff --git a/AGENTS.md b/AGENTS.md
index 088cfce5..77284502 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -28,6 +28,14 @@ Avoid dropping an error without logging it. Use `tracing` for logging.
 let _ = ...
 ```
 
+**ALWAYS rate-limit logging that can fire from a loop or repeated error path.** Any `warn!`/`error!` reachable from a background task loop, retry loop, or per-request hot path MUST be wrapped in `rate_limited!`:
+```rust
+rate_limited!(Duration::from_secs(60), {
+    tracing::warn!("...: {e}");
+});
+```
+Unguarded logging in loops causes log spam that degrades observability and can itself become a performance problem. One-time paths (startup, shutdown, per-thread init) are exempt.
+
 ## Running tests
 
 - Always run `cargo nextest run` to run tests
diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs
index 4da7a824..2ec08683 100644
--- a/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs
+++ b/dial9-tokio-telemetry/src/telemetry/recorder/mod.rs
@@ -1704,7 +1704,9 @@ fn run_flush_loop(
             && !exit
             && let Err(e) = event_writer.drained()
         {
-            tracing::warn!("failed to complete post-drain action: {e}");
+            rate_limited!(Duration::from_secs(60), {
+                tracing::warn!("failed to complete post-drain action: {e}");
+            });
         }
 
         // Create the metrics guard up front; mutate on the exit path,
diff --git a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs
index ff1cb56e..fc0ced59 100644
--- a/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs
+++ b/dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs
@@ -472,6 +472,7 @@ mod shuttle_tests {
     use dial9_trace_format::TraceEvent;
     use shuttle::rand::Rng;
     use std::collections::HashMap;
+    use std::io;
 
     // ── Event definition ────────────────────────────────────────────────
 
@@ -673,4 +674,190 @@ mod shuttle_tests {
     fn pct_real_pipeline() {
         shuttle::check_pct(test_telemetry_core_pipeline, 10000, 3);
     }
+
+    // ── Always-erroring writer ──────────────────────────────────────────
+    //
+    // The companion to `InvariantCheckingWriter`: every fallible method
+    // returns an `io::Error`. This exercises the error paths in the flush
+    // loop and lets us assert that rate limiting bounds log output even
+    // when every operation fails.
+
+    use std::sync::Arc as StdArc;
+    use std::sync::atomic::{AtomicU64 as StdAtomicU64, Ordering as StdOrdering};
+
+    /// Test writer whose every fallible `TraceWriter` method returns an error.
+    struct AlwaysErroringWriter;
+
+    impl AlwaysErroringWriter {
+        /// The error handed back by every fallible method.
+        fn err() -> io::Error {
+            io::Error::from(io::ErrorKind::PermissionDenied)
+        }
+    }
+
+    impl TraceWriter for AlwaysErroringWriter {
+        fn write_encoded_batch(&mut self, _batch: &Batch) -> io::Result<()> {
+            Err(Self::err())
+        }
+
+        fn flush(&mut self) -> io::Result<()> {
+            Err(Self::err())
+        }
+
+        fn finalize(&mut self) -> io::Result<()> {
+            Err(Self::err())
+        }
+
+        fn write_current_segment_metadata(&mut self) -> io::Result<()> {
+            Err(Self::err())
+        }
+
+        fn should_drain(&self) -> bool {
+            // Always want to drain so the post-drain error path is exercised
+            // every flush tick.
+            true
+        }
+
+        // NOTE(review): the type argument of `io::Result` is missing in this
+        // copy of the patch; restore it from `TraceWriter::drained`.
+        fn drained(&mut self) -> io::Result {
+            Err(Self::err())
+        }
+    }
+
+    // ── Counting subscriber ─────────────────────────────────────────────
+    //
+    // A minimal `tracing::Subscriber` that increments a shared counter
+    // on every WARN or ERROR event. We use `tracing::subscriber::with_default`
+    // to scope it to a single test invocation. We deliberately avoid
+    // depending on `tracing-subscriber` since that crate is gated behind
+    // the `tracing-layer` feature and isn't enabled under `_shuttle`.
+
+    /// Upper bound on WARN/ERROR events tolerated per run of the erroring
+    /// pipeline test.
+    const MAX_WARN_OR_ERROR_EVENTS: u64 = 10;
+
+    /// Returns `true` for the two levels the counting subscriber cares about.
+    fn is_warn_or_error(level: &tracing::Level) -> bool {
+        matches!(*level, tracing::Level::WARN | tracing::Level::ERROR)
+    }
+
+    /// Subscriber that counts WARN and ERROR events and ignores everything else.
+    struct CountingSubscriber {
+        /// Number of WARN/ERROR events seen so far; shared with the test body.
+        warn_or_error_count: StdArc<StdAtomicU64>,
+    }
+
+    impl tracing::Subscriber for CountingSubscriber {
+        fn enabled(&self, metadata: &tracing::Metadata<'_>) -> bool {
+            is_warn_or_error(metadata.level())
+        }
+
+        fn new_span(&self, _span: &tracing::span::Attributes<'_>) -> tracing::span::Id {
+            // We never actually use spans; return a fixed non-zero id.
+            tracing::span::Id::from_u64(1)
+        }
+
+        fn record(&self, _span: &tracing::span::Id, _values: &tracing::span::Record<'_>) {}
+
+        fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}
+
+        fn event(&self, event: &tracing::Event<'_>) {
+            // Re-check the level rather than trusting `enabled` alone.
+            if is_warn_or_error(event.metadata().level()) {
+                self.warn_or_error_count.fetch_add(1, StdOrdering::Relaxed);
+            }
+        }
+
+        fn enter(&self, _span: &tracing::span::Id) {}
+
+        fn exit(&self, _span: &tracing::span::Id) {}
+    }
+
+    // ── Erroring-pipeline test body ─────────────────────────────────────
+
+    /// Drives the pipeline with a writer that always fails and checks that the
+    /// number of WARN/ERROR events stays within `MAX_WARN_OR_ERROR_EVENTS`.
+    fn test_telemetry_core_erroring_pipeline() {
+        let _ts_guard =
+            metrique_timesource::set_time_source(metrique_timesource::TimeSource::custom(
+                metrique_timesource::fakes::StaticTimeSource::at_time(std::time::UNIX_EPOCH),
+            ));
+
+        let warn_count = StdArc::new(StdAtomicU64::new(0));
+        let subscriber = CountingSubscriber {
+            warn_or_error_count: warn_count.clone(),
+        };
+
+        // NOTE(review): `with_default` scopes the subscriber to the calling
+        // thread. Confirm that the flush loop's events are dispatched where this
+        // default is visible; otherwise the count stays at zero and the bound
+        // below passes vacuously.
+        tracing::subscriber::with_default(subscriber, || {
+            let num_threads = 3u64;
+            let next_id = Arc::new(AtomicU64::new(0));
+
+            let guard = TelemetryCore::builder()
+                .writer(AlwaysErroringWriter)
+                .build()
+                .unwrap();
+            guard.enable();
+            let handle = guard.handle();
+
+            let writers: Vec<_> = (0..num_threads)
+                .map(|thread_id| {
+                    let h = handle.clone();
+                    let next_id = next_id.clone();
+                    crate::primitives::thread::spawn(move || {
+                        let mut rng = shuttle::rand::thread_rng();
+                        let count = rng.gen_range(3u64..=10);
+                        let mut ts = rng.gen_range(1000u64..2000);
+                        for seq in 0..count {
+                            let id = next_id.fetch_add(1, Ordering::Relaxed);
+                            let timestamp_ns = next_timestamp(&mut ts);
+                            let ev = ValidationEvent {
+                                timestamp_ns,
+                                thread_id,
+                                seq,
+                                id,
+                            };
+                            // Errors are expected; whether the event is
+                            // recorded or dropped is not asserted here.
+                            h.record_encodable_event(&ev);
+                        }
+                    })
+                })
+                .collect();
+
+            for w in writers {
+                w.join().unwrap();
+            }
+            // Dropping the guard triggers the shutdown/finalize path,
+            // which should also be rate-limited if it logs on error.
+            drop(guard);
+        });
+
+        // Bound the warn+error count by the number of distinct rate-limited
+        // call sites. There are roughly 6 sites that might fire on every
+        // shuttle run; allow some slack but cap firmly below "one per loop
+        // iteration". If a `rate_limited!` wrapper is removed from the flush
+        // loop, this number explodes.
+        let total = warn_count.load(StdOrdering::Relaxed);
+        assert!(
+            total <= MAX_WARN_OR_ERROR_EVENTS,
+            "rate limiting failed under persistent writer errors: \
+             observed {total} WARN/ERROR events, expected <= {MAX_WARN_OR_ERROR_EVENTS}. \
+             A `rate_limited!` wrapper has likely been removed from a tight loop."
+        );
+    }
+
+    #[test]
+    fn determinism_check_erroring() {
+        shuttle::check_uncontrolled_nondeterminism(test_telemetry_core_erroring_pipeline, 10000);
+    }
+
+    #[test]
+    fn pct_erroring_pipeline() {
+        shuttle::check_pct(test_telemetry_core_erroring_pipeline, 10000, 3);
+    }
 }