Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions app/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,14 +1197,6 @@ def get_private_ip():
)

CELERY_BEAT_SCHEDULE = {
"refresh_expiring_user_tokens": {
"task": "grandchallenge.github.tasks.refresh_expiring_user_tokens",
"schedule": crontab(hour=0, minute=15),
},
"update_publication_metadata": {
"task": "grandchallenge.publications.tasks.update_publication_metadata",
"schedule": crontab(hour=0, minute=30),
},
"remove_inactive_container_images": {
"task": "grandchallenge.components.tasks.remove_inactive_container_images",
"schedule": crontab(hour=1, minute=0),
Expand All @@ -1221,10 +1213,6 @@ def get_private_ip():
"task": "grandchallenge.algorithms.tasks.deactivate_old_algorithm_images",
"schedule": crontab(hour=2, minute=30),
},
"aggregate_celery_daily_stats": {
"task": "grandchallenge.background_tasks.tasks.aggregate_celery_daily_stats",
"schedule": crontab(hour=2, minute=45),
},
"update_associated_challenges": {
"task": "grandchallenge.algorithms.tasks.update_associated_challenges",
"schedule": crontab(hour=3, minute=0),
Expand Down Expand Up @@ -1261,30 +1249,10 @@ def get_private_ip():
"task": "grandchallenge.challenges.tasks.update_challenge_compute_costs",
"schedule": crontab(minute=45),
},
"delete_users_who_dont_login": {
"task": "grandchallenge.profiles.tasks.delete_users_who_dont_login",
"schedule": timedelta(hours=1),
},
"delete_old_user_uploads": {
"task": "grandchallenge.uploads.tasks.delete_old_user_uploads",
"schedule": timedelta(hours=1),
},
"clear_sessions": {
"task": "grandchallenge.browser_sessions.tasks.clear_sessions",
"schedule": timedelta(hours=1),
},
"cleanup_expired_tokens": {
"task": "grandchallenge.github.tasks.cleanup_expired_tokens",
"schedule": timedelta(hours=1),
},
"cleanup_sent_raw_emails": {
"task": "grandchallenge.emails.tasks.cleanup_sent_raw_emails",
"schedule": timedelta(hours=1),
},
"logout_privileged_users": {
"task": "grandchallenge.browser_sessions.tasks.logout_privileged_users",
"schedule": timedelta(hours=1),
},
"update_challenge_results_cache": {
"task": "grandchallenge.challenges.tasks.update_challenge_results_cache",
"schedule": timedelta(minutes=5),
Expand Down
15 changes: 15 additions & 0 deletions app/grandchallenge/background_tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
)
from django.utils import timezone
from django_celery_results.models import TaskResult
from lambda_tasks.decorators import lambda_task
from lambda_tasks.timeouts import SoftTimeLimitExceeded

from grandchallenge.background_tasks.models import CeleryTaskDailyStats
from grandchallenge.core.celery import acks_late_micro_short_task


@acks_late_micro_short_task(
name=f"{__name__}.aggregate_celery_daily_stats",
singleton=True,
# No need to retry here as the periodic task call this again
ignore_errors=(
Expand All @@ -30,6 +32,19 @@
),
)
@transaction.atomic
def aggregate_celery_daily_stats_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return aggregate_celery_daily_stats(**kwargs)


@lambda_task(
singleton=True,
# No need to retry here as the periodic task call this again
ignore_errors=(
CelerySoftTimeLimitExceeded,
SoftTimeLimitExceeded,
),
)
def aggregate_celery_daily_stats():
yesterday = (timezone.now() - timedelta(days=1)).date()

Expand Down
17 changes: 15 additions & 2 deletions app/grandchallenge/browser_sessions/tasks.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
from django.conf import settings
from django.db import transaction
from django.utils.timezone import now
from lambda_tasks.decorators import lambda_task

from grandchallenge.browser_sessions.models import BrowserSession
from grandchallenge.core.celery import acks_late_micro_short_task


@acks_late_micro_short_task
@acks_late_micro_short_task(name=f"{__name__}.logout_privileged_users")
@transaction.atomic
def logout_privileged_users_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return logout_privileged_users(**kwargs)


@lambda_task
def logout_privileged_users():
BrowserSession.objects.filter(
user__is_staff=True,
created__lt=now() - settings.SESSION_PRIVILEGED_USER_TIMEOUT,
).only("pk").delete()


@acks_late_micro_short_task
@acks_late_micro_short_task(name=f"{__name__}.clear_sessions")
@transaction.atomic
def clear_sessions_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return clear_sessions(**kwargs)


@lambda_task
def clear_sessions():
BrowserSession.objects.filter(expire_date__lt=now()).only("pk").delete()
9 changes: 0 additions & 9 deletions app/grandchallenge/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import boto3
from django.conf import settings
from django.contrib.sites.models import Site
from django.db import transaction
from django.db.models import Count
from django.utils import timezone
from django.utils.timezone import now
Expand All @@ -17,18 +16,10 @@
PostProcessImageTask,
RawImageUploadSession,
)
from grandchallenge.core.celery import acks_late_micro_short_task
from grandchallenge.evaluation.models import Evaluation, Method
from grandchallenge.workstations.models import Session


@acks_late_micro_short_task(name=f"{__name__}.cleanup_celery_backend")
@transaction.atomic
def cleanup_celery_backend_celery():
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return cleanup_celery_backend()


@lambda_task
def cleanup_celery_backend():
"""Cleanup the Celery backend."""
Expand Down
9 changes: 0 additions & 9 deletions app/grandchallenge/direct_messages/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from django.db.models import Count, F, Q
from lambda_tasks.decorators import lambda_task

from grandchallenge.core.celery import acks_late_micro_short_task
from grandchallenge.profiles.models import NotificationEmailOptions


Expand Down Expand Up @@ -48,14 +47,6 @@ def get_new_senders(*, user):
return sorted(list(new_senders), key=lambda s: s.pk)


@acks_late_micro_short_task(
name=f"{__name__}.send_new_unread_direct_messages_emails"
)
def send_new_unread_direct_messages_emails_celery():
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return send_new_unread_direct_messages_emails()


@lambda_task
def send_new_unread_direct_messages_emails():
site = Site.objects.get_current()
Expand Down
18 changes: 2 additions & 16 deletions app/grandchallenge/discussion_forums/tasks.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,16 @@
from uuid import UUID

from actstream.actions import follow
from celery.utils.log import get_task_logger
from django.apps import apps
from django.db import transaction
from lambda_tasks.decorators import lambda_task
from lambda_tasks.logging import task_logger

from grandchallenge.core.celery import acks_late_micro_short_task
from grandchallenge.core.exceptions import LockNotAcquiredException
from grandchallenge.notifications.models import (
Notification,
NotificationTypeChoices,
)

logger = get_task_logger(__name__)


@acks_late_micro_short_task(
name=f"{__name__}.create_forum_notifications",
retry_on=(LockNotAcquiredException,),
)
@transaction.atomic
def create_forum_notifications_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return create_forum_notifications(**kwargs)


@lambda_task(retry_on=(LockNotAcquiredException,))
def create_forum_notifications(
Expand All @@ -39,7 +25,7 @@ def create_forum_notifications(
model = apps.get_model(app_label=app_label, model_name=model_name)

if model not in (ForumPost, ForumTopic):
logger.error(
task_logger.error(
f"Forum notifications can only be created for posts or topics, not for {model}"
)
return
Expand Down
5 changes: 2 additions & 3 deletions app/grandchallenge/evaluation/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,9 +1079,8 @@ def has_active_evaluations(self, *, users):
)

def handle_submission_limit_avoidance(self, *, user):
on_commit(
deactivate_user.signature(kwargs={"user_pk": user.pk}).apply_async
)
deactivate_user.execute_on_commit(user_pk=user.pk)

mail_managers(
subject="Suspected submission limit avoidance",
message=format_html(
Expand Down
31 changes: 24 additions & 7 deletions app/grandchallenge/github/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from django.db import transaction
from django.db.transaction import on_commit
from django.utils.timezone import now
from lambda_tasks.decorators import lambda_task

from grandchallenge.algorithms.models import Algorithm
from grandchallenge.codebuild.tasks import create_codebuild_build
Expand Down Expand Up @@ -179,8 +180,14 @@ def unlink_algorithm(*, pk):
)


@acks_late_micro_short_task
@acks_late_micro_short_task(name=f"{__name__}.cleanup_expired_tokens")
@transaction.atomic
def cleanup_expired_tokens_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return cleanup_expired_tokens(**kwargs)


@lambda_task
def cleanup_expired_tokens():
from grandchallenge.github.models import GitHubUserToken

Expand All @@ -189,8 +196,14 @@ def cleanup_expired_tokens():
).delete()


@acks_late_micro_short_task
def refresh_user_token(*, pk):
@acks_late_micro_short_task(name=f"{__name__}.refresh_user_token")
def refresh_user_token_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return refresh_user_token(**kwargs)


@lambda_task
def refresh_user_token(*, pk: int):
from grandchallenge.github.models import GitHubUserToken

token = GitHubUserToken.objects.get(pk=pk)
Expand All @@ -204,7 +217,13 @@ def refresh_user_token(*, pk):
token.save()


@acks_late_micro_short_task
@acks_late_micro_short_task(name=f"{__name__}.refresh_expiring_user_tokens")
def refresh_expiring_user_tokens_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return refresh_expiring_user_tokens(**kwargs)


@lambda_task
def refresh_expiring_user_tokens():
"""Refresh user tokens expiring in the next 1 to 28 days"""
from grandchallenge.github.models import GitHubUserToken
Expand All @@ -214,6 +233,4 @@ def refresh_expiring_user_tokens():
refresh_token_expires__lt=now() + timedelta(days=28),
)
for token in queryset.iterator():
on_commit(
refresh_user_token.signature(kwargs={"pk": token.pk}).apply_async
)
refresh_user_token.execute_on_commit(pk=token.pk)
7 changes: 0 additions & 7 deletions app/grandchallenge/notifications/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,12 @@
from django.db.models import Count, F, Q
from lambda_tasks.decorators import lambda_task

from grandchallenge.core.celery import acks_late_micro_short_task
from grandchallenge.profiles.models import (
NotificationEmailOptions,
UserProfile,
)


@acks_late_micro_short_task(name=f"{__name__}.send_unread_notification_emails")
def send_unread_notification_emails_celery():
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return send_unread_notification_emails()


@lambda_task
def send_unread_notification_emails():
site = Site.objects.get_current()
Expand Down
5 changes: 1 addition & 4 deletions app/grandchallenge/profiles/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from django.contrib import admin
from django.contrib.auth import get_user_model
from django.contrib.auth.admin import UserAdmin
from django.db.transaction import on_commit

from grandchallenge.core.admin import (
GroupObjectPermissionAdmin,
Expand All @@ -29,9 +28,7 @@ class UserProfileInline(admin.StackedInline):
)
def deactivate_users(modeladmin, request, queryset):
for user in queryset.filter(is_active=True):
on_commit(
deactivate_user.signature(kwargs={"user_pk": user.pk}).apply_async
)
deactivate_user.execute_on_commit(user_pk=user.pk)


class UserProfileAdmin(UserAdmin):
Expand Down
19 changes: 16 additions & 3 deletions app/grandchallenge/profiles/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.utils.timezone import now
from lambda_tasks.decorators import lambda_task

from grandchallenge.browser_sessions.models import BrowserSession
from grandchallenge.core.celery import acks_late_micro_short_task


@acks_late_micro_short_task
@acks_late_micro_short_task(name=f"{__name__}.deactivate_user")
@transaction.atomic
def deactivate_user(*, user_pk):
def deactivate_user_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return deactivate_user(**kwargs)


@lambda_task
def deactivate_user(*, user_pk: int):
user = (
get_user_model().objects.select_related("verification").get(pk=user_pk)
)
Expand All @@ -30,8 +37,14 @@ def deactivate_user(*, user_pk):
BrowserSession.objects.filter(user=user).delete()


@acks_late_micro_short_task
@acks_late_micro_short_task(name=f"{__name__}.delete_users_who_dont_login")
@transaction.atomic
def delete_users_who_dont_login_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return delete_users_who_dont_login(**kwargs)


@lambda_task
def delete_users_who_dont_login():
"""Remove users who do not sign in after USER_LOGIN_TIMEOUT_DAYS"""
get_user_model().objects.exclude(
Expand Down
9 changes: 8 additions & 1 deletion app/grandchallenge/publications/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from celery.utils.log import get_task_logger
from lambda_tasks.decorators import lambda_task
from requests.exceptions import RequestException

from grandchallenge.core.celery import acks_late_2xlarge_task
Expand All @@ -7,7 +8,13 @@
logger = get_task_logger(__name__)


@acks_late_2xlarge_task
@acks_late_2xlarge_task(name=f"{__name__}.update_publication_metadata")
def update_publication_metadata_celery(**kwargs):
# TODO: 4408 Remove, this is still here to handle existing tasks on SQS
return update_publication_metadata(**kwargs)


@lambda_task
def update_publication_metadata():
pks_to_delete = []

Expand Down
Loading
Loading