Skip to content

Commit 8ea3320

Browse files
committed
Fix creating new event loops
1 parent 7737273 commit 8ea3320

1 file changed

Lines changed: 13 additions & 18 deletions

File tree

  • app/grandchallenge/components/backends

app/grandchallenge/components/backends/base.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import aioboto3
1515
import boto3
1616
import botocore
17+
from asgiref.sync import async_to_sync
1718
from django.conf import settings
1819
from django.core.exceptions import SuspiciousFileOperation, ValidationError
1920
from django.db import transaction
@@ -192,16 +193,14 @@ def __init__(
192193
def provision(self, *, input_civs, input_prefixes):
193194
# We cannot run everything async as it requires database access.
194195
# So first we gather the definitions of the async tasks that
195-
# need to be run, then execute them in a new asyncio loop.
196+
# need to be run, then execute them in the event loop for
197+
# the current thread using @async_to_sync.
196198
provisioning_task_definitions = (
197199
self._get_provisioning_task_definitions(
198200
input_civs=input_civs, input_prefixes=input_prefixes
199201
)
200202
)
201-
202-
asyncio.run(
203-
self._provision(task_definitions=provisioning_task_definitions)
204-
)
203+
self._provision(task_definitions=provisioning_task_definitions)
205204

206205
@abstractmethod
207206
def execute(self): ...
@@ -410,24 +409,20 @@ def _get_key_and_relative_path(self, *, civ, input_prefixes):
410409

411410
return key, relative_path
412411

412+
@async_to_sync
413413
async def _provision(self, *, task_definitions):
414414
semaphore = asyncio.Semaphore(CONCURRENCY)
415415
session = aioboto3.Session()
416416

417-
provisioning_tasks = set()
418-
419-
for task_definition in task_definitions:
420-
aio_task = asyncio.create_task(
421-
task_definition.method(
422-
**task_definition.kwargs,
423-
semaphore=semaphore,
424-
session=session,
417+
async with asyncio.TaskGroup() as task_group:
418+
for task_definition in task_definitions:
419+
task_group.create_task(
420+
task_definition.method(
421+
**task_definition.kwargs,
422+
semaphore=semaphore,
423+
session=session,
424+
)
425425
)
426-
)
427-
provisioning_tasks.add(aio_task)
428-
aio_task.add_done_callback(provisioning_tasks.discard)
429-
430-
await asyncio.gather(*provisioning_tasks)
431426

432427
def _get_provisioning_task_definitions(
433428
self, *, input_civs, input_prefixes

0 commit comments

Comments
 (0)