Skip to content

Commit 46dcec4

Browse files
committed
fix(coprocessor): update test for completion of processing of dcid
1 parent e305ddf commit 46dcec4

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ async fn tfhe_worker_cycle(
145145
s.end();
146146

147147
// Query for transactions to execute, and if relevant the associated keys
148-
let (mut transactions, mut unneeded_handles) = query_for_work(
148+
let (mut transactions, mut unneeded_handles, has_more_work) = query_for_work(
149149
args,
150150
&health_check,
151151
&mut trx,
@@ -154,7 +154,11 @@ async fn tfhe_worker_cycle(
154154
&loop_ctx,
155155
)
156156
.await?;
157-
if transactions.is_empty() {
157+
if has_more_work {
158+
// We've fetched work, so we'll poll again without waiting
159+
// for a notification after this cycle.
160+
immedially_poll_more_work = true;
161+
} else {
158162
dcid_mngr.release_current_lock(true).await?;
159163
dcid_mngr.do_cleanup().await?;
160164

@@ -172,10 +176,6 @@ async fn tfhe_worker_cycle(
172176
s.end();
173177

174178
continue;
175-
} else {
176-
// We've fetched work, so we'll poll again without waiting
177-
// for a notification after this cycle.
178-
immedially_poll_more_work = true;
179179
}
180180
query_tenants_and_keys(
181181
&transactions,
@@ -359,7 +359,7 @@ async fn query_for_work<'a>(
359359
tracer: &opentelemetry::global::BoxedTracer,
360360
loop_ctx: &opentelemetry::Context,
361361
) -> Result<
362-
(Vec<(i32, Vec<ComponentNode>)>, Vec<(Handle, Handle)>),
362+
(Vec<(i32, Vec<ComponentNode>)>, Vec<(Handle, Handle)>, bool),
363363
Box<dyn std::error::Error + Send + Sync>,
364364
> {
365365
let mut s = tracer.start_with_context("query_dependence_chain", loop_ctx);
@@ -452,7 +452,7 @@ FOR UPDATE SKIP LOCKED ",
452452
info!(target: "tfhe_worker", dcid = %hex::encode(dependence_chain_id), locking = ?locking_reason, "No work items found to process");
453453
}
454454
health_check.update_activity();
455-
return Ok((vec![], vec![]));
455+
return Ok((vec![], vec![]), false);
456456
}
457457
WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64);
458458
info!(target: "tfhe_worker", { count = the_work.len(), dcid = ?dependence_chain_id.as_ref().map(hex::encode),
@@ -536,7 +536,7 @@ FOR UPDATE SKIP LOCKED ",
536536
transactions.push((*tenant_id, tenant_transactions));
537537
}
538538
s_prep.end();
539-
Ok((transactions, unneeded_handles))
539+
Ok((transactions, unneeded_handles, true))
540540
}
541541

542542
async fn build_transaction_graph_and_execute<'a>(

0 commit comments

Comments
 (0)