Skip to content

Commit 79810b9

Browse files
authored
Improve Concurrent Log Upload (#394)
* Improve Concurrent Log Upload
* Fix lint
* Remove fsync
1 parent 0ea9bc5 commit 79810b9

File tree

1 file changed

+12
-6
lines changed

1 file changed

+12
-6
lines changed

src/utils/job/jobs.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
from typing import List, Dict, Tuple, Type
3232
import urllib.parse
3333

34-
import aiofiles
3534
import redis # type: ignore
3635
import redis.asyncio # type: ignore
3736
import pydantic
@@ -1381,28 +1380,35 @@ def execute(self, context: JobExecutionContext,
13811380
async def migrate_logs(redis_url: str, redis_key: str, file_name: str):
13821381
''' Uploads logs to S3 and deletes them from Redis. Returns the S3 file path. '''
13831382

1384-
async with aiofiles.tempfile.NamedTemporaryFile(mode='w+') as temp_file:
1383+
fd, tmp_path = tempfile.mkstemp(suffix='.log')
1384+
try:
1385+
os.close(fd)
1386+
13851387
await connectors.write_redis_log_to_disk(
13861388
redis_url,
13871389
redis_key,
1388-
str(temp_file.name),
1390+
tmp_path,
13891391
)
13901392

13911393
await progress_writer.report_progress_async()
13921394

1393-
await temp_file.flush()
1394-
13951395
# Wrap the call in a concrete no-arg function to avoid overload issues during lint.
13961396
def _upload_logs() -> storage.UploadSummary:
13971397
return storage_client.upload_objects(
1398-
source=str(temp_file.name),
1398+
source=tmp_path,
13991399
destination_prefix=self.workflow_id,
14001400
destination_name=file_name,
14011401
)
14021402

14031403
await asyncio.to_thread(_upload_logs)
14041404

14051405
await progress_writer.report_progress_async()
1406+
finally:
1407+
# Clean up the temp file ourselves
1408+
try:
1409+
os.unlink(tmp_path)
1410+
except OSError:
1411+
pass
14061412

14071413
semaphore = asyncio.Semaphore(CONCURRENT_UPLOADS)
14081414

0 commit comments

Comments (0)