@@ -23,6 +23,7 @@ use std::time::{Duration, SystemTime};
2323use tokio:: select;
2424use tokio:: sync:: { mpsc, Semaphore } ;
2525use tokio:: task:: JoinHandle ;
26+ use tokio:: time:: interval;
2627use tokio_util:: sync:: CancellationToken ;
2728use tracing:: { debug, error, info, warn} ;
2829
@@ -464,13 +465,17 @@ async fn do_resubmits_loop(
464465
465466 let retry_conf = & conf. s3 . retry_policy ;
466467
468+ let mut recheck_ticker = interval ( retry_conf. recheck_duration ) ;
469+ let mut resubmit_ticker = interval ( retry_conf. regular_recheck_duration ) ;
470+
471+
467472 loop {
468473 select ! {
469474 _ = token. cancelled( ) => {
470475 return Ok ( ( ) )
471476 } ,
472477 // Recheck S3 ready status
473- _ = tokio :: time :: sleep ( retry_conf . recheck_duration ) => {
478+ _ = recheck_ticker . tick ( ) => {
474479 if !is_ready. load( Ordering :: Acquire ) {
475480 info!( "Recheck S3 setup ..." ) ;
476481 let ( is_ready_res, _) = check_is_ready( & client, conf) . await ;
@@ -484,13 +489,13 @@ async fn do_resubmits_loop(
484489 }
485490 }
486491 }
487-
488- // A regular resubmit to ensure there no left over tasks
489- _ = tokio :: time :: sleep ( retry_conf . regular_recheck_duration ) => {
492+ // A regular resubmit to ensure there no remaining tasks
493+ _ = resubmit_ticker . tick ( ) => {
494+ info! ( "Retry resubmit ..." ) ;
490495 try_resubmit( & pool, is_ready. clone( ) , tasks. clone( ) , token. clone( ) , DEFAULT_BATCH_SIZE ) . await
491496 . unwrap_or_else( |err| {
492497 error!( "Failed to resubmit tasks: {}" , err) ;
493- } ) ;
498+ } ) ;
494499 }
495500 }
496501 }
0 commit comments