Skip to content

Commit b7462b4

Browse files
committed
feat(coprocessor): migrate Phases C/D/H/J spans to tracing
Migrate remaining coprocessor workers from legacy OtelTracer to the tracing + OTEL bridge pattern established in Phase A/B:

- Phase C: zkproof-worker verify_task spans (verifier.rs)
- Phase D: tfhe-worker high-signal execution loop spans (tfhe_worker.rs)
- Phase H: host-listener DB propagation spans (tfhe_event_propagate.rs)
- Phase J: transaction-sender op spans (add_ciphertext, allow_handle, delegate_user_decrypt, verify_proof)

Also cleans up telemetry.rs by removing the now-unused OtelTracer wrapper and legacy span helpers, fixes a pre-existing clippy lint in chain_id.rs, and fixes minor sns-worker span hygiene from Phase B.

Closes zama-ai/fhevm-internal#935
Closes zama-ai/fhevm-internal#1008
Closes zama-ai/fhevm-internal#1009
Closes zama-ai/fhevm-internal#1034
Closes zama-ai/fhevm-internal#1036
1 parent a4a9aa4 commit b7462b4

File tree

15 files changed

+246
-314
lines changed

15 files changed

+246
-314
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/fhevm-engine-common/src/chain_id.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ impl TryFrom<u64> for ChainId {
5050
type Error = InvalidChainId;
5151

5252
fn try_from(value: u64) -> Result<Self, Self::Error> {
53-
if value <= i64::MAX as u64 {
54-
Ok(ChainId(value as i64))
53+
if let Ok(v) = i64::try_from(value) {
54+
Ok(ChainId(v))
5555
} else {
5656
Err(InvalidChainId {
5757
value: value.to_string(),

coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs

Lines changed: 13 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
use crate::chain_id::ChainId;
22
use crate::utils::to_hex;
33
use bigdecimal::num_traits::ToPrimitive;
4-
use opentelemetry::{
5-
global::{BoxedSpan, BoxedTracer, ObjectSafeSpan},
6-
trace::{SpanBuilder, Status, TraceContextExt, Tracer, TracerProvider},
7-
Context, KeyValue,
8-
};
4+
use opentelemetry::{global::BoxedSpan, trace::TracerProvider, KeyValue};
95
use opentelemetry_sdk::{trace::SdkTracerProvider, Resource};
106
use prometheus::{register_histogram, Histogram};
117
use sqlx::PgConnection;
@@ -14,14 +10,25 @@ use std::{
1410
num::NonZeroUsize,
1511
str::FromStr,
1612
sync::{Arc, LazyLock, OnceLock},
17-
time::SystemTime,
1813
};
1914
use tokio::sync::RwLock;
2015
use tracing::{debug, info, warn};
2116
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
2217

2318
pub const TXN_ID_ATTR_KEY: &str = "txn_id";
2419

20+
// Sets the txn_id attribute to the span
21+
// The txn_id is a shortened version of the transaction_id (first 10 characters of the hex representation)
22+
pub fn set_txn_id(span: &mut BoxedSpan, transaction_id: &[u8]) {
23+
use opentelemetry::trace::Span;
24+
let txn_id_short = to_hex(transaction_id)
25+
.get(0..10)
26+
.unwrap_or_default()
27+
.to_owned();
28+
29+
span.set_attribute(KeyValue::new(TXN_ID_ATTR_KEY, txn_id_short));
30+
}
31+
2532
/// Calls provider shutdown exactly once when dropped.
2633
pub struct TracerProviderGuard {
2734
tracer_provider: Option<SdkTracerProvider>,
@@ -142,119 +149,6 @@ fn setup_otel_with_tracer(
142149
Ok((tracer, trace_provider))
143150
}
144151

145-
#[derive(Clone)]
146-
pub struct OtelTracer {
147-
ctx: opentelemetry::Context,
148-
tracer: Arc<BoxedTracer>,
149-
}
150-
151-
impl OtelTracer {
152-
pub fn child_span(&self, name: &'static str) -> BoxedSpan {
153-
self.tracer.start_with_context(name, &self.ctx)
154-
}
155-
156-
pub fn context(&self) -> &Context {
157-
&self.ctx
158-
}
159-
160-
/// Sets attribute to the root span
161-
pub fn set_attribute(&self, key: &str, value: String) {
162-
self.ctx
163-
.span()
164-
.set_attribute(KeyValue::new(key.to_owned(), value));
165-
}
166-
167-
/// Consumes and ends the tracer with status Ok
168-
pub fn end(self) {
169-
self.ctx.span().set_status(Status::Ok);
170-
self.ctx.span().end();
171-
}
172-
}
173-
174-
#[derive(Debug, PartialEq)]
175-
struct Handle(Vec<u8>);
176-
#[derive(Debug, PartialEq)]
177-
struct Transaction(Vec<u8>);
178-
179-
pub fn tracer_with_handle(
180-
span_name: &'static str,
181-
handle: Vec<u8>,
182-
transaction_id: &Option<Vec<u8>>,
183-
) -> OtelTracer {
184-
let tracer = opentelemetry::global::tracer(format!("tracer_{}", span_name));
185-
let mut span = tracer.start(span_name);
186-
187-
if !handle.is_empty() {
188-
let handle = to_hex(&handle).get(0..10).unwrap_or_default().to_owned();
189-
190-
span.set_attribute(KeyValue::new("handle", handle));
191-
}
192-
193-
if let Some(transaction_id) = transaction_id {
194-
set_txn_id(&mut span, transaction_id);
195-
}
196-
197-
// Add handle and transaction_id to the context
198-
// so that they can be retrieved in the application code, e.g. for logging
199-
let mut ctx = Context::default().with_span(span);
200-
ctx = ctx.with_value(Handle(handle.clone()));
201-
ctx = ctx.with_value(Transaction(transaction_id.clone().unwrap_or_default()));
202-
203-
OtelTracer {
204-
ctx,
205-
tracer: Arc::new(tracer),
206-
}
207-
}
208-
209-
// Sets the txn_id attribute to the span
210-
// The txn_id is a shortened version of the transaction_id (first 10 characters of the hex representation)
211-
pub fn set_txn_id(span: &mut BoxedSpan, transaction_id: &[u8]) {
212-
let txn_id_short = to_hex(transaction_id)
213-
.get(0..10)
214-
.unwrap_or_default()
215-
.to_owned();
216-
217-
span.set_attribute(KeyValue::new(TXN_ID_ATTR_KEY, txn_id_short));
218-
}
219-
220-
/// Create a new span with start and end time
221-
pub fn tracer_with_start_time(span_name: &'static str, start_time: SystemTime) -> OtelTracer {
222-
let tracer = opentelemetry::global::tracer(span_name);
223-
let root_span = tracer.build(SpanBuilder::from_name(span_name).with_start_time(start_time));
224-
let ctx = opentelemetry::Context::default().with_span(root_span);
225-
OtelTracer {
226-
ctx,
227-
tracer: Arc::new(tracer),
228-
}
229-
}
230-
231-
pub fn tracer(span_name: &'static str, transaction_id: &Option<Vec<u8>>) -> OtelTracer {
232-
tracer_with_handle(span_name, vec![], transaction_id)
233-
}
234-
235-
pub fn attribute(span: &mut BoxedSpan, key: &str, value: String) {
236-
span.set_attribute(KeyValue::new(key.to_owned(), value));
237-
}
238-
239-
/// Ends span with status Ok
240-
pub fn end_span(mut span: BoxedSpan) {
241-
span.set_status(Status::Ok);
242-
span.end();
243-
}
244-
245-
pub fn end_span_with_timestamp(mut span: BoxedSpan, timestamp: SystemTime) {
246-
span.set_status(Status::Ok);
247-
span.end_with_timestamp(timestamp);
248-
}
249-
250-
/// Ends span with status Error with description
251-
pub fn end_span_with_err(mut span: BoxedSpan, desc: String) {
252-
span.set_status(Status::Error {
253-
description: desc.into(),
254-
});
255-
span.end();
256-
}
257-
258152
#[derive(Clone, Copy, Debug)]
259153
pub struct MetricsConfig {
260154
bucket_start: f64,

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,8 @@ impl Database {
295295
self.insert_computation_bytes(tx, result, dependencies_handles, dependencies_bytes, fhe_operation, scalar_byte, log)
296296
};
297297

298-
let _t = telemetry::tracer(
299-
"handle_tfhe_event",
300-
&log.transaction_hash.map(|h| h.to_vec()),
301-
);
298+
let _span = tracing::info_span!("handle_tfhe_event", operation = "handle_tfhe_event");
299+
let _enter = _span.enter();
302300

303301
// Record the transaction if this is a computation event
304302
if !matches!(
@@ -453,7 +451,11 @@ impl Database {
453451

454452
let transaction_hash = transaction_hash.map(|h| h.to_vec());
455453

456-
let _t = telemetry::tracer("handle_acl_event", &transaction_hash);
454+
let _span = tracing::info_span!(
455+
"handle_acl_event",
456+
operation = "handle_acl_event"
457+
);
458+
let _enter = _span.enter();
457459

458460
// Record only Allowed or AllowedForDecryption events
459461
if matches!(

coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use aws_sdk_s3::Client;
1010
use bytesize::ByteSize;
1111
use fhevm_engine_common::chain_id::ChainId;
1212
use fhevm_engine_common::pg_pool::{PostgresPoolManager, ServiceError};
13-
use fhevm_engine_common::telemetry;
1413
use fhevm_engine_common::utils::to_hex;
1514
use futures::future::join_all;
1615
use opentelemetry::trace::{Status, TraceContextExt};
@@ -166,7 +165,7 @@ async fn run_uploader_loop(
166165
// Spawn a new task to upload the ciphertexts
167166
let h = tokio::spawn(async move {
168167
let upload_span = error_span!("upload_s3", operation = "upload_s3");
169-
upload_span.set_parent(item.otel.context().clone());
168+
upload_span.set_parent(item.otel.context());
170169
match upload_ciphertexts(trx, item, &client, &conf)
171170
.instrument(upload_span.clone())
172171
.await
@@ -543,7 +542,7 @@ async fn fetch_pending_uploads(
543542
handle: handle.clone(),
544543
ct64_compressed,
545544
ct128: Arc::new(ct128),
546-
otel: telemetry::tracer_with_handle("recovery_task", handle, &transaction_id),
545+
otel: tracing::info_span!("recovery_task", operation = "recovery_task"),
547546
transaction_id,
548547
};
549548

coprocessor/fhevm-engine/sns-worker/src/executor.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ async fn get_keyset(
180180
pool: PgPool,
181181
keys_cache: Arc<RwLock<lru::LruCache<DbKeyId, KeySet>>>,
182182
) -> Result<Option<(DbKeyId, KeySet)>, ExecutionError> {
183-
let _t = telemetry::tracer("fetch_keyset", &None);
183+
let _t = tracing::info_span!("fetch_keyset", operation = "fetch_keyset");
184184
fetch_latest_keyset(&keys_cache, &pool).await
185185
}
186186

@@ -310,7 +310,7 @@ pub async fn garbage_collect(pool: &PgPool, limit: u32) -> Result<(), ExecutionE
310310

311311
// Limit the number of rows to update in case of a large backlog due to catchup or burst
312312
// Skip Locked to prevent concurrent updates
313-
let start = SystemTime::now();
313+
let _start = SystemTime::now();
314314
let rows_affected: u64 = sqlx::query!(
315315
"
316316
WITH uploaded_ct128 AS (
@@ -334,7 +334,7 @@ pub async fn garbage_collect(pool: &PgPool, limit: u32) -> Result<(), ExecutionE
334334
.rows_affected();
335335

336336
if rows_affected > 0 {
337-
let _s = telemetry::tracer_with_start_time("cleanup_ct128", start);
337+
let _s = tracing::info_span!("cleanup_ct128", operation = "cleanup_ct128");
338338
info!(
339339
rows_affected = rows_affected,
340340
"Cleaning up old ciphertexts128"
@@ -377,8 +377,11 @@ async fn fetch_and_execute_sns_tasks(
377377
maybe_remaining = conf.db.batch_limit as usize == tasks.len();
378378
tasks_processed = tasks.len();
379379

380-
let t = telemetry::tracer("batch_execution", &None);
381-
t.set_attribute("count", tasks.len().to_string());
380+
let t = tracing::info_span!(
381+
"batch_execution",
382+
operation = "batch_execution",
383+
count = tasks.len()
384+
);
382385

383386
process_tasks(
384387
&mut tasks,
@@ -395,7 +398,7 @@ async fn fetch_and_execute_sns_tasks(
395398
"batch_store_ciphertext128",
396399
operation = "batch_store_ciphertext128"
397400
);
398-
batch_store_span.set_parent(t.context().clone());
401+
batch_store_span.set_parent(t.context());
399402
let batch_store = async {
400403
update_ciphertext128(trx, &tasks).await?;
401404
notify_ciphertext128_ready(trx, &conf.db.notify_channel).await?;
@@ -436,7 +439,7 @@ pub async fn query_sns_tasks(
436439
order: Order,
437440
key_id_gw: &DbKeyId,
438441
) -> Result<Option<Vec<HandleItem>>, ExecutionError> {
439-
let start_time = SystemTime::now();
442+
let _start_time = SystemTime::now();
440443

441444
let query = format!(
442445
"
@@ -464,9 +467,13 @@ pub async fn query_sns_tasks(
464467
return Ok(None);
465468
}
466469

467-
let t = telemetry::tracer_with_start_time("db_fetch_tasks", start_time);
468-
t.set_attribute("count", records.len().to_string());
469-
t.end();
470+
{
471+
let _t = tracing::info_span!(
472+
"db_fetch_tasks",
473+
operation = "db_fetch_tasks",
474+
count = records.len()
475+
);
476+
}
470477

471478
// Convert the records into HandleItem structs
472479
let tasks = records
@@ -487,7 +494,7 @@ pub async fn query_sns_tasks(
487494
handle: handle.clone(),
488495
ct64_compressed: Arc::new(ciphertext),
489496
ct128: Arc::new(BigCiphertext::default()), // to be computed
490-
otel: telemetry::tracer_with_handle("task", handle, &transaction_id),
497+
otel: tracing::info_span!("task", operation = "task"),
491498
transaction_id,
492499
})
493500
})
@@ -563,7 +570,7 @@ fn compute_task(
563570
let started_at = SystemTime::now();
564571
let thread_id = format!("{:?}", std::thread::current().id());
565572
let span = error_span!("compute", thread_id = %thread_id);
566-
span.set_parent(task.otel.context().clone());
573+
span.set_parent(task.otel.context());
567574
let _enter = span.enter();
568575

569576
let handle = to_hex(&task.handle);

coprocessor/fhevm-engine/sns-worker/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use fhevm_engine_common::{
2424
healthz_server::{self},
2525
metrics_server,
2626
pg_pool::{PostgresPoolManager, ServiceError},
27-
telemetry::OtelTracer,
2827
types::FhevmError,
2928
utils::{to_hex, DatabaseURL},
3029
};
@@ -237,7 +236,7 @@ pub struct HandleItem {
237236
/// The computed 128-bit ciphertext
238237
pub(crate) ct128: Arc<BigCiphertext>,
239238

240-
pub otel: OtelTracer,
239+
pub otel: tracing::Span,
241240
pub transaction_id: Option<Vec<u8>>,
242241
}
243242

coprocessor/fhevm-engine/tfhe-worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ tokio = { workspace = true }
2828
tokio-util = { workspace = true }
2929
tonic = { workspace = true }
3030
tracing = { workspace = true }
31+
tracing-opentelemetry = { workspace = true }
3132
tracing-subscriber = { workspace = true }
3233
# opentelemetry support
3334
opentelemetry = { workspace = true }

0 commit comments

Comments
 (0)