Skip to content

Commit 57c7178

Browse files
authored
Split compute and storage tasks (#4064)
See DIAGNijmegen/rse-roadmap#408
1 parent dd702fc commit 57c7178

4 files changed

Lines changed: 72 additions & 55 deletions

File tree

app/config/settings.py

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1262,6 +1262,14 @@ def sentry_before_send(event, hint):
12621262
"task": "grandchallenge.invoices.tasks.send_challenge_invoice_overdue_reminder_emails",
12631263
"schedule": crontab(day_of_month=1, hour=6, minute=0),
12641264
},
1265+
"update_challenge_storage_size": {
1266+
"task": "grandchallenge.challenges.tasks.update_challenge_storage_size",
1267+
"schedule": crontab(hour=6, minute=15),
1268+
},
1269+
"update_challenge_compute_costs": {
1270+
"task": "grandchallenge.challenges.tasks.update_challenge_compute_costs",
1271+
"schedule": crontab(minute=45),
1272+
},
12651273
"delete_users_who_dont_login": {
12661274
"task": "grandchallenge.profiles.tasks.delete_users_who_dont_login",
12671275
"schedule": timedelta(hours=1),
@@ -1286,10 +1294,6 @@ def sentry_before_send(event, hint):
12861294
"task": "grandchallenge.core.tasks.cleanup_celery_backend",
12871295
"schedule": timedelta(hours=1),
12881296
},
1289-
"update_compute_costs_and_storage_size": {
1290-
"task": "grandchallenge.challenges.tasks.update_compute_costs_and_storage_size",
1291-
"schedule": timedelta(hours=2),
1292-
},
12931297
"logout_privileged_users": {
12941298
"task": "grandchallenge.browser_sessions.tasks.logout_privileged_users",
12951299
"schedule": timedelta(hours=1),

app/grandchallenge/challenges/costs.py

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,23 @@
1919
)
2020

2121

22-
def annotate_job_duration_and_compute_costs(*, phase):
22+
def annotate_compute_costs(*, challenge):
2323
algorithm_job_utilizations = JobUtilization.objects.filter(
24-
phase=phase,
24+
challenge=challenge
25+
)
26+
evaluation_job_utilizations = EvaluationUtilization.objects.filter(
27+
challenge=challenge
28+
)
29+
30+
update_compute_cost_euro_millicents(
31+
obj=challenge,
32+
algorithm_job_utilizations=algorithm_job_utilizations,
33+
evaluation_job_utilizations=evaluation_job_utilizations,
2534
)
35+
36+
37+
def annotate_job_duration_and_compute_costs(*, phase):
38+
algorithm_job_utilizations = JobUtilization.objects.filter(phase=phase)
2639
evaluation_job_utilizations = EvaluationUtilization.objects.filter(
2740
phase=phase, external_evaluation=False
2841
)
@@ -38,7 +51,25 @@ def annotate_job_duration_and_compute_costs(*, phase):
3851
)
3952

4053

41-
def annotate_compute_costs_and_storage_size(*, challenge):
54+
def update_compute_cost_euro_millicents(
55+
*, obj, algorithm_job_utilizations, evaluation_job_utilizations
56+
):
57+
algorithm_job_costs = algorithm_job_utilizations.aggregate(
58+
Sum("compute_cost_euro_millicents")
59+
)
60+
61+
evaluation_costs = evaluation_job_utilizations.aggregate(
62+
Sum("compute_cost_euro_millicents")
63+
)
64+
65+
items = [algorithm_job_costs, evaluation_costs]
66+
67+
obj.compute_cost_euro_millicents = sum(
68+
item["compute_cost_euro_millicents__sum"] or 0 for item in items
69+
)
70+
71+
72+
def annotate_storage_size(*, challenge):
4273
permission = Permission.objects.get(
4374
codename="view_job",
4475
content_type__app_label="algorithms",
@@ -61,19 +92,6 @@ def annotate_compute_costs_and_storage_size(*, challenge):
6192
evaluation_jobs=evaluation_jobs,
6293
)
6394

64-
algorithm_job_utilizations = JobUtilization.objects.filter(
65-
challenge=challenge,
66-
)
67-
evaluation_job_utilizations = EvaluationUtilization.objects.filter(
68-
challenge=challenge
69-
)
70-
71-
update_compute_cost_euro_millicents(
72-
obj=challenge,
73-
algorithm_job_utilizations=algorithm_job_utilizations,
74-
evaluation_job_utilizations=evaluation_job_utilizations,
75-
)
76-
7795

7896
def update_size_in_storage_and_registry(
7997
*, challenge, algorithm_jobs, evaluation_jobs
@@ -170,21 +188,3 @@ def update_size_in_storage_and_registry(
170188
challenge.size_in_registry = sum(
171189
item.get("size_in_registry__sum") or 0 for item in items
172190
)
173-
174-
175-
def update_compute_cost_euro_millicents(
176-
*, obj, algorithm_job_utilizations, evaluation_job_utilizations
177-
):
178-
algorithm_job_costs = algorithm_job_utilizations.aggregate(
179-
Sum("compute_cost_euro_millicents")
180-
)
181-
182-
evaluation_costs = evaluation_job_utilizations.aggregate(
183-
Sum("compute_cost_euro_millicents")
184-
)
185-
186-
items = [algorithm_job_costs, evaluation_costs]
187-
188-
obj.compute_cost_euro_millicents = sum(
189-
item["compute_cost_euro_millicents__sum"] or 0 for item in items
190-
)

app/grandchallenge/challenges/tasks.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
from psycopg.errors import LockNotAvailable
1111

1212
from grandchallenge.challenges.costs import (
13-
annotate_compute_costs_and_storage_size,
13+
annotate_compute_costs,
1414
annotate_job_duration_and_compute_costs,
15+
annotate_storage_size,
1516
)
1617
from grandchallenge.challenges.emails import (
1718
send_onboarding_task_due_reminder,
@@ -94,20 +95,14 @@ def wrapper(*args, **kwargs):
9495

9596

9697
@acks_late_2xlarge_task
97-
def update_compute_costs_and_storage_size():
98+
def update_challenge_compute_costs():
9899
for challenge in Challenge.objects.with_available_compute().iterator():
99100
with transaction.atomic():
100-
annotate_compute_costs_and_storage_size(challenge=challenge)
101+
annotate_compute_costs(challenge=challenge)
101102

102103
@retry_with_backoff((LockNotAvailable,))
103104
def save_challenge():
104-
challenge.save(
105-
update_fields=(
106-
"size_in_storage",
107-
"size_in_registry",
108-
"compute_cost_euro_millicents",
109-
)
110-
)
105+
challenge.save(update_fields=("compute_cost_euro_millicents",))
111106

112107
save_challenge()
113108

@@ -128,6 +123,24 @@ def save_phase():
128123
save_phase()
129124

130125

126+
@acks_late_2xlarge_task
127+
def update_challenge_storage_size():
128+
for challenge in Challenge.objects.iterator():
129+
with transaction.atomic():
130+
annotate_storage_size(challenge=challenge)
131+
132+
@retry_with_backoff((LockNotAvailable,))
133+
def save_challenge():
134+
challenge.save(
135+
update_fields=(
136+
"size_in_storage",
137+
"size_in_registry",
138+
)
139+
)
140+
141+
save_challenge()
142+
143+
131144
class OnboardingTaskInfo(NamedTuple):
132145
challenge: str
133146
num_is_overdue: int

app/tests/challenges_tests/test_tasks.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
)
1212
from grandchallenge.challenges.tasks import (
1313
send_onboarding_task_reminder_emails,
14+
update_challenge_compute_costs,
1415
update_challenge_results_cache,
15-
update_compute_costs_and_storage_size,
1616
)
1717
from grandchallenge.invoices.models import PaymentStatusChoices
1818
from tests.evaluation_tests.factories import EvaluationFactory, PhaseFactory
@@ -164,7 +164,7 @@ def test_challenge_budget_alert_email(settings):
164164

165165
evaluation.utilization.compute_cost_euro_millicents = 500000
166166
evaluation.utilization.save()
167-
update_compute_costs_and_storage_size()
167+
update_challenge_compute_costs()
168168

169169
# Budget alert threshold not exceeded
170170
assert len(mail.outbox) == 0
@@ -175,7 +175,7 @@ def test_challenge_budget_alert_email(settings):
175175
)
176176
evaluation.utilization.compute_cost_euro_millicents = 300000
177177
evaluation.utilization.save()
178-
update_compute_costs_and_storage_size()
178+
update_challenge_compute_costs()
179179

180180
# Budget alert threshold exceeded
181181
assert len(mail.outbox) == 3
@@ -205,7 +205,7 @@ def test_challenge_budget_alert_email(settings):
205205
)
206206
evaluation.utilization.compute_cost_euro_millicents = 100000
207207
evaluation.utilization.save()
208-
update_compute_costs_and_storage_size()
208+
update_challenge_compute_costs()
209209

210210
# Next budget alert threshold not exceeded
211211
assert len(mail.outbox) == 0
@@ -216,7 +216,7 @@ def test_challenge_budget_alert_email(settings):
216216
)
217217
evaluation.utilization.compute_cost_euro_millicents = 1
218218
evaluation.utilization.save()
219-
update_compute_costs_and_storage_size()
219+
update_challenge_compute_costs()
220220

221221
# Next budget alert threshold exceeded
222222
assert len(mail.outbox) != 0
@@ -252,7 +252,7 @@ def test_challenge_budget_alert_two_thresholds_one_email(settings):
252252
)
253253
evaluation.utilization.compute_cost_euro_millicents = 950000
254254
evaluation.utilization.save()
255-
update_compute_costs_and_storage_size()
255+
update_challenge_compute_costs()
256256

257257
# Two budget alert thresholds exceeded, alert only sent for last one.
258258
assert len(mail.outbox) == 3
@@ -279,7 +279,7 @@ def test_challenge_budget_alert_no_budget():
279279
evaluation.utilization.compute_cost_euro_millicents = 1
280280
evaluation.utilization.save()
281281
assert len(mail.outbox) == 0
282-
update_compute_costs_and_storage_size()
282+
update_challenge_compute_costs()
283283
assert len(mail.outbox) != 0
284284
assert "Budget Consumed Alert" in mail.outbox[0].subject
285285

0 commit comments

Comments
 (0)