Skip to content

Commit 74e9474

Browse files
authored
fix(coprocessor): dependence chain update during reorg in case of collisions (#1768) (#1899)
fix(coprocessor): improve dependence chain update during reorg in case of collisions (#1768)

* fix(coprocessor): improve dependence chain update in HL during reorg in case of collisions
* fix(coprocessor): allow TFHE worker to lock earliest dependence chain on no progress
* chore(coprocessor): add index for pending dependence chains where dependence count is not zero
* fix(coprocessor): add threshold for ignoring dependency_count and make dependent list unique
1 parent d268e78 commit 74e9474

File tree

9 files changed

+115
-17
lines changed

9 files changed

+115
-17
lines changed

coprocessor/fhevm-engine/.sqlx/query-99f4c777cb37bcd4b28ac64e6def5355e74e6f7745a2bc22ca914dad7f2377ae.json renamed to coprocessor/fhevm-engine/.sqlx/query-193e44bce47bd04357a3e86a707c63c35621140ab5ed044d895f480f2ef90d71.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 12 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
CREATE INDEX IF NOT EXISTS idx_dependence_chain_last_updated_at
2+
ON dependence_chain (last_updated_at)
3+
WHERE status = 'updated'::text
4+
AND worker_id IS NULL;
5+

coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -800,8 +800,14 @@ impl Database {
800800
SET status = 'updated',
801801
last_updated_at = CASE
802802
WHEN dependence_chain.status = 'processed' THEN EXCLUDED.last_updated_at
803-
ELSE dependence_chain.last_updated_at
804-
END
803+
ELSE LEAST(dependence_chain.last_updated_at, EXCLUDED.last_updated_at)
804+
END,
805+
dependents = (
806+
SELECT ARRAY(
807+
SELECT DISTINCT d
808+
FROM unnest(dependence_chain.dependents || EXCLUDED.dependents) AS d
809+
)
810+
)
805811
"#,
806812
chain.hash.to_vec(),
807813
last_updated_at,

coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -115,6 +115,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
115115
dcid_cleanup_interval_sec: 0,
116116
processed_dcid_ttl_sec: 0,
117117
dcid_max_no_progress_cycles: 2,
118+
dcid_ignore_dependency_count_threshold: 100,
118119
};
119120

120121
std::thread::spawn(move || {

coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -121,6 +121,10 @@ pub struct Args {
121121
#[arg(long, value_parser = clap::value_parser!(u32), default_value_t = 2)]
122122
pub dcid_max_no_progress_cycles: u32,
123123

124+
/// Number of no-progress DCID releases before ignoring dependence counter
125+
#[arg(long, value_parser = clap::value_parser!(u32), default_value_t = 100)]
126+
pub dcid_ignore_dependency_count_threshold: u32,
127+
124128
/// Log level for the application
125129
#[arg(
126130
long,

coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs

Lines changed: 66 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -217,6 +217,72 @@ impl LockMngr {
217217
))
218218
}
219219

220+
/// Acquire the earliest dependence-chain entry for processing
221+
/// sorted by last_updated_at (FIFO). Here we ignore
222+
/// dependency_count as reorgs can lead to incorrect counts and
223+
/// set of dependents until we add block hashes to transaction
224+
/// hashes to uniquely identify transactions.
225+
/// Returns the dependence_chain_id if a lock was acquired
226+
pub async fn acquire_early_lock(
227+
&mut self,
228+
) -> Result<(Option<Vec<u8>>, LockingReason), sqlx::Error> {
229+
if self.disable_locking {
230+
debug!("Locking is disabled");
231+
return Ok((None, LockingReason::Missing));
232+
}
233+
234+
let started_at = SystemTime::now();
235+
let row = sqlx::query_as::<_, DatabaseChainLock>(
236+
r#"
237+
WITH candidate AS (
238+
SELECT dependence_chain_id, 'updated_unowned' AS match_reason, dependency_count
239+
FROM dependence_chain
240+
WHERE
241+
status = 'updated' -- Marked as updated by host-listener
242+
AND
243+
worker_id IS NULL -- Ensure no other workers own it
244+
ORDER BY last_updated_at ASC -- FIFO
245+
FOR UPDATE SKIP LOCKED -- Ensure no other worker is currently trying to lock it
246+
LIMIT 1
247+
)
248+
UPDATE dependence_chain AS dc
249+
SET
250+
worker_id = $1,
251+
status = 'processing',
252+
lock_acquired_at = NOW(),
253+
lock_expires_at = NOW() + make_interval(secs => $2)
254+
FROM candidate
255+
WHERE dc.dependence_chain_id = candidate.dependence_chain_id
256+
RETURNING dc.*, candidate.match_reason, candidate.dependency_count;
257+
"#,
258+
)
259+
.bind(self.worker_id)
260+
.bind(self.lock_ttl_sec)
261+
.fetch_optional(&self.pool)
262+
.await?;
263+
264+
let row = if let Some(row) = row {
265+
row
266+
} else {
267+
return Ok((None, LockingReason::Missing));
268+
};
269+
270+
self.lock.replace((row.clone(), SystemTime::now()));
271+
ACQUIRED_DEPENDENCE_CHAIN_ID_COUNTER.inc();
272+
273+
let elapsed = started_at.elapsed().map(|d| d.as_secs_f64()).unwrap_or(0.0);
274+
if elapsed > 0.0 {
275+
ACQUIRE_DEPENDENCE_CHAIN_ID_QUERY_HISTOGRAM.observe(elapsed);
276+
}
277+
278+
info!(?row, query_elapsed = %elapsed, "Acquired lock on earliest DCID");
279+
280+
Ok((
281+
Some(row.dependence_chain_id),
282+
LockingReason::from(row.match_reason.as_str()),
283+
))
284+
}
285+
220286
/// Release all locks held by this worker
221287
///
222288
/// If host-listener has marked the dependence chain as 'updated' in the meantime,

coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -124,6 +124,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
124124
dcid_cleanup_interval_sec: 0,
125125
processed_dcid_ttl_sec: 0,
126126
dcid_max_no_progress_cycles: 2,
127+
dcid_ignore_dependency_count_threshold: 100,
127128
};
128129

129130
std::thread::spawn(move || {

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -152,6 +152,7 @@ async fn tfhe_worker_cycle(
152152
&health_check,
153153
&mut trx,
154154
&mut dcid_mngr,
155+
&mut no_progress_cycles,
155156
&tracer,
156157
&loop_ctx,
157158
)
@@ -328,6 +329,7 @@ async fn query_for_work<'a>(
328329
health_check: &crate::health_check::HealthCheck,
329330
trx: &mut sqlx::Transaction<'a, Postgres>,
330331
deps_chain_mngr: &mut dependence_chain::LockMngr,
332+
no_progress_cycles: &mut u32,
331333
tracer: &opentelemetry::global::BoxedTracer,
332334
loop_ctx: &opentelemetry::Context,
333335
) -> Result<
@@ -340,7 +342,16 @@ async fn query_for_work<'a>(
340342
match deps_chain_mngr.extend_or_release_current_lock(true).await? {
341343
// If there is a current lock, we extend it and use its dependence_chain_id
342344
Some((id, reason)) => (Some(id), reason),
343-
None => deps_chain_mngr.acquire_next_lock().await?,
345+
None => {
346+
if *no_progress_cycles
347+
< args.dcid_ignore_dependency_count_threshold * args.dcid_max_no_progress_cycles
348+
{
349+
deps_chain_mngr.acquire_next_lock().await?
350+
} else {
351+
*no_progress_cycles = 0;
352+
deps_chain_mngr.acquire_early_lock().await?
353+
}
354+
}
344355
};
345356
if deps_chain_mngr.enabled() && dependence_chain_id.is_none() {
346357
// No dependence chain to lock, so no work to do
@@ -482,7 +493,10 @@ WHERE c.transaction_id IN (
482493
inputs,
483494
is_allowed: w.is_allowed,
484495
});
485-
if w.schedule_order < earliest_schedule_order {
496+
if w.schedule_order < earliest_schedule_order && w.is_allowed {
497+
// Only account for allowed to avoid case of reorg
498+
// where trivial encrypts will be in collision in
499+
// the same transaction and old ones are re-used
486500
earliest_schedule_order = w.schedule_order;
487501
}
488502
}

0 commit comments

Comments
 (0)