Skip to content

Commit a70540f

Browse files
committed
feat(upload_worker): enhance background FULL upload process with improved error handling and cleanup
1 parent 08b304e commit a70540f

File tree

1 file changed

+45
-10
lines changed

1 file changed

+45
-10
lines changed

grail/trainer/upload_worker.py

Lines changed: 45 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@
1515
import logging
1616
import multiprocessing
1717
import queue
18+
import shutil
19+
import tempfile
1820
import time
1921
from pathlib import Path
2022
from typing import Any
@@ -76,7 +78,7 @@ async def _upload_full_background(
7678
staging_path: Path,
7779
checkpoint_publisher: CheckpointPublisher,
7880
target_window: int,
79-
) -> None:
81+
) -> bool:
8082
"""Upload FULL checkpoint in background (non-blocking).
8183
8284
This runs as a fire-and-forget task at anchor windows while DELTA upload
@@ -90,8 +92,15 @@ async def _upload_full_background(
9092
"""
9193
try:
9294
logger.info("Background FULL upload starting for checkpoint-%s", target_window)
93-
await checkpoint_publisher.upload_full_background(staging_path, target_window)
94-
logger.info("Background FULL upload completed for checkpoint-%s", target_window)
95+
ok = await checkpoint_publisher.upload_full_background(staging_path, target_window)
96+
if ok:
97+
logger.info("Background FULL upload completed for checkpoint-%s", target_window)
98+
else:
99+
logger.warning(
100+
"Background FULL upload did not complete cleanly for checkpoint-%s",
101+
target_window,
102+
)
103+
return ok
95104
except Exception as exc:
96105
# Background task - log error but don't propagate
97106
# Next anchor window will retry, and DELTA chain remains valid
@@ -100,6 +109,10 @@ async def _upload_full_background(
100109
target_window,
101110
exc,
102111
)
112+
return False
113+
finally:
114+
# This path is a dedicated copy created by the upload worker; it is safe to remove.
115+
shutil.rmtree(staging_path, ignore_errors=True)
103116

104117

105118
async def upload_worker_loop(
@@ -266,14 +279,40 @@ async def upload_worker_loop(
266279
"Starting background FULL upload for anchor checkpoint-%s",
267280
checkpoint_window,
268281
)
269-
asyncio.create_task(
282+
# Copy staging synchronously to avoid a race with snapshot_manager.cleanup_staging()
283+
bg_dir = Path(tempfile.mkdtemp(prefix=f"bg-full-{checkpoint_window}-"))
284+
shutil.copytree(staging_path, bg_dir, dirs_exist_ok=True)
285+
286+
bg_task = asyncio.create_task(
270287
_upload_full_background(
271-
staging_path,
288+
bg_dir,
272289
checkpoint_publisher,
273290
checkpoint_window,
274291
)
275292
)
276293

294+
checkpoint_window_for_task = checkpoint_window
295+
296+
def _on_bg_done(
297+
task: asyncio.Task[bool],
298+
*,
299+
_checkpoint_window: int = checkpoint_window_for_task,
300+
) -> None:
301+
nonlocal anchor_window
302+
try:
303+
ok = task.result()
304+
except Exception: # noqa: BLE001
305+
ok = False
306+
if ok:
307+
# Only advance the anchor after we know the FULL upload succeeded.
308+
anchor_window = _checkpoint_window
309+
logger.info(
310+
"✅ Background FULL anchor confirmed for checkpoint-%s",
311+
_checkpoint_window,
312+
)
313+
314+
bg_task.add_done_callback(_on_bg_done)
315+
277316
except UploadError:
278317
# Delta upload failed, fallback to FULL upload
279318
logger.warning(
@@ -302,11 +341,7 @@ async def upload_worker_loop(
302341
prev_window = checkpoint_window
303342
prev_state = loaded_state
304343
# Update anchor on synchronous FULL uploads; a background FULL advances it in its done callback on success
305-
is_new_anchor = (
306-
should_upload_full_only
307-
or did_fallback_to_full
308-
or should_upload_full_background
309-
)
344+
is_new_anchor = should_upload_full_only or did_fallback_to_full
310345
if is_new_anchor:
311346
anchor_window = checkpoint_window
312347
logger.info(

0 commit comments

Comments
 (0)