Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 12 additions & 11 deletions coprocessor/fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE INDEX IF NOT EXISTS idx_dependence_chain_last_updated_at
ON dependence_chain (last_updated_at)
WHERE status = 'updated'::text
AND worker_id IS NULL;

Original file line number Diff line number Diff line change
Expand Up @@ -800,8 +800,14 @@ impl Database {
SET status = 'updated',
last_updated_at = CASE
WHEN dependence_chain.status = 'processed' THEN EXCLUDED.last_updated_at
ELSE dependence_chain.last_updated_at
END
ELSE LEAST(dependence_chain.last_updated_at, EXCLUDED.last_updated_at)
END,
dependents = (
SELECT ARRAY(
SELECT DISTINCT d
FROM unnest(dependence_chain.dependents || EXCLUDED.dependents) AS d
)
)
"#,
chain.hash.to_vec(),
last_updated_at,
Expand Down
1 change: 1 addition & 0 deletions coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
dcid_cleanup_interval_sec: 0,
processed_dcid_ttl_sec: 0,
dcid_max_no_progress_cycles: 2,
dcid_ignore_dependency_count_threshold: 100,
};

std::thread::spawn(move || {
Expand Down
4 changes: 4 additions & 0 deletions coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ pub struct Args {
#[arg(long, value_parser = clap::value_parser!(u32), default_value_t = 2)]
pub dcid_max_no_progress_cycles: u32,

/// Number of no-progress DCID releases before ignoring dependence counter
#[arg(long, value_parser = clap::value_parser!(u32), default_value_t = 100)]
pub dcid_ignore_dependency_count_threshold: u32,

/// Log level for the application
#[arg(
long,
Expand Down
66 changes: 66 additions & 0 deletions coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,72 @@ impl LockMngr {
))
}

/// Acquire the earliest dependence-chain entry for processing
/// sorted by last_updated_at (FIFO). Here we ignore
/// dependency_count as reorgs can lead to incorrect counts and
/// set of dependents until we add block hashes to transaction
/// hashes to uniquely identify transactions.
/// Returns the dependence_chain_id if a lock was acquired
pub async fn acquire_early_lock(
&mut self,
) -> Result<(Option<Vec<u8>>, LockingReason), sqlx::Error> {
if self.disable_locking {
debug!("Locking is disabled");
return Ok((None, LockingReason::Missing));
}

let started_at = SystemTime::now();
let row = sqlx::query_as::<_, DatabaseChainLock>(
r#"
WITH candidate AS (
SELECT dependence_chain_id, 'updated_unowned' AS match_reason, dependency_count
FROM dependence_chain
WHERE
status = 'updated' -- Marked as updated by host-listener
AND
worker_id IS NULL -- Ensure no other workers own it
ORDER BY last_updated_at ASC -- FIFO
FOR UPDATE SKIP LOCKED -- Ensure no other worker is currently trying to lock it
LIMIT 1
)
UPDATE dependence_chain AS dc
SET
worker_id = $1,
status = 'processing',
lock_acquired_at = NOW(),
lock_expires_at = NOW() + make_interval(secs => $2)
FROM candidate
WHERE dc.dependence_chain_id = candidate.dependence_chain_id
RETURNING dc.*, candidate.match_reason, candidate.dependency_count;
"#,
)
.bind(self.worker_id)
.bind(self.lock_ttl_sec)
.fetch_optional(&self.pool)
.await?;

let row = if let Some(row) = row {
row
} else {
return Ok((None, LockingReason::Missing));
};

self.lock.replace((row.clone(), SystemTime::now()));
ACQUIRED_DEPENDENCE_CHAIN_ID_COUNTER.inc();

let elapsed = started_at.elapsed().map(|d| d.as_secs_f64()).unwrap_or(0.0);
if elapsed > 0.0 {
ACQUIRE_DEPENDENCE_CHAIN_ID_QUERY_HISTOGRAM.observe(elapsed);
}

info!(?row, query_elapsed = %elapsed, "Acquired lock on earliest DCID");

Ok((
Some(row.dependence_chain_id),
LockingReason::from(row.match_reason.as_str()),
))
}

/// Release all locks held by this worker
///
/// If host-listener has marked the dependence chain as 'updated' in the meantime,
Expand Down
1 change: 1 addition & 0 deletions coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
dcid_cleanup_interval_sec: 0,
processed_dcid_ttl_sec: 0,
dcid_max_no_progress_cycles: 2,
dcid_ignore_dependency_count_threshold: 100,
};

std::thread::spawn(move || {
Expand Down
18 changes: 16 additions & 2 deletions coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ async fn tfhe_worker_cycle(
&health_check,
&mut trx,
&mut dcid_mngr,
&mut no_progress_cycles,
&tracer,
&loop_ctx,
)
Expand Down Expand Up @@ -328,6 +329,7 @@ async fn query_for_work<'a>(
health_check: &crate::health_check::HealthCheck,
trx: &mut sqlx::Transaction<'a, Postgres>,
deps_chain_mngr: &mut dependence_chain::LockMngr,
no_progress_cycles: &mut u32,
tracer: &opentelemetry::global::BoxedTracer,
loop_ctx: &opentelemetry::Context,
) -> Result<
Expand All @@ -340,7 +342,16 @@ async fn query_for_work<'a>(
match deps_chain_mngr.extend_or_release_current_lock(true).await? {
// If there is a current lock, we extend it and use its dependence_chain_id
Some((id, reason)) => (Some(id), reason),
None => deps_chain_mngr.acquire_next_lock().await?,
None => {
if *no_progress_cycles
< args.dcid_ignore_dependency_count_threshold * args.dcid_max_no_progress_cycles
{
deps_chain_mngr.acquire_next_lock().await?
} else {
*no_progress_cycles = 0;
deps_chain_mngr.acquire_early_lock().await?
}
}
};
if deps_chain_mngr.enabled() && dependence_chain_id.is_none() {
// No dependence chain to lock, so no work to do
Expand Down Expand Up @@ -482,7 +493,10 @@ WHERE c.transaction_id IN (
inputs,
is_allowed: w.is_allowed,
});
if w.schedule_order < earliest_schedule_order {
if w.schedule_order < earliest_schedule_order && w.is_allowed {
// Only account for allowed to avoid case of reorg
// where trivial encrypts will be in collision in
// the same transaction and old ones are re-used
earliest_schedule_order = w.schedule_order;
}
}
Expand Down
Loading