Implement storage- and relationship-aware cleanup #973
Changes from 16 commits
@@ -0,0 +1,52 @@
import logging

from django.db.models.query import QuerySet

from services.cleanup.models import MANUAL_CLEANUP
from services.cleanup.relations import build_relation_graph
from services.cleanup.utils import CleanupContext, CleanupResult, CleanupSummary

log = logging.getLogger(__name__)


def run_cleanup(
    query: QuerySet,
) -> CleanupSummary:
    """
    Cleans up all the models and storage files reachable from the given `QuerySet`.

    This deletes all database models in topological sort order, and also removes
    all the files in storage for any of the models in the relationship graph.

    Returns the number of models and files being cleaned up in total, and per-Model.
    """
    context = CleanupContext()
    models_to_cleanup = build_relation_graph(query)

    summary = {}
    cleaned_models = 0
    cleaned_files = 0

    for model, query in models_to_cleanup:
        manual_cleanup = MANUAL_CLEANUP.get(model)
        if manual_cleanup is not None:
            result = manual_cleanup(context, query)
        else:
            result = CleanupResult(query._raw_delete(query.db))

        if result.cleaned_models > 0 or result.cleaned_files > 0:
            summary[model] = result

            log.info(
                f"Finished cleaning up {model.__name__}",
                extra={
                    "cleaned_models": result.cleaned_models,
                    "cleaned_files": result.cleaned_files,
                },
            )

        cleaned_models += result.cleaned_models
        cleaned_files += result.cleaned_files

    totals = CleanupResult(cleaned_models, cleaned_files)
    return CleanupSummary(totals, summary)
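For orientation, a minimal sketch of how this entry point might be driven. The `cleanup_repository` wrapper and the `repoid` filter below are illustrative assumptions and not part of this diff; `cleanup_owner` further down is the real call site added here:

from shared.django_apps.core.models import Repository

from services.cleanup.cleanup import run_cleanup


def cleanup_repository(repo_id: int):
    # Hypothetical caller: everything reachable from this one repository
    # (commits, reports, uploads, ...) is cleaned up in topological order.
    query = Repository.objects.filter(repoid=repo_id)
    return run_cleanup(query)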
@@ -0,0 +1,199 @@
import dataclasses
import itertools
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from functools import partial

from django.db.models import Model
from django.db.models.query import QuerySet
from shared.bundle_analysis import StoragePaths
from shared.django_apps.compare.models import CommitComparison
from shared.django_apps.core.models import Commit, Pull
from shared.django_apps.profiling.models import ProfilingUpload
from shared.django_apps.reports.models import CommitReport, ReportDetails
from shared.django_apps.reports.models import ReportSession as Upload
from shared.django_apps.staticanalysis.models import StaticAnalysisSingleFileSnapshot

from services.archive import ArchiveService, MinioEndpoints
from services.cleanup.utils import CleanupContext, CleanupResult

MANUAL_QUERY_CHUNKSIZE = 5_000
DELETE_FILES_BATCHSIZE = 50


def cleanup_files_batched(
    context: CleanupContext, buckets_paths: dict[str, list[str]]
) -> int:
    cleaned_files = 0

    # TODO: maybe reuse the executor across calls?
Review comment:
We certainly stand to gain from not having to initialize and tear down the pool every time. But it seems that the calls to …

Author reply:
Indeed, it is running only with one pool at a time. The regular deletions are still running serially in topological sort order, so it does not concurrently delete two unrelated models. I was thinking about that a bit, but I thought it would be too complex to actually implement. Also, reusing the thread pool would mean that I would have to introduce some kind of "waitpool" to make sure all the file deletes are complete before returning, while still keeping the pool alive.
    with ThreadPoolExecutor() as e:
        for bucket, paths in buckets_paths.items():
            for batched_paths in itertools.batched(paths, DELETE_FILES_BATCHSIZE):
                e.submit(context.storage.delete_files, bucket, list(batched_paths))
            cleaned_files += len(paths)

    return cleaned_files
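Purely as an illustration of the idea discussed in the thread above (reuse the pool, but still wait for this call's deletes before returning), and not something this PR implements. The module-level executor and its worker count are assumptions; the sketch is written as if it lived in this module, so it reuses `DELETE_FILES_BATCHSIZE` and the `CleanupContext` storage client:

import itertools
from concurrent.futures import ThreadPoolExecutor, wait

_SHARED_EXECUTOR = ThreadPoolExecutor(max_workers=8)  # assumed pool size


def cleanup_files_batched_shared(
    context: CleanupContext, buckets_paths: dict[str, list[str]]
) -> int:
    cleaned_files = 0
    futures = []
    for bucket, paths in buckets_paths.items():
        for batch in itertools.batched(paths, DELETE_FILES_BATCHSIZE):
            futures.append(
                _SHARED_EXECUTOR.submit(context.storage.delete_files, bucket, list(batch))
            )
        cleaned_files += len(paths)

    # The "waitpool" mentioned in the thread: block until this call's deletes
    # have finished, while keeping the pool itself alive for the next call.
    wait(futures)
    return cleaned_files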
def cleanup_with_storage_field(
    path_field: str,
    context: CleanupContext,
    query: QuerySet,
) -> CleanupResult:
    cleaned_files = 0

    # delete `None` `path_field`s right away
    cleaned_models = query.filter(**{f"{path_field}__isnull": True})._raw_delete(
        query.db
    )

    # delete all those files from storage, using chunks based on the `id` column
    storage_query = query.filter(**{f"{path_field}__isnull": False}).order_by("id")

    while True:
        storage_paths = storage_query.values_list(path_field, flat=True)[
            :MANUAL_QUERY_CHUNKSIZE
        ]
        if len(storage_paths) == 0:
            break

        cleaned_files += cleanup_files_batched(
            context, {context.default_bucket: storage_paths}
        )
        cleaned_models += query.filter(
            id__in=storage_query[:MANUAL_QUERY_CHUNKSIZE]
        )._raw_delete(query.db)

    return CleanupResult(cleaned_models, cleaned_files)


def cleanup_archivefield(
    field_name: str, context: CleanupContext, query: QuerySet
) -> CleanupResult:
    model_field_name = f"_{field_name}_storage_path"

    return cleanup_with_storage_field(model_field_name, context, query)


# This has all the `Repository` fields needed by `get_archive_hash`
@dataclasses.dataclass
class FakeRepository:
    repoid: int
    service: str
    service_id: str
def cleanup_commitreport(context: CleanupContext, query: QuerySet) -> CleanupResult:
    coverage_reports = query.values_list(
        "report_type",
        "code",
        "external_id",
        "commit__commitid",
        "commit__repository__repoid",
        "commit__repository__author__service",
        "commit__repository__service_id",
    ).order_by("id")

    cleaned_models = 0
    cleaned_files = 0
    repo_hashes: dict[int, str] = {}

    while True:
        reports = coverage_reports[:MANUAL_QUERY_CHUNKSIZE]
        if len(reports) == 0:
            break

        buckets_paths: dict[str, list[str]] = defaultdict(list)
Review comment:
[very nit] maybe a …

Author reply:
I don’t fully understand? Would you prefer to not destructure the tuple into variables right away, but rather access them through a …
        for (
            report_type,
            report_code,
            external_id,
            commit_sha,
            repoid,
            repo_service,
            repo_service_id,
        ) in reports:
            if repoid not in repo_hashes:
                fake_repo = FakeRepository(
                    repoid=repoid, service=repo_service, service_id=repo_service_id
                )
                repo_hashes[repoid] = ArchiveService.get_archive_hash(fake_repo)
            repo_hash = repo_hashes[repoid]

            # depending on the `report_type`, we have:
            # - a `chunks` file for coverage
            # - a `bundle_report.sqlite` for BA
            if report_type == "bundle_analysis":
                path = StoragePaths.bundle_report.path(
                    repo_key=repo_hash, report_key=external_id
                )
                buckets_paths[context.bundleanalysis_bucket].append(path)
            elif report_type == "test_results":
                # TA has cached rollups, but those are based on `Branch`
                pass
            else:
                chunks_file_name = report_code if report_code is not None else "chunks"
                path = MinioEndpoints.chunks.get_path(
                    version="v4",
                    repo_hash=repo_hash,
                    commitid=commit_sha,
                    chunks_file_name=chunks_file_name,
                )
                buckets_paths[context.default_bucket].append(path)

        cleaned_files += cleanup_files_batched(context, buckets_paths)
        cleaned_models += query.filter(
            id__in=query.order_by("id")[:MANUAL_QUERY_CHUNKSIZE]
        )._raw_delete(query.db)

    return CleanupResult(cleaned_models, cleaned_files)
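The truncated review nit above appears to be about named access to the tuple fields instead of positional unpacking. As an illustration only (assuming something like a namedtuple was meant), the loop header in `cleanup_commitreport` could then read:

from collections import namedtuple

ReportRow = namedtuple(
    "ReportRow",
    "report_type report_code external_id commit_sha repoid repo_service repo_service_id",
)

for row in map(ReportRow._make, reports):
    if row.repoid not in repo_hashes:
        ...  # same body as above, using `row.<field>` instead of the unpacked locals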
def cleanup_upload(context: CleanupContext, query: QuerySet) -> CleanupResult:
    cleaned_files = 0

    # delete `None` `storage_path`s right away
    cleaned_models = query.filter(storage_path__isnull=True)._raw_delete(query.db)

    # delete all those files from storage, using chunks based on the `id` column
    storage_query = query.filter(storage_path__isnull=False).order_by("id")

    while True:
        uploads = storage_query.values_list("report__report_type", "storage_path")[
            :MANUAL_QUERY_CHUNKSIZE
        ]
        if len(uploads) == 0:
            break

        buckets_paths: dict[str, list[str]] = defaultdict(list)
        for report_type, storage_path in uploads:
            if report_type == "bundle_analysis":
                buckets_paths[context.bundleanalysis_bucket].append(storage_path)
            else:
                buckets_paths[context.default_bucket].append(storage_path)

        cleaned_files += cleanup_files_batched(context, buckets_paths)
        cleaned_models += query.filter(
            id__in=storage_query[:MANUAL_QUERY_CHUNKSIZE]
        )._raw_delete(query.db)

    return CleanupResult(cleaned_models, cleaned_files)
# All the models that need custom python code for deletion, because a plain bulk
# `DELETE` query is not enough on its own.
MANUAL_CLEANUP: dict[
    type[Model], Callable[[CleanupContext, QuerySet], CleanupResult]
] = {
    Commit: partial(cleanup_archivefield, "report"),
    Pull: partial(cleanup_archivefield, "flare"),
    ReportDetails: partial(cleanup_archivefield, "files_array"),
    CommitReport: cleanup_commitreport,
    Upload: cleanup_upload,
    CommitComparison: partial(cleanup_with_storage_field, "report_storage_path"),
    ProfilingUpload: partial(cleanup_with_storage_field, "raw_upload_location"),
    StaticAnalysisSingleFileSnapshot: partial(
        cleanup_with_storage_field, "content_location"
    ),
}
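Because `run_cleanup` falls back to a plain `_raw_delete` for any model not listed here, hooking up another model whose files sit behind a single storage-path column only takes one more `partial` entry. A small sketch; `SomeNewModel` and `raw_location` are made-up names, not part of this PR:

MANUAL_CLEANUP[SomeNewModel] = partial(cleanup_with_storage_field, "raw_location")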
@@ -0,0 +1,54 @@
import logging

from django.db import transaction
from django.db.models import Q
from shared.django_apps.codecov_auth.models import Owner, OwnerProfile
from shared.django_apps.core.models import Commit, Pull, Repository

from services.cleanup.cleanup import run_cleanup
from services.cleanup.utils import CleanupSummary

log = logging.getLogger(__name__)

CLEAR_ARRAY_FIELDS = ["plan_activated_users", "organizations", "admins"]


def cleanup_owner(owner_id: int) -> CleanupSummary:
    log.info("Started/Continuing Owner cleanup", extra={"owner_id": owner_id})

    clear_owner_references(owner_id)
    owner_query = Owner.objects.filter(ownerid=owner_id)
    summary = run_cleanup(owner_query)

    log.info("Owner cleanup finished", extra={"owner_id": owner_id, "summary": summary})
    return summary
# TODO: maybe turn this into a `MANUAL_CLEANUP`?
def clear_owner_references(owner_id: int):
    """
    This clears the `ownerid` from various DB arrays where it is being referenced.
    """

    OwnerProfile.objects.filter(default_org=owner_id).update(default_org=None)
    Owner.objects.filter(bot=owner_id).update(bot=None)
    Repository.objects.filter(bot=owner_id).update(bot=None)
    Commit.objects.filter(author=owner_id).update(author=None)
    Pull.objects.filter(author=owner_id).update(author=None)

    # This uses a transaction / `select_for_update` to ensure consistency when
    # modifying these `ArrayField`s in python.
    # I don’t think we have such consistency anyplace else in the codebase, so
    # if this is causing lock contention issues, it's also fair to avoid this.
    with transaction.atomic():
        filter = Q()
        for field in CLEAR_ARRAY_FIELDS:
            filter = filter | Q(**{f"{field}__contains": [owner_id]})

        owners_with_reference = Owner.objects.select_for_update().filter(filter)
        for owner in owners_with_reference:
            for field in CLEAR_ARRAY_FIELDS:
                array = getattr(owner, field)
                setattr(owner, field, [x for x in array if x != owner_id])

            owner.save(update_fields=CLEAR_ARRAY_FIELDS)
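For clarity, with the three `CLEAR_ARRAY_FIELDS` defined at the top of this file, the OR-filter built in that loop expands to the following (an illustrative expansion of the existing code, not an addition to this PR):

owners_with_reference = Owner.objects.select_for_update().filter(
    Q(plan_activated_users__contains=[owner_id])
    | Q(organizations__contains=[owner_id])
    | Q(admins__contains=[owner_id])
)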