Skip to content

Commit 6ab9b6a

Browse files
committed
Add monitors and AQI daily transformers with pipelines
- Add monitors transformer: reads raw monitors data, selects specific fields, removes duplicates by site_code - Add AQI daily transformer: combines daily pollutant data by year, creates site_code, filters AQI data - Add pipeline scripts for running monitors and AQI daily transformations - Update monitors extractor to remove obsolete load_parameters_csv function - Improve TRV transformers with better unit handling and vectorized operations
1 parent 8b7e724 commit 6ab9b6a

9 files changed

Lines changed: 619 additions & 81 deletions

File tree

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
"""Pipeline for transforming AQI daily data.
2+
3+
This pipeline reads raw AQI daily summary files, combines all pollutants
4+
for each year, applies transformations, and writes cleaned data to the
5+
transform layer organized by year.
6+
"""
7+
from __future__ import annotations
8+
import sys
9+
from pathlib import Path
10+
from datetime import date
11+
12+
import pandas as pd
13+
14+
# Add src directory to Python path
15+
ROOT = Path(__file__).resolve().parents[2]
16+
sys.path.insert(0, str(ROOT / "src"))
17+
18+
import config
19+
from aqs.transformers.aqi_daily import transform_aqi_daily_for_year
20+
from loaders.filesystem import write_csv
21+
22+
23+
def run():
    """Run the AQI daily transformation pipeline.

    Reads raw AQI daily summaries for every configured year, transforms
    each year's combined pollutant data, and writes one cleaned CSV per
    year to the transform layer.
    """
    print("🚀 Starting AQI Daily Transformation Pipeline")
    print(f"📅 Date: {date.today()}")

    # Make sure the data-lake directory layout exists before any file I/O.
    config.ensure_dirs()

    # Raw daily extracts are the only input; bail out if they are missing.
    raw_daily_dir = config.RAW_DAILY
    if not raw_daily_dir.exists():
        print(f"❌ Raw daily directory not found: {raw_daily_dir}")
        print(" Please run the daily extraction pipeline first.")
        return

    # Cleaned output lands under transform/aqi, one file per year.
    target_dir = config.ROOT / "transform" / "aqi"
    target_dir.mkdir(parents=True, exist_ok=True)

    years_processed = 0
    total_records = 0

    for year in range(config.START_YEAR, config.END_YEAR + 1):
        year_str = str(year)
        print(f"\n📅 Processing year {year_str}...")

        # Combine and clean this year's pollutant files.
        transformed_df = transform_aqi_daily_for_year(year_str, raw_daily_dir)
        if transformed_df.empty:
            print(f"⚠️ No data for year {year_str}, skipping")
            continue

        destination = target_dir / f"aqi_aqs_daily_{year_str}.csv"
        write_csv(transformed_df, destination)
        print(f"✅ Wrote {len(transformed_df)} AQI records to {destination}")

        years_processed += 1
        total_records += len(transformed_df)

    print("\n🎉 AQI daily transformation complete!")
    print(f"📊 Processed {years_processed} years with {total_records} total records")


if __name__ == "__main__":
    run()

pipelines/aqs/monitors_run.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
"""Pipeline for extracting all Oregon monitors metadata from AQS API.
2+
3+
This pipeline fetches monitor metadata for all relevant parameters in Oregon
4+
from 2005 through 2025, deduplicates by site and parameter, and writes the results to the data lake.
5+
"""
6+
from __future__ import annotations
7+
import sys
8+
from pathlib import Path
9+
from datetime import date
10+
11+
# Add src directory to Python path
12+
ROOT = Path(__file__).resolve().parents[2]
13+
sys.path.insert(0, str(ROOT / "src"))
14+
15+
import config
16+
from aqs.extractors.monitors import fetch_all_monitors_for_oregon
17+
from loaders.filesystem import write_csv
18+
19+
20+
def run():
    """Run the Oregon monitors extraction pipeline.

    Fetches monitor metadata from the AQS API for the 2005-2025 window
    and writes the deduplicated records to the raw layer of the data lake.
    """
    print("🚀 Starting Oregon Monitors Extraction Pipeline")
    print(f"📅 Date: {date.today()}")

    # Ensure directories exist and AQS API credentials are configured.
    config.ensure_dirs()
    config.set_aqs_credentials()

    # Fixed extraction window covering the full study period.
    bdate = date(2005, 1, 1)
    edate = date(2025, 12, 31)
    print(f"📅 Processing monitors from {bdate} to {edate}")

    print("\n📡 Fetching monitor metadata from AQS API...")
    monitors_df = fetch_all_monitors_for_oregon(bdate, edate)
    if monitors_df.empty:
        # Nothing came back from the API; leave the data lake untouched.
        print("❌ No monitors found")
        return

    # Land the raw extract under raw/aqs/monitors.
    destination_dir = config.ROOT / "raw" / "aqs" / "monitors"
    destination_dir.mkdir(parents=True, exist_ok=True)
    destination = destination_dir / "oregon_monitors_2005_2025.csv"
    write_csv(monitors_df, destination)

    print(f"\n✅ Wrote {len(monitors_df)} unique monitor records to {destination}")
    print("\n🎉 Monitors extraction complete!")


if __name__ == "__main__":
    run()
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""Pipeline for transforming Oregon monitors metadata.
2+
3+
This pipeline reads raw monitor metadata, selects specific fields,
4+
removes duplicates by site_code, and writes cleaned monitor records
5+
to the transform layer.
6+
"""
7+
from __future__ import annotations
8+
import sys
9+
from pathlib import Path
10+
from datetime import date
11+
12+
import pandas as pd
13+
14+
# Add src directory to Python path
15+
ROOT = Path(__file__).resolve().parents[2]
16+
sys.path.insert(0, str(ROOT / "src"))
17+
18+
import config
19+
from aqs.transformers.monitors import transform_monitors
20+
from loaders.filesystem import write_csv
21+
22+
23+
def run():
    """Run the Oregon monitors transformation pipeline.

    Loads the raw monitors extract, applies the monitors transformer,
    and writes the cleaned records to the transform layer.
    """
    print("🚀 Starting Oregon Monitors Transformation Pipeline")
    print(f"📅 Date: {date.today()}")

    # Make sure the data-lake directory layout exists.
    config.ensure_dirs()

    # Raw extract produced by the monitors extraction pipeline.
    input_path = config.ROOT / "raw" / "aqs" / "monitors" / "oregon_monitors_2005_2025.csv"
    if not input_path.exists():
        print(f"❌ Raw monitors file not found: {input_path}")
        print(" Please run the monitors extraction pipeline first.")
        return

    print(f"\n📖 Reading raw monitors from {input_path}")
    raw_monitors_df = pd.read_csv(input_path)
    if raw_monitors_df.empty:
        print("❌ No raw monitors data found")
        return
    print(f"📊 Loaded {len(raw_monitors_df)} raw monitor records")

    # Select fields and drop duplicate sites.
    print("\n🔄 Transforming monitor data...")
    transformed_df = transform_monitors(raw_monitors_df)
    if transformed_df.empty:
        print("❌ Transformation resulted in empty dataset")
        return

    # Write the cleaned monitors to transform/monitors.
    destination_dir = config.ROOT / "transform" / "monitors"
    destination_dir.mkdir(parents=True, exist_ok=True)
    destination = destination_dir / "aqs_monitors.csv"
    write_csv(transformed_df, destination)

    print(f"\n✅ Wrote {len(transformed_df)} transformed monitor records to {destination}")
    print("\n🎉 Monitors transformation complete!")


if __name__ == "__main__":
    run()

src/aqs/extractors/monitors.py

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
import json
2020
from urllib.parse import urlencode
21+
from concurrent.futures import ThreadPoolExecutor
22+
import threading
2123

2224
import pandas as pd
2325
import requests
2426

2527
from pyaqsapi import bystate
2628
import config
29+
from aqs import _client
2730

2831

2932
def _ensure_dataframe(payload: object) -> Optional[pd.DataFrame]:
@@ -139,7 +142,7 @@ def fetch_samples_for_parameter(parameter_code: str, bdate: date, edate: date, s
139142

140143
samples_frames: List[pd.DataFrame] = []
141144
# Expect monitors to contain state_code, county_code, site_number columns
142-
unique_sites = monitors[["state_code", "county_code", "site_number"]].drop_duplicates()
145+
unique_sites = monitors[["state_code", "county_code", "site_number", "parameter_code"]].drop_duplicates()
143146
for _, row in unique_sites.iterrows():
144147
state = str(row["state_code"]).zfill(2)
145148
county = str(row["county_code"]).zfill(3)
@@ -164,12 +167,62 @@ def fetch_samples_for_parameter(parameter_code: str, bdate: date, edate: date, s
164167
return pd.concat(samples_frames, ignore_index=True)
165168

166169

167-
def load_parameters_csv(path: str = "ops/parameters.csv") -> List[str]:
168-
"""Load the `AQS_Parameter` column from the parameters CSV as strings.
169170

170-
Returns a list of parameter codes suitable for looping in the pipeline.
171+
172+
def fetch_all_monitors_for_oregon(bdate: date, edate: date) -> pd.DataFrame:
    """Fetch all monitor metadata for Oregon (state FIPS 41) over a date range.

    Queries the AQS API once per hardcoded parameter code, concatenates the
    results, builds a ``site_code`` column, and deduplicates so there is one
    row per unique site/parameter combination.

    Args:
        bdate: Start of the date range to query.
        edate: End of the date range to query.

    Returns:
        DataFrame of unique monitor-parameter rows with an added
        ``site_code`` column, or an empty DataFrame if nothing was found.

    Raises:
        RuntimeError: If the AQS circuit breaker is open.
    """
    # Fail fast if earlier API failures tripped the circuit breaker.
    if _client.circuit_is_open():
        raise RuntimeError("AQS circuit is open; cannot fetch monitors")

    # Hardcoded parameter codes of interest for this project.
    parameter_codes = [
        "44201", "88101", "88502", "85103", "85110", "85128", "17141",
        "43817", "43804", "45201", "43509", "43503", "14115", "17242",
    ]

    print(f"📋 Processing {len(parameter_codes)} parameters for monitors...")

    def fetch_for_param(code: str) -> pd.DataFrame:
        """Fetch monitors for a single parameter code (Oregon only)."""
        print(f" 📡 Fetching monitors for parameter {code}...")
        monitors = fetch_monitors([code], bdate, edate, "41")  # Oregon FIPS
        if not monitors.empty:
            print(f" ✅ Found {len(monitors)} monitors for {code}")
        else:
            print(f" ⚠️ No monitors found for {code}")
        return monitors

    # Fetch each parameter concurrently; cap workers to avoid API overload.
    with ThreadPoolExecutor(max_workers=4) as executor:
        all_monitors = [
            df for df in executor.map(fetch_for_param, parameter_codes)
            if not df.empty
        ]

    if not all_monitors:
        print("❌ No monitors found for any parameter")
        return pd.DataFrame()

    print("🔄 Concatenating and deduplicating monitor data...")
    combined = pd.concat(all_monitors, ignore_index=True)
    original_count = len(combined)

    # site_code uniquely identifies a site: state (2) + county (3) + site (4).
    combined["site_code"] = (
        combined["state_code"].astype(str).str.zfill(2)
        + combined["county_code"].astype(str).str.zfill(3)
        + combined["site_number"].astype(str).str.zfill(4)
    )

    # The dedup grain is site + parameter (NOT site alone, which an earlier
    # docstring claimed): one row per unique monitor-parameter combination.
    combined = combined.drop_duplicates(
        subset=["state_code", "county_code", "site_number", "parameter_code"]
    ).reset_index(drop=True)
    deduped_count = len(combined)

    print(f"✅ Deduplicated: {original_count} raw entries → {deduped_count} unique monitor-parameter combinations")
    return combined

0 commit comments

Comments
 (0)