Skip to content

Commit 765c6a6

Browse files
committed
enhancement(internal_logs source): Add option to rate limit internal_logs sources
1 parent 816563e commit 765c6a6

9 files changed

Lines changed: 199 additions & 15 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Add support for optionally applying rate limiting to the `internal_logs` source, controlled by the
2+
`--internal-logs-source-rate-limit` CLI option and `VECTOR_INTERNAL_LOGS_SOURCE_RATE_LIMIT`
3+
environment variable. This provides the same rate limiting functionality as was available before
4+
version 0.51.1, but with a rate limit window that is separate from the one used for console output.
5+
6+
authors: bruceg

docs/DEVELOPING.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,8 @@ to detect common problems.
264264

265265
### Disabling internal log rate limiting
266266

267-
Vector rate limits its own internal logs by default (10-second windows). During development, you may want to see all log occurrences.
267+
Vector rate limits the console output of internal logs by default (10-second windows). During
268+
development, you may want to see all log occurrences.
268269

269270
**Globally** (CLI flag or environment variable):
270271

@@ -284,6 +285,10 @@ warn!(message = "Error occurred.", %error, internal_log_rate_limit = false);
284285
info!(message = "Processing batch.", batch_size, internal_log_rate_secs = 1);
285286
```
286287

288+
Note: The `internal_logs` source is _not_ rate limited by default. To enable rate limiting on all
289+
such sources, set the `--internal-logs-source-rate-limit` CLI flag or
290+
`VECTOR_INTERNAL_LOGS_SOURCE_RATE_LIMIT` environment variable to an integer number of seconds.
291+
287292
## Testing
288293

289294
Testing is very important since Vector's primary design principle is reliability.

lib/tracing-limit/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
//! - **3rd+ occurrences**: Silent until window expires
1414
//! - **After window**: Emits a summary of suppressed count, then next event normally
1515
//!
16+
//! Note: the suppressed-count summary and the resumption of normal emission are both
17+
//! triggered by the *next arriving event* after the window has elapsed, not by the
18+
//! window expiry itself. If the event stops firing, no summary is ever emitted.
19+
//!
1620
//! # Rate limit grouping
1721
//!
1822
//! Events are rate limited independently based on a combination of:
@@ -163,6 +167,7 @@ where
163167
/// - 1st occurrence: Emitted normally
164168
/// - 2nd occurrence: Shows "suppressing" warning
165169
/// - 3rd+ occurrences: Silent until window expires
170+
/// - After window: Summary and next event emitted on next arrival (see module-level note)
166171
pub fn with_default_limit(mut self, internal_log_rate_limit: u64) -> Self {
167172
self.internal_log_rate_limit = internal_log_rate_limit;
168173
self

src/app.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ impl Application {
212212
opts.root.log_format,
213213
opts.log_level(),
214214
opts.root.internal_log_rate_limit,
215+
opts.root.internal_logs_source_rate_limit,
215216
);
216217

217218
#[cfg(unix)]
@@ -658,14 +659,20 @@ pub async fn load_configs(
658659
Ok(config)
659660
}
660661

661-
pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) {
662+
pub fn init_logging(
663+
color: bool,
664+
format: LogFormat,
665+
log_level: &str,
666+
rate: u64,
667+
broadcast_rate_limit: Option<NonZeroU64>,
668+
) {
662669
let level = get_log_levels(log_level);
663670
let json = match format {
664671
LogFormat::Text => false,
665672
LogFormat::Json => true,
666673
};
667674

668-
trace::init(color, json, &level, rate);
675+
trace::init(color, json, &level, rate, broadcast_rate_limit);
669676
debug!(
670677
message = "Internal log rate limit configured.",
671678
internal_log_rate_secs = rate,

src/cli.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ pub struct RootOpts {
193193
/// This controls the time window for rate limiting Vector's own internal logs.
194194
/// Within each time window, the first occurrence of a log is emitted, the second
195195
/// shows a suppression warning, and subsequent occurrences are silent until the
196-
/// window expires.
196+
/// window expires. When the window expires and the log fires again, a summary of
197+
/// the suppressed count is emitted followed by the log itself.
197198
///
198199
/// Logs are grouped by their location in the code and the `component_id` field, so logs
199200
/// from different components are rate limited independently.
@@ -210,6 +211,16 @@ pub struct RootOpts {
210211
)]
211212
pub internal_log_rate_limit: u64,
212213

214+
/// Apply a rate limit (in seconds) to the broadcast channel that feeds all `internal_logs`
215+
/// sources. When set, the first occurrence of a repeated log is emitted, the second shows a
216+
/// suppression warning, and subsequent occurrences are silent until the window expires. When
217+
/// the window expires and the log fires again, a summary of the suppressed count is emitted
218+
/// followed by the log itself. Unset by default so that `internal_logs` consumers receive
219+
/// every log event. This limit is independent of `--internal-log-rate-limit`, which only
220+
/// applies to stdout/stderr output.
221+
#[arg(long, env = "VECTOR_INTERNAL_LOGS_SOURCE_RATE_LIMIT")]
222+
pub internal_logs_source_rate_limit: Option<NonZeroU64>,
223+
213224
/// Set the duration in seconds to wait for graceful shutdown after SIGINT or SIGTERM are
214225
/// received. After the duration has passed, Vector will force shutdown. To never force
215226
/// shutdown, use `--no-graceful-shutdown-limit`.

src/sources/internal_logs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ mod tests {
238238
#[tokio::test]
239239
#[serial]
240240
async fn receives_logs() {
241-
trace::init(false, false, "debug", 10);
241+
trace::init(false, false, "debug", 10, None);
242242
trace::reset_early_buffer();
243243

244244
assert_source_compliance(&SOURCE_TAGS, run_test()).await;
@@ -363,7 +363,7 @@ mod tests {
363363
#[tokio::test]
364364
#[serial]
365365
async fn registered_extra_span_field_is_captured() {
366-
trace::init(false, false, "info", 10);
366+
trace::init(false, false, "info", 10, None);
367367
trace::reset_early_buffer();
368368

369369
let test_id: u8 = rand::random();
@@ -400,7 +400,7 @@ mod tests {
400400
#[tokio::test]
401401
#[serial]
402402
async fn repeated_logs_are_not_rate_limited() {
403-
trace::init(false, false, "info", 10);
403+
trace::init(false, false, "info", 10, None);
404404
trace::reset_early_buffer();
405405

406406
let rx = start_source().await;

src/test_util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ pub fn trace_init() {
128128

129129
let levels = std::env::var("VECTOR_LOG").unwrap_or_else(|_| "error".to_string());
130130

131-
trace::init(color, false, &levels, 10);
131+
trace::init(color, false, &levels, 10, None);
132132

133133
// Initialize metrics as well
134134
vector_lib::metrics::init_test();

src/trace.rs

Lines changed: 132 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,22 +57,41 @@ fn metrics_layer_enabled() -> bool {
5757
!matches!(std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION"), Ok(x) if x == "true")
5858
}
5959

60-
pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64) {
60+
pub fn init(
61+
color: bool,
62+
json: bool,
63+
levels: &str,
64+
internal_log_rate_limit: u64,
65+
broadcast_rate_limit: Option<std::num::NonZeroU64>,
66+
) {
6167
let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
6268
"logging filter targets were not formatted correctly or did not specify a valid level",
6369
);
6470

6571
let metrics_layer =
6672
metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));
6773

68-
// BroadcastLayer should NOT be rate limited because it feeds the internal_logs source,
69-
// which users rely on to capture ALL internal Vector logs for debugging/monitoring.
70-
// Console output (stdout/stderr) has its own separate rate limiting below.
71-
let broadcast_layer = BroadcastLayer::new().with_filter(fmt_filter.clone());
74+
// The broadcast layer feeds the internal_logs source. By default it is NOT rate limited so
75+
// that users can capture ALL internal Vector logs for debugging/monitoring. Rate limiting can
76+
// be opted into by passing `Some(rate)` for `broadcast_rate_limit`.
77+
//
78+
// Two separate `Option<Layer>` values are used rather than a single `if`/`else` expression because
79+
// `RateLimitedLayer<BroadcastLayer<S>>` and `BroadcastLayer<S>` are distinct types.
80+
// `tracing_subscriber` implements `Layer` for `Option<L>`, so exactly one of these will be
81+
// `Some` and contribute an active layer while the other is a no-op `None`.
82+
let rate_limited_broadcast = broadcast_rate_limit.map(|rate| {
83+
RateLimitedLayer::new(BroadcastLayer::new())
84+
.with_default_limit(rate.get())
85+
.with_filter(fmt_filter.clone())
86+
});
87+
let unlimited_broadcast = broadcast_rate_limit
88+
.is_none()
89+
.then(|| BroadcastLayer::new().with_filter(fmt_filter.clone()));
7290

7391
let subscriber = tracing_subscriber::registry()
7492
.with(metrics_layer)
75-
.with(broadcast_layer);
93+
.with(rate_limited_broadcast)
94+
.with(unlimited_broadcast);
7695

7796
#[cfg(feature = "tokio-console")]
7897
let subscriber = {
@@ -389,3 +408,110 @@ impl tracing::field::Visit for SpanFields {
389408
self.record(field, format!("{value:?}"));
390409
}
391410
}
411+
412+
#[cfg(test)]
413+
mod tests {
414+
use std::{str::FromStr, time::Duration};
415+
416+
use serial_test::serial;
417+
use tracing::info;
418+
use tracing_subscriber::layer::SubscriberExt;
419+
420+
use futures::StreamExt as _;
421+
422+
use super::*;
423+
424+
/// Verifies that `RateLimitedLayer<BroadcastLayer>` — the configuration produced by
425+
/// `init` when `broadcast_rate_limit` is `Some` — suppresses repeated identical log
426+
/// events within the rate-limit window and emits a summary once the window expires.
427+
///
428+
/// All `info!` calls share the same call site (they are inside the loop), so the
429+
/// `RateLimitedLayer` treats them as one event for suppression purposes.
430+
///
431+
/// The test uses `tracing::subscriber::with_default` rather than `init` to avoid
432+
/// touching the process-global subscriber, and `spawn_blocking` so that the
433+
/// synchronous `std::thread::sleep` — needed because `tracing-limit` uses
434+
/// `std::time::Instant` when compiled as a library dependency — does not block the
435+
/// async executor.
436+
#[tokio::test]
437+
#[serial]
438+
async fn broadcast_rate_limits_repeated_messages() {
439+
let trace_sub = TraceSubscription::subscribe();
440+
// Disable early buffering so events flow directly to the broadcast channel
441+
// rather than being held in the startup buffer.
442+
stop_early_buffering();
443+
444+
let filter =
445+
tracing_subscriber::filter::Targets::from_str("info").expect("valid filter string");
446+
let subscriber = tracing_subscriber::registry().with(
447+
RateLimitedLayer::new(BroadcastLayer::new())
448+
.with_default_limit(1) // 1-second window
449+
.with_filter(filter),
450+
);
451+
452+
tokio::task::spawn_blocking(move || {
453+
tracing::subscriber::with_default(subscriber, || {
454+
for i in 0..4u32 {
455+
if i == 3 {
456+
// Wait for the 1-second window to expire so that the 4th call
457+
// triggers a summary message before being emitted normally.
458+
std::thread::sleep(Duration::from_millis(1100));
459+
}
460+
info!("Rate limited broadcast message.");
461+
}
462+
});
463+
})
464+
.await
465+
.expect("blocking task panicked");
466+
467+
// All sends happened synchronously inside spawn_blocking above, so items are
468+
// already in the broadcast ring buffer. We drain the stream until we have the
469+
// 4 expected matching messages, with a generous timeout to guard against
470+
// unexpected delays on heavily loaded machines.
471+
//
472+
// Limitation: the "suppressed N times" summary fires on the *next* arriving
473+
// event after the window expires, not at window expiry itself.
474+
const EXPECTED: usize = 4;
475+
let mut stream = trace_sub.into_stream();
476+
let messages: Vec<String> = tokio::time::timeout(Duration::from_secs(5), async {
477+
let mut collected = Vec::with_capacity(EXPECTED);
478+
loop {
479+
let event = stream
480+
.next()
481+
.await
482+
.expect("broadcast stream ended unexpectedly");
483+
if let Some(msg) = event.get("message") {
484+
let msg = msg.to_string_lossy().into_owned();
485+
if msg.contains("Rate limited broadcast message") {
486+
collected.push(msg);
487+
if collected.len() == EXPECTED {
488+
break;
489+
}
490+
}
491+
}
492+
}
493+
collected
494+
})
495+
.await
496+
.expect("timed out waiting for rate-limited broadcast messages");
497+
498+
// Expected sequence:
499+
// [0] First occurrence → emitted normally.
500+
// [1] Second occurrence → suppression warning emitted, original dropped.
501+
// (Third occurrence is silently dropped; nothing appears in broadcast.)
502+
// [2] Fourth occurrence (after window) → summary "suppressed 2 times" emitted.
503+
// [3] Fourth occurrence → emitted normally after the window resets.
504+
assert_eq!(messages[0], "Rate limited broadcast message.");
505+
assert!(
506+
messages[1].contains("is being suppressed to avoid flooding."),
507+
"expected suppression warning, got: {}",
508+
messages[1]
509+
);
510+
assert!(
511+
messages[2].contains("has been suppressed 2 times."),
512+
"expected summary, got: {}",
513+
messages[2]
514+
);
515+
assert_eq!(messages[3], "Rate limited broadcast message.");
516+
}
517+
}

website/cue/reference/cli.cue

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,11 @@ cli: {
199199
type: "integer"
200200
env_var: "VECTOR_INTERNAL_LOG_RATE_LIMIT"
201201
}
202+
"internal-logs-source-rate-limit": {
203+
description: env_vars.VECTOR_INTERNAL_LOGS_SOURCE_RATE_LIMIT.description
204+
type: "integer"
205+
env_var: "VECTOR_INTERNAL_LOGS_SOURCE_RATE_LIMIT"
206+
}
202207
}
203208

204209
options: _core_options
@@ -678,12 +683,31 @@ cli: {
678683
}
679684
}
680685
VECTOR_INTERNAL_LOG_RATE_LIMIT: {
681-
description: "Set the internal log rate limit. This limits Vector from emitting identical logs more than once over the given number of seconds."
686+
description: """
687+
Set the rate limit window (in seconds) for internal logs written to stdout/stderr. Within each time window, the first occurrence of a
688+
log is emitted, the second shows a suppression warning, and subsequent occurrences are silent
689+
until the window expires. When the window expires and the log fires again, a summary of the
690+
suppressed count is emitted followed by the log itself.
691+
"""
682692
type: uint: {
683693
default: 10
684694
unit: null
685695
}
686696
}
697+
VECTOR_INTERNAL_LOGS_SOURCE_RATE_LIMIT: {
698+
description: """
699+
Apply a rate limit (in seconds) to the broadcast channel that feeds all `internal_logs` sources.
700+
When set, the first occurrence of a repeated log is emitted, the second shows a suppression
701+
warning, and subsequent occurrences are silent until the window expires. When the window expires
702+
and the log fires again, a summary of the suppressed count is emitted followed by the log itself.
703+
Unset by default so that `internal_logs` consumers receive every log event. This limit is
704+
independent of `VECTOR_INTERNAL_LOG_RATE_LIMIT`, which only applies to stdout/stderr output.
705+
"""
706+
type: uint: {
707+
default: null
708+
unit: "seconds"
709+
}
710+
}
687711
VECTOR_GRACEFUL_SHUTDOWN_LIMIT_SECS: {
688712
description: "Set the duration in seconds to wait for graceful shutdown after SIGINT or SIGTERM are received. After the duration has passed, Vector will force shutdown. To never force shutdown, use `--no-graceful-shutdown-limit`."
689713
type: uint: {

0 commit comments

Comments
 (0)