Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions src/common/tracing/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::time::Duration;

use opentelemetry::Key;
// These match the OTEL_EXPORTER_OTLP_* environment variables from:
// https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#endpoint-configuration
use opentelemetry_otlp::{
OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, Protocol,
};
use opentelemetry_sdk::Resource;

/// Environment variable for the general OTLP endpoint.
pub const ENV_OLD_OTLP_ENDPOINT: &str = "DAFT_DEV_OTEL_EXPORTER_OTLP_ENDPOINT";
Expand Down Expand Up @@ -107,4 +109,123 @@ impl Config {
.map(Duration::from_millis)
.unwrap_or(Duration::from_millis(500))
}

pub fn resource(&self) -> Resource {
let detected_resource = Resource::builder().build();
let service_name = std::env::var("OTEL_SERVICE_NAME")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a constant in the opentelemetry_otlp create we can use?

.ok()
.filter(|name| !name.is_empty())
.or_else(|| {
detected_resource
.get(&Key::new("service.name"))
.map(|value| value.to_string())
.filter(|name| name != "unknown_service")
})
.unwrap_or_else(|| "daft".to_string());

Resource::builder().with_service_name(service_name).build()
}
}

#[cfg(test)]
mod tests {
use std::sync::Mutex;

use opentelemetry::{Key, Value};

use super::*;

static ENV_LOCK: Mutex<()> = Mutex::new(());

fn with_env_vars<R>(vars: &[(&str, Option<&str>)], f: impl FnOnce() -> R) -> R {
let _guard = ENV_LOCK.lock().unwrap();
let previous_values: Vec<_> = vars
.iter()
.map(|(name, _)| (*name, std::env::var(name).ok()))
.collect();

for (name, value) in vars {
match value {
Some(value) => unsafe { std::env::set_var(name, value) },
None => unsafe { std::env::remove_var(name) },
}
}

let result = f();

for (name, value) in previous_values {
match value {
Some(value) => unsafe { std::env::set_var(name, value) },
None => unsafe { std::env::remove_var(name) },
}
}

result
}
Comment on lines +140 to +164

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Test env-var cleanup is not panic-safe

If the closure f() panics (e.g. an assertion fires), the for loop that restores the previous values never executes, leaving the modified env vars behind for every subsequent test in the process. The ENV_LOCK mutex will also be poisoned by the panic, causing all later with_env_vars calls to propagate the poison via .unwrap(). A Drop-based restore guard would fix both problems.


#[test]
fn resource_defaults_to_daft_service_name() {
with_env_vars(
&[
("OTEL_SERVICE_NAME", None),
("OTEL_RESOURCE_ATTRIBUTES", None),
],
|| {
let resource = Config::from_env().resource();
assert_eq!(
resource.get(&Key::new("service.name")),
Some(Value::from("daft"))
);
},
);
}

#[test]
fn resource_uses_otel_service_name_env() {
with_env_vars(
&[
("OTEL_SERVICE_NAME", Some("daft-ray-worker")),
("OTEL_RESOURCE_ATTRIBUTES", None),
],
|| {
let resource = Config::from_env().resource();
assert_eq!(
resource.get(&Key::new("service.name")),
Some(Value::from("daft-ray-worker"))
);
},
);
}

#[test]
fn resource_includes_otel_resource_attributes() {
with_env_vars(
&[
("OTEL_SERVICE_NAME", None),
(
"OTEL_RESOURCE_ATTRIBUTES",
Some("deployment.environment=staging,service.version=0.7.14"),
),
],
|| {
let resource = Config::from_env().resource();
assert_eq!(
resource.get(&Key::new("deployment.environment")),
Some(Value::from("staging"))
);
assert_eq!(
resource.get(&Key::new("telemetry.sdk.language")),
Some(Value::from("rust"))
);
assert_eq!(
resource.get(&Key::new("service.version")),
Some(Value::from("0.7.14"))
);
assert_eq!(
resource.get(&Key::new("service.name")),
Some(Value::from("daft"))
);
},
);
}
}
11 changes: 2 additions & 9 deletions src/common/tracing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
mod config;

use std::{
sync::{LazyLock, Mutex},
time::Duration,
};
use std::sync::{LazyLock, Mutex};

use common_runtime::get_io_runtime;
pub use config::Config;
Expand Down Expand Up @@ -53,7 +50,7 @@ impl TraceFormat {

pub fn init_tracing() {
let config = Config::from_env();
let resource = Resource::builder().with_service_name("daft").build();
let resource = config.resource();
let tracer_provider = if config.enabled() {
let runtime = get_io_runtime(true);
runtime.block_on_current_thread(async {
Expand Down Expand Up @@ -158,7 +155,6 @@ fn init_otlp_logger_provider(config: &Config, otlp_endpoint: &str, resource: Res
let log_exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(otlp_endpoint)
.with_timeout(Duration::from_secs(10))
.build()
.expect("Failed to build OTLP logger exporter.");

Expand Down Expand Up @@ -188,14 +184,12 @@ fn init_otlp_metrics_provider(config: &Config, endpoint: &str, resource: Resourc
opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_timeout(Duration::from_secs(10))
.build()
}
Protocol::HttpBinary => {
opentelemetry_otlp::MetricExporter::builder()
.with_http()
.with_endpoint(endpoint)
.with_timeout(Duration::from_secs(10))
.with_protocol(config.otlp_protocol)
.build()
}
Expand Down Expand Up @@ -242,7 +236,6 @@ fn init_otlp_tracer_provider(
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(otlp_endpoint)
.with_timeout(Duration::from_secs(10))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is now being read from the env what is the default if it's not set?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.build()
.expect("Failed to build OTLP span exporter for tracing");

Expand Down
Loading