Skip to content

Commit 24d9189

Browse files
committed
fix(coprocessor): allow TFHE worker to lock earliest dependence chain on no progress
1 parent abbe165 commit 24d9189

2 files changed

Lines changed: 77 additions & 2 deletions

File tree

coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,69 @@ impl LockMngr {
217217
))
218218
}
219219

220+
/// Acquire the earliest dependence-chain entry for processing
221+
/// sorted by last_updated_at (FIFO).
222+
/// Returns the dependence_chain_id if a lock was acquired
223+
pub async fn acquire_early_lock(
224+
&mut self,
225+
) -> Result<(Option<Vec<u8>>, LockingReason), sqlx::Error> {
226+
if self.disable_locking {
227+
debug!("Locking is disabled");
228+
return Ok((None, LockingReason::Missing));
229+
}
230+
231+
let started_at = SystemTime::now();
232+
let row = sqlx::query_as::<_, DatabaseChainLock>(
233+
r#"
234+
WITH candidate AS (
235+
SELECT dependence_chain_id, 'updated_unowned' AS match_reason, dependency_count
236+
FROM dependence_chain
237+
WHERE
238+
status = 'updated' -- Marked as updated by host-listener
239+
AND
240+
worker_id IS NULL -- Ensure no other workers own it
241+
ORDER BY last_updated_at ASC -- FIFO
242+
FOR UPDATE SKIP LOCKED -- Ensure no other worker is currently trying to lock it
243+
LIMIT 1
244+
)
245+
UPDATE dependence_chain AS dc
246+
SET
247+
worker_id = $1,
248+
status = 'processing',
249+
lock_acquired_at = NOW(),
250+
lock_expires_at = NOW() + make_interval(secs => $2)
251+
FROM candidate
252+
WHERE dc.dependence_chain_id = candidate.dependence_chain_id
253+
RETURNING dc.*, candidate.match_reason, candidate.dependency_count;
254+
"#,
255+
)
256+
.bind(self.worker_id)
257+
.bind(self.lock_ttl_sec)
258+
.fetch_optional(&self.pool)
259+
.await?;
260+
261+
let row = if let Some(row) = row {
262+
row
263+
} else {
264+
return Ok((None, LockingReason::Missing));
265+
};
266+
267+
self.lock.replace((row.clone(), SystemTime::now()));
268+
ACQUIRED_DEPENDENCE_CHAIN_ID_COUNTER.inc();
269+
270+
let elapsed = started_at.elapsed().map(|d| d.as_secs_f64()).unwrap_or(0.0);
271+
if elapsed > 0.0 {
272+
ACQUIRE_DEPENDENCE_CHAIN_ID_QUERY_HISTOGRAM.observe(elapsed);
273+
}
274+
275+
info!(?row, query_elapsed = %elapsed, "Acquired lock on earliest DCID");
276+
277+
Ok((
278+
Some(row.dependence_chain_id),
279+
LockingReason::from(row.match_reason.as_str()),
280+
))
281+
}
282+
220283
/// Release all locks held by this worker
221284
///
222285
/// If host-listener has marked the dependence chain as 'updated' in the meantime,

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ async fn tfhe_worker_cycle(
152152
&health_check,
153153
&mut trx,
154154
&mut dcid_mngr,
155+
&mut no_progress_cycles,
155156
&tracer,
156157
&loop_ctx,
157158
)
@@ -328,6 +329,7 @@ async fn query_for_work<'a>(
328329
health_check: &crate::health_check::HealthCheck,
329330
trx: &mut sqlx::Transaction<'a, Postgres>,
330331
deps_chain_mngr: &mut dependence_chain::LockMngr,
332+
no_progress_cycles: &mut u32,
331333
tracer: &opentelemetry::global::BoxedTracer,
332334
loop_ctx: &opentelemetry::Context,
333335
) -> Result<
@@ -340,7 +342,14 @@ async fn query_for_work<'a>(
340342
match deps_chain_mngr.extend_or_release_current_lock(true).await? {
341343
// If there is a current lock, we extend it and use its dependence_chain_id
342344
Some((id, reason)) => (Some(id), reason),
343-
None => deps_chain_mngr.acquire_next_lock().await?,
345+
None => {
346+
if no_progress_cycles < 10 * args.dcid_max_no_progress_cycles {
347+
deps_chain_mngr.acquire_next_lock().await?
348+
} else {
349+
*no_progress_cycles = 0;
350+
deps_chain_mngr.acquire_early_lock().await?
351+
}
352+
}
344353
};
345354
if deps_chain_mngr.enabled() && dependence_chain_id.is_none() {
346355
// No dependence chain to lock, so no work to do
@@ -482,7 +491,10 @@ WHERE c.transaction_id IN (
482491
inputs,
483492
is_allowed: w.is_allowed,
484493
});
485-
if w.schedule_order < earliest_schedule_order {
494+
if w.schedule_order < earliest_schedule_order && w.is_allowed {
495+
// Only consider allowed entries, to avoid the reorg case
496+
// where trivial encrypts collide within the same
497+
// transaction and the old ones are re-used
486498
earliest_schedule_order = w.schedule_order;
487499
}
488500
}

0 commit comments

Comments
 (0)