
Commit 002a304

feat: Update run dates, queue, and memory settings; enhance file writing process in AA draws scripts
1 parent 466ce98 commit 002a304

File tree

5 files changed (+329, -61 lines)


src/idd_forecast_mbp/06_upload/03_create_and_combine_aa_draws_parallel.py

Lines changed: 5 additions & 5 deletions
@@ -18,7 +18,7 @@
 
 run_date = "2025_07_24"
 run_date = '2025_08_04'
-run_date = '2025_08_11'
+run_date = '2025_08_28'
 dah_scenarios = rfc.dah_scenarios
 dah_scenarios = ['Baseline', 'Constant']
 dah_scenarios = ['Baseline']
@@ -48,7 +48,7 @@
 
 # Project
 project = "proj_rapidresponse" # Adjust this to your project name if needed
-queue = 'long.q'
+queue = 'all.q'
 
 wf_uuid = uuid.uuid4()
 tool_name = f"{package_name}_create_summaries_{wf_uuid}"
@@ -74,16 +74,16 @@
     }
 )
 
-memory = "80G"
+memory = "100G"
 
 # Define the task template for processing each year batch
 task_template = tool.get_task_template(
     template_name=template_name,
     default_cluster_name="slurm",
     default_compute_resources={
         "memory": memory,
-        "cores": 16,
-        "runtime": "45m",
+        "cores": 10,
+        "runtime": "60m",
         "queue": queue,
         "project": project,
         "stdout": str(stdout_dir),

Lines changed: 185 additions & 0 deletions
@@ -0,0 +1,185 @@
+import getpass
+import uuid
+from jobmon.client.tool import Tool # type: ignore
+from pathlib import Path
+import geopandas as gpd # type: ignore
+from idd_forecast_mbp import constants as rfc
+
+repo_name = rfc.repo_name
+package_name = rfc.package_name
+
+hold_variables = {
+    'malaria': ['DAH', 'flood', 'gdppc', 'suitability'],
+    'dengue': ['gdppc', 'suitability', 'urban'],
+}
+run_hold_variables = False
+
+template_name = f'{repo_name}_06_03_create_and_combine'
+
+run_date = "2025_07_24"
+run_date = '2025_08_04'
+run_date = '2025_08_28'
+dah_scenarios = rfc.dah_scenarios
+dah_scenarios = ['Baseline', 'Constant']
+dah_scenarios = ['Baseline']
+# dah_scenarios = ['reference', 'better', 'worse']
+
+causes = rfc.cause_map
+# causes = ['dengue']
+ssp_scenarios = rfc.ssp_scenarios
+# ssp_scenarios = ['ssp245']
+
+
+# Script directory
+SCRIPT_ROOT = rfc.REPO_ROOT / repo_name / "src" / package_name / "06_upload"
+
+draws = rfc.draws
+
+# Jobmon setup
+user = getpass.getuser()
+
+log_dir = Path("/mnt/team/idd/pub/")
+log_dir.mkdir(parents=True, exist_ok=True)
+# Create directories for stdout and stderr
+stdout_dir = log_dir / "stdout"
+stderr_dir = log_dir / "stderr"
+stdout_dir.mkdir(parents=True, exist_ok=True)
+stderr_dir.mkdir(parents=True, exist_ok=True)
+
+# Project
+project = "proj_rapidresponse" # Adjust this to your project name if needed
+queue = 'all.q'
+
+wf_uuid = uuid.uuid4()
+tool_name = f"{package_name}_create_summaries_{wf_uuid}"
+tool = Tool(name=tool_name)
+
+# Create a workflow
+workflow = tool.create_workflow(
+    name=f"{tool_name}_workflow_{wf_uuid}",
+    max_concurrently_running=10000, # Adjust based on system capacity
+)
+
+# Compute resources
+workflow.set_default_compute_resources_from_dict(
+    cluster_name="slurm",
+    dictionary={
+        "memory": "15G",
+        "cores": 1,
+        "runtime": "60m",
+        "queue": queue,
+        "project": project,
+        "stdout": str(stdout_dir),
+        "stderr": str(stderr_dir),
+    }
+)
+
+memory = "100G"
+
+# Define the task template for processing each year batch
+task_template = tool.get_task_template(
+    template_name=template_name,
+    default_cluster_name="slurm",
+    default_compute_resources={
+        "memory": memory,
+        "cores": 10,
+        "runtime": "60m",
+        "queue": queue,
+        "project": project,
+        "stdout": str(stdout_dir),
+        "stderr": str(stderr_dir),
+    },
+    command_template=(
+        "python {script_root}/create_and_combine_as_draws.py "
+        "--cause {{cause}} "
+        "--ssp_scenario {{ssp_scenario}} "
+        "--dah_scenario {{dah_scenario}} "
+        "--measure {{measure}} "
+        "--hold_variable {{hold_variable}} "
+        "--run_date {{run_date}}"
+    ).format(script_root=SCRIPT_ROOT),
+    node_args=["cause", "ssp_scenario", "dah_scenario", "measure", "hold_variable", "run_date"],
+    task_args=[],
+    op_args=[],
+)
+
+
+tasks = []
+
+for cause in causes:
+    for ssp_scenario in ssp_scenarios:
+        for measure in ['mortality', 'incidence']:
+            if cause == "malaria":
+                for dah_scenario in dah_scenarios:
+                    # Create the primary task
+                    task = task_template.create_task(
+                        cause=cause,
+                        ssp_scenario=ssp_scenario,
+                        dah_scenario=dah_scenario,
+                        measure=measure,
+                        hold_variable='None',
+                        run_date=run_date,
+                    )
+                    tasks.append(task)
+            else:
+                # Create the primary task
+                task = task_template.create_task(
+                    cause=cause,
+                    ssp_scenario=ssp_scenario,
+                    dah_scenario='None',
+                    measure=measure,
+                    hold_variable='None',
+                    run_date=run_date,
+                )
+                tasks.append(task)
+if run_hold_variables:
+    for cause in causes:
+        for hold_variable in hold_variables[cause]:
+            for ssp_scenario in rfc.ssp_scenarios:
+                for measure in ['mortality', 'incidence']:
+                    if cause == "malaria":
+                        for dah_scenario in dah_scenarios:
+                            # Create the primary task
+                            task = task_template.create_task(
+                                cause=cause,
+                                ssp_scenario=ssp_scenario,
+                                dah_scenario=dah_scenario,
+                                measure=measure,
+                                hold_variable=hold_variable,
+                                run_date=run_date
+                            )
+                            tasks.append(task)
+                    else:
+                        # Create the primary task
+                        task = task_template.create_task(
+                            cause=cause,
+                            ssp_scenario=ssp_scenario,
+                            dah_scenario=None,
+                            measure=measure,
+                            hold_variable=hold_variable,
+                            run_date=run_date
+                        )
+                        tasks.append(task)
+
+print(f"Number of tasks: {len(tasks)}")
+
+if tasks:
+    workflow.add_tasks(tasks)
+    print("✅ Tasks successfully added to workflow.")
+else:
+    print("⚠️ No tasks added to workflow. Check task generation.")
+
+try:
+    workflow.bind()
+    print("✅ Workflow successfully bound.")
+    print(f"Running workflow with ID {workflow.workflow_id}.")
+    print("For full information see the Jobmon GUI:")
+    print(f"https://jobmon-gui.ihme.washington.edu/#/workflow/{workflow.workflow_id}")
+except Exception as e:
+    print(f"❌ Workflow binding failed: {e}")
+
+try:
+    status = workflow.run()
+    print(f"Workflow {workflow.workflow_id} completed with status {status}.")
+except Exception as e:
+    print(f"❌ Workflow submission failed: {e}")

src/idd_forecast_mbp/06_upload/06_make_mega_as_draws.ipynb

Lines changed: 68 additions & 9 deletions
@@ -2,7 +2,7 @@
  "cells": [
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 2,
    "id": "cc3b5a3b",
    "metadata": {},
    "outputs": [],
@@ -26,7 +26,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 3,
    "id": "409b3738",
    "metadata": {},
    "outputs": [],
@@ -37,7 +37,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 4,
    "id": "172bf6bd",
    "metadata": {},
    "outputs": [],
@@ -53,10 +53,22 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 5,
    "id": "fe636c87",
    "metadata": {},
-   "outputs": [],
+   "outputs": [
+    {
+     "ename": "NameError",
+     "evalue": "name 'ssp_scenario' is not defined",
+     "output_type": "error",
+     "traceback": [
+      "\u001b[31m---------------------------------------------------------------------------\u001b[39m",
+      "\u001b[31mNameError\u001b[39m Traceback (most recent call last)",
+      "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[5]\u001b[39m\u001b[32m, line 6\u001b[39m\n\u001b[32m 4\u001b[39m cause_map = rfc.cause_map\n\u001b[32m 5\u001b[39m ssp_scenarios = rfc.ssp_scenarios\n\u001b[32m----> \u001b[39m\u001b[32m6\u001b[39m scenario = ssp_scenarios[\u001b[43mssp_scenario\u001b[49m][\u001b[33m\"\u001b[39m\u001b[33mdhs_scenario\u001b[39m\u001b[33m\"\u001b[39m] \u001b[38;5;66;03m# is the DHS scenario name\u001b[39;00m\n",
+      "\u001b[31mNameError\u001b[39m: name 'ssp_scenario' is not defined"
+     ]
+    }
+   ],
    "source": [
     "ssp_draws = rfc.draws\n",
     "measure_map = rfc.measure_map\n",
@@ -68,10 +80,22 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 6,
    "id": "a2156f97",
    "metadata": {},
-   "outputs": [],
+   "outputs": [
+    {
+     "ename": "NameError",
+     "evalue": "name 'cause' is not defined",
+     "output_type": "error",
+     "traceback": [
+      "\u001b[31m---------------------------------------------------------------------------\u001b[39m",
+      "\u001b[31mNameError\u001b[39m Traceback (most recent call last)",
+      "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[6]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[32m----> \u001b[39m\u001b[32m1\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[43mcause\u001b[49m == \u001b[33m\"\u001b[39m\u001b[33mmalaria\u001b[39m\u001b[33m\"\u001b[39m:\n\u001b[32m 2\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m hold_variable == \u001b[33m'\u001b[39m\u001b[33mNone\u001b[39m\u001b[33m'\u001b[39m:\n\u001b[32m 3\u001b[39m processed_forecast_ds_path_template = \u001b[33m\"\u001b[39m\u001b[38;5;132;01m{UPLOAD_DATA_PATH}\u001b[39;00m\u001b[33m/full_as_\u001b[39m\u001b[38;5;132;01m{cause}\u001b[39;00m\u001b[33m_measure_\u001b[39m\u001b[38;5;132;01m{measure}\u001b[39;00m\u001b[33m_ssp_scenario_\u001b[39m\u001b[38;5;132;01m{ssp_scenario}\u001b[39;00m\u001b[33m_dah_scenario_\u001b[39m\u001b[38;5;132;01m{dah_scenario}\u001b[39;00m\u001b[33m_draw_\u001b[39m\u001b[38;5;132;01m{draw}\u001b[39;00m\u001b[33m_with_predictions.nc\u001b[39m\u001b[33m\"\u001b[39m\n",
+      "\u001b[31mNameError\u001b[39m: name 'cause' is not defined"
+     ]
+    }
+   ],
    "source": [
     "\n",
     "\n",
@@ -176,7 +200,28 @@
     "file_paths = [get_file_path(draw, cause, measure, ssp_scenario, dah_scenario, vaccinate, hold_variable) \n",
     " for draw in ssp_draws]\n",
     "\n",
-    "print(f\"Loading {len(file_paths)} files...\")\n",
+    "print(f\"Loading {len(file_paths)} files...\")\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0540081a",
+   "metadata": {},
+   "outputs": [
+    {
+     "ename": "NameError",
+     "evalue": "name 'as_ds' is not defined",
+     "output_type": "error",
+     "traceback": [
+      "\u001b[31m---------------------------------------------------------------------------\u001b[39m",
+      "\u001b[31mNameError\u001b[39m Traceback (most recent call last)",
+      "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[1]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[32m----> \u001b[39m\u001b[32m1\u001b[39m \u001b[43mas_ds\u001b[49m\n",
+      "\u001b[31mNameError\u001b[39m: name 'as_ds' is not defined"
+     ]
+    }
+   ],
+   "source": [
     "\n",
     "# Open all files as a single dataset with lazy loading\n",
     "upload_ds = xr.open_mfdataset(\n",
@@ -290,8 +335,22 @@
   }
  ],
 "metadata": {
+  "kernelspec": {
+   "display_name": "forecast-mbp",
+   "language": "python",
+   "name": "python3"
+  },
  "language_info": {
-  "name": "python"
+  "codemirror_mode": {
+   "name": "ipython",
+   "version": 3
+  },
+  "file_extension": ".py",
+  "mimetype": "text/x-python",
+  "name": "python",
+  "nbconvert_exporter": "python",
+  "pygments_lexer": "ipython3",
+  "version": "3.12.9"
  }
 },
 "nbformat": 4,
