Skip to content

Commit 5778aaa

Browse files
committed
feat(coprocessor): add mechanism to release dependence chains when no progress is made
1 parent c1421d9 commit 5778aaa

File tree

5 files changed

+78
-17
lines changed

5 files changed

+78
-17
lines changed

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/tfhe-worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ strum = { workspace = true }
2424
sqlx = { workspace = true }
2525
tfhe = { workspace = true }
2626
tfhe-zk-pok = { workspace = true }
27+
time = { workspace = true }
2728
tokio = { workspace = true }
2829
tokio-util = { workspace = true }
2930
tonic = { workspace = true }

coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ pub struct Args {
116116
#[arg(long, default_value_t = 3600)]
117117
pub dcid_cleanup_interval_sec: u32,
118118

119+
/// Maximum number of worker cycles allowed without progress on a
120+
/// dependence chain
121+
#[arg(long, value_parser = clap::value_parser!(u32), default_value_t = 2)]
122+
pub dcid_max_no_progress_cycles: u32,
123+
119124
/// Log level for the application
120125
#[arg(
121126
long,

coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
22
use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};
33
use sqlx::Postgres;
44
use std::{fmt, sync::LazyLock, time::SystemTime};
5+
use time::PrimitiveDateTime;
56
use tracing::{debug, error, info, warn};
67
use uuid::Uuid;
78

@@ -252,6 +253,7 @@ impl LockMngr {
252253
pub async fn release_current_lock(
253254
&mut self,
254255
mark_as_processed: bool,
256+
update_at: Option<PrimitiveDateTime>,
255257
) -> Result<u64, sqlx::Error> {
256258
if self.disable_locking {
257259
debug!("Locking is disabled, skipping release_current_lock");
@@ -268,13 +270,20 @@ impl LockMngr {
268270

269271
// Since UPDATE always acquires a row-level lock internally,
270272
// this acts as atomic_exchange
271-
let rows = sqlx::query!(
273+
let rows = if let Some(update_at) = update_at {
274+
// Add an epsilon to differentiate this chain being
275+
// released from others in the same block.
276+
let update_at = update_at.saturating_add(
277+
time::Duration::microseconds(1),
278+
);
279+
sqlx::query!(
272280
r#"
273281
UPDATE dependence_chain
274282
SET
275283
worker_id = NULL,
276284
lock_acquired_at = NULL,
277285
lock_expires_at = NULL,
286+
last_updated_at = $4::timestamp,
278287
status = CASE
279288
WHEN status = 'processing' AND $3::bool THEN 'processed' -- mark as processed
280289
WHEN status = 'processing' AND NOT $3::bool THEN 'updated' -- revert to updated so it can be re-acquired
@@ -286,9 +295,33 @@ impl LockMngr {
286295
self.worker_id,
287296
dep_chain_id,
288297
mark_as_processed,
298+
update_at,
289299
)
290300
.execute(&self.pool)
291-
.await?;
301+
.await?
302+
} else {
303+
sqlx::query!(
304+
r#"
305+
UPDATE dependence_chain
306+
SET
307+
worker_id = NULL,
308+
lock_acquired_at = NULL,
309+
lock_expires_at = NULL,
310+
status = CASE
311+
WHEN status = 'processing' AND $3::bool THEN 'processed' -- mark as processed
312+
WHEN status = 'processing' AND NOT $3::bool THEN 'updated' -- revert to updated so it can be re-acquired
313+
ELSE status
314+
END
315+
WHERE worker_id = $1
316+
AND dependence_chain_id = $2
317+
"#,
318+
self.worker_id,
319+
dep_chain_id,
320+
mark_as_processed,
321+
)
322+
.execute(&self.pool)
323+
.await?
324+
};
292325

293326
let mut dependents_updated = 0;
294327
if mark_as_processed {
@@ -405,7 +438,7 @@ impl LockMngr {
405438

406439
// Release the lock instead of extending it as the timeslice's been consumed
407440
// Do not mark as processed so it can be re-acquired
408-
self.release_current_lock(false).await?;
441+
self.release_current_lock(false, None).await?;
409442
return Ok(None);
410443
}
411444
}

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::{
2020
collections::{BTreeSet, HashMap},
2121
num::NonZeroUsize,
2222
};
23+
use time::PrimitiveDateTime;
2324
use tracing::{debug, error, info, warn};
2425

2526
const EVENT_CIPHERTEXT_COMPUTED: &str = "event_ciphertext_computed";
@@ -118,6 +119,7 @@ async fn tfhe_worker_cycle(
118119
#[cfg(feature = "bench")]
119120
populate_cache_with_tenant_keys(vec![1i32], &pool, &tenant_key_cache).await?;
120121
let mut immedially_poll_more_work = false;
122+
let mut no_progress_cycles = 0;
121123
loop {
122124
// only if previous iteration had no work done do the wait
123125
if !immedially_poll_more_work {
@@ -145,7 +147,7 @@ async fn tfhe_worker_cycle(
145147
s.end();
146148

147149
// Query for transactions to execute, and if relevant the associated keys
148-
let (mut transactions, _, has_more_work) = query_for_work(
150+
let (mut transactions, earliest_computation, has_more_work) = query_for_work(
149151
args,
150152
&health_check,
151153
&mut trx,
@@ -159,7 +161,7 @@ async fn tfhe_worker_cycle(
159161
// for a notification after this cycle.
160162
immedially_poll_more_work = true;
161163
} else {
162-
dcid_mngr.release_current_lock(true).await?;
164+
dcid_mngr.release_current_lock(true, None).await?;
163165
dcid_mngr.do_cleanup().await?;
164166

165167
// Lock another dependence chain if available and
@@ -212,7 +214,7 @@ async fn tfhe_worker_cycle(
212214
&loop_ctx,
213215
)
214216
.await?;
215-
upload_transaction_graph_results(
217+
let has_progressed = upload_transaction_graph_results(
216218
tenant_id,
217219
&mut tx_graph,
218220
&mut trx,
@@ -221,6 +223,20 @@ async fn tfhe_worker_cycle(
221223
&loop_ctx,
222224
)
223225
.await?;
226+
if has_progressed {
227+
no_progress_cycles = 0;
228+
} else {
229+
no_progress_cycles += 1;
230+
if no_progress_cycles >= args.dcid_max_no_progress_cycles {
231+
// If we're not making progress on this dependence
232+
// chain, update the last_updated_at field and
233+
// release the lock so we can try to execute
234+
// another chain.
235+
dcid_mngr
236+
.release_current_lock(false, Some(earliest_computation))
237+
.await?;
238+
}
239+
}
224240
}
225241
s.end();
226242
trx.commit().await?;
@@ -313,7 +329,7 @@ async fn query_for_work<'a>(
313329
tracer: &opentelemetry::global::BoxedTracer,
314330
loop_ctx: &opentelemetry::Context,
315331
) -> Result<
316-
(Vec<(i32, Vec<ComponentNode>)>, Vec<(Handle, Handle)>, bool),
332+
(Vec<(i32, Vec<ComponentNode>)>, PrimitiveDateTime, bool),
317333
Box<dyn std::error::Error + Send + Sync>,
318334
> {
319335
let mut s = tracer.start_with_context("query_dependence_chain", loop_ctx);
@@ -331,7 +347,7 @@ async fn query_for_work<'a>(
331347
health_check.update_db_access();
332348
health_check.update_activity();
333349
info!(target: "tfhe_worker", "No dcid found to process");
334-
return Ok((vec![], vec![], false));
350+
return Ok((vec![], PrimitiveDateTime::MAX, false));
335351
}
336352

337353
s.set_attribute(KeyValue::new(
@@ -357,7 +373,8 @@ SELECT
357373
c.is_scalar,
358374
c.is_allowed,
359375
c.dependence_chain_id,
360-
c.transaction_id
376+
c.transaction_id,
377+
c.created_at
361378
FROM computations c
362379
WHERE c.transaction_id IN (
363380
SELECT DISTINCT
@@ -393,11 +410,12 @@ FOR UPDATE SKIP LOCKED ",
393410
info!(target: "tfhe_worker", dcid = %hex::encode(dependence_chain_id), locking = ?locking_reason, "No work items found to process");
394411
}
395412
health_check.update_activity();
396-
return Ok((vec![], vec![], false));
413+
return Ok((vec![], PrimitiveDateTime::MAX, false));
397414
}
398415
WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64);
399416
info!(target: "tfhe_worker", { count = the_work.len(), dcid = ?dependence_chain_id.as_ref().map(hex::encode),
400-
locking = ?locking_reason }, "Processing work items");
417+
locking = ?locking_reason }, "Processing work items");
418+
let mut earliest_created_at = the_work.first().unwrap().created_at;
401419
// Make sure we process each tenant independently to avoid
402420
// setting different keys from different tenants in the worker
403421
// threads
@@ -417,7 +435,6 @@ FOR UPDATE SKIP LOCKED ",
417435
}
418436
// Traverse transactions and build transaction nodes
419437
let mut transactions: Vec<(i32, Vec<ComponentNode>)> = vec![];
420-
let mut unneeded_handles: Vec<(Handle, Handle)> = vec![];
421438
for (tenant_id, work_by_transaction) in work_by_tenant_by_transaction.iter() {
422439
let mut tenant_transactions: Vec<ComponentNode> = vec![];
423440
for (transaction_id, txwork) in work_by_transaction.iter() {
@@ -469,15 +486,17 @@ FOR UPDATE SKIP LOCKED ",
469486
inputs,
470487
is_allowed: w.is_allowed,
471488
});
489+
if w.created_at < earliest_created_at {
490+
earliest_created_at = w.created_at;
491+
}
472492
}
473-
let (mut components, mut unneeded) = build_component_nodes(ops, transaction_id)?;
493+
let (mut components, _) = build_component_nodes(ops, transaction_id)?;
474494
tenant_transactions.append(&mut components);
475-
unneeded_handles.append(&mut unneeded);
476495
}
477496
transactions.push((*tenant_id, tenant_transactions));
478497
}
479498
s_prep.end();
480-
Ok((transactions, unneeded_handles, true))
499+
Ok((transactions, earliest_created_at, true))
481500
}
482501

483502
#[allow(clippy::too_many_arguments)]
@@ -563,10 +582,11 @@ async fn upload_transaction_graph_results<'a>(
563582
deps_mngr: &mut dependence_chain::LockMngr,
564583
tracer: &opentelemetry::global::BoxedTracer,
565584
loop_ctx: &opentelemetry::Context,
566-
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
585+
) -> Result<(bool), Box<dyn std::error::Error + Send + Sync>> {
567586
// Get computation results
568587
let graph_results = tx_graph.get_results();
569588
let mut handles_to_update = vec![];
589+
let mut res = false;
570590

571591
// Traverse computations that have been scheduled and
572592
// upload their results/errors.
@@ -705,8 +725,9 @@ async fn upload_transaction_graph_results<'a>(
705725
err
706726
})?;
707727
s.end();
728+
res = true;
708729
}
709-
Ok(())
730+
Ok(res)
710731
}
711732

712733
#[allow(clippy::too_many_arguments)]

0 commit comments

Comments
 (0)