Skip to content

Commit 0365b0b

Browse files
committed
Add scripts for creating and combining draw-level data for malaria forecasting
- Implemented `06_create_and_combine_as_and_aa_draws_parallel.py` to manage parallel workflows for draw generation.
- Developed `create_and_combine_as_and_aa_draws.py` for processing and aggregating age-specific and all-age malaria data.
- Introduced `create_and_combine_as_draws.py` for additional draw-level data handling.
- Enhanced argument parsing and file-path management for better flexibility and usability.
- Integrated efficient data loading and filtering using xarray and Dask for improved performance.
- Added functionality for calculating rates and means, and for writing final NetCDF files with appropriate metadata.
1 parent 7cd87f8 commit 0365b0b

40 files changed

Lines changed: 4636 additions & 303 deletions

src/idd_forecast_mbp/01_map_to_admin_2/01_prep_maps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from datetime import datetime
77
from rra_tools.shell_tools import mkdir # type: ignore
88
from idd_forecast_mbp import constants as rfc
9-
from idd_forecast_mbp.helper_functions import load_yaml_dictionary, parse_yaml_dictionary
9+
from idd_forecast_mbp.yaml_functions import load_yaml_dictionary, parse_yaml_dictionary
1010

1111
repo_name = rfc.repo_name
1212
package_name = rfc.package_name

src/idd_forecast_mbp/01_map_to_admin_2/02_pixel_main_parallel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from pathlib import Path
55
import geopandas as gpd # type: ignore
66
from idd_forecast_mbp import constants as rfc
7-
from idd_forecast_mbp.helper_functions import load_yaml_dictionary, parse_yaml_dictionary
7+
from idd_forecast_mbp.yaml_functions import load_yaml_dictionary, parse_yaml_dictionary
88

99
repo_name = rfc.repo_name
1010
package_name = rfc.package_name

src/idd_forecast_mbp/01_map_to_admin_2/03_pixel_hierarchy_parallel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from pathlib import Path
55
import geopandas as gpd # type: ignore
66
from idd_forecast_mbp import constants as rfc
7-
from idd_forecast_mbp.helper_functions import load_yaml_dictionary
7+
from idd_forecast_mbp.yaml_functions import load_yaml_dictionary
88

99
repo_name = rfc.repo_name
1010
package_name = rfc.package_name

src/idd_forecast_mbp/01_map_to_admin_2/04_pixel_urban_main_parallel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from pathlib import Path
55
import geopandas as gpd # type: ignore
66
from idd_forecast_mbp import constants as rfc
7-
from idd_forecast_mbp.helper_functions import load_yaml_dictionary, parse_yaml_dictionary
7+
from idd_forecast_mbp.yaml_functions import load_yaml_dictionary, parse_yaml_dictionary
88

99
repo_name = rfc.repo_name
1010
package_name = rfc.package_name

src/idd_forecast_mbp/01_map_to_admin_2/05_pixel_urban_hierarchy_parallel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from pathlib import Path
55
import geopandas as gpd # type: ignore
66
from idd_forecast_mbp import constants as rfc
7-
from idd_forecast_mbp.helper_functions import load_yaml_dictionary
7+
from idd_forecast_mbp.yaml_functions import load_yaml_dictionary
88

99
repo_name = rfc.repo_name
1010
package_name = rfc.package_name

src/idd_forecast_mbp/01_map_to_admin_2/pixel_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import itertools
1616
from rra_tools.shell_tools import mkdir # type: ignore
1717
from idd_forecast_mbp import constants as rfc
18-
from idd_forecast_mbp.helper_functions import load_yaml_dictionary, parse_yaml_dictionary
18+
from idd_forecast_mbp.yaml_functions import load_yaml_dictionary, parse_yaml_dictionary
1919
import argparse
2020
import yaml
2121

src/idd_forecast_mbp/01_map_to_admin_2/pixel_urban_main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import itertools
1616
from rra_tools.shell_tools import mkdir # type: ignore
1717
from idd_forecast_mbp import constants as rfc
18-
from idd_forecast_mbp.helper_functions import load_yaml_dictionary, parse_yaml_dictionary
18+
from idd_forecast_mbp.yaml_functions import load_yaml_dictionary, parse_yaml_dictionary
1919
import argparse
2020
import yaml
2121

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
"""Add DAH scenarios and create draw-level dataframes for forecasting malaria.

For a single SSP scenario / draw pair, this script:
  1. reads the baseline malaria forecast dataframe for that scenario/draw,
  2. merges in each alternative DAH (development assistance for health)
     scenario's national per-capita spending series, and
  3. writes one parquet file per DAH scenario.

Run as: python <script> --ssp_scenario ssp245 --draw 001
"""
import argparse
import itertools
from pathlib import Path
from typing import Literal, NamedTuple, cast

import numpy as np  # type: ignore
import numpy.typing as npt  # type: ignore
import pandas as pd  # type: ignore
import xarray as xr  # type: ignore
from rra_tools.shell_tools import mkdir  # type: ignore

from idd_forecast_mbp import constants as rfc
from idd_forecast_mbp.helper_functions import merge_dataframes, read_income_paths, read_urban_paths, level_filter
from idd_forecast_mbp.parquet_functions import read_parquet_with_integer_ids, write_parquet

parser = argparse.ArgumentParser(
    description="Add DAH scenarios and create draw-level dataframes for forecasting malaria"
)

# Define arguments
parser.add_argument("--ssp_scenario", type=str, required=True,
                    help="SSP scenario name (e.g., ssp126, ssp245, ssp585)")
parser.add_argument("--draw", type=str, required=True,
                    help="Draw number (e.g., '001', '002', etc.)")

# Parse arguments
args = parser.parse_args()

ssp_scenario = args.ssp_scenario
draw = args.draw

# Hierarchy label used by the surrounding pipeline.
hierarchy = "lsae_1209"
PROCESSED_DATA_PATH = rfc.MODEL_ROOT / "02-processed_data"
FORECASTING_DATA_PATH = rfc.MODEL_ROOT / "04-forecasting_data"

# New DAH data, created using make_GK_dah_df.py.
# 'name' is the label embedded in output file names; 'path' is the input parquet.
new_dah_scenarios = {
    'reference': {
        'name': 'GK_reference_2025_11_02',
        'path': f'{PROCESSED_DATA_PATH}/GK_dah_ref_df_2025_07_08.parquet'
    },
    'cut20': {
        'name': 'GK_cut20_2025_11_02',
        'path': f'{PROCESSED_DATA_PATH}/GK_dah_cut20_df_2025_07_08.parquet'
    }
}

base_dah_scenario_df_path_template = "{FORECASTING_DATA_PATH}/malaria_forecast_ssp_scenario_{ssp_scenario}_dah_scenario_Baseline_draw_{draw}.parquet"
dah_scenario_df_path_template = "{FORECASTING_DATA_PATH}/malaria_forecast_ssp_scenario_{ssp_scenario}_dah_scenario_{dah_scenario_name}_draw_{draw}.parquet"

# Columns pulled from the baseline forecast dataframe.
columns_to_keep = ['location_id', 'year_id', 'people_flood_days_per_capita',
                   'gdppc_mean', 'log_gdppc_mean',
                   'logit_malaria_pfpr',
                   'aa_malaria_mort_rate', 'aa_malaria_inc_rate',
                   'base_malaria_mort_rate', 'base_malaria_inc_rate',
                   'log_aa_malaria_mort_rate', 'log_aa_malaria_inc_rate',
                   'log_base_malaria_mort_rate', 'log_base_malaria_inc_rate',
                   'malaria_suitability', 'year_to_rake_to', 'A0_af']

# Columns pulled from each DAH scenario parquet.
dah_columns_to_keep = ['location_id', 'year_id', 'mal_DAH_total_per_capita']

base_dah_scenario_df_path = base_dah_scenario_df_path_template.format(
    FORECASTING_DATA_PATH=FORECASTING_DATA_PATH,
    ssp_scenario=ssp_scenario,
    draw=draw
)
base_dah_scenario_df = read_parquet_with_integer_ids(base_dah_scenario_df_path,
                                                     columns=columns_to_keep
                                                     )

# DAH spending is merged at admin-0 level, keyed on the ancestor id parsed
# from the 'A0_af' label (e.g. 'A0_123' -> 123). The extraction is
# loop-invariant, so do it once here; per-scenario copies inherit the column,
# and it is dropped again before each output is written.
base_dah_scenario_df['A0_location_id'] = (
    base_dah_scenario_df['A0_af'].str.extract(r'A0_(\d+)')[0].astype(int)
)

for dah_scenario_name, dah_scenario in new_dah_scenarios.items():
    print(f"Processing DAH scenario: {dah_scenario_name}")

    # Fresh copy of the base data so scenarios never contaminate each other.
    current_scenario_df = base_dah_scenario_df.copy()

    # Read the new DAH scenario data
    dah_df = read_parquet_with_integer_ids(dah_scenario['path'],
                                           columns=dah_columns_to_keep)

    print("-" * 50)
    print(f"DIAGNOSTIC: {dah_scenario_name.upper()} Input Check")

    # Check 1: Does the input DAH file (dah_df) have non-zero data post-2025?
    # Assuming 2025 is the start year of divergence
    post_2025_dah = dah_df[dah_df['year_id'] >= 2025]['mal_DAH_total_per_capita']

    if not post_2025_dah.empty and post_2025_dah.abs().sum() > 1e-9:
        print(f"✅ Input DAH data is NOT zero post-2025. Max value: {post_2025_dah.max():.4f}")
    else:
        print("❌ Input DAH data is zero/missing post-2025. The problem is upstream.")
    print("-" * 50)

    # Merge the national DAH series onto every row. The right-hand
    # 'location_id' key collides with the left column of the same name and is
    # suffixed to 'location_id_dah'; both it and the helper key are dropped.
    dah_scenario_df = current_scenario_df.merge(
        dah_df,
        left_on=['A0_location_id', 'year_id'],
        right_on=['location_id', 'year_id'],
        how='left',
        suffixes=('', '_dah')
    )
    dah_scenario_df = dah_scenario_df.drop(columns=['location_id_dah', 'A0_location_id'])

    # Locations with no DAH record are treated as zero spending.
    dah_scenario_df['mal_DAH_total_per_capita'] = dah_scenario_df['mal_DAH_total_per_capita'].fillna(0)

    # Write the output to a new parquet file
    dah_scenario_df_path = dah_scenario_df_path_template.format(
        FORECASTING_DATA_PATH=FORECASTING_DATA_PATH,
        ssp_scenario=ssp_scenario,
        dah_scenario_name=dah_scenario['name'],
        draw=draw
    )

    write_parquet(dah_scenario_df, dah_scenario_df_path)

src/idd_forecast_mbp/02_data_prep/alt_forecasted_malaria_dataframes.py

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414

1515
import argparse
16-
parser = argparse.ArgumentParser(description="Add DAH Sceanrios and create draw level dataframes for forecating dengue")
16+
parser = argparse.ArgumentParser(description="Add DAH Sceanrios and create draw level dataframes for forecating malaria")
1717

1818
# Define arguments
1919
parser.add_argument("--ssp_scenario", type=str, required=True, help="ssp scenario number (ssp16, ssp245, ssp585")
@@ -31,19 +31,32 @@
3131
PROCESSED_DATA_PATH = rfc.MODEL_ROOT / "02-processed_data"
3232
FORECASTING_DATA_PATH = rfc.MODEL_ROOT / "04-forecasting_data"
3333

34+
# # New DAH data
35+
# new_dah_scenarios = {
36+
# 'reference': {
37+
# 'name': 'reference',
38+
# 'path': f'{PROCESSED_DATA_PATH}/dah_reference_df.parquet'
39+
# },
40+
# 'better': {
41+
# 'name': 'better',
42+
# 'path': f'{PROCESSED_DATA_PATH}/dah_better_df.parquet'
43+
# },
44+
# 'worse': {
45+
# 'name': 'worse',
46+
# 'path': f'{PROCESSED_DATA_PATH}/dah_worse_df.parquet'
47+
# }
48+
# }
49+
3450
# New DAH data
51+
# Created using make_GK_dah_df.py
3552
new_dah_scenarios = {
3653
'reference': {
37-
'name': 'reference',
38-
'path': f'{PROCESSED_DATA_PATH}/dah_reference_df.parquet'
39-
},
40-
'better': {
41-
'name': 'better',
42-
'path': f'{PROCESSED_DATA_PATH}/dah_better_df.parquet'
54+
'name': 'GK_reference_2025_11_02',
55+
'path': f'{PROCESSED_DATA_PATH}/GK_dah_ref_df_2025_07_08.parquet'
4356
},
44-
'worse': {
45-
'name': 'worse',
46-
'path': f'{PROCESSED_DATA_PATH}/dah_worse_df.parquet'
57+
'cut20': {
58+
'name': 'GK_cut20_2025_11_02',
59+
'path': f'{PROCESSED_DATA_PATH}/GK_dah_cut20_df_2025_07_08.parquet'
4760
}
4861
}
4962

@@ -75,21 +88,46 @@
7588
for dah_scenario_name, dah_scenario in new_dah_scenarios.items():
7689
print(f"Processing DAH scenario: {dah_scenario_name}")
7790

91+
# FIX: Create a fresh copy of the base data for the current scenario merge.
92+
current_scenario_df = base_dah_scenario_df.copy()
93+
7894
# Read the new DAH scenario data
7995
dah_df = read_parquet_with_integer_ids(dah_scenario['path'],
8096
columns=dah_columns_to_keep)
8197

82-
# Merge with the existing DAH scenario data
83-
dah_scenario_df = base_dah_scenario_df.merge(dah_df, on=['location_id', 'year_id'], how='left')
98+
print("-" * 50)
99+
print(f"DIAGNOSTIC: {dah_scenario_name.upper()} Input Check")
100+
101+
# Check 1: Does the input DAH file (dah_df) have non-zero data post-2025?
102+
# Assuming 2025 is the start year of divergence
103+
post_2025_dah = dah_df[dah_df['year_id'] >= 2025]['mal_DAH_total_per_capita']
104+
105+
if not post_2025_dah.empty and post_2025_dah.abs().sum() > 1e-9:
106+
print(f"✅ Input DAH data is NOT zero post-2025. Max value: {post_2025_dah.max():.4f}")
107+
else:
108+
print("❌ Input DAH data is zero/missing post-2025. The problem is upstream.")
109+
print("-" * 50)
110+
111+
current_scenario_df['A0_location_id'] = current_scenario_df['A0_af'].str.extract(r'A0_(\d+)')[0].astype(int)
112+
113+
# Merge the unique DAH data into the fresh copy
114+
dah_scenario_df = current_scenario_df.merge(
115+
dah_df,
116+
left_on=['A0_location_id', 'year_id'],
117+
right_on=['location_id', 'year_id'],
118+
how='left',
119+
suffixes=('', '_dah')
120+
)
121+
dah_scenario_df = dah_scenario_df.drop(columns=['location_id_dah', 'A0_location_id'])
122+
84123

85-
# Add the new DAH column
124+
# Add the new DAH column (fillna is still correct)
86125
dah_scenario_df['mal_DAH_total_per_capita'] = dah_scenario_df['mal_DAH_total_per_capita'].fillna(0)
87-
88126
# Write the output to a new parquet file
89127
dah_scenario_df_path = dah_scenario_df_path_template.format(
90128
FORECASTING_DATA_PATH=FORECASTING_DATA_PATH,
91129
ssp_scenario=ssp_scenario,
92-
dah_scenario_name=dah_scenario_name,
130+
dah_scenario_name=dah_scenario['name'],
93131
draw=draw
94132
)
95133

0 commit comments

Comments
 (0)