From de025ab81fa85e3f13f2d398300e961e6654c27d Mon Sep 17 00:00:00 2001
From: Radovan Zvoncek
Date: Thu, 13 Jun 2024 17:02:13 +0300
Subject: [PATCH 1/2] Always purge all but one incomplete backup

---
 medusa/purge.py       | 21 +++++++++++++++++++--
 tests/purge_test.py   | 41 ++++++++++++++++++++++++++++++++++++++++-
 tests/storage_test.py |  7 ++++---
 3 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/medusa/purge.py b/medusa/purge.py
index ca15aca5a..36bad89f3 100644
--- a/medusa/purge.py
+++ b/medusa/purge.py
@@ -38,10 +38,16 @@ def main(config, max_backup_age=0, max_backup_count=0):
             logging.info('Listing backups for {}'.format(config.storage.fqdn))
             backup_index = storage.list_backup_index_blobs()
             backups = list(storage.list_node_backups(fqdn=config.storage.fqdn, backup_index_blobs=backup_index))
+
+            # split backups by completion
+            complete_backups, incomplete_to_purge = backups_to_purge_by_completion(backups)
+            backups_to_purge |= set(incomplete_to_purge)
+
             # list all backups to purge based on date conditions
-            backups_to_purge |= set(backups_to_purge_by_age(backups, max_backup_age))
+            backups_to_purge |= set(backups_to_purge_by_age(complete_backups, max_backup_age))
             # list all backups to purge based on count conditions
-            backups_to_purge |= set(backups_to_purge_by_count(backups, max_backup_count))
+            backups_to_purge |= set(backups_to_purge_by_count(complete_backups, max_backup_count))
+
             # purge all candidate backups
             object_counts = purge_backups(
                 storage, backups_to_purge, config.storage.backup_grace_period_in_days, config.storage.fqdn
@@ -62,6 +68,17 @@ def main(config, max_backup_age=0, max_backup_count=0):
         sys.exit(1)
 
 
+def backups_to_purge_by_completion(backups):
+    complete, incomplete = [], []
+    for backup in backups:
+        complete.append(backup) if backup.finished is not None else incomplete.append(backup)
+
+    # keep the most recent incomplete backup because it might still be in progress
+    incomplete_to_purge = set(incomplete[:-1]) if len(incomplete) > 1 else set()
+
+    return complete, incomplete_to_purge
+
+
 def backups_to_purge_by_age(backups, max_backup_age):
     if max_backup_age > 0:
         max_date = datetime.now() - timedelta(days=max_backup_age)
diff --git a/tests/purge_test.py b/tests/purge_test.py
index 545dac553..d88bfd26f 100644
--- a/tests/purge_test.py
+++ b/tests/purge_test.py
@@ -20,7 +20,9 @@
 from medusa.config import MedusaConfig, StorageConfig, _namedtuple_from_dict
 from medusa.storage import Storage
-from medusa.purge import backups_to_purge_by_age, backups_to_purge_by_count, backups_to_purge_by_name
+from medusa.purge import (
+    backups_to_purge_by_age, backups_to_purge_by_count, backups_to_purge_by_name, backups_to_purge_by_completion
+)
 from medusa.purge import filter_differential_backups, filter_files_within_gc_grace
 
 from tests.storage_test import make_node_backup, make_cluster_backup, make_blob
@@ -91,6 +93,43 @@ def test_purge_backups_by_count(self):
         obsolete_backups = backups_to_purge_by_count(backups, 40)
         assert len(obsolete_backups) == 0
 
+    def test_purge_backups_by_completion(self):
+        backups = list()
+
+        # Build a list of 40 backups, one every two days, making every second backup incomplete
+        complete = True
+        now = datetime.now()
+        for i in range(0, 80, 2):
+            file_time = now + timedelta(days=(i + 1) - 80)
+            backups.append(make_node_backup(self.storage, str(i), file_time, differential=True, complete=complete))
+            complete = not complete
+
+        self.assertEqual(40, len(backups))
+        complete_backup_names = {nb.name for nb in filter(lambda nb: nb.finished is not None, backups)}
+        self.assertEqual(len(complete_backup_names), 20, "The number of complete backups is not correct")
+
+        # the base case: splitting all 40 backups
+        complete, incomplete_to_purge = backups_to_purge_by_completion(backups)
+        self.assertEqual(20, len(complete))  # all 20 complete backups survive the split
+        self.assertEqual(19, len(incomplete_to_purge))  # 1 incomplete backup is kept because it might be in progress
+
+        # take all complete backups, but only half of the incomplete ones
+        test_backups = list()
+        for i in range(40):
+            # take each complete backup
+            if backups[i].finished is not None:
+                test_backups.append(backups[i])
+                continue
+            # but only the first half of the incomplete ones
+            if i > 20:
+                continue
+            test_backups.append(backups[i])
+        self.assertEqual(20, len(list(filter(lambda b: b.finished is not None, test_backups))))
+        self.assertEqual(10, len(list(filter(lambda b: b.finished is None, test_backups))))
+        complete, incomplete_to_purge = backups_to_purge_by_completion(test_backups)
+        self.assertEqual(20, len(complete))  # all 20 complete backups survive the split
+        self.assertEqual(9, len(incomplete_to_purge))  # 1 incomplete backup is kept because it might be in progress
+
     def test_filter_differential_backups(self):
         backups = list()
         backups.append(make_node_backup(self.storage, "one", datetime.now(), differential=True))
diff --git a/tests/storage_test.py b/tests/storage_test.py
index d130d3d55..f6467d3f2 100644
--- a/tests/storage_test.py
+++ b/tests/storage_test.py
@@ -455,7 +455,7 @@ def test_get_table_prefix(self):
         self.assertEqual('prefix/localhost/data/', self.storage._get_table_prefix('prefix', 'localhost'))
 
 
-def make_node_backup(storage, name, backup_date, differential=False, fqdn="localhost"):
+def make_node_backup(storage, name, backup_date, differential=False, fqdn="localhost", complete=True):
     if differential is True:
         differential_blob = make_blob("localhost/{}/meta/differential".format(name), backup_date.timestamp())
     else:
@@ -464,9 +464,10 @@ def make_node_backup(storage, name, backup_date, differential=False, fqdn="local
     schema_blob = make_blob("localhost/{}/meta/schema.cql".format(name), backup_date.timestamp())
     manifest_blob = make_blob("localhost/{}/meta/manifest.json".format(name), backup_date.timestamp())
     return NodeBackup(storage=storage, fqdn=fqdn, name=str(name),
-                      differential_blob=differential_blob, manifest_blob=manifest_blob,
+                      differential_blob=differential_blob, manifest_blob=manifest_blob if complete else None,
                       tokenmap_blob=tokenmap_blob, schema_blob=schema_blob,
-                      started_timestamp=backup_date.timestamp(), finished_timestamp=backup_date.timestamp())
+                      started_timestamp=backup_date.timestamp(),
+                      finished_timestamp=backup_date.timestamp() if complete else None)
 
 
 def make_unfinished_node_backup(storage, name, backup_date, differential=False, fqdn="localhost"):

From 82bca49bd58af0e49a8ca22925269aa4574081fb Mon Sep 17 00:00:00 2001
From: Radovan Zvoncek
Date: Fri, 7 Feb 2025 12:33:21 +0200
Subject: [PATCH 2/2] Purge complete node backups if they belong to incomplete cluster backups

---
 medusa/purge.py     | 34 ++++++++++++++++++++++++----------
 tests/purge_test.py | 37 +++++++++++++++++++++++++++++++++++--
 2 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/medusa/purge.py b/medusa/purge.py
index 36bad89f3..e48041510 100644
--- a/medusa/purge.py
+++ b/medusa/purge.py
@@ -28,29 +28,38 @@ def main(config, max_backup_age=0, max_backup_count=0):
-    backups_to_purge = set()
+    node_backups_to_purge = set()
     monitoring = Monitoring(config=config.monitoring)
     try:
         logging.info('Starting purge')
         with Storage(config=config.storage) as storage:
-            # Get all backups for the local node
             logging.info('Listing backups for {}'.format(config.storage.fqdn))
             backup_index = storage.list_backup_index_blobs()
-            backups = list(storage.list_node_backups(fqdn=config.storage.fqdn, backup_index_blobs=backup_index))
 
-            # split backups by completion
-            complete_backups, incomplete_to_purge = backups_to_purge_by_completion(backups)
-            backups_to_purge |= set(incomplete_to_purge)
+            # Get all cluster backups
+            cluster_backups = storage.list_cluster_backups(backup_index=backup_index)
+
+            # Get all backups for the local node
+            node_backups = list(storage.list_node_backups(fqdn=config.storage.fqdn, backup_index_blobs=backup_index))
+
+            # Split node backups by completion. All but the most recent incomplete one go straight to the purge set
+            complete_node_backups, incomplete_node_backups_to_purge = backups_to_purge_by_completion(node_backups)
+            node_backups_to_purge |= set(incomplete_node_backups_to_purge)
+
+            # Also add complete node backups that belong to an incomplete cluster backup
+            node_backups_to_purge |= set(
+                backups_to_purge_by_cluster_backup_completion(complete_node_backups, cluster_backups)
+            )
 
             # list all backups to purge based on date conditions
-            backups_to_purge |= set(backups_to_purge_by_age(complete_backups, max_backup_age))
+            node_backups_to_purge |= set(backups_to_purge_by_age(complete_node_backups, max_backup_age))
             # list all backups to purge based on count conditions
-            backups_to_purge |= set(backups_to_purge_by_count(complete_backups, max_backup_count))
+            node_backups_to_purge |= set(backups_to_purge_by_count(complete_node_backups, max_backup_count))
+
             # purge all candidate backups
             object_counts = purge_backups(
-                storage, backups_to_purge, config.storage.backup_grace_period_in_days, config.storage.fqdn
+                storage, node_backups_to_purge, config.storage.backup_grace_period_in_days, config.storage.fqdn
             )
             nb_objects_purged, total_purged_size, total_objects_within_grace = object_counts
@@ -58,7 +67,7 @@ def main(config, max_backup_age=0, max_backup_count=0):
             tags = ['medusa-node-backup', 'purge-error', 'PURGE-ERROR']
             monitoring.send(tags, 0)
 
-            return nb_objects_purged, total_purged_size, total_objects_within_grace, len(backups_to_purge)
+            return nb_objects_purged, total_purged_size, total_objects_within_grace, len(node_backups_to_purge)
 
     except Exception as e:
         traceback.print_exc()
@@ -79,6 +88,11 @@ def backups_to_purge_by_completion(backups):
     return complete, incomplete_to_purge
 
 
+def backups_to_purge_by_cluster_backup_completion(complete_node_backups, cluster_backups):
+    incomplete_cluster_backup_names = {cb.name for cb in cluster_backups if cb.finished is None}
+    return {cnb for cnb in complete_node_backups if cnb.name in incomplete_cluster_backup_names}
+
+
 def backups_to_purge_by_age(backups, max_backup_age):
     if max_backup_age > 0:
         max_date = datetime.now() - timedelta(days=max_backup_age)
diff --git a/tests/purge_test.py b/tests/purge_test.py
index d88bfd26f..22fcf4e0b 100644
--- a/tests/purge_test.py
+++ b/tests/purge_test.py
@@ -17,11 +17,13 @@
 import unittest
 
 from datetime import datetime, timedelta
+from unittest.mock import patch
 
 from medusa.config import MedusaConfig, StorageConfig, _namedtuple_from_dict
-from medusa.storage import Storage
+from medusa.storage import Storage, ClusterBackup
 from medusa.purge import (
-    backups_to_purge_by_age, backups_to_purge_by_count, backups_to_purge_by_name, backups_to_purge_by_completion
+    backups_to_purge_by_age, backups_to_purge_by_count, backups_to_purge_by_name, backups_to_purge_by_completion,
+    backups_to_purge_by_cluster_backup_completion
 )
 from medusa.purge import filter_differential_backups, filter_files_within_gc_grace
 
@@ -188,6 +190,37 @@ def test_purge_backups_by_name(self):
         # non-existent backup name raises KeyError
         self.assertRaises(KeyError, backups_to_purge_by_name, self.storage, cluster_backups, ["nonexistent"], False)
 
+    # Patch cluster_backup.missing_nodes because calling it would actually read a blob from storage.
+    # That is fine here because we are not testing missing nodes, only backup completion.
+    # Cluster backup completion is instead indicated by marking one of the node backups as incomplete.
+    @patch.object(ClusterBackup, 'missing_nodes', lambda _: {})
+    def test_purge_backups_by_cluster_backup_completion(self):
+        t = datetime.now()
+        complete_node_backups = [
+            make_node_backup(self.storage, "backup1", t, differential=True, complete=True, fqdn="node1"),
+            make_node_backup(self.storage, "backup2", t, differential=True, complete=True, fqdn="node1"),
+            make_node_backup(self.storage, "backup3", t, differential=True, complete=True, fqdn="node1"),
+        ]
+        cluster_backups = [
+            ClusterBackup("backup1", [
+                make_node_backup(self.storage, "backup1", t, differential=True, complete=True, fqdn="node1"),
+                make_node_backup(self.storage, "backup1", t, differential=True, complete=True, fqdn="node2"),
+            ]),
+            ClusterBackup("backup2", [
+                make_node_backup(self.storage, "backup2", t, differential=True, complete=True, fqdn="node1"),
+                # backup2 has an unfinished node backup, which makes it purge-able by cluster backup completion
+                make_node_backup(self.storage, "backup2", t, differential=True, complete=False, fqdn="node2"),
+            ]),
+        ]
+        # verify that backup2 is not purge-able when looking at node backups alone
+        complete, incomplete_to_purge = backups_to_purge_by_completion(complete_node_backups)
+        self.assertEqual(3, len(complete))
+        self.assertEqual(0, len(incomplete_to_purge))
+        # verify that backup2 becomes eligible for purge when cross-checked against cluster backups
+        backups_to_purge = backups_to_purge_by_cluster_backup_completion(complete_node_backups, cluster_backups)
+        self.assertEqual(1, len(backups_to_purge))
+        self.assertEqual("backup2", backups_to_purge.pop().name)
+
 
 if __name__ == '__main__':
     unittest.main()
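
Reviewer note: the sketch below is a minimal, self-contained reproduction of the selection logic the two patches introduce, runnable without Medusa. SimpleBackup and SimpleClusterBackup are hypothetical stand-ins, not Medusa's real NodeBackup and ClusterBackup classes; only the name and finished attributes the purge helpers rely on are modelled.

# Standalone sketch of the combined purge selection (assumed stand-ins, not Medusa classes).
from collections import namedtuple

SimpleBackup = namedtuple('SimpleBackup', ['name', 'finished'])
SimpleClusterBackup = namedtuple('SimpleClusterBackup', ['name', 'finished'])


def backups_to_purge_by_completion(backups):
    # Shape of the PATCH 1 helper: split by completion and keep the newest
    # incomplete backup, since it may still be running.
    complete = [b for b in backups if b.finished is not None]
    incomplete = [b for b in backups if b.finished is None]
    return complete, set(incomplete[:-1])


def backups_to_purge_by_cluster_backup_completion(complete_node_backups, cluster_backups):
    # Shape of the PATCH 2 helper: complete node backups whose cluster backup
    # never finished become purge candidates too.
    incomplete_names = {cb.name for cb in cluster_backups if cb.finished is None}
    return {nb for nb in complete_node_backups if nb.name in incomplete_names}


node_backups = [
    SimpleBackup('b1', finished=100),   # complete, cluster backup finished -> kept
    SimpleBackup('b2', finished=200),   # complete, cluster backup unfinished -> purged
    SimpleBackup('b3', finished=None),  # incomplete, older -> purged
    SimpleBackup('b4', finished=None),  # incomplete, most recent -> kept (may be running)
]
cluster_backups = [
    SimpleClusterBackup('b1', finished=100),
    SimpleClusterBackup('b2', finished=None),
]

complete, to_purge = backups_to_purge_by_completion(node_backups)
to_purge |= backups_to_purge_by_cluster_backup_completion(complete, cluster_backups)
print(sorted(b.name for b in to_purge))  # prints ['b2', 'b3']

As in the real helpers, this assumes the node backup list arrives ordered oldest to newest, which is why incomplete[:-1] is what preserves the most recent, possibly still running, incomplete backup.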