Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,23 @@ ALTER TABLE ciphertexts DROP CONSTRAINT ciphertexts_pkey;
-- ciphertexts: tenant_id removed; rows are now uniquely identified by
-- (handle, ciphertext_version) alone.
ALTER TABLE ciphertexts DROP COLUMN tenant_id;
ALTER TABLE ciphertexts ADD PRIMARY KEY (handle, ciphertext_version);

-- ciphertexts128.tenant_id no longer needed.
-- Drop the old composite PK, remove the column, and re-key on handle only.
ALTER TABLE ciphertexts128 DROP CONSTRAINT ciphertexts128_pkey;
ALTER TABLE ciphertexts128 DROP COLUMN tenant_id;
ALTER TABLE ciphertexts128 ADD PRIMARY KEY (handle);
-- The secondary handle index is redundant now that handle is the primary key.
DROP INDEX IF EXISTS idx_ciphertexts128_handle;

-- computations.tenant_id no longer needed.
-- Re-key on (output_handle, transaction_id) after dropping tenant_id.
ALTER TABLE computations DROP CONSTRAINT computations_pkey;
DROP INDEX IF EXISTS idx_computations_pk;
ALTER TABLE computations DROP COLUMN tenant_id;
ALTER TABLE computations ADD PRIMARY KEY (output_handle, transaction_id);

-- pbs_computations.tenant_id no longer needed.
-- Replace it with host_chain_id, backfilled from the keys table.
ALTER TABLE pbs_computations ADD COLUMN host_chain_id BIGINT DEFAULT NULL;
-- NOTE(review): the correlated subquery assumes at most one keys row per
-- tenant_id (it errors if several match) and leaves host_chain_id NULL if
-- none match, which then fails the SET NOT NULL step below — confirm the
-- keys table guarantees exactly one row per tenant before running this.
UPDATE pbs_computations SET host_chain_id = (SELECT chain_id FROM keys WHERE tenant_id = pbs_computations.tenant_id);
ALTER TABLE pbs_computations ALTER COLUMN host_chain_id SET NOT NULL;
ALTER TABLE pbs_computations ADD CONSTRAINT pbs_computations_host_chain_id_positive CHECK (host_chain_id > 0);
-- Finally re-key pbs_computations on handle alone.
ALTER TABLE pbs_computations DROP CONSTRAINT pbs_computations_pkey;
ALTER TABLE pbs_computations DROP COLUMN tenant_id;
ALTER TABLE pbs_computations ADD PRIMARY KEY (handle);
Expand Down
15 changes: 8 additions & 7 deletions coprocessor/fhevm-engine/sns-worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,33 @@ Upon receiving a notification, it mainly does the following steps:

Runs sns-executor. See also `src/bin/utils/daemon_cli.rs`


## Running an SnS Worker

### The SnS key can be retrieved from the Large Objects table (pg_largeobject). Before running a worker, the sns_pk should be imported into tenants tables as shown below. If tenants table is not in use, then keys can be passed with CLI param --keys_file_path
### The SnS key can be retrieved from the Large Objects table (pg_largeobject). Before running a worker, the sns_pk should be imported into the keys table as shown below. If the keys table is not in use, then keys can be passed with CLI param --keys_file_path
```sql
-- Example query to import sns_pk from fhevm-keys/sns_pk
-- Import the sns_pk into the Large Object storage
sns_pk_loid := lo_import('../fhevm-keys/sns_pk');

-- Update the tenants table with the new Large Object OID
UPDATE tenants
-- Update the keys table with the new Large Object OID
UPDATE keys
SET sns_pk = sns_pk_loid
WHERE tenant_id = 1;
WHERE sequence_number = (SELECT sequence_number FROM keys ORDER BY sequence_number DESC LIMIT 1);
Comment thread
dartdart26 marked this conversation as resolved.
```

### Multiple workers can be launched independently to perform 128-PBS computations.
```bash
# Run a single instance of the worker
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/coprocessor \
cargo run --release -- \
--tenant-api-key "a1503fb6-d79b-4e9e-826d-44cf262f3e05" \
--pg-listen-channels "event_pbs_computations" "event_ciphertext_computed" \
--pg-notify-channel "event_pbs_computed" \
```

Notes:
- `host_chain_id` is read directly from `pbs_computations`/`ciphertext_digest` rows.

## Testing

- Using `Postgres` docker image
Expand All @@ -59,4 +61,3 @@ COPROCESSOR_TEST_LOCALHOST_RESET=1 cargo test --release -- --nocapture
# Then, on every run
COPROCESSOR_TEST_LOCALHOST=1 cargo test --release
```

16 changes: 6 additions & 10 deletions coprocessor/fhevm-engine/sns-worker/src/aws_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ async fn run_uploader_loop(
UploadJob::DatabaseLock(item) => {
if let Err(err) = sqlx::query!(
"SELECT * FROM ciphertext_digest
WHERE handle = $2 AND tenant_id = $1 AND
WHERE handle = $1 AND
(ciphertext128 IS NULL OR ciphertext IS NULL)
FOR UPDATE SKIP LOCKED",
item.tenant_id,
item.handle
)
.fetch_one(trx.as_mut())
Expand Down Expand Up @@ -234,7 +233,6 @@ async fn upload_ciphertexts(
info!(
handle = handle_as_hex,
len = ?ByteSize::b(ct128_bytes.len() as u64),
tenant_id = task.tenant_id,
"Uploading ct128"
);

Expand Down Expand Up @@ -286,7 +284,6 @@ async fn upload_ciphertexts(
info!(
handle = handle_as_hex,
len = ?ByteSize::b(ct64_compressed.len() as u64),
tenant_id = task.tenant_id,
"Uploading ct64",
);

Expand Down Expand Up @@ -400,7 +397,7 @@ async fn fetch_pending_uploads(
limit: i64,
) -> Result<Vec<UploadJob>, ExecutionError> {
let rows = sqlx::query!(
"SELECT tenant_id, handle, ciphertext, ciphertext128, ciphertext128_format, transaction_id
"SELECT handle, ciphertext, ciphertext128, ciphertext128_format, transaction_id, host_chain_id, key_id
FROM ciphertext_digest
WHERE ciphertext IS NULL OR ciphertext128 IS NULL
FOR UPDATE SKIP LOCKED
Expand All @@ -423,8 +420,7 @@ async fn fetch_pending_uploads(
// Fetch missing ciphertext
if ciphertext_digest.is_none() {
if let Ok(row) = sqlx::query!(
"SELECT ciphertext FROM ciphertexts WHERE tenant_id = $1 AND handle = $2;",
row.tenant_id,
"SELECT ciphertext FROM ciphertexts WHERE handle = $1;",
handle
)
.fetch_optional(db_pool)
Expand All @@ -441,8 +437,7 @@ async fn fetch_pending_uploads(
// Fetch missing ciphertext128
if ciphertext128_digest.is_none() {
if let Ok(row) = sqlx::query!(
"SELECT ciphertext FROM ciphertexts128 WHERE tenant_id = $1 AND handle = $2;",
row.tenant_id,
"SELECT ciphertext FROM ciphertexts128 WHERE handle = $1;",
handle
)
.fetch_optional(db_pool)
Expand Down Expand Up @@ -484,7 +479,8 @@ async fn fetch_pending_uploads(

if !ct64_compressed.is_empty() || !is_ct128_empty {
let item = HandleItem {
tenant_id: row.tenant_id,
host_chain_id: row.host_chain_id,
key_id: row.key_id,
handle: handle.clone(),
ct64_compressed,
ct128: Arc::new(ct128),
Expand Down
1 change: 0 additions & 1 deletion coprocessor/fhevm-engine/sns-worker/src/bin/sns_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ fn construct_config() -> Config {
let db_url = args.database_url.clone().unwrap_or_default();

Config {
tenant_api_key: args.tenant_api_key,
service_name: args.service_name,
metrics: SNSMetricsConfig {
addr: args.metrics_addr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ use tracing::Level;
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Args {
/// Tenant API key
#[arg(long)]
pub tenant_api_key: String,

Comment thread
dartdart26 marked this conversation as resolved.
/// Work items batch size
#[arg(long, default_value_t = 4)]
pub work_items_batch_size: u32,
Expand Down
75 changes: 35 additions & 40 deletions coprocessor/fhevm-engine/sns-worker/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::aws_upload::check_is_ready;
use crate::keyset::fetch_keyset;
use crate::keyset::fetch_latest_keyset;
use crate::metrics::SNS_LATENCY_OP_HISTOGRAM;
use crate::metrics::TASK_EXECUTE_FAILURE_COUNTER;
use crate::metrics::TASK_EXECUTE_SUCCESS_COUNTER;
Expand All @@ -13,6 +13,7 @@ use crate::SchedulePolicy;
use crate::UploadJob;
use crate::{Config, ExecutionError};
use aws_sdk_s3::Client;
use fhevm_engine_common::db_keys::DbKeyId;
use fhevm_engine_common::healthz_server::{HealthCheckService, HealthStatus, Version};
use fhevm_engine_common::pg_pool::PostgresPoolManager;
use fhevm_engine_common::pg_pool::ServiceError;
Expand Down Expand Up @@ -142,7 +143,7 @@ impl SwitchNSquashService {
}

pub async fn run(&self, pool_mngr: &PostgresPoolManager) {
let keys_cache: Arc<RwLock<lru::LruCache<String, KeySet>>> = Arc::new(RwLock::new(
let keys_cache: Arc<RwLock<lru::LruCache<DbKeyId, KeySet>>> = Arc::new(RwLock::new(
lru::LruCache::new(NonZeroUsize::new(10).unwrap()),
));

Expand Down Expand Up @@ -174,19 +175,10 @@ impl SwitchNSquashService {

async fn get_keyset(
pool: PgPool,
keys_cache: Arc<RwLock<lru::LruCache<String, KeySet>>>,
tenant_api_key: &String,
) -> Result<Option<KeySet>, ExecutionError> {
keys_cache: Arc<RwLock<lru::LruCache<DbKeyId, KeySet>>>,
) -> Result<Option<(DbKeyId, KeySet)>, ExecutionError> {
let _t = telemetry::tracer("fetch_keyset", &None);
{
let mut cache = keys_cache.write().await;
if let Some(keys) = cache.get(tenant_api_key) {
info!(tenant_api_key = tenant_api_key, "Keyset found in cache");
return Ok(Some(keys.clone()));
}
}
let keys: Option<KeySet> = fetch_keyset(&keys_cache, &pool, tenant_api_key).await?;
Ok(keys)
fetch_latest_keyset(&keys_cache, &pool).await
}

/// Executes the worker logic for the SnS task.
Expand All @@ -196,20 +188,19 @@ pub(crate) async fn run_loop(
pool: PgPool,
token: CancellationToken,
last_active_at: Arc<RwLock<SystemTime>>,
keys_cache: Arc<RwLock<lru::LruCache<String, KeySet>>>,
keys_cache: Arc<RwLock<lru::LruCache<DbKeyId, KeySet>>>,
events_tx: InternalEvents,
) -> Result<(), ExecutionError> {
update_last_active(last_active_at.clone()).await;

let tenant_api_key = &conf.tenant_api_key;
let mut listener = PgListener::connect_with(&pool).await?;
info!("Connected to PostgresDB");

listener
.listen_all(conf.db.listen_channels.iter().map(|v| v.as_str()))
.await?;

let mut keys = None;
let mut keys: Option<(DbKeyId, KeySet)> = None;
let mut gc_ticker = interval(conf.db.cleanup_interval);
let mut gc_timestamp = SystemTime::now();
let mut polling_ticker = interval(Duration::from_secs(conf.db.polling_interval.into()));
Expand All @@ -218,27 +209,31 @@ pub(crate) async fn run_loop(
// Continue looping until the service is cancelled or a critical error occurs
update_last_active(last_active_at.clone()).await;

let Some(keys) = keys.as_ref() else {
keys = get_keyset(pool.clone(), keys_cache.clone(), tenant_api_key).await?;
if keys.is_some() {
info!(tenant_api_key = tenant_api_key, "Fetched keyset");
let latest_keys = get_keyset(pool.clone(), keys_cache.clone()).await?;
if let Some((key_id, keyset)) = latest_keys {
let key_changed = keys
.as_ref()
.map(|(current_key_id, _)| current_key_id != &key_id)
.unwrap_or(true);
if key_changed {
info!(key_id = hex::encode(&key_id), "Fetched keyset");
// Notify that the keys are loaded
if let Some(events_tx) = &events_tx {
let _ = events_tx.try_send("event_keys_loaded");
}
} else {
warn!(
tenant_api_key = tenant_api_key,
"No keys available, retrying in 5 seconds"
);
tokio::time::sleep(Duration::from_secs(5)).await;
}

keys = Some((key_id, keyset));
} else {
warn!("No keys available, retrying in 5 seconds");
tokio::time::sleep(Duration::from_secs(5)).await;
if token.is_cancelled() {
return Ok(());
}
continue;
};
}

// keys is guaranteed by the branch above; panic here if that invariant ever regresses.
let (_, keys) = keys.as_ref().expect("keyset should be available");

let (maybe_remaining, _tasks_processed) =
fetch_and_execute_sns_tasks(&pool, &tx, keys, &conf, &token)
Expand Down Expand Up @@ -316,20 +311,18 @@ pub async fn garbage_collect(pool: &PgPool, limit: u32) -> Result<(), ExecutionE
let rows_affected: u64 = sqlx::query!(
"
WITH uploaded_ct128 AS (
SELECT c.tenant_id, c.handle
SELECT c.handle
FROM ciphertexts128 c
JOIN ciphertext_digest d
ON d.tenant_id = c.tenant_id
AND d.handle = c.handle
ON d.handle = c.handle
WHERE d.ciphertext128 IS NOT NULL
FOR UPDATE OF c SKIP LOCKED
LIMIT $1
)

DELETE FROM ciphertexts128 c
USING uploaded_ct128 r
WHERE c.tenant_id = r.tenant_id
AND c.handle = r.handle;
WHERE c.handle = r.handle;
",
limit as i32
)
Expand Down Expand Up @@ -375,7 +368,7 @@ async fn fetch_and_execute_sns_tasks(

let mut maybe_remaining = false;
let tasks_processed;
if let Some(mut tasks) = query_sns_tasks(trx, conf.db.batch_limit, order).await? {
if let Some(mut tasks) = query_sns_tasks(trx, conf.db.batch_limit, order, &keys.key_id).await? {
maybe_remaining = conf.db.batch_limit as usize == tasks.len();
tasks_processed = tasks.len();

Expand Down Expand Up @@ -423,6 +416,7 @@ pub async fn query_sns_tasks(
db_txn: &mut Transaction<'_, Postgres>,
limit: u32,
order: Order,
key_id: &DbKeyId,
) -> Result<Option<Vec<HandleItem>>, ExecutionError> {
let start_time = SystemTime::now();

Expand Down Expand Up @@ -460,13 +454,16 @@ pub async fn query_sns_tasks(
let tasks = records
.into_iter()
.map(|record| {
let tenant_id: i32 = record.try_get("tenant_id")?;
let host_chain_id: i64 = record.try_get("host_chain_id")?;
let handle: Vec<u8> = record.try_get("handle")?;
let ciphertext: Vec<u8> = record.try_get("ciphertext")?;
let transaction_id: Option<Vec<u8>> = record.try_get("transaction_id")?;

Ok(HandleItem {
tenant_id,
// TODO: During key rotation, ensure all coprocessors pin the same key_id for a batch
// (e.g., via gateway coordination) to keep ciphertext_digest consistent.
key_id: key_id.clone(),
host_chain_id,
handle: handle.clone(),
ct64_compressed: Arc::new(ciphertext),
ct128: Arc::new(BigCiphertext::default()), // to be computed
Expand Down Expand Up @@ -644,12 +641,10 @@ async fn update_ciphertext128(
let res = sqlx::query!(
"
INSERT INTO ciphertexts128 (
tenant_id,
handle,
ciphertext
)
VALUES ($1, $2, $3)",
task.tenant_id,
VALUES ($1, $2)",
task.handle,
ciphertext128,
)
Expand Down
Loading
Loading