Skip to content

Commit b128e15

Browse files
obatirou, dartdart26
authored and committed
refactor(coprocessor): remove tenant notion from sns-worker (#1903)
* db-migration: remove tenant_id from ciphertexts128 and add host_chain_id to pbs_computations
* sns-worker: remove tenant API key and refactor keyset loading
* tests: align sns-worker and harness with key_id/host_chain_id
* docs/compose: update sns-worker usage for tenant-less schema
* test-harness: drop keys.chain_id filter after tenant removal
* style: cargo fmt
* Clarify keyset invariant and rotation TODO
1 parent f802a85 commit b128e15

File tree

12 files changed

+191
-177
lines changed

12 files changed

+191
-177
lines changed

coprocessor/fhevm-engine/db-migration/migrations/20260128095635_remove_tenants.sql

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -80,13 +80,23 @@ ALTER TABLE ciphertexts DROP CONSTRAINT ciphertexts_pkey;
8080
ALTER TABLE ciphertexts DROP COLUMN tenant_id;
8181
ALTER TABLE ciphertexts ADD PRIMARY KEY (handle, ciphertext_version);
8282

83+
-- ciphertexts128.tenant_id no longer needed.
84+
ALTER TABLE ciphertexts128 DROP CONSTRAINT ciphertexts128_pkey;
85+
ALTER TABLE ciphertexts128 DROP COLUMN tenant_id;
86+
ALTER TABLE ciphertexts128 ADD PRIMARY KEY (handle);
87+
DROP INDEX IF EXISTS idx_ciphertexts128_handle;
88+
8389
-- computations.tenant_id no longer needed.
8490
ALTER TABLE computations DROP CONSTRAINT computations_pkey;
8591
DROP INDEX IF EXISTS idx_computations_pk;
8692
ALTER TABLE computations DROP COLUMN tenant_id;
8793
ALTER TABLE computations ADD PRIMARY KEY (output_handle, transaction_id);
8894

8995
-- pbs_computations.tenant_id no longer needed.
96+
ALTER TABLE pbs_computations ADD COLUMN host_chain_id BIGINT DEFAULT NULL;
97+
UPDATE pbs_computations SET host_chain_id = (SELECT chain_id FROM keys WHERE tenant_id = pbs_computations.tenant_id);
98+
ALTER TABLE pbs_computations ALTER COLUMN host_chain_id SET NOT NULL;
99+
ALTER TABLE pbs_computations ADD CONSTRAINT pbs_computations_host_chain_id_positive CHECK (host_chain_id > 0);
90100
ALTER TABLE pbs_computations DROP CONSTRAINT pbs_computations_pkey;
91101
ALTER TABLE pbs_computations DROP COLUMN tenant_id;
92102
ALTER TABLE pbs_computations ADD PRIMARY KEY (handle);

coprocessor/fhevm-engine/sns-worker/README.md

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -17,31 +17,33 @@ Upon receiving a notification, it mainly does the following steps:
1717

1818
Runs sns-executor. See also `src/bin/utils/daemon_cli.rs`
1919

20-
20+
2121
## Running a SnS Worker
2222

23-
### The SnS key can be retrieved from the Large Objects table (pg_largeobject). Before running a worker, the sns_pk should be imported into tenants tables as shown below. If tenants table is not in use, then keys can be passed with CLI param --keys_file_path
23+
### The SnS key can be retrieved from the Large Objects table (pg_largeobject). Before running a worker, the sns_pk should be imported into the keys table as shown below. If the keys table is not in use, then keys can be passed with CLI param --keys_file_path
2424
```sql
2525
-- Example query to import sns_pk from fhevm-keys/sns_pk
2626
-- Import the sns_pk into the Large Object storage
2727
sns_pk_loid := lo_import('../fhevm-keys/sns_pk');
2828

29-
-- Update the tenants table with the new Large Object OID
30-
UPDATE tenants
29+
-- Update the keys table with the new Large Object OID
30+
UPDATE keys
3131
SET sns_pk = sns_pk_loid
32-
WHERE tenant_id = 1;
32+
WHERE sequence_number = (SELECT sequence_number FROM keys ORDER BY sequence_number DESC LIMIT 1);
3333
```
3434

3535
### Multiple workers can be launched independently to perform 128-PBS computations.
3636
```bash
3737
# Run a single instance of the worker
3838
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/coprocessor \
3939
cargo run --release -- \
40-
--tenant-api-key "a1503fb6-d79b-4e9e-826d-44cf262f3e05" \
4140
--pg-listen-channels "event_pbs_computations" "event_ciphertext_computed" \
4241
--pg-notify-channel "event_pbs_computed" \
4342
```
4443

44+
Notes:
45+
- `host_chain_id` is read directly from `pbs_computations`/`ciphertext_digest` rows.
46+
4547
## Testing
4648

4749
- Using `Postgres` docker image
@@ -59,4 +61,3 @@ COPROCESSOR_TEST_LOCALHOST_RESET=1 cargo test --release -- --nocapture
5961
# Then, on every run
6062
COPROCESSOR_TEST_LOCALHOST=1 cargo test --release
6163
```
62-

coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs

Lines changed: 6 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -119,10 +119,9 @@ async fn run_uploader_loop(
119119
UploadJob::DatabaseLock(item) => {
120120
if let Err(err) = sqlx::query!(
121121
"SELECT * FROM ciphertext_digest
122-
WHERE handle = $2 AND tenant_id = $1 AND
122+
WHERE handle = $1 AND
123123
(ciphertext128 IS NULL OR ciphertext IS NULL)
124124
FOR UPDATE SKIP LOCKED",
125-
item.tenant_id,
126125
item.handle
127126
)
128127
.fetch_one(trx.as_mut())
@@ -234,7 +233,6 @@ async fn upload_ciphertexts(
234233
info!(
235234
handle = handle_as_hex,
236235
len = ?ByteSize::b(ct128_bytes.len() as u64),
237-
tenant_id = task.tenant_id,
238236
"Uploading ct128"
239237
);
240238

@@ -286,7 +284,6 @@ async fn upload_ciphertexts(
286284
info!(
287285
handle = handle_as_hex,
288286
len = ?ByteSize::b(ct64_compressed.len() as u64),
289-
tenant_id = task.tenant_id,
290287
"Uploading ct64",
291288
);
292289

@@ -400,7 +397,7 @@ async fn fetch_pending_uploads(
400397
limit: i64,
401398
) -> Result<Vec<UploadJob>, ExecutionError> {
402399
let rows = sqlx::query!(
403-
"SELECT tenant_id, handle, ciphertext, ciphertext128, ciphertext128_format, transaction_id
400+
"SELECT handle, ciphertext, ciphertext128, ciphertext128_format, transaction_id, host_chain_id, key_id
404401
FROM ciphertext_digest
405402
WHERE ciphertext IS NULL OR ciphertext128 IS NULL
406403
FOR UPDATE SKIP LOCKED
@@ -423,8 +420,7 @@ async fn fetch_pending_uploads(
423420
// Fetch missing ciphertext
424421
if ciphertext_digest.is_none() {
425422
if let Ok(row) = sqlx::query!(
426-
"SELECT ciphertext FROM ciphertexts WHERE tenant_id = $1 AND handle = $2;",
427-
row.tenant_id,
423+
"SELECT ciphertext FROM ciphertexts WHERE handle = $1;",
428424
handle
429425
)
430426
.fetch_optional(db_pool)
@@ -441,8 +437,7 @@ async fn fetch_pending_uploads(
441437
// Fetch missing ciphertext128
442438
if ciphertext128_digest.is_none() {
443439
if let Ok(row) = sqlx::query!(
444-
"SELECT ciphertext FROM ciphertexts128 WHERE tenant_id = $1 AND handle = $2;",
445-
row.tenant_id,
440+
"SELECT ciphertext FROM ciphertexts128 WHERE handle = $1;",
446441
handle
447442
)
448443
.fetch_optional(db_pool)
@@ -484,7 +479,8 @@ async fn fetch_pending_uploads(
484479

485480
if !ct64_compressed.is_empty() || !is_ct128_empty {
486481
let item = HandleItem {
487-
tenant_id: row.tenant_id,
482+
host_chain_id: row.host_chain_id,
483+
key_id: row.key_id,
488484
handle: handle.clone(),
489485
ct64_compressed,
490486
ct128: Arc::new(ct128),

coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@ fn construct_config() -> Config {
1919
let db_url = args.database_url.clone().unwrap_or_default();
2020

2121
Config {
22-
tenant_api_key: args.tenant_api_key,
2322
service_name: args.service_name,
2423
metrics: SNSMetricsConfig {
2524
addr: args.metrics_addr,

coprocessor/fhevm-engine/sns-worker/src/bin/utils/daemon_cli.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -11,10 +11,6 @@ use tracing::Level;
1111
#[derive(Parser, Debug, Clone)]
1212
#[command(version, about, long_about = None)]
1313
pub struct Args {
14-
/// Tenant API key
15-
#[arg(long)]
16-
pub tenant_api_key: String,
17-
1814
/// Work items batch size
1915
#[arg(long, default_value_t = 4)]
2016
pub work_items_batch_size: u32,

coprocessor/fhevm-engine/sns-worker/src/executor.rs

Lines changed: 35 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
use crate::aws_upload::check_is_ready;
2-
use crate::keyset::fetch_keyset;
2+
use crate::keyset::fetch_latest_keyset;
33
use crate::metrics::SNS_LATENCY_OP_HISTOGRAM;
44
use crate::metrics::TASK_EXECUTE_FAILURE_COUNTER;
55
use crate::metrics::TASK_EXECUTE_SUCCESS_COUNTER;
@@ -13,6 +13,7 @@ use crate::SchedulePolicy;
1313
use crate::UploadJob;
1414
use crate::{Config, ExecutionError};
1515
use aws_sdk_s3::Client;
16+
use fhevm_engine_common::db_keys::DbKeyId;
1617
use fhevm_engine_common::healthz_server::{HealthCheckService, HealthStatus, Version};
1718
use fhevm_engine_common::pg_pool::PostgresPoolManager;
1819
use fhevm_engine_common::pg_pool::ServiceError;
@@ -142,7 +143,7 @@ impl SwitchNSquashService {
142143
}
143144

144145
pub async fn run(&self, pool_mngr: &PostgresPoolManager) {
145-
let keys_cache: Arc<RwLock<lru::LruCache<String, KeySet>>> = Arc::new(RwLock::new(
146+
let keys_cache: Arc<RwLock<lru::LruCache<DbKeyId, KeySet>>> = Arc::new(RwLock::new(
146147
lru::LruCache::new(NonZeroUsize::new(10).unwrap()),
147148
));
148149

@@ -174,19 +175,10 @@ impl SwitchNSquashService {
174175

175176
async fn get_keyset(
176177
pool: PgPool,
177-
keys_cache: Arc<RwLock<lru::LruCache<String, KeySet>>>,
178-
tenant_api_key: &String,
179-
) -> Result<Option<KeySet>, ExecutionError> {
178+
keys_cache: Arc<RwLock<lru::LruCache<DbKeyId, KeySet>>>,
179+
) -> Result<Option<(DbKeyId, KeySet)>, ExecutionError> {
180180
let _t = telemetry::tracer("fetch_keyset", &None);
181-
{
182-
let mut cache = keys_cache.write().await;
183-
if let Some(keys) = cache.get(tenant_api_key) {
184-
info!(tenant_api_key = tenant_api_key, "Keyset found in cache");
185-
return Ok(Some(keys.clone()));
186-
}
187-
}
188-
let keys: Option<KeySet> = fetch_keyset(&keys_cache, &pool, tenant_api_key).await?;
189-
Ok(keys)
181+
fetch_latest_keyset(&keys_cache, &pool).await
190182
}
191183

192184
/// Executes the worker logic for the SnS task.
@@ -196,20 +188,19 @@ pub(crate) async fn run_loop(
196188
pool: PgPool,
197189
token: CancellationToken,
198190
last_active_at: Arc<RwLock<SystemTime>>,
199-
keys_cache: Arc<RwLock<lru::LruCache<String, KeySet>>>,
191+
keys_cache: Arc<RwLock<lru::LruCache<DbKeyId, KeySet>>>,
200192
events_tx: InternalEvents,
201193
) -> Result<(), ExecutionError> {
202194
update_last_active(last_active_at.clone()).await;
203195

204-
let tenant_api_key = &conf.tenant_api_key;
205196
let mut listener = PgListener::connect_with(&pool).await?;
206197
info!("Connected to PostgresDB");
207198

208199
listener
209200
.listen_all(conf.db.listen_channels.iter().map(|v| v.as_str()))
210201
.await?;
211202

212-
let mut keys = None;
203+
let mut keys: Option<(DbKeyId, KeySet)> = None;
213204
let mut gc_ticker = interval(conf.db.cleanup_interval);
214205
let mut gc_timestamp = SystemTime::now();
215206
let mut polling_ticker = interval(Duration::from_secs(conf.db.polling_interval.into()));
@@ -218,27 +209,31 @@ pub(crate) async fn run_loop(
218209
// Continue looping until the service is cancelled or a critical error occurs
219210
update_last_active(last_active_at.clone()).await;
220211

221-
let Some(keys) = keys.as_ref() else {
222-
keys = get_keyset(pool.clone(), keys_cache.clone(), tenant_api_key).await?;
223-
if keys.is_some() {
224-
info!(tenant_api_key = tenant_api_key, "Fetched keyset");
212+
let latest_keys = get_keyset(pool.clone(), keys_cache.clone()).await?;
213+
if let Some((key_id, keyset)) = latest_keys {
214+
let key_changed = keys
215+
.as_ref()
216+
.map(|(current_key_id, _)| current_key_id != &key_id)
217+
.unwrap_or(true);
218+
if key_changed {
219+
info!(key_id = hex::encode(&key_id), "Fetched keyset");
225220
// Notify that the keys are loaded
226221
if let Some(events_tx) = &events_tx {
227222
let _ = events_tx.try_send("event_keys_loaded");
228223
}
229-
} else {
230-
warn!(
231-
tenant_api_key = tenant_api_key,
232-
"No keys available, retrying in 5 seconds"
233-
);
234-
tokio::time::sleep(Duration::from_secs(5)).await;
235224
}
236-
225+
keys = Some((key_id, keyset));
226+
} else {
227+
warn!("No keys available, retrying in 5 seconds");
228+
tokio::time::sleep(Duration::from_secs(5)).await;
237229
if token.is_cancelled() {
238230
return Ok(());
239231
}
240232
continue;
241-
};
233+
}
234+
235+
// keys is guaranteed by the branch above; panic here if that invariant ever regresses.
236+
let (_, keys) = keys.as_ref().expect("keyset should be available");
242237

243238
let (maybe_remaining, _tasks_processed) =
244239
fetch_and_execute_sns_tasks(&pool, &tx, keys, &conf, &token)
@@ -316,20 +311,18 @@ pub async fn garbage_collect(pool: &PgPool, limit: u32) -> Result<(), ExecutionE
316311
let rows_affected: u64 = sqlx::query!(
317312
"
318313
WITH uploaded_ct128 AS (
319-
SELECT c.tenant_id, c.handle
314+
SELECT c.handle
320315
FROM ciphertexts128 c
321316
JOIN ciphertext_digest d
322-
ON d.tenant_id = c.tenant_id
323-
AND d.handle = c.handle
317+
ON d.handle = c.handle
324318
WHERE d.ciphertext128 IS NOT NULL
325319
FOR UPDATE OF c SKIP LOCKED
326320
LIMIT $1
327321
)
328322
329323
DELETE FROM ciphertexts128 c
330324
USING uploaded_ct128 r
331-
WHERE c.tenant_id = r.tenant_id
332-
AND c.handle = r.handle;
325+
WHERE c.handle = r.handle;
333326
",
334327
limit as i32
335328
)
@@ -375,7 +368,7 @@ async fn fetch_and_execute_sns_tasks(
375368

376369
let mut maybe_remaining = false;
377370
let tasks_processed;
378-
if let Some(mut tasks) = query_sns_tasks(trx, conf.db.batch_limit, order).await? {
371+
if let Some(mut tasks) = query_sns_tasks(trx, conf.db.batch_limit, order, &keys.key_id).await? {
379372
maybe_remaining = conf.db.batch_limit as usize == tasks.len();
380373
tasks_processed = tasks.len();
381374

@@ -423,6 +416,7 @@ pub async fn query_sns_tasks(
423416
db_txn: &mut Transaction<'_, Postgres>,
424417
limit: u32,
425418
order: Order,
419+
key_id: &DbKeyId,
426420
) -> Result<Option<Vec<HandleItem>>, ExecutionError> {
427421
let start_time = SystemTime::now();
428422

@@ -460,13 +454,16 @@ pub async fn query_sns_tasks(
460454
let tasks = records
461455
.into_iter()
462456
.map(|record| {
463-
let tenant_id: i32 = record.try_get("tenant_id")?;
457+
let host_chain_id: i64 = record.try_get("host_chain_id")?;
464458
let handle: Vec<u8> = record.try_get("handle")?;
465459
let ciphertext: Vec<u8> = record.try_get("ciphertext")?;
466460
let transaction_id: Option<Vec<u8>> = record.try_get("transaction_id")?;
467461

468462
Ok(HandleItem {
469-
tenant_id,
463+
// TODO: During key rotation, ensure all coprocessors pin the same key_id for a batch
464+
// (e.g., via gateway coordination) to keep ciphertext_digest consistent.
465+
key_id: key_id.clone(),
466+
host_chain_id,
470467
handle: handle.clone(),
471468
ct64_compressed: Arc::new(ciphertext),
472469
ct128: Arc::new(BigCiphertext::default()), // to be computed
@@ -644,12 +641,10 @@ async fn update_ciphertext128(
644641
let res = sqlx::query!(
645642
"
646643
INSERT INTO ciphertexts128 (
647-
tenant_id,
648644
handle,
649645
ciphertext
650646
)
651-
VALUES ($1, $2, $3)",
652-
task.tenant_id,
647+
VALUES ($1, $2)",
653648
task.handle,
654649
ciphertext128,
655650
)

0 commit comments

Comments
 (0)