|
| 1 | +"""Pipeline orchestration for AQS monitors.""" |
| 2 | +from __future__ import annotations |
| 3 | +import sys |
| 4 | +from pathlib import Path |
| 5 | + |
# Running this file as a module (python -m pipelines.aqs.monitors_run) does
# not guarantee that the repository's `src` directory is importable. Put it
# at the front of sys.path so `import soar` resolves from the repo root.
# ROOT is two directory levels above the directory containing this file.
ROOT = Path(__file__).resolve().parents[2]
_SRC_DIR = ROOT / "src"
sys.path.insert(0, str(_SRC_DIR))
| 11 | + |
| 12 | +from soar import config # noqa: E402 |
| 13 | +from soar.aqs.extractors.monitors import fetch_monitors # noqa: E402 |
| 14 | +from soar.aqs.transformers.monitors import add_site_id, to_curated, to_staged # noqa: E402 |
| 15 | +from soar.loaders.filesystem import write_csv, write_parquet # noqa: E402 |
| 16 | + |
# Output locations, one Parquet/CSV pair per pipeline stage. The base
# directories (RAW, TRANS, STAGED) come from `soar.config`.

# Stage 1: fetched monitors (written after site ids are added).
RAW_PARQUET = config.RAW / "monitors_raw.parquet"
RAW_CSV = config.RAW / "monitors_raw.csv"

# Stage 2: curated monitors.
CURATED_PARQUET = config.TRANS / "monitors_curated.parquet"
CURATED_CSV = config.TRANS / "monitors_curated.csv"

# Stage 3: staged monitors.
STAGED_PARQUET = config.STAGED / "monitors_staged.parquet"
STAGED_CSV = config.STAGED / "monitors_staged.csv"
| 23 | + |
| 24 | + |
def run() -> None:
    """Run the monitors pipeline: fetch, transform, write, report.

    Ensures the output directories exist and sets the AQS credentials via
    ``config``, then fetches monitors for parameter code ``88101`` using the
    date range and state from ``config``. The raw, curated and staged tables
    are each written as both Parquet and CSV, and a one-line row-count
    summary is printed to stdout.

    When the fetch result is empty (``.empty`` is true), ``add_site_id`` and
    ``to_curated`` are skipped and the empty result is passed through
    unchanged; ``to_staged`` is always applied.
    """
    config.ensure_dirs(config.RAW, config.TRANS, config.STAGED)
    config.set_aqs_credentials()

    fetched = fetch_monitors(["88101"], config.BDATE, config.EDATE, config.STATE)
    # Both transforms are guarded on the *fetched* result being non-empty.
    with_ids = fetched if fetched.empty else add_site_id(fetched)
    curated = with_ids if fetched.empty else to_curated(with_ids)
    staged = to_staged(curated)

    # (label, table, parquet target, csv target) for each stage, in write order.
    outputs = (
        ("raw", with_ids, RAW_PARQUET, RAW_CSV),
        ("curated", curated, CURATED_PARQUET, CURATED_CSV),
        ("staged", staged, STAGED_PARQUET, STAGED_CSV),
    )
    for _label, table, parquet_path, csv_path in outputs:
        write_parquet(table, parquet_path)
        write_csv(table, csv_path)

    summary = " | ".join(f"{label}={len(table)}" for label, table, *_ in outputs)
    print(summary)


if __name__ == "__main__":
    run()
0 commit comments