Skip to content

Commit 7169c3f

Browse files
authored
Merge of #1942
2 parents 22f7d0f + 60f4a1f commit 7169c3f

File tree

11 files changed

+149
-60
lines changed

11 files changed

+149
-60
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/fhevm-engine-common/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ tfhe = { workspace = true }
2323
tonic = { workspace = true }
2424
tokio = { workspace = true }
2525
tracing = { workspace = true }
26+
tracing-opentelemetry = { workspace = true }
27+
tracing-subscriber = { workspace = true }
2628
bytesize = { workspace = true}
2729
tokio-util = { workspace = true}
2830
axum = { workspace = true}

coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs

Lines changed: 104 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,37 @@ use std::{
1717
};
1818
use tokio::sync::RwLock;
1919
use tracing::{debug, info, warn};
20+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
2021

2122
pub const TXN_ID_ATTR_KEY: &str = "txn_id";
2223

24+
/// Calls provider shutdown exactly once when dropped.
25+
pub struct TracerProviderGuard {
26+
tracer_provider: Option<SdkTracerProvider>,
27+
}
28+
29+
impl TracerProviderGuard {
30+
fn new(trace_provider: SdkTracerProvider) -> Self {
31+
Self {
32+
tracer_provider: Some(trace_provider),
33+
}
34+
}
35+
36+
fn shutdown_once(&mut self) {
37+
if let Some(provider) = self.tracer_provider.take() {
38+
if let Err(err) = provider.shutdown() {
39+
warn!(error = %err, "Failed to shutdown OTLP tracer provider");
40+
}
41+
}
42+
}
43+
}
44+
45+
impl Drop for TracerProviderGuard {
46+
fn drop(&mut self) {
47+
self.shutdown_once();
48+
}
49+
}
50+
2351
pub static HOST_TXN_LATENCY_CONFIG: OnceLock<MetricsConfig> = OnceLock::new();
2452
pub(crate) static HOST_TXN_LATENCY_HISTOGRAM: LazyLock<Histogram> = LazyLock::new(|| {
2553
register_histogram(
@@ -38,17 +66,61 @@ pub(crate) static ZKPROOF_TXN_LATENCY_HISTOGRAM: LazyLock<Histogram> = LazyLock:
3866
)
3967
});
4068

41-
pub fn setup_otlp(
69+
pub fn init_otel(
4270
service_name: &str,
43-
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
44-
let _ = setup_otlp_tracer(service_name, "otlp-layer")?;
45-
Ok(())
71+
) -> Result<Option<TracerProviderGuard>, Box<dyn std::error::Error + Send + Sync + 'static>> {
72+
if service_name.is_empty() {
73+
return Ok(None);
74+
}
75+
76+
let (_tracer, trace_provider) = setup_otel_with_tracer(service_name, "otlp-layer")?;
77+
opentelemetry::global::set_tracer_provider(trace_provider.clone());
78+
Ok(Some(TracerProviderGuard::new(trace_provider)))
4679
}
4780

48-
pub fn setup_otlp_tracer(
81+
pub fn init_json_subscriber(
82+
log_level: tracing::Level,
4983
service_name: &str,
5084
tracer_name: &'static str,
51-
) -> Result<opentelemetry_sdk::trace::Tracer, Box<dyn std::error::Error + Send + Sync + 'static>> {
85+
) -> Result<Option<TracerProviderGuard>, Box<dyn std::error::Error + Send + Sync + 'static>> {
86+
let level_filter = tracing_subscriber::filter::LevelFilter::from_level(log_level);
87+
let fmt_layer = tracing_subscriber::fmt::layer()
88+
.json()
89+
.with_target(false)
90+
.with_current_span(true)
91+
.with_span_list(false)
92+
.with_level(true);
93+
let base = tracing_subscriber::registry()
94+
.with(level_filter)
95+
.with(fmt_layer);
96+
97+
if service_name.is_empty() {
98+
base.try_init()?;
99+
return Ok(None);
100+
}
101+
102+
let (tracer, trace_provider) = match setup_otel_with_tracer(service_name, tracer_name) {
103+
Ok(v) => v,
104+
Err(err) => {
105+
// Keep JSON logs even if OTLP export cannot be initialized.
106+
base.try_init()?;
107+
return Err(err);
108+
}
109+
};
110+
111+
let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
112+
base.with(telemetry_layer).try_init()?;
113+
opentelemetry::global::set_tracer_provider(trace_provider.clone());
114+
Ok(Some(TracerProviderGuard::new(trace_provider)))
115+
}
116+
117+
fn setup_otel_with_tracer(
118+
service_name: &str,
119+
tracer_name: &'static str,
120+
) -> Result<
121+
(opentelemetry_sdk::trace::Tracer, SdkTracerProvider),
122+
Box<dyn std::error::Error + Send + Sync + 'static>,
123+
> {
52124
let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
53125
.with_tonic()
54126
.build()?;
@@ -66,9 +138,7 @@ pub fn setup_otlp_tracer(
66138
.build();
67139

68140
let tracer = trace_provider.tracer(tracer_name);
69-
opentelemetry::global::set_tracer_provider(trace_provider);
70-
71-
Ok(tracer)
141+
Ok((tracer, trace_provider))
72142
}
73143

74144
#[derive(Clone)]
@@ -526,3 +596,28 @@ pub async fn try_end_zkproof_transaction(
526596

527597
Ok(())
528598
}
599+
600+
#[cfg(test)]
601+
mod tests {
602+
use super::*;
603+
604+
#[test]
605+
fn otel_guard_shutdown_once_disarms_provider() {
606+
let provider = SdkTracerProvider::builder().build();
607+
let mut guard = TracerProviderGuard::new(provider);
608+
assert!(guard.tracer_provider.is_some());
609+
610+
guard.shutdown_once();
611+
assert!(guard.tracer_provider.is_none());
612+
613+
// A second shutdown is a no-op.
614+
guard.shutdown_once();
615+
assert!(guard.tracer_provider.is_none());
616+
}
617+
618+
#[test]
619+
fn setup_otel_empty_service_name_returns_none() {
620+
let otel_guard = init_otel("").unwrap();
621+
assert!(otel_guard.is_none());
622+
}
623+
}

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,13 @@ async fn main() -> anyhow::Result<()> {
108108
.with_max_level(conf.log_level)
109109
.init();
110110

111-
if !conf.service_name.is_empty() {
112-
if let Err(err) = telemetry::setup_otlp(&conf.service_name) {
111+
let _otel_guard = match telemetry::init_otel(&conf.service_name) {
112+
Ok(otel_guard) => otel_guard,
113+
Err(err) => {
113114
error!(error = %err, "Failed to setup OTLP");
115+
None
114116
}
115-
}
117+
};
116118

117119
info!(gateway_url = %conf.gw_url, max_retries = %conf.provider_max_retries,
118120
retry_interval = ?conf.provider_retry_interval, "Connecting to Gateway");

coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,11 +1024,13 @@ pub async fn main(args: Args) -> anyhow::Result<()> {
10241024
)
10251025
};
10261026

1027-
if !args.service_name.is_empty() {
1028-
if let Err(err) = telemetry::setup_otlp(&args.service_name) {
1027+
let _otel_guard = match telemetry::init_otel(&args.service_name) {
1028+
Ok(otel_guard) => otel_guard,
1029+
Err(err) => {
10291030
error!(error = %err, "Failed to setup OTLP");
1031+
None
10301032
}
1031-
}
1033+
};
10321034

10331035
let mut log_iter = InfiniteLogIter::new(&args);
10341036
let chain_id = log_iter.get_chain_id().await?;

coprocessor/fhevm-engine/host-listener/src/poller/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,13 @@ pub struct PollerConfig {
9090
}
9191

9292
pub async fn run_poller(config: PollerConfig) -> Result<()> {
93-
if !config.service_name.is_empty() {
94-
if let Err(err) = telemetry::setup_otlp(&config.service_name) {
95-
warn!(error = %err, "Failed to setup OTLP");
93+
let _otel_guard = match telemetry::init_otel(&config.service_name) {
94+
Ok(otel_guard) => otel_guard,
95+
Err(err) => {
96+
error!(error = %err, "Failed to setup OTLP");
97+
None
9698
}
97-
}
99+
};
98100

99101
let acl_address = config.acl_address;
100102
let tfhe_address = config.tfhe_address;

coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,11 @@ async fn test_listener_no_event_loss(
316316
reorg: bool,
317317
) -> Result<(), anyhow::Error> {
318318
let setup = setup(None).await?;
319-
let args = setup.args.clone();
319+
let mut args = setup.args.clone();
320+
// This test intentionally aborts/restarts the listener many times.
321+
// Keep telemetry disabled here to avoid coupling event-loss assertions
322+
// with exporter/shutdown timing.
323+
args.service_name.clear();
320324

321325
// Start listener in background task
322326
let listener_handle = tokio::spawn(main(args.clone()));

coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use fhevm_engine_common::telemetry;
44
use tokio::signal::unix;
55
use tokio_util::sync::CancellationToken;
66
use tracing::error;
7-
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
87
mod utils;
98

109
fn handle_sigint(token: CancellationToken) {
@@ -69,40 +68,15 @@ async fn main() {
6968

7069
let mut otlp_setup_error: Option<String> = None;
7170

72-
let otel_tracer = if config.service_name.is_empty() {
73-
None
74-
} else {
75-
match telemetry::setup_otlp_tracer(&config.service_name, "otlp-layer") {
76-
Ok(tracer) => Some(tracer),
71+
let _otel_guard =
72+
match telemetry::init_json_subscriber(config.log_level, &config.service_name, "otlp-layer")
73+
{
74+
Ok(guard) => guard,
7775
Err(err) => {
7876
otlp_setup_error = Some(err.to_string());
7977
None
8078
}
81-
}
82-
};
83-
84-
let level_filter = tracing_subscriber::filter::LevelFilter::from_level(config.log_level);
85-
86-
let fmt_layer = tracing_subscriber::fmt::layer()
87-
.json()
88-
// drop "target" field so the logs are not too verbose. Instead, span names are used.
89-
.with_target(false)
90-
// keep "span"
91-
.with_current_span(true)
92-
// drop "spans"
93-
.with_span_list(false)
94-
.with_level(true);
95-
96-
let base = tracing_subscriber::registry()
97-
.with(level_filter)
98-
.with(fmt_layer);
99-
100-
if let Some(tracer) = otel_tracer {
101-
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
102-
base.with(otel_layer).init();
103-
} else {
104-
base.init();
105-
}
79+
};
10680

10781
if let Some(err) = otlp_setup_error {
10882
error!(error = %err, "Failed to setup OTLP");

coprocessor/fhevm-engine/tfhe-worker/src/lib.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ pub async fn async_main(
6565
let cancel_token = CancellationToken::new();
6666
info!(target: "async_main", args = ?args, "Starting runtime with args");
6767

68-
if !args.service_name.is_empty() {
69-
if let Err(err) = telemetry::setup_otlp(&args.service_name) {
68+
let _otel_guard = match telemetry::init_otel(&args.service_name) {
69+
Ok(otel_guard) => otel_guard,
70+
Err(err) => {
7071
error!(error = %err, "Failed to setup OTLP");
72+
None
7173
}
72-
}
74+
};
7375

7476
let database_url = args.database_url.clone().unwrap_or_default();
7577
let health_check = health_check::HealthCheck::new(database_url);

coprocessor/fhevm-engine/transaction-sender/src/bin/transaction_sender.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,11 +308,13 @@ async fn main() -> anyhow::Result<()> {
308308
}
309309
};
310310

311-
if !conf.service_name.is_empty() {
312-
if let Err(err) = telemetry::setup_otlp(&conf.service_name) {
311+
let _otel_guard = match telemetry::init_otel(&conf.service_name) {
312+
Ok(otel_guard) => otel_guard,
313+
Err(err) => {
313314
error!(error = %err, "Failed to setup OTLP");
315+
None
314316
}
315-
}
317+
};
316318

317319
let abstract_signer: AbstractSigner;
318320
match conf.signer_type {

0 commit comments

Comments
 (0)