
Commit 1ebc6cb (parent: 44fa9ed)

allow option for all counties, save data files with filter suffix

File tree: 6 files changed, +63 −50 lines

data-processing/Snakefile
Lines changed: 40 additions & 37 deletions

@@ -10,7 +10,7 @@ conda: "requirements.yaml" # Path: envs/conda.yaml
 
 
 envvars:
-    "CENSUS_API_KEY"
+    "CENSUS_API_KEY",
 
 
 # read hydra config from initialize API
@@ -21,75 +21,78 @@ with hydra.initialize(config_path="conf", version_base=None):
 
 # make sure data locations exist / user can set them to
 # symlinks separately if preferred
-os.makedirs(f"{processing_cfg.data_dir}/raw", exist_ok=True)
-os.makedirs(f"{processing_cfg.data_dir}/processed", exist_ok=True)
+data_dir = processing_cfg.data_dir
+os.makedirs(f"{data_dir}/raw", exist_ok=True)
+os.makedirs(f"{data_dir}/processed", exist_ok=True)
+
+filters = ["65000", "all"]
 
 
 rule all:
     input:
-        processing_cfg.data_dir + "/processed/endogenous_states_actions.parquet",
-        processing_cfg.data_dir + "/processed/exogenous_states.parquet",
-        processing_cfg.data_dir + "/processed/bspline_basis.parquet",
-        processing_cfg.data_dir + "/processed/confounders.parquet",
+        expand(
+            data_dir + "/processed/endogenous_states_actions_{filter}.parquet",
+            filter=filters,
+        ),
+        expand(
+            data_dir + "/processed/exogenous_states_{filter}.parquet",
+            filter=filters,
+        ),
+        expand(
+            data_dir + "/processed/bspline_basis_{filter}.parquet",
+            filter=filters,
+        ),
+        expand(
+            data_dir + "/processed/confounders_{filter}.parquet",
+            filter=filters,
+        ),
+
 
 rule merge_state_actions:
     input:
         expand(
-            processing_cfg.data_dir + "/processed/alerts/{state}.parquet",
+            data_dir + "/processed/alerts/{state}.parquet",
            state=config["states"],
        ),
-        processing_cfg.data_dir + "/processed/heatmetrics.parquet",
+        data_dir + "/processed/heatmetrics_{filter}.parquet",
     output:
-        processing_cfg.data_dir + "/processed/exogenous_states.parquet",
-        processing_cfg.data_dir + "/processed/endogenous_states_actions.parquet",
-        processing_cfg.data_dir + "/processed/budget.parquet",
+        data_dir + "/processed/exogenous_states_{filter}.parquet",
+        data_dir + "/processed/endogenous_states_actions_{filter}.parquet",
+        data_dir + "/processed/budget_{filter}.parquet",
+        data_dir + "/processed/bspline_basis_{filter}.parquet",
     log:
-        "logs/merge_state_actions.log",
+        "logs/merge_state_actions_{filter}.log",
     shell:
-        "python merge_state_actions.py &> {log}"
-
-
-# rule merge_hospitalizations:
-#     input:
-#         processing_cfg.data_dir + "/processed/exogenous_states.parquet",
-#         processing_cfg.data_dir + "/processed/endogenous_states_actions.parquet",
-#     output:
-#         processing_cfg.data_dir + "/processed/training_data.parquet",
-#     log:
-#         "logs/merge_hospitalizations.log",
-#     shell:
-#         f"""
-#         python merge_hospitalizations.py \
-#             hospitalizations.data_path={config['hosps_data_path']}
-#             &> {{log}}
-#         """
+        "python merge_state_actions.py county_filter={wildcards.filter} &> {log}"
 
 
 rule confounders:
     output:
-        processing_cfg.data_dir + "/processed/confounders.parquet",
+        data_dir + "/processed/confounders_{filter}.parquet",
     log:
-        "logs/confounders.log",
+        "logs/confounders_{filter}.log",
     shell:
         f"""
         python confounders.py \
-            census_api_key={os.environ['CENSUS_API_KEY']} \
+            census_api_key={os.environ['CENSUS_API_KEY']} county_filter={{wildcards.filter}} \
            &> {{log}}
        """
 
 
 rule heatmetrics:
+    input:
+        data_dir + "/processed/confounders_{filter}.parquet",
     output:
-        processing_cfg.data_dir + "/processed/heatmetrics.parquet",
+        data_dir + "/processed/heatmetrics_{filter}.parquet",
     log:
-        "logs/heatmetrics.log",
+        "logs/heatmetrics_{filter}.log",
     shell:
-        "python heatmetrics.py &> {log}"
+        "python heatmetrics.py county_filter={wildcards.filter} &> {log}"
 
 
 rule alerts:
     output:
-        processing_cfg.data_dir + "/processed/alerts/{state}.parquet",
+        data_dir + "/processed/alerts/{state}.parquet",
     log:
         "logs/alerts_{state}.log",
     shell:
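(A note on the new pattern, not part of the commit itself.) Snakemake's expand() fills a template's named placeholder with each value in a list and returns one path per value, so `rule all` now requests every processed file twice, once per filter. A minimal pure-Python sketch of what one call resolves to, assuming data_dir is "../data" (the `targets` name is illustrative):

    filters = ["65000", "all"]
    targets = [f"../data/processed/confounders_{f}.parquet" for f in filters]
    # ['../data/processed/confounders_65000.parquet',
    #  '../data/processed/confounders_all.parquet']

Because the downstream rules keep `{filter}` as a wildcard in their outputs and logs, Snakemake runs merge_state_actions, confounders, and heatmetrics once per filter value, and each shell command forwards it to the script as the Hydra override county_filter={wildcards.filter}.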

data-processing/conf/config.yaml
Lines changed: 2 additions & 0 deletions

@@ -4,6 +4,8 @@ census_api_key: ${oc.env:CENSUS_API_KEY}
 # processed_dir: ../data/processed
 data_dir: ../data
 
+county_filter: 65000 # choose 'all' for no filter
+
 heatmetrics:
   min_month: 5
   max_month: 9
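Since each script is a Hydra app reading this config, county_filter can also be overridden per run on the command line, which is exactly how the Snakefile's shell commands drive it. A hedged usage sketch (the 80000 cutoff is an invented example):

    python confounders.py county_filter=all      # keep every county
    python confounders.py county_filter=80000    # hypothetical stricter threshold

Note that OmegaConf parses the YAML default 65000 as an integer while a command-line `county_filter=all` arrives as the string "all"; the float() cast in confounders.py accommodates both numeric forms.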

data-processing/confounders.py
Lines changed: 7 additions & 2 deletions

@@ -208,9 +208,14 @@ def main(cfg):
 
     # Keep only those places with population > 65000 and complete data cases
     print(merged_df.shape)
-    merged_df = merged_df.loc[merged_df.total_pop > 65000]
+
+    if cfg.county_filter != "all":
+        merged_df = merged_df.loc[merged_df.total_pop > float(cfg.county_filter)]
+
     merged_df = merged_df.dropna()
-    merged_df.to_parquet(f"{cfg.data_dir}/processed/confounders.parquet", index=False)
+    merged_df.to_parquet(
+        f"{cfg.data_dir}/processed/confounders_{cfg.county_filter}.parquet", index=False
+    )
 
 
 if __name__ == "__main__":
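To make the new branch concrete, a toy run of the same logic (the data frame and values are fabricated for illustration):

    import pandas as pd

    df = pd.DataFrame({"fips": ["01001", "01003"], "total_pop": [55_000, 120_000]})
    county_filter = "65000"  # hypothetical config value; "all" would skip the filter
    if county_filter != "all":
        df = df.loc[df.total_pop > float(county_filter)]
    print(df)  # only fips 01003 survives the 65000 cutoff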

data-processing/heatmetrics.py
Lines changed: 3 additions & 3 deletions

@@ -24,7 +24,6 @@ def transform_rds_to_parquet(rds_path, parquet_path):
 @hydra.main(config_path="conf", config_name="config", version_base=None)
 def main(cfg):
     # Download data if not already present
-    processed_path = f"{cfg.data_dir}/processed/heatmetrics.parquet"
     download_path = f"{cfg.data_dir}/raw/heatmetrics.rds"  # data is in R's native format
     url = cfg.heatmetrics.url
 
@@ -49,11 +48,12 @@ def main(cfg):
     df = df[["StCoFIPS", "Date"] + cfg.heatmetrics.cols]
     df.rename(columns={"StCoFIPS": "fips", "Date": "date"}, inplace=True)
 
-    # Keep only large fips
-    confounders = pd.read_parquet(f"{cfg.data_dir}/processed/confounders.parquet")
+    suffix = cfg.county_filter
+    confounders = pd.read_parquet(f"{cfg.data_dir}/processed/confounders_{suffix}.parquet")
     df = df[df.fips.isin(confounders.fips)]
 
     # Write to parquet
+    processed_path = f"{cfg.data_dir}/processed/heatmetrics_{suffix}.parquet"
     df.to_parquet(processed_path)
     LOGGER.info(f"Data written to {processed_path} with head\n: {df.head()}")
 
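Declaring the confounders parquet as an input (in the Snakefile) ties each heatmetrics variant to the confounders file built with the same filter, and the isin() line then restricts the heat metrics to counties that survived that filter. A tiny sketch with invented frames:

    import pandas as pd

    df = pd.DataFrame({"fips": ["01001", "01003", "02013"], "heat_index": [90, 95, 70]})
    confounders = pd.DataFrame({"fips": ["01003"]})
    df = df[df.fips.isin(confounders.fips)]  # keeps only counties present in confounders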

data-processing/merge_state_actions.py
Lines changed: 9 additions & 6 deletions

@@ -18,8 +18,11 @@ def main(cfg):
     """This script merges data from the heatmetrics and heatalerts and computes
     the state space variables. The merged data is saved as a parquet file."""
 
+    # county filter suffix
+    suffix = cfg.county_filter
+
     # Load heatmetrics
-    hm = pd.read_parquet(f"{cfg.data_dir}/processed/heatmetrics.parquet")
+    hm = pd.read_parquet(f"{cfg.data_dir}/processed/heatmetrics_{suffix}.parquet")
     hm = hm.sort_values(["fips", "date"])
 
     # Load and post process heat alerts data
@@ -156,7 +159,7 @@ def main(cfg):
 
     # compute the rolling of alerts in the entire summer
     df["rolling_alerts"] = df.groupby(["fips", "year"])["alert"].transform("cumsum")
-    df["remaiing_budget"] = df["budget"] - df["rolling_alerts"]
+    df["remaining_budget"] = df["budget"] - df["rolling_alerts"]
 
     # dos splines
     M = max(df.dos)
@@ -181,7 +184,7 @@ def main(cfg):
     # standardize and save
     bspline_basis = (bspline_basis - bspline_col_means) / bspline_col_stds
     bspline_basis.columns = [f"bspline_dos_{i}" for i in range(bspline_basis.shape[1])]
-    bspline_basis.to_parquet(f"{cfg.data_dir}/processed/bspline_basis.parquet")
+    bspline_basis.to_parquet(f"{cfg.data_dir}/processed/bspline_basis_{suffix}.parquet")
 
     # -------------------
     # save exogenous states, endogenous states, actions
@@ -200,7 +203,7 @@ def main(cfg):
         *[f"bspline_dos_{i}" for i in range(bspline_dos.shape[1])],
     ]
     exogenous_states = df[exogenous_state_vars + ["fips", "date"]]
-    exogenous_states.to_parquet(f"{cfg.data_dir}/processed/exogenous_states.parquet")
+    exogenous_states.to_parquet(f"{cfg.data_dir}/processed/exogenous_states_{suffix}.parquet")
 
     # actions and endogenous states
     action_vars = [
@@ -212,11 +215,11 @@ def main(cfg):
     ]
     action_states = df[action_vars + ["fips", "date"]]
     action_states.to_parquet(
-        f"{cfg.data_dir}/processed/endogenous_states_actions.parquet"
+        f"{cfg.data_dir}/processed/endogenous_states_actions_{suffix}.parquet"
    )
 
     # save budget
-    budget.to_parquet(f"{cfg.data_dir}/processed/budget.parquet")
+    budget.to_parquet(f"{cfg.data_dir}/processed/budget_{suffix}.parquet")
 
 
 if __name__ == "__main__":
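The renamed remaining_budget column is the per-county, per-year alert budget minus a running count of issued alerts. A self-contained toy run of the same two lines (values invented):

    import pandas as pd

    toy = pd.DataFrame({
        "fips": ["X"] * 4,
        "year": [2020] * 4,
        "alert": [0, 1, 1, 0],
        "budget": [2] * 4,
    })
    toy["rolling_alerts"] = toy.groupby(["fips", "year"])["alert"].transform("cumsum")
    toy["remaining_budget"] = toy["budget"] - toy["rolling_alerts"]
    # rolling_alerts: 0, 1, 2, 2  ->  remaining_budget: 2, 1, 0, 0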

data/.gitignore
Lines changed: 2 additions & 2 deletions

@@ -1,5 +1,5 @@
 hospitalizations/*
 raw/nws-forecast-zones
 raw/heatmetrics.parquet
-shapefile
-!processed
+raw/shapefile
+processed/
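Two behavioral notes on these patterns: the unanchored `shapefile` ignored anything named shapefile at any depth, while `raw/shapefile` matches only that path under data/; and swapping the negation `!processed` for `processed/` means the processed outputs are now ignored rather than re-included, presumably because the filter-suffixed files are regenerated by the pipeline.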
