Commit d8dc9a7
Add xarray utility functions for NetCDF handling and DataFrame conversion
- Implemented functions to read and write NetCDF files with integer ID validation and retry logic.
- Added utility functions for sorting ID coordinates and ensuring they are integers.
- Created a function to convert pandas DataFrames to xarray Datasets with optimized dtypes and validation.
- Introduced filtering functions for xarray Datasets based on multiple coordinates and ranges.
- Added YAML loading functions to parse covariate configurations from a YAML file.
- Included example usage and preset configurations for common dimension setups.
- Created a test file for future parquet fixes.
1 parent d599b0d commit d8dc9a7
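
The xarray utility module itself is not among the hunks shown below, so as a rough sketch of the retry-plus-validation behavior the message describes (the function name comes from the diff; the retry count, backoff, and ID-coordinate list are assumptions, not the committed implementation):

    import time
    import xarray as xr

    # Hypothetical sketch -- not the committed implementation. The commit
    # message promises integer ID validation and retry logic, and the
    # STALE_FILE_HANDLE_FIX.md file below suggests the retries target
    # transient filesystem errors.
    ID_COORDS = ("location_id", "year_id", "age_group_id", "sex_id")

    def read_netcdf_with_integer_ids(path, retries=3, delay=5.0):
        for attempt in range(retries):
            try:
                ds = xr.open_dataset(path)
                break
            except OSError:  # e.g. a stale NFS file handle
                if attempt == retries - 1:
                    raise
                time.sleep(delay)
        # Cast ID coordinates to int64 so downstream joins stay integer-typed
        for coord in ID_COORDS:
            if coord in ds.coords:
                ds = ds.assign_coords({coord: ds[coord].astype("int64")})
        return ds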

File tree: 49 files changed, +6345 -762 lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
@@ -77,4 +77,5 @@ venv.bak/
 
 # IDEs
 .idea/
-notebooks/
+notebooks/
+.Rproj.user

STALE_FILE_HANDLE_FIX.md

Whitespace-only changes.

idd-forecast-mbp.Rproj

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+Version: 1.0
+
+RestoreWorkspace: Default
+SaveWorkspace: Default
+AlwaysSaveHistory: Default
+
+EnableCodeIndexing: Yes
+UseSpacesForTab: Yes
+NumSpacesForTab: 2
+Encoding: UTF-8
+
+RnwWeave: Sweave
+LaTeX: pdfLaTeX

src/idd_forecast_mbp/02_data_prep/02_as_fhs_and_full_population.py

Lines changed: 26 additions & 8 deletions
@@ -14,7 +14,8 @@
 import sys
 import xarray as xr
 from idd_forecast_mbp import constants as rfc
-from idd_forecast_mbp.helper_functions import read_parquet_with_integer_ids, write_parquet
+from idd_forecast_mbp.parquet_functions import read_parquet_with_integer_ids, write_parquet
+from idd_forecast_mbp.xarray_functions import read_netcdf_with_integer_ids, write_netcdf, convert_with_preset
 
 age_type_map = {
     "all_age": {
@@ -39,10 +40,15 @@
 fhs_hierarchy_df_path = f"{GBD_DATA_PATH}/fhs_2023_modeling_hierarchy.parquet"
 
 gbd_population_path = f"{GBD_DATA_PATH}/gbd_2023_population.parquet"
-aa_full_population_df_path = f"{PROCESSED_DATA_PATH}/aa_2023_full_population.parquet"
-as_full_population_df_path = f"{PROCESSED_DATA_PATH}/as_2023_full_population.parquet"
-aa_fhs_population_path = f"{PROCESSED_DATA_PATH}/aa_2023_fhs_population.parquet"
-as_fhs_population_path = f"{PROCESSED_DATA_PATH}/as_2023_fhs_population.parquet"
+aa_full_population_df_path = f"{PROCESSED_DATA_PATH}/aa_2023_full_population_df.parquet"
+as_full_population_df_path = f"{PROCESSED_DATA_PATH}/as_2023_full_population_df.parquet"
+aa_full_population_ds_path = f"{PROCESSED_DATA_PATH}/aa_2023_full_population_ds.nc"
+as_full_population_ds_path = f"{PROCESSED_DATA_PATH}/as_2023_full_population_ds.nc"
+
+aa_fhs_population_df_path = f"{PROCESSED_DATA_PATH}/aa_2023_fhs_population_df.parquet"
+as_fhs_population_df_path = f"{PROCESSED_DATA_PATH}/as_2023_fhs_population_df.parquet"
+aa_fhs_population_ds_path = f"{PROCESSED_DATA_PATH}/aa_2023_fhs_population_ds.nc"
+as_fhs_population_ds_path = f"{PROCESSED_DATA_PATH}/as_2023_fhs_population_ds.nc"
 
 missing_level_4_location_path = f"{PROCESSED_DATA_PATH}/missing_level_4_location_ids.parquet"
 missing_level_5_location_path = f"{PROCESSED_DATA_PATH}/missing_level5_location_ids.parquet"
@@ -155,8 +161,14 @@
 
 
 # Write to parquet
-write_parquet(aa_fhs_population_df, aa_fhs_population_path)
-write_parquet(as_fhs_population_df, as_fhs_population_path)
+write_parquet(aa_fhs_population_df, aa_fhs_population_df_path)
+write_parquet(as_fhs_population_df, as_fhs_population_df_path)
+
+aa_fhs_population_ds = convert_with_preset(aa_fhs_population_df, preset='aa_variables')
+write_netcdf(aa_fhs_population_ds, aa_fhs_population_ds_path)
+as_fhs_population_ds = convert_with_preset(as_fhs_population_df, preset='as_variables')
+write_netcdf(as_fhs_population_ds, as_fhs_population_ds_path)
+
 
 ###----------------------------------------------------------###
 ### 3. Base Population Data Loading
@@ -423,6 +435,9 @@
 
 # Step 5: Finalize the aa_full_population_df
 write_parquet(aa_full_population_df, aa_full_population_df_path)
+# Convert to xarray dataset and write to netCDF
+aa_full_population_ds = convert_with_preset(aa_full_population_df, preset='aa_variables')
+write_netcdf(aa_full_population_ds, aa_full_population_ds_path)
 
 ###----------------------------------------------------------###
 ### 5. Age Metadata Processing
@@ -539,4 +554,7 @@
 # As a last step, replace all age-sex population
 
 # Write the final DataFrame to a parquet file
-write_parquet(as_full_population_df, as_full_population_df_path)
+write_parquet(as_full_population_df, as_full_population_df_path)
+# Convert to xarray dataset and write to netCDF
+as_full_population_ds = convert_with_preset(as_full_population_df, preset='as_variables')
+write_netcdf(as_full_population_ds, as_full_population_ds_path)
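
convert_with_preset is called above with the presets 'aa_variables' and 'as_variables' but its body is not visible in this commit view. A minimal sketch of the likely shape, assuming the presets map onto the all-age and age-sex merge variables defined elsewhere in the repo:

    import pandas as pd

    # Hypothetical sketch of convert_with_preset; the preset names come from
    # the calls above, the dimension lists from aa/as_merge_variables.
    PRESET_DIMS = {
        "aa_variables": ["location_id", "year_id"],
        "as_variables": ["location_id", "year_id", "age_group_id", "sex_id"],
    }

    def convert_with_preset(df, preset):
        dims = PRESET_DIMS[preset]
        out = df.copy()
        for col in dims:
            out[col] = out[col].astype("int64")  # keep ID columns integer
        # The index dimensions become coordinates of the resulting Dataset
        return out.set_index(dims).to_xarray()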

src/idd_forecast_mbp/02_data_prep/03_rake_aa_A2_to_GBD.py

Lines changed: 2 additions & 1 deletion
@@ -5,7 +5,8 @@
 from datetime import datetime
 from rra_tools.shell_tools import mkdir # type: ignore
 from idd_forecast_mbp import constants as rfc
-from idd_forecast_mbp.helper_functions import read_parquet_with_integer_ids, write_parquet, check_column_for_problematic_values
+from idd_forecast_mbp.helper_functions import check_column_for_problematic_values
+from idd_forecast_mbp.parquet_functions import read_parquet_with_integer_ids, write_parquet
 from idd_forecast_mbp.cause_processing_functions import format_aa_gbd_df, process_lsae_df
 from idd_forecast_mbp.rake_and_aggregate_functions import rake_aa_count_lsae_to_gbd, make_aa_full_rate_df_from_aa_count_df, check_concordance, aggregate_aa_rate_lsae_to_gbd

src/idd_forecast_mbp/02_data_prep/04_rake_as_A2_to_GBD.py

Lines changed: 12 additions & 5 deletions
@@ -3,8 +3,8 @@
 from pathlib import Path
 from rra_tools.shell_tools import mkdir # type: ignore
 from idd_forecast_mbp import constants as rfc
-from idd_forecast_mbp.helper_functions import read_parquet_with_integer_ids, write_parquet, level_filter
-
+from idd_forecast_mbp.helper_functions import level_filter
+from idd_forecast_mbp.parquet_functions import read_parquet_with_integer_ids, write_parquet
 
 PROCESSED_DATA_PATH = rfc.PROCESSED_DATA_PATH
 FORECASTING_DATA_PATH = rfc.FORECASTING_DATA_PATH
@@ -68,13 +68,16 @@
 gbd_columns_to_read = ["location_id", "year_id", "age_group_id", "sex_id", 'population', "val"]
 full_df_columns_to_read = ["location_id", "year_id", 'population', outcome_count]
 # Get the most detailed gbd data
+print("Reading most-detailed GBD data")
+print(f"Reading {as_gbd_cause_df_path}")
 as_md_gbd_df = read_parquet_with_integer_ids(as_gbd_cause_df_path,
     columns=gbd_columns_to_read,
     filters=[year_filter, level_filter(hierarchy_df, start_level = 3, end_level = 5), measure_filter,
              metric_filter, age_filter, sex_filter]).rename(columns={'val': outcome_count})
 
 gbd_location_ids = as_md_gbd_df['location_id'].unique().tolist()
 gbd_location_filter = ('location_id', 'in', gbd_location_ids)
+print(f"Reading {aa_full_cause_df_path}")
 aa_md_gbd_df = read_parquet_with_integer_ids(aa_full_cause_df_path,
     columns=full_df_columns_to_read,
     filters=[year_filter, gbd_location_filter]).rename(columns={
@@ -83,6 +86,7 @@
 
 as_md_gbd_df = as_md_gbd_df.merge(aa_md_gbd_df, on=['location_id', 'year_id'], how='left').copy()
 
+print("Calculating most-detailed GBD rates and rate ratios")
 as_md_gbd_df['aa_' + outcome_rate] = as_md_gbd_df['aa_' + outcome_count] / as_md_gbd_df['aa_population']
 as_md_gbd_df[outcome_rate] = as_md_gbd_df[outcome_count] / as_md_gbd_df['population']
 as_md_gbd_df['rate_ratio'] = as_md_gbd_df[outcome_rate] / as_md_gbd_df['aa_' + outcome_rate]
@@ -106,21 +110,24 @@
 )
 
 # Rename the gbd dataframe columns
-gbd_outcome_columns = [col for col in as_md_gbd_df.columns if measure_short in col or 'ratio' in col] + ['location_id']
+gbd_outcome_columns = [col for col in as_md_gbd_df.columns if measure_short in col or 'ratio' in col] + ['location_id','aa_population', 'population']
 rename_dict = {col: 'gbd_' + col for col in gbd_outcome_columns}
-as_md_gbd_df = as_md_gbd_df.rename(columns=rename_dict).drop(columns=['aa_population', 'population'])
+as_md_gbd_df = as_md_gbd_df.rename(columns=rename_dict)
 
+print("Starting subnational merge")
 # Merge
 as_subnat_df = as_subnat_df.merge(
     as_md_gbd_df,
     how='left',
     on=['gbd_location_id', 'year_id', 'age_group_id', 'sex_id']
 )
-
+
+
 as_subnat_df['aa_' + outcome_rate] = as_subnat_df['aa_' + outcome_count] / as_subnat_df['aa_population']
 as_subnat_df[outcome_rate] = as_subnat_df['gbd_rate_ratio'] * as_subnat_df['aa_' + outcome_rate]
 as_subnat_df[outcome_count] = as_subnat_df[outcome_rate] * as_subnat_df['population']
 #
+print(as_subnat_df[as_subnat_df[outcome_rate] == as_subnat_df[outcome_rate].max()])
 drop_cols = [col for col in as_subnat_df.columns if 'gbd_' in col or 'rate_ratio' in col]
 as_subnat_df = as_subnat_df.drop(columns=drop_cols)
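
The raking arithmetic above is compact; a toy numeric walk-through of the rate-ratio step (all values invented) may help:

    # Toy illustration of the rate-ratio raking above; numbers are invented.
    gbd_as_rate = 0.004          # GBD age-sex-specific rate at a location
    gbd_aa_rate = 0.002          # GBD all-age rate at the same location
    rate_ratio = gbd_as_rate / gbd_aa_rate          # 2.0

    subnat_aa_rate = 0.0015      # subnational all-age rate
    subnat_population = 10_000
    subnat_as_rate = rate_ratio * subnat_aa_rate          # 0.003
    subnat_as_count = subnat_as_rate * subnat_population  # 30.0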

src/idd_forecast_mbp/02_data_prep/05_malaria_modeling_dataframe.py

Lines changed: 17 additions & 7 deletions
@@ -12,7 +12,8 @@
 import os
 import sys
 from idd_forecast_mbp import constants as rfc
-from idd_forecast_mbp.helper_functions import read_parquet_with_integer_ids, merge_dataframes, read_income_paths, read_urban_paths, write_parquet, level_filter
+from idd_forecast_mbp.helper_functions import merge_dataframes, read_income_paths, read_urban_paths, level_filter
+from idd_forecast_mbp.parquet_functions import read_parquet_with_integer_ids, write_parquet
 import glob
 
 malaria_mortality_threshold = 1
@@ -97,8 +98,8 @@
 age_group_ids = age_sex_df['age_group_id'].unique().tolist()
 age_filter = ('age_group_id', 'in', age_group_ids)
 
-aa_merge_variables = ["location_id", "year_id"]
-as_merge_variables = ["location_id", "year_id", "age_group_id", "sex_id"]
+aa_merge_variables = rfc.aa_merge_variables
+as_merge_variables = rfc.as_merge_variables
 
 ###----------------------------------------------------------###
 ### 4. Data Loading and Integration
@@ -113,7 +114,9 @@
     filters=[year_filter, level_filter(hierarchy_df, start_level = 3, end_level = 5)])
 # Read and merge development assistance data
 dah_df = read_parquet_with_integer_ids(dah_df_path)
-malaria_df = pd.merge(malaria_df, dah_df, on=["location_id", "year_id"], how="left")
+malaria_df = pd.merge(malaria_df,
+                      dah_df[aa_merge_variables + ['mal_DAH_total','mal_DAH_total_per_capita']],
+                      on = aa_merge_variables, how="left")
 
 # Load and merge urbanization metrics
 urban_dfs = read_urban_paths(urban_paths, VARIABLE_DATA_PATH)
@@ -173,12 +176,18 @@
 malaria_stage_2_df = malaria_stage_2_df.copy()
 
 # Select countries with significant malaria burden (mortality > threshold)
-A0_malaria_stage_2_df = malaria_stage_2_df[(malaria_stage_2_df["location_id"] == malaria_stage_2_df["A0_location_id"]) & (malaria_stage_2_df["year_id"] == 2022)]
-A0_malaria_stage_2_df = A0_malaria_stage_2_df[A0_malaria_stage_2_df["malaria_mort_count"] >= malaria_mortality_threshold]
-phase_2_A0_location_ids = A0_malaria_stage_2_df["location_id"].unique()
+A0_malaria_stage_2_df = malaria_stage_2_df[(malaria_stage_2_df["location_id"] == malaria_stage_2_df["A0_location_id"]) & (malaria_stage_2_df["year_id"] == 2022)].copy()
+A0_malaria_stage_2_df = A0_malaria_stage_2_df.rename(columns={
+    "malaria_pfpr": "A0_malaria_pfpr",
+    "malaria_mort_count": "A0_malaria_mort_count",
+    "malaria_inc_count": "A0_malaria_inc_count"})
+
+A0_malaria_stage_2_df = A0_malaria_stage_2_df[A0_malaria_stage_2_df["A0_malaria_mort_count"] >= malaria_mortality_threshold]
+phase_2_A0_location_ids = A0_malaria_stage_2_df["A0_location_id"].unique()
 
 # Subset to high-burden countries and most detailed geographic units
 malaria_stage_2_df = malaria_stage_2_df[malaria_stage_2_df["A0_location_id"].isin(phase_2_A0_location_ids)]
+malaria_stage_2_df = malaria_stage_2_df.merge(A0_malaria_stage_2_df[["A0_location_id", "A0_malaria_pfpr", "A0_malaria_mort_count", "A0_malaria_inc_count"]], on=["A0_location_id"], how="left")
 
 
 ###----------------------------------------------------------###
 ### 7. Feature Engineering and Transformations
@@ -227,6 +236,7 @@
     'urban_100m_threshold_300', 'urban_1km_threshold_1500', 'urban_100m_threshold_1500', 'gdppc_mean',
     'total_precipitation', 'relative_humidity', 'mean_temperature',
     'mean_high_temperature', 'malaria_suitability', 'people_flood_days_per_capita', 'A0_location_id',
+    "A0_malaria_pfpr", "A0_malaria_mort_count", "A0_malaria_inc_count",
    'A0_af', 'log_mal_DAH_total_per_capita', 'log_gdppc_mean', 'logit_urban_1km_threshold_300',
    'logit_urban_100m_threshold_300', 'logit_urban_1km_threshold_1500', 'logit_urban_100m_threshold_1500', 'logit_malaria_pfpr']
 malaria_stage_3_df = malaria_stage_2_df[stage_2_df_columns_to_keep].copy()
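
The new merge broadcasts 2022 country-level (A0) burden values onto every subnational row so they can serve as covariates; a self-contained toy version of the pattern (data invented):

    import pandas as pd

    # Toy version of the A0 broadcast above; the data are invented.
    df = pd.DataFrame({
        "location_id":    [10, 101, 102],   # 10 is the country (A0) row
        "A0_location_id": [10, 10, 10],
        "malaria_pfpr":   [0.20, 0.25, 0.15],
    })
    a0 = (df[df["location_id"] == df["A0_location_id"]]
            .rename(columns={"malaria_pfpr": "A0_malaria_pfpr"}))
    df = df.merge(a0[["A0_location_id", "A0_malaria_pfpr"]],
                  on="A0_location_id", how="left")
    # Every row, including subnationals 101 and 102, now carries A0_malaria_pfpr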

src/idd_forecast_mbp/02_data_prep/06_dengue_modeling_dataframe.py

Lines changed: 16 additions & 6 deletions
@@ -12,7 +12,8 @@
 import os
 import sys
 from idd_forecast_mbp import constants as rfc
-from idd_forecast_mbp.helper_functions import read_parquet_with_integer_ids, merge_dataframes, read_income_paths, read_urban_paths, write_parquet, level_filter
+from idd_forecast_mbp.helper_functions import merge_dataframes, read_income_paths, read_urban_paths, level_filter
+from idd_forecast_mbp.parquet_functions import read_parquet_with_integer_ids, write_parquet
 import glob
 
 dengue_mortality_theshold = 1
@@ -101,7 +102,7 @@
 ### including urbanization metrics, income data,
 ### and climate variables from different sources.
 ###----------------------------------------------------------###
-# Load core malaria data
+# Load core dengue data
 dengue_df = read_parquet_with_integer_ids(aa_full_cause_df_path_template,
     filters=[year_filter, level_filter(hierarchy_df, start_level = 3, end_level = 5)])
 
@@ -170,20 +171,28 @@
     clipped_values = dengue_df[col].clip(lower=0.001, upper=0.999)
     dengue_df[f"logit_{col}"] = np.log(clipped_values / (1 - clipped_values))
 
+aa_A0_dengue_df = dengue_df[(dengue_df["location_id"] == dengue_df["A0_location_id"]) & (dengue_df["year_id"] == 2022)].copy()
+aa_A0_dengue_df = aa_A0_dengue_df[aa_A0_dengue_df['aa_dengue_mort_count'] > 1].copy()
+A0_dengue_ids = aa_A0_dengue_df['A0_location_id'].unique()
+
+
 # Make the yn variable. This will be used as the response in the phase 1 model and to trim the data in the phase 2 model
 # dengue_df$yn[which(dengue_df$dengue_mort_rate > dengue_mortality_rate_theshold & dengue_df$dengue_mort_count > dengue_mortality_theshold & dengue_df$dengue_suitability > 0)] <- 1
 dengue_df["yn"] = 0
 dengue_df.loc[
-    (dengue_df["aa_dengue_mort_rate"] > dengue_mortality_rate_theshold) &
-    (dengue_df["aa_dengue_mort_count"] > dengue_mortality_theshold) &
+    (dengue_df["aa_dengue_mort_rate"] > 1/100000) &
+    (dengue_df["aa_dengue_mort_count"] > 0) &
     (dengue_df["aa_dengue_inc_count"] > 0) &
    (dengue_df["dengue_suitability"] > 0),
    "yn"
 ] = 1
 
-
 write_parquet(dengue_df, aa_ge3_dengue_stage_1_modeling_df_path)
 
+dengue_df = dengue_df[dengue_df['A0_location_id'].isin(A0_dengue_ids)].copy()
+
+
+
 ###----------------------------------------------------------###
 ### 8. Final Modeling Dataset Preparation
 ### Prepares the final dataset for modeling by selecting relevant columns,
@@ -198,7 +207,7 @@
 dengue_stage_2_df["A0_location_id"] = dengue_stage_2_df["A0_location_id"].astype(int)
 dengue_stage_2_df['A0_af'] = 'A0_' + dengue_stage_2_df['A0_location_id'].astype(str)
 dengue_stage_2_df['A0_af'] = dengue_stage_2_df['A0_af'].astype('category')
-dengue_stage_2_df = dengue_stage_2_df.drop(columns=['aa_dengue_inc_count', 'aa_dengue_inc_rate', 'aa_dengue_mort_count', 'aa_dengue_mort_rate', 'aa_dengue_cfr'])
+# dengue_stage_2_df = dengue_stage_2_df.drop(columns=['aa_dengue_inc_count', 'aa_dengue_inc_rate', 'aa_dengue_mort_count', 'aa_dengue_mort_rate', 'aa_dengue_cfr'])
 # Get the as data
 md_location_ids = dengue_stage_2_df["location_id"].unique().tolist()
 md_location_filter = ('location_id', 'in', md_location_ids)
@@ -207,6 +216,7 @@
     columns=as_merge_variables + ["dengue_mort_rate","dengue_inc_rate","dengue_mort_count","dengue_inc_count","population","aa_population"],
     filters=[year_filter, md_location_filter, age_filter, sex_filter])
 
+
 as_md_df["dengue_cfr"] = as_md_df["dengue_mort_rate"] / as_md_df["dengue_inc_rate"]
 as_md_df.loc[as_md_df["dengue_inc_rate"] == 0, "dengue_cfr"] = 0
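
The clip-then-logit transform retained in this file (clip to [0.001, 0.999], then log(p / (1 - p))) is what keeps logits finite at proportions of exactly 0 or 1; isolated here for clarity, with the bounds taken from the diff:

    import numpy as np

    def safe_logit(p, eps=0.001):
        # Clip away from 0 and 1 so the logit stays finite (bounds per the diff)
        p = np.clip(p, eps, 1 - eps)
        return np.log(p / (1 - p))

    safe_logit(np.array([0.0, 0.5, 1.0]))  # array([-6.9068, 0., 6.9068])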

src/idd_forecast_mbp/02_data_prep/07_forecasted_dataframes_non_draw_part.py

Lines changed: 7 additions & 4 deletions
@@ -11,7 +11,8 @@
 import numpy as np
 
 from idd_forecast_mbp import constants as rfc
-from idd_forecast_mbp.helper_functions import ensure_id_columns_are_integers, read_parquet_with_integer_ids, read_income_paths, merge_dataframes, write_parquet, read_urban_paths
+from idd_forecast_mbp.helper_functions import read_income_paths, merge_dataframes, read_urban_paths
+from idd_forecast_mbp.parquet_functions import read_parquet_with_integer_ids, write_parquet, ensure_id_columns_are_integers, sort_id_columns
 
 
 hierarchy = "lsae_1209"
@@ -31,6 +32,9 @@
 hierarchy_df_path = f'{PROCESSED_DATA_PATH}/full_hierarchy_lsae_1209.parquet'
 hierarchy_df = read_parquet_with_integer_ids(hierarchy_df_path)
 
+md_location_ids = hierarchy_df[hierarchy_df['level'] == 5]['location_id'].unique().tolist()
+md_location_filter = ('location_id', 'in', md_location_ids)
+
 aa_full_population_df_path = f"{PROCESSED_DATA_PATH}/aa_2023_full_population.parquet"
 aa_full_population_df = read_parquet_with_integer_ids(aa_full_population_df_path)
 aa_merge_variables = rfc.aa_merge_variables
@@ -61,7 +65,7 @@
 flooding_df_path = flooding_df_path_template.format(ssp_scenario=ssp_scenario)
 
 forecast_df = read_parquet_with_integer_ids(flooding_df_path,
-    filters = [year_filter])
+    filters = [year_filter, md_location_filter])
 forecast_df = forecast_df.drop(
     columns=["model", "variant", 'population']
 )
@@ -87,15 +91,14 @@
 
 # Merge in the hierarchy_df
 forecast_df = forecast_df.merge(
-    hierarchy_df[['location_id', 'A0_location_id', 'most_detailed_lsae']],
+    hierarchy_df[['location_id', 'A0_location_id']],
     how="left",
     left_on="location_id",
     right_on="location_id"
 )
 
 # Drop rows where A0_location_id is NaN
 forecast_df = forecast_df.dropna(subset=["A0_location_id"])
-forecast_df = forecast_df[forecast_df["most_detailed_lsae"] == 1]
 
 forecast_df = ensure_id_columns_are_integers(forecast_df)
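
The md_location_filter tuple added here follows the pandas/pyarrow predicate-pushdown format, so most-detailed locations are selected at read time instead of after loading the full file. A standalone example of the same pattern (path and ID values assumed):

    import pandas as pd

    # Standalone example of tuple-style parquet filters; path and IDs assumed.
    year_filter = ("year_id", ">=", 2022)
    md_location_filter = ("location_id", "in", [101, 102, 103])

    df = pd.read_parquet(
        "flooding_forecast.parquet",
        engine="pyarrow",
        filters=[year_filter, md_location_filter],  # pushed down to pyarrow
    )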
