Implement storage- and relationship-aware cleanup (#973)
Showing 23 changed files with 1,945 additions and 690 deletions.
@@ -0,0 +1,56 @@
import logging

from django.db.models.query import QuerySet

from services.cleanup.models import MANUAL_CLEANUP
from services.cleanup.relations import build_relation_graph
from services.cleanup.utils import CleanupResult, CleanupSummary, cleanup_context

log = logging.getLogger(__name__)


def run_cleanup(
    query: QuerySet,
) -> CleanupSummary:
    """
    Cleans up all the models and storage files reachable from the given `QuerySet`.
    This deletes all database models in topological sort order, and also removes
    all the files in storage for any of the models in the relationship graph.
    Returns the number of models and files being cleaned up, in total and per-Model.
    """
    models_to_cleanup = build_relation_graph(query)

    summary = {}
    cleaned_models = 0
    cleaned_files = 0

    with cleanup_context() as context:
        for model, query in models_to_cleanup:
            # This is needed so that the correct connection is chosen for the
            # `_raw_delete` queries, as otherwise it might choose a readonly connection.
            query._for_write = True

            manual_cleanup = MANUAL_CLEANUP.get(model)
            if manual_cleanup is not None:
                result = manual_cleanup(context, query)
            else:
                result = CleanupResult(query._raw_delete(query.db))

            if result.cleaned_models > 0 or result.cleaned_files > 0:
                summary[model] = result

                log.info(
                    f"Finished cleaning up {model.__name__}",
                    extra={
                        "cleaned_models": result.cleaned_models,
                        "cleaned_files": result.cleaned_files,
                    },
                )

            cleaned_models += result.cleaned_models
            cleaned_files += result.cleaned_files

    totals = CleanupResult(cleaned_models, cleaned_files)
    return CleanupSummary(totals, summary)
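
Usage sketch (not part of this commit): `run_cleanup` is driven by an arbitrary queryset, so deleting everything reachable from a single repository could look like the following. The `Repository` model and the module path match imports shown elsewhere in this diff; the `repoid` value is made up for illustration.

from shared.django_apps.core.models import Repository

from services.cleanup.cleanup import run_cleanup

# Delete one repository plus everything reachable from it; the relation
# graph determines the deletion order. `repoid=1234` is made up.
summary = run_cleanup(Repository.objects.filter(repoid=1234))
print(summary)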
@@ -0,0 +1,193 @@
import dataclasses
from collections import defaultdict
from collections.abc import Callable
from functools import partial

from django.db.models import Model
from django.db.models.query import QuerySet
from shared.bundle_analysis import StoragePaths
from shared.django_apps.compare.models import CommitComparison
from shared.django_apps.core.models import Commit, Pull
from shared.django_apps.profiling.models import ProfilingUpload
from shared.django_apps.reports.models import CommitReport, ReportDetails
from shared.django_apps.reports.models import ReportSession as Upload
from shared.django_apps.staticanalysis.models import StaticAnalysisSingleFileSnapshot

from services.archive import ArchiveService, MinioEndpoints
from services.cleanup.utils import CleanupContext, CleanupResult

MANUAL_QUERY_CHUNKSIZE = 5_000
DELETE_FILES_BATCHSIZE = 50


def cleanup_files_batched(
    context: CleanupContext, buckets_paths: dict[str, list[str]]
) -> int:
    def delete_file(bucket_path: tuple[str, str]) -> bool:
        return context.storage.delete_file(bucket_path[0], bucket_path[1])

    iter = ((bucket, path) for bucket, paths in buckets_paths.items() for path in paths)
    results = context.threadpool.map(delete_file, iter)
    return sum(results)


def cleanup_with_storage_field(
    path_field: str,
    context: CleanupContext,
    query: QuerySet,
) -> CleanupResult:
    cleaned_files = 0

    # delete models with a `None` `path_field` right away
    cleaned_models = query.filter(**{f"{path_field}__isnull": True})._raw_delete(
        query.db
    )

    # delete the remaining files from storage, in chunks based on the `id` column
    storage_query = query.filter(**{f"{path_field}__isnull": False}).order_by("id")

    while True:
        storage_paths = storage_query.values_list(path_field, flat=True)[
            :MANUAL_QUERY_CHUNKSIZE
        ]
        if len(storage_paths) == 0:
            break

        cleaned_files += cleanup_files_batched(
            context, {context.default_bucket: storage_paths}
        )
        cleaned_models += query.filter(
            id__in=storage_query[:MANUAL_QUERY_CHUNKSIZE]
        )._raw_delete(query.db)

    return CleanupResult(cleaned_models, cleaned_files)


def cleanup_archivefield(
    field_name: str, context: CleanupContext, query: QuerySet
) -> CleanupResult:
    model_field_name = f"_{field_name}_storage_path"

    return cleanup_with_storage_field(model_field_name, context, query)


# This has all the `Repository` fields needed by `get_archive_hash`
@dataclasses.dataclass
class FakeRepository:
    repoid: int
    service: str
    service_id: str


def cleanup_commitreport(context: CleanupContext, query: QuerySet) -> CleanupResult:
    coverage_reports = query.values_list(
        "report_type",
        "code",
        "external_id",
        "commit__commitid",
        "commit__repository__repoid",
        "commit__repository__author__service",
        "commit__repository__service_id",
    ).order_by("id")

    cleaned_models = 0
    cleaned_files = 0
    repo_hashes: dict[int, str] = {}

    while True:
        reports = coverage_reports[:MANUAL_QUERY_CHUNKSIZE]
        if len(reports) == 0:
            break

        buckets_paths: dict[str, list[str]] = defaultdict(list)
        for (
            report_type,
            report_code,
            external_id,
            commit_sha,
            repoid,
            repo_service,
            repo_service_id,
        ) in reports:
            if repoid not in repo_hashes:
                fake_repo = FakeRepository(
                    repoid=repoid, service=repo_service, service_id=repo_service_id
                )
                repo_hashes[repoid] = ArchiveService.get_archive_hash(fake_repo)
            repo_hash = repo_hashes[repoid]

            # depending on the `report_type`, we have:
            # - a `chunks` file for coverage
            # - a `bundle_report.sqlite` for BA
            if report_type == "bundle_analysis":
                path = StoragePaths.bundle_report.path(
                    repo_key=repo_hash, report_key=external_id
                )
                buckets_paths[context.bundleanalysis_bucket].append(path)
            elif report_type == "test_results":
                # TA has cached rollups, but those are based on `Branch`
                pass
            else:
                chunks_file_name = report_code if report_code is not None else "chunks"
                path = MinioEndpoints.chunks.get_path(
                    version="v4",
                    repo_hash=repo_hash,
                    commitid=commit_sha,
                    chunks_file_name=chunks_file_name,
                )
                buckets_paths[context.default_bucket].append(path)

        cleaned_files += cleanup_files_batched(context, buckets_paths)
        cleaned_models += query.filter(
            id__in=query.order_by("id")[:MANUAL_QUERY_CHUNKSIZE]
        )._raw_delete(query.db)

    return CleanupResult(cleaned_models, cleaned_files)


def cleanup_upload(context: CleanupContext, query: QuerySet) -> CleanupResult:
    cleaned_files = 0

    # delete models with a `None` `storage_path` right away
    cleaned_models = query.filter(storage_path__isnull=True)._raw_delete(query.db)

    # delete the remaining files from storage, in chunks based on the `id` column
    storage_query = query.filter(storage_path__isnull=False).order_by("id")

    while True:
        uploads = storage_query.values_list("report__report_type", "storage_path")[
            :MANUAL_QUERY_CHUNKSIZE
        ]
        if len(uploads) == 0:
            break

        buckets_paths: dict[str, list[str]] = defaultdict(list)
        for report_type, storage_path in uploads:
            if report_type == "bundle_analysis":
                buckets_paths[context.bundleanalysis_bucket].append(storage_path)
            else:
                buckets_paths[context.default_bucket].append(storage_path)

        cleaned_files += cleanup_files_batched(context, buckets_paths)
        cleaned_models += query.filter(
            id__in=storage_query[:MANUAL_QUERY_CHUNKSIZE]
        )._raw_delete(query.db)

    return CleanupResult(cleaned_models, cleaned_files)


# All the models that need custom Python code for deletion, as a plain bulk `DELETE` query does not work.
MANUAL_CLEANUP: dict[
    type[Model], Callable[[CleanupContext, QuerySet], CleanupResult]
] = {
    Commit: partial(cleanup_archivefield, "report"),
    Pull: partial(cleanup_archivefield, "flare"),
    ReportDetails: partial(cleanup_archivefield, "files_array"),
    CommitReport: cleanup_commitreport,
    Upload: cleanup_upload,
    CommitComparison: partial(cleanup_with_storage_field, "report_storage_path"),
    ProfilingUpload: partial(cleanup_with_storage_field, "raw_upload_location"),
    StaticAnalysisSingleFileSnapshot: partial(
        cleanup_with_storage_field, "content_location"
    ),
}
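
Extending this table follows the same shape: any model whose rows reference a single storage file can map to `cleanup_with_storage_field` via `partial`, naming its path column. A hypothetical sketch — `MyModel` and `data_location` are invented here purely to show the registration pattern:

# Hypothetical registration — `MyModel` / `data_location` do not exist in
# this codebase; real models plug in exactly like the entries above.
MANUAL_CLEANUP[MyModel] = partial(cleanup_with_storage_field, "data_location")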
@@ -0,0 +1,54 @@
import logging

from django.db import transaction
from django.db.models import Q
from shared.django_apps.codecov_auth.models import Owner, OwnerProfile
from shared.django_apps.core.models import Commit, Pull, Repository

from services.cleanup.cleanup import run_cleanup
from services.cleanup.utils import CleanupSummary

log = logging.getLogger(__name__)

CLEAR_ARRAY_FIELDS = ["plan_activated_users", "organizations", "admins"]


def cleanup_owner(owner_id: int) -> CleanupSummary:
    log.info("Started/Continuing Owner cleanup", extra={"owner_id": owner_id})

    clear_owner_references(owner_id)
    owner_query = Owner.objects.filter(ownerid=owner_id)
    summary = run_cleanup(owner_query)

    log.info("Owner cleanup finished", extra={"owner_id": owner_id, "summary": summary})
    return summary


# TODO: maybe turn this into a `MANUAL_CLEANUP`?
def clear_owner_references(owner_id: int):
    """
    This clears the `ownerid` from the various foreign keys and DB arrays
    where it is being referenced.
    """

    OwnerProfile.objects.filter(default_org=owner_id).update(default_org=None)
    Owner.objects.filter(bot=owner_id).update(bot=None)
    Repository.objects.filter(bot=owner_id).update(bot=None)
    Commit.objects.filter(author=owner_id).update(author=None)
    Pull.objects.filter(author=owner_id).update(author=None)

    # This uses a transaction / `select_for_update` to ensure consistency when
    # modifying these `ArrayField`s in Python.
    # I don't think we have such consistency anywhere else in the codebase, so
    # if this causes lock contention issues, it's also fair to avoid it.
    with transaction.atomic():
        filter = Q()
        for field in CLEAR_ARRAY_FIELDS:
            filter = filter | Q(**{f"{field}__contains": [owner_id]})

        owners_with_reference = Owner.objects.select_for_update().filter(filter)
        for owner in owners_with_reference:
            for field in CLEAR_ARRAY_FIELDS:
                array = getattr(owner, field)
                setattr(owner, field, [x for x in array if x != owner_id])

            owner.save(update_fields=CLEAR_ARRAY_FIELDS)
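
Usage sketch (not part of this commit): the `owner_id` below is made up, and the module path is an assumption based on the `services.cleanup` package layout visible in the imports above.

from services.cleanup.owner import cleanup_owner  # assumed module path

# Clears FK/array references to the owner first, then deletes the Owner row
# and everything reachable from it. `owner_id=42` is made up.
summary = cleanup_owner(owner_id=42)
print(summary)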