Skip to content

Commit 32a1ad8

Browse files
authored
fix(self-instrumentation): otel metrics reporting (#1273)
1 parent b3cbce2 commit 32a1ad8

File tree

2 files changed

+150
-71
lines changed

2 files changed

+150
-71
lines changed

agent-control/src/instrumentation/tracing.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ pub fn try_init_tracing(config: TracingConfig) -> Result<Vec<TracingGuardBox>, T
103103
}
104104

105105
if let Some(otel_config) = config.instrumentation_config.opentelemetry.as_ref() {
106-
layers.push(OtelLayers::try_build(otel_config)?);
106+
let (otel_layers, otel_guard) = OtelLayers::try_build(otel_config)?;
107+
layers.push(otel_layers);
108+
guards.push(Box::new(otel_guard));
107109

108110
// Allows including the log information on spans that contain them when sent to OTLP.
109111
opentelemetry::global::set_text_map_propagator(

agent-control/src/instrumentation/tracing_layers/otel.rs

Lines changed: 147 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::http::client::{HttpBuildError, HttpClient};
22
use crate::http::config::HttpConfig;
33
use crate::instrumentation::config::otel::OtelConfig;
4-
use crate::instrumentation::tracing::LayerBox;
4+
use crate::instrumentation::tracing::{LayerBox, TracingGuard};
55
use opentelemetry::KeyValue;
66
use opentelemetry::trace::TracerProvider;
77
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
@@ -28,21 +28,26 @@ pub enum OtelBuildError {
2828
FilteringDirective { directive: String, err: String },
2929
}
3030

31-
/// Holds the OpenTelemetry providers to report instrumentation. These providers will be used to
32-
/// build the corresponding tracing layers.
31+
/// Holds the resources to build the layers for [tracing_subscriber] that will allow reporting telemetry
32+
/// through OpenTelemetry.
3333
///
34-
/// The providers' shutdown will be automatically triggered when all their references are dropped.
35-
/// Check the providers documentation for details. Eg: [SdkTracerProvider].
34+
/// The underlying OpenTelemetry providers will be automatically shut down when all their references are dropped.
35+
/// Therefore, in order to keep the reference for as long as needed, a guard is returned with the layers.
36+
/// For more information about the automatic shutdown of the OpenTelemetry providers, check the providers' documentation.
37+
/// Eg: [SdkTracerProvider].
38+
#[derive(Default)]
3639
pub struct OtelLayers {
37-
traces_provider: Option<SdkTracerProvider>,
38-
metrics_provider: Option<SdkMeterProvider>,
39-
logs_provider: Option<SdkLoggerProvider>,
40-
filter: EnvFilter,
40+
logs_layer_builder: Option<(SdkLoggerProvider, EnvFilter)>,
41+
traces_layer_builder: Option<(SdkTracerProvider, EnvFilter)>,
42+
// Metrics are reported regardless of the configured level, there are no filtering options supported for now.
43+
metrics_layer_builder: Option<SdkMeterProvider>,
4144
}
4245

4346
impl OtelLayers {
44-
/// Returns the [tracing_subscriber] layers corresponding to the provided configuration.
45-
pub fn try_build(config: &OtelConfig) -> Result<LayerBox, OtelBuildError> {
47+
/// Returns the layers for [tracing_subscriber] corresponding to the enabled OpenTelemetry providers and the corresponding
48+
/// _guard_ that needs to be kept alive in order to avoid shutting down the corresponding exporters while telemetry
49+
/// is emitted. When the _guard_ is dropped all the exporters are shut down and the remaining telemetry is sent.
50+
pub fn try_build(config: &OtelConfig) -> Result<(LayerBox, OtelGuard), OtelBuildError> {
4651
let http_config = HttpConfig::new(
4752
config.client_timeout.clone().into(),
4853
config.client_timeout.clone().into(),
@@ -53,61 +58,72 @@ impl OtelLayers {
5358
Ok(otel_layers.layers())
5459
}
5560

56-
/// Builds the providers corresponding to the provided configuration.
61+
/// Builds the providers and filters corresponding to the provided configuration.
5762
pub(crate) fn try_new_with_client<C>(
5863
config: &OtelConfig,
5964
client: C,
6065
) -> Result<Self, OtelBuildError>
6166
where
6267
C: OtelHttpClient + Send + Sync + Clone + 'static,
6368
{
64-
let mut traces_provider = None;
65-
let mut metrics_provider = None;
66-
let mut logs_provider = None;
67-
68-
if config.traces.enabled || config.metrics.enabled || config.logs.enabled {
69-
let attributes: Vec<KeyValue> = config
70-
.custom_attributes
71-
.iter()
72-
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
73-
.collect();
74-
75-
let resource = Resource::builder()
76-
.with_service_name(TRACER_NAME)
77-
.with_attributes(attributes)
78-
.build();
79-
80-
traces_provider = config
81-
.traces
82-
.enabled
83-
.then(|| Self::traces_provider(client.clone(), config, resource.clone()))
84-
.transpose()?;
85-
86-
metrics_provider = config
87-
.metrics
88-
.enabled
89-
.then(|| Self::metrics_provider(client.clone(), config, resource.clone()))
90-
.transpose()?;
91-
92-
logs_provider = config
93-
.logs
94-
.enabled
95-
.then(|| Self::logs_provider(client, config, resource))
96-
.transpose()?;
69+
if !(config.traces.enabled || config.metrics.enabled || config.logs.enabled) {
70+
return Ok(Self::default());
9771
}
9872

99-
let filter = EnvFilter::builder()
100-
.parse(&config.insecure_level)
101-
.map_err(|err| OtelBuildError::FilteringDirective {
102-
directive: config.insecure_level.clone(),
103-
err: err.to_string(),
104-
})?;
73+
// Set up the resource and custom attributes
74+
let attributes: Vec<KeyValue> = config
75+
.custom_attributes
76+
.iter()
77+
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
78+
.collect();
79+
80+
let resource = Resource::builder()
81+
.with_service_name(TRACER_NAME)
82+
.with_attributes(attributes)
83+
.build();
84+
85+
// Build each layer if configured
86+
let traces_layer_builder = if config.traces.enabled {
87+
Some((
88+
Self::traces_provider(client.clone(), config, resource.clone())?,
89+
Self::filter(&config.insecure_level)?,
90+
))
91+
} else {
92+
None
93+
};
94+
95+
let metrics_layer_builder = if config.metrics.enabled {
96+
Some(Self::metrics_provider(
97+
client.clone(),
98+
config,
99+
resource.clone(),
100+
)?)
101+
} else {
102+
None
103+
};
104+
105+
let logs_layer_builder = if config.logs.enabled {
106+
Some((
107+
Self::logs_provider(client, config, resource)?,
108+
Self::filter(&config.insecure_level)?,
109+
))
110+
} else {
111+
None
112+
};
105113

106114
Ok(Self {
107-
traces_provider,
108-
metrics_provider,
109-
logs_provider,
110-
filter,
115+
logs_layer_builder,
116+
metrics_layer_builder,
117+
traces_layer_builder,
118+
})
119+
}
120+
121+
fn filter(insecure_level: &str) -> Result<EnvFilter, OtelBuildError> {
122+
EnvFilter::builder().parse(insecure_level).map_err(|err| {
123+
OtelBuildError::FilteringDirective {
124+
directive: insecure_level.to_string(),
125+
err: err.to_string(),
126+
}
111127
})
112128
}
113129

@@ -186,37 +202,56 @@ impl OtelLayers {
186202
.build())
187203
}
188204

189-
/// Return the layers for [tracing_subscriber] corresponding to the enabled OpenTelemetry providers.
190-
pub fn layers(self) -> LayerBox {
205+
pub fn layers(self) -> (LayerBox, OtelGuard) {
191206
let mut layers = Vec::<LayerBox>::new();
192-
if let Some(traces_provider) = self.traces_provider.as_ref() {
207+
let mut guard = OtelGuard::default();
208+
209+
if let Some((traces_provider, traces_filter)) = self.traces_layer_builder {
210+
guard._traces_provider = Some(traces_provider.clone());
193211
let layer =
194212
tracing_opentelemetry::layer().with_tracer(traces_provider.tracer(TRACER_NAME));
195-
layers.push(Box::new(layer));
213+
layers.push(Box::new(layer.with_filter(traces_filter)));
196214
}
197-
if let Some(metrics_provider) = self.metrics_provider.as_ref() {
215+
216+
if let Some(metrics_provider) = self.metrics_layer_builder {
217+
guard._metrics_provider = Some(metrics_provider.clone());
198218
let layer = MetricsLayer::new(metrics_provider.clone());
199219
layers.push(Box::new(layer));
200220
}
201-
if let Some(logs_provider) = self.logs_provider.as_ref() {
202-
let layer = OpenTelemetryTracingBridge::new(logs_provider);
203-
layers.push(Box::new(layer));
221+
222+
if let Some((logs_provider, logs_filter)) = self.logs_layer_builder {
223+
guard._logs_provider = Some(logs_provider.clone());
224+
let layer = OpenTelemetryTracingBridge::new(&logs_provider);
225+
layers.push(Box::new(layer.with_filter(logs_filter)));
204226
}
205227

206-
layers.with_filter(self.filter).boxed()
228+
(layers.boxed(), guard)
207229
}
208230
}
209231

232+
/// Keeps a reference to the OpenTelemetry providers to avoid shutting down the underlying reporters while telemetry
233+
/// is emitted.
234+
#[derive(Default)]
235+
pub struct OtelGuard {
236+
_logs_provider: Option<SdkLoggerProvider>,
237+
_metrics_provider: Option<SdkMeterProvider>,
238+
_traces_provider: Option<SdkTracerProvider>,
239+
}
240+
241+
impl TracingGuard for OtelGuard {}
242+
210243
#[cfg(test)]
211244
mod tests {
245+
use std::time::Duration;
246+
212247
use http::Response;
213248
use opentelemetry_sdk::Resource;
214249
use tracing::{debug, info, trace};
215250
use tracing_subscriber::EnvFilter;
216251
use tracing_subscriber::layer::SubscriberExt;
217252

218253
use crate::http::client::tests::MockOtelHttpClient;
219-
use crate::instrumentation::config::otel::{LogsConfig, OtelConfig};
254+
use crate::instrumentation::config::otel::{LogsConfig, MetricsConfig, OtelConfig};
220255
use crate::instrumentation::tracing_layers::otel::OtelLayers;
221256

222257
#[test]
@@ -258,17 +293,59 @@ mod tests {
258293
.unwrap();
259294

260295
let otel_providers = OtelLayers {
261-
logs_provider: Some(logs_provider),
262-
filter: EnvFilter::builder().parse_lossy("info"),
263-
traces_provider: None,
264-
metrics_provider: None,
296+
logs_layer_builder: Some((logs_provider, EnvFilter::builder().parse_lossy("info"))),
297+
..Default::default()
265298
};
266299

267-
let subscriber = tracing_subscriber::Registry::default().with(otel_providers.layers());
300+
let (layers, _guard) = otel_providers.layers();
301+
let subscriber = tracing_subscriber::Registry::default().with(layers);
268302
tracing::subscriber::with_default(subscriber, || {
269303
info!(INFO_LOG);
270304
debug!(DEBUG_LOG);
271305
trace!(TRACE_LOG);
272306
});
273307
}
308+
309+
#[test]
310+
fn test_metrics_layer() {
311+
let mut mock_http_client = MockOtelHttpClient::new();
312+
// Asserts metrics are sent
313+
mock_http_client
314+
.expect_send_bytes()
315+
.times(1..) // The metric should be sent at least once
316+
.withf(|req| {
317+
let body = String::from_utf8_lossy(req.body().as_ref());
318+
req.uri().path().eq("/v1/metrics") && body.contains("uptime")
319+
})
320+
.returning(|_| {
321+
Ok(Response::builder()
322+
.status(200)
323+
.body(opentelemetry_http::Bytes::default())
324+
.unwrap())
325+
});
326+
327+
let metrics_provider = OtelLayers::metrics_provider(
328+
mock_http_client,
329+
&OtelConfig {
330+
metrics: MetricsConfig {
331+
enabled: true,
332+
interval: Duration::from_secs(1).into(),
333+
},
334+
..Default::default()
335+
},
336+
Resource::builder().build(),
337+
)
338+
.unwrap();
339+
340+
let otel_layers = OtelLayers {
341+
metrics_layer_builder: Some(metrics_provider),
342+
..Default::default()
343+
};
344+
let (layers, _guard) = otel_layers.layers();
345+
let subscriber = tracing_subscriber::Registry::default().with(layers);
346+
tracing::subscriber::with_default(subscriber, || {
347+
trace!(monotonic_counter.uptime = 42);
348+
std::thread::sleep(Duration::from_secs(2));
349+
});
350+
}
274351
}

0 commit comments

Comments
 (0)