Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions .github/workflows/marin-datakit-smoke.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,52 @@ jobs:
retention-days: 1
if-no-files-found: ignore

# Parallel lane: verify every DatakitSource.staged_path resolves to a
# non-empty prefix under gs://marin-us-central1. Cheap (metadata only) and
# independent of the ferry, so runs alongside datakit-smoke.
datakit-sources-staged:
  runs-on: ubuntu-latest
  timeout-minutes: 30
  # One run at a time; a newer trigger supersedes an in-flight check.
  concurrency:
    group: datakit-sources-staged
    cancel-in-progress: true
  steps:
    - name: Checkout code
      uses: actions/checkout@v5

    - name: Set up Python 3.12
      uses: actions/setup-python@v6
      with:
        python-version: "3.12"

    - name: Install uv
      uses: astral-sh/setup-uv@v7
      with:
        enable-cache: true

    - name: Install dependencies
      run: uv sync --all-packages --extra=cpu --no-default-groups

    - name: Authenticate to Google Cloud
      uses: google-github-actions/auth@v2
      with:
        credentials_json: ${{ secrets.IRIS_CI_GCP_SA_KEY }}

    - name: Verify datakit source staging paths
      shell: bash -l {0}
      run: .venv/bin/python scripts/datakit/validate_source_staging.py

# Separate job so Slack always fires, even if the main job is force-killed
# after its grace window. `needs.datakit-smoke.result` reflects the main
# job outcome; failure()/cancelled() context functions only see this job's
# after its grace window. `needs.*.result` reflects the upstream job
# outcomes; failure()/cancelled() context functions only see this job's
# steps.
notify-slack:
needs: datakit-smoke
if: always() && (needs.datakit-smoke.result == 'failure' || needs.datakit-smoke.result == 'cancelled') && github.event_name == 'schedule'
needs: [datakit-smoke, datakit-sources-staged]
if: |
always() && github.event_name == 'schedule' && (
needs.datakit-smoke.result == 'failure' || needs.datakit-smoke.result == 'cancelled'
|| needs.datakit-sources-staged.result == 'failure' || needs.datakit-sources-staged.result == 'cancelled'
)
runs-on: ubuntu-latest
steps:
- name: Download Slack message
Expand Down
62 changes: 62 additions & 0 deletions scripts/datakit/validate_source_staging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright The Marin Authors
# SPDX-License-Identifier: Apache-2.0

"""Verify every Datakit source's staged dump terminated SUCCESS.

Each :class:`marin.datakit.sources.DatakitSource` with a non-empty ``staged_path``
must resolve to a GCS prefix under ``gs://marin-us-central1`` whose
``.executor_status`` file (plain text or legacy JSON-lines) reports ``SUCCESS`` —
otherwise the ferry's verify-only download step is pointing at a partial or
missing dump. Enforced daily as a parallel lane of the datakit-smoke workflow.
"""

import logging
import sys
from concurrent.futures import ThreadPoolExecutor

from marin.datakit.sources import all_sources
from marin.execution.executor_step_status import STATUS_SUCCESS, StatusFile
from rigging.log_setup import configure_logging

logger = logging.getLogger(__name__)

BUCKET = "gs://marin-us-central1"
MAX_WORKERS = 16
WORKER_ID = "datakit-smoke-sources-check"


def _check(staged_path: str) -> tuple[str, str]:
    """Resolve one staged path under the bucket and read its executor status.

    Returns an ``(output_path, status)`` pair; a falsy status from the
    status file is normalized to the sentinel string ``"MISSING"``.
    """
    full_path = f"{BUCKET}/{staged_path}"
    reported = StatusFile(full_path, worker_id=WORKER_ID).status
    if not reported:
        reported = "MISSING"
    return full_path, reported


def main() -> None:
    """Check every unique staged path in parallel; exit non-zero on any failure.

    Collects the distinct non-empty ``staged_path`` values from all datakit
    sources, verifies each via :func:`_check` on a thread pool, and raises
    ``SystemExit`` with a summary if any path does not report SUCCESS.
    """
    configure_logging()
    staged = {src.staged_path for src in all_sources().values() if src.staged_path}
    paths = sorted(staged)
    logger.info("Verifying %d unique staged paths under %s", len(paths), BUCKET)

    failures: list[tuple[str, str]] = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        for path, status in pool.map(_check, paths):
            if status != STATUS_SUCCESS:
                logger.error("%s: %s", status, path)
                failures.append((path, status))
            else:
                logger.debug("OK: %s", path)

    if failures:
        raise SystemExit(f"{len(failures)}/{len(paths)} staged paths not SUCCESS under {BUCKET}")
    logger.info("All %d staged paths report SUCCESS under %s", len(paths), BUCKET)


if __name__ == "__main__":
    try:
        main()
    except SystemExit:
        # Propagate intentional exits (including the failure summary) untouched.
        raise
    except Exception as exc:
        # logger.exception keeps the message format but also records the full
        # traceback — logger.error alone discarded it, making CI failures
        # needlessly hard to debug.
        logger.exception("Validation failed: %s", exc)
        sys.exit(1)
Loading