Skip to content

Commit afe490f

Browse files
committed
refactor(coprocessor): address review feedback on tracing style
1 parent 875b82b commit afe490f

File tree

7 files changed

+75
-49
lines changed

7 files changed

+75
-49
lines changed

coprocessor/fhevm-engine/fhevm-engine-common/src/chain_id.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,8 @@ impl TryFrom<u64> for ChainId {
5050
type Error = InvalidChainId;
5151

5252
fn try_from(value: u64) -> Result<Self, Self::Error> {
53-
#[allow(clippy::checked_conversions)]
54-
if value <= i64::MAX as u64 {
55-
Ok(ChainId(value as i64))
53+
if let Ok(value) = i64::try_from(value) {
54+
Ok(ChainId(value))
5655
} else {
5756
Err(InvalidChainId {
5857
value: value.to_string(),

coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,11 @@ async fn fetch_pending_uploads(
544544
handle: handle.clone(),
545545
ct64_compressed,
546546
ct128: Arc::new(ct128),
547-
otel: tracing::info_span!("recovery_task", operation = "recovery_task"),
547+
otel: tracing::info_span!(
548+
"recovery_task",
549+
operation = "recovery_task",
550+
handle = %to_hex(&handle)
551+
),
548552
transaction_id,
549553
};
550554

coprocessor/fhevm-engine/sns-worker/src/executor.rs

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,11 @@ impl SwitchNSquashService {
176176
}
177177
}
178178

179+
#[tracing::instrument(skip_all, fields(operation = "fetch_keyset"))]
179180
async fn get_keyset(
180181
pool: PgPool,
181182
keys_cache: Arc<RwLock<lru::LruCache<DbKeyId, KeySet>>>,
182183
) -> Result<Option<(DbKeyId, KeySet)>, ExecutionError> {
183-
let _t = tracing::info_span!("fetch_keyset", operation = "fetch_keyset");
184184
fetch_latest_keyset(&keys_cache, &pool).await
185185
}
186186

@@ -334,11 +334,13 @@ pub async fn garbage_collect(pool: &PgPool, limit: u32) -> Result<(), ExecutionE
334334
.rows_affected();
335335

336336
if rows_affected > 0 {
337-
let _s = tracing::info_span!("cleanup_ct128", operation = "cleanup_ct128");
338-
info!(
339-
rows_affected = rows_affected,
340-
"Cleaning up old ciphertexts128"
341-
);
337+
let _cleanup_span = tracing::info_span!(
338+
"cleanup_ct128",
339+
operation = "cleanup_ct128",
340+
rows_affected = rows_affected
341+
)
342+
.entered();
343+
info!("Cleaning up old ciphertexts128");
342344
}
343345

344346
Ok(())
@@ -377,25 +379,29 @@ async fn fetch_and_execute_sns_tasks(
377379
maybe_remaining = conf.db.batch_limit as usize == tasks.len();
378380
tasks_processed = tasks.len();
379381

380-
let t = tracing::info_span!(
382+
let batch_exec_span = tracing::info_span!(
381383
"batch_execution",
382384
operation = "batch_execution",
383385
count = tasks.len()
384386
);
385387

386-
process_tasks(
387-
&mut tasks,
388-
keys,
389-
tx,
390-
conf.enable_compression,
391-
conf.schedule_policy,
392-
token.clone(),
393-
)?;
388+
batch_exec_span.in_scope(|| {
389+
process_tasks(
390+
&mut tasks,
391+
keys,
392+
tx,
393+
conf.enable_compression,
394+
conf.schedule_policy,
395+
token.clone(),
396+
)
397+
})?;
394398

395-
update_computations_status(trx, &tasks).await?;
399+
update_computations_status(trx, &tasks)
400+
.instrument(batch_exec_span.clone())
401+
.await?;
396402

397403
let batch_store_span = tracing::info_span!(
398-
parent: &t,
404+
parent: &batch_exec_span,
399405
"batch_store_ciphertext128",
400406
operation = "batch_store_ciphertext128"
401407
);
@@ -433,6 +439,7 @@ async fn fetch_and_execute_sns_tasks(
433439
}
434440

435441
/// Queries the database for a fixed number of tasks.
442+
#[tracing::instrument(skip_all, fields(operation = "db_fetch_tasks", count = tracing::field::Empty))]
436443
pub async fn query_sns_tasks(
437444
db_txn: &mut Transaction<'_, Postgres>,
438445
limit: u32,
@@ -462,19 +469,12 @@ pub async fn query_sns_tasks(
462469
.await?;
463470

464471
info!(target: "worker", { count = records.len(), order = order.to_string() }, "Fetched SnS tasks");
472+
tracing::Span::current().record("count", records.len());
465473

466474
if records.is_empty() {
467475
return Ok(None);
468476
}
469477

470-
{
471-
let _t = tracing::info_span!(
472-
"db_fetch_tasks",
473-
operation = "db_fetch_tasks",
474-
count = records.len()
475-
);
476-
}
477-
478478
// Convert the records into HandleItem structs
479479
let tasks = records
480480
.into_iter()
@@ -494,7 +494,11 @@ pub async fn query_sns_tasks(
494494
handle: handle.clone(),
495495
ct64_compressed: Arc::new(ciphertext),
496496
ct128: Arc::new(BigCiphertext::default()), // to be computed
497-
otel: tracing::info_span!("task", operation = "task"),
497+
otel: tracing::info_span!(
498+
"task",
499+
operation = "task",
500+
handle = %to_hex(&handle)
501+
),
498502
transaction_id,
499503
})
500504
})

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -164,16 +164,26 @@ async fn tfhe_worker_cycle(
164164

165165
// Lock another dependence chain if available and
166166
// continue processing without waiting for notification
167-
let _dcid_span = tracing::info_span!(parent: &loop_span, "query_dependence_chain", operation = "query_dependence_chain");
167+
let _dcid_span = tracing::info_span!(
168+
parent: &loop_span,
169+
"query_dependence_chain",
170+
operation = "query_dependence_chain",
171+
dependence_chain_id = tracing::field::Empty
172+
);
168173

169174
let (dependence_chain_id, _) = dcid_mngr.acquire_next_lock().await?;
170175
immediately_poll_more_work = dependence_chain_id.is_some();
171176

172-
tracing::info!(
173-
parent: &_dcid_span,
174-
dependence_chain_id = ?dependence_chain_id.as_ref().map(hex::encode),
175-
"acquired dependence chain lock"
177+
_dcid_span.record(
178+
"dependence_chain_id",
179+
tracing::field::display(
180+
dependence_chain_id
181+
.as_ref()
182+
.map(hex::encode)
183+
.unwrap_or_else(|| "none".to_string()),
184+
),
176185
);
186+
tracing::info!(parent: &_dcid_span, "acquired dependence chain lock");
177187
drop(_dcid_span);
178188

179189
continue;
@@ -277,7 +287,8 @@ async fn query_for_work<'a>(
277287
{
278288
let _s_dcid = tracing::info_span!(
279289
"query_dependence_chain",
280-
operation = "query_dependence_chain"
290+
operation = "query_dependence_chain",
291+
dependence_chain_id = tracing::field::Empty
281292
);
282293
// Lock dependence chain
283294
let (dependence_chain_id, locking_reason) =
@@ -302,14 +313,23 @@ async fn query_for_work<'a>(
302313
info!(target: "tfhe_worker", "No dcid found to process");
303314
return Ok((vec![], PrimitiveDateTime::MAX, false));
304315
}
305-
tracing::info!(
306-
parent: &_s_dcid,
307-
dependence_chain_id = ?dependence_chain_id.as_ref().map(hex::encode),
308-
"query dependence chain result"
316+
_s_dcid.record(
317+
"dependence_chain_id",
318+
tracing::field::display(
319+
dependence_chain_id
320+
.as_ref()
321+
.map(hex::encode)
322+
.unwrap_or_else(|| "none".to_string()),
323+
),
309324
);
325+
tracing::info!(parent: &_s_dcid, "query dependence chain result");
310326
drop(_s_dcid);
311327

312-
let _s_work = tracing::info_span!("query_work_items", operation = "query_work_items");
328+
let _s_work = tracing::info_span!(
329+
"query_work_items",
330+
operation = "query_work_items",
331+
count = tracing::field::Empty
332+
);
313333
let transaction_batch_size = args.work_items_batch_size;
314334
let started_at = SystemTime::now();
315335
let the_work = query!(
@@ -351,7 +371,8 @@ WHERE c.transaction_id IN (
351371
})?;
352372

353373
WORK_ITEMS_QUERY_HISTOGRAM.observe(started_at.elapsed().unwrap_or_default().as_secs_f64());
354-
tracing::info!(parent: &_s_work, count = the_work.len(), "work items queried");
374+
_s_work.record("count", the_work.len());
375+
tracing::info!(parent: &_s_work, "work items queried");
355376
drop(_s_work);
356377
health_check.update_db_access();
357378
if the_work.is_empty() {

coprocessor/fhevm-engine/transaction-sender/src/ops/allow_handle.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,8 @@ where
361361
let src_transaction_id = row.transaction_id.clone();
362362
let _span =
363363
tracing::info_span!("prepare_allow_account", operation = "prepare_allow_account");
364+
// Use `enter()` in async loops to avoid keeping a non-Send
365+
// entered guard across await points.
364366
let _enter = _span.enter();
365367

366368
let handle = row.handle.clone();
@@ -430,9 +432,6 @@ where
430432
event_type,
431433
};
432434

433-
drop(_enter);
434-
drop(_span);
435-
436435
let operation = self.clone();
437436
join_set.spawn(async move {
438437
operation

coprocessor/fhevm-engine/transaction-sender/src/ops/verify_proof.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,8 @@ where
251251
"prepare_verify_proof_resp",
252252
operation = "prepare_verify_proof_resp"
253253
);
254+
// Use `enter()` in async loops to avoid keeping a non-Send
255+
// entered guard across await points.
254256
let _enter = _span.enter();
255257

256258
let txn_request = match row.verified {
@@ -354,9 +356,6 @@ where
354356
}
355357
};
356358

357-
drop(_enter);
358-
drop(_span);
359-
360359
let self_clone = self.clone();
361360
let src_transaction_id = transaction_id;
362361
join_set.spawn(async move {

coprocessor/fhevm-engine/zkproof-worker/src/verifier.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ fn create_ciphertext(
521521
handle[30] = serialized_type as u8;
522522
handle[31] = current_ciphertext_version() as u8;
523523

524-
tracing::Span::current().record("ct_type", serialized_type as i64);
524+
tracing::Span::current().record("ct_type", tracing::field::display(serialized_type));
525525
tracing::info!(
526526
request_id,
527527
handle = %hex::encode(&handle),

0 commit comments

Comments
 (0)