Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
eed3f9b
feat(coprocessor): standardize otlp init with shutdown guard
Eikix Feb 9, 2026
44d9824
refactor(coprocessor): clarify otlp helper naming
Eikix Feb 9, 2026
28a941a
refactor(coprocessor): remove non-shutdown otlp setup path
Eikix Feb 9, 2026
29c65ea
refactor(coprocessor): simplify OTLP init runtime API
Eikix Feb 9, 2026
8649de2
refactor(coprocessor): make OTLP runtime init explicit and tested
Eikix Feb 9, 2026
b140ece
refactor(coprocessor): drop unused OTEL propagator setup
Eikix Feb 9, 2026
aabb2e9
refactor(coprocessor): align OTEL runtime naming
Eikix Feb 9, 2026
c624d37
refactor(coprocessor): collapse OTEL runtime to single guard
Eikix Feb 9, 2026
4ec1f47
refactor(coprocessor): decouple tracer from OTEL guard
Eikix Feb 9, 2026
0e3cb83
docs(coprocessor): note transitional tracer return
Eikix Feb 9, 2026
b97b1e8
refactor(coprocessor): centralize sns-worker subscriber OTEL wiring
Eikix Feb 9, 2026
cb7e9a3
refactor(coprocessor): drop migration TODO note
Eikix Feb 9, 2026
4b5fb67
refactor(coprocessor): simplify otel setup and guard naming
Eikix Feb 9, 2026
52f4378
refactor(coprocessor): unify json subscriber init path
Eikix Feb 9, 2026
b01b2df
refactor(coprocessor): keep one json subscriber init API
Eikix Feb 9, 2026
0a0a9cd
refactor(coprocessor): remove expect from otel subscriber init
Eikix Feb 9, 2026
f0f77c2
refactor(coprocessor): split otel setup paths by use case
Eikix Feb 9, 2026
eb3c8bf
refactor(coprocessor): inline otel tracer-provider setup
Eikix Feb 9, 2026
4c1bf91
test(coprocessor): stabilize host-listener restart reorg test
Eikix Feb 9, 2026
900b416
fix(coprocessor): keep logs when OTLP init fails
Eikix Feb 9, 2026
6919a91
fix(coprocessor): use error level for poller OTLP init failure
Eikix Feb 9, 2026
60f4a1f
refactor(coprocessor): inline trivial OTEL helpers
Eikix Feb 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions coprocessor/fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions coprocessor/fhevm-engine/fhevm-engine-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ tfhe = { workspace = true }
tonic = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
bytesize = { workspace = true}
tokio-util = { workspace = true}
axum = { workspace = true}
Expand Down
113 changes: 104 additions & 9 deletions coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,37 @@ use std::{
};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

pub const TXN_ID_ATTR_KEY: &str = "txn_id";

/// Owns an [`SdkTracerProvider`] and calls provider shutdown exactly once when dropped.
///
/// Bind the guard to a named variable (for example `_otel_guard`) so that it
/// stays alive for as long as spans should be exported; binding it to `_`
/// drops it immediately.
#[must_use = "dropping the guard immediately shuts down the OTLP tracer provider"]
pub struct TracerProviderGuard {
    /// `Some` until shutdown has run; the first shutdown takes the provider
    /// out and leaves `None` behind.
    tracer_provider: Option<SdkTracerProvider>,
}

impl TracerProviderGuard {
    /// Wraps `trace_provider` in a guard that still holds the provider.
    fn new(trace_provider: SdkTracerProvider) -> Self {
        let tracer_provider = Some(trace_provider);
        Self { tracer_provider }
    }

    /// Shuts the provider down the first time it is called; later calls do nothing.
    ///
    /// A shutdown error is logged at `warn` level and not propagated.
    fn shutdown_once(&mut self) {
        // `take()` leaves `None` in the field, which is what turns every
        // subsequent call into a no-op.
        let Some(provider) = self.tracer_provider.take() else {
            return;
        };

        if let Err(err) = provider.shutdown() {
            warn!(error = %err, "Failed to shutdown OTLP tracer provider");
        }
    }
}

/// Shuts the wrapped tracer provider down when the guard goes out of scope.
impl Drop for TracerProviderGuard {
fn drop(&mut self) {
// Delegates to `shutdown_once`, so a provider that was already shut down
// through the guard is not shut down a second time here.
self.shutdown_once();
}
}

pub static HOST_TXN_LATENCY_CONFIG: OnceLock<MetricsConfig> = OnceLock::new();
pub(crate) static HOST_TXN_LATENCY_HISTOGRAM: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram(
Expand All @@ -38,17 +66,61 @@ pub(crate) static ZKPROOF_TXN_LATENCY_HISTOGRAM: LazyLock<Histogram> = LazyLock:
)
});

pub fn setup_otlp(
pub fn init_otel(
service_name: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let _ = setup_otlp_tracer(service_name, "otlp-layer")?;
Ok(())
) -> Result<Option<TracerProviderGuard>, Box<dyn std::error::Error + Send + Sync + 'static>> {
if service_name.is_empty() {
return Ok(None);
}

let (_tracer, trace_provider) = setup_otel_with_tracer(service_name, "otlp-layer")?;
opentelemetry::global::set_tracer_provider(trace_provider.clone());
Ok(Some(TracerProviderGuard::new(trace_provider)))
}

pub fn setup_otlp_tracer(
pub fn init_json_subscriber(
log_level: tracing::Level,
service_name: &str,
tracer_name: &'static str,
) -> Result<opentelemetry_sdk::trace::Tracer, Box<dyn std::error::Error + Send + Sync + 'static>> {
) -> Result<Option<TracerProviderGuard>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let level_filter = tracing_subscriber::filter::LevelFilter::from_level(log_level);
let fmt_layer = tracing_subscriber::fmt::layer()
.json()
.with_target(false)
.with_current_span(true)
.with_span_list(false)
.with_level(true);
let base = tracing_subscriber::registry()
.with(level_filter)
.with(fmt_layer);

if service_name.is_empty() {
base.try_init()?;
return Ok(None);
}

let (tracer, trace_provider) = match setup_otel_with_tracer(service_name, tracer_name) {
Ok(v) => v,
Err(err) => {
// Keep JSON logs even if OTLP export cannot be initialized.
base.try_init()?;
return Err(err);
}
};

let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
base.with(telemetry_layer).try_init()?;
opentelemetry::global::set_tracer_provider(trace_provider.clone());
Ok(Some(TracerProviderGuard::new(trace_provider)))
}

fn setup_otel_with_tracer(
service_name: &str,
tracer_name: &'static str,
) -> Result<
(opentelemetry_sdk::trace::Tracer, SdkTracerProvider),
Box<dyn std::error::Error + Send + Sync + 'static>,
> {
let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.build()?;
Expand All @@ -66,9 +138,7 @@ pub fn setup_otlp_tracer(
.build();

let tracer = trace_provider.tracer(tracer_name);
opentelemetry::global::set_tracer_provider(trace_provider);

Ok(tracer)
Ok((tracer, trace_provider))
}

#[derive(Clone)]
Expand Down Expand Up @@ -526,3 +596,28 @@ pub async fn try_end_zkproof_transaction(

Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The first `shutdown_once` call takes the provider out of the guard;
    /// a second call finds nothing left and leaves the guard unchanged.
    #[test]
    fn otel_guard_shutdown_once_disarms_provider() {
        let provider = SdkTracerProvider::builder().build();
        let mut guard = TracerProviderGuard::new(provider);
        assert!(guard.tracer_provider.is_some());

        guard.shutdown_once();
        assert!(guard.tracer_provider.is_none());

        // A second shutdown is a no-op.
        guard.shutdown_once();
        assert!(guard.tracer_provider.is_none());
    }

    /// With an empty service name `init_otel` returns `Ok(None)`: no guard is
    /// created and no error is reported.
    #[test]
    fn init_otel_empty_service_name_returns_none() {
        let otel_guard = init_otel("").expect("an empty service name must not be an error");
        assert!(otel_guard.is_none());
    }
}
8 changes: 5 additions & 3 deletions coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,13 @@ async fn main() -> anyhow::Result<()> {
.with_max_level(conf.log_level)
.init();

if !conf.service_name.is_empty() {
if let Err(err) = telemetry::setup_otlp(&conf.service_name) {
let _otel_guard = match telemetry::init_otel(&conf.service_name) {
Ok(otel_guard) => otel_guard,
Err(err) => {
error!(error = %err, "Failed to setup OTLP");
None
}
}
};

info!(gateway_url = %conf.gw_url, max_retries = %conf.provider_max_retries,
retry_interval = ?conf.provider_retry_interval, "Connecting to Gateway");
Expand Down
8 changes: 5 additions & 3 deletions coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,11 +1024,13 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
)
};

if !args.service_name.is_empty() {
if let Err(err) = telemetry::setup_otlp(&args.service_name) {
let _otel_guard = match telemetry::init_otel(&args.service_name) {
Ok(otel_guard) => otel_guard,
Err(err) => {
error!(error = %err, "Failed to setup OTLP");
None
}
}
};

let mut log_iter = InfiniteLogIter::new(&args);
let chain_id = log_iter.get_chain_id().await?;
Expand Down
10 changes: 6 additions & 4 deletions coprocessor/fhevm-engine/host-listener/src/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ pub struct PollerConfig {
}

pub async fn run_poller(config: PollerConfig) -> Result<()> {
if !config.service_name.is_empty() {
if let Err(err) = telemetry::setup_otlp(&config.service_name) {
warn!(error = %err, "Failed to setup OTLP");
let _otel_guard = match telemetry::init_otel(&config.service_name) {
Ok(otel_guard) => otel_guard,
Err(err) => {
error!(error = %err, "Failed to setup OTLP");
None
}
}
};

let acl_address = config.acl_address;
let tfhe_address = config.tfhe_address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ async fn test_listener_no_event_loss(
reorg: bool,
) -> Result<(), anyhow::Error> {
let setup = setup(None).await?;
let args = setup.args.clone();
let mut args = setup.args.clone();
// This test intentionally aborts/restarts the listener many times.
// Keep telemetry disabled here to avoid coupling event-loss assertions
// with exporter/shutdown timing.
args.service_name.clear();

// Start listener in background task
let listener_handle = tokio::spawn(main(args.clone()));
Expand Down
36 changes: 5 additions & 31 deletions coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use fhevm_engine_common::telemetry;
use tokio::signal::unix;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod utils;

fn handle_sigint(token: CancellationToken) {
Expand Down Expand Up @@ -69,40 +68,15 @@ async fn main() {

let mut otlp_setup_error: Option<String> = None;

let otel_tracer = if config.service_name.is_empty() {
None
} else {
match telemetry::setup_otlp_tracer(&config.service_name, "otlp-layer") {
Ok(tracer) => Some(tracer),
let _otel_guard =
match telemetry::init_json_subscriber(config.log_level, &config.service_name, "otlp-layer")
{
Ok(guard) => guard,
Err(err) => {
otlp_setup_error = Some(err.to_string());
None
}
}
};

let level_filter = tracing_subscriber::filter::LevelFilter::from_level(config.log_level);

let fmt_layer = tracing_subscriber::fmt::layer()
.json()
// drop "target" field so the logs are not too verbose. Instead, span names are used.
.with_target(false)
// keep "span"
.with_current_span(true)
// drop "spans"
.with_span_list(false)
.with_level(true);

let base = tracing_subscriber::registry()
.with(level_filter)
.with(fmt_layer);

if let Some(tracer) = otel_tracer {
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
base.with(otel_layer).init();
} else {
base.init();
}
};

if let Some(err) = otlp_setup_error {
error!(error = %err, "Failed to setup OTLP");
Expand Down
8 changes: 5 additions & 3 deletions coprocessor/fhevm-engine/tfhe-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ pub async fn async_main(
let cancel_token = CancellationToken::new();
info!(target: "async_main", args = ?args, "Starting runtime with args");

if !args.service_name.is_empty() {
if let Err(err) = telemetry::setup_otlp(&args.service_name) {
let _otel_guard = match telemetry::init_otel(&args.service_name) {
Ok(otel_guard) => otel_guard,
Err(err) => {
error!(error = %err, "Failed to setup OTLP");
None
}
}
};

let database_url = args.database_url.clone().unwrap_or_default();
let health_check = health_check::HealthCheck::new(database_url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,13 @@ async fn main() -> anyhow::Result<()> {
}
};

if !conf.service_name.is_empty() {
if let Err(err) = telemetry::setup_otlp(&conf.service_name) {
let _otel_guard = match telemetry::init_otel(&conf.service_name) {
Ok(otel_guard) => otel_guard,
Err(err) => {
error!(error = %err, "Failed to setup OTLP");
None
}
}
};

let abstract_signer: AbstractSigner;
match conf.signer_type {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ async fn main() {
pg_auto_explain_with_min_duration: args.pg_auto_explain_with_min_duration,
};

if !args.service_name.is_empty() {
if let Err(err) = telemetry::setup_otlp(&args.service_name) {
let _otel_guard = match telemetry::init_otel(&args.service_name) {
Ok(otel_guard) => otel_guard,
Err(err) => {
error!(error = %err, "Failed to setup OTLP");
None
}
}
};

let cancel_token = CancellationToken::new();
let Some(service) = ZkProofService::create(conf, cancel_token.child_token()).await else {
Expand Down
Loading