Skip to content

Commit 821704f

Browse files
committed
Bring in aggregation code
1 parent a043ff5 commit 821704f

File tree

3 files changed

+49
-16
lines changed

3 files changed

+49
-16
lines changed

src/climate_data/aggregate/hierarchy.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,15 @@ def hierarchy_main(
9696
draw_results.append(draw_df)
9797

9898
pbar.update()
99+
draw_df = (
100+
pd.concat(draw_results, ignore_index=True)
101+
.groupby(["location_id", "year_id"])
102+
.sum()
103+
.reset_index()
104+
)
99105

100106
agg_df = utils.aggregate_climate_to_hierarchy(
101-
pd.concat(draw_results, ignore_index=True),
107+
draw_df,
102108
hierarchy_df,
103109
).set_index(["location_id", "year_id"])
104110
all_results.append(agg_df["value"].rename(draw))
@@ -212,8 +218,8 @@ def hierarchy(
212218
task_resources={
213219
"queue": queue,
214220
"cores": 1,
215-
"memory": "16G",
216-
"runtime": "30m",
221+
"memory": "50G",
222+
"runtime": "200m",
217223
"project": "proj_rapidresponse",
218224
},
219225
log_root=ca_data.log_dir("aggregate_hierarchy"),

src/climate_data/aggregate/pixel.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def pixel_main(
3838
hierarchy, block_key, pm_data
3939
)
4040

41-
years = cdc.ALL_YEARS
41+
years = [int(y) for y in cdc.ALL_YEARS]
4242
measures = cdc.AGGREGATION_MEASURES
4343
scenarios = cdc.AGGREGATION_SCENARIOS
4444

@@ -67,7 +67,12 @@ def pixel_main(
6767

6868
# Pull out and rasterize the climate data for the current year
6969
clim_arr = (
70-
to_raster(ds.sel(year=year)["value"], no_data_value=np.nan) # noqa: SLF001
70+
to_raster( # noqa: SLF001
71+
ds.sel(year=year)["value"],
72+
no_data_value=np.nan,
73+
lat_col="latitude",
74+
lon_col="longitude",
75+
)
7176
.resample_to(pop_raster, "nearest")
7277
.astype(np.float32)
7378
._ndarray
@@ -164,14 +169,15 @@ def pixel(
164169
block_keys = modeling_frame["block_key"].unique().tolist()
165170
block_keys = clio.convert_choice(block_key, block_keys)
166171

172+
print("Checking for existing results")
167173
jobs = []
168-
for h, b, d in itertools.product(hierarchy, block_keys, draw):
174+
hbd = list(itertools.product(hierarchy, block_keys, draw))
175+
for h, b, d in tqdm.tqdm(hbd):
169176
if not ca_data.raw_results_path(agg_version, h, b, d).exists():
170177
jobs.append((h, b, d))
171178
jobs = list(set(jobs))
172179

173180
print(f"Running {len(jobs)} jobs")
174-
175181
jobmon.run_parallel(
176182
runner="cdtask aggregate",
177183
task_name="pixel",

src/climate_data/aggregate/utils.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
PopulationModelData,
1414
)
1515

16+
MAX_BOUNDS = {
17+
"ESRI:54034": (-20037508.34, 20037508.34, -6363885.33, 6363885.33),
18+
}
19+
1620

1721
def build_location_masks(
1822
hierarchy: str,
@@ -136,11 +140,32 @@ def get_bbox(raster: rt.RasterArray, crs: str | None = None) -> shapely.Polygon:
136140
shapely.Polybon
137141
The bounding box of the raster in the CRS specified by the crs parameter.
138142
"""
143+
if raster.crs not in MAX_BOUNDS:
144+
msg = f"Unsupported CRS: {raster.crs}"
145+
raise ValueError(msg)
146+
147+
xmin_clip, xmax_clip, ymin_clip, ymax_clip = MAX_BOUNDS[raster.crs]
139148
xmin, xmax, ymin, ymax = raster.bounds
149+
150+
xmin = np.clip(xmin, xmin_clip, xmax_clip)
151+
xmax = np.clip(xmax, xmin_clip, xmax_clip)
152+
ymin = np.clip(ymin, ymin_clip, ymax_clip)
153+
ymax = np.clip(ymax, ymin_clip, ymax_clip)
154+
140155
bbox = gpd.GeoSeries([shapely.box(xmin, ymin, xmax, ymax)], crs=raster.crs)
141-
if crs is not None:
142-
bbox = bbox.to_crs(crs)
143-
return cast(shapely.Polygon, bbox.iloc[0])
156+
out_bbox = bbox.to_crs(crs) if crs is not None else bbox.copy()
157+
158+
# Check that our transformation didn't do something weird
159+
# (e.g. artificially clip the bounds or have the bounds extend over the
160+
# antimeridian)
161+
check_bbox = out_bbox.to_crs(raster.crs)
162+
area_change = (np.abs(bbox.area - check_bbox.area) / bbox.area).iloc[0]
163+
tolerance = 1e-6
164+
if area_change > tolerance:
165+
msg = f"Area change: {area_change}"
166+
raise ValueError(msg)
167+
168+
return cast(shapely.Polygon, out_bbox.iloc[0])
144169

145170

146171
def aggregate_climate_to_hierarchy(
@@ -173,21 +198,17 @@ def aggregate_climate_to_hierarchy(
173198
subset["parent_id"] = parent_map
174199

175200
parent_values = (
176-
subset.groupby(["year_id", "scenario", "parent_id"])[
177-
["weighted_climate", "population"]
178-
]
201+
subset.groupby(["year_id", "parent_id"])[["weighted_climate", "population"]]
179202
.sum()
180203
.reset_index()
181204
.rename(columns={"parent_id": "location_id"})
182205
.set_index("location_id")
183206
)
184-
parent_values["value"] = (
185-
parent_values.weighted_climate / parent_values.population
186-
)
187207
results = pd.concat([results, parent_values])
188208
results = (
189209
results.reset_index()
190210
.sort_values(["location_id", "year_id"])
191211
.reset_index(drop=True)
192212
)
213+
parent_values["value"] = parent_values.weighted_climate / parent_values.population
193214
return results

0 commit comments

Comments
 (0)