Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ Avoid dropping an error without logging it. Use `tracing` for logging.
let _ = ...
```

**ALWAYS rate-limit logging that can fire from a loop or repeated error path.** Any `warn!`/`error!` reachable from a background task loop, retry loop, or per-request hot path MUST be wrapped in `rate_limited!`:
```rust
rate_limited!(Duration::from_secs(60), {
tracing::warn!("...: {e}");
});
```
Unguarded logging in loops causes log spam that degrades observability and can itself become a performance problem. One-time paths (startup, shutdown, per-thread init) are exempt.

## Running tests

- Always run `cargo nextest run` to run tests
Expand Down
4 changes: 3 additions & 1 deletion dial9-tokio-telemetry/src/telemetry/recorder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,9 @@ fn run_flush_loop(
&& !exit
&& let Err(e) = event_writer.drained()
{
tracing::warn!("failed to complete post-drain action: {e}");
rate_limited!(Duration::from_secs(60), {
tracing::warn!("failed to complete post-drain action: {e}");
});
}

// Create the metrics guard up front; mutate on the exit path,
Expand Down
170 changes: 170 additions & 0 deletions dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ mod shuttle_tests {
use dial9_trace_format::TraceEvent;
use shuttle::rand::Rng;
use std::collections::HashMap;
use std::io;

// ── Event definition ────────────────────────────────────────────────

Expand Down Expand Up @@ -673,4 +674,173 @@ mod shuttle_tests {
fn pct_real_pipeline() {
    // Hand the real-pipeline scenario to shuttle's `check_pct` with
    // 10000 as the second argument and 3 as the third.
    let scenario = test_telemetry_core_pipeline;
    shuttle::check_pct(scenario, 10_000, 3);
}

// ── Always-erroring writer ──────────────────────────────────────────
//
// The companion to `InvariantCheckingWriter`: every fallible method
// returns an `io::Error`. This exercises the error paths in the flush
// loop and lets us assert that rate limiting bounds log output even
// when every operation fails.

use std::sync::Arc as StdArc;
use std::sync::atomic::{AtomicU64 as StdAtomicU64, Ordering as StdOrdering};

/// Test-only writer stand-in; its fallible operations are implemented to
/// fail with the error produced by [`AlwaysErroringWriter::err`].
struct AlwaysErroringWriter;

impl AlwaysErroringWriter {
    /// Builds the error returned by the failing operations: an
    /// `io::Error` constructed from `io::ErrorKind::PermissionDenied`.
    fn err() -> io::Error {
        io::ErrorKind::PermissionDenied.into()
    }
}

impl TraceWriter for AlwaysErroringWriter {
fn write_encoded_batch(&mut self, _batch: &Batch) -> io::Result<()> {
Err(Self::err())
}

fn flush(&mut self) -> io::Result<()> {
Err(Self::err())
}

fn finalize(&mut self) -> io::Result<()> {
Err(Self::err())
}

fn write_current_segment_metadata(&mut self) -> io::Result<()> {
Err(Self::err())
}

fn should_drain(&self) -> bool {
// Always want to drain so the post-drain error path is exercised
// every flush tick.
true
}

fn drained(&mut self) -> io::Result<bool> {
Err(Self::err())
}
}

// ── Counting subscriber ─────────────────────────────────────────────
//
// A minimal `tracing::Subscriber` that increments a shared counter
// on every WARN or ERROR event. We use `tracing::subscriber::with_default`
// to scope it to a single test invocation. We deliberately avoid
// depending on `tracing-subscriber` since that crate is gated behind
// the `tracing-layer` feature and isn't enabled under `_shuttle`.

struct CountingSubscriber {
warn_or_error_count: StdArc<StdAtomicU64>,
}

impl tracing::Subscriber for CountingSubscriber {
fn enabled(&self, metadata: &tracing::Metadata<'_>) -> bool {
matches!(
*metadata.level(),
tracing::Level::WARN | tracing::Level::ERROR
)
}

fn new_span(&self, _span: &tracing::span::Attributes<'_>) -> tracing::span::Id {
// We never actually use spans; return a fixed non-zero id.
tracing::span::Id::from_u64(1)
}

fn record(&self, _span: &tracing::span::Id, _values: &tracing::span::Record<'_>) {}

fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}

fn event(&self, event: &tracing::Event<'_>) {
let level = *event.metadata().level();
if level == tracing::Level::WARN || level == tracing::Level::ERROR {
self.warn_or_error_count.fetch_add(1, StdOrdering::Relaxed);
}
}

fn enter(&self, _span: &tracing::span::Id) {}

fn exit(&self, _span: &tracing::span::Id) {}
}

// ── Erroring-pipeline test body ─────────────────────────────────────

/// Runs the telemetry pipeline against `AlwaysErroringWriter` from three
/// producer threads and asserts that at most 10 WARN/ERROR events were
/// observed by the counting subscriber.
fn test_telemetry_core_erroring_pipeline() {
    // Install a static time source fixed at the Unix epoch; the returned
    // guard is kept alive until the end of this function.
    let static_epoch = metrique_timesource::fakes::StaticTimeSource::at_time(std::time::UNIX_EPOCH);
    let _ts_guard = metrique_timesource::set_time_source(
        metrique_timesource::TimeSource::custom(static_epoch),
    );

    // Shared tally: one handle goes into the subscriber, the other is
    // read back once the scenario has finished.
    let warn_count = StdArc::new(StdAtomicU64::new(0));
    let subscriber = CountingSubscriber {
        warn_or_error_count: StdArc::clone(&warn_count),
    };

    tracing::subscriber::with_default(subscriber, || {
        let num_threads: u64 = 3;
        let next_id = Arc::new(AtomicU64::new(0));

        let guard = TelemetryCore::builder()
            .writer(AlwaysErroringWriter)
            .build()
            .unwrap();
        guard.enable();
        let handle = guard.handle();

        // Spawn the producers one after another, keeping their join
        // handles in spawn order.
        let mut producers = Vec::new();
        for thread_id in 0..num_threads {
            let recorder = handle.clone();
            let id_source = next_id.clone();
            producers.push(crate::primitives::thread::spawn(move || {
                let mut rng = shuttle::rand::thread_rng();
                // Draw the event count first, then the starting
                // timestamp.
                let event_count = rng.gen_range(3u64..=10);
                let mut clock = rng.gen_range(1000u64..2000);
                for seq in 0..event_count {
                    let id = id_source.fetch_add(1, Ordering::Relaxed);
                    let timestamp_ns = next_timestamp(&mut clock);
                    let event = ValidationEvent {
                        timestamp_ns,
                        thread_id,
                        seq,
                        id,
                    };
                    // Failures are expected here; this test makes no
                    // claim about whether the event is kept or dropped.
                    recorder.record_encodable_event(&event);
                }
            }));
        }

        for producer in producers {
            producer.join().unwrap();
        }
        // Drop the guard explicitly, inside the subscriber scope, so the
        // shutdown/finalize path runs while events are still counted.
        drop(guard);
    });

    // The cap is meant to cover the distinct rate-limited call sites
    // (roughly 6 that might fire per shuttle run) plus some slack, while
    // staying well under "one event per loop iteration". Removing a
    // `rate_limited!` wrapper from the flush loop should blow past it.
    let total = warn_count.load(StdOrdering::Relaxed);
    assert!(
        total <= 10,
        "rate limiting failed under persistent writer errors: \
         observed {total} WARN/ERROR events, expected <= 10. \
         A `rate_limited!` wrapper has likely been removed from a tight loop."
    );
}

/// Feeds the erroring-pipeline scenario to shuttle's
/// `check_uncontrolled_nondeterminism` with 10000 as its second argument.
#[test]
fn determinism_check_erroring() {
    let scenario = test_telemetry_core_erroring_pipeline;
    shuttle::check_uncontrolled_nondeterminism(scenario, 10_000);
}

/// Feeds the erroring-pipeline scenario to shuttle's `check_pct` with
/// 10000 as the second argument and 3 as the third.
#[test]
fn pct_erroring_pipeline() {
    let scenario = test_telemetry_core_erroring_pipeline;
    shuttle::check_pct(scenario, 10_000, 3);
}
}
Loading