Skip to content

Commit 0573086

Browse files
authored
feat(coprocessor): standardize tracing/OTEL spans across workers (#1976)
* feat(coprocessor): migrate Phases C/D/H/J spans to tracing Migrate remaining coprocessor workers from legacy OtelTracer to the tracing + OTEL bridge pattern established in Phase A/B: - Phase C: zkproof-worker verify_task spans (verifier.rs) - Phase D: tfhe-worker high-signal execution loop spans (tfhe_worker.rs) - Phase H: host-listener DB propagation spans (tfhe_event_propagate.rs) - Phase J: transaction-sender op spans (add_ciphertext, allow_handle, delegate_user_decrypt, verify_proof) Also cleans up telemetry.rs by removing the now-unused OtelTracer wrapper and legacy span helpers, fixes a pre-existing clippy lint in chain_id.rs, and fixes minor sns-worker span hygiene from Phase B. Closes zama-ai/fhevm-internal#935 Closes zama-ai/fhevm-internal#1008 Closes zama-ai/fhevm-internal#1009 Closes zama-ai/fhevm-internal#1034 Closes zama-ai/fhevm-internal#1036 * refactor(coprocessor): use idiomatic #[tracing::instrument] everywhere Address PR review feedback: - Replace manual info_span!() + .enter() with #[tracing::instrument] on functions (host-listener, transaction-sender, zkproof-worker, tfhe-worker) - Remove all set_parent() calls in zkproof-worker — tracing-opentelemetry layer handles parent propagation automatically via #[instrument] - Remove span: tracing::Span parameter threading in verify_proof and create_ciphertext - Use .instrument(loop_span) for async parent context in tfhe-worker instead of passing loop_span as a function parameter - Revert out-of-scope chain_id.rs clippy fix * refactor(coprocessor): remove last non-cross-boundary set_parent, document remaining - Replace set_parent(t.context()) with parent: &t in batch_store span (same execution context, not cross-boundary) - Add comments on the two remaining set_parent calls explaining they restore OTel context across async/thread boundaries * refactor(coprocessor): migrate scheduler crate from raw OTel to tracing Replace opentelemetry::global::tracer + manual spans with tracing::info_span! in the scheduler's DFG execution pipeline. Remove set_txn_id / BoxedSpan from telemetry.rs (no remaining callers). * refactor(coprocessor): address review feedback on tracing style * fix(coprocessor): restore tracing telemetry parity * refactor(tfhe-worker): avoid unnecessary txn id allocation * fix(coprocessor): address telemetry rereview cleanup items * fix(coprocessor): address obatirou telemetry review comments * refactor(coprocessor): dedupe short id telemetry helper * refactor(coprocessor): streamline span metadata per obatirou review * fix(zkproof-worker): instrument db_insert span timing * fix(transaction-sender): restore prepare_delegate tracing parity * fix(telemetry): simplify short-id helper callsites * fix(coprocessor): drop out-of-scope chain_id style delta * refactor(coprocessor): dedupe span error helper and remove stale dead_code attr * fix(coprocessor): restore service otel wiring and align span op names * refactor(coprocessor): simplify OTLP subscriber init fallback * chore(coprocessor): remove unused init_otel helper * fix(zkproof-worker): drop duplicate create_handle info event * docs(telemetry): clarify OTLP fallback semantics * refactor(telemetry): rename otel init helper for explicit fallback semantics * refactor(coprocessor): unify info macro usage and remove duplicate event * docs(coprocessor): add telemetry style guide and keep local OTLP endpoint * refactor(tfhe-worker): use shared span-error helper in error paths * refactor(txn-sender): simplify optional txn_id span fielding * refactor(txn-sender): restore lean optional txn_id span recording * fix(coprocessor): restore span attribute parity and fix A/B regressions - Remove redundant info! events in tfhe-worker that duplicated span fields - Restore db_insert count/valid and verify_proof list_len as span fields instead of log events in zkproof-worker (attribute parity with main) - Add count field to expand_verified_list span (was set by caller on main) - Restore distinct span names in txn-sender via #[instrument(name = ...)] to avoid collapsing 4 operations into 2 generic names * refactor(coprocessor): remove redundant operation= span attributes The span name already serves as the operation identifier in Jaeger/OTEL, making the duplicate operation field unnecessary. Where operation differed from the function name, name= on #[instrument] was already set in the prior commit. The scheduler's FheGetCiphertext operation is intentionally kept as it represents the FHE opcode, not the function name.
1 parent ad06d43 commit 0573086

File tree

28 files changed

+669
-739
lines changed

28 files changed

+669
-739
lines changed

coprocessor/README.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,55 @@ When using the `aws-kms` signer type, standard `AWS_*` environment variables are
283283
- **AWS_SECRET_ACCESS_KEY** (i.e. password)
284284
- etc.
285285

286+
## Telemetry Style Guide (Tracing + OTEL)
287+
288+
Use `tracing` spans as the default telemetry API.
289+
290+
### Rules
291+
292+
1. Use function/span names as the operation name.
293+
- Do not add an `operation = "..."` span field.
294+
2. Do not attach high-cardinality identifiers to span attributes.
295+
- Do not put `txn_id`, `transaction_hash`, or `handle` on spans.
296+
- If needed for debugging, log these values in events/log lines.
297+
3. For async work, instrument futures with `.instrument(...)`.
298+
- Do not keep `span.enter()` guards alive across `.await`.
299+
4. Set OTEL error status on error exits.
300+
- Logging an error is not enough for trace error visibility.
301+
5. Keep span fields low-cardinality and useful for aggregation.
302+
- Good examples: `request_id`, counts, booleans, retry bucket, chain id.
303+
304+
### Preferred snippets
305+
306+
```rust
307+
#[tracing::instrument(skip_all)]
308+
async fn process_proof(...) -> anyhow::Result<()> {
309+
// business logic
310+
Ok(())
311+
}
312+
```
313+
314+
```rust
315+
use tracing::Instrument;
316+
317+
let db_insert_span = tracing::info_span!("db_insert", request_id);
318+
async {
319+
sqlx::query("UPDATE ...").execute(pool).await?;
320+
Ok::<(), sqlx::Error>(())
321+
}
322+
.instrument(db_insert_span.clone())
323+
.await?;
324+
```
325+
326+
```rust
327+
use tracing_opentelemetry::OpenTelemetrySpanExt;
328+
329+
if let Err(err) = do_work().instrument(span.clone()).await {
330+
span.context().span().set_status(opentelemetry::trace::Status::error(err.to_string()));
331+
return Err(err.into());
332+
}
333+
```
334+
286335

287336
## Resources
288337

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs

Lines changed: 50 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
use crate::chain_id::ChainId;
22
use crate::utils::to_hex;
33
use bigdecimal::num_traits::ToPrimitive;
4-
use opentelemetry::{
5-
global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
6-
trace::{SpanBuilder, Status, TraceContextExt, Tracer, TracerProvider},
7-
Context, KeyValue,
8-
};
4+
use opentelemetry::{trace::TraceContextExt, trace::TracerProvider, KeyValue};
95
use opentelemetry_sdk::{trace::SdkTracerProvider, Resource};
106
use prometheus::{register_histogram, Histogram};
117
use sqlx::PgConnection;
@@ -14,14 +10,12 @@ use std::{
1410
num::NonZeroUsize,
1511
str::FromStr,
1612
sync::{Arc, LazyLock, OnceLock},
17-
time::SystemTime,
1813
};
1914
use tokio::sync::RwLock;
20-
use tracing::{debug, info, warn};
15+
use tracing::{debug, error, info, warn, Span};
16+
use tracing_opentelemetry::OpenTelemetrySpanExt;
2117
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
2218

23-
pub const TXN_ID_ATTR_KEY: &str = "txn_id";
24-
2519
/// Calls provider shutdown exactly once when dropped.
2620
pub struct TracerProviderGuard {
2721
tracer_provider: Option<SdkTracerProvider>,
@@ -67,18 +61,6 @@ pub(crate) static ZKPROOF_TXN_LATENCY_HISTOGRAM: LazyLock<Histogram> = LazyLock:
6761
)
6862
});
6963

70-
pub fn init_otel(
71-
service_name: &str,
72-
) -> Result<Option<TracerProviderGuard>, Box<dyn std::error::Error + Send + Sync + 'static>> {
73-
if service_name.is_empty() {
74-
return Ok(None);
75-
}
76-
77-
let (_tracer, trace_provider) = setup_otel_with_tracer(service_name, "otlp-layer")?;
78-
opentelemetry::global::set_tracer_provider(trace_provider.clone());
79-
Ok(Some(TracerProviderGuard::new(trace_provider)))
80-
}
81-
8264
pub fn init_json_subscriber(
8365
log_level: tracing::Level,
8466
service_name: &str,
@@ -115,6 +97,25 @@ pub fn init_json_subscriber(
11597
Ok(Some(TracerProviderGuard::new(trace_provider)))
11698
}
11799

100+
/// Initializes tracing with JSON logs and best-effort OTLP export.
101+
///
102+
/// Fallback here means "logs-only mode": if OTLP setup fails, we keep
103+
/// JSON logging enabled and continue execution without an OTLP exporter.
104+
/// It does not try alternate OTLP endpoints.
105+
pub fn init_tracing_otel_with_logs_only_fallback(
106+
log_level: tracing::Level,
107+
service_name: &str,
108+
tracer_name: &'static str,
109+
) -> Option<TracerProviderGuard> {
110+
match init_json_subscriber(log_level, service_name, tracer_name) {
111+
Ok(guard) => guard,
112+
Err(err) => {
113+
error!(error = %err, "Failed to setup OTLP");
114+
None
115+
}
116+
}
117+
}
118+
118119
fn setup_otel_with_tracer(
119120
service_name: &str,
120121
tracer_name: &'static str,
@@ -142,119 +143,6 @@ fn setup_otel_with_tracer(
142143
Ok((tracer, trace_provider))
143144
}
144145

145-
#[derive(Clone)]
146-
pub struct OtelTracer {
147-
ctx: opentelemetry::Context,
148-
tracer: Arc<BoxedTracer>,
149-
}
150-
151-
impl OtelTracer {
152-
pub fn child_span(&self, name: &'static str) -> BoxedSpan {
153-
self.tracer.start_with_context(name, &self.ctx)
154-
}
155-
156-
pub fn context(&self) -> &Context {
157-
&self.ctx
158-
}
159-
160-
/// Sets attribute to the root span
161-
pub fn set_attribute(&self, key: &str, value: String) {
162-
self.ctx
163-
.span()
164-
.set_attribute(KeyValue::new(key.to_owned(), value));
165-
}
166-
167-
/// Consumes and ends the tracer with status Ok
168-
pub fn end(self) {
169-
self.ctx.span().set_status(Status::Ok);
170-
self.ctx.span().end();
171-
}
172-
}
173-
174-
#[derive(Debug, PartialEq)]
175-
struct Handle(Vec<u8>);
176-
#[derive(Debug, PartialEq)]
177-
struct Transaction(Vec<u8>);
178-
179-
pub fn tracer_with_handle(
180-
span_name: &'static str,
181-
handle: Vec<u8>,
182-
transaction_id: &Option<Vec<u8>>,
183-
) -> OtelTracer {
184-
let tracer = opentelemetry::global::tracer(format!("tracer_{}", span_name));
185-
let mut span = tracer.start(span_name);
186-
187-
if !handle.is_empty() {
188-
let handle = to_hex(&handle).get(0..10).unwrap_or_default().to_owned();
189-
190-
span.set_attribute(KeyValue::new("handle", handle));
191-
}
192-
193-
if let Some(transaction_id) = transaction_id {
194-
set_txn_id(&mut span, transaction_id);
195-
}
196-
197-
// Add handle and transaction_id to the context
198-
// so that they can be retrieved in the application code, e.g. for logging
199-
let mut ctx = Context::default().with_span(span);
200-
ctx = ctx.with_value(Handle(handle.clone()));
201-
ctx = ctx.with_value(Transaction(transaction_id.clone().unwrap_or_default()));
202-
203-
OtelTracer {
204-
ctx,
205-
tracer: Arc::new(tracer),
206-
}
207-
}
208-
209-
// Sets the txn_id attribute to the span
210-
// The txn_id is a shortened version of the transaction_id (first 10 characters of the hex representation)
211-
pub fn set_txn_id(span: &mut BoxedSpan, transaction_id: &[u8]) {
212-
let txn_id_short = to_hex(transaction_id)
213-
.get(0..10)
214-
.unwrap_or_default()
215-
.to_owned();
216-
217-
span.set_attribute(KeyValue::new(TXN_ID_ATTR_KEY, txn_id_short));
218-
}
219-
220-
/// Create a new span with start and end time
221-
pub fn tracer_with_start_time(span_name: &'static str, start_time: SystemTime) -> OtelTracer {
222-
let tracer = opentelemetry::global::tracer(span_name);
223-
let root_span = tracer.build(SpanBuilder::from_name(span_name).with_start_time(start_time));
224-
let ctx = opentelemetry::Context::default().with_span(root_span);
225-
OtelTracer {
226-
ctx,
227-
tracer: Arc::new(tracer),
228-
}
229-
}
230-
231-
pub fn tracer(span_name: &'static str, transaction_id: &Option<Vec<u8>>) -> OtelTracer {
232-
tracer_with_handle(span_name, vec![], transaction_id)
233-
}
234-
235-
pub fn attribute(span: &mut BoxedSpan, key: &str, value: String) {
236-
span.set_attribute(KeyValue::new(key.to_owned(), value));
237-
}
238-
239-
/// Ends span with status Ok
240-
pub fn end_span(mut span: BoxedSpan) {
241-
span.set_status(Status::Ok);
242-
span.end();
243-
}
244-
245-
pub fn end_span_with_timestamp(mut span: BoxedSpan, timestamp: SystemTime) {
246-
span.set_status(Status::Ok);
247-
span.end_with_timestamp(timestamp);
248-
}
249-
250-
/// Ends span with status Error with description
251-
pub fn end_span_with_err(mut span: BoxedSpan, desc: String) {
252-
span.set_status(Status::Error {
253-
description: desc.into(),
254-
});
255-
span.end();
256-
}
257-
258146
#[derive(Clone, Copy, Debug)]
259147
pub struct MetricsConfig {
260148
bucket_start: f64,
@@ -326,6 +214,34 @@ pub fn register_histogram(config: Option<&MetricsConfig>, name: &str, desc: &str
326214
.unwrap_or_else(|_| panic!("Failed to register latency histogram: {}", name))
327215
}
328216

217+
/// Returns the legacy short-form hex id used by telemetry spans.
218+
pub fn short_hex_id(value: &[u8]) -> String {
219+
to_hex(value).get(0..10).unwrap_or_default().to_owned()
220+
}
221+
222+
pub fn record_short_hex(span: &Span, field: &'static str, value: &[u8]) {
223+
span.record(field, tracing::field::display(short_hex_id(value)));
224+
}
225+
226+
pub fn record_short_hex_if_some<T: AsRef<[u8]>>(
227+
span: &Span,
228+
field: &'static str,
229+
value: Option<T>,
230+
) {
231+
if let Some(value) = value {
232+
record_short_hex(span, field, value.as_ref());
233+
}
234+
}
235+
236+
pub fn set_current_span_error(error: &impl fmt::Display) {
237+
tracing::Span::current()
238+
.context()
239+
.span()
240+
.set_status(opentelemetry::trace::Status::Error {
241+
description: error.to_string().into(),
242+
});
243+
}
244+
329245
pub(crate) static TXN_METRICS_MANAGER: LazyLock<TransactionMetrics> =
330246
LazyLock::new(|| TransactionMetrics::new(NonZeroUsize::new(100).unwrap()));
331247

@@ -615,10 +531,4 @@ mod tests {
615531
guard.shutdown_once();
616532
assert!(guard.tracer_provider.is_none());
617533
}
618-
619-
#[test]
620-
fn setup_otel_empty_service_name_returns_none() {
621-
let otel_guard = init_otel("").unwrap();
622-
assert!(otel_guard.is_none());
623-
}
624534
}

coprocessor/fhevm-engine/gw-listener/src/bin/gw_listener.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,11 @@ async fn main() -> anyhow::Result<()> {
110110

111111
let conf = Conf::parse();
112112

113-
tracing_subscriber::fmt()
114-
.json()
115-
.with_level(true)
116-
.with_max_level(conf.log_level)
117-
.init();
118-
119-
let _otel_guard = match telemetry::init_otel(&conf.service_name) {
120-
Ok(otel_guard) => otel_guard,
121-
Err(err) => {
122-
error!(error = %err, "Failed to setup OTLP");
123-
None
124-
}
125-
};
113+
let _otel_guard = telemetry::init_tracing_otel_with_logs_only_fallback(
114+
conf.log_level,
115+
&conf.service_name,
116+
"otlp-layer",
117+
);
126118

127119
info!(gateway_url = %conf.gw_url, max_retries = %conf.provider_max_retries,
128120
retry_interval = ?conf.provider_retry_interval, "Connecting to Gateway");
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
use clap::Parser;
2+
use fhevm_engine_common::telemetry;
23

34
#[tokio::main]
45
async fn main() -> anyhow::Result<()> {
56
let args = host_listener::cmd::Args::parse();
67

7-
tracing_subscriber::fmt()
8-
.json()
9-
.with_level(true)
10-
.with_max_level(args.log_level)
11-
.init();
8+
let _otel_guard = telemetry::init_tracing_otel_with_logs_only_fallback(
9+
args.log_level,
10+
&args.service_name,
11+
"otlp-layer",
12+
);
1213

1314
host_listener::cmd::main(args).await
1415
}

0 commit comments

Comments
 (0)