Skip to content

Commit 471b7e3

Browse files
committed
fix(coprocessor): eliminate db contention between the two workers in sns-executor
Each worker performs write operations on non-shared database tables. The executor does updates to pbs_computations and ciphertexts, while the uploader only updates the ciphertext_digest, thus optimizing overall performance by reducing write conflicts. In addition, the executor performs garbage collection of any uploaded ct128.
1 parent 3d42a90 commit 471b7e3

11 files changed

Lines changed: 105 additions & 77 deletions

coprocessor/fhevm-engine/sns-executor/.sqlx/query-455eb6d9fe4e6ffd2fe7f45c2fd668b1e9884a2eff149dd2638089417bad1e95.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/sns-executor/.sqlx/query-5006562c943244b0e6dd278a60e76924ff0e43a08971950550f8c97a6651dfe3.json

Lines changed: 0 additions & 15 deletions
This file was deleted.

coprocessor/fhevm-engine/sns-executor/.sqlx/query-a006d9ccf85f04ed5c9d65eb759b6e83376343cfc56ad730b15d5fa476d5db37.json

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/sns-executor/.sqlx/query-a8b64a29e5c7c917a75c5e758ad494cf75ab8291ba6d47366157827b3550cc0f.json

Lines changed: 0 additions & 14 deletions
This file was deleted.

coprocessor/fhevm-engine/sns-executor/.sqlx/query-fd37e7dc679caa21ba22d7724a27409c232cceb719aa41a71e7faf44d2cb8ef9.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/sns-executor/src/aws_upload.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@ pub(crate) async fn process_s3_uploads(
9292
let mut trx = pool.begin().await?;
9393

9494
let item = match job {
95-
UploadJob::Normal(item) => item,
95+
UploadJob::Normal(item) =>
96+
{
97+
item.enqueue_upload_task(&mut trx).await?;
98+
item
99+
},
96100
UploadJob::DatabaseLock(item) => {
97101
if let Err(err) = sqlx::query!(
98102
"SELECT * FROM ciphertext_digest

coprocessor/fhevm-engine/sns-executor/src/bin/sns_worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ fn construct_config() -> Config {
3434
batch_limit: args.work_items_batch_size,
3535
polling_interval: args.pg_polling_interval,
3636
max_connections: args.pg_pool_connections,
37+
cleanup_interval: args.cleanup_interval,
3738
},
3839
s3: S3Config {
3940
bucket_ct128: args.bucket_name_ct128,

coprocessor/fhevm-engine/sns-executor/src/bin/utils/daemon_cli.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ pub struct Args {
7171

7272
#[arg(long, default_value = "120s", value_parser = parse_duration)]
7373
pub s3_regular_recheck_duration: Duration,
74+
75+
#[arg(long, default_value = "120s", value_parser = parse_duration)]
76+
pub cleanup_interval: Duration,
7477
}
7578

7679
pub fn parse_args() -> Args {

coprocessor/fhevm-engine/sns-executor/src/executor.rs

Lines changed: 64 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::time::SystemTime;
1515
use tfhe::set_server_key;
1616
use tokio::select;
1717
use tokio::sync::mpsc::Sender;
18+
use tokio::time::sleep;
1819
use tokio_util::sync::CancellationToken;
1920
use tracing::{debug, error, info};
2021

@@ -64,7 +65,7 @@ pub(crate) async fn run_loop(
6465
match acquire_connection(&pool, token.clone()).await {
6566
ConnStatus::Established(conn) => conn,
6667
ConnStatus::Failed => {
67-
tokio::time::sleep(RETRY_DB_CONN_INTERVAL).await;
68+
sleep(RETRY_DB_CONN_INTERVAL).await;
6869
continue; // Retry to reacquire a connection
6970
}
7071
ConnStatus::Cancelled => break,
@@ -94,11 +95,15 @@ pub(crate) async fn run_loop(
9495

9596
select! {
9697
_ = token.cancelled() => return Ok(()),
97-
notification = listener.try_recv() => {
98-
info!(target: "worker", "Received notification {:?}", notification);
98+
n = listener.try_recv() => {
99+
info!(target: "worker", "Received notification {:?}", n);
99100
},
100-
_ = tokio::time::sleep(Duration::from_secs(conf.polling_interval.into())) => {
101+
_ = sleep(Duration::from_secs(conf.polling_interval.into())) => {
101102
debug!(target: "worker", "Polling timeout, rechecking for tasks");
103+
},
104+
// Garbage collecting
105+
_ = sleep(conf.cleanup_interval) => {
106+
garbage_collect(&mut conn).await?;
102107
}
103108
}
104109
}
@@ -107,6 +112,31 @@ pub(crate) async fn run_loop(
107112
Ok(())
108113
}
109114

115+
// Clean up the database by removing old ciphertexts128 already uploaded to S3.
116+
async fn garbage_collect(conn: &mut PoolConnection<Postgres>) -> Result<(), ExecutionError> {
117+
let _s = telemetry::tracer("cleanup_ct128");
118+
let rows = sqlx::query!(
119+
"UPDATE ciphertexts c
120+
SET ciphertext128 = NULL
121+
WHERE ciphertext128 is NOT NULL -- needed for getting an accurate rows_affected
122+
AND EXISTS (
123+
SELECT 1
124+
FROM ciphertext_digest d
125+
WHERE d.tenant_id = c.tenant_id
126+
AND d.handle = c.handle
127+
AND d.ciphertext128 IS NOT NULL
128+
);"
129+
)
130+
.execute(conn.as_mut())
131+
.await?.rows_affected();
132+
133+
if rows > 0 {
134+
info!(target: "worker", "Cleaning up old ciphertexts128, rows_affected: {}", rows);
135+
}
136+
137+
Ok(())
138+
}
139+
110140
/// Fetch and process SnS tasks from the database.
111141
async fn fetch_and_execute_sns_tasks(
112142
conn: &mut PoolConnection<Postgres>,
@@ -128,17 +158,19 @@ async fn fetch_and_execute_sns_tasks(
128158
let t = telemetry::tracer("batch_execution");
129159
t.set_attribute("count", tasks.len().to_string());
130160

131-
process_tasks(trx, &mut tasks, keys, tx).await?;
161+
process_tasks(&mut tasks, keys, tx)?;
132162
update_computations_status(trx, &tasks).await?;
133163

134164
let s = t.child_span("batch_store_ciphertext128");
135165
update_ciphertext128(trx, &tasks).await?;
136166
notify_ciphertext128_ready(trx, &conf.notify_channel).await?;
167+
168+
// Try to enqueue the tasks for upload in the DB
169+
// This is a best-effort attempt, as the upload worker might not be available
170+
enqueue_upload_tasks(trx, &tasks).await?;
137171
telemetry::end_span(s);
138172

139173
db_txn.commit().await?;
140-
141-
t.end();
142174
} else {
143175
db_txn.rollback().await?;
144176
}
@@ -241,9 +273,18 @@ async fn get_remaining_tasks(
241273
Ok(records_count.unwrap_or(0))
242274
}
243275

244-
/// Processes the tasks by decompressing and transforming ciphertexts.
245-
async fn process_tasks(
276+
async fn enqueue_upload_tasks(
246277
db_txn: &mut Transaction<'_, Postgres>,
278+
tasks: &[HandleItem],
279+
) -> Result<(), ExecutionError> {
280+
for task in tasks.iter() {
281+
task.enqueue_upload_task(db_txn).await?;
282+
}
283+
Ok(())
284+
}
285+
286+
/// Processes the tasks by decompressing and transforming ciphertexts.
287+
fn process_tasks(
247288
tasks: &mut [HandleItem],
248289
keys: &KeySet,
249290
tx: &Sender<UploadJob>,
@@ -292,9 +333,6 @@ async fn process_tasks(
292333
}
293334
};
294335

295-
// Enqueue the task for upload in DB
296-
task.enqueue_upload_task(db_txn).await?;
297-
298336
// Start uploading the ciphertexts as soon as the ct128 is computed
299337
//
300338
// The service must continue running the squashed noise algorithm,
@@ -336,28 +374,23 @@ async fn update_ciphertext128(
336374
let ciphertext128 = &task.ct128_uncompressed;
337375
let s = task.otel.child_span("ct128_db_insert");
338376

339-
// Insert the ciphertext128 into the database only if
340-
// the uploader has not already put it in AWS S3
377+
// Insert the ciphertext128 into the database for reliability
378+
// Later on, we clean up all uploaded ct128
341379
let res = sqlx::query!(
342-
"
343-
UPDATE ciphertexts
344-
SET ciphertext128 = $1
345-
WHERE handle = $2
346-
AND EXISTS (
347-
SELECT 1
348-
FROM ciphertext_digest
349-
WHERE handle = $2
350-
AND ciphertext128 IS NULL
351-
);",
352-
ciphertext128.as_ref(),
353-
task.handle
354-
)
355-
.execute(db_txn.as_mut())
356-
.await;
380+
"
381+
UPDATE ciphertexts
382+
SET ciphertext128 = $1
383+
WHERE handle = $2;",
384+
ciphertext128.as_ref(),
385+
task.handle
386+
)
387+
.execute(db_txn.as_mut())
388+
.await;
357389

358390
match res {
359-
Ok(_) => {
360-
debug!(target: "worker", handle = ?task.handle, "Inserted ct128 in DB");
391+
Ok(val) => {
392+
info!(target: "worker", handle = compact_hex(&task.handle),
393+
query_res = format!("{:?}", val), "Inserted ct128 in DB");
361394
telemetry::end_span(s);
362395
}
363396
Err(err) => {

coprocessor/fhevm-engine/sns-executor/src/lib.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub struct DBConfig {
3232
pub notify_channel: String,
3333
pub batch_limit: u32,
3434
pub polling_interval: u32,
35+
pub cleanup_interval: Duration,
3536
pub max_connections: u32,
3637
}
3738

@@ -128,19 +129,6 @@ impl HandleItem {
128129
.execute(trx.as_mut())
129130
.await?;
130131

131-
// Reset ciphertext128 as the ct128 has been successfully uploaded to S3
132-
// NB: For reclaiming the disk-space in DB, we rely on auto vacuuming in
133-
// Postgres
134-
135-
sqlx::query!(
136-
"UPDATE ciphertexts
137-
SET ciphertext128 = NULL
138-
WHERE handle = $1",
139-
self.handle
140-
)
141-
.execute(trx.as_mut())
142-
.await?;
143-
144132
info!(
145133
"Mark ct128 as uploaded, handle: {}, digest: {}",
146134
compact_hex(&self.handle),
@@ -229,7 +217,7 @@ pub enum UploadJob {
229217
}
230218

231219
impl UploadJob {
232-
pub fn handle(&self) -> &Vec<u8> {
220+
pub fn handle(&self) -> &[u8] {
233221
match self {
234222
UploadJob::Normal(item) => &item.handle,
235223
UploadJob::DatabaseLock(item) => &item.handle,

0 commit comments

Comments
 (0)