Skip to content

Commit 6bb3c11

Browse files
authored
Implement tracing and metrics for Boxer (#43)
* Implement tracing and metrics for Boxer * Fix formatting issues
1 parent b665a29 commit 6bb3c11

File tree

7 files changed

+167
-71
lines changed

7 files changed

+167
-71
lines changed

Cargo.lock

Lines changed: 10 additions & 68 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,10 @@ base64 = "0.22.1"
3535
futures-util = "0.3.31"
3636

3737
# Open Telemetry dependencies
38-
opentelemetry = "0.24.0"
38+
opentelemetry = "0.30.0"
3939
opentelemetry-appender-log = "0.30.0"
4040
opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic", "trace", "metrics", "logs"] }
41-
opentelemetry-resource-detectors = "0.3.0"
4241
opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
43-
opentelemetry-stdout = "0.30.0"
4442
opentelemetry-appender-tracing = "0.30.1"
4543

4644
[dev-dependencies]

src/services/backends/kubernetes/kubernetes_resource_watcher.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,15 @@ where
2828

2929
fn stop(&self) -> anyhow::Result<()>;
3030
}
31+
32+
#[async_trait]
33+
pub trait KubernetesResourceWatcherRunner<H, R>: Sized
34+
where
35+
R: Resource<Scope = NamespaceResourceScope> + Clone + Debug + Serialize + DeserializeOwned + Send + Sync,
36+
R::DynamicType: Hash + Eq + Clone + Default,
37+
H: ResourceUpdateHandler<R> + Send + Sync + 'static,
38+
{
39+
async fn start(&mut self, config: KubernetesResourceManagerConfig) -> anyhow::Result<()>;
40+
41+
fn stop(&self) -> anyhow::Result<()>;
42+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
pub mod logging;
2+
pub mod metrics;
3+
pub mod tracing;
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use anyhow::Context;
2+
use opentelemetry::global;
3+
use opentelemetry::metrics::Counter;
4+
5+
pub fn init_metrics() -> anyhow::Result<()> {
6+
let exporter = opentelemetry_otlp::MetricExporter::builder()
7+
.with_tonic()
8+
.with_temporality(opentelemetry_sdk::metrics::Temporality::Delta)
9+
.build()
10+
.with_context(|| "creating metric exporter")?;
11+
12+
let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
13+
.with_reader(opentelemetry_sdk::metrics::PeriodicReader::builder(exporter).build())
14+
.build();
15+
16+
global::set_meter_provider(meter_provider);
17+
Ok(())
18+
}
19+
20+
pub fn token_succeeded(app_name: &'static str) -> Counter<u64> {
21+
let meter = global::meter(app_name);
22+
meter
23+
.u64_counter(format!("{}.{}", app_name, "token_succeeded"))
24+
.with_description("Count of successfully processed tokens")
25+
.with_unit("tokens")
26+
.build()
27+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
pub mod tracing_facade;
2+
3+
use opentelemetry::trace::{Status, TraceContextExt, Tracer};
4+
use opentelemetry::{Context, global};
5+
use opentelemetry_sdk::propagation::TraceContextPropagator;
6+
use std::fmt::Display;
7+
8+
/// Initialize OpenTelemetry tracing with OTLP exporter
9+
/// Should be called once at the start of the application
10+
pub fn init_tracer() -> anyhow::Result<()> {
11+
global::set_text_map_propagator(TraceContextPropagator::new());
12+
13+
let exporter = opentelemetry_otlp::SpanExporter::builder().with_tonic().build()?;
14+
15+
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
16+
.with_batch_exporter(exporter)
17+
.build();
18+
19+
global::set_tracer_provider(tracer_provider);
20+
Ok(())
21+
}
22+
23+
/// Start a new trace span with the given name
24+
/// Returns a Context containing the new span
25+
/// The caller is responsible for ending the span
26+
pub fn start_trace(span_name: &str) -> Context {
27+
let tracer = global::tracer("");
28+
let span = tracer
29+
.span_builder(span_name.to_string())
30+
.with_kind(opentelemetry::trace::SpanKind::Internal)
31+
.start(&tracer);
32+
Context::current().with_span(span)
33+
}
34+
35+
/// Extension trait for Result to stop tracing and set span status
36+
pub trait ErrorExt<T, E> {
37+
fn stop_trace(self, ctx: Context) -> Self;
38+
}
39+
40+
/// Implementation of ErrorExt for Result
41+
impl<T, E> ErrorExt<T, E> for Result<T, E>
42+
where
43+
E: Display,
44+
{
45+
fn stop_trace(self, ctx: Context) -> Self {
46+
if let Err(err) = &self {
47+
ctx.span().set_status(Status::Error {
48+
description: err.to_string().into(),
49+
});
50+
} else {
51+
ctx.span().set_status(Status::Ok);
52+
}
53+
ctx.span().end();
54+
self
55+
}
56+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use crate::services::base::upsert_repository::ReadOnlyRepository;
2+
use crate::services::observability::open_telemetry::tracing::{ErrorExt, start_trace};
3+
use async_trait::async_trait;
4+
use opentelemetry::context::FutureExt;
5+
use std::marker::PhantomData;
6+
use std::sync::Arc;
7+
8+
pub trait WithTracingFacade<Key, Value> {
9+
fn with_tracing(
10+
self: Arc<Self>,
11+
span_name: String,
12+
) -> Arc<dyn ReadOnlyRepository<Key, Value, ReadError = anyhow::Error>>
13+
where
14+
Self: Sized + Send + Sync + 'static;
15+
}
16+
17+
impl<Repo, Key, Value> WithTracingFacade<Key, Value> for Repo
18+
where
19+
Repo: ReadOnlyRepository<Key, Value, ReadError = anyhow::Error> + Send + Sync + 'static,
20+
Key: Send + Sync + 'static,
21+
Value: Send + Sync + 'static,
22+
{
23+
fn with_tracing(
24+
self: Arc<Self>,
25+
span_name: String,
26+
) -> Arc<dyn ReadOnlyRepository<Key, Value, ReadError = anyhow::Error>> {
27+
Arc::new(TracingFacade {
28+
span_name,
29+
underlying: self,
30+
_p: Default::default(),
31+
})
32+
}
33+
}
34+
35+
struct TracingFacade<Repo, Key, Value>
36+
where
37+
Repo: ReadOnlyRepository<Key, Value, ReadError = anyhow::Error>,
38+
Key: Send + Sync + 'static,
39+
Value: Send + Sync + 'static,
40+
{
41+
span_name: String,
42+
underlying: Arc<Repo>,
43+
_p: PhantomData<(Key, Value)>,
44+
}
45+
46+
#[async_trait]
47+
impl<Repo, Key, Value> ReadOnlyRepository<Key, Value> for TracingFacade<Repo, Key, Value>
48+
where
49+
Repo: ReadOnlyRepository<Key, Value, ReadError = anyhow::Error>,
50+
Key: Send + Sync + 'static,
51+
Value: Send + Sync + 'static,
52+
{
53+
type ReadError = anyhow::Error;
54+
55+
async fn get(&self, key: Key) -> Result<Value, Self::ReadError> {
56+
let cx = start_trace(&self.span_name);
57+
self.underlying.get(key).with_context(cx.clone()).await.stop_trace(cx)
58+
}
59+
}

0 commit comments

Comments
 (0)