Commit 5013217

Implements municipio_id normalization in R scripts and adds coverage audit. Enhances data integrity by ensuring consistent formatting of municipio_id values across datasets and introduces an automated audit process for municipal forecast coverage.
1 parent 38a599d commit 5013217

4 files changed: +205 -20 lines changed

docs/pending_municipal_id_fix.md

Lines changed: 43 additions & 13 deletions
@@ -1,20 +1,50 @@
 # Municipal Forecast ID Normalization (Paused Task)
 
-_Last updated: 2025-11-19_
+_Last updated: 2025-11-20_
 
 ## Status
-- Nationwide municipal forecast run produced 65,737 rows for `2025-11-19`, but `municipio_id` values are stored without left padding and some contain stray whitespace/newlines.
-- Direct comparisons against `data/input/municipalities.csv.gz` therefore flag entire provinces (e.g., Barcelona, Badajoz, Burgos) as missing even though data exists.
+- `scripts/r/get_forecast_data_hybrid.R` pads and trims `municipio_id` values everywhere via `normalize_municipio_id()` (live since 2025-11-20).
+- `update_municipal_forecasts_only.sh` (job 27127, shards 1–5) re-ran with the fix at 11:32 CET; cumulative file now holds 664,489 rows with correctly padded IDs.
+- Coverage audit installed: shard 1 now runs `python3 scripts/python/audit_municipal_forecast_coverage.py` after each array completion. The audit exits non-zero if any non-excluded IDs are missing.
+- Latest audit (2025-11-20): 8,129 reference municipios, 8,037 collected; shortfall limited to the known excluded sets below.
 
-## Outstanding Work
-1. Patch the municipal forecast collector(s) so `municipio_id` values are `str_trim`\+`str_pad(width = 5, pad = "0")` prior to persistence.
-2. Regenerate today’s municipal forecasts after deploying the fix to validate that all ~8k municipalities collect successfully.
-3. Re-run the audit script to confirm the differential drops to the expected handful of communal territories (53xxx codes, North African islets, etc.).
+## Expected Gaps (excluded from coverage metrics)
 
-## Notes
-- Example bad value: `municipio_id = "8001"` (should be `08001`).
-- CSV file to reprocess: `data/output/daily_municipal_forecast.csv.gz` (plain-text CSV despite `.gz` extension).
-- Reference mapping: `data/input/municipalities.csv.gz`.
-- Prior Python helper lives in shell history: `python - <<'PY' ...` extracting outstanding IDs.
+### New municipios without AEMET forecasts (monitor if they appear)
+- `11903` — San Martín del Tesorillo
+- `14901` — Fuente Carreteros
+- `14902` — La Guijarrosa
+- `18077` — Fornes
+- `21902` — La Zarza-Perrunal
+- `41904` — El Palmar de Troya
+
+### Communal / parzonería / ledanía territories
+- `53000`–`53083`
+- `54001`–`54005`
+
+The audit script ignores the IDs above for coverage calculations but will print a warning if any of them begin to appear in the AEMET output so we can revisit downstream handling.
+
+## Coverage Snapshot — 2025-11-20
+- Reference municipalities: 8,129
+- Output municipalities (after de-duplication): 8,037
+- Ignored IDs: 92
+- Unexpected extras: none
 
-Resume from **Step 1** once Barcelona pipeline issues are resolved.
+Full ignored list:
+
+```
+11903, 14901, 14902, 18077, 21902, 41904, 53000, 53001, 53002, 53003, 53004,
+53005, 53006, 53007, 53008, 53009, 53010, 53011, 53012, 53013, 53014, 53015,
+53016, 53017, 53018, 53019, 53020, 53021, 53022, 53023, 53024, 53025, 53026,
+53027, 53028, 53029, 53031, 53032, 53033, 53034, 53035, 53036, 53037, 53038,
+53039, 53040, 53041, 53042, 53043, 53044, 53045, 53046, 53047, 53048, 53049,
+53050, 53051, 53052, 53053, 53054, 53055, 53056, 53057, 53058, 53059, 53060,
+53061, 53062, 53063, 53064, 53065, 53066, 53067, 53068, 53069, 53070, 53071,
+53072, 53073, 53074, 53075, 53076, 53077, 53078, 53080, 53081, 53083, 54001,
+54002, 54003, 54004, 54005
+```
+
+## Notes
+- Forecast cumulative file is plain CSV at `data/output/daily_municipal_forecast.csv.gz` despite the `.gz` suffix.
+- Municipal coverage audit runs automatically for SLURM array task 1; rerun manually with `python3 scripts/python/audit_municipal_forecast_coverage.py` if needed.
+- Manual rewrite script (2025-11-20) remains in shell history should another one-off normalization ever be required.
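
The coverage snapshot figures in this doc can be cross-checked with a few lines of Python. This is only a sketch: the three skipped codes (`53030`, `53079`, `53082`) are inferred from the full ignored list rather than stated anywhere in the commit, the variable names are illustrative, and the subtraction assumes every ignored ID is present in the reference file.

```python
# Rebuild the ignored-ID set from the ranges given in the doc and check totals.
new_municipios = {"11903", "14901", "14902", "18077", "21902", "41904"}

# Assumption: these codes do not appear in the full ignored list, so the
# 53000-53083 range is not contiguous.
skipped = {"53030", "53079", "53082"}

communal = {f"{code:05d}" for code in range(53000, 53084)} - skipped
communal |= {f"{code:05d}" for code in range(54001, 54006)}

ignored = new_municipios | communal

reference_count = 8129  # "Reference municipalities" in the snapshot
expected_collected = reference_count - len(ignored)

print(len(ignored))        # 92
print(expected_collected)  # 8037
```

Both values match the snapshot (92 ignored IDs, 8,037 output municipalities).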
scripts/python/audit_municipal_forecast_coverage.py

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+#!/usr/bin/env python3
+"""Audit municipal forecast coverage against the reference list."""
+
+from __future__ import annotations
+
+import csv
+import gzip
+import sys
+from pathlib import Path
+
+PROJECT_ROOT = Path(__file__).resolve().parents[2]
+REF_PATH = PROJECT_ROOT / "data/input/municipalities.csv.gz"
+FORECAST_PATH = PROJECT_ROOT / "data/output/daily_municipal_forecast.csv.gz"
+
+NEW_MUNICIPIOS = {
+    "11903",  # San Martín del Tesorillo
+    "14901",  # Fuente Carreteros
+    "14902",  # La Guijarrosa
+    "18077",  # Fornes
+    "21902",  # La Zarza-Perrunal
+    "41904",  # El Palmar de Troya
+}
+
+COMMUNAL_CODES = {
+    "53000", "53001", "53002", "53003", "53004", "53005", "53006", "53007",
+    "53008", "53009", "53010", "53011", "53012", "53013", "53014", "53015",
+    "53016", "53017", "53018", "53019", "53020", "53021", "53022", "53023",
+    "53024", "53025", "53026", "53027", "53028", "53029", "53031", "53032",
+    "53033", "53034", "53035", "53036", "53037", "53038", "53039", "53040",
+    "53041", "53042", "53043", "53044", "53045", "53046", "53047", "53048",
+    "53049", "53050", "53051", "53052", "53053", "53054", "53055", "53056",
+    "53057", "53058", "53059", "53060", "53061", "53062", "53063", "53064",
+    "53065", "53066", "53067", "53068", "53069", "53070", "53071", "53072",
+    "53073", "53074", "53075", "53076", "53077", "53078", "53080", "53081",
+    "53083", "54001", "54002", "54003", "54004", "54005",
+}
+
+EXPECTED_ABSENT = NEW_MUNICIPIOS | COMMUNAL_CODES
+
+
+def load_reference_ids(path: Path) -> set[str]:
+    if not path.exists():
+        print(f"ERROR: reference file not found: {path}", file=sys.stderr)
+        sys.exit(2)
+    with gzip.open(path, "rt", encoding="utf-8") as handle:
+        reader = csv.DictReader(handle)
+        if reader.fieldnames is None or "CUMUN" not in reader.fieldnames:
+            print("ERROR: reference file missing CUMUN header", file=sys.stderr)
+            sys.exit(2)
+        ids = set()
+        for row in reader:
+            raw = row.get("CUMUN")
+            if raw is None:
+                continue
+            stripped = raw.strip()
+            if not stripped:
+                continue
+            ids.add(stripped.zfill(5))
+    return ids
+
+
+def load_forecast_ids(path: Path) -> set[str]:
+    if not path.exists():
+        print(f"ERROR: forecast file not found: {path}", file=sys.stderr)
+        sys.exit(2)
+    with open(path, newline="", encoding="utf-8") as handle:
+        reader = csv.DictReader(handle)
+        if reader.fieldnames is None or "municipio_id" not in reader.fieldnames:
+            print("ERROR: forecast file missing municipio_id header", file=sys.stderr)
+            sys.exit(2)
+        ids = set()
+        for row in reader:
+            mid = row.get("municipio_id")
+            if mid:
+                ids.add(mid)
+    return ids
+
+
+def main() -> int:
+    ref_ids = load_reference_ids(REF_PATH)
+    forecast_ids = load_forecast_ids(FORECAST_PATH)
+
+    missing = sorted(ref_ids - forecast_ids - EXPECTED_ABSENT)
+    unexpected_present = sorted(forecast_ids & EXPECTED_ABSENT)
+
+    print(f"Reference municipalities: {len(ref_ids)}")
+    print(f"Forecast municipalities: {len(forecast_ids)}")
+    print(f"Ignored IDs (expected absent): {len(EXPECTED_ABSENT)}")
+
+    if unexpected_present:
+        print(
+            "WARNING: expected-absent IDs present in forecast data: "
+            + ", ".join(unexpected_present)
+        )
+
+    if missing:
+        print(f"ERROR: {len(missing)} reference municipios missing from forecasts.")
+        print(
+            "Sample missing IDs: " + ", ".join(missing[:20])
+            + ("..." if len(missing) > 20 else "")
+        )
+        return 1
+
+    print("Municipal forecast coverage OK (excluding expected gaps).")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
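
The set arithmetic at the core of the audit can be exercised without any files. The sketch below pulls the two expressions from `main()` into a hypothetical helper named `audit` (not part of the commit) and runs it on toy data. It shows why an unpadded ID such as `8001` made the reference entry `08001` look missing before the fix.

```python
def audit(ref_ids, forecast_ids, expected_absent):
    """Return (missing, unexpected_present) as sorted lists of IDs."""
    missing = sorted(ref_ids - forecast_ids - expected_absent)
    unexpected_present = sorted(forecast_ids & expected_absent)
    return missing, unexpected_present


# Toy data for illustration only, not real audit output.
ref = {"08001", "06015", "53000"}
expected_absent = {"53000"}

# Unpadded forecast ID: the string "8001" never equals "08001".
print(audit(ref, {"8001", "06015"}, expected_absent))
# (['08001'], [])

# Padded forecast ID: nothing is missing, and 53000 is ignored as expected.
print(audit(ref, {"08001", "06015"}, expected_absent))
# ([], [])
```

Because the comparison is exact string equality, the audit depends on the collector writing already-normalized IDs to the cumulative file.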

scripts/r/get_forecast_data_hybrid.R

Lines changed: 39 additions & 7 deletions
@@ -18,6 +18,20 @@ library(stringr)
   x
 }
 
+normalize_municipio_id <- function(x) {
+  if (is.null(x)) return(character())
+  if (!length(x)) return(character())
+  na_mask <- is.na(x)
+  x_chr <- as.character(x)
+  x_chr[na_mask] <- NA_character_
+  trimmed <- str_trim(x_chr)
+  if (!length(trimmed)) return(trimmed)
+  trimmed[!is.na(trimmed) & trimmed == ""] <- NA_character_
+  padded <- str_pad(trimmed, width = 5, pad = "0")
+  padded[is.na(trimmed)] <- NA_character_
+  padded
+}
+
 parse_cli_args <- function(args) {
   if (!length(args)) return(list())
   parsed <- list()
@@ -98,6 +112,7 @@ load_cumulative_data <- function(path) {
   if (!"municipio_id" %in% names(dt)) {
     dt[, municipio_id := NA_character_]
   }
+  dt[, municipio_id := normalize_municipio_id(municipio_id)]
   if (!inherits(dt$fecha, "Date")) {
     dt[, fecha := as.Date(fecha)]
   }
@@ -117,6 +132,7 @@ if (nrow(cumulative_data)) {
     !is.na(municipio_id) & as.Date(collected_at, tz = "UTC") == RUN_DATE,
     unique(municipio_id)
   ]
+  completed_today <- completed_today[!is.na(completed_today)]
   if (length(completed_today)) {
     cat("Already collected", length(completed_today), "municipalities for", RUN_DATE, "\n")
   }
@@ -152,19 +168,23 @@ release_file_lock <- function(path) {
 persist_batch <- function(batch_dt) {
   if (!nrow(batch_dt)) return()
   batch_dt[, collected_at := as.POSIXct(collected_at, tz = "UTC")]
+  batch_dt[, municipio_id := normalize_municipio_id(municipio_id)]
   acquire_file_lock(lock_path)
   on.exit(release_file_lock(lock_path), add = TRUE)
   latest_disk <- load_cumulative_data(cumulative_path)
   combined <- rbind(latest_disk, batch_dt, fill = TRUE)
+  if (nrow(combined)) {
+    combined[, municipio_id := normalize_municipio_id(municipio_id)]
+  }
   setorderv(combined, c("municipio_id", "fecha", "elaborado", "collected_at"))
   combined <- unique(combined, by = c("municipio_id", "fecha", "elaborado"), fromLast = TRUE)
   cumulative_data <<- combined
   save_cumulative_data(cumulative_path, cumulative_data)
 }
 
-# Load municipality data
+# Load municipality data
 cat("Loading municipality codes...\n")
-municipalities_data = fread(
+municipalities_data <- fread(
   "data/input/municipalities.csv.gz",
   colClasses = list(character = "CUMUN")
 )
@@ -173,7 +193,8 @@ if(!"CUMUN" %in% names(municipalities_data)){
   stop("CUMUN column not found in municipalities.csv.gz")
 }
 
-all_municipios = str_pad(trimws(municipalities_data$CUMUN), width = 5, pad = "0")
+all_municipios = normalize_municipio_id(municipalities_data$CUMUN)
+all_municipios = all_municipios[!is.na(all_municipios)]
 cat("Loaded", length(all_municipios), "municipalities\n")
 
 if(TESTING_MODE) {
@@ -240,7 +261,12 @@ while (length(remaining_municipios) > 0 && pass_number <= MAX_COLLECTION_PASSES)
 
   # Function to attempt forecast collection with key rotation on failure
   collect_with_retry <- function(municipios, max_retries = MAX_BATCH_RETRIES) {
-    municipios <- str_pad(trimws(municipios), width = 5, pad = "0")
+    municipios <- normalize_municipio_id(municipios)
+    municipios <- municipios[!is.na(municipios)]
+    if (!length(municipios)) {
+      cat("No valid municipality IDs remain after normalization; skipping batch.\n")
+      return(data.frame())
+    }
     for (attempt in seq_len(max_retries)) {
       result <- tryCatch({
         aemet_api_key(get_current_api_key(), install = TRUE, overwrite = TRUE)
@@ -340,7 +366,7 @@ while (length(remaining_municipios) > 0 && pass_number <= MAX_COLLECTION_PASSES)
         temp_min = temperatura_minima
       ) %>%
       mutate(
-        municipio_id = str_pad(as.character(municipio_id), width = 5, pad = "0")
+        municipio_id = normalize_municipio_id(municipio_id)
       ) %>%
       mutate(
         temp_avg = rowMeans(cbind(temp_max, temp_min), na.rm = TRUE),
@@ -356,7 +382,7 @@ while (length(remaining_municipios) > 0 && pass_number <= MAX_COLLECTION_PASSES)
         humid_min = humedadRelativa_minima
       ) %>%
       mutate(
-        municipio = str_pad(as.character(municipio), width = 5, pad = "0")
+        municipio = normalize_municipio_id(municipio)
       )
 
     # Get wind data
@@ -367,7 +393,7 @@ while (length(remaining_municipios) > 0 && pass_number <= MAX_COLLECTION_PASSES)
         wind_speed = viento_velocidad
       ) %>%
       mutate(
-        municipio = str_pad(as.character(municipio), width = 5, pad = "0")
+        municipio = normalize_municipio_id(municipio)
       )
 
     # Combine all data
@@ -387,6 +413,9 @@ while (length(remaining_municipios) > 0 && pass_number <= MAX_COLLECTION_PASSES)
   })
 
   batch_final_dt <- as.data.table(batch_final)
+  if (nrow(batch_final_dt)) {
+    batch_final_dt[, municipio_id := normalize_municipio_id(municipio_id)]
+  }
   if (!nrow(batch_final_dt)) {
     cat("No records produced after processing batch", batch_idx, "- skipping persistence.\n\n")
     next
@@ -440,6 +469,7 @@ cat("=== FINAL PROCESSING ===\n")
 if(length(all_forecasts) > 0) {
   final_data <- rbindlist(all_forecasts, use.names = TRUE, fill = TRUE)
   if (nrow(final_data)) {
+    final_data[, municipio_id := normalize_municipio_id(municipio_id)]
     final_data[, fecha := as.Date(fecha)]
     final_data[, collected_at := as.POSIXct(collected_at, tz = "UTC")]
   }
@@ -455,6 +485,8 @@ if(length(all_forecasts) > 0) {
   cat("No new forecast data collected in this run (municipalities may already be up to date or all API calls failed).\n")
 }
 
+cumulative_data <- load_cumulative_data(cumulative_path)
+
 if (nrow(cumulative_data)) {
   cat("\n=== SUMMARY STATISTICS ===\n")
   cat("Total municipalities in cumulative file:", length(unique(cumulative_data$municipio_id)), "\n")
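
For cross-checking from the Python side, the behavior of the R helper `normalize_municipio_id()` can be approximated as below. This is a hedged sketch, not a port used anywhere in the commit: it maps R's `NA` to Python's `None`, and it assumes `str.strip()` and `str_trim()` agree on the whitespace that actually occurs in the data (spaces and newlines).

```python
from typing import Iterable, List, Optional


def normalize_municipio_id(values: Iterable[object]) -> List[Optional[str]]:
    """Trim each value and left-pad it with zeros to width 5.

    None and blank strings become None, mirroring NA_character_ in R.
    """
    out: List[Optional[str]] = []
    for value in values:
        if value is None:
            out.append(None)
            continue
        trimmed = str(value).strip()
        # rjust leaves strings longer than 5 characters unchanged, as str_pad does.
        out.append(trimmed.rjust(5, "0") if trimmed else None)
    return out


print(normalize_municipio_id([" 8001\n", "08001", "", None, 28079]))
# ['08001', '08001', None, None, '28079']
```

`rjust` is used instead of `zfill` because `zfill` treats a leading sign specially, which `str_pad` does not.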

update_municipal_forecasts_only.sh

Lines changed: 14 additions & 0 deletions
@@ -35,6 +35,7 @@ fi
 KEY_POOL=${KEY_POOLS[$((SHARD_INDEX-1))]}
 
 # Dataset 4: Municipal forecasts
+EXIT_CODE=0
 echo "Dataset 4: Municipal forecasts shard ${SHARD_INDEX}/${SHARD_COUNT} using key pool '${KEY_POOL}'..."
 srun Rscript scripts/r/get_forecast_data_hybrid.R \
     --shard-index=${SHARD_INDEX} \
@@ -44,11 +45,24 @@ if [ $? -eq 0 ]; then
     echo "✅ Forecast collection completed"
 else
     echo "❌ Forecast collection failed"
+    EXIT_CODE=1
 fi
 
 echo "=== Collection Summary ==="
 echo "Completed: $(date)"
 ls -la data/output/*.csv.gz
 
+if [ "${SLURM_ARRAY_TASK_ID:-1}" -eq 1 ]; then
+    echo "Running municipal forecast coverage audit..."
+    if python3 scripts/python/audit_municipal_forecast_coverage.py; then
+        echo "✅ Municipal forecast coverage audit passed"
+    else
+        echo "❌ Municipal forecast coverage audit failed"
+        EXIT_CODE=1
+    fi
+fi
+
+exit ${EXIT_CODE}
+
 # Run with
 # sbatch ~/research/weather-data-collector-spain/update_municipal_forecasts_only.sh
