Skip to content

Commit 3e2ab77

Browse files
ravwojdyla and claude committed
datakit staging check: require .executor_status == SUCCESS
Tighten the lane from "prefix has at least one object" to "the executor step that produced it reports SUCCESS". Reuses StatusFile so both the plain-text and legacy JSON-lines formats are handled (the former is a single token, the latter is an event log where the latest status wins). Surfaces two existing issues in the registry on first run: - raw/common_corpus_english-b78a5c1: no .executor_status file at all - raw/finetranslations_d17a789b: status=RUNNING (never terminated) Both will need to be cleaned up — or the source entries dropped from the registry — for this lane to go green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 43568ce commit 3e2ab77

1 file changed

Lines changed: 21 additions & 22 deletions

File tree

scripts/datakit/validate_source_staging.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,55 @@
11
# Copyright The Marin Authors
22
# SPDX-License-Identifier: Apache-2.0
33

4-
"""Verify every Datakit source's staged_path exists under gs://marin-us-central1.
4+
"""Verify every Datakit source's staged dump terminated SUCCESS.
55
66
Each :class:`marin.datakit.sources.DatakitSource` with a non-empty ``staged_path``
7-
must resolve to a GCS prefix with at least one object — otherwise the ferry's
8-
verify-only download step will 404 at runtime. Enforced daily as a parallel
9-
lane of the datakit-smoke workflow.
7+
must resolve to a GCS prefix under ``gs://marin-us-central1`` whose
8+
``.executor_status`` file (plain text or legacy JSON-lines) reports ``SUCCESS`` —
9+
otherwise the ferry's verify-only download step is pointing at a partial or
10+
missing dump. Enforced daily as a parallel lane of the datakit-smoke workflow.
1011
"""
1112

1213
import logging
1314
import sys
1415
from concurrent.futures import ThreadPoolExecutor
1516

1617
from marin.datakit.sources import all_sources
17-
from rigging.filesystem import url_to_fs
18+
from marin.execution.executor_step_status import STATUS_SUCCESS, StatusFile
1819
from rigging.log_setup import configure_logging
1920

2021
logger = logging.getLogger(__name__)
2122

2223
BUCKET = "gs://marin-us-central1"
2324
MAX_WORKERS = 16
25+
WORKER_ID = "datakit-smoke-sources-check"
2426

2527

26-
def _check(full_path: str) -> tuple[str, bool]:
27-
fs, _ = url_to_fs(full_path)
28-
try:
29-
children = fs.ls(full_path, detail=False)
30-
except FileNotFoundError:
31-
return full_path, False
32-
return full_path, bool(children)
28+
def _check(staged_path: str) -> tuple[str, str]:
29+
"""Return (output_path, status) where status is ``SUCCESS`` or a failure token."""
30+
output_path = f"{BUCKET}/{staged_path}"
31+
status = StatusFile(output_path, worker_id=WORKER_ID).status
32+
return output_path, status or "MISSING"
3333

3434

3535
def main() -> None:
3636
configure_logging()
3737
sources = all_sources()
3838
unique_paths = sorted({s.staged_path for s in sources.values() if s.staged_path})
3939
logger.info("Verifying %d unique staged paths under %s", len(unique_paths), BUCKET)
40-
urls = [f"{BUCKET}/{p}" for p in unique_paths]
4140

42-
missing: list[str] = []
41+
bad: list[tuple[str, str]] = []
4342
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
44-
for full, exists in pool.map(_check, urls):
45-
if exists:
46-
logger.debug("OK: %s", full)
43+
for output_path, status in pool.map(_check, unique_paths):
44+
if status == STATUS_SUCCESS:
45+
logger.debug("OK: %s", output_path)
4746
else:
48-
logger.error("MISSING: %s", full)
49-
missing.append(full)
47+
logger.error("%s: %s", status, output_path)
48+
bad.append((output_path, status))
5049

51-
if missing:
52-
raise SystemExit(f"{len(missing)}/{len(unique_paths)} staged paths missing under {BUCKET}")
53-
logger.info("All %d staged paths present under %s", len(unique_paths), BUCKET)
50+
if bad:
51+
raise SystemExit(f"{len(bad)}/{len(unique_paths)} staged paths not SUCCESS under {BUCKET}")
52+
logger.info("All %d staged paths report SUCCESS under %s", len(unique_paths), BUCKET)
5453

5554

5655
if __name__ == "__main__":

0 commit comments

Comments
 (0)