Skip to content

Commit 6818cba

Browse files
PatrickTCorbett, pre-commit-ci[bot], github-actions[bot], zsusswein, natemcintosh
authored
add temporary script to replace utils/Rt_review_exclusions.R (#377)
* add temporary script to replace utils/Rt_review_exclusions.R * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update utils/exclusions_no_m365.R Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * Update utils/exclusions_no_m365.R Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * Update utils/exclusions_no_m365.R Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * Update utils/exclusions_no_m365.R Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * Drop extraneous comma * add uv-managed python version * make logic simpler * forgot news * remove old script --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Zachary Susswein <utb2@cdc.gov> Co-authored-by: Nate McIntosh <NMcIntosh@cdc.gov>
1 parent 77cb0ca commit 6818cba

File tree

2 files changed

+322
-0
lines changed

2 files changed

+322
-0
lines changed

NEWS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# CFAEpiNow2Pipeline v0.2.0
22

33
## Features
4+
* Add a utility script for reading, preparing, and uploading Rt review decisions
45
* Adjust run trigger for time change
56
* Move first run earlier in the day
67
* Removing duplicative r-cmd-check.yaml file (already checking in Dockerfile)

utils/exclusions_file_parsing.py

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
"""
2+
This script processes an exclusions review Excel file and extracts point and state
3+
exclusions based on specified criteria. The extracted data is then uploaded to Azure
4+
Blob Storage in CSV format.
5+
"""
6+
7+
# /// script
8+
# requires-python = ">=3.14"
9+
# dependencies = [
10+
# "azure-identity",
11+
# "azure-storage-blob",
12+
# "fastexcel",
13+
# "polars",
14+
# ]
15+
# ///
16+
import io
17+
from datetime import date
18+
from pathlib import Path
19+
20+
import polars as pl
21+
from azure.identity import DefaultAzureCredential
22+
from azure.storage.blob import BlobServiceClient, ContainerClient
23+
24+
# The spreadsheet has some header rows that are nice for humans but not for parsing;
# prep_single_sheet() drops this many rows from the top of every sheet.
SKIP_ROWS = 3

# These are the expected columns in the sheets, in positional order (the workbook is
# read with has_header=False, so columns are matched by position, not by name).
# NOTE(review): "review_1_decision" vs "reviewer_2_decision" naming is inconsistent —
# presumably mirrors the spreadsheet's own column headers; confirm before renaming.
COLUMN_NAMES = [
    "state",
    "dates_affected",
    "observed_volume",
    "expected_volume",
    "initial_thoughts",
    "state_abb",
    "review_1_decision",
    "reviewer_2_decision",
    "final_decision",
    "drop_dates",
    "additional_reasoning",
]


# Mapping of sheet names to pathogens
SHEETS_TO_PATHOGENS = {
    "Rt_Review_COVID": "covid",
    "Rt_Review_Influenza": "influenza",
    "Rt_Review_RSV": "rsv",
}

# Combined output schema and columns: one row per (state, pathogen, dropped date)
# after prep_single_sheet() explodes the pipe-separated drop_dates column.
COMBINED_SCHEMA = {
    "report_date": pl.Date,
    "state": pl.String,
    "state_abb": pl.String,
    "pathogen": pl.String,
    "review_1_decision": pl.String,
    "reviewer_2_decision": pl.String,
    "final_decision": pl.String,
    "reference_date": pl.Date,
    "geo_value": pl.String,
}
COMBINED_COLUMNS = list(COMBINED_SCHEMA.keys())

# Schema of the per-point exclusions CSV uploaded to the `outliers-v2/` blob prefix.
POINT_EXCLUSIONS_SCHEMA = {
    "reference_date": pl.Date,
    "report_date": pl.Date,
    "state": pl.String,
    "disease": pl.String,
}

# Schema of the whole-state exclusions CSV uploaded to the `state_exclusions/` prefix.
STATE_EXCLUSIONS_SCHEMA = {
    "state_abb": pl.String,
    "pathogen": pl.String,
    "type": pl.String,
}
76+
77+
78+
def empty_final_frame() -> pl.DataFrame:
    """Return an empty DataFrame with the combined output schema.

    Used as the fallback whenever a sheet (or the whole workbook) yields no
    usable rows, so downstream code always receives a frame with the expected
    columns and dtypes.
    """
    # Polars builds an empty, correctly-typed frame directly from a schema
    # mapping; no need to hand-construct a zero-length Series per column.
    return pl.DataFrame(schema=COMBINED_SCHEMA)
85+
86+
87+
def prep_single_sheet(
    sheet_name: str,
    sheet_df: pl.DataFrame,
    pathogen: str,
    report_date: date,
) -> pl.DataFrame:
    """
    Clean one raw review sheet into the combined long-format schema.

    Drops the human-oriented header rows, renames the positional columns to
    COLUMN_NAMES, explodes the pipe-separated `drop_dates` column into one row
    per dropped date, and standardizes pathogen names. Returns a frame with
    exactly COMBINED_COLUMNS; raises ValueError if the sheet has fewer columns
    than COLUMN_NAMES expects.
    """
    # If the sheet is empty or has no data rows, return an empty frame with the correct
    # schema
    if sheet_df.height <= SKIP_ROWS:
        return empty_final_frame()

    # Trim off unnecessary header rows
    trimmed = sheet_df.slice(offset=SKIP_ROWS)
    # Only the first len(COLUMN_NAMES) columns carry data; anything beyond is ignored.
    base_columns = trimmed.columns[: len(COLUMN_NAMES)]

    # Check we have the expected number of columns
    if len(base_columns) < len(COLUMN_NAMES):
        msg = f"Sheet {sheet_name} is missing expected columns"
        raise ValueError(msg)

    # Positional mapping: nth raw column -> nth expected name.
    rename_map = dict(zip(base_columns, COLUMN_NAMES))

    cleaned = (
        trimmed.select(base_columns)
        .rename(rename_map)
        # Explode the drop_dates column into multiple rows, split by "|"
        .with_columns(pl.col("drop_dates").cast(pl.String, strict=False).str.split("|"))
        .explode("drop_dates")
        # Clean up drop_dates values: first strip whitespace
        .with_columns(drop_dates=pl.col("drop_dates").str.strip_chars())
        # Convert empty drop_dates to nulls (already-null values are left alone)
        .with_columns(
            drop_dates=pl.when(
                pl.col("drop_dates").is_not_null() & (pl.col("drop_dates") == "")
            )
            .then(pl.lit(None))
            .otherwise(pl.col("drop_dates"))
        )
        # Filter out any rows with a null state name (note: this checks the
        # "state" column, not "state_abb")
        .filter(pl.col("state").is_not_null())
        # Add in report_date and pathogen columns (constant for the whole sheet)
        .with_columns(
            report_date=pl.lit(report_date),
            pathogen=pl.lit(pathogen),
        )
        .with_columns(
            # Parse drop_dates into reference_date; strict=False leaves values
            # that don't match %Y%m%d as null rather than raising
            reference_date=pl.col("drop_dates").str.strptime(
                pl.Date, format="%Y%m%d", strict=False
            ),
            # Copy state_abb into geo_value (both columns are kept in the output)
            geo_value=pl.col("state_abb"),
            # Standardize pathogen names; unrecognized values pass through unchanged
            pathogen=pl.when(pl.col("pathogen").eq("covid"))
            .then(pl.lit("COVID-19"))
            .when(
                pl.col("pathogen").eq("influenza"),
            )
            .then(pl.lit("Influenza"))
            .when(pl.col("pathogen").eq("rsv"))
            .then(pl.lit("RSV"))
            .otherwise(pl.col("pathogen")),
        )
        # Select and order the final columns
        .select(COMBINED_COLUMNS)
    )

    return cleaned
155+
156+
157+
def read_review_excel_sheet(file_path: Path, report_date: date) -> pl.DataFrame:
    """
    Read every Rt-review sheet from the workbook and stack them into one frame.

    Missing sheets and sheets that clean down to zero rows are reported on
    stdout and skipped. If nothing survives, an empty frame with the combined
    schema is returned.
    """
    # One read_excel call pulls all expected sheets into a name -> frame dict.
    sheet_map: dict[str, pl.DataFrame] = pl.read_excel(
        file_path,
        sheet_name=list(SHEETS_TO_PATHOGENS.keys()),
        has_header=False,
    )

    collected: list[pl.DataFrame] = []
    for name, disease in SHEETS_TO_PATHOGENS.items():
        raw = sheet_map.get(name)
        if raw is None:
            # Sheet absent from the workbook entirely
            print(f"Warning: Sheet {name} not found in {file_path}")
            continue

        cleaned = prep_single_sheet(
            sheet_name=name,
            sheet_df=raw,
            pathogen=disease,
            report_date=report_date,
        )
        if cleaned.height == 0:
            # Sheet existed but cleaned down to nothing
            print(f"Info: Sheet {name} has no data after processing")
            continue

        collected.append(cleaned)

    if not collected:
        return empty_final_frame()
    return pl.concat(collected, how="vertical")
191+
192+
193+
def main(file_path: Path, report_date: date, overwrite_blobs: bool) -> None:
    """
    Parse the exclusions workbook and upload the resulting CSVs to blob storage.

    Produces two artifacts in the `nssp-etl` container:
    - point exclusions -> `outliers-v2/<report_date>.csv`
    - state exclusions -> `state_exclusions/<report_date>_state_exclusions.csv`

    Exits early (printing a message) if the workbook yields no rows. Raises if
    a target blob already exists and `overwrite_blobs` is False.
    """
    # === Read and process the exclusions Excel file ===================================
    combined_df = read_review_excel_sheet(file_path=file_path, report_date=report_date)

    if combined_df.height == 0:
        print(
            f"No data found in the exclusions file {file_path} after processing. Exiting."
        )
        return

    # === Create the point exclusions DataFrame ========================================
    # Get just the point exclusion rows: the ones with "Drop Point(s)" in final_decision
    point_exclusions_df = (
        combined_df.filter(
            pl.col("final_decision").str.contains("Drop Point", literal=True)
        )
        # Get into the desired schema
        .select(
            pl.col("reference_date"),
            pl.col("report_date"),
            pl.col("state_abb").alias("state"),
            pl.col("pathogen").alias("disease"),
        )
        # Double check the schema
        .cast(POINT_EXCLUSIONS_SCHEMA)  # type: ignore
        # Sort nicely
        .sort(by=["report_date", "state", "disease", "reference_date"])
    )
    print("Point Exclusions DataFrame:")
    print(point_exclusions_df)

    # === Create the state exclusions DataFrame ========================================
    # Get just the state exclusion rows: the ones with "Exclude State" in final_decision
    state_exclusion_df = (
        combined_df.filter(
            pl.col("final_decision").str.contains("Exclude State", literal=True)
        )
        # Create the "type" column based on "final_decision".
        # NOTE(review): no .otherwise() — any other "Exclude State..." wording
        # yields a null type; confirm that is intended.
        .with_columns(
            pl.when(pl.col("final_decision").eq("Exclude State (Data)"))
            .then(pl.lit("Data"))
            .when(pl.col("final_decision").eq("Exclude State (Model)"))
            .then(pl.lit("Model"))
            .alias("type")
        )
        # Get into the desired schema
        .select(pl.col("state_abb"), pl.col("pathogen"), pl.col("type"))
        # Double check the schema
        .cast(STATE_EXCLUSIONS_SCHEMA)  # type: ignore
        # Sort nicely
        .sort(by=["state_abb", "pathogen", "type"])
    )
    print("State Exclusions DataFrame:")
    print(state_exclusion_df)

    # === Upload both to blob storage ==================================================
    # Create the blob storage client for the `nssp-etl` container
    ctr_client: ContainerClient = BlobServiceClient(
        account_url="https://cfaazurebatchprd.blob.core.windows.net/",
        credential=DefaultAzureCredential(),
    ).get_container_client("nssp-etl")

    # Upload the point exclusions CSV (serialized in memory; no temp file)
    point_exclusion_buffer = io.BytesIO()
    point_exclusions_df.write_csv(point_exclusion_buffer)
    ctr_client.upload_blob(
        name=f"outliers-v2/{report_date.isoformat()}.csv",
        data=point_exclusion_buffer.getvalue(),
        overwrite=overwrite_blobs,
    )

    # Upload the state exclusions CSV
    state_exclusion_buffer = io.BytesIO()
    state_exclusion_df.write_csv(state_exclusion_buffer)
    ctr_client.upload_blob(
        name=f"state_exclusions/{report_date.isoformat()}_state_exclusions.csv",
        data=state_exclusion_buffer.getvalue(),
        overwrite=overwrite_blobs,
    )
272+
273+
274+
if __name__ == "__main__":
    # argparse is only needed when run as a script, so keep the import local.
    from argparse import ArgumentParser

    # NOTE: `date` and `Path` come from the module-level imports; the previous
    # redundant local `from datetime import date` has been removed.
    parser = ArgumentParser(description="Parse the outliers/exclusions exclusions file")
    parser.add_argument(
        "-d",
        "--date",
        type=str,
        default=date.today().strftime("%Y-%m-%d"),
        help=(
            "Date for which to parse the exclusions file (format: YYYY-MM-DD). "
            "Default is today's date."
        ),
    )

    parser.add_argument(
        "-f",
        "--file",
        type=str,
        help=(
            "Path to the exclusions Excel file. If none supplied,"
            " attempts to use the date to build `~/Downloads/Rt_Review_<date>.xlsx`"
        ),
        default="",
    )

    parser.add_argument(
        "--overwrite-blobs",
        action="store_true",
        help="Whether to overwrite existing blobs in storage (default: False)",
    )

    args = parser.parse_args()

    this_date = date.fromisoformat(args.date)
    if args.file:
        file_path = Path(args.file)
    else:
        # Fall back to the conventional download location for the review file.
        file_path = (
            Path.home() / "Downloads" / f"Rt_Review_{this_date.strftime('%Y%m%d')}.xlsx"
        )

    # Use an explicit raise instead of `assert`: asserts are stripped under
    # `python -O`, which would silently skip this validation.
    if not file_path.is_file():
        raise FileNotFoundError(f"Exclusions file not found: {file_path}")

    main(
        file_path=file_path, report_date=this_date, overwrite_blobs=args.overwrite_blobs
    )

0 commit comments

Comments
 (0)