Skip to content

Commit 5446122

Browse files
authored
Migrate more celery tasks (#4743)
See #4408
1 parent dfc05c9 commit 5446122

22 files changed

Lines changed: 105 additions & 132 deletions

File tree

app/config/settings.py

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,14 +1197,6 @@ def get_private_ip():
11971197
)
11981198

11991199
CELERY_BEAT_SCHEDULE = {
1200-
"refresh_expiring_user_tokens": {
1201-
"task": "grandchallenge.github.tasks.refresh_expiring_user_tokens",
1202-
"schedule": crontab(hour=0, minute=15),
1203-
},
1204-
"update_publication_metadata": {
1205-
"task": "grandchallenge.publications.tasks.update_publication_metadata",
1206-
"schedule": crontab(hour=0, minute=30),
1207-
},
12081200
"remove_inactive_container_images": {
12091201
"task": "grandchallenge.components.tasks.remove_inactive_container_images",
12101202
"schedule": crontab(hour=1, minute=0),
@@ -1221,10 +1213,6 @@ def get_private_ip():
12211213
"task": "grandchallenge.algorithms.tasks.deactivate_old_algorithm_images",
12221214
"schedule": crontab(hour=2, minute=30),
12231215
},
1224-
"aggregate_celery_daily_stats": {
1225-
"task": "grandchallenge.background_tasks.tasks.aggregate_celery_daily_stats",
1226-
"schedule": crontab(hour=2, minute=45),
1227-
},
12281216
"update_associated_challenges": {
12291217
"task": "grandchallenge.algorithms.tasks.update_associated_challenges",
12301218
"schedule": crontab(hour=3, minute=0),
@@ -1261,30 +1249,10 @@ def get_private_ip():
12611249
"task": "grandchallenge.challenges.tasks.update_challenge_compute_costs",
12621250
"schedule": crontab(minute=45),
12631251
},
1264-
"delete_users_who_dont_login": {
1265-
"task": "grandchallenge.profiles.tasks.delete_users_who_dont_login",
1266-
"schedule": timedelta(hours=1),
1267-
},
1268-
"delete_old_user_uploads": {
1269-
"task": "grandchallenge.uploads.tasks.delete_old_user_uploads",
1270-
"schedule": timedelta(hours=1),
1271-
},
1272-
"clear_sessions": {
1273-
"task": "grandchallenge.browser_sessions.tasks.clear_sessions",
1274-
"schedule": timedelta(hours=1),
1275-
},
1276-
"cleanup_expired_tokens": {
1277-
"task": "grandchallenge.github.tasks.cleanup_expired_tokens",
1278-
"schedule": timedelta(hours=1),
1279-
},
12801252
"cleanup_sent_raw_emails": {
12811253
"task": "grandchallenge.emails.tasks.cleanup_sent_raw_emails",
12821254
"schedule": timedelta(hours=1),
12831255
},
1284-
"logout_privileged_users": {
1285-
"task": "grandchallenge.browser_sessions.tasks.logout_privileged_users",
1286-
"schedule": timedelta(hours=1),
1287-
},
12881256
"update_challenge_results_cache": {
12891257
"task": "grandchallenge.challenges.tasks.update_challenge_results_cache",
12901258
"schedule": timedelta(minutes=5),

app/grandchallenge/background_tasks/tasks.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
)
1616
from django.utils import timezone
1717
from django_celery_results.models import TaskResult
18+
from lambda_tasks.decorators import lambda_task
1819
from lambda_tasks.timeouts import SoftTimeLimitExceeded
1920

2021
from grandchallenge.background_tasks.models import CeleryTaskDailyStats
2122
from grandchallenge.core.celery import acks_late_micro_short_task
2223

2324

2425
@acks_late_micro_short_task(
26+
name=f"{__name__}.aggregate_celery_daily_stats",
2527
singleton=True,
2628
# No need to retry here as the periodic task call this again
2729
ignore_errors=(
@@ -30,6 +32,19 @@
3032
),
3133
)
3234
@transaction.atomic
35+
def aggregate_celery_daily_stats_celery(**kwargs):
36+
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
37+
return aggregate_celery_daily_stats(**kwargs)
38+
39+
40+
@lambda_task(
41+
singleton=True,
42+
# No need to retry here as the periodic task call this again
43+
ignore_errors=(
44+
CelerySoftTimeLimitExceeded,
45+
SoftTimeLimitExceeded,
46+
),
47+
)
3348
def aggregate_celery_daily_stats():
3449
yesterday = (timezone.now() - timedelta(days=1)).date()
3550

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,34 @@
11
from django.conf import settings
22
from django.db import transaction
33
from django.utils.timezone import now
4+
from lambda_tasks.decorators import lambda_task
45

56
from grandchallenge.browser_sessions.models import BrowserSession
67
from grandchallenge.core.celery import acks_late_micro_short_task
78

89

9-
@acks_late_micro_short_task
10+
@acks_late_micro_short_task(name=f"{__name__}.logout_privileged_users")
1011
@transaction.atomic
12+
def logout_privileged_users_celery(**kwargs):
13+
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
14+
return logout_privileged_users(**kwargs)
15+
16+
17+
@lambda_task
1118
def logout_privileged_users():
1219
BrowserSession.objects.filter(
1320
user__is_staff=True,
1421
created__lt=now() - settings.SESSION_PRIVILEGED_USER_TIMEOUT,
1522
).only("pk").delete()
1623

1724

18-
@acks_late_micro_short_task
25+
@acks_late_micro_short_task(name=f"{__name__}.clear_sessions")
1926
@transaction.atomic
27+
def clear_sessions_celery(**kwargs):
28+
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
29+
return clear_sessions(**kwargs)
30+
31+
32+
@lambda_task
2033
def clear_sessions():
2134
BrowserSession.objects.filter(expire_date__lt=now()).only("pk").delete()

app/grandchallenge/core/tasks.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import boto3
44
from django.conf import settings
55
from django.contrib.sites.models import Site
6-
from django.db import transaction
76
from django.db.models import Count
87
from django.utils import timezone
98
from django.utils.timezone import now
@@ -17,18 +16,10 @@
1716
PostProcessImageTask,
1817
RawImageUploadSession,
1918
)
20-
from grandchallenge.core.celery import acks_late_micro_short_task
2119
from grandchallenge.evaluation.models import Evaluation, Method
2220
from grandchallenge.workstations.models import Session
2321

2422

25-
@acks_late_micro_short_task(name=f"{__name__}.cleanup_celery_backend")
26-
@transaction.atomic
27-
def cleanup_celery_backend_celery():
28-
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
29-
return cleanup_celery_backend()
30-
31-
3223
@lambda_task
3324
def cleanup_celery_backend():
3425
"""Cleanup the Celery backend."""

app/grandchallenge/direct_messages/tasks.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from django.db.models import Count, F, Q
44
from lambda_tasks.decorators import lambda_task
55

6-
from grandchallenge.core.celery import acks_late_micro_short_task
76
from grandchallenge.profiles.models import NotificationEmailOptions
87

98

@@ -48,14 +47,6 @@ def get_new_senders(*, user):
4847
return sorted(list(new_senders), key=lambda s: s.pk)
4948

5049

51-
@acks_late_micro_short_task(
52-
name=f"{__name__}.send_new_unread_direct_messages_emails"
53-
)
54-
def send_new_unread_direct_messages_emails_celery():
55-
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
56-
return send_new_unread_direct_messages_emails()
57-
58-
5950
@lambda_task
6051
def send_new_unread_direct_messages_emails():
6152
site = Site.objects.get_current()

app/grandchallenge/discussion_forums/tasks.py

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,16 @@
11
from uuid import UUID
22

33
from actstream.actions import follow
4-
from celery.utils.log import get_task_logger
54
from django.apps import apps
6-
from django.db import transaction
75
from lambda_tasks.decorators import lambda_task
6+
from lambda_tasks.logging import task_logger
87

9-
from grandchallenge.core.celery import acks_late_micro_short_task
108
from grandchallenge.core.exceptions import LockNotAcquiredException
119
from grandchallenge.notifications.models import (
1210
Notification,
1311
NotificationTypeChoices,
1412
)
1513

16-
logger = get_task_logger(__name__)
17-
18-
19-
@acks_late_micro_short_task(
20-
name=f"{__name__}.create_forum_notifications",
21-
retry_on=(LockNotAcquiredException,),
22-
)
23-
@transaction.atomic
24-
def create_forum_notifications_celery(**kwargs):
25-
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
26-
return create_forum_notifications(**kwargs)
27-
2814

2915
@lambda_task(retry_on=(LockNotAcquiredException,))
3016
def create_forum_notifications(
@@ -39,7 +25,7 @@ def create_forum_notifications(
3925
model = apps.get_model(app_label=app_label, model_name=model_name)
4026

4127
if model not in (ForumPost, ForumTopic):
42-
logger.error(
28+
task_logger.error(
4329
f"Forum notifications can only be created for posts or topics, not for {model}"
4430
)
4531
return

app/grandchallenge/evaluation/models.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,9 +1079,8 @@ def has_active_evaluations(self, *, users):
10791079
)
10801080

10811081
def handle_submission_limit_avoidance(self, *, user):
1082-
on_commit(
1083-
deactivate_user.signature(kwargs={"user_pk": user.pk}).apply_async
1084-
)
1082+
deactivate_user.execute_on_commit(user_pk=user.pk)
1083+
10851084
mail_managers(
10861085
subject="Suspected submission limit avoidance",
10871086
message=format_html(

app/grandchallenge/github/tasks.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from django.db import transaction
1717
from django.db.transaction import on_commit
1818
from django.utils.timezone import now
19+
from lambda_tasks.decorators import lambda_task
1920

2021
from grandchallenge.algorithms.models import Algorithm
2122
from grandchallenge.codebuild.tasks import create_codebuild_build
@@ -179,8 +180,14 @@ def unlink_algorithm(*, pk):
179180
)
180181

181182

182-
@acks_late_micro_short_task
183+
@acks_late_micro_short_task(name=f"{__name__}.cleanup_expired_tokens")
183184
@transaction.atomic
185+
def cleanup_expired_tokens_celery(**kwargs):
186+
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
187+
return cleanup_expired_tokens(**kwargs)
188+
189+
190+
@lambda_task
184191
def cleanup_expired_tokens():
185192
from grandchallenge.github.models import GitHubUserToken
186193

@@ -189,8 +196,14 @@ def cleanup_expired_tokens():
189196
).delete()
190197

191198

192-
@acks_late_micro_short_task
193-
def refresh_user_token(*, pk):
199+
@acks_late_micro_short_task(name=f"{__name__}.refresh_user_token")
200+
def refresh_user_token_celery(**kwargs):
201+
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
202+
return refresh_user_token(**kwargs)
203+
204+
205+
@lambda_task
206+
def refresh_user_token(*, pk: int):
194207
from grandchallenge.github.models import GitHubUserToken
195208

196209
token = GitHubUserToken.objects.get(pk=pk)
@@ -204,7 +217,13 @@ def refresh_user_token(*, pk):
204217
token.save()
205218

206219

207-
@acks_late_micro_short_task
220+
@acks_late_micro_short_task(name=f"{__name__}.refresh_expiring_user_tokens")
221+
def refresh_expiring_user_tokens_celery(**kwargs):
222+
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
223+
return refresh_expiring_user_tokens(**kwargs)
224+
225+
226+
@lambda_task
208227
def refresh_expiring_user_tokens():
209228
"""Refresh user tokens expiring in the next 1 to 28 days"""
210229
from grandchallenge.github.models import GitHubUserToken
@@ -214,6 +233,4 @@ def refresh_expiring_user_tokens():
214233
refresh_token_expires__lt=now() + timedelta(days=28),
215234
)
216235
for token in queryset.iterator():
217-
on_commit(
218-
refresh_user_token.signature(kwargs={"pk": token.pk}).apply_async
219-
)
236+
refresh_user_token.execute_on_commit(pk=token.pk)

app/grandchallenge/notifications/tasks.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,12 @@
22
from django.db.models import Count, F, Q
33
from lambda_tasks.decorators import lambda_task
44

5-
from grandchallenge.core.celery import acks_late_micro_short_task
65
from grandchallenge.profiles.models import (
76
NotificationEmailOptions,
87
UserProfile,
98
)
109

1110

12-
@acks_late_micro_short_task(name=f"{__name__}.send_unread_notification_emails")
13-
def send_unread_notification_emails_celery():
14-
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
15-
return send_unread_notification_emails()
16-
17-
1811
@lambda_task
1912
def send_unread_notification_emails():
2013
site = Site.objects.get_current()

app/grandchallenge/profiles/admin.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from django.contrib import admin
33
from django.contrib.auth import get_user_model
44
from django.contrib.auth.admin import UserAdmin
5-
from django.db.transaction import on_commit
65

76
from grandchallenge.core.admin import (
87
GroupObjectPermissionAdmin,
@@ -29,9 +28,7 @@ class UserProfileInline(admin.StackedInline):
2928
)
3029
def deactivate_users(modeladmin, request, queryset):
3130
for user in queryset.filter(is_active=True):
32-
on_commit(
33-
deactivate_user.signature(kwargs={"user_pk": user.pk}).apply_async
34-
)
31+
deactivate_user.execute_on_commit(user_pk=user.pk)
3532

3633

3734
class UserProfileAdmin(UserAdmin):

0 commit comments

Comments
 (0)