medusa/purge.py (45 changes: 38 additions, 7 deletions)
@@ -28,31 +28,46 @@


def main(config, max_backup_age=0, max_backup_count=0):
backups_to_purge = set()
node_backups_to_purge = set()
monitoring = Monitoring(config=config.monitoring)

try:
logging.info('Starting purge')
with Storage(config=config.storage) as storage:
# Get all backups for the local node
logging.info('Listing backups for {}'.format(config.storage.fqdn))
backup_index = storage.list_backup_index_blobs()
backups = list(storage.list_node_backups(fqdn=config.storage.fqdn, backup_index_blobs=backup_index))

# Get all cluster backups
cluster_backups = storage.list_cluster_backups(backup_index=backup_index)

# Get all backups for the local node
node_backups = list(storage.list_node_backups(fqdn=config.storage.fqdn, backup_index_blobs=backup_index))

# Split node backups by completion. Incomplete ones go straight to the to-be-purged set
complete_node_backups, incomplete_node_backups_to_purge = backups_to_purge_by_completion(node_backups)
node_backups_to_purge |= set(incomplete_node_backups_to_purge)

# Also add complete node backups that belong to an incomplete cluster backup
node_backups_to_purge |= set(
backups_to_purge_by_cluster_backup_completion(complete_node_backups, cluster_backups)
)

# list all backups to purge based on date conditions
backups_to_purge |= set(backups_to_purge_by_age(backups, max_backup_age))
node_backups_to_purge |= set(backups_to_purge_by_age(complete_node_backups, max_backup_age))
# list all backups to purge based on count conditions
backups_to_purge |= set(backups_to_purge_by_count(backups, max_backup_count))
node_backups_to_purge |= set(backups_to_purge_by_count(complete_node_backups, max_backup_count))

# purge all candidate backups
object_counts = purge_backups(
storage, backups_to_purge, config.storage.backup_grace_period_in_days, config.storage.fqdn
storage, node_backups_to_purge, config.storage.backup_grace_period_in_days, config.storage.fqdn
)
nb_objects_purged, total_purged_size, total_objects_within_grace = object_counts

logging.debug('Emitting metrics')
tags = ['medusa-node-backup', 'purge-error', 'PURGE-ERROR']
monitoring.send(tags, 0)

return nb_objects_purged, total_purged_size, total_objects_within_grace, len(backups_to_purge)
return nb_objects_purged, total_purged_size, total_objects_within_grace, len(node_backups_to_purge)

except Exception as e:
traceback.print_exc()
@@ -62,6 +77,22 @@ def main(config, max_backup_age=0, max_backup_count=0):
sys.exit(1)


def backups_to_purge_by_completion(backups):
complete, incomplete = [], []
for backup in backups:
complete.append(backup) if backup.finished is not None else incomplete.append(backup)
Review comment (Contributor):
Issue: I don't think this works the expected way. Here, the completeness of the backup is checked for the local node only (we're dealing with a NodeBackup, not a ClusterBackup).
Hence, any incomplete ClusterBackup would still be considered a complete backup for the purge process.
Remember that the purge is done locally for a node; it's not a global operation and has to run on each node.


# keep the most recent one because it might be in progress
incomplete_to_purge = set(incomplete[:-1]) if len(incomplete) > 1 else set()

return complete, incomplete_to_purge


def backups_to_purge_by_cluster_backup_completion(complete_node_backups, cluster_backups):
incomplete_cluster_backups_names = {cb.name for cb in cluster_backups if cb.finished is None}
return {cnb for cnb in complete_node_backups if cnb.name in incomplete_cluster_backups_names}


def backups_to_purge_by_age(backups, max_backup_age):
if max_backup_age > 0:
max_date = datetime.now() - timedelta(days=max_backup_age)
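The reviewer's point above is the crux of this change: a NodeBackup can be finished on the local node while the ClusterBackup it belongs to never completed on some other node, so completeness has to be checked at both levels. Below is a minimal, self-contained sketch of that two-level check; the namedtuples and function names are illustrative stand-ins, not Medusa's real NodeBackup/ClusterBackup classes or APIs.

from collections import namedtuple

# Toy stand-ins: only the fields the purge logic relies on are modelled.
FakeNodeBackup = namedtuple('FakeNodeBackup', ['name', 'finished'])
FakeClusterBackup = namedtuple('FakeClusterBackup', ['name', 'finished'])


def split_node_backups_by_completion(node_backups):
    # Node-level view: split this node's backups by whether they finished.
    complete = [b for b in node_backups if b.finished is not None]
    incomplete = [b for b in node_backups if b.finished is None]
    # Keep the most recent incomplete one, it might still be in progress.
    return complete, set(incomplete[:-1]) if len(incomplete) > 1 else set()


def purgeable_by_cluster_completion(complete_node_backups, cluster_backups):
    # Cluster-level view: a locally complete backup is still purge-able
    # if the cluster backup of the same name never finished anywhere.
    unfinished = {cb.name for cb in cluster_backups if cb.finished is None}
    return {nb for nb in complete_node_backups if nb.name in unfinished}


# "backup2" finished on this node, but the cluster-wide backup did not.
node_backups = [FakeNodeBackup('backup1', 1.0), FakeNodeBackup('backup2', 1.0)]
cluster_backups = [FakeClusterBackup('backup1', 1.0), FakeClusterBackup('backup2', None)]

complete, _ = split_node_backups_by_completion(node_backups)
print(purgeable_by_cluster_completion(complete, cluster_backups))
# {FakeNodeBackup(name='backup2', finished=1.0)}

This mirrors how main() now feeds only complete node backups into the cluster-level check before applying the age and count filters.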
tests/purge_test.py (76 changes: 74 additions, 2 deletions)
@@ -17,10 +17,14 @@
import unittest

from datetime import datetime, timedelta
from unittest.mock import patch

from medusa.config import MedusaConfig, StorageConfig, _namedtuple_from_dict
from medusa.storage import Storage
from medusa.purge import backups_to_purge_by_age, backups_to_purge_by_count, backups_to_purge_by_name
from medusa.storage import Storage, ClusterBackup
from medusa.purge import (
backups_to_purge_by_age, backups_to_purge_by_count, backups_to_purge_by_name, backups_to_purge_by_completion,
backups_to_purge_by_cluster_backup_completion
)
from medusa.purge import filter_differential_backups, filter_files_within_gc_grace

from tests.storage_test import make_node_backup, make_cluster_backup, make_blob
@@ -91,6 +95,43 @@ def test_purge_backups_by_count(self):
obsolete_backups = backups_to_purge_by_count(backups, 40)
assert len(obsolete_backups) == 0

def test_purge_backups_by_completion(self):
backups = list()

# Build a list of 40 bi-daily backups, making every second backup incomplete
complete = True
now = datetime.now()
for i in range(0, 80, 2):
file_time = now + timedelta(days=(i + 1) - 80)
backups.append(make_node_backup(self.storage, str(i), file_time, differential=True, complete=complete))
complete = not complete

self.assertEqual(40, len(backups))
complete_backup_names = {nb.name for nb in filter(lambda nb: nb.finished is not None, backups)}
self.assertEqual(len(complete_backup_names), 20, "The amount of complete backups is not correct")

# the base with all 40 backups
complete, incomplete_to_purge = backups_to_purge_by_completion(backups)
self.assertEqual(20, len(complete)) # 1 is kept because it might be in progress
self.assertEqual(19, len(incomplete_to_purge)) # 1 is kept because it might be in progress

# take all complete backups, but only half of the incomplete ones
test_backups = list()
for i in range(0, 40, 1):
# take each complete backup
if backups[i].finished is not None:
test_backups.append(backups[i])
continue
# but only first half of the incomplete ones
if i > 20:
continue
test_backups.append(backups[i])
self.assertEqual(20, len(list(filter(lambda b: b.finished is not None, test_backups))))
self.assertEqual(10, len(list(filter(lambda b: b.finished is None, test_backups))))
complete, incomplete_to_purge = backups_to_purge_by_completion(test_backups)
self.assertEqual(20, len(complete)) # 1 is kept because it might be in progress
self.assertEqual(9, len(incomplete_to_purge)) # 1 is kept because it might be in progress

def test_filter_differential_backups(self):
backups = list()
backups.append(make_node_backup(self.storage, "one", datetime.now(), differential=True))
@@ -149,6 +190,37 @@ def test_purge_backups_by_name(self):
# non-existent backup name raises KeyError
self.assertRaises(KeyError, backups_to_purge_by_name, self.storage, cluster_backups, ["nonexistent"], False)

# patch a call of cluster_backup.missing_nodes because it'd actually go and read a blob off disk
# it's ok to do this because we are not testing missing nodes; we only test backup completion
# we will indicate cluster backup completion via marking one of its node backups as incomplete
@patch.object(ClusterBackup, 'missing_nodes', lambda _: {})
def test_purge_backups_by_cluster_backup_completion(self):
t = datetime.now()
complete_node_backups = [
make_node_backup(self.storage, "backup1", t, differential=True, complete=True, fqdn="node1"),
make_node_backup(self.storage, "backup2", t, differential=True, complete=True, fqdn="node1"),
make_node_backup(self.storage, "backup3", t, differential=True, complete=True, fqdn="node1"),
]
cluster_backups = [
ClusterBackup("backup1", [
make_node_backup(self.storage, "backup1", t, differential=True, complete=True, fqdn="node1"),
make_node_backup(self.storage, "backup1", t, differential=True, complete=True, fqdn="node2"),
]),
ClusterBackup("backup2", [
make_node_backup(self.storage, "backup2", t, differential=True, complete=True, fqdn="node1"),
# backup 2 has an unfinished node, this will render it purge-able by cluster backup completion
make_node_backup(self.storage, "backup2", t, differential=True, complete=False, fqdn="node2"),
]),
]
# verify the backup2 is not purge-able by looking at node backups alone
complete, incomplete_to_purge = backups_to_purge_by_completion(complete_node_backups)
self.assertEqual(3, len(complete))
self.assertEqual(0, len(incomplete_to_purge))
# verify that backup2 becomes eligible for purge if cross-checked with cluster backups
backups_to_purge = backups_to_purge_by_cluster_backup_completion(complete_node_backups, cluster_backups)
self.assertEqual(1, len(backups_to_purge))
self.assertEqual("backup2", backups_to_purge.pop().name)


if __name__ == '__main__':
unittest.main()
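The test above patches ClusterBackup.missing_nodes so that checking cluster completeness never reads blobs from storage. A minimal sketch of that patch.object pattern, using a hypothetical FakeClusterBackup class rather than Medusa's real ClusterBackup:

import unittest
from unittest.mock import patch


class FakeClusterBackup:
    # Stand-in for a class whose method would hit remote storage.
    def missing_nodes(self):
        raise RuntimeError("would read blobs from remote storage")


class PatchObjectExample(unittest.TestCase):
    # Replace the method on the class for the duration of this test only.
    # The lambda receives the instance and returns an empty collection,
    # so no storage I/O ever happens.
    @patch.object(FakeClusterBackup, 'missing_nodes', lambda _: {})
    def test_missing_nodes_is_stubbed(self):
        self.assertEqual({}, FakeClusterBackup().missing_nodes())


if __name__ == '__main__':
    unittest.main()

Because the replacement object is passed explicitly to patch.object, it is not injected as an extra argument into the test method, which is why the decorated test keeps its plain (self) signature.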
tests/storage_test.py (7 changes: 4 additions, 3 deletions)
@@ -455,7 +455,7 @@ def test_get_table_prefix(self):
self.assertEqual('prefix/localhost/data/', self.storage._get_table_prefix('prefix', 'localhost'))


def make_node_backup(storage, name, backup_date, differential=False, fqdn="localhost"):
def make_node_backup(storage, name, backup_date, differential=False, fqdn="localhost", complete=True):
if differential is True:
differential_blob = make_blob("localhost/{}/meta/differential".format(name), backup_date.timestamp())
else:
@@ -464,9 +464,10 @@ def make_node_backup(storage, name, backup_date, differential=False, fqdn="local
schema_blob = make_blob("localhost/{}/meta/schema.cql".format(name), backup_date.timestamp())
manifest_blob = make_blob("localhost/{}/meta/manifest.json".format(name), backup_date.timestamp())
return NodeBackup(storage=storage, fqdn=fqdn, name=str(name),
differential_blob=differential_blob, manifest_blob=manifest_blob,
differential_blob=differential_blob, manifest_blob=manifest_blob if complete else None,
tokenmap_blob=tokenmap_blob, schema_blob=schema_blob,
started_timestamp=backup_date.timestamp(), finished_timestamp=backup_date.timestamp())
started_timestamp=backup_date.timestamp(),
finished_timestamp=backup_date.timestamp() if complete else None)


def make_unfinished_node_backup(storage, name, backup_date, differential=False, fqdn="localhost"):
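The new complete flag models an unfinished backup as one with no manifest blob and no finished timestamp, which is exactly the condition the purge code checks (finished is None). A small illustrative sketch of that idea with a toy factory; FakeNodeBackup and make_fake_node_backup are hypothetical stand-ins, not the real test helper:

from datetime import datetime
from typing import NamedTuple, Optional


class FakeNodeBackup(NamedTuple):
    # Stand-in for NodeBackup; only the fields relevant to completeness.
    name: str
    fqdn: str
    manifest: Optional[str]
    finished: Optional[float]


def make_fake_node_backup(name, backup_date, fqdn="localhost", complete=True):
    ts = backup_date.timestamp()
    # An incomplete backup simply lacks a manifest and a finished timestamp.
    return FakeNodeBackup(name=name, fqdn=fqdn,
                          manifest="manifest.json" if complete else None,
                          finished=ts if complete else None)


backup = make_fake_node_backup("backup2", datetime.now(), complete=False)
print(backup.finished is None)  # True -> treated as incomplete by the purge logic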