feat: Leverage Suppression Context in Sdk #2868

Merged
merged 16 commits into from
Mar 28, 2025
Changes from 5 commits
20 changes: 19 additions & 1 deletion docs/design/logs.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,25 @@ only meant for OTel components itself and anyone writing extensions like custom
Exporters etc.

// TODO: Document the principles followed when selecting severity for internal
logs // TODO: Document how this can cause circular loop and plans to address it.
logs

When OpenTelemetry components generate logs that are themselves fed back into
OpenTelemetry, the result is known as "telemetry-induced telemetry." To prevent
this, OpenTelemetry provides a mechanism to suppress such telemetry through the
`Context`. Components mark telemetry as suppressed within a specific `Context`
by calling `Context::enter_telemetry_suppressed_scope()`. The Logs SDK
implementation checks this flag in the current `Context` and ignores logs while
suppression is enabled.
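
As a rough illustration of the scoped-flag idea described above, here is a minimal, std-only sketch. It is not the SDK's implementation: the thread-local `SUPPRESSED`, the `SuppressionGuard` type, and the functions `enter_suppressed_scope`, `is_suppressed`, `emit`, and `demo` are hypothetical stand-ins for `Context::enter_telemetry_suppressed_scope()`, `Context::is_current_telemetry_suppressed()`, and the logger's emit path.

```rust
use std::cell::Cell;

thread_local! {
    // Hypothetical stand-in for the suppression flag carried by the current `Context`.
    static SUPPRESSED: Cell<bool> = Cell::new(false);
}

/// Guard that restores the previous flag value when it is dropped.
pub struct SuppressionGuard {
    previous: bool,
}

/// Hypothetical analogue of `Context::enter_telemetry_suppressed_scope()`.
pub fn enter_suppressed_scope() -> SuppressionGuard {
    // Set the flag and remember what it was, so nested scopes unwind correctly.
    let previous = SUPPRESSED.with(|flag| flag.replace(true));
    SuppressionGuard { previous }
}

impl Drop for SuppressionGuard {
    fn drop(&mut self) {
        SUPPRESSED.with(|flag| flag.set(self.previous));
    }
}

/// Hypothetical analogue of `Context::is_current_telemetry_suppressed()`.
pub fn is_suppressed() -> bool {
    SUPPRESSED.with(|flag| flag.get())
}

/// Toy logger: records the message unless suppression is active.
pub fn emit(sink: &mut Vec<String>, message: &str) {
    if is_suppressed() {
        return;
    }
    sink.push(message.to_string());
}

/// Emits one log before, one inside, and one after a suppressed scope.
pub fn demo() -> Vec<String> {
    let mut sink = Vec::new();
    emit(&mut sink, "before");
    {
        let _guard = enter_suppressed_scope();
        emit(&mut sink, "inside (dropped)");
    }
    emit(&mut sink, "after");
    sink
}
```

In this sketch the guard must be bound to a named variable such as `_guard`; binding it to a bare `_` would drop it immediately and end the suppressed scope at once.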

This mechanism relies on proper in-process propagation of the `Context`.
However, external libraries such as `hyper` and `tonic`, which OpenTelemetry
uses in its OTLP Exporters, do not propagate OpenTelemetry's `Context`. As a
result, the suppression mechanism does not suppress logs originating from these
libraries out of the box.
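
The propagation gap can be sketched with the standard library alone, assuming the suppression flag lives in thread-local state. The names `SUPPRESSED` and `flag_seen_by_spawned_thread` are hypothetical and only model the behavior; they are not part of OpenTelemetry, `hyper`, or `tonic`.

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    // Hypothetical stand-in for a suppression flag kept in per-thread state.
    static SUPPRESSED: Cell<bool> = Cell::new(false);
}

/// Sets the flag on the calling thread, then reads it both on the calling
/// thread and on a freshly spawned thread. Returns `(here, there)`.
pub fn flag_seen_by_spawned_thread() -> (bool, bool) {
    SUPPRESSED.with(|flag| flag.set(true));
    let here = SUPPRESSED.with(|flag| flag.get());

    // The new thread starts with its own fresh copy of the thread-local,
    // so the flag set by the caller is not visible there.
    let there = thread::spawn(|| SUPPRESSED.with(|flag| flag.get()))
        .join()
        .expect("worker thread panicked");

    // Reset so the calling thread is left in its initial state.
    SUPPRESSED.with(|flag| flag.set(false));
    (here, there)
}
```

Work that a library hands off to another thread or task behaves like the spawned thread here unless the library explicitly carries the flag along, which is the step that external crates do not perform.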

// TODO: Document how OTLP can solve this issue without asking external
crates to respect and propagate OTel Context.

## Summary

Expand Down
1 change: 0 additions & 1 deletion examples/logs-basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ fn main() {
// https://github.com/open-telemetry/opentelemetry-rust/issues/761
let filter_otel = EnvFilter::new("info")
.add_directive("hyper=off".parse().unwrap())
.add_directive("opentelemetry=off".parse().unwrap())
.add_directive("tonic=off".parse().unwrap())
.add_directive("h2=off".parse().unwrap())
.add_directive("reqwest=off".parse().unwrap());
Expand Down
13 changes: 7 additions & 6 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ mod tests {
use opentelemetry::logs::Severity;
use opentelemetry::trace::TracerProvider;
use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer};
use opentelemetry::InstrumentationScope;
use opentelemetry::{logs::AnyValue, Key};
use opentelemetry::{Context, InstrumentationScope};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{InMemoryLogExporter, LogProcessor};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
Expand Down Expand Up @@ -319,25 +319,26 @@ mod tests {

impl LogExporter for ReentrantLogExporter {
async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
// This will cause a deadlock as the export itself creates a log
let _suppress = Context::enter_telemetry_suppressed_scope();
// Without the suppression above, this will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())
}
}

#[test]
#[ignore = "See issue: https://github.com/open-telemetry/opentelemetry-rust/issues/1745"]
#[ignore = "While this test runs fine, it uses a global subscriber that does not play well with other tests, so it is ignored in CI."]
fn simple_processor_deadlock() {
// TODO: This test may be better suited to the opentelemetry-sdk crate tests
let exporter: ReentrantLogExporter = ReentrantLogExporter;
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(exporter.clone())
.with_simple_exporter(exporter)
.build();

let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);

// Setting subscriber as global as that is the only way to test this scenario.
tracing_subscriber::registry().with(layer).init();

warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
}

Expand Down
1 change: 0 additions & 1 deletion opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// https://github.com/open-telemetry/opentelemetry-rust/issues/761
let filter_otel = EnvFilter::new("info")
.add_directive("hyper=off".parse().unwrap())
.add_directive("opentelemetry=off".parse().unwrap())
.add_directive("tonic=off".parse().unwrap())
.add_directive("h2=off".parse().unwrap())
.add_directive("reqwest=off".parse().unwrap());
Expand Down
1 change: 0 additions & 1 deletion opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// https://github.com/open-telemetry/opentelemetry-rust/issues/761
let filter_otel = EnvFilter::new("info")
.add_directive("hyper=off".parse().unwrap())
.add_directive("opentelemetry=off".parse().unwrap())
.add_directive("tonic=off".parse().unwrap())
.add_directive("h2=off".parse().unwrap())
.add_directive("reqwest=off".parse().unwrap());
Expand Down
10 changes: 10 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

## vNext

`SdkLogger` modified to respect telemetry suppression based on `Context`. In
other words, if the current context has telemetry suppression enabled, logs
are ignored. The flag is typically set by OTel components to prevent their own
telemetry from being fed back into OTel. `BatchLogProcessor`,
`BatchSpanProcessor`, and `PeriodicReader` modified to set the suppression flag
in their dedicated threads, so that telemetry generated from those threads is
not fed back into OTel. Similarly, `SimpleSpanProcessor` and
`SimpleLogProcessor` modified to suppress telemetry before invoking
exporters.
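
The dedicated-thread behavior described in this entry can be approximated with a std-only sketch, assuming a per-thread suppression flag. All names here (`SUPPRESSED`, `emit`, `run_background_export`) are hypothetical and model the idea only; the real processors use `Context::enter_telemetry_suppressed_scope()`.

```rust
use std::cell::Cell;
use std::sync::mpsc;
use std::thread;

thread_local! {
    // Hypothetical stand-in for the suppression flag of the current context.
    static SUPPRESSED: Cell<bool> = Cell::new(false);
}

/// Toy logger: forwards the message unless the calling thread is suppressed.
fn emit(sink: &mpsc::Sender<String>, message: &str) {
    if SUPPRESSED.with(|flag| flag.get()) {
        return;
    }
    // A send error only means the receiver is gone; ignore it in this sketch.
    let _ = sink.send(message.to_string());
}

/// Runs a background "export" thread that suppresses its own telemetry,
/// then logs once from the calling thread. Returns every log that got through.
pub fn run_background_export() -> Vec<String> {
    let (sink, received) = mpsc::channel();
    let worker_sink = sink.clone();

    let worker = thread::spawn(move || {
        // Set once at thread start, so it covers everything the thread does.
        SUPPRESSED.with(|flag| flag.set(true));
        emit(&worker_sink, "export started (dropped)");
    });
    worker.join().expect("worker thread panicked");

    // The calling thread is not suppressed, so this log is kept.
    emit(&sink, "application log");

    // Drop the last sender so collecting from the receiver terminates.
    drop(sink);
    received.into_iter().collect()
}
```

Setting the flag at the top of the thread body, rather than around individual calls, means diagnostics emitted anywhere in the worker's loop are covered without further changes.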

## 0.29.0

Released 2025-Mar-21
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/benches/log_enabled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
Total Number of Cores:   14 (10 performance and 4 efficiency)
| Test | Average time|
|---------------------------------------------|-------------|
| exporter_disabled_concurrent_processor | 1.1 ns |
| exporter_disabled_simple_processor | 4.3 ns |
| exporter_disabled_concurrent_processor | 2.5 ns |
| exporter_disabled_simple_processor | 5.3 ns |
*/

// cargo bench --bench log_enabled --features="spec_unstable_logs_enabled,experimental_logs_concurrent_log_processor"
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/logs/batch_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
};
use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};

use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationScope};

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
Expand Down Expand Up @@ -342,6 +342,7 @@ impl BatchLogProcessor {
let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
.spawn(move || {
let _suppress_guard = Context::enter_telemetry_suppressed_scope();
otel_debug!(
name: "BatchLogProcessor.ThreadStarted",
interval_in_millisecs = config.scheduled_delay.as_millis(),
Expand Down
6 changes: 6 additions & 0 deletions opentelemetry-sdk/src/logs/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ impl opentelemetry::logs::Logger for SdkLogger {

/// Emit a `LogRecord`.
fn emit(&self, mut record: Self::LogRecord) {
if Context::is_current_telemetry_suppressed() {
return;
}
let provider = &self.provider;
let processors = provider.log_processors();

Expand All @@ -52,6 +55,9 @@ impl opentelemetry::logs::Logger for SdkLogger {
#[cfg(feature = "spec_unstable_logs_enabled")]
#[inline]
fn event_enabled(&self, level: Severity, target: &str, name: Option<&str>) -> bool {
if Context::is_current_telemetry_suppressed() {
return false;
}
self.provider
.log_processors()
.iter()
Expand Down
27 changes: 27 additions & 0 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,31 @@ mod tests {
&AnyValue::String("value-from-bag".into())
));
}

#[test]
fn log_suppression() {
// Arrange
let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();

// Act
let logger = logger_provider.logger("test-logger");
let mut log_record = logger.create_log_record();
log_record.set_severity_number(Severity::Error);

{
let _suppressed_context = Context::enter_telemetry_suppressed_scope();
logger.emit(log_record);
}

// Assert
let exported_logs = exporter.get_emitted_logs().expect("this should not fail.");
assert_eq!(
exported_logs.len(),
0,
"There should be no logs, as log emission is done inside a suppressed context"
);
}
}
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
Resource,
};

use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
use opentelemetry::{otel_debug, otel_error, otel_warn, Context, InstrumentationScope};

use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -76,6 +76,7 @@ impl<T: LogExporter> SimpleLogProcessor<T> {

impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
let _suppress_guard = Context::enter_telemetry_suppressed_scope();
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
// this is a warning, as the user is trying to log after the processor has been shutdown
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
time::{Duration, Instant},
};

use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn};
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, Context};

use crate::{
error::{OTelSdkError, OTelSdkResult},
Expand Down Expand Up @@ -158,6 +158,7 @@ impl<E: PushMetricExporter> PeriodicReader<E> {
let result_thread_creation = thread::Builder::new()
.name("OpenTelemetry.Metrics.PeriodicReader".to_string())
.spawn(move || {
let _suppress_guard = Context::enter_telemetry_suppressed_scope();
let mut interval_start = Instant::now();
let mut remaining_interval = interval;
otel_debug!(
Expand Down
26 changes: 26 additions & 0 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,4 +533,30 @@ mod tests {
let tracer2 = tracer_provider.tracer_with_scope(tracer_scope);
tracer_name_retained_helper(tracer2, tracer_provider, exporter).await;
}

#[test]
fn trace_suppression() {
// Arrange
let exporter = InMemorySpanExporter::default();
let span_processor = SimpleSpanProcessor::new(exporter.clone());
let tracer_provider = SdkTracerProvider::builder()
.with_span_processor(span_processor)
.build();

// Act
let tracer = tracer_provider.tracer("test");
{
let _suppressed_context = Context::enter_telemetry_suppressed_scope();
let span = tracer.span_builder("span_name").start(&tracer);
drop(span);
}

// Assert
let finished_spans = exporter.get_finished_spans().expect("this should not fail");
assert_eq!(
finished_spans.len(),
0,
"There should be no spans, as span emission is done inside a suppressed context"
);
}
}
19 changes: 19 additions & 0 deletions opentelemetry-sdk/src/trace/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,25 @@ impl ShouldSample for Sampler {
attributes: &[KeyValue],
links: &[Link],
) -> SamplingResult {
// Checking suppression mode in sampler is more efficient than checking
// it in the span processor as this allows earlier short-circuiting.
// Also, it is currently *NOT* possible to check suppression mode or
// even access current context in the span processor's OnEnd method. See
// https://github.com/open-telemetry/opentelemetry-rust/issues/2871
// TODO: Can we move this even earlier, i.e. in the Tracer itself?
// TODO: Check and fix: why does this method get Option<&Context> and not
// Context?
if let Some(parent_context) = parent_context {
if parent_context.is_telemetry_suppressed() {
return SamplingResult {
decision: SamplingDecision::Drop,
// No extra attributes ever set by the SDK samplers.
attributes: Vec::new(),
// No sampler in the SDK modifies the trace state.
trace_state: TraceState::default(),
};
}
}
let decision = match self {
// Always sample the trace
Sampler::AlwaysOn => SamplingDecision::RecordAndSample,
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ impl BatchSpanProcessor {
let handle = thread::Builder::new()
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
.spawn(move || {
let _suppress_guard = Context::enter_telemetry_suppressed_scope();
otel_debug!(
name: "BatchSpanProcessor.ThreadStarted",
interval_in_millisecs = config.scheduled_delay.as_millis(),
Expand Down
9 changes: 5 additions & 4 deletions opentelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

## vNext

Added the ability to prevent recursive telemetry generation through new
context-based suppression mechanisms. This feature helps prevent feedback loops
and excessive telemetry when OpenTelemetry components perform their own
operations.
[#2821](https://github.com/open-telemetry/opentelemetry-rust/pull/2821)
Context-based suppression capabilities added: the ability to prevent recursive
telemetry generation through new context-based suppression mechanisms. This
feature helps prevent feedback loops and excessive telemetry when OpenTelemetry
components perform their own operations.

New methods added to `Context`:

Expand Down