@@ -49,7 +49,7 @@ pub struct Replicator {
4949 /// Last frame which has been confirmed as stored locally outside of WAL file.
5050 /// Always: [last_committed_frame_no] <= [last_sent_frame_no].
5151 last_committed_frame_no : Receiver < Result < u32 > > ,
52- flush_trigger : Sender < ( ) > ,
52+ flush_trigger : Option < Sender < ( ) > > ,
5353 snapshot_waiter : Receiver < Result < Option < Uuid > > > ,
5454 snapshot_notifier : Arc < Sender < Result < Option < Uuid > > > > ,
5555
@@ -65,7 +65,7 @@ pub struct Replicator {
6565 use_compression : CompressionKind ,
6666 max_frames_per_batch : usize ,
6767 s3_upload_max_parallelism : usize ,
68- _join_set : JoinSet < ( ) > ,
68+ join_set : JoinSet < ( ) > ,
6969}
7070
7171#[ derive( Debug ) ]
@@ -262,7 +262,7 @@ impl Replicator {
262262 let next_frame_no = Arc :: new ( AtomicU32 :: new ( 1 ) ) ;
263263 let last_sent_frame_no = Arc :: new ( AtomicU32 :: new ( 0 ) ) ;
264264
265- let mut _join_set = JoinSet :: new ( ) ;
265+ let mut join_set = JoinSet :: new ( ) ;
266266
267267 let ( frames_outbox, mut frames_inbox) = tokio:: sync:: mpsc:: channel ( 64 ) ;
268268 let _local_backup = {
@@ -278,7 +278,7 @@ impl Replicator {
278278 let next_frame_no = next_frame_no. clone ( ) ;
279279 let last_sent_frame_no = last_sent_frame_no. clone ( ) ;
280280 let batch_interval = options. max_batch_interval ;
281- _join_set . spawn ( async move {
281+ join_set . spawn ( async move {
282282 loop {
283283 let timeout = Instant :: now ( ) + batch_interval;
284284 let trigger = match timeout_at ( timeout, flush_trigger_rx. changed ( ) ) . await {
@@ -313,7 +313,7 @@ impl Replicator {
313313 let client = client. clone ( ) ;
314314 let bucket = options. bucket_name . clone ( ) ;
315315 let max_parallelism = options. s3_upload_max_parallelism ;
316- _join_set . spawn ( async move {
316+ join_set . spawn ( async move {
317317 let sem = Arc :: new ( tokio:: sync:: Semaphore :: new ( max_parallelism) ) ;
318318 let mut join_set = JoinSet :: new ( ) ;
319319 while let Some ( fdesc) = frames_inbox. recv ( ) . await {
@@ -353,7 +353,7 @@ impl Replicator {
353353 generation,
354354 next_frame_no,
355355 last_sent_frame_no,
356- flush_trigger,
356+ flush_trigger : Some ( flush_trigger ) ,
357357 last_committed_frame_no,
358358 verify_crc : options. verify_crc ,
359359 db_path,
@@ -365,10 +365,22 @@ impl Replicator {
365365 use_compression : options. use_compression ,
366366 max_frames_per_batch : options. max_frames_per_batch ,
367367 s3_upload_max_parallelism : options. s3_upload_max_parallelism ,
368- _join_set ,
368+ join_set ,
369369 } )
370370 }
371371
372+ pub async fn shutdown_gracefully ( & mut self ) -> Result < ( ) > {
373+ let last_frame_no = self . last_known_frame ( ) ;
374+ // drop flush trigger, which will cause background task for local WAL copier to complete
375+ self . flush_trigger . take ( ) ;
376+ self . wait_until_committed ( last_frame_no) . await ?;
377+ self . wait_until_snapshotted ( ) . await ?;
378+ while let Some ( t) = self . join_set . join_next ( ) . await {
379+ t?;
380+ }
381+ Ok ( ( ) )
382+ }
383+
372384 pub fn next_frame_no ( & self ) -> u32 {
373385 self . next_frame_no . load ( Ordering :: Acquire )
374386 }
@@ -624,8 +636,12 @@ impl Replicator {
624636 }
625637
626638 pub fn request_flush ( & self ) {
627- tracing:: trace!( "Requesting flush" ) ;
628- let _ = self . flush_trigger . send ( ( ) ) ;
639+ if let Some ( tx) = self . flush_trigger . as_ref ( ) {
640+ tracing:: trace!( "Requesting flush" ) ;
641+ let _ = tx. send ( ( ) ) ;
642+ } else {
643+ tracing:: warn!( "Cannot request flush - replicator is closing" ) ;
644+ }
629645 }
630646
631647 // Drops uncommitted frames newer than given last valid frame
0 commit comments