Skip to content

Commit 0ab293c

Browse files
committed
fix(coprocessor): restore tracing telemetry parity
1 parent afe490f commit 0ab293c

File tree

10 files changed

+359
-193
lines changed

10 files changed

+359
-193
lines changed

coprocessor/fhevm-engine/fhevm-engine-common/src/telemetry.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,14 @@ pub fn register_histogram(config: Option<&MetricsConfig>, name: &str, desc: &str
206206
.unwrap_or_else(|_| panic!("Failed to register latency histogram: {}", name))
207207
}
208208

209+
/// Returns the legacy short-form transaction id used by older telemetry helpers.
210+
pub fn short_txn_id(transaction_id: &[u8]) -> String {
211+
to_hex(transaction_id)
212+
.get(0..10)
213+
.unwrap_or_default()
214+
.to_owned()
215+
}
216+
209217
pub(crate) static TXN_METRICS_MANAGER: LazyLock<TransactionMetrics> =
210218
LazyLock::new(|| TransactionMetrics::new(NonZeroUsize::new(100).unwrap()));
211219

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ impl Database {
274274
}
275275

276276
#[rustfmt::skip]
277-
#[tracing::instrument(skip_all, fields(operation = "handle_tfhe_event"))]
277+
#[tracing::instrument(skip_all, fields(operation = "handle_tfhe_event", txn_id = tracing::field::Empty))]
278278
pub async fn insert_tfhe_event(
279279
&self,
280280
tx: &mut Transaction<'_>,
@@ -289,6 +289,12 @@ impl Database {
289289
let ty = |to_type: &ToType| vec![*to_type];
290290
let as_bytes = |x: &ClearConst| x.to_be_bytes_vec();
291291
let fhe_operation = event_to_op_int(event);
292+
if let Some(transaction_hash) = log.transaction_hash.as_ref() {
293+
tracing::Span::current().record(
294+
"txn_id",
295+
tracing::field::display(telemetry::short_txn_id(transaction_hash.as_ref())),
296+
);
297+
}
292298
let insert_computation = |tx, result, dependencies, scalar_byte| {
293299
self.insert_computation(tx, result, dependencies, fhe_operation, scalar_byte, log)
294300
};
@@ -436,7 +442,7 @@ impl Database {
436442
}
437443

438444
/// Handles all types of ACL events
439-
#[tracing::instrument(skip_all, fields(operation = "handle_acl_event"))]
445+
#[tracing::instrument(skip_all, fields(operation = "handle_acl_event", txn_id = tracing::field::Empty))]
440446
pub async fn handle_acl_event(
441447
&self,
442448
tx: &mut Transaction<'_>,
@@ -447,6 +453,14 @@ impl Database {
447453
block_number: u64,
448454
) -> Result<bool, SqlxError> {
449455
let data = &event.data;
456+
if let Some(transaction_hash) = transaction_hash.as_ref() {
457+
tracing::Span::current().record(
458+
"txn_id",
459+
tracing::field::display(telemetry::short_txn_id(
460+
transaction_hash.as_ref(),
461+
)),
462+
);
463+
}
450464

451465
let transaction_hash = transaction_hash.map(|h| h.to_vec());
452466

coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use aws_sdk_s3::Client;
1010
use bytesize::ByteSize;
1111
use fhevm_engine_common::chain_id::ChainId;
1212
use fhevm_engine_common::pg_pool::{PostgresPoolManager, ServiceError};
13-
use fhevm_engine_common::utils::to_hex;
13+
use fhevm_engine_common::{telemetry, utils::to_hex};
1414
use futures::future::join_all;
1515
use opentelemetry::trace::{Status, TraceContextExt};
1616
use sha3::{Digest, Keccak256};
@@ -537,18 +537,25 @@ async fn fetch_pending_uploads(
537537
};
538538

539539
if !ct64_compressed.is_empty() || !is_ct128_empty {
540+
let recovery_span = tracing::info_span!(
541+
"recovery_task",
542+
operation = "recovery_task",
543+
txn_id = tracing::field::Empty
544+
);
545+
if let Some(transaction_id) = transaction_id.as_deref() {
546+
recovery_span.record(
547+
"txn_id",
548+
tracing::field::display(telemetry::short_txn_id(transaction_id)),
549+
);
550+
}
540551
let item = HandleItem {
541552
host_chain_id: ChainId::try_from(row.host_chain_id)
542553
.map_err(|e| ExecutionError::ConversionError(e.into()))?,
543554
key_id_gw: row.key_id_gw,
544555
handle: handle.clone(),
545556
ct64_compressed,
546557
ct128: Arc::new(ct128),
547-
otel: tracing::info_span!(
548-
"recovery_task",
549-
operation = "recovery_task",
550-
handle = %to_hex(&handle)
551-
),
558+
otel: recovery_span,
552559
transaction_id,
553560
};
554561

coprocessor/fhevm-engine/sns-worker/src/executor.rs

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -308,11 +308,17 @@ pub async fn garbage_collect(pool: &PgPool, limit: u32) -> Result<(), ExecutionE
308308

309309
info!(count, "Starting garbage collection of ciphertexts128");
310310

311-
// Limit the number of rows to update in case of a large backlog due to catchup or burst
312-
// Skip Locked to prevent concurrent updates
313-
let _start = SystemTime::now();
314-
let rows_affected: u64 = sqlx::query!(
315-
"
311+
// Limit the number of rows to update in case of a large backlog due to catchup or burst.
312+
// Skip locked to prevent concurrent updates.
313+
let cleanup_span = tracing::info_span!(
314+
"cleanup_ct128",
315+
operation = "cleanup_ct128",
316+
rows_affected = tracing::field::Empty
317+
);
318+
let rows_affected: u64 = async {
319+
Ok::<u64, sqlx::Error>(
320+
sqlx::query!(
321+
"
316322
WITH uploaded_ct128 AS (
317323
SELECT c.handle
318324
FROM ciphertexts128 c
@@ -327,20 +333,22 @@ pub async fn garbage_collect(pool: &PgPool, limit: u32) -> Result<(), ExecutionE
327333
USING uploaded_ct128 r
328334
WHERE c.handle = r.handle;
329335
",
330-
limit as i32
331-
)
332-
.execute(pool)
333-
.await?
334-
.rows_affected();
336+
limit as i32
337+
)
338+
.execute(pool)
339+
.await?
340+
.rows_affected(),
341+
)
342+
}
343+
.instrument(cleanup_span.clone())
344+
.await?;
345+
cleanup_span.record("rows_affected", rows_affected as i64);
335346

336347
if rows_affected > 0 {
337-
let _cleanup_span = tracing::info_span!(
338-
"cleanup_ct128",
339-
operation = "cleanup_ct128",
340-
rows_affected = rows_affected
341-
)
342-
.entered();
343-
info!("Cleaning up old ciphertexts128");
348+
info!(parent: &cleanup_span,
349+
rows_affected = rows_affected,
350+
"Cleaning up old ciphertexts128"
351+
);
344352
}
345353

346354
Ok(())
@@ -446,7 +454,11 @@ pub async fn query_sns_tasks(
446454
order: Order,
447455
key_id_gw: &DbKeyId,
448456
) -> Result<Option<Vec<HandleItem>>, ExecutionError> {
449-
let _start_time = SystemTime::now();
457+
let fetch_span = tracing::info_span!(
458+
"db_fetch_tasks",
459+
operation = "db_fetch_tasks",
460+
count = tracing::field::Empty
461+
);
450462

451463
let query = format!(
452464
"
@@ -466,7 +478,9 @@ pub async fn query_sns_tasks(
466478
let records = sqlx::query(&query)
467479
.bind(limit as i64)
468480
.fetch_all(db_txn.as_mut())
481+
.instrument(fetch_span.clone())
469482
.await?;
483+
fetch_span.record("count", records.len() as i64);
470484

471485
info!(target: "worker", { count = records.len(), order = order.to_string() }, "Fetched SnS tasks");
472486
tracing::Span::current().record("count", records.len());
@@ -485,6 +499,14 @@ pub async fn query_sns_tasks(
485499
let handle: Vec<u8> = record.try_get("handle")?;
486500
let ciphertext: Vec<u8> = record.try_get("ciphertext")?;
487501
let transaction_id: Option<Vec<u8>> = record.try_get("transaction_id")?;
502+
let task_span =
503+
tracing::info_span!("task", operation = "task", txn_id = tracing::field::Empty);
504+
if let Some(transaction_id) = transaction_id.as_deref() {
505+
task_span.record(
506+
"txn_id",
507+
tracing::field::display(telemetry::short_txn_id(transaction_id)),
508+
);
509+
}
488510

489511
Ok(HandleItem {
490512
// TODO: During key rotation, ensure all coprocessors pin the same key_id_gw for a batch
@@ -494,11 +516,7 @@ pub async fn query_sns_tasks(
494516
handle: handle.clone(),
495517
ct64_compressed: Arc::new(ciphertext),
496518
ct128: Arc::new(BigCiphertext::default()), // to be computed
497-
otel: tracing::info_span!(
498-
"task",
499-
operation = "task",
500-
handle = %to_hex(&handle)
501-
),
519+
otel: task_span,
502520
transaction_id,
503521
})
504522
})

0 commit comments

Comments
 (0)