diff --git a/coprocessor/fhevm-engine/Cargo.lock b/coprocessor/fhevm-engine/Cargo.lock index a54297553f..70bcf18336 100644 --- a/coprocessor/fhevm-engine/Cargo.lock +++ b/coprocessor/fhevm-engine/Cargo.lock @@ -3425,6 +3425,8 @@ dependencies = [ "tonic", "tonic-build", "tracing", + "tracing-opentelemetry", + "tracing-subscriber", ] [[package]] diff --git a/coprocessor/fhevm-engine/fhevm-engine-common/Cargo.toml b/coprocessor/fhevm-engine/fhevm-engine-common/Cargo.toml index d9d9652409..8ce7d689c2 100644 --- a/coprocessor/fhevm-engine/fhevm-engine-common/Cargo.toml +++ b/coprocessor/fhevm-engine/fhevm-engine-common/Cargo.toml @@ -23,6 +23,8 @@ tfhe = { workspace = true } tonic = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } bytesize = { workspace = true} tokio-util = { workspace = true} axum = { workspace = true} diff --git a/coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs b/coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs index 68c78184a3..53765d8b93 100644 --- a/coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs +++ b/coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs @@ -17,9 +17,37 @@ use std::{ }; use tokio::sync::RwLock; use tracing::{debug, info, warn}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; pub const TXN_ID_ATTR_KEY: &str = "txn_id"; +/// Calls provider shutdown exactly once when dropped. +pub struct TracerProviderGuard { + tracer_provider: Option, +} + +impl TracerProviderGuard { + fn new(trace_provider: SdkTracerProvider) -> Self { + Self { + tracer_provider: Some(trace_provider), + } + } + + fn shutdown_once(&mut self) { + if let Some(provider) = self.tracer_provider.take() { + if let Err(err) = provider.shutdown() { + warn!(error = %err, "Failed to shutdown OTLP tracer provider"); + } + } + } +} + +impl Drop for TracerProviderGuard { + fn drop(&mut self) { + self.shutdown_once(); + } +} + pub static HOST_TXN_LATENCY_CONFIG: OnceLock = OnceLock::new(); pub(crate) static HOST_TXN_LATENCY_HISTOGRAM: LazyLock = LazyLock::new(|| { register_histogram( @@ -38,17 +66,61 @@ pub(crate) static ZKPROOF_TXN_LATENCY_HISTOGRAM: LazyLock = LazyLock: ) }); -pub fn setup_otlp( +pub fn init_otel( service_name: &str, -) -> Result<(), Box> { - let _ = setup_otlp_tracer(service_name, "otlp-layer")?; - Ok(()) +) -> Result, Box> { + if service_name.is_empty() { + return Ok(None); + } + + let (_tracer, trace_provider) = setup_otel_with_tracer(service_name, "otlp-layer")?; + opentelemetry::global::set_tracer_provider(trace_provider.clone()); + Ok(Some(TracerProviderGuard::new(trace_provider))) } -pub fn setup_otlp_tracer( +pub fn init_json_subscriber( + log_level: tracing::Level, service_name: &str, tracer_name: &'static str, -) -> Result> { +) -> Result, Box> { + let level_filter = tracing_subscriber::filter::LevelFilter::from_level(log_level); + let fmt_layer = tracing_subscriber::fmt::layer() + .json() + .with_target(false) + .with_current_span(true) + .with_span_list(false) + .with_level(true); + let base = tracing_subscriber::registry() + .with(level_filter) + .with(fmt_layer); + + if service_name.is_empty() { + base.try_init()?; + return Ok(None); + } + + let (tracer, trace_provider) = match setup_otel_with_tracer(service_name, tracer_name) { + Ok(v) => v, + Err(err) => { + // Keep JSON logs even if OTLP export cannot be initialized. + base.try_init()?; + return Err(err); + } + }; + + let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer); + base.with(telemetry_layer).try_init()?; + opentelemetry::global::set_tracer_provider(trace_provider.clone()); + Ok(Some(TracerProviderGuard::new(trace_provider))) +} + +fn setup_otel_with_tracer( + service_name: &str, + tracer_name: &'static str, +) -> Result< + (opentelemetry_sdk::trace::Tracer, SdkTracerProvider), + Box, +> { let otlp_exporter = opentelemetry_otlp::SpanExporter::builder() .with_tonic() .build()?; @@ -66,9 +138,7 @@ pub fn setup_otlp_tracer( .build(); let tracer = trace_provider.tracer(tracer_name); - opentelemetry::global::set_tracer_provider(trace_provider); - - Ok(tracer) + Ok((tracer, trace_provider)) } #[derive(Clone)] @@ -526,3 +596,28 @@ pub async fn try_end_zkproof_transaction( Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn otel_guard_shutdown_once_disarms_provider() { + let provider = SdkTracerProvider::builder().build(); + let mut guard = TracerProviderGuard::new(provider); + assert!(guard.tracer_provider.is_some()); + + guard.shutdown_once(); + assert!(guard.tracer_provider.is_none()); + + // A second shutdown is a no-op. + guard.shutdown_once(); + assert!(guard.tracer_provider.is_none()); + } + + #[test] + fn setup_otel_empty_service_name_returns_none() { + let otel_guard = init_otel("").unwrap(); + assert!(otel_guard.is_none()); + } +} diff --git a/coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs b/coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs index 0c262935b0..2226136f33 100644 --- a/coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs +++ b/coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs @@ -108,11 +108,13 @@ async fn main() -> anyhow::Result<()> { .with_max_level(conf.log_level) .init(); - if !conf.service_name.is_empty() { - if let Err(err) = telemetry::setup_otlp(&conf.service_name) { + let _otel_guard = match telemetry::init_otel(&conf.service_name) { + Ok(otel_guard) => otel_guard, + Err(err) => { error!(error = %err, "Failed to setup OTLP"); + None } - } + }; info!(gateway_url = %conf.gw_url, max_retries = %conf.provider_max_retries, retry_interval = ?conf.provider_retry_interval, "Connecting to Gateway"); diff --git a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs index 2898f3aeca..19d9d9b9a2 100644 --- a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs @@ -1024,11 +1024,13 @@ pub async fn main(args: Args) -> anyhow::Result<()> { ) }; - if !args.service_name.is_empty() { - if let Err(err) = telemetry::setup_otlp(&args.service_name) { + let _otel_guard = match telemetry::init_otel(&args.service_name) { + Ok(otel_guard) => otel_guard, + Err(err) => { error!(error = %err, "Failed to setup OTLP"); + None } - } + }; let mut log_iter = InfiniteLogIter::new(&args); let chain_id = log_iter.get_chain_id().await?; diff --git a/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs b/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs index 8c6520aeaf..a4104b9647 100644 --- a/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/poller/mod.rs @@ -90,11 +90,13 @@ pub struct PollerConfig { } pub async fn run_poller(config: PollerConfig) -> Result<()> { - if !config.service_name.is_empty() { - if let Err(err) = telemetry::setup_otlp(&config.service_name) { - warn!(error = %err, "Failed to setup OTLP"); + let _otel_guard = match telemetry::init_otel(&config.service_name) { + Ok(otel_guard) => otel_guard, + Err(err) => { + error!(error = %err, "Failed to setup OTLP"); + None } - } + }; let acl_address = config.acl_address; let tfhe_address = config.tfhe_address; diff --git a/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs b/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs index dc254de057..fc3a1e9bac 100644 --- a/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs +++ b/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs @@ -316,7 +316,11 @@ async fn test_listener_no_event_loss( reorg: bool, ) -> Result<(), anyhow::Error> { let setup = setup(None).await?; - let args = setup.args.clone(); + let mut args = setup.args.clone(); + // This test intentionally aborts/restarts the listener many times. + // Keep telemetry disabled here to avoid coupling event-loss assertions + // with exporter/shutdown timing. + args.service_name.clear(); // Start listener in background task let listener_handle = tokio::spawn(main(args.clone())); diff --git a/coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs b/coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs index d1ec6ffa91..59d258ff65 100644 --- a/coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs +++ b/coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs @@ -4,7 +4,6 @@ use fhevm_engine_common::telemetry; use tokio::signal::unix; use tokio_util::sync::CancellationToken; use tracing::error; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod utils; fn handle_sigint(token: CancellationToken) { @@ -69,40 +68,15 @@ async fn main() { let mut otlp_setup_error: Option = None; - let otel_tracer = if config.service_name.is_empty() { - None - } else { - match telemetry::setup_otlp_tracer(&config.service_name, "otlp-layer") { - Ok(tracer) => Some(tracer), + let _otel_guard = + match telemetry::init_json_subscriber(config.log_level, &config.service_name, "otlp-layer") + { + Ok(guard) => guard, Err(err) => { otlp_setup_error = Some(err.to_string()); None } - } - }; - - let level_filter = tracing_subscriber::filter::LevelFilter::from_level(config.log_level); - - let fmt_layer = tracing_subscriber::fmt::layer() - .json() - // drop "target" field so the logs are not too verbose. Instead, span names are used. - .with_target(false) - // keep "span" - .with_current_span(true) - // drop "spans" - .with_span_list(false) - .with_level(true); - - let base = tracing_subscriber::registry() - .with(level_filter) - .with(fmt_layer); - - if let Some(tracer) = otel_tracer { - let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); - base.with(otel_layer).init(); - } else { - base.init(); - } + }; if let Some(err) = otlp_setup_error { error!(error = %err, "Failed to setup OTLP"); diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs b/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs index d88e033787..7367dcc9c9 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs @@ -65,11 +65,13 @@ pub async fn async_main( let cancel_token = CancellationToken::new(); info!(target: "async_main", args = ?args, "Starting runtime with args"); - if !args.service_name.is_empty() { - if let Err(err) = telemetry::setup_otlp(&args.service_name) { + let _otel_guard = match telemetry::init_otel(&args.service_name) { + Ok(otel_guard) => otel_guard, + Err(err) => { error!(error = %err, "Failed to setup OTLP"); + None } - } + }; let database_url = args.database_url.clone().unwrap_or_default(); let health_check = health_check::HealthCheck::new(database_url); diff --git a/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs b/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs index 2cab182235..2ccd6210ea 100644 --- a/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs +++ b/coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs @@ -308,11 +308,13 @@ async fn main() -> anyhow::Result<()> { } }; - if !conf.service_name.is_empty() { - if let Err(err) = telemetry::setup_otlp(&conf.service_name) { + let _otel_guard = match telemetry::init_otel(&conf.service_name) { + Ok(otel_guard) => otel_guard, + Err(err) => { error!(error = %err, "Failed to setup OTLP"); + None } - } + }; let abstract_signer: AbstractSigner; match conf.signer_type { diff --git a/coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs b/coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs index 7cb1444a19..7cd21bcd89 100644 --- a/coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs +++ b/coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs @@ -103,11 +103,13 @@ async fn main() { pg_auto_explain_with_min_duration: args.pg_auto_explain_with_min_duration, }; - if !args.service_name.is_empty() { - if let Err(err) = telemetry::setup_otlp(&args.service_name) { + let _otel_guard = match telemetry::init_otel(&args.service_name) { + Ok(otel_guard) => otel_guard, + Err(err) => { error!(error = %err, "Failed to setup OTLP"); + None } - } + }; let cancel_token = CancellationToken::new(); let Some(service) = ZkProofService::create(conf, cancel_token.child_token()).await else {