Skip to content

Commit caf289f

Browse files
authored
Merge pull request #17 from OR-Dept-Environmental-Quality/feature/ryan-add-envista-staging
Feature/ryan-add-envista-staging
2 parents 84d7788 + 4c12742 commit caf289f

3 files changed

Lines changed: 31 additions & 16 deletions

File tree

pipelines/envista/run_env_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from envista.extractors.measurements import get_envista_hourly, get_envista_daily
3131

3232

33-
ENV_TEST_MODE = config.ENV_TEST_MODE
33+
ENV_TEST_MODE = str(config.ENV_TEST_MODE).lower() in ("1", "true", "yes")
3434
ENV_MONITOR_DIR = config.RAW_ENV_MONITORS
3535
ENV_SAMPLE_DIR = config.RAW_ENV_SAMPLE
3636
ENV_DAILY_DIR = config.RAW_ENV_DAILY

pipelines/envista/run_env_transform.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def run():
3131
raw_monitors_dir = config.RAW_ENV_MONITORS
3232
raw_daily_dir = config.RAW_ENV_DAILY
3333
trans_daily_dir = config.TRANS_DAILY
34+
trans_aqi_dir = config.TRANS_AQI
3435

3536
if not raw_monitors_dir.exists():
3637
print(f"Raw monitors directory not found: {raw_monitors_dir}")
@@ -64,11 +65,10 @@ def run():
6465
print(f"No data for year {year_str}, skipping")
6566
continue
6667

67-
# Write to transform layer
68-
output_path = trans_daily_dir / f"envista_daily_{year_str}.csv"
69-
write_csv(transform_daily_df, output_path)
70-
71-
print(f"Wrote {len(transform_daily_df)} AQI records to {output_path}")
68+
# Write to AQI transform layer
69+
aqi_output_path = trans_aqi_dir / f"envista_aqi_{year_str}.csv"
70+
write_csv(transform_daily_df, aqi_output_path)
71+
print(f"Wrote {len(transform_daily_df)} AQI records to {aqi_output_path}")
7272

7373
years_processed += 1
7474
total_records += len(transform_daily_df)

src/stage/consolidate_fct_criteria_daily.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,35 @@ def consolidate_criteria_daily_for_year(year: str, transform_dir: Path) -> pd.Da
3636
Returns:
3737
Consolidated DataFrame with criteria daily fact data
3838
"""
39-
# Read the transformed data for this year
40-
input_file = transform_dir / f"aqi_aqs_daily_{year}.csv"
41-
if not input_file.exists():
42-
print(f"⚠️ No AQI daily file found for year {year}: {input_file}")
39+
# Find all transformed data files matching the pattern for this year
40+
pattern = f"*aqi*{year}.csv"
41+
matching_files = list(transform_dir.glob(pattern))
42+
43+
if not matching_files:
44+
print(f"⚠️ No AQI daily files found for year {year} matching pattern: {pattern}")
4345
return pd.DataFrame()
4446

45-
try:
46-
df = pd.read_csv(input_file)
47-
except Exception as e:
48-
print(f"❌ Error reading {input_file}: {e}")
47+
print(f" 📂 Found {len(matching_files)} file(s) matching pattern for {year}")
48+
49+
dfs = []
50+
for input_file in matching_files:
51+
try:
52+
file_df = pd.read_csv(input_file)
53+
print(f" ✓ Read {len(file_df)} records from {input_file.name}")
54+
dfs.append(file_df)
55+
except Exception as e:
56+
print(f"❌ Error reading {input_file}: {e}")
57+
continue
58+
59+
if not dfs:
60+
print(f"⚠️ No files were successfully read for year {year}")
4961
return pd.DataFrame()
50-
62+
63+
# Combine all dataframes
64+
df = pd.concat(dfs, ignore_index=True)
65+
5166
if df.empty:
52-
print(f"⚠️ Empty AQI daily file for year {year}")
67+
print(f"⚠️ All combined files are empty for year {year}")
5368
return pd.DataFrame()
5469

5570
# Define the required columns for fct_criteria_daily fact table

0 commit comments

Comments
 (0)