
Implement storage- and relationship-aware cleanup #973

Merged
merged 22 commits into main on Jan 22, 2025

Conversation

Swatinem
Contributor

@Swatinem Swatinem commented Dec 19, 2024

This implements a generic framework for "delete anything referencing this object".

In order to do so, this builds a relationship graph based on Django models/metadata, augmented with some manually defined relationships that are not encoded in Django models.

This graph is then processed in topological sort order, meaning it deletes leaf models first before advancing to other models, thus avoiding cascading deletes on the database side.
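The topological processing described above can be sketched with Python's standard-library `graphlib`; the model names and relationships below are hypothetical stand-ins, not the actual graph this PR builds:

```python
from graphlib import TopologicalSorter

# Hypothetical relationship graph: each model maps to the set of models
# that reference it via a foreign key.
references = {
    "Repository": {"Commit", "Branch", "Pull"},
    "Commit": {"CommitReport"},
    "CommitReport": {"ReportSession"},
}

sorter = TopologicalSorter()
for referenced, referencing in references.items():
    # every referencing ("leaf") model is a predecessor of the model it
    # points to, so it comes earlier in the static order
    sorter.add(referenced, *referencing)

deletion_order = list(sorter.static_order())
```

Deleting rows in `deletion_order` means a model's rows are gone before any model they reference is touched, so the database never has to cascade.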

It is also possible to hook up custom code for specific models. This has been used to properly delete any kind of file in storage that is referenced by these models.
Previously, code existed to delete all the "repo files", which did so by first enumerating all the files in storage matching a "per repository" naming pattern. While that code may have worked for smaller repositories, I assume it would not have worked properly for larger repositories with a huge number of files.

The new storage deletion code instead uses batches and a thread pool to delete files, and it also covers file name patterns (and, in the case of BA, different storage buckets) that do not match the "repo files" pattern.
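A rough sketch of that batched, thread-pooled deletion; the `storage_delete` callable and the helper name are hypothetical, not the actual storage API:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def delete_files_batched(storage_delete, paths, batch_size=100, max_workers=8):
    """Delete `paths` in batches using a thread pool.

    `storage_delete` is a hypothetical callable taking one path; the real
    code talks to the storage service instead.
    """
    deleted = 0
    it = iter(paths)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while batch := list(islice(it, batch_size)):
            # `pool.map` blocks until the whole batch is done, so progress
            # is observable batch by batch rather than all-or-nothing
            for _ in pool.map(storage_delete, batch):
                deleted += 1
    return deleted
```

Batching keeps memory bounded for repositories with very large file counts, while the pool parallelizes the per-file network round trips within each batch.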


This new generic deletion framework is being hooked up, replacing the existing FlushRepo and DeleteOwner tasks.
The tasks are built in such a way that they can time out and be rerun, and they will just pick up where they left off (going over the same topological sort order), assuming the whole task is not being run in a transaction. (I hope not, but given how many problems we have with DB locking and transactions, who knows 🤷🏻‍♂️)
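The resume-on-rerun behavior relies on each pass being idempotent: a rerun walks the same order and simply skips models that were already emptied. A minimal sketch, assuming deletion state lives only in the database (all callable names here are hypothetical stand-ins for DB queries):

```python
def run_cleanup(deletion_order, remaining_count, delete_all, time_left):
    """One cleanup pass over the models in topological sort order.

    Re-running after a timeout naturally resumes: models already emptied
    by an earlier run report zero remaining rows and are skipped.
    """
    for model in deletion_order:
        if time_left() <= 0:
            return False  # timed out; rerunning the task continues from here
        if remaining_count(model) > 0:
            delete_all(model)
    return True
```

Because the pass only ever deletes whatever still exists, running it twice is the same as running it once, which is what makes blind retries safe.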


This is work towards codecov/engineering-team#1127

@Swatinem Swatinem self-assigned this Dec 19, 2024


github-actions bot commented Dec 19, 2024

✅ All tests successful. No failed tests were found.

📣 Thoughts on this report? Let Codecov know! | Powered by Codecov


codecov bot commented Dec 19, 2024

Codecov Report

Attention: Patch coverage is 98.16092% with 8 lines in your changes missing coverage. Please review.

Project coverage is 97.74%. Comparing base (2b5bffc) to head (0a22ed4).
Report is 12 commits behind head on main.

Files with missing lines Patch % Lines
services/cleanup/models.py 96.47% 3 Missing ⚠️
services/cleanup/relations.py 95.77% 3 Missing ⚠️
services/cleanup/repository.py 92.85% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #973      +/-   ##
==========================================
- Coverage   97.75%   97.74%   -0.02%     
==========================================
  Files         451      458       +7     
  Lines       36812    36985     +173     
==========================================
+ Hits        35986    36150     +164     
- Misses        826      835       +9     
Flag Coverage Δ
integration 42.55% <34.25%> (+0.06%) ⬆️
unit 90.29% <98.16%> (+0.07%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

⚠️ Impact Analysis from Codecov is deprecated and will be sunset on Jan 31 2025. See more

@Swatinem Swatinem force-pushed the swatinem/bulk-cleanups branch 2 times, most recently from 73f9f21 to cc68239 Compare January 13, 2025 14:01

This PR includes changes to shared. Please review them here: codecov/shared@609e56d...c1c9cb2

@Swatinem Swatinem force-pushed the swatinem/bulk-cleanups branch from cc68239 to 302fb5e Compare January 14, 2025 09:00
@codecov-qa

codecov-qa bot commented Jan 16, 2025

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
1790 1 1789 4
View the top 1 failed tests by shortest run time
tasks/tests/unit/test_flush_repo.py::test_flush_repo_few_of_each_only_db_objects
Stack Traces | 0.134s run time
self = <django.db.backends.utils.CursorWrapper object at 0x7fed0954d8b0>
sql = 'INSERT INTO "branches" ("branch", "repoid", "authors", "head", "base", "updatestamp") VALUES (%s, %s, %s::integer[], %s, %s, %s)'
params = ('Money.', 4, None, 'a09962589aafda42956811856e1f5721edf7beb0', None, datetime.datetime(2025, 1, 21, 11, 32, 59, 697156, tzinfo=datetime.timezone.utc))
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7fed0954d8b0>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
>               return self.cursor.execute(sql, params)
E               psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "branches_repoid_branch"
E               DETAIL:  Key (repoid, branch)=(4, Money.) already exists.

.../local/lib/python3.13.../db/backends/utils.py:89: UniqueViolation

The above exception was the direct cause of the following exception:

mock_storage = <shared.storage.memory.MemoryStorageService object at 0x7fed0956e0f0>

    @pytest.mark.django_db
    def test_flush_repo_few_of_each_only_db_objects(mock_storage):
        repo = RepositoryFactory()
        flag = RepositoryFlagFactory(repository=repo)
    
        for i in range(8):
            CommitFactory(repository=repo)
    
        for i in range(4):
            base_commit = CommitFactory(repository=repo)
            head_commit = CommitFactory(repository=repo)
            comparison = CommitComparisonFactory(
                base_commit=base_commit, compare_commit=head_commit
            )
    
            FlagComparisonFactory(commit_comparison=comparison, repositoryflag=flag)
    
        # NOTE: The `CommitFactary` defaults to `branch: main, pullid: 1`
        # This default seems to create models for
        # `Pull` and `Branch` automatically through some kind of trigger?
    
        for i in range(17):
            PullFactory(repository=repo, pullid=i + 100)
    
        for i in range(23):
>           BranchFactory(repository=repo)

.../tests/unit/test_flush_repo.py:68: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.13............/site-packages/factory/base.py:40: in __call__
    return cls.create(**kwargs)
.../local/lib/python3.13............/site-packages/factory/base.py:528: in create
    return cls._generate(enums.CREATE_STRATEGY, kwargs)
.../local/lib/python3.13....../site-packages/factory/django.py:117: in _generate
    return super()._generate(strategy, params)
.../local/lib/python3.13............/site-packages/factory/base.py:465: in _generate
    return step.build()
.../local/lib/python3.13.../site-packages/factory/builder.py:262: in build
    instance = self.factory_meta.instantiate(
.../local/lib/python3.13............/site-packages/factory/base.py:317: in instantiate
    return self.factory._create(model, *args, **kwargs)
.../local/lib/python3.13....../site-packages/factory/django.py:166: in _create
    return manager.create(*args, **kwargs)
.../local/lib/python3.13.../db/models/manager.py:87: in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
.../local/lib/python3.13.../db/models/query.py:658: in create
    obj.save(force_insert=True, using=self.db)
.../local/lib/python3.13.../db/models/base.py:814: in save
    self.save_base(
.../local/lib/python3.13.../db/models/base.py:877: in save_base
    updated = self._save_table(
.../local/lib/python3.13.../db/models/base.py:1020: in _save_table
    results = self._do_insert(
.../local/lib/python3.13.../site-packages/django_prometheus/models.py:43: in _do_insert
    return super()._do_insert(*args, **kwargs)
.../local/lib/python3.13.../db/models/base.py:1061: in _do_insert
    return manager._insert(
.../local/lib/python3.13.../db/models/manager.py:87: in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
.../local/lib/python3.13.../db/models/query.py:1805: in _insert
    return query.get_compiler(using=using).execute_sql(returning_fields)
.../local/lib/python3.13.../models/sql/compiler.py:1822: in execute_sql
    cursor.execute(sql, params)
.../local/lib/python3.13.../db/backends/utils.py:67: in execute
    return self._execute_with_wrappers(
.../local/lib/python3.13.../db/backends/utils.py:80: in _execute_with_wrappers
    return executor(sql, params, many, context)
.../local/lib/python3.13.../db/backends/utils.py:84: in _execute
    with self.db.wrap_database_errors:
.../local/lib/python3.13.../django/db/utils.py:91: in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7fed0954d8b0>
sql = 'INSERT INTO "branches" ("branch", "repoid", "authors", "head", "base", "updatestamp") VALUES (%s, %s, %s::integer[], %s, %s, %s)'
params = ('Money.', 4, None, 'a09962589aafda42956811856e1f5721edf7beb0', None, datetime.datetime(2025, 1, 21, 11, 32, 59, 697156, tzinfo=datetime.timezone.utc))
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7fed0954d8b0>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
>               return self.cursor.execute(sql, params)
E               django.db.utils.IntegrityError: duplicate key value violates unique constraint "branches_repoid_branch"
E               DETAIL:  Key (repoid, branch)=(4, Money.) already exists.

.../local/lib/python3.13.../db/backends/utils.py:89: IntegrityError

To view more test analytics, go to the Test Analytics Dashboard
📢 Thoughts on this report? Let us know!


@Swatinem Swatinem force-pushed the swatinem/bulk-cleanups branch 2 times, most recently from 40ba152 to 3653469 Compare January 20, 2025 09:36
@Swatinem Swatinem force-pushed the swatinem/bulk-cleanups branch from 3653469 to 90bf1e2 Compare January 20, 2025 10:02
@Swatinem Swatinem requested a review from a team January 20, 2025 11:48
@Swatinem Swatinem marked this pull request as ready for review January 20, 2025 12:01
Contributor

@giovanni-guidini giovanni-guidini left a comment


Lots to unpack here... I like this in general. I think it's pretty cool and the code looks quite sophisticated.

Just so I understand this better, we currently have 2 entry points to use as "root" nodes when traversing the graph of dependencies: the repo or the owner. Is that correct?

) -> int:
cleaned_files = 0

# TODO: maybe reuse the executor across calls?
Contributor


We certainly stand to gain from not having to initialize and tear down the pool every time.

But it seems that the calls to cleanup_files_batched themselves are not concurrent. Is that on purpose? The way things are now, it seems there would always be at most one pool running at any given time anyway.

Contributor Author


Indeed, it is running only with one pool at a time.

The regular deletions are still running serially in topological sort order. So it does not concurrently delete two unrelated models.

I was thinking about that a bit, but I thought it would be too complex to actually implement. Also, reusing the thread pool would mean introducing some kind of "waitpool" to make sure all the file deletes are complete before returning, while still keeping the pool alive.
I wanted to avoid that complexity for now.

break

buckets_paths: dict[str, list[str]] = defaultdict(list)
for (
Contributor


[very nit] maybe a NamedTuple would make the code more ergonomic here (but less performant for sure)

Contributor Author


I don’t fully understand? Would you prefer not to destructure the tuple into variables right away, but rather access them through a NamedTuple object? I’m not sure that would make things more readable 🤔

services/cleanup/models.py
services/cleanup/owner.py
services/cleanup/repository.py
services/cleanup/repository.py
- Owner entry from db
- Cascading deletes of repos, pulls, and branches for the owner
"""
acks_late = True # retry the task when the worker dies for whatever reason
Contributor


It won't retry if the task ends with an Exception... just saying, if that is the expectation in "whatever"

Contributor Author


Do you have more details about the execution model of celery? I feel like after all this time I still don’t understand how it works and behaves under various circumstances.

For these tasks, I would want them to "resume" until they are completed.
This means "resuming" on timeouts (not sure if you can configure them to not have a timeout at all?).
It means resuming when the worker is killed for whatever reason. It also means resuming on any kind of random infrastructure exception.

Even for a fixable programmer error, I would like these tasks to always run to "successful" completion.

I’m not entirely sure how to achieve that with celery 🤷🏻‍♂️

@Swatinem
Contributor Author

Just so I understand this better, we currently have 2 entry points to use as "root" nodes when traversing the graph of dependencies: the repo or the owner. Is that correct?

Right now yes, those two need a bit of extra treatment, and they already have associated tasks for them.

But you can just put any arbitrary QuerySet into that function, and it will do the right thing.
In the future, I would maybe move the whole cleanup module to shared, so it's possible to use it within data migrations.
One thing I would like to use it for in the near-ish future is properly removing the ReportDetails/files_array model.

Contributor

@giovanni-guidini giovanni-guidini left a comment


Thank you for the clarifications

Contributor

@giovanni-guidini giovanni-guidini left a comment


I particularly like the usage of cleanup_context in a contextmanager. Very nice 👍

@Swatinem Swatinem added this pull request to the merge queue Jan 22, 2025
Merged via the queue into main with commit c7aabe4 Jan 22, 2025
24 of 27 checks passed
@Swatinem Swatinem deleted the swatinem/bulk-cleanups branch January 22, 2025 11:08