Skip to content

Commit 495a22d

Browse files
authored
Remove retry if dropped (#3998)
With the changes with database proxying this should no longer happen
1 parent 8a68cfe commit 495a22d

2 files changed

Lines changed: 46 additions & 108 deletions

File tree

app/grandchallenge/cases/tasks.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from tempfile import TemporaryDirectory
88

99
from billiard.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
10+
from django.apps import apps
1011
from django.conf import settings
1112
from django.core.exceptions import ValidationError
1213
from django.core.files import File
@@ -21,10 +22,7 @@
2122
from grandchallenge.cases.models import Image, ImageFile, RawImageUploadSession
2223
from grandchallenge.components.backends.utils import safe_extract
2324
from grandchallenge.components.models import ComponentInterface
24-
from grandchallenge.components.tasks import (
25-
get_model_instance,
26-
lock_model_instance,
27-
)
25+
from grandchallenge.components.tasks import lock_model_instance
2826
from grandchallenge.core.celery import acks_late_2xlarge_task
2927
from grandchallenge.core.exceptions import LockNotAcquiredException
3028
from grandchallenge.reader_studies.models import DisplaySet
@@ -140,12 +138,12 @@ def build_images( # noqa:C901
140138
)
141139

142140
if linked_object_pk:
141+
linked_model = apps.get_model(
142+
app_label=linked_app_label, model_name=linked_model_name
143+
)
144+
143145
try:
144-
linked_object = get_model_instance(
145-
app_label=linked_app_label,
146-
model_name=linked_model_name,
147-
pk=linked_object_pk,
148-
)
146+
linked_object = linked_model.objects.get(pk=linked_object_pk)
149147
except (ArchiveItem.DoesNotExist, DisplaySet.DoesNotExist):
150148
# users can delete archive items and display sets before this task runs
151149
logger.info(

app/grandchallenge/components/tasks.py

Lines changed: 39 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -717,43 +717,6 @@ def _get_image_config_file(
717717
return {"image_sha256": image_sha256, "config": config}
718718

719719

720-
def retry_if_dropped(func):
721-
"""
722-
Retry a function that relies on an open database connection.
723-
724-
Use this decorator when you have a long running task as sometimes the db
725-
connection will drop.
726-
"""
727-
728-
def wrapper(*args, **kwargs):
729-
n_tries = 0
730-
max_tries = 2
731-
err = None
732-
733-
while n_tries < max_tries:
734-
n_tries += 1
735-
736-
try:
737-
return func(*args, **kwargs)
738-
except OperationalError as e:
739-
err = e
740-
741-
# This needs to be a local import
742-
from django.db import connection
743-
744-
connection.close()
745-
746-
raise err
747-
748-
return wrapper
749-
750-
751-
@retry_if_dropped
752-
def get_model_instance(*, app_label, model_name, **kwargs):
753-
model = apps.get_model(app_label=app_label, model_name=model_name)
754-
return model.objects.get(**kwargs)
755-
756-
757720
def lock_model_instance(*, app_label, model_name, **kwargs):
758721
"""
759722
Locks a model instance for update.
@@ -827,9 +790,8 @@ def execute_job( # noqa: C901
827790
828791
Once the job has executed it will be in the EXECUTING or FAILURE states.
829792
"""
830-
job = get_model_instance(
831-
pk=job_pk, app_label=job_app_label, model_name=job_model_name
832-
)
793+
model = apps.get_model(app_label=job_app_label, model_name=job_model_name)
794+
job = model.objects.get(pk=job_pk)
833795
executor = job.get_executor(backend=backend)
834796

835797
if job.status == job.PROVISIONED:
@@ -844,15 +806,11 @@ def execute_job( # noqa: C901
844806
raise PriorStepFailed(msg)
845807

846808
try:
847-
# This call is potentially very long
848809
executor.execute()
849810
except RetryStep:
850811
job.update_status(status=job.PROVISIONED)
851812
raise
852813
except ComponentException as e:
853-
job = get_model_instance(
854-
pk=job_pk, app_label=job_app_label, model_name=job_model_name
855-
)
856814
job.update_status(
857815
status=job.FAILURE,
858816
stdout=executor.stdout,
@@ -861,19 +819,13 @@ def execute_job( # noqa: C901
861819
detailed_error_message=e.message_details,
862820
)
863821
except (SoftTimeLimitExceeded, TimeLimitExceeded):
864-
job = get_model_instance(
865-
pk=job_pk, app_label=job_app_label, model_name=job_model_name
866-
)
867822
job.update_status(
868823
status=job.FAILURE,
869824
stdout=executor.stdout,
870825
stderr=executor.stderr,
871826
error_message="Time limit exceeded",
872827
)
873828
except Exception:
874-
job = get_model_instance(
875-
pk=job_pk, app_label=job_app_label, model_name=job_model_name
876-
)
877829
job.update_status(
878830
status=job.FAILURE,
879831
stdout=executor.stdout,
@@ -1031,9 +983,8 @@ def retry_task(
1031983
backend: str,
1032984
):
1033985
"""Retries an existing task that was previously provisioned"""
1034-
job = get_model_instance(
1035-
pk=job_pk, app_label=job_app_label, model_name=job_model_name
1036-
)
986+
model = apps.get_model(app_label=job_app_label, model_name=job_model_name)
987+
job = model.objects.get(pk=job_pk)
1037988
executor = job.get_executor(backend=backend)
1038989

1039990
if job.status != job.PROVISIONED:
@@ -1062,27 +1013,24 @@ def deprovision_job(
10621013
job_model_name: str,
10631014
backend: str,
10641015
):
1065-
job = get_model_instance(
1066-
pk=job_pk, app_label=job_app_label, model_name=job_model_name
1067-
)
1016+
model = apps.get_model(app_label=job_app_label, model_name=job_model_name)
1017+
job = model.objects.get(pk=job_pk)
10681018

10691019
executor = job.get_executor(backend=backend)
10701020
executor.deprovision()
10711021

10721022

10731023
@shared_task
10741024
def start_service(*, pk: uuid.UUID, app_label: str, model_name: str):
1075-
session = get_model_instance(
1076-
pk=pk, app_label=app_label, model_name=model_name
1077-
)
1025+
model = apps.get_model(app_label=app_label, model_name=model_name)
1026+
session = model.objects.get(pk=pk)
10781027
session.start()
10791028

10801029

10811030
@shared_task
10821031
def stop_service(*, pk: uuid.UUID, app_label: str, model_name: str):
1083-
session = get_model_instance(
1084-
pk=pk, app_label=app_label, model_name=model_name
1085-
)
1032+
model = apps.get_model(app_label=app_label, model_name=model_name)
1033+
session = model.objects.get(pk=pk)
10861034
session.stop()
10871035

10881036

@@ -1235,60 +1183,52 @@ def preload_interactive_algorithms():
12351183

12361184

12371185
@acks_late_micro_short_task
1186+
@transaction.atomic
12381187
def add_image_to_component_interface_value(
12391188
*, component_interface_value_pk, upload_session_pk
12401189
):
1241-
with transaction.atomic():
1242-
session = RawImageUploadSession.objects.get(pk=upload_session_pk)
1190+
from grandchallenge.components.models import ComponentInterfaceValue
12431191

1244-
if session.image_set.count() != 1:
1245-
session.status = RawImageUploadSession.FAILURE
1246-
session.error_message = (
1247-
"Image imports should result in a single image"
1248-
)
1249-
session.save()
1250-
return
1192+
session = RawImageUploadSession.objects.get(pk=upload_session_pk)
12511193

1252-
civ = get_model_instance(
1253-
pk=component_interface_value_pk,
1254-
app_label="components",
1255-
model_name="componentinterfacevalue",
1256-
)
1194+
if session.image_set.count() != 1:
1195+
session.status = RawImageUploadSession.FAILURE
1196+
session.error_message = "Image imports should result in a single image"
1197+
session.save()
1198+
return
12571199

1258-
civ.image = session.image_set.get()
1259-
civ.full_clean()
1260-
civ.save()
1200+
civ = ComponentInterfaceValue.objects.get(pk=component_interface_value_pk)
1201+
1202+
civ.image = session.image_set.get()
1203+
civ.full_clean()
1204+
civ.save()
12611205

1262-
civ.image.update_viewer_groups_permissions()
1206+
civ.image.update_viewer_groups_permissions()
12631207

12641208

12651209
@acks_late_2xlarge_task
1210+
@transaction.atomic
12661211
def civ_value_to_file(*, civ_pk):
1267-
with transaction.atomic():
1268-
civ = get_model_instance(
1269-
pk=civ_pk,
1270-
app_label="components",
1271-
model_name="componentinterfacevalue",
1272-
)
1212+
from grandchallenge.components.models import ComponentInterfaceValue
12731213

1274-
if civ.value is None:
1275-
raise RuntimeError("CIV value is None")
1214+
civ = ComponentInterfaceValue.objects.get(pk=civ_pk)
12761215

1277-
civ.file = ContentFile(
1278-
json.dumps(civ.value).encode("utf-8"),
1279-
name=Path(civ.interface.relative_path).name,
1280-
)
1281-
civ.value = None
1282-
civ.save()
1216+
if civ.value is None:
1217+
raise RuntimeError("CIV value is None")
1218+
1219+
civ.file = ContentFile(
1220+
json.dumps(civ.value).encode("utf-8"),
1221+
name=Path(civ.interface.relative_path).name,
1222+
)
1223+
civ.value = None
1224+
civ.save()
12831225

12841226

12851227
@acks_late_2xlarge_task
12861228
def validate_voxel_values(*, civ_pk):
1287-
civ = get_model_instance(
1288-
pk=civ_pk,
1289-
app_label="components",
1290-
model_name="componentinterfacevalue",
1291-
)
1229+
from grandchallenge.components.models import ComponentInterfaceValue
1230+
1231+
civ = ComponentInterfaceValue.objects.get(pk=civ_pk)
12921232

12931233
first_file = civ.image.files.first()
12941234
if (

0 commit comments

Comments
 (0)