@@ -154,6 +154,19 @@ impl SwitchNSquashService {
154154 }
155155}
156156
157+ async fn get_keyset (
158+ pool : PgPool ,
159+ keys_cache : Arc < RwLock < lru:: LruCache < String , KeySet > > > ,
160+ tenant_api_key : & String ,
161+ ) -> Result < Option < KeySet > , ExecutionError > {
162+ let t = telemetry:: tracer ( "worker_loop_init" ) ;
163+ let s = t. child_span ( "fetch_keyset" ) ;
164+ let keys: Option < KeySet > = fetch_keyset ( & keys_cache, & pool, tenant_api_key) . await ?;
165+ telemetry:: end_span ( s) ;
166+ t. end ( ) ;
167+ Ok ( keys)
168+ }
169+
157170/// Executes the worker logic for the SnS task.
158171pub ( crate ) async fn run_loop (
159172 conf : Config ,
@@ -173,13 +186,19 @@ pub(crate) async fn run_loop(
173186 . listen_all ( conf. db . listen_channels . iter ( ) . map ( |v| v. as_str ( ) ) )
174187 . await ?;
175188
176- let t = telemetry:: tracer ( "worker_loop_init" ) ;
177- let s = t. child_span ( "fetch_keyset" ) ;
178- let keys: KeySet = fetch_keyset ( & keys_cache, & pool, tenant_api_key) . await ?;
179- telemetry:: end_span ( s) ;
180- t. end ( ) ;
181-
182- info ! ( tenant_api_key = tenant_api_key, "Fetched keyset" ) ;
189+ let mut keys = match get_keyset ( pool. clone ( ) , keys_cache. clone ( ) , tenant_api_key) . await ? {
190+ Some ( keys) => {
191+ info ! ( tenant_api_key = tenant_api_key, "Fetched keyset" ) ;
192+ Some ( keys)
193+ }
194+ None => {
195+ warn ! (
196+ tenant_api_key = tenant_api_key,
197+ "No keys found for the given tenant_api_key"
198+ ) ;
199+ None
200+ }
201+ } ;
183202
184203 let mut gc_ticker = interval ( conf. db . cleanup_interval ) ;
185204 let mut gc_timestamp = SystemTime :: now ( ) ;
@@ -189,7 +208,20 @@ pub(crate) async fn run_loop(
189208 // Continue looping until the service is cancelled or a critical error occurs
190209 update_last_active ( last_active_at. clone ( ) ) . await ;
191210
192- let maybe_remaining = fetch_and_execute_sns_tasks ( & pool, & tx, & keys, & conf, & token) . await ?;
211+ let Some ( keys) = keys. as_ref ( ) else {
212+ warn ! (
213+ tenant_api_key = tenant_api_key,
214+ "No keys available, retrying in 5 seconds"
215+ ) ;
216+ tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
217+ if token. is_cancelled ( ) {
218+ return Ok ( ( ) ) ;
219+ }
220+ keys = get_keyset ( pool. clone ( ) , keys_cache. clone ( ) , tenant_api_key) . await ?;
221+ continue ;
222+ } ;
223+
224+ let maybe_remaining = fetch_and_execute_sns_tasks ( & pool, & tx, keys, & conf, & token) . await ?;
193225 if maybe_remaining {
194226 if token. is_cancelled ( ) {
195227 return Ok ( ( ) ) ;
0 commit comments