Skip to content

Commit 687e931

Browse files
committed
chore(coprocessor): solve dcid unit-tests issue
1 parent a3453db commit 687e931

3 files changed

Lines changed: 24 additions & 9 deletions

File tree

coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl LockMngr {
127127
&mut self,
128128
) -> Result<(Option<Vec<u8>>, LockingReason), sqlx::Error> {
129129
if self.disable_locking {
130-
warn!("Locking is disabled");
130+
debug!("Locking is disabled");
131131
return Ok((None, LockingReason::Missing));
132132
}
133133

coprocessor/fhevm-engine/tfhe-worker/src/tests/dependence_chain.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::dependence_chain::{LockMngr, LockingReason};
22
use serial_test::serial;
33
use sqlx::postgres::PgPoolOptions;
4-
use test_harness::instance::ImportMode;
4+
use test_harness::instance::{DBInstance, ImportMode};
55
use tokio::time::{sleep, Duration};
66
use tracing::info;
77
use uuid::Uuid;
@@ -11,7 +11,13 @@ const NUM_SAMPLE_CHAINS: usize = 10;
1111
#[tokio::test]
1212
#[serial(db)]
1313
async fn test_acquire_next_lock() {
14-
let pool = setup().await;
14+
let instance = setup().await;
15+
let pool = sqlx::postgres::PgPoolOptions::new()
16+
.max_connections(2)
17+
.connect(instance.db_url.as_str())
18+
.await
19+
.expect("Failed to connect to the database");
20+
1521
let dependence_chain_ids = insert_dependence_chains(&pool, NUM_SAMPLE_CHAINS)
1622
.await
1723
.expect("inserted chains");
@@ -52,7 +58,12 @@ async fn test_acquire_next_lock() {
5258
#[tokio::test]
5359
#[serial(db)]
5460
async fn test_work_stealing() {
55-
let pool = setup().await;
61+
let instance = setup().await;
62+
let pool = sqlx::postgres::PgPoolOptions::new()
63+
.max_connections(2)
64+
.connect(instance.db_url.as_str())
65+
.await
66+
.expect("Failed to connect to the database");
5667

5768
let dependence_chain_ids = insert_dependence_chains(&pool, NUM_SAMPLE_CHAINS)
5869
.await
@@ -178,7 +189,9 @@ async fn insert_dependence_chains(
178189
let mut out = Vec::with_capacity(num_chains);
179190

180191
for i in 0..num_chains {
192+
info!("Inserting dependence chain {}", i);
181193
let dependence_chain_id = i.to_le_bytes().to_vec();
194+
182195
sqlx::query!(
183196
r#"
184197
INSERT INTO dependence_chain (dependence_chain_id, status, last_updated_at)
@@ -199,7 +212,7 @@ async fn insert_dependence_chains(
199212
Ok(out)
200213
}
201214

202-
async fn setup() -> sqlx::PgPool {
215+
async fn setup() -> DBInstance {
203216
let _ = tracing_subscriber::fmt().json().with_level(true).try_init();
204217
let test_instance = test_harness::instance::setup_test_db(ImportMode::None)
205218
.await
@@ -216,5 +229,5 @@ async fn setup() -> sqlx::PgPool {
216229
.await
217230
.unwrap();
218231

219-
pool
232+
test_instance
220233
}

coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,8 @@ async fn query_for_work<'a>(
379379
));
380380
s.end();
381381

382+
let transaction_batch_size = args.work_items_batch_size;
383+
382384
// This query locks our work items so other worker doesn't select them.
383385
let mut s = tracer.start_with_context("query_work_items", loop_ctx);
384386
let started_at = SystemTime::now();
@@ -424,7 +426,7 @@ WHERE c.transaction_id IN (
424426
)
425427
FOR UPDATE SKIP LOCKED ",
426428
dependence_chain_id,
427-
args.work_items_batch_size as i32,
429+
transaction_batch_size as i32,
428430
)
429431
.fetch_all(trx.as_mut())
430432
.await
@@ -439,14 +441,14 @@ FOR UPDATE SKIP LOCKED ",
439441
health_check.update_db_access();
440442
if the_work.is_empty() {
441443
if let Some(dependence_chain_id) = &dependence_chain_id {
442-
warn!(target: "tfhe_worker", dcid = %hex::encode(dependence_chain_id), locking = ?locking_reason, "No work items found to process");
444+
warn!(target: "tfhe_worker", dcid = %hex::encode(dependence_chain_id), locking = ?locking_reason, "No computations found to process");
443445
}
444446
health_check.update_activity();
445447
return Ok((vec![], vec![]));
446448
}
447449
WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64);
448450
info!(target: "tfhe_worker", { count = the_work.len(), dcid = ?dependence_chain_id.as_ref().map(hex::encode),
449-
locking = ?locking_reason }, "Processing work items");
451+
locking = ?locking_reason }, "Processing computations");
450452
// Make sure we process each tenant independently to avoid
451453
// setting different keys from different tenants in the worker
452454
// threads

0 commit comments

Comments
 (0)