Skip to content

Commit 8fabd2b

Browse files
committed
Add new scripts for population hold variable calculations and visualization
- Created `make_hold_pop.ipynb` for generating population hold variables for malaria forecasting, including data loading, processing, and saving results. - Developed `make_population_hold_variables_by_draw.py` to handle command-line arguments and process population hold variables by draw, integrating hierarchy data. - Introduced `bin_functions.py` for enhanced visualization capabilities, including legend and colorbar management for plotting. - Added `color_functions.py` to manage color mapping and colormap creation for visual outputs. - Implemented `number_functions.py` for utility functions related to number formatting and uncertainty intervals.
1 parent 002a304 commit 8fabd2b

15 files changed

+2014
-102
lines changed

src/idd_forecast_mbp/05_aggregation/01_cause_as_aggregation_by_draw_parallel.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# Script directory
1212
SCRIPT_ROOT = rfc.REPO_ROOT / repo_name / "src" / package_name / "05_aggregation"
1313

14+
run_date = '2025_08_28'
1415

1516
ssp_scenarios = rfc.ssp_scenarios
1617
dah_scenarios = rfc.dah_scenarios
@@ -21,15 +22,14 @@
2122
'dengue': ['gdppc', 'suitability', 'urban'],
2223
}
2324

24-
run_hold_variables = False
25+
run_hold_variables = True
2526

2627
dah_scenarios = rfc.dah_scenarios
2728
dah_scenarios = ['Baseline','Constant']
2829
# dah_scenarios = ['reference','better', 'worse']
2930

3031
causes = rfc.cause_map
3132
causes = ['malaria', 'dengue']
32-
causes = ['dengue']
3333

3434

3535
# Jobmon setup
@@ -92,15 +92,17 @@
9292
"--measure {{measure}} "
9393
"--draw {{draw}} "
9494
"--hold_variable {{hold_variable}} "
95+
"--run_date {{run_date}} "
9596
).format(script_root=SCRIPT_ROOT),
96-
node_args=["cause", "ssp_scenario", "dah_scenario", "measure", "draw", "hold_variable"],
97+
node_args=["cause", "ssp_scenario", "dah_scenario", "measure", "draw", "hold_variable","run_date"],
9798
task_args=[],
9899
op_args=[],
99100
)
100101

101102

102103
# Add tasks
103104
tasks = []
105+
dah_scenarios = ['Baseline', 'Constant']
104106
# for cause in cause_map:
105107
for cause in causes:
106108
for ssp_scenario in ssp_scenarios:
@@ -115,6 +117,7 @@
115117
dah_scenario=dah_scenario,
116118
measure=measure,
117119
draw=draw,
120+
run_date=run_date,
118121
hold_variable='None' # No hold variable for primary task
119122
)
120123
tasks.append(task)
@@ -125,9 +128,11 @@
125128
dah_scenario='None',
126129
measure=measure,
127130
draw=draw,
131+
run_date=run_date,
128132
hold_variable='None' # No hold variable for primary task
129133
)
130134
tasks.append(task)
135+
dah_scenarios = ['Baseline']
131136
if run_hold_variables:
132137
for cause in causes:
133138
for hold_variable in hold_variables[cause]:
@@ -143,6 +148,7 @@
143148
dah_scenario=dah_scenario,
144149
measure=measure,
145150
draw=draw,
151+
run_date=run_date,
146152
hold_variable=hold_variable
147153
)
148154
tasks.append(task)
@@ -154,6 +160,7 @@
154160
dah_scenario='None',
155161
measure=measure,
156162
draw=draw,
163+
run_date=run_date,
157164
hold_variable=hold_variable
158165
)
159166
tasks.append(task)
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import getpass
2+
import uuid
3+
from jobmon.client.tool import Tool # type: ignore
4+
from pathlib import Path
5+
import geopandas as gpd # type: ignore
6+
from idd_forecast_mbp import constants as rfc
7+
8+
repo_name = rfc.repo_name
9+
package_name = rfc.package_name
10+
11+
# Script directory
12+
SCRIPT_ROOT = rfc.REPO_ROOT / repo_name / "src" / package_name / "05_aggregation"
13+
14+
run_date = '2025_08_28'
15+
16+
ssp_scenarios = rfc.ssp_scenarios
17+
dah_scenarios = rfc.dah_scenarios
18+
draws = rfc.draws
19+
20+
hold_variables = {
21+
'malaria': ['DAH', 'flood', 'gdppc', 'suitability'],
22+
'dengue': ['gdppc', 'suitability', 'urban'],
23+
}
24+
25+
run_hold_variables = True
26+
27+
dah_scenarios = rfc.dah_scenarios
28+
dah_scenarios = ['Baseline','Constant']
29+
# dah_scenarios = ['reference','better', 'worse']
30+
31+
causes = rfc.cause_map
32+
causes = ['malaria', 'dengue']
33+
34+
35+
# Jobmon setup
36+
user = getpass.getuser()
37+
38+
log_dir = Path("/mnt/team/idd/pub/")
39+
log_dir.mkdir(parents=True, exist_ok=True)
40+
# Create directories for stdout and stderr
41+
stdout_dir = log_dir / "stdout"
42+
stderr_dir = log_dir / "stderr"
43+
stdout_dir.mkdir(parents=True, exist_ok=True)
44+
stderr_dir.mkdir(parents=True, exist_ok=True)
45+
46+
# Project
47+
project = "proj_rapidresponse" # Adjust this to your project name if needed
48+
queue = 'long.q'
49+
50+
wf_uuid = uuid.uuid4()
51+
tool_name = f"{package_name}_draw_level_cause_aggregation_{wf_uuid}"
52+
tool = Tool(name=tool_name)
53+
54+
# Create a workflow
55+
workflow = tool.create_workflow(
56+
name=f"{tool_name}_workflow_{wf_uuid}",
57+
max_concurrently_running=10000, # Adjust based on system capacity
58+
)
59+
60+
# Compute resources
61+
workflow.set_default_compute_resources_from_dict(
62+
cluster_name="slurm",
63+
dictionary={
64+
"memory": "15G",
65+
"cores": 1,
66+
"runtime": "60m",
67+
"queue": queue,
68+
"project": project,
69+
"stdout": str(stdout_dir),
70+
"stderr": str(stderr_dir),
71+
}
72+
)
73+
74+
# Define the task template for processing each year batch
75+
task_template = tool.get_task_template(
76+
template_name="as_cause_draw_aggregation",
77+
default_cluster_name="slurm",
78+
default_compute_resources={
79+
"memory": "20G",
80+
"cores": 8,
81+
"runtime": "10m",
82+
"queue": queue,
83+
"project": project,
84+
"stdout": str(stdout_dir),
85+
"stderr": str(stderr_dir),
86+
},
87+
command_template=(
88+
"python {script_root}/cause_as_aggregation_by_draw_raked.py "
89+
"--cause {{cause}} "
90+
"--ssp_scenario {{ssp_scenario}} "
91+
"--dah_scenario {{dah_scenario}} "
92+
"--measure {{measure}} "
93+
"--draw {{draw}} "
94+
"--hold_variable {{hold_variable}} "
95+
"--run_date {{run_date}} "
96+
).format(script_root=SCRIPT_ROOT),
97+
node_args=["cause", "ssp_scenario", "dah_scenario", "measure", "draw", "hold_variable","run_date"],
98+
task_args=[],
99+
op_args=[],
100+
)
101+
102+
103+
# Add tasks
104+
tasks = []
105+
dah_scenarios = ['Baseline', 'Constant']
106+
# for cause in cause_map:
107+
for cause in causes:
108+
for ssp_scenario in ssp_scenarios:
109+
for measure in ['mortality', 'incidence']:
110+
for draw in draws:
111+
if cause == "malaria":
112+
for dah_scenario in dah_scenarios:
113+
# Create the primary task
114+
task = task_template.create_task(
115+
cause=cause,
116+
ssp_scenario=ssp_scenario,
117+
dah_scenario=dah_scenario,
118+
measure=measure,
119+
draw=draw,
120+
run_date=run_date,
121+
hold_variable='None' # No hold variable for primary task
122+
)
123+
tasks.append(task)
124+
else:
125+
task = task_template.create_task(
126+
cause=cause,
127+
ssp_scenario=ssp_scenario,
128+
dah_scenario='None',
129+
measure=measure,
130+
draw=draw,
131+
run_date=run_date,
132+
hold_variable='None' # No hold variable for primary task
133+
)
134+
tasks.append(task)
135+
dah_scenarios = ['Baseline']
136+
if run_hold_variables:
137+
for cause in causes:
138+
for hold_variable in hold_variables[cause]:
139+
for ssp_scenario in ssp_scenarios:
140+
for measure in ['mortality', 'incidence']:
141+
for draw in draws:
142+
if cause == "malaria":
143+
for dah_scenario in dah_scenarios:
144+
# Create the task with hold variable
145+
task = task_template.create_task(
146+
cause=cause,
147+
ssp_scenario=ssp_scenario,
148+
dah_scenario=dah_scenario,
149+
measure=measure,
150+
draw=draw,
151+
run_date=run_date,
152+
hold_variable=hold_variable
153+
)
154+
tasks.append(task)
155+
else:
156+
# Create the task with hold variable
157+
task = task_template.create_task(
158+
cause=cause,
159+
ssp_scenario=ssp_scenario,
160+
dah_scenario='None',
161+
measure=measure,
162+
draw=draw,
163+
run_date=run_date,
164+
hold_variable=hold_variable
165+
)
166+
tasks.append(task)
167+
168+
print(f"Number of tasks: {len(tasks)}")
169+
170+
if tasks:
171+
workflow.add_tasks(tasks)
172+
print("✅ Tasks successfully added to workflow.")
173+
else:
174+
print("⚠️ No tasks added to workflow. Check task generation.")
175+
176+
try:
177+
workflow.bind()
178+
print("✅ Workflow successfully bound.")
179+
print(f"Running workflow with ID {workflow.workflow_id}.")
180+
print("For full information see the Jobmon GUI:")
181+
print(f"https://jobmon-gui.ihme.washington.edu/#/workflow/{workflow.workflow_id}")
182+
except Exception as e:
183+
print(f"❌ Workflow binding failed: {e}")
184+
185+
try:
186+
status = workflow.run()
187+
print(f"Workflow {workflow.workflow_id} completed with status {status}.")
188+
except Exception as e:
189+
print(f"❌ Workflow submission failed: {e}")

0 commit comments

Comments
 (0)