|
68 | 68 | import fsspec |
69 | 69 | from fray import ResourceConfig |
70 | 70 | from marin.datakit.sources import DatakitSource, all_sources |
| 71 | +from marin.execution.executor_step_status import STATUS_SUCCESS, StatusFile, StepAlreadyDone, step_lock |
71 | 72 | from marin.execution.step_runner import StepRunner |
72 | 73 | from marin.execution.step_spec import StepSpec |
73 | 74 | from rigging.filesystem import marin_prefix |
@@ -303,10 +304,14 @@ def _finalize_executor_status(src_dir: str, dst_dir: str) -> None: |
303 | 304 |
|
304 | 305 |
|
305 | 306 | def _leaf_already_synced(dst_dir: str) -> bool: |
306 | | - """True if ``<dst_dir>/.executor_status`` already exists on the destination.""" |
307 | | - dst = _executor_status_path(dst_dir) |
308 | | - fs, _ = fsspec.core.url_to_fs(dst) |
309 | | - return fs.exists(dst) |
| 307 | + """True iff ``<dst_dir>/.executor_status`` exists AND its content is ``SUCCESS``. |
| 308 | +
|
| 309 | + A non-``SUCCESS`` marker (``FAILED``/``DEP_FAILED``/``RUNNING``/legacy |
| 310 | + log) means the prior writer didn't finish cleanly — re-sync instead of |
| 311 | + trusting it. Reuses :class:`StatusFile` so the legacy JSON-lines format |
| 312 | + still parses correctly. |
| 313 | + """ |
| 314 | + return StatusFile(dst_dir, worker_id="datakit-sync-check").status == STATUS_SUCCESS |
310 | 315 |
|
311 | 316 |
|
312 | 317 | def _remove_tmp_orphans(dst_dir: str) -> None: |
@@ -342,54 +347,67 @@ def _upload_dir( |
342 | 347 | copy_threads: int, |
343 | 348 | job_name: str, |
344 | 349 | ) -> None: |
345 | | - """Drive the Zephyr pipeline that copies one source directory.""" |
346 | | - rels = _list_relative_files(src_dir) |
347 | | - if not rels: |
348 | | - logger.warning("No files under %s — nothing to upload.", src_dir) |
349 | | - return |
350 | | - shards = _shard(rels, files_per_shard) |
351 | | - logger.info( |
352 | | - "Uploading %d files in %d shards (x%d copy threads/shard): %s -> %s", |
353 | | - len(rels), |
354 | | - len(shards), |
355 | | - copy_threads, |
356 | | - src_dir, |
357 | | - dst_dir, |
358 | | - ) |
| 350 | + """Drive the Zephyr pipeline that copies one source directory. |
| 351 | +
|
| 352 | + Serializes concurrent runs on the same leaf via :func:`step_lock` keyed |
| 353 | + on ``dst_dir``. The lock writes ``RUNNING`` to ``<dst_dir>/.executor_status`` |
| 354 | + on acquisition; ``_finalize_executor_status`` overwrites it with the src |
| 355 | + marker (``SUCCESS``) at the end. If another worker finalized the same leaf |
| 356 | + while we waited for the lock, :class:`StepAlreadyDone` is raised — we |
| 357 | + log and return without re-running. |
| 358 | + """ |
| 359 | + try: |
| 360 | + with step_lock(dst_dir, f"sync/{job_name}", force_run_failed=True): |
| 361 | + rels = _list_relative_files(src_dir) |
| 362 | + if not rels: |
| 363 | + logger.warning("No files under %s — nothing to upload.", src_dir) |
| 364 | + return |
| 365 | + shards = _shard(rels, files_per_shard) |
| 366 | + logger.info( |
| 367 | + "Uploading %d files in %d shards (x%d copy threads/shard): %s -> %s", |
| 368 | + len(rels), |
| 369 | + len(shards), |
| 370 | + copy_threads, |
| 371 | + src_dir, |
| 372 | + dst_dir, |
| 373 | + ) |
359 | 374 |
|
360 | | - # Each shard's JSONL output is named after its content hash, so |
361 | | - # ``skip_existing=True`` doubles as the resume marker: if the source |
362 | | - # files haven't changed the hash matches an existing key and Zephyr |
363 | | - # short-circuits the whole shard. If any source file's fingerprint |
364 | | - # changes, the hash changes and the shard re-runs. |
365 | | - shard_paths = [f"{shard_prefix.rstrip('/')}/{_shard_hash(shard)}.jsonl.gz" for shard in shards] |
366 | | - |
367 | | - pipeline = ( |
368 | | - Dataset.from_list(shards) |
369 | | - .map( |
370 | | - lambda shard: _copy_shard( |
371 | | - shard, |
372 | | - src_dir=src_dir, |
373 | | - dst_dir=dst_dir, |
374 | | - copy_threads=copy_threads, |
| 375 | + # Each shard's JSONL output is named after its content hash, so |
| 376 | + # ``skip_existing=True`` doubles as the resume marker: if the source |
| 377 | + # files haven't changed the hash matches an existing key and Zephyr |
| 378 | + # short-circuits the whole shard. If any source file's fingerprint |
| 379 | + # changes, the hash changes and the shard re-runs. |
| 380 | + shard_paths = [f"{shard_prefix.rstrip('/')}/{_shard_hash(shard)}.jsonl.gz" for shard in shards] |
| 381 | + |
| 382 | + pipeline = ( |
| 383 | + Dataset.from_list(shards) |
| 384 | + .map( |
| 385 | + lambda shard: _copy_shard( |
| 386 | + shard, |
| 387 | + src_dir=src_dir, |
| 388 | + dst_dir=dst_dir, |
| 389 | + copy_threads=copy_threads, |
| 390 | + ) |
| 391 | + ) |
| 392 | + .write_jsonl(lambda shard_idx, _total: shard_paths[shard_idx], skip_existing=True) |
375 | 393 | ) |
376 | | - ) |
377 | | - .write_jsonl(lambda shard_idx, _total: shard_paths[shard_idx], skip_existing=True) |
378 | | - ) |
379 | | - # Each shard fans out into ``copy_threads`` blocking I/O threads, so the |
380 | | - # worker needs at least that many cores' worth of headroom. |
381 | | - ctx = ZephyrContext( |
382 | | - name=job_name, |
383 | | - resources=ResourceConfig(cpu=max(2, copy_threads // 4), ram="4g"), |
384 | | - ) |
385 | | - ctx.execute(pipeline) |
386 | | - # Only now that every other file is at dst, publish ``.executor_status`` |
387 | | - # — its presence is the canonical "this leaf is fully synced" signal that |
388 | | - # lets future runs skip the source entirely. |
389 | | - _finalize_executor_status(src_dir, dst_dir) |
390 | | - # With the marker in place, the leaf is canonical; any ``.tmp.<uuid>`` |
391 | | - # debris under it is orphaned and can be removed. |
392 | | - _remove_tmp_orphans(dst_dir) |
| 394 | + # Each shard fans out into ``copy_threads`` blocking I/O threads, so the |
| 395 | + # worker needs at least that many cores' worth of headroom. |
| 396 | + ctx = ZephyrContext( |
| 397 | + name=job_name, |
| 398 | + resources=ResourceConfig(cpu=max(2, copy_threads // 4), ram="4g"), |
| 399 | + ) |
| 400 | + ctx.execute(pipeline) |
| 401 | + # Only now that every other file is at dst, publish ``.executor_status`` |
| 402 | + # — its presence is the canonical "this leaf is fully synced" signal that |
| 403 | + # lets future runs skip the source entirely. Overwrites the ``RUNNING`` |
| 404 | + # marker that ``step_lock`` wrote on acquisition. |
| 405 | + _finalize_executor_status(src_dir, dst_dir) |
| 406 | + # With the marker in place, the leaf is canonical; any ``.tmp.<uuid>`` |
| 407 | + # debris under it is orphaned and can be removed. |
| 408 | + _remove_tmp_orphans(dst_dir) |
| 409 | + except StepAlreadyDone: |
| 410 | + logger.info("Skip %s: another worker finalized it while we waited for the lock", dst_dir) |
393 | 411 |
|
394 | 412 |
|
395 | 413 | # ---------------------------------------------------------------------- |
|
0 commit comments