diff --git a/src/rra_population_model/cli_options.py b/src/rra_population_model/cli_options.py index 4bf9384..6a8502d 100644 --- a/src/rra_population_model/cli_options.py +++ b/src/rra_population_model/cli_options.py @@ -115,6 +115,15 @@ def with_version[**P, T]() -> Callable[[Callable[P, T]], Callable[P, T]]: ) +def with_copy_from_version[**P, T]() -> Callable[[Callable[P, T]], Callable[P, T]]: + return click.option( + "--copy-from-version", + type=click.STRING, + help="Version of the model to copy predictions from. Used if we're " + "raking a set of predictions to multiple raking targets.", + ) + + def with_block_key[**P, T]() -> Callable[[Callable[P, T]], Callable[P, T]]: return click.option( "--block-key", diff --git a/src/rra_population_model/constants.py b/src/rra_population_model/constants.py index 73eee9e..120d655 100644 --- a/src/rra_population_model/constants.py +++ b/src/rra_population_model/constants.py @@ -1,3 +1,4 @@ +import itertools import warnings from enum import StrEnum from pathlib import Path @@ -26,7 +27,7 @@ def to_list(cls) -> list[str]: class BuiltVersion(BaseModel): provider: Literal["ghsl", "microsoft"] - version: Literal["v4", "r2023a"] + version: Literal["v4", "v6", "r2023a"] time_points: list[str] measures: list[str] denominators: list[str] @@ -47,7 +48,7 @@ def time_points_float(self) -> list[float]: @model_validator(mode="after") def validate_version(self) -> Self: version_map = { - "microsoft": ["v4"], + "microsoft": ["v4", "v6"], "ghsl": ["r2023a"], } allowed_version = version_map[self.provider] @@ -103,6 +104,15 @@ def validate_measures(self) -> Self: measures=["density"], denominators=["density"], ), + "microsoft_v6": BuiltVersion( + provider="microsoft", + version="v6", + time_points=[ + f"{y}q{q}" for y, q in itertools.product(range(2020, 2024), range(1, 5)) + ][1:], + measures=["density"], + denominators=["density"], + ), } DENOMINATORS = [] diff --git a/src/rra_population_model/data.py b/src/rra_population_model/data.py index 0f7cec8..e179299 100644 --- a/src/rra_population_model/data.py +++ b/src/rra_population_model/data.py @@ -582,7 +582,9 @@ def model_root(self, resolution: str) -> Path: def model_version_root(self, resolution: str, version: str) -> Path: return self.model_root(resolution) / version - def make_model_version_root(self, resolution: str, version: str) -> Path: + def make_model_version_root( + self, resolution: str, version: str, *, exist_ok: bool = False + ) -> Path: dirs = [ self.model_version_root(resolution, version), self.raw_predictions_root(resolution, version), @@ -591,16 +593,95 @@ def make_model_version_root(self, resolution: str, version: str) -> Path: self.compiled_predictions_root(resolution, version), ] for path in dirs: - mkdir(path) + mkdir(path, exist_ok=exist_ok) + + return self.model_version_root(resolution, version) + + def maybe_copy_version( + self, + resolution: str, + version: str, + copy_from_version: str | None, + ) -> None: + if copy_from_version is None: + # Nothing to do + return + + version_root = self.model_version_root(resolution, version) + version_spec_path = self.model_specification_path(resolution, version) + version_ckpt_path = self.model_checkpoint_path(resolution, version) + version_preds_root = self.raw_predictions_root(resolution, version) + + copy_from_root = self.model_version_root(resolution, copy_from_version) + copy_spec_path = self.model_specification_path(resolution, copy_from_version) + copy_ckpt_path = self.model_checkpoint_path(resolution, copy_from_version) + copy_preds_root = self.raw_predictions_root(resolution, copy_from_version) + + ################# + # Preconditions # + ################# + # Basic checks + if copy_from_version >= version: + msg = "Cannot copy from a version that is the same or newer." + raise ValueError(msg) - return path + if not copy_from_root.exists(): + msg = f"Cannot copy from non-existent version {copy_from_version}" + raise ValueError(msg) + + if copy_preds_root.is_symlink(): + msg = f"Cannot copy from symlinked raw predictions root {copy_preds_root}" + raise ValueError(msg) + + if version_root.exists() and not version_spec_path.exists(): + msg = f"Version {version} exists but has no specification file. This is an invalid directory state." + raise ValueError(msg) + + # Have we already copied this version? + if version_spec_path.exists(): + model_matches = version_ckpt_path.exists() and version_ckpt_path.samefile( + copy_ckpt_path + ) + predictions_match = ( + version_preds_root.exists() + and version_preds_root.samefile(copy_preds_root) + ) + if model_matches and predictions_match: + # We've already copied this version, we'll make this a no-op + return + else: + msg = f"Version {version} already exists but does not match copy-from version {copy_from_version}" + raise ValueError(msg) + + # If we're here, everything should be safe for copying + # Generate the new version directory + self.make_model_version_root(resolution, version) + + # Copy the model spec + copy_spec = yaml.safe_load(copy_spec_path.read_text()) + copy_spec["model_version"] = version + copy_spec["output_root"] = str(version_root) + with version_spec_path.open("w") as f: + yaml.safe_dump(copy_spec, f) + + # Symlink the model checkpoint. + version_ckpt_path.symlink_to(copy_ckpt_path.resolve()) + + # Symlink the raw predictions root + version_preds_root.rmdir() # We've just made this as an empty directory + version_preds_root.symlink_to(copy_preds_root) + + def model_specification_path(self, resolution: str, version: str) -> Path: + return self.model_version_root(resolution, version) / "specification.yaml" def save_model_specification( self, model_spec: "ModelSpecification", ) -> None: self.make_model_version_root(model_spec.resolution, model_spec.model_version) - path = Path(model_spec.output_root) / "specification.yaml" + path = self.model_specification_path( + model_spec.resolution, model_spec.model_version + ) touch(path) with path.open("w") as f: yaml.safe_dump(model_spec.model_dump(mode="json"), f) @@ -610,15 +691,18 @@ def load_model_specification( ) -> "ModelSpecification": from rra_population_model.model.modeling.datamodel import ModelSpecification - path = self.model_version_root(resolution, version) / "specification.yaml" + path = self.model_specification_path(resolution, version) with path.open() as f: spec = yaml.safe_load(f) return ModelSpecification.model_validate(spec) + def model_checkpoint_path(self, resolution: str, version: str) -> Path: + return self.model_version_root(resolution, version) / "best_model.ckpt" + def load_model(self, resolution: str, version: str) -> "PPSModel": from rra_population_model.model.modeling.model import PPSModel - ckpt_path = self.model_version_root(resolution, version) / "best_model.ckpt" + ckpt_path = self.model_checkpoint_path(resolution, version) return PPSModel.load_from_checkpoint(ckpt_path) def raw_predictions_root(self, resolution: str, version: str) -> Path: diff --git a/src/rra_population_model/model/inference/runner.py b/src/rra_population_model/model/inference/runner.py index 63f5bd5..3e9f680 100644 --- a/src/rra_population_model/model/inference/runner.py +++ b/src/rra_population_model/model/inference/runner.py @@ -71,12 +71,15 @@ def inference_main( for block_key in block_keys if not pm_data.raw_prediction_path(block_key, time_point, model_spec).exists() ] + if not block_keys: + print("All blocks have been predicted.") + return datamodule = InferenceDataModule( model_spec.model_dump(), block_keys, time_point, - num_workers=4, + num_workers=0, ) pred_writer = CustomWriter( pm_data, model.specification, time_point, write_interval="batch" @@ -84,7 +87,7 @@ def inference_main( trainer = Trainer( callbacks=[pred_writer], enable_progress_bar=progress_bar, - devices=2, + devices=1, ) trainer.predict(model, datamodule, return_predictions=False) diff --git a/src/rra_population_model/model_prep/features/runner.py b/src/rra_population_model/model_prep/features/runner.py index 55c68bf..7da5357 100644 --- a/src/rra_population_model/model_prep/features/runner.py +++ b/src/rra_population_model/model_prep/features/runner.py @@ -93,6 +93,9 @@ def features( modeling_frame = pm_data.load_modeling_frame(resolution) block_keys = modeling_frame.block_key.unique().tolist() + njobs = len(block_keys) * len(time_point) + print(f"Submitting {njobs} jobs to process features") + jobmon.run_parallel( runner="pmtask model_prep", task_name="features", diff --git a/src/rra_population_model/postprocess/rake_itu.py b/src/rra_population_model/postprocess/rake_itu.py index 31f626f..30e7180 100644 --- a/src/rra_population_model/postprocess/rake_itu.py +++ b/src/rra_population_model/postprocess/rake_itu.py @@ -12,29 +12,55 @@ from rra_population_model import constants as pmc from rra_population_model.data import PopulationModelData -TIME_POINT = "2023q4" +RAKING_VERSION = "gbd_2023" -def load_shape_population(pm_data: PopulationModelData, iso3: str) -> gpd.GeoDataFrame: - pop = pm_data.load_raking_population("fhs_2021_wpp_2022") - pop = pop[pop.year_id == int(TIME_POINT[:4])] - shapes = pm_data.load_raking_shapes("fhs_2021_wpp_2022") - - all_pop = shapes.merge(pop, on="location_id") - admin0_pop = all_pop[all_pop.ihme_loc_id == iso3] - - if admin0_pop.most_detailed.iloc[0] == 0: - location_id = admin0_pop.location_id.to_numpy()[0] - final_pop = all_pop[all_pop.parent_id == location_id] +def load_admin_populations( + pm_data: PopulationModelData, + iso3: str, + time_point: str, +) -> gpd.GeoDataFrame: + raking_pop = pm_data.load_raking_population(version=RAKING_VERSION) + all_pop = raking_pop.loc[raking_pop.most_detailed == 1].set_index( + ["year_id", "location_id"] + )["wpp_population"] + + # Interpolate the time point population + if "q" in time_point: + year, quarter = (int(s) for s in time_point.split("q")) + next_year = min(year + 1, 2100) + weight = (int(quarter) - 1) / 4 + + prior_year_pop = all_pop.loc[year] + next_year_pop = all_pop.loc[next_year] + + pop = (1 - weight) * prior_year_pop + weight * next_year_pop else: - final_pop = admin0_pop - return final_pop + year = int(time_point) + pop = all_pop.loc[year] + + admin_0 = raking_pop[raking_pop.ihme_loc_id == iso3] + if admin_0.empty: + raking_locs = ( + raking_pop[raking_pop.ihme_loc_id.str.startswith(iso3)] + .location_id.unique() + .tolist() + ) + pop = pop.loc[raking_locs] + else: + pop = pop.loc[[admin_0.location_id.iloc[0]]] + pop = pop.reset_index() + + admins = pm_data.load_raking_shapes(version=RAKING_VERSION) + pop = admins[["location_id", "geometry"]].merge(pop, on="location_id") + return pop def rake_itu_main( resolution: str, version: str, iso3: str, + time_point: str, output_dir: str, ) -> None: pm_data = PopulationModelData(output_dir) @@ -43,10 +69,9 @@ def rake_itu_main( modeling_frame = pm_data.load_modeling_frame(resolution) print("Building population shapefile") - pop = load_shape_population(pm_data, iso3) + pop = load_admin_populations(pm_data, iso3, time_point) itu_mask = pm_data.load_itu_mask(iso3) - pop = pop.to_crs(itu_mask.crs) print("Building location id mask") @@ -93,18 +118,22 @@ def rake_itu_main( .to_crs(itu_mask.crs) .buffer(0) ) - print("Loading predictions") - rasters = [] + prediction: rt.RasterArray | None = None for i, block_key in enumerate(blocks.index): print(f"Loading block {i}/{len(blocks)}: {block_key}") - r = pm_data.load_raw_prediction(block_key, TIME_POINT, model_spec).resample_to( + r = pm_data.load_raw_prediction(block_key, time_point, model_spec).resample_to( itu_mask, "sum" ) - rasters.append(r) - prediction = rt.merge(rasters) + if prediction is None: + prediction = r + else: + if np.nansum(r) == 0: + continue + prediction = rt.merge([prediction, r]) print("Raking") + assert isinstance(prediction, rt.RasterArray) # noqa: S101 prediction_array = prediction.to_numpy() raking_factor = np.ones_like(location_mask) for location_id, location_pop in ( @@ -124,7 +153,7 @@ def rake_itu_main( no_data_value=np.nan, ) pm_data.save_raked_prediction( - raked_pop, block_key=iso3, time_point=TIME_POINT, model_spec=model_spec + raked_pop, block_key=iso3, time_point=time_point, model_spec=model_spec ) @@ -132,17 +161,20 @@ def rake_itu_main( @clio.with_version() @clio.with_resolution() @clio.with_iso3() +@clio.with_time_point(choices=None) @clio.with_output_directory(pmc.MODEL_ROOT) def rake_itu_task( resolution: str, version: str, iso3: str, + time_point: str, output_dir: str, ) -> None: rake_itu_main( resolution, version, iso3, + time_point, output_dir, ) @@ -150,39 +182,53 @@ def rake_itu_task( @click.command() @clio.with_resolution() @clio.with_version() +@clio.with_copy_from_version() @clio.with_iso3(allow_all=True) +@clio.with_time_point(choices=None, allow_all=True) @clio.with_output_directory(pmc.MODEL_ROOT) @clio.with_queue() def rake_itu( resolution: str, version: str, + copy_from_version: str | None, iso3: str, + time_point: str, output_dir: str, queue: str, ) -> None: """Rake populations to the ITU masks.""" pm_data = PopulationModelData(output_dir) + pm_data.maybe_copy_version(resolution, version, copy_from_version) + available_iso3s = pm_data.list_itu_iso3s() iso3s = clio.convert_choice(iso3, available_iso3s) + prediction_time_points = pm_data.list_raw_prediction_time_points( + resolution, version + ) + time_points = clio.convert_choice(time_point, prediction_time_points) + + print(f"Launching {len(iso3s) * len(time_points)} tasks") + jobmon.run_parallel( runner="pmtask postprocess", task_name="rake_itu", task_resources={ "queue": queue, "cores": 1, - "memory": "50G", + "memory": "75G", "runtime": "60m", "project": "proj_rapidresponse", }, node_args={ "iso3": iso3s, + "time-point": time_points, }, task_args={ "resolution": resolution, "version": version, "output-dir": output_dir, }, - max_attempts=1, + max_attempts=3, log_root=pm_data.log_dir("postprocess_rake_itu"), ) diff --git a/src/rra_population_model/postprocess/raking_factors/runner.py b/src/rra_population_model/postprocess/raking_factors/runner.py index 485dd6e..b6ce90a 100644 --- a/src/rra_population_model/postprocess/raking_factors/runner.py +++ b/src/rra_population_model/postprocess/raking_factors/runner.py @@ -14,12 +14,14 @@ from rra_population_model.data import PopulationModelData from rra_population_model.postprocess.utils import get_prediction_time_point +RAKING_VERSION = "gbd_2023" + def load_admin_populations( pm_data: PopulationModelData, time_point: str, ) -> gpd.GeoDataFrame: - raking_pop = pm_data.load_raking_population(version="fhs_2021_wpp_2022") + raking_pop = pm_data.load_raking_population(version=RAKING_VERSION) all_pop = raking_pop.loc[raking_pop.most_detailed == 1].set_index( ["year_id", "location_id"] )["population"] @@ -38,7 +40,7 @@ def load_admin_populations( year = int(time_point) pop = all_pop.loc[year] - admins = pm_data.load_raking_shapes(version="fhs_2021_wpp_2022") + admins = pm_data.load_raking_shapes(version=RAKING_VERSION) pop = admins[["location_id", "geometry"]].merge(pop, on="location_id") return pop @@ -181,6 +183,7 @@ def raking_factors_task( @click.command() @clio.with_resolution() @clio.with_version() +@clio.with_copy_from_version() @clio.with_time_point(choices=None, allow_all=True) @click.option("--extrapolate", is_flag=True) @clio.with_output_directory(pmc.MODEL_ROOT) @@ -189,6 +192,7 @@ def raking_factors_task( def raking_factors( resolution: str, version: str, + copy_from_version: str | None, time_point: str, extrapolate: bool, output_dir: str, @@ -196,6 +200,8 @@ def raking_factors( queue: str, ) -> None: pm_data = PopulationModelData(output_dir) + pm_data.maybe_copy_version(resolution, version, copy_from_version) + prediction_time_points = pm_data.list_raw_prediction_time_points( resolution, version ) diff --git a/src/rra_population_model/preprocess/README.md b/src/rra_population_model/preprocess/README.md deleted file mode 100644 index 15fbb80..0000000 --- a/src/rra_population_model/preprocess/README.md +++ /dev/null @@ -1,26 +0,0 @@ -# People per Structure Model Preprocessing - -The set of pipelines in this subpackage are used to preprocess data for the people per -structure model. There are required steps (e.g. caching building density, preparing -the training data set and features) as well as optional steps (e.g. making -diagostic plots). The preprocessing steps are run using the `ppsrun preprocess ` -command and individual steps can be run using the `pmtask preprocess ` command. -Where reasonable, the tasks have the same names as the steps, though some steps may -be broken into multiple kinds of tasks. Consult `pmrun preprocess --help` and -`pmtask preprocess --help` for more information. - -## Preprocessing Steps - -1. `pmrun preprocess modeling_frame`: -2. `pmrun preprocess features`: This step generates raster features at the - pixel level and writes them out by block. `features` has the single task: - `pmtask preprocess features`. -3. `pmrun preprocess census_data`: Prepare database of linked census data for - processing into training data. -4. `pmrun preprocess training_data`: This step generates the training data - for the people per structure model including the features and the target variable. -5. (Optional) `pmrun preprocess summarize_training_data`: This step creates - tile/scene-level summaries of the training data in tabular form. This is useful - for understanding the distribution of the target variable and how it changes in time. -6. (Optional) `pmrun preprocess plot_training_data`: This step creates diagnostic - plots from the summaries created in the previous step. diff --git a/src/rra_population_model/preprocess/raking_data/metadata.py b/src/rra_population_model/preprocess/raking_data/metadata.py index 96c0f9d..9ed4b21 100644 --- a/src/rra_population_model/preprocess/raking_data/metadata.py +++ b/src/rra_population_model/preprocess/raking_data/metadata.py @@ -30,6 +30,61 @@ NO_REGION_ID = -1 +TO_DROP_PARENTS = [ + # Drop UK UTLAs from these regions + 4618, + 4919, + 4620, + 4621, + 4622, + 4623, + 4624, + 4625, + 4626, + # Drop the India urban/rural splits from these states + 4841, + 4842, + 4843, + 4844, + 4846, + 4849, + 4850, + 4851, + 4852, + 4853, + 4854, + 4855, + 4856, + 4857, + 4859, + 4860, + 4861, + 4862, + 4863, + 4864, + 4865, + 4867, + 4868, + 4869, + 4870, + 4871, + 4872, + 4873, + 4874, + 4875, + 44538, + # Drop the Maori/non-Maori split from New Zealand + 72, +] + +TO_USE_LSAE_SHAPES = [ + 92, # Spain, removes Canary Islands + 71, # Australia, removes Ashmore and Cartier Islands and Coral Sea Islands + # Canada and Greenland intersect in the GBD hierarchy, so swap both. + 101, + 349, +] + class SUPPLEMENT: WPP = "wpp" @@ -202,8 +257,20 @@ def load_supplmental_metadata() -> pd.DataFrame: "ATF", SUPPLEMENT.ZERO_POPULATION, ), - (60921, "Antarctica", NO_REGION_ID, "ATA", SUPPLEMENT.ZERO_POPULATION), - (60923, "Bouvet Island", NO_REGION_ID, "BVT", SUPPLEMENT.ZERO_POPULATION), + ( + 60921, + "Antarctica", + SOUTHERN_LATIN_AMERICA, + "ATA", + SUPPLEMENT.ZERO_POPULATION, + ), + ( + 60923, + "Bouvet Island", + SOUTHERN_LATIN_AMERICA, + "BVT", + SUPPLEMENT.ZERO_POPULATION, + ), # ISO3 manually set ( 60924, diff --git a/src/rra_population_model/preprocess/raking_data/runner.py b/src/rra_population_model/preprocess/raking_data/runner.py index 4251774..62c27d5 100644 --- a/src/rra_population_model/preprocess/raking_data/runner.py +++ b/src/rra_population_model/preprocess/raking_data/runner.py @@ -16,7 +16,6 @@ def raking_data_main( output_dir: str, out_version: str, - wpp_version: str = "2022", ) -> None: pm_data = PopulationModelData(output_dir) @@ -29,10 +28,9 @@ def raking_data_main( shapes = utils.load_shapes(pm_data, gbd_version=gbd_version) supplemental_metadata = load_supplmental_metadata() - # WPP - wpp = utils.load_wpp_populations(pm_data, wpp_version=wpp_version) - print("Building WPP data...") + wpp_version = "2024" if out_version == "gbd_2023" else "2022" + wpp = utils.load_wpp_populations(pm_data, wpp_version=wpp_version) # Add GBD location and region ids to the WPP data by mapping on iso3 codes wpp = utils.add_gbd_metadata_to_wpp( wpp=wpp, @@ -81,13 +79,15 @@ def raking_data_main( pm_data.save_raking_data( population=raking_population, shapes=raking_shapes, - version=f"{out_version}_wpp_{wpp_version}", + version=f"{out_version}", ) @click.command() @clio.with_output_directory(pmc.MODEL_ROOT) -@clio.with_choice("out_version", allow_all=False, choices=["gbd_2021", "fhs_2021"]) +@clio.with_choice( + "out_version", allow_all=False, choices=["gbd_2023", "gbd_2021", "fhs_2021"] +) def raking_data( output_dir: str, out_version: str, diff --git a/src/rra_population_model/preprocess/raking_data/utils.py b/src/rra_population_model/preprocess/raking_data/utils.py index 496a18f..bf92753 100644 --- a/src/rra_population_model/preprocess/raking_data/utils.py +++ b/src/rra_population_model/preprocess/raking_data/utils.py @@ -8,6 +8,8 @@ from rra_population_model.preprocess.raking_data.metadata import ( NO_REGION_ID, SUPPLEMENT, + TO_DROP_PARENTS, + TO_USE_LSAE_SHAPES, ) ########### @@ -18,7 +20,7 @@ def load_wpp_populations( pm_data: PopulationModelData, wpp_version: str ) -> pd.DataFrame: - if wpp_version == "2022": + if wpp_version in ["2022", "2024"]: wpp = pm_data.load_gbd_raking_input("population", f"wpp_{wpp_version}") # Merge Hong Kong, Macau, and Kosovo into China and Serbia, respectively merge_map = { @@ -41,9 +43,6 @@ def load_wpp_populations( .sort_values(["iso3", "year_id"]) .reset_index(drop=True) ) - elif wpp_version == "2024": - msg = "We still need to implement/validate the merge_map for WPP 2024" - raise NotImplementedError(msg) else: msg = f"Invalid WPP version: {wpp_version}" raise ValueError(msg) @@ -56,8 +55,11 @@ def load_ihme_populations( ) -> dict[str, pd.DataFrame]: populations = { "gbd": pm_data.load_gbd_raking_input("population", f"gbd_{gbd_version}"), - "fhs": pm_data.load_gbd_raking_input("population", f"fhs_{gbd_version}"), } + if gbd_version == "2021": + populations["fhs"] = pm_data.load_gbd_raking_input( + "population", f"fhs_{gbd_version}" + ) return populations @@ -66,8 +68,11 @@ def load_hierarchies( ) -> dict[str, pd.DataFrame]: hierarchies = { "gbd": pm_data.load_gbd_raking_input("hierarchy", f"gbd_{gbd_version}"), - "fhs": pm_data.load_gbd_raking_input("hierarchy", f"fhs_{gbd_version}"), } + if gbd_version == "2021": + hierarchies["fhs"] = pm_data.load_gbd_raking_input( + "hierarchy", f"fhs_{gbd_version}" + ) keep_cols = [ "location_id", "location_name", @@ -82,12 +87,22 @@ def load_hierarchies( def load_shapes( - pm_data: PopulationModelData, gbd_version: str + pm_data: PopulationModelData, + gbd_version: str, ) -> dict[str, gpd.GeoDataFrame]: shapes = { "gbd": pm_data.load_gbd_raking_input("shapes", f"gbd_{gbd_version}"), "lsae": pm_data.load_gbd_raking_input("shapes", "lsae_1285_a0"), } + if gbd_version == "2023": + h = pm_data.load_gbd_raking_input("hierarchy", "gbd_2023") + to_drop = ~shapes["gbd"].location_id.isin(h.location_id) + # There are 150 UTLAs that were dropped late from the GBD hierarchy + # but haven't been updated in the shapefile yet. + utla_count = 150 + assert to_drop.sum() == utla_count # noqa: S101 + shapes["gbd"] = shapes["gbd"].loc[~to_drop] + keep_cols = ["location_id", "geometry"] shapes = {k: v.loc[:, keep_cols] for k, v in shapes.items()} return shapes @@ -209,6 +224,10 @@ def prepare_ihme_population( version_tag: str, ) -> pd.DataFrame: p_gbd = populations["gbd"].merge(hierarchies["gbd"], on="location_id") + # First clear out UK UTLAs, India urban/rural splits, and NZ Maori/non-Maori splits + # These do not have accurate geometries in the GBD shapefiles. + p_gbd = p_gbd[~p_gbd.parent_id.isin(TO_DROP_PARENTS)] + p_gbd.loc[p_gbd.location_id.isin(TO_DROP_PARENTS), "most_detailed"] = 1 if version_tag == "fhs": p_fhs = populations["fhs"].merge(hierarchies["fhs"], on="location_id") @@ -219,11 +238,25 @@ def prepare_ihme_population( & p_gbd.location_id.isin(p_fhs.location_id.unique()) ) p_gbd = p_gbd.loc[keep_mask] - # Update most-detailed metadata to match FHS + # Backfill FHS metadata to GBD for consistency + fhs_meta = p_fhs[ + [ + "location_id", + "location_name", + "ihme_loc_id", + "parent_id", + "region_id", + "level", + "most_detailed", + ] + ].drop_duplicates() + p_gbd = p_gbd[["location_id", "year_id", "population"]].merge( + fhs_meta, on="location_id" + ) + fhs_most_detailed = p_gbd.location_id.isin( p_fhs.loc[p_fhs.most_detailed == 1].location_id.unique() ) - p_gbd.loc[fhs_most_detailed, "most_detailed"] = 1 assert (p_gbd.loc[~fhs_most_detailed].most_detailed == 0).all() # noqa: S101 pop = pd.concat([p_gbd, p_fhs], ignore_index=True) elif version_tag == "gbd": @@ -282,13 +315,13 @@ def compute_missing_populations( missing_populations = ( wpp_subset.drop(columns=["scalar"]) + .reset_index() .assign( population=scaled_population.values, most_detailed=1, level=3, + parent_id=lambda x: x["region_id"], ) - .reset_index() - .drop(columns=["region_id"]) .sort_values(["location_id", "year_id"]) ) @@ -300,14 +333,13 @@ def build_raking_population( wpp_population: pd.DataFrame, missing_population: pd.DataFrame, ) -> pd.DataFrame: - ihme_population = ihme_population[ihme_population.most_detailed == 1] + ihme_most_detailed = ihme_population[ihme_population.most_detailed == 1] full_population = ( - pd.concat([ihme_population, missing_population], ignore_index=True) + pd.concat([ihme_most_detailed, missing_population], ignore_index=True) .sort_values(["location_id", "year_id"]) .reset_index(drop=True) ) - # Add on a column with WPP population for UN product raking full_population = full_population.merge( wpp_population[["location_id", "year_id", "population"]].rename( @@ -317,6 +349,46 @@ def build_raking_population( how="left", ) + # Fill in missing WPP populations in GBD subnationals by scaling + # the wpp parent population by the GBD parent population fraction. + wpp_missing = ( + full_population["wpp_population"].isna() & full_population["population"].notna() + ) + parent_map = ihme_population.set_index("location_id")["parent_id"] + parent_map = parent_map[~parent_map.index.duplicated()] + to_fill = ( + full_population.loc[wpp_missing, ["location_id", "year_id", "population"]] + .assign(parent_id=lambda x: x["location_id"]) + .rename(columns={"population": "gbd_population"}) + ) + + still_missing = ~to_fill["parent_id"].isin(wpp_population["location_id"]) + while still_missing.any(): + to_fill.loc[still_missing, "parent_id"] = parent_map.loc[ + to_fill.loc[still_missing, "parent_id"] + ].to_numpy() + still_missing = ~to_fill["parent_id"].isin(wpp_population["location_id"]) + + to_fill = to_fill.merge( + wpp_population[["location_id", "year_id", "population"]].rename( + columns={"population": "wpp_parent_population", "location_id": "parent_id"} + ), + on=["parent_id", "year_id"], + how="left", + ).merge( + ihme_population[["location_id", "year_id", "population"]].rename( + columns={"population": "gbd_parent_population", "location_id": "parent_id"} + ), + on=["parent_id", "year_id"], + how="left", + ) + to_fill["wpp_population"] = to_fill["wpp_parent_population"] * ( + to_fill["gbd_population"] / to_fill["gbd_parent_population"] + ) + full_population.loc[wpp_missing, "wpp_population"] = to_fill[ + "wpp_population" + ].to_numpy() + return full_population @@ -325,7 +397,12 @@ def build_raking_shapes( raking_population: pd.DataFrame, ) -> gpd.GeoDataFrame: ihme_shapes = shapes["gbd"] - keep_mask = ihme_shapes["location_id"].isin(raking_population["location_id"]) + keep_mask = ( + ihme_shapes["location_id"].isin(raking_population["location_id"]) + # We want LSAE definitions for a few places to resolve definition + # issues and some overlaps in the GBD hierarchy. + & ~ihme_shapes["location_id"].isin(TO_USE_LSAE_SHAPES) + ) ihme_shapes = ihme_shapes.loc[keep_mask] lsae_shapes = shapes["lsae"]