Skip to content

Commit 0edce61

Browse files
committed
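Refactor the upload_static management command
Uses the async best practices we found in #4342: an aioboto3 S3 client, an asyncio.TaskGroup, and a semaphore sized by the shared CONCURRENCY constant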
1 parent 18f11ed commit 0edce61

2 files changed

Lines changed: 66 additions & 81 deletions

File tree

app/grandchallenge/components/backends/base.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
MAX_SPOOL_SIZE = 1_000_000_000 # 1GB
4949

5050
CONCURRENCY = 50
51-
BOTO_CONFIG = Config(max_pool_connections=120)
51+
ASYNC_BOTO_CONFIG = Config(max_pool_connections=120)
5252

5353

5454
class JobParams(NamedTuple):
@@ -410,7 +410,9 @@ async def _provision(self, *, tasks):
410410
session = aioboto3.Session()
411411

412412
async with session.client(
413-
"s3", endpoint_url=settings.AWS_S3_ENDPOINT_URL, config=BOTO_CONFIG
413+
"s3",
414+
endpoint_url=settings.AWS_S3_ENDPOINT_URL,
415+
config=ASYNC_BOTO_CONFIG,
414416
) as s3_client:
415417
async with asyncio.TaskGroup() as task_group:
416418
for task in tasks:

app/grandchallenge/core/management/commands/upload_static.py

Lines changed: 62 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@
22
import logging
33
from pathlib import Path
44

5-
import boto3
6-
from botocore.exceptions import BotoCoreError, ClientError
5+
import aioboto3
6+
from asgiref.sync import async_to_sync
77
from django.conf import settings
8-
from django.core.management.base import BaseCommand, CommandError
8+
from django.core.management.base import BaseCommand
9+
10+
from grandchallenge.components.backends.base import (
11+
ASYNC_BOTO_CONFIG,
12+
CONCURRENCY,
13+
)
914

1015
logger = logging.getLogger(__name__)
1116

@@ -32,57 +37,52 @@
3237
}
3338

3439

40+
async def s3_upload_file(
41+
*, filename, bucket, key, content_type, cache_control, semaphore, s3_client
42+
):
43+
async with semaphore:
44+
await s3_client.upload_file(
45+
Filename=filename,
46+
Bucket=bucket,
47+
Key=key,
48+
ExtraArgs={
49+
"ContentType": content_type,
50+
"CacheControl": cache_control,
51+
},
52+
)
53+
logger.info(f"Uploaded s3://{bucket}/{key}")
54+
55+
3556
class Command(BaseCommand):
3657
help = "Uploads static files to an S3 bucket"
3758

3859
def add_arguments(self, parser):
3960
parser.add_argument(
4061
"--bucket", type=str, required=True, help="S3 bucket name"
4162
)
42-
parser.add_argument(
43-
"--concurrency",
44-
type=int,
45-
default=10,
46-
help="Number of concurrent uploads",
47-
)
4863

4964
def handle(self, *args, **options):
5065
bucket_name = options["bucket"]
51-
concurrency = options["concurrency"]
52-
53-
try:
54-
s3_client = boto3.client("s3")
5566

56-
files_to_upload = self._get_files_to_upload()
67+
files_to_upload = self._get_files_to_upload()
5768

58-
if not files_to_upload:
59-
raise RuntimeError("No files found to upload")
69+
if not files_to_upload:
70+
raise RuntimeError("No files found to upload")
6071

61-
self.stdout.write(
62-
f"Found {len(files_to_upload)} files to upload to {bucket_name}"
63-
)
72+
self.stdout.write(
73+
f"Found {len(files_to_upload)} files to upload to {bucket_name}"
74+
)
6475

65-
asyncio.run(
66-
self._upload_files(
67-
s3_client=s3_client,
68-
bucket_name=bucket_name,
69-
files=files_to_upload,
70-
concurrency=concurrency,
71-
)
72-
)
76+
self._upload_files(
77+
bucket_name=bucket_name,
78+
files=files_to_upload,
79+
)
7380

74-
self.stdout.write(
75-
self.style.SUCCESS(
76-
f"Successfully uploaded {len(files_to_upload)} files to {bucket_name}"
77-
)
81+
self.stdout.write(
82+
self.style.SUCCESS(
83+
f"Successfully uploaded {len(files_to_upload)} files to {bucket_name}"
7884
)
79-
80-
except (BotoCoreError, ClientError) as e:
81-
self.stderr.write(self.style.ERROR(f"AWS Error: {str(e)}"))
82-
raise CommandError(f"Failed to upload files: {str(e)}")
83-
except Exception as e:
84-
self.stderr.write(self.style.ERROR(f"Unexpected error: {str(e)}"))
85-
raise CommandError(f"Failed to upload files: {str(e)}")
85+
)
8686

8787
def _get_files_to_upload(self) -> list[Path]:
8888
files = []
@@ -94,52 +94,35 @@ def _get_files_to_upload(self) -> list[Path]:
9494

9595
return files
9696

97+
@async_to_sync
9798
async def _upload_files(
9899
self,
99100
*,
100-
s3_client,
101101
bucket_name: str,
102102
files: list[Path],
103-
concurrency: int,
104103
) -> None:
105-
semaphore = asyncio.Semaphore(concurrency)
106-
errors: set[str] = set()
107-
108-
async def upload_file(file_path: Path) -> None:
109-
async with semaphore:
110-
relative_path = file_path.relative_to(
111-
Path(settings.STATIC_ROOT).parent
112-
)
113-
s3_key = str(relative_path)
114-
115-
try:
116-
loop = asyncio.get_event_loop()
117-
await loop.run_in_executor(
118-
None,
119-
lambda: s3_client.upload_file(
120-
Filename=str(file_path),
121-
Bucket=bucket_name,
122-
Key=s3_key,
123-
ExtraArgs={
124-
"ContentType": CONTENT_TYPES[
125-
file_path.suffix.lower()
126-
],
127-
"CacheControl": settings.PUBLIC_FILE_CACHE_CONTROL,
128-
},
129-
),
104+
semaphore = asyncio.Semaphore(CONCURRENCY)
105+
session = aioboto3.Session()
106+
107+
async with session.client(
108+
"s3",
109+
endpoint_url=settings.AWS_S3_ENDPOINT_URL,
110+
config=ASYNC_BOTO_CONFIG,
111+
) as s3_client:
112+
async with asyncio.TaskGroup() as task_group:
113+
for file in files:
114+
relative_path = file.relative_to(
115+
Path(settings.STATIC_ROOT).parent
130116
)
131-
self.stdout.write(f"Uploaded: {s3_key}")
132-
except Exception as e:
133-
error_msg = f"Failed to upload {s3_key}: {str(e)}"
134-
errors.add(error_msg)
135-
self.stderr.write(self.style.ERROR(error_msg))
136117

137-
tasks = [upload_file(file_path) for file_path in files]
138-
139-
await asyncio.gather(*tasks)
140-
141-
if errors:
142-
error_count = len(errors)
143-
raise CommandError(
144-
f"Failed to upload {error_count} files. First error: {next(iter(errors))}"
145-
)
118+
task_group.create_task(
119+
s3_upload_file(
120+
filename=str(file),
121+
bucket=bucket_name,
122+
key=str(relative_path),
123+
content_type=CONTENT_TYPES[file.suffix.lower()],
124+
cache_control=settings.PUBLIC_FILE_CACHE_CONTROL,
125+
semaphore=semaphore,
126+
s3_client=s3_client,
127+
)
128+
)

0 commit comments

Comments
 (0)