Skip to content

Commit 0543149

Browse files
committed
fix(coprocessor): chain release and update
1 parent ccd48cc commit 0543149

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,12 @@ async fn tfhe_worker_cycle(
227227
no_progress_cycles = 0;
228228
} else {
229229
no_progress_cycles += 1;
230-
if no_progress_cycles >= args.dcid_max_no_progress_cycles {
230+
if no_progress_cycles > args.dcid_max_no_progress_cycles {
231231
// If we're not making progress on this dependence
232232
// chain, update the last_updated_at field and
233233
// release the lock so we can try to execute
234234
// another chain.
235+
info!(target: "tfhe_worker", "no progress on dependence chain, releasing");
235236
dcid_mngr
236237
.release_current_lock(false, Some(earliest_computation))
237238
.await?;
@@ -676,7 +677,7 @@ async fn upload_transaction_graph_results<'a>(
676677
Vec<_>,
677678
(Vec<_>, (Vec<_>, (Vec<_>, Vec<_>))),
678679
) = cts_to_insert.into_iter().unzip();
679-
let _ = query!(
680+
let cts_inserted = query!(
680681
"
681682
INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
682683
SELECT * FROM UNNEST($1::INTEGER[], $2::BYTEA[], $3::BYTEA[], $4::SMALLINT[], $5::SMALLINT[])
@@ -687,13 +688,14 @@ async fn upload_transaction_graph_results<'a>(
687688
.await.map_err(|err| {
688689
error!(target: "tfhe_worker", { tenant_id = *tenant_id, error = %err }, "error while inserting new ciphertexts");
689690
err
690-
})?;
691+
})?.rows_affected();
691692
// Notify all workers that new ciphertext is inserted
692693
// For now, it's only the SnS workers that are listening for these events
693694
let _ = sqlx::query!("SELECT pg_notify($1, '')", EVENT_CIPHERTEXT_COMPUTED)
694695
.execute(trx.as_mut())
695696
.await?;
696697
s.end();
698+
res |= cts_inserted > 0;
697699
}
698700

699701
if !handles_to_update.is_empty() {
@@ -705,7 +707,7 @@ async fn upload_transaction_graph_results<'a>(
705707
.map(|(h, _)| KeyValue::new("handle", format!("0x{}", hex::encode(h)))),
706708
);
707709
let (handles_vec, txn_ids_vec): (Vec<_>, Vec<_>) = handles_to_update.into_iter().unzip();
708-
let _ = query!(
710+
let comp_updated = query!(
709711
"
710712
UPDATE computations
711713
SET is_completed = true, completed_at = CURRENT_TIMESTAMP
@@ -723,9 +725,9 @@ async fn upload_transaction_graph_results<'a>(
723725
.await.map_err(|err| {
724726
error!(target: "tfhe_worker", { tenant_id = *tenant_id, error = %err }, "error while updating computations as completed");
725727
err
726-
})?;
728+
})?.rows_affected();
727729
s.end();
728-
res = true;
730+
res |= comp_updated > 0;
729731
}
730732
Ok(res)
731733
}

0 commit comments

Comments
 (0)