@@ -323,7 +323,7 @@ def create_backup_site_paths(self, site: str) -> BackupSitePaths:
 
         return backup_site_paths
 
-    def delete_remote_wal_before(self, wal_segment, site, pg_version):
+    def delete_remote_wal_before(self, wal_segment, site, site_prefix, pg_version):
         self.log.info("Starting WAL deletion from: %r before: %r, pg_version: %r", site, wal_segment, pg_version)
         storage = self.site_transfers.get(site)
         valid_timeline = True
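The signature change threads the storage prefix through explicitly instead of always deriving it via `self._get_site_prefix(site)`, so WAL segments can also be deleted from an extra (retired) site prefix. A minimal sketch of the two call shapes; the instance name, segment, and prefix values are assumptions, only the signature comes from the diff:

```python
# Hypothetical values for illustration.
segment = "000000010000000000000002"
# Current site: pass the site's own prefix, preserving the old behaviour.
pghoard.delete_remote_wal_before(segment, "my-site", pghoard._get_site_prefix("my-site"), 14)
# Extra site: pass a retired prefix so its leftover WAL files get cleaned up too.
pghoard.delete_remote_wal_before(segment, "my-site", "sites/old-prefix", 14)
```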
@@ -334,7 +334,7 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version):
                 lsn = lsn.previous_walfile_start_lsn
                 if lsn is None:
                     break
-            wal_path = os.path.join(self._get_site_prefix(site), "xlog", lsn.walfile_name)
+            wal_path = os.path.join(site_prefix, "xlog", lsn.walfile_name)
             self.log.debug("Deleting wal_file: %r", wal_path)
             try:
                 storage.delete_key(wal_path)
@@ -360,8 +360,8 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version):
     def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_to_delete, backups_to_keep) -> List:
         delta_formats = (BaseBackupFormat.delta_v1, BaseBackupFormat.delta_v2)
         assert metadata["format"] in delta_formats
-        all_hexdigests = set()
-        keep_hexdigests = set()
+        all_hexdigests: Dict[str, str] = {}
+        keep_hexdigests: Dict[str, str] = {}
 
         basebackup_data_files = list()
         delta_backup_names = {
@@ -372,7 +372,7 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t
         delta_backup_names[basebackup_name_to_delete] = metadata
 
         for backup_name, backup_metadata in delta_backup_names.items():
-            delta_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", backup_name)
+            delta_backup_key = os.path.join(backup_metadata["site-prefix"], "basebackup", backup_name)
             meta, _ = download_backup_meta_file(
                 storage=storage,
                 basebackup_path=delta_backup_key,
@@ -385,18 +385,22 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t
             backup_state = snapshot_result["state"]
             files = backup_state["files"]
 
-            backup_hexdigests = set(delta_file["hexdigest"] for delta_file in files if delta_file["hexdigest"])
-            all_hexdigests |= backup_hexdigests
+            backup_hexdigests = {
+                delta_file["hexdigest"]: backup_metadata["site-prefix"]
+                for delta_file in files
+                if delta_file["hexdigest"]
+            }
+            all_hexdigests.update(backup_hexdigests)
 
             if backup_name != basebackup_name_to_delete:
                 # Keep data file in case if there is still a reference from other backups
-                keep_hexdigests |= backup_hexdigests
+                keep_hexdigests.update(backup_hexdigests)
             else:
                 # Add bundles to remove
                 for chunk in meta.get("chunks", []):
                     basebackup_data_files.append(
                         os.path.join(
-                            self._get_site_prefix(site),
+                            backup_metadata["site-prefix"],
                             FileTypePrefixes[FileType.Basebackup_delta_chunk],
                             chunk["chunk_filename"],
                         )
@@ -405,14 +409,14 @@ def _get_delta_basebackup_files(self, site, storage, metadata, basebackup_name_t
         # Remove unreferenced files
         extra_hexdigests = set(all_hexdigests).difference(keep_hexdigests)
         for hexdigest in extra_hexdigests:
-            basebackup_data_files.append(os.path.join(self._get_site_prefix(site), "basebackup_delta", hexdigest))
+            basebackup_data_files.append(os.path.join(all_hexdigests[hexdigest], "basebackup_delta", hexdigest))
 
         return basebackup_data_files
 
     def delete_remote_basebackup(self, site, basebackup, metadata, basebackups):
         start_time = time.monotonic()
         storage = self.site_transfers.get(site)
-        main_backup_key = os.path.join(self._get_site_prefix(site), "basebackup", basebackup)
+        main_backup_key = os.path.join(metadata["site-prefix"], "basebackup", basebackup)
         basebackup_data_files = [main_backup_key]
 
         if metadata.get("format") == BaseBackupFormat.v2:
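Why the hexdigest sets became dicts: `set()` over a dict iterates its keys, so the existing difference logic is unchanged, while the values remember which site prefix each delta file lives under so unreferenced files can be deleted from the right site. A self-contained sketch with made-up hexdigests and prefixes:

```python
import os

# Made-up hexdigests and prefixes, purely to illustrate the bookkeeping above.
all_hexdigests = {"ab12": "sites/old-prefix", "cd34": "sites/current-prefix"}
keep_hexdigests = {"cd34": "sites/current-prefix"}

# set() over a dict takes its keys, matching the old set-difference behaviour.
extra_hexdigests = set(all_hexdigests).difference(keep_hexdigests)  # {"ab12"}
for hexdigest in extra_hexdigests:
    # The value tells us which site the unreferenced delta file is deleted from.
    print(os.path.join(all_hexdigests[hexdigest], "basebackup_delta", hexdigest))
```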
@@ -427,7 +431,7 @@ def delete_remote_basebackup(self, site, basebackup, metadata, basebackups):
             for chunk in bmeta["chunks"]:
                 basebackup_data_files.append(
                     os.path.join(
-                        self._get_site_prefix(site),
+                        metadata["site-prefix"],
                         "basebackup_chunk",
                         chunk["chunk_filename"],
                     )
@@ -457,14 +461,15 @@ def get_or_create_site_storage(self, site):
         self.site_transfers[site] = storage
         return storage
 
-    def get_remote_basebackups_info(self, site):
+    def get_remote_basebackups_info(self, site, site_prefix=None):
         storage = self.get_or_create_site_storage(site=site)
         site_config = self.config["backup_sites"][site]
-        results = storage.list_path(os.path.join(site_config["prefix"], "basebackup"))
+        site_prefix = site_prefix or site_config["prefix"]
+        results = storage.list_path(os.path.join(site_prefix, "basebackup"))
         for entry in results:
             self.patch_basebackup_info(entry=entry, site_config=site_config)
 
-        preservation_requests = storage.list_path(os.path.join(site_config["prefix"], "preservation_request"))
+        preservation_requests = storage.list_path(os.path.join(site_prefix, "preservation_request"))
         backups_to_preserve = parse_preservation_requests(preservation_requests)
         for entry in results:
             patch_basebackup_metadata_with_preservation(entry, backups_to_preserve)
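With the optional `site_prefix`, the same listing code serves both the site's own configured prefix (the default, unchanged behaviour) and any extra prefix. A sketch of the two call shapes; the instance name and prefix value are assumptions:

```python
# Default: list basebackups under the site's configured prefix, as before.
backups = pghoard.get_remote_basebackups_info("my-site")
# New: list basebackups left behind under an older, retired prefix.
old_backups = pghoard.get_remote_basebackups_info("my-site", site_prefix="sites/old-prefix")
```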
@@ -517,7 +522,7 @@ def determine_backups_to_delete(self, *, basebackups, site_config):
         if max_age_days and min_backups > 0:
             while basebackups and len(basebackups) > min_backups:
                 if is_basebackup_preserved(basebackups[0], now):
-                    self.log.info("Not deleting more backups because %r still needs to preserved", basebackups[0]["name"])
+                    self.log.info("Not deleting more backups because %r still needs to be preserved", basebackups[0]["name"])
                     break
                 # For age checks we treat the age as current_time - (backup_start_time + backup_interval). So when
                 # backup interval is set to 24 hours a backup started 2.5 days ago would be considered to be 1.5 days old.
@@ -539,31 +544,64 @@ def determine_backups_to_delete(self, *, basebackups, site_config):
     def refresh_backup_list_and_delete_old(self, site):
         """Look up basebackups from the object store, prune any extra
-        backups and return the datetime of the latest backup."""
-        basebackups = self.get_remote_basebackups_info(site)
-        self.log.debug("Found %r basebackups", basebackups)
+        backups from the current and the extra backup sites and update
+        the state with the up-to-date backups list."""
+        current_basebackups = self.get_remote_basebackups_info(site)
+        current_site_prefix = self._get_site_prefix(site)
+        for basebackup in current_basebackups:
+            basebackup["metadata"]["site-prefix"] = current_site_prefix
+
+        # If `extra_backup_sites_prefixes` is set, let's also check those sites for backups that are due for cleanup.
+        extra_basebackups = []
+        if self.config.get("extra_backup_sites_prefixes"):
+            new_extra_backup_sites_prefixes = []
+            extra_backup_sites_prefixes = self.state["extra_backup_sites_prefixes"]
+
+            for site_prefix in extra_backup_sites_prefixes:
+                extra_site_basebackups = self.get_remote_basebackups_info(site, site_prefix=site_prefix)
+                if not extra_site_basebackups:
+                    continue
+                for basebackup in extra_site_basebackups:
+                    basebackup["metadata"]["site-prefix"] = site_prefix
+                extra_basebackups.extend(extra_site_basebackups)
+                # We found some basebackups in this site, so let's include it in the next round of checks as well.
+                new_extra_backup_sites_prefixes.append(site_prefix)
+
+            self.state["extra_backup_sites_prefixes"] = new_extra_backup_sites_prefixes
+
+        extra_basebackups.sort(key=lambda entry: entry["metadata"]["start-time"])
+        all_basebackups = extra_basebackups + current_basebackups
+
+        self.log.debug("Found %r basebackups", all_basebackups)
 
         site_config = self.config["backup_sites"][site]
         # Never delete backups from a recovery site. This check is already elsewhere as well
         # but still check explicitly here to ensure we certainly won't delete anything unexpectedly
         if site_config["active"]:
-            basebackups_to_delete = self.determine_backups_to_delete(basebackups=basebackups, site_config=site_config)
+            basebackups_to_delete = self.determine_backups_to_delete(basebackups=all_basebackups, site_config=site_config)
 
             for basebackup_to_be_deleted in basebackups_to_delete:
-                pg_version_str = basebackup_to_be_deleted["metadata"].get("pg-version")
-                pg_version = None if pg_version_str is None else int(pg_version_str)
-                last_wal_segment_still_needed = 0
-                if basebackups:
-                    last_wal_segment_still_needed = basebackups[0]["metadata"]["start-wal-segment"]
-
-                if last_wal_segment_still_needed:
-                    # This is breaking concurrent PITR starting from the *previous* backup.
-                    # That's why once a backup is preserved, we keep that backup and all the next ones.
-                    self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version)
-                self.delete_remote_basebackup(
-                    site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"], basebackups=basebackups
-                )
-        self.state["backup_sites"][site]["basebackups"] = basebackups
+                metadata = basebackup_to_be_deleted["metadata"]
+                pg_version = metadata.get("pg-version") and int(metadata.get("pg-version"))
+                # When we delete a basebackup, let's also delete any WAL segments before its `start-wal-segment`.
+                self.delete_remote_wal_before(metadata["start-wal-segment"], site, metadata["site-prefix"], pg_version)
+                self.delete_remote_basebackup(site, basebackup_to_be_deleted["name"], metadata, basebackups=all_basebackups)
+
+            # Delete WAL segments that are before `start-wal-segment` of the oldest still kept basebackup.
+            # This oldest kept basebackup could be from the current or from one of the extra sites when set.
+            oldest_wal_segment_to_keep = ""
+            if all_basebackups:
+                metadata = all_basebackups[0]["metadata"]
+                site_prefix = metadata["site-prefix"]
+                oldest_wal_segment_to_keep = metadata["start-wal-segment"]
+                pg_version = metadata.get("pg-version") and int(metadata.get("pg-version"))
+
+            if oldest_wal_segment_to_keep:
+                # This is breaking concurrent PITR starting from the *previous* backup.
+                # That's why once a backup is preserved, we keep that backup and all the next ones.
+                self.delete_remote_wal_before(oldest_wal_segment_to_keep, site, site_prefix, pg_version)
+
+        self.state["backup_sites"][site]["basebackups"] = current_basebackups
 
     def get_normalized_backup_time(self, site_config, *, now=None):
         """Returns the closest historical backup time that current time matches to (or current time if it matches).
@@ -589,6 +627,8 @@ def get_normalized_backup_time(self, site_config, *, now=None):
     def set_state_defaults(self, site):
         if site not in self.state["backup_sites"]:
             self.state["backup_sites"][site] = {"basebackups": []}
+        if "extra_backup_sites_prefixes" not in self.state:
+            self.state["extra_backup_sites_prefixes"] = self.config.get("extra_backup_sites_prefixes", [])
 
     def startup_walk_for_missed_files(self):
         """Check xlog and xlog_incoming directories for files that receivexlog has received but not yet