@@ -29,6 +29,11 @@ use tracing::{debug, error, info, warn};
2929// TODO: Use a config TOML to set these values
3030pub const EVENT_CIPHERTEXTS_UPLOADED : & str = "event_ciphertexts_uploaded" ;
3131
32+ // Default batch size for fetching pending uploads
33+ // There might be pending uploads in the database
34+ // with sizes of 32MiB so the batch size is set to 10
35+ const DEFAULT_BATCH_SIZE : usize = 10 ;
36+
3237/// Process the S3 uploads
3338pub ( crate ) async fn process_s3_uploads (
3439 conf : & Config ,
@@ -441,11 +446,17 @@ async fn do_resubmits_loop(
441446 is_ready : Arc < AtomicBool > ,
442447) -> Result < ( ) , ExecutionError > {
443448 // Retry to resubmit all upload tasks at the start-up
444- try_resubmit ( & pool, is_ready. clone ( ) , tasks. clone ( ) , token. clone ( ) )
445- . await
446- . unwrap_or_else ( |err| {
447- error ! ( "Failed to resubmit tasks: {}" , err) ;
448- } ) ;
449+ try_resubmit (
450+ & pool,
451+ is_ready. clone ( ) ,
452+ tasks. clone ( ) ,
453+ token. clone ( ) ,
454+ DEFAULT_BATCH_SIZE ,
455+ )
456+ . await
457+ . unwrap_or_else ( |err| {
458+ error ! ( "Failed to resubmit tasks: {}" , err) ;
459+ } ) ;
449460
450461 let retry_conf = & conf. s3 . retry_policy ;
451462
@@ -462,7 +473,7 @@ async fn do_resubmits_loop(
462473 if is_ready_res {
463474 info!( "Reconnected to S3, buckets exist" ) ;
464475 is_ready. store( true , Ordering :: Release ) ;
465- try_resubmit( & pool, is_ready. clone( ) , tasks. clone( ) , token. clone( ) ) . await
476+ try_resubmit( & pool, is_ready. clone( ) , tasks. clone( ) , token. clone( ) , DEFAULT_BATCH_SIZE ) . await
466477 . unwrap_or_else( |err| {
467478 error!( "Failed to resubmit tasks: {}" , err) ;
468479 } ) ;
@@ -472,7 +483,7 @@ async fn do_resubmits_loop(
472483
473484 // A regular resubmit to ensure there no left over tasks
474485 _ = tokio:: time:: sleep( retry_conf. regular_recheck_duration) => {
475- try_resubmit( & pool, is_ready. clone( ) , tasks. clone( ) , token. clone( ) ) . await
486+ try_resubmit( & pool, is_ready. clone( ) , tasks. clone( ) , token. clone( ) , DEFAULT_BATCH_SIZE ) . await
476487 . unwrap_or_else( |err| {
477488 error!( "Failed to resubmit tasks: {}" , err) ;
478489 } ) ;
@@ -481,41 +492,60 @@ async fn do_resubmits_loop(
481492 }
482493}
483494
495+ /// Attempts to resubmit all pending uploads from the database.
496+ ///
497+ /// If the S3 setup is not ready, it will skip resubmitting.
498+ ///
499+ /// This function will keep fetching pending uploads in batches until there are no more
484500async fn try_resubmit (
485501 pool : & PgPool ,
486502 is_ready : Arc < AtomicBool > ,
487503 tasks : mpsc:: Sender < UploadJob > ,
488504 token : CancellationToken ,
505+ batch_size : usize ,
489506) -> Result < ( ) , ExecutionError > {
490- if !is_ready. load ( Ordering :: SeqCst ) {
491- info ! ( "S3 setup is not ready, skipping resubmit" ) ;
492- return Ok ( ( ) ) ;
493- }
507+ loop {
508+ if !is_ready. load ( Ordering :: SeqCst ) {
509+ info ! ( "S3 setup is not ready, skipping resubmit" ) ;
510+ return Ok ( ( ) ) ;
511+ }
494512
495- match fetch_pending_uploads ( pool, 10 ) . await {
496- // TODO: const, token
497- Ok ( jobs) => {
498- info ! (
499- target: "worker" ,
500- action = "retry_s3_uploads" ,
501- "Fetched {} pending uploads from the database" ,
502- jobs. len( )
503- ) ;
504- // Resubmit for uploading ciphertexts
505- for task in jobs {
506- select ! {
507- _ = tasks. send( task. clone( ) ) => {
508- // TODO: debug!("Task sent to upload worker, handle: {}", hex::encode(&task.handle));
509- } ,
510- _ = token. cancelled( ) => {
511- return Ok ( ( ) )
513+ match fetch_pending_uploads ( pool, batch_size as i64 ) . await {
514+ Ok ( jobs) => {
515+ info ! (
516+ target: "worker" ,
517+ action = "retry_s3_uploads" ,
518+ "Fetched {} pending uploads from the database" ,
519+ jobs. len( )
520+ ) ;
521+ let jobs_count = jobs. len ( ) ;
522+ // Resubmit for uploading ciphertexts
523+ for task in jobs {
524+ select ! {
525+ _ = tasks. send( task. clone( ) ) => {
526+ info!( "resubmitted, handle: {}" , compact_hex( task. handle( ) ) ) ;
527+ } ,
528+ _ = token. cancelled( ) => {
529+ return Ok ( ( ) ) ;
530+ }
512531 }
513532 }
533+
534+ if jobs_count < batch_size {
535+ info ! (
536+ target: "worker" ,
537+ action = "retry_s3_uploads" ,
538+ "No (more) pending uploads to resubmit"
539+ ) ;
540+ return Ok ( ( ) ) ;
541+ }
542+ }
543+ Err ( err) => {
544+ error ! ( "Failed to fetch pending uploads: {}" , err) ;
545+ return Err ( err) ;
514546 }
515547 }
516- Err ( err) => error ! ( "Failed to fetch pending uploads: {}" , err) ,
517- } ;
518- Ok ( ( ) )
548+ }
519549}
520550
521551/// Configure and create the S3 client.
0 commit comments