Skip to content

Commit 9aef0f2

Browse files
committed
Add parallel processing scripts for dengue forecasting and raking
- Implemented `04_rake_dengue_parallel.py` for parallel execution of dengue raking forecasts using Jobmon. - Implemented `05_as_dengue_shifts_parallel.py` for parallel execution of age-sex shifts in dengue forecasts. - Created `rake_dengue.py` script to handle the raking of dengue incidence predictions with detailed input/output path logic and regression calculations. - Configured Jobmon workflows with appropriate compute resources and task templates for efficient processing. - Added error handling and logging for workflow execution status.
1 parent d8dc9a7 commit 9aef0f2

22 files changed

+2649
-253
lines changed

src/idd_forecast_mbp/02_data_prep/05_malaria_modeling_dataframe.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@
254254
]
255255
# Apply log transformations
256256
for col in covariates_to_log_transform:
257-
as_md_df[f"log_{col}"] = np.log(as_md_df[col] + 1e-6)
257+
as_md_df[f"log_{col}"] = np.log(as_md_df[col])
258258

259259
as_md_modeling_df = as_md_df.merge(malaria_stage_3_df, on=["location_id", "year_id"], how="left")
260260
as_md_modeling_df = as_md_modeling_df[~as_md_modeling_df["A0_af"].isna()]

src/idd_forecast_mbp/02_data_prep/06_dengue_modeling_dataframe.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172
dengue_df[f"logit_{col}"] = np.log(clipped_values / (1 - clipped_values))
173173

174174
aa_A0_dengue_df = dengue_df[(dengue_df["location_id"] == dengue_df["A0_location_id"]) & (dengue_df["year_id"] == 2022)].copy()
175-
aa_A0_dengue_df = aa_A0_dengue_df[aa_A0_dengue_df['aa_dengue_mort_count'] > 1].copy()
175+
aa_A0_dengue_df = aa_A0_dengue_df[aa_A0_dengue_df['aa_dengue_inc_count'] > 100].copy()
176176
A0_dengue_ids = aa_A0_dengue_df['A0_location_id'].unique()
177177

178178

@@ -189,7 +189,9 @@
189189

190190
write_parquet(dengue_df, aa_ge3_dengue_stage_1_modeling_df_path)
191191

192-
dengue_df = dengue_df[dengue_df['A0_location_id'].isin(A0_dengue_ids)].copy()
192+
193+
194+
193195

194196

195197

@@ -198,15 +200,16 @@
198200
### Prepares the final dataset for modeling by selecting relevant columns,
199201
### merging to the age-sex-location-year level, and saving the final dataset.
200202
###----------------------------------------------------------###
201-
dengue_stage_2_df = dengue_df.copy()
202-
# Drop any columns that have yn = 0
203-
dengue_stage_2_df = dengue_stage_2_df[dengue_stage_2_df["yn"] == 1].drop(columns=["yn"])
204-
# Subset down to level 5 locations
203+
dengue_stage_2_df = dengue_df[dengue_df['A0_location_id'].isin(A0_dengue_ids)].copy()
204+
#
205205
dengue_stage_2_df = dengue_stage_2_df[dengue_stage_2_df["level"] == 5].drop(columns=["level"])
206+
207+
208+
206209
# Create the A0_af factor variable
207210
dengue_stage_2_df["A0_location_id"] = dengue_stage_2_df["A0_location_id"].astype(int)
208211
dengue_stage_2_df['A0_af'] = 'A0_' + dengue_stage_2_df['A0_location_id'].astype(str)
209-
dengue_stage_2_df['A0_af'] = dengue_stage_2_df['A0_af'].astype('category')
212+
210213
# dengue_stage_2_df = dengue_stage_2_df.drop(columns=['aa_dengue_inc_count', 'aa_dengue_inc_rate', 'aa_dengue_mort_count', 'aa_dengue_mort_rate', 'aa_dengue_cfr'])
211214
# Get the as data
212215
md_location_ids = dengue_stage_2_df["location_id"].unique().tolist()
@@ -216,20 +219,21 @@
216219
columns=as_merge_variables + ["dengue_mort_rate","dengue_inc_rate","dengue_mort_count","dengue_inc_count","population","aa_population"],
217220
filters=[year_filter, md_location_filter, age_filter, sex_filter])
218221

222+
as_md_df = as_md_df[as_md_df['dengue_inc_rate'] > 0].copy()
219223

220224
as_md_df["dengue_cfr"] = as_md_df["dengue_mort_rate"] / as_md_df["dengue_inc_rate"]
221-
as_md_df.loc[as_md_df["dengue_inc_rate"] == 0, "dengue_cfr"] = 0
222225

223226
covariates_to_log_transform = [
224-
"dengue_mort_rate",
225227
"dengue_inc_rate"
226228
]
227229
for col in covariates_to_log_transform:
228-
as_md_df[f"log_{col}"] = np.log(as_md_df[col] + 1e-6)
230+
as_md_df[f"log_{col}"] = np.log(as_md_df[col])
231+
229232

230233
covariates_to_logit_transform = ['dengue_cfr']
231234
for col in covariates_to_logit_transform:
232-
clipped_values = as_md_df[col].clip(lower=0.001, upper=0.999)
235+
clipped_values = as_md_df[col].clip(upper=0.99)
236+
print(f"Range of {col}: {as_md_df[col].min()} to {as_md_df[col].max()}")
233237
as_md_df[f"logit_{col}"] = np.log(clipped_values / (1 - clipped_values))
234238

235239
dengue_stage_2_df = dengue_stage_2_df.drop(columns=["population"])
@@ -239,7 +243,14 @@
239243
as_md_modeling_df = as_md_modeling_df[~(as_md_modeling_df["age_group_id"] == 2)]
240244
as_md_modeling_df["as_id"] = "a" + as_md_modeling_df["age_group_id"].astype(str) + "_s" + as_md_modeling_df["sex_id"].astype(str)
241245

242-
write_parquet(as_md_modeling_df, as_md_dengue_modeling_df_path)
246+
columns_to_keep = as_merge_variables + ['A0_af', 'as_id', 'dengue_suitability', 'log_dengue_inc_rate', 'logit_urban_1km_threshold_300',
247+
'log_gdppc_mean', 'logit_dengue_cfr', 'dengue_mort_rate']
248+
249+
as_md_modeling_df = as_md_modeling_df[columns_to_keep]
250+
# Drop any columns that have yn = 0
251+
# dengue_stage_2_df = dengue_stage_2_df[dengue_stage_2_df["yn"] == 1].drop(columns=["yn"])
252+
253+
243254

244255
cause_columns = list([col for col in as_md_modeling_df.columns if cause in col and "suit" not in col])
245256
base_md_modeling_df = as_md_modeling_df[(as_md_modeling_df['age_group_id'] == reference_age_group_id) & (as_md_modeling_df['sex_id'] == reference_sex_id)].copy()
@@ -253,6 +264,8 @@
253264
on=aa_merge_variables,
254265
how='left')
255266

267+
as_md_modeling_df = as_md_modeling_df[as_md_modeling_df['dengue_mort_rate'] > 0]
256268

269+
write_parquet(as_md_modeling_df, as_md_dengue_modeling_df_path)
257270
write_parquet(base_md_modeling_df, base_md_modeling_df_path)
258271
write_parquet(rest_md_modeling_df, rest_md_modeling_df_path)

src/idd_forecast_mbp/02_data_prep/09_forecasted_dengue_dataframes_parallel.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@
5555

5656
# Define the task template for processing each year batch
5757
task_template = tool.get_task_template(
58-
template_name="hierarchy_generation",
58+
template_name="as_dengue_forecasting_df_creation",
5959
default_cluster_name="slurm",
6060
default_compute_resources={
61-
"memory": "50G",
61+
"memory": "70G",
6262
"cores": 1,
6363
"runtime": "5m",
6464
"queue": "all.q",

src/idd_forecast_mbp/02_data_prep/forecasted_draw_specific_dengue_dataframes.py

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,33 @@
8383

8484
# Get the unique values of A0_location_id
8585
years = list(range(2022, 2023))
86-
year_filter = ('year_id', 'in', years)
86+
year_filter = ('year_id', '==', 2022)
8787

88+
dengue_df = read_parquet_with_integer_ids(aa_full_cause_df_path_template,
89+
filters=[year_filter, level_filter(hierarchy_df, start_level = 3)])
8890

89-
base_md_modeling_df = read_parquet_with_integer_ids(base_md_modeling_df_path,
90-
filters = [year_filter])
91+
dengue_df = dengue_df[dengue_df['dengue_inc_count'] > 100].copy()
92+
dengue_df = dengue_df.rename(columns={'location_id':'A0_location_id'})
93+
A0_location_ids = dengue_df['A0_location_id'].unique()
9194

92-
dengue_modeling_location_ids = base_md_modeling_df['location_id'].unique()
95+
age_filter = ('age_group_id', '==', reference_age_group_id)
96+
sex_filter = ('sex_id', '==', reference_sex_id)
97+
98+
md_dengue_df = read_parquet_with_integer_ids(as_full_cause_df_path_template,
99+
filters=[year_filter, age_filter, sex_filter, level_filter(hierarchy_df, start_level = 5)])
100+
101+
md_dengue_df = md_dengue_df.merge(hierarchy_df[['location_id', 'A0_location_id']],
102+
on='location_id', how='left')
103+
104+
md_dengue_df = md_dengue_df[md_dengue_df['A0_location_id'].isin(A0_location_ids)].copy()
105+
106+
md_dengue_df['base_log_dengue_inc_rate'] = np.log(md_dengue_df['dengue_inc_rate'])
107+
108+
109+
# base_md_modeling_df = read_parquet_with_integer_ids(base_md_modeling_df_path,
110+
# filters = [year_filter])
111+
112+
dengue_modeling_location_ids = md_dengue_df['location_id'].unique()
93113
dengue_modeling_location_filter = ('location_id', 'in', dengue_modeling_location_ids)
94114

95115

@@ -112,14 +132,14 @@
112132

113133

114134

115-
cause_columns = list([col for col in base_md_modeling_df.columns if cause in col and "suit" not in col])
116-
columns_to_keep = aa_merge_variables + ['base_log_dengue_inc_rate']
117-
base_md_modeling_df = base_md_modeling_df[columns_to_keep].copy()
118-
base_md_modeling_df = base_md_modeling_df.drop(columns=['year_id'])
135+
136+
137+
md_dengue_df = md_dengue_df[['location_id', 'base_log_dengue_inc_rate']].copy()
138+
119139

120140
# Merge in the dengue_stage_2_modeling_df
121141
forecast_by_draw_df = forecast_by_draw_df.merge(
122-
base_md_modeling_df,
142+
md_dengue_df,
123143
how="left",
124144
on=["location_id"]
125145
)
@@ -133,10 +153,26 @@
133153
# Create a new column with the log transformed value
134154
forecast_by_draw_df[f"log_{col}"] = np.log(forecast_by_draw_df[col] + 1e-6)
135155

136-
as_md_dengue_modeling_df = read_parquet_with_integer_ids(as_md_dengue_modeling_df_path,
137-
filters = [year_filter, dengue_modeling_location_filter])
156+
as_sex_filter = ('sex_id', '==', reference_sex_id)
157+
as_age_filter = ('age_group_id', '==', reference_age_group_id)
158+
159+
160+
as_md_dengue_modeling_df = read_parquet_with_integer_ids(as_full_cause_df_path_template,
161+
filters=[year_filter, dengue_modeling_location_filter, as_age_filter, as_sex_filter])
162+
163+
as_md_dengue_modeling_df["as_id"] = "a" + as_md_dengue_modeling_df["age_group_id"].astype(str) + "_s" + as_md_dengue_modeling_df["sex_id"].astype(str)
164+
165+
as_md_dengue_modeling_df["dengue_cfr"] = as_md_dengue_modeling_df["dengue_mort_rate"] / as_md_dengue_modeling_df["dengue_inc_rate"]
166+
167+
covariates_to_logit_transform = ['dengue_cfr']
168+
for col in covariates_to_logit_transform:
169+
clipped_values = as_md_dengue_modeling_df[col].clip(upper=0.99)
170+
print(f"Range of {col}: {as_md_dengue_modeling_df[col].min()} to {as_md_dengue_modeling_df[col].max()}")
171+
as_md_dengue_modeling_df[f"logit_{col}"] = np.log(clipped_values / (1 - clipped_values))
172+
173+
174+
138175

139-
cause_columns = list([col for col in as_md_dengue_modeling_df.columns if cause in col and "suit" not in col])
140176
columns_to_keep = as_merge_variables + ['logit_dengue_cfr', 'as_id']
141177
as_md_dengue_modeling_df = as_md_dengue_modeling_df[columns_to_keep].copy()
142178
as_md_dengue_modeling_df['year_to_rake'] = 2022
@@ -154,4 +190,9 @@
154190
draw=draw
155191
)
156192

193+
columns_to_keep = as_merge_variables + ['logit_dengue_cfr', 'log_gdppc_mean', 'base_log_dengue_inc_rate', 'dengue_suitability', 'logit_urban_1km_threshold_300', 'as_id', 'A0_af']
194+
forecast_by_draw_df = forecast_by_draw_df[columns_to_keep].copy()
195+
196+
197+
157198
write_parquet(forecast_by_draw_df, forecast_by_draw_df_path)

src/idd_forecast_mbp/02_data_prep/forecasted_draw_specific_malaria_dataframes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ def generate_dah_scenarios(
252252
covariates_to_log_transform = [col for col in as_base_malaria_df.columns if 'rate' in col]
253253
for col in covariates_to_log_transform:
254254
# Create a new column with the log transformed value
255-
as_base_malaria_df[f"log_{col}"] = np.log(as_base_malaria_df[col] + 1e-6)
255+
as_base_malaria_df[f"log_{col}"] = np.log(as_base_malaria_df[col])
256256

257257
as_base_malaria_df[f"logit_malaria_pfpr"] = np.log(0.999 * as_base_malaria_df["malaria_pfpr"] / (1 - 0.999 * as_base_malaria_df["malaria_pfpr"]))
258258

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
2+
rm(list = ls())
3+
4+
require(glue)
5+
require(mgcv)
6+
require(scam)
7+
require(arrow)
8+
require(data.table)
9+
10+
11+
"%ni%" <- Negate("%in%")
12+
"%nlike%" <- Negate("%like%")
13+
14+
last_year <- 2022
15+
16+
MODELING_DATA_PATH <- "/mnt/team/idd/pub/forecast-mbp/03-modeling_data"
17+
18+
19+
20+
as_md_dengue_modeling_df_path = glue("{MODELING_DATA_PATH}/as_md_dengue_modeling_df.parquet")
21+
base_md_dengue_modeling_df_path = glue("{MODELING_DATA_PATH}/base_md_dengue_modeling_df.parquet")
22+
rest_md_dengue_modeling_df_path = glue("{MODELING_DATA_PATH}/rest_md_dengue_modeling_df.parquet")
23+
24+
# Read in a parquet file
25+
#
26+
27+
as_dengue_df = as.data.frame(arrow::read_parquet(as_md_dengue_modeling_df_path))
28+
as_dengue_df <- as_dengue_df[which(as_dengue_df$dengue_mort_rate > 0),]
29+
range(as_dengue_df$logit_dengue_cfr)
30+
as_dengue_df$A0_af <- as.factor(as_dengue_df$A0_af)
31+
as_dengue_df$as_id = as.factor(as_dengue_df$as_id)
32+
33+
base_dengue_df = as.data.frame(arrow::read_parquet(base_md_dengue_modeling_df_path))
34+
base_dengue_df$A0_af <- as.factor(base_dengue_df$A0_af)
35+
base_dengue_df$as_id = as.factor(base_dengue_df$as_id)
36+
37+
# load(glue("{MODELING_DATA_PATH}/2025_06_29_dengue_models.RData"))
38+
39+
40+
rest_dengue_df = as.data.frame(arrow::read_parquet(rest_md_dengue_modeling_df_path))
41+
rest_dengue_df$A0_af <- as.factor(rest_dengue_df$A0_af)
42+
rest_dengue_df$as_id = as.factor(rest_dengue_df$as_id)
43+
44+
base_dengue_df$dengue_suit_fraction <- base_dengue_df$dengue_suitability / 365
45+
base_dengue_df$dengue_suit_fraction <- pmin(pmax(base_dengue_df$dengue_suit_fraction, 0.001), 0.999)
46+
base_dengue_df$logit_dengue_suitability <- log(base_dengue_df$dengue_suit_fraction / (1 - base_dengue_df$dengue_suit_fraction))
47+
48+
49+
mod_inc_base <- scam(base_log_dengue_inc_rate ~ s(logit_dengue_suitability, k = 6, bs = 'mpi') +
50+
s(logit_urban_1km_threshold_300, k = 6, bs = 'mpi') + A0_af,
51+
data = base_dengue_df,
52+
optimizer = "efs",
53+
control = list(maxit = 300)) # Limit iterations
54+
55+
56+
57+
mod_cfr_all <- lm(logit_dengue_cfr ~ log_gdppc_mean + as_id + A0_af,
58+
data = as_dengue_df)
59+
60+
61+
write.csv(levels(as_dengue_df$as_id), file = glue("{MODELING_DATA_PATH}/as_id_levels.csv"), row.names = FALSE)
62+
write.csv(coef(mod_cfr_all), file = glue("{MODELING_DATA_PATH}/mod_cfr_all_coefficients.csv"), row.names = TRUE)
63+
64+
strip_model <- function(model) {
65+
model$model <- NULL
66+
model$fitted.values <- NULL
67+
model$residuals <- NULL
68+
model$effects <- NULL
69+
model$qr$qr <- NULL
70+
model$linear.predictors <- NULL
71+
model$weights <- NULL
72+
model$prior.weights <- NULL
73+
model$data <- NULL
74+
model$family <- NULL
75+
model$deviance <- NULL
76+
model$aic <- NULL
77+
model$null.deviance <- NULL
78+
model$iter <- NULL
79+
model$df.residual <- NULL
80+
model$df.null <- NULL
81+
model$y <- NULL
82+
model$converged <- NULL
83+
model$boundary <- NULL
84+
85+
# Keep essential components for prediction
86+
attr(model$terms, ".Environment") <- NULL
87+
88+
return(model)
89+
}
90+
91+
# Usage
92+
stripped_mod_cfr_all <- strip_model(mod_cfr_all)
93+
94+
model_names <- c("mod_inc_base", "mod_cfr_all")
95+
96+
save(list = model_names, file = glue("{MODELING_DATA_PATH}/2025_06_29_dengue_models.RData"))
97+
stripped_model_names <- c("mod_inc_base", "stripped_mod_cfr_all")
98+
save(list = model_names, file = glue("{MODELING_DATA_PATH}/2025_06_29_dengue_models_stripped.RData"))
99+
100+
101+
102+
103+
104+
105+

src/idd_forecast_mbp/03_modeling/new_final_malaria_models.r

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,32 +39,38 @@ malaria_pfpr_mod <- scam(logit_malaria_pfpr ~ logit_malaria_suitability +
3939
optimizer = "efs", # Faster optimizer
4040
control = list(maxit = 300)) # Limit iterations
4141

42+
mod_df <- past_data[which(past_data$aa_malaria_mort_rate > 0),]
4243
mortality_scam_mod <- scam(log_aa_malaria_mort_rate ~ s(logit_malaria_pfpr, k = 10, bs = "mpi") +
4344
log_gdppc_mean +
4445
A0_af,
45-
data = past_data,
46+
data = mod_df,
4647
optimizer = "efs", # Faster optimizer
4748
control = list(maxit = 300)) # Limit iterations
49+
50+
mod_df <- past_data[which(past_data$aa_malaria_inc_rate > 0),]
4851
incidence_scam_mod <- scam(log_aa_malaria_inc_rate ~ s(logit_malaria_pfpr, k = 10, bs = "mpi") +
4952
log_gdppc_mean + A0_af,
50-
data = past_data,
53+
data = mod_df,
5154
optimizer = "efs", # Faster optimizer
5255
control = list(maxit = 300)) # Limit iterations
5356

57+
mod_df <- past_data[which(past_data$base_malaria_mort_rate > 0),]
5458
mortality_base_scam_mod <- scam(log_base_malaria_mort_rate ~ s(logit_malaria_pfpr, k = 10, bs = "mpi") +
5559
log_gdppc_mean +
5660
A0_af,
57-
data = past_data,
61+
data = mod_df,
5862
optimizer = "efs", # Faster optimizer
5963
control = list(maxit = 300)) # Limit iterations
64+
65+
mod_df <- past_data[which(past_data$base_malaria_inc_rate > 0),]
6066
incidence_base_scam_mod <- scam(log_base_malaria_inc_rate ~ s(logit_malaria_pfpr, k = 10, bs = "mpi") +
6167
log_gdppc_mean + A0_af,
62-
data = past_data,
68+
data = mod_df,
6369
optimizer = "efs", # Faster optimizer
6470
control = list(maxit = 300)) # Limit iterations
6571

6672

6773
model_names <- c("malaria_pfpr_mod", "mortality_scam_mod", "incidence_scam_mod", "mortality_base_scam_mod",
6874
"incidence_base_scam_mod")
6975

70-
save(list = model_names, file = glue("{data_path}/2025_06_29_malaria_models.RData"))
76+
save(list = model_names, file = glue("{data_path}/2025_07_03_malaria_models.RData"))

src/idd_forecast_mbp/04_forecasting/02_forecast_dengue_admin_2s_launcher.r

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ write.csv(param_map, param_map_filepath, row.names = FALSE)
1818
## QSUB Command
1919
job_name <- glue("forecast_dengue") # name of the job
2020
thread_flag <- "-c 4"
21-
mem_flag <- "--mem=100G"
21+
mem_flag <- "--mem=75G"
2222
runtime_flag <- "-t 50"
2323
#jdrive_flag <- "-l archive" # archive nodes can access the J drive. They're a little harder to get though. If you need J drive access, uncomment this and add it to the qsub_command
2424
queue_flag <- "-p all.q" # long or all

0 commit comments

Comments
 (0)