Skip to content

Commit 5e2ae45

Browse files
committed
fix(coprocessor): eliminate db contention between the two workers in sns-executor
Each worker performs write operations on non-shared database tables. The executor updates pbs_computations and ciphertexts, while the uploader only updates the ciphertext_digest, thus optimizing overall performance by reducing write conflicts. In addition, the executor performs garbage collection for any uploaded ct128.
1 parent 67164df commit 5e2ae45

11 files changed

Lines changed: 109 additions & 77 deletions

coprocessor/fhevm-engine/sns-executor/.sqlx/query-455eb6d9fe4e6ffd2fe7f45c2fd668b1e9884a2eff149dd2638089417bad1e95.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/sns-executor/.sqlx/query-5006562c943244b0e6dd278a60e76924ff0e43a08971950550f8c97a6651dfe3.json

Lines changed: 0 additions & 15 deletions
This file was deleted.

coprocessor/fhevm-engine/sns-executor/.sqlx/query-a006d9ccf85f04ed5c9d65eb759b6e83376343cfc56ad730b15d5fa476d5db37.json

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/sns-executor/.sqlx/query-a8b64a29e5c7c917a75c5e758ad494cf75ab8291ba6d47366157827b3550cc0f.json

Lines changed: 0 additions & 14 deletions
This file was deleted.

coprocessor/fhevm-engine/sns-executor/.sqlx/query-fd37e7dc679caa21ba22d7724a27409c232cceb719aa41a71e7faf44d2cb8ef9.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/sns-executor/src/aws_upload.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@ pub(crate) async fn process_s3_uploads(
9292
let mut trx = pool.begin().await?;
9393

9494
let item = match job {
95-
UploadJob::Normal(item) => item,
95+
UploadJob::Normal(item) =>
96+
{
97+
item.enqueue_upload_task(&mut trx).await?;
98+
item
99+
},
96100
UploadJob::DatabaseLock(item) => {
97101
if let Err(err) = sqlx::query!(
98102
"SELECT * FROM ciphertext_digest

coprocessor/fhevm-engine/sns-executor/src/bin/sns_worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ fn construct_config() -> Config {
3434
batch_limit: args.work_items_batch_size,
3535
polling_interval: args.pg_polling_interval,
3636
max_connections: args.pg_pool_connections,
37+
cleanup_interval: args.cleanup_interval,
3738
},
3839
s3: S3Config {
3940
bucket_ct128: args.bucket_name_ct128,

coprocessor/fhevm-engine/sns-executor/src/bin/utils/daemon_cli.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ pub struct Args {
7171

7272
#[arg(long, default_value = "120s", value_parser = parse_duration)]
7373
pub s3_regular_recheck_duration: Duration,
74+
75+
#[arg(long, default_value = "120s", value_parser = parse_duration)]
76+
pub cleanup_interval: Duration,
7477
}
7578

7679
pub fn parse_args() -> Args {

coprocessor/fhevm-engine/sns-executor/src/executor.rs

Lines changed: 68 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::time::SystemTime;
1515
use tfhe::set_server_key;
1616
use tokio::select;
1717
use tokio::sync::mpsc::Sender;
18+
use tokio::time::{interval, sleep};
1819
use tokio_util::sync::CancellationToken;
1920
use tracing::{debug, error, info};
2021

@@ -64,12 +65,15 @@ pub(crate) async fn run_loop(
6465
match acquire_connection(&pool, token.clone()).await {
6566
ConnStatus::Established(conn) => conn,
6667
ConnStatus::Failed => {
67-
tokio::time::sleep(RETRY_DB_CONN_INTERVAL).await;
68+
sleep(RETRY_DB_CONN_INTERVAL).await;
6869
continue; // Retry to reacquire a connection
6970
}
7071
ConnStatus::Cancelled => break,
7172
};
7273

74+
let mut gc_ticker = interval(conf.cleanup_interval);
75+
let mut polling_ticker = interval(Duration::from_secs(conf.polling_interval.into()));
76+
7377
loop {
7478
match fetch_and_execute_sns_tasks(&mut conn, tx, &keys, conf).await {
7579
Ok(_) => {
@@ -94,11 +98,15 @@ pub(crate) async fn run_loop(
9498

9599
select! {
96100
_ = token.cancelled() => return Ok(()),
97-
notification = listener.try_recv() => {
98-
info!(target: "worker", "Received notification {:?}", notification);
101+
n = listener.try_recv() => {
102+
info!(target: "worker", "Received notification {:?}", n);
99103
},
100-
_ = tokio::time::sleep(Duration::from_secs(conf.polling_interval.into())) => {
104+
_ = polling_ticker.tick() => {
101105
debug!(target: "worker", "Polling timeout, rechecking for tasks");
106+
},
107+
// Garbage collecting
108+
_ = gc_ticker.tick() => {
109+
garbage_collect(&mut conn).await?;
102110
}
103111
}
104112
}
@@ -107,6 +115,32 @@ pub(crate) async fn run_loop(
107115
Ok(())
108116
}
109117

118+
// Clean up the database by removing old ciphertexts128 already uploaded to S3.
119+
async fn garbage_collect(conn: &mut PoolConnection<Postgres>) -> Result<(), ExecutionError> {
120+
let start = SystemTime::now();
121+
let rows = sqlx::query!(
122+
"UPDATE ciphertexts c
123+
SET ciphertext128 = NULL
124+
WHERE ciphertext128 is NOT NULL -- needed for getting an accurate rows_affected
125+
AND EXISTS (
126+
SELECT 1
127+
FROM ciphertext_digest d
128+
WHERE d.tenant_id = c.tenant_id
129+
AND d.handle = c.handle
130+
AND d.ciphertext128 IS NOT NULL
131+
);"
132+
)
133+
.execute(conn.as_mut())
134+
.await?.rows_affected();
135+
136+
if rows > 0 {
137+
let _s = telemetry::tracer_with_start_time("cleanup_ct128", start);
138+
info!(target: "worker", "Cleaning up old ciphertexts128, rows_affected: {}", rows);
139+
}
140+
141+
Ok(())
142+
}
143+
110144
/// Fetch and process SnS tasks from the database.
111145
async fn fetch_and_execute_sns_tasks(
112146
conn: &mut PoolConnection<Postgres>,
@@ -128,17 +162,19 @@ async fn fetch_and_execute_sns_tasks(
128162
let t = telemetry::tracer("batch_execution");
129163
t.set_attribute("count", tasks.len().to_string());
130164

131-
process_tasks(trx, &mut tasks, keys, tx).await?;
165+
process_tasks(&mut tasks, keys, tx)?;
132166
update_computations_status(trx, &tasks).await?;
133167

134168
let s = t.child_span("batch_store_ciphertext128");
135169
update_ciphertext128(trx, &tasks).await?;
136170
notify_ciphertext128_ready(trx, &conf.notify_channel).await?;
171+
172+
// Try to enqueue the tasks for upload in the DB
173+
// This is a best-effort attempt, as the upload worker might not be available
174+
enqueue_upload_tasks(trx, &tasks).await?;
137175
telemetry::end_span(s);
138176

139177
db_txn.commit().await?;
140-
141-
t.end();
142178
} else {
143179
db_txn.rollback().await?;
144180
}
@@ -241,9 +277,18 @@ async fn get_remaining_tasks(
241277
Ok(records_count.unwrap_or(0))
242278
}
243279

244-
/// Processes the tasks by decompressing and transforming ciphertexts.
245-
async fn process_tasks(
280+
async fn enqueue_upload_tasks(
246281
db_txn: &mut Transaction<'_, Postgres>,
282+
tasks: &[HandleItem],
283+
) -> Result<(), ExecutionError> {
284+
for task in tasks.iter() {
285+
task.enqueue_upload_task(db_txn).await?;
286+
}
287+
Ok(())
288+
}
289+
290+
/// Processes the tasks by decompressing and transforming ciphertexts.
291+
fn process_tasks(
247292
tasks: &mut [HandleItem],
248293
keys: &KeySet,
249294
tx: &Sender<UploadJob>,
@@ -292,9 +337,6 @@ async fn process_tasks(
292337
}
293338
};
294339

295-
// Enqueue the task for upload in DB
296-
task.enqueue_upload_task(db_txn).await?;
297-
298340
// Start uploading the ciphertexts as soon as the ct128 is computed
299341
//
300342
// The service must continue running the squashed noise algorithm,
@@ -336,28 +378,23 @@ async fn update_ciphertext128(
336378
let ciphertext128 = &task.ct128_uncompressed;
337379
let s = task.otel.child_span("ct128_db_insert");
338380

339-
// Insert the ciphertext128 into the database only if
340-
// the uploader has not already put it in AWS S3
381+
// Insert the ciphertext128 into the database for reliability
382+
// Later on, we clean up all uploaded ct128
341383
let res = sqlx::query!(
342-
"
343-
UPDATE ciphertexts
344-
SET ciphertext128 = $1
345-
WHERE handle = $2
346-
AND EXISTS (
347-
SELECT 1
348-
FROM ciphertext_digest
349-
WHERE handle = $2
350-
AND ciphertext128 IS NULL
351-
);",
352-
ciphertext128.as_ref(),
353-
task.handle
354-
)
355-
.execute(db_txn.as_mut())
356-
.await;
384+
"
385+
UPDATE ciphertexts
386+
SET ciphertext128 = $1
387+
WHERE handle = $2;",
388+
ciphertext128.as_ref(),
389+
task.handle
390+
)
391+
.execute(db_txn.as_mut())
392+
.await;
357393

358394
match res {
359-
Ok(_) => {
360-
debug!(target: "worker", handle = ?task.handle, "Inserted ct128 in DB");
395+
Ok(val) => {
396+
info!(target: "worker", handle = compact_hex(&task.handle),
397+
query_res = format!("{:?}", val), "Inserted ct128 in DB");
361398
telemetry::end_span(s);
362399
}
363400
Err(err) => {

coprocessor/fhevm-engine/sns-executor/src/lib.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub struct DBConfig {
3232
pub notify_channel: String,
3333
pub batch_limit: u32,
3434
pub polling_interval: u32,
35+
pub cleanup_interval: Duration,
3536
pub max_connections: u32,
3637
}
3738

@@ -128,19 +129,6 @@ impl HandleItem {
128129
.execute(trx.as_mut())
129130
.await?;
130131

131-
// Reset ciphertext128 as the ct128 has been successfully uploaded to S3
132-
// NB: For reclaiming the disk-space in DB, we rely on auto vacuuming in
133-
// Postgres
134-
135-
sqlx::query!(
136-
"UPDATE ciphertexts
137-
SET ciphertext128 = NULL
138-
WHERE handle = $1",
139-
self.handle
140-
)
141-
.execute(trx.as_mut())
142-
.await?;
143-
144132
info!(
145133
"Mark ct128 as uploaded, handle: {}, digest: {}",
146134
compact_hex(&self.handle),
@@ -229,7 +217,7 @@ pub enum UploadJob {
229217
}
230218

231219
impl UploadJob {
232-
pub fn handle(&self) -> &Vec<u8> {
220+
pub fn handle(&self) -> &[u8] {
233221
match self {
234222
UploadJob::Normal(item) => &item.handle,
235223
UploadJob::DatabaseLock(item) => &item.handle,

0 commit comments

Comments
 (0)