Skip to content

Commit 457454c

Browse files
committed
feat(coprocessor): sns-worker, accept missing keys
1 parent 0091e8b commit 457454c

2 files changed

Lines changed: 44 additions & 14 deletions

File tree

coprocessor/fhevm-engine/sns-worker/src/executor.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,18 @@ impl SwitchNSquashService {
154154
}
155155
}
156156

157+
async fn get_keyset(pool: PgPool,
158+
keys_cache: Arc<RwLock<lru::LruCache<String, KeySet>>>,
159+
tenant_api_key: &String
160+
) -> Result<Option<KeySet>, ExecutionError> {
161+
let t = telemetry::tracer("worker_loop_init");
162+
let s = t.child_span("fetch_keyset");
163+
let keys: Option<KeySet> = fetch_keyset(&keys_cache, &pool, tenant_api_key).await?;
164+
telemetry::end_span(s);
165+
t.end();
166+
Ok(keys)
167+
}
168+
157169
/// Executes the worker logic for the SnS task.
158170
pub(crate) async fn run_loop(
159171
conf: Config,
@@ -173,13 +185,16 @@ pub(crate) async fn run_loop(
173185
.listen_all(conf.db.listen_channels.iter().map(|v| v.as_str()))
174186
.await?;
175187

176-
let t = telemetry::tracer("worker_loop_init");
177-
let s = t.child_span("fetch_keyset");
178-
let keys: KeySet = fetch_keyset(&keys_cache, &pool, tenant_api_key).await?;
179-
telemetry::end_span(s);
180-
t.end();
181-
182-
info!(tenant_api_key = tenant_api_key, "Fetched keyset");
188+
let mut keys = match get_keyset(pool.clone(), keys_cache.clone(), tenant_api_key).await? {
189+
Some(keys) => {
190+
info!(tenant_api_key = tenant_api_key, "Fetched keyset");
191+
Some(keys)
192+
}
193+
None => {
194+
warn!(tenant_api_key = tenant_api_key, "No keys found for the given tenant_api_key");
195+
None
196+
}
197+
};
183198

184199
let mut gc_ticker = interval(conf.db.cleanup_interval);
185200
let mut gc_timestamp = SystemTime::now();
@@ -189,6 +204,16 @@ pub(crate) async fn run_loop(
189204
// Continue looping until the service is cancelled or a critical error occurs
190205
update_last_active(last_active_at.clone()).await;
191206

207+
let Some(keys) = keys.as_ref() else {
208+
warn!(tenant_api_key = tenant_api_key, "No keys available, retrying in 5 seconds");
209+
tokio::time::sleep(Duration::from_secs(5)).await;
210+
if token.is_cancelled() {
211+
return Ok(());
212+
}
213+
keys = get_keyset(pool.clone(), keys_cache.clone(), tenant_api_key).await?;
214+
continue;
215+
};
216+
192217
let maybe_remaining = fetch_and_execute_sns_tasks(&pool, &tx, &keys, &conf, &token).await?;
193218
if maybe_remaining {
194219
if token.is_cancelled() {

coprocessor/fhevm-engine/sns-worker/src/keyset.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,24 @@ pub(crate) async fn fetch_keyset(
1616
cache: &Arc<RwLock<lru::LruCache<String, KeySet>>>,
1717
pool: &PgPool,
1818
tenant_api_key: &String,
19-
) -> Result<KeySet, ExecutionError> {
19+
) -> Result<Option<KeySet>, ExecutionError> {
2020
let mut cache = cache.write().await;
2121
if let Some(keys) = cache.get(tenant_api_key) {
2222
info!(tenant_api_key, "Cache hit");
23-
return Ok(keys.clone());
23+
return Ok(Some(keys.clone()));
2424
}
2525

2626
info!(tenant_api_key, "Cache miss");
27-
let (client_key, server_key) = fetch_keys(pool, tenant_api_key).await?;
27+
let Some((client_key, server_key)) = fetch_keys(pool, tenant_api_key).await? else {
28+
return Ok(None);
29+
};
2830
let key_set: KeySet = KeySet {
2931
client_key,
3032
server_key,
3133
};
3234

3335
cache.push(tenant_api_key.clone(), key_set.clone());
34-
Ok(key_set)
36+
Ok(Some(key_set))
3537
}
3638

3739
/// Retrieve both the ClientKey and ServerKey from the tenants table
@@ -44,7 +46,7 @@ pub(crate) async fn fetch_keyset(
4446
pub async fn fetch_keys(
4547
pool: &PgPool,
4648
tenant_api_key: &String,
47-
) -> anyhow::Result<(Option<tfhe::ClientKey>, tfhe::ServerKey)> {
49+
) -> anyhow::Result<Option<(Option<tfhe::ClientKey>, tfhe::ServerKey)>> {
4850
let blob = read_keys_from_large_object(
4951
pool,
5052
tenant_api_key,
@@ -53,6 +55,9 @@ pub async fn fetch_keys(
5355
)
5456
.await?;
5557
info!(bytes_len = blob.len(), "Retrieved sns_pk");
58+
if blob.is_empty() {
59+
return Ok(None);
60+
}
5661

5762
let server_key: tfhe::ServerKey = safe_deserialize_sns_key(&blob)?;
5863

@@ -70,9 +75,9 @@ pub async fn fetch_keys(
7075
if !cks.is_empty() {
7176
info!(bytes_len = cks.len(), "Retrieved cks");
7277
let client_key: tfhe::ClientKey = safe_deserialize_sns_key(&cks)?;
73-
return Ok((Some(client_key), server_key));
78+
return Ok(Some((Some(client_key), server_key)));
7479
}
7580
}
7681

77-
Ok((None, server_key))
82+
Ok(Some((None, server_key)))
7883
}

0 commit comments

Comments
 (0)