Skip to content

Commit fd5e922

Browse files
committed
Added new helper functions
1 parent 8fabd2b commit fd5e922

15 files changed

+821
-190
lines changed

src/idd_forecast_mbp/04_forecasting/forecast_malaria_admin_2s_rocket.r

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ forecast_df <- as.data.frame(arrow::read_parquet(input_forecast_df_path))
8282
message(exp(min(forecast_df$log_aa_malaria_mort_rate, na.rm = TRUE)))
8383
forecast_df$A0_af <- as.factor(forecast_df$A0_af)
8484

85+
86+
87+
88+
8589
forecast_df$malaria_suit_fraction <- forecast_df$malaria_suitability / 365
8690
forecast_df$malaria_suit_fraction <- pmin(pmax(forecast_df$malaria_suit_fraction, 0.001), 0.999)
8791
forecast_df$logit_malaria_suitability <- log(forecast_df$malaria_suit_fraction / (1 - forecast_df$malaria_suit_fraction))
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import getpass
2+
import uuid
3+
import pandas as pd # type: ignore
4+
from jobmon.client.status_commands import workflow_tasks, task_status # type: ignore
5+
from jobmon.client.tool import Tool # type: ignore
6+
from pathlib import Path
7+
from idd_forecast_mbp import constants as rfc
8+
9+
# Repository and package identifiers come from the shared constants module.
repo_name = rfc.repo_name
package_name = rfc.package_name

# Directory holding the aggregation-stage scripts that this launcher submits.
SCRIPT_ROOT = (
    rfc.REPO_ROOT
    / repo_name
    / "src"
    / package_name
    / "05_aggregation"
)
# NOTE(review): run_date is not referenced in the visible part of this script.
run_date = '2025_08_28'
15+
16+
17+
# Parameter grid: the launcher creates one task for every combination of the
# values below.
CAUSES = ["malaria", "dengue"]
# Scenario ids as passed to the worker script through --scenario.
SCENARIOS = [0, 75, 76]
MEASURES = ["death", "incidence", "yll", "yld"]
# Draw indices 0..99 (built directly from the range; no comprehension needed).
DRAWS = list(range(100))
21+
22+
# Jobmon setup
# Name of the user launching the workflow (looked up once).
user = getpass.getuser()

# Log locations: a root directory with separate stdout and stderr
# subdirectories, all created up front if they do not exist yet.
log_dir = Path("/mnt/team/idd/pub/")
log_dir.mkdir(parents=True, exist_ok=True)
stdout_dir = log_dir / "stdout"
stderr_dir = log_dir / "stderr"
stdout_dir.mkdir(parents=True, exist_ok=True)
stderr_dir.mkdir(parents=True, exist_ok=True)

# Cluster project and queue used as default compute resources for the tasks.
project = "proj_rapidresponse"  # Adjust this to your project name if needed
queue = 'long.q'

# A fresh UUID keeps the workflow name unique for every launch.
wf_uuid = uuid.uuid4()

# Create a tool
tool = Tool(name="malaria_dengue_raking")

# Create the workflow that the tasks are added to further down.
workflow = tool.create_workflow(
    name=f"malaria_dengue_raking_{wf_uuid}",
)
49+
50+
51+
# Task template for one raking job. Each task is identified by its
# (cause, scenario, measure, draw) node arguments, which are substituted into
# the command line of create_raked_outcomes.py.
task_template = tool.get_task_template(
    template_name="malaria_dengue_raking_task",
    default_cluster_name="slurm",
    default_compute_resources={
        "queue": queue,
        "cores": 2,
        "memory": "10G",
        "runtime": "10m",
        "project": project,  # Ensure the project is set correctly
        "stdout": str(stdout_dir),
        "stderr": str(stderr_dir),
    },
    # Only the first literal is an f-string: the script path is filled in
    # here, while the {cause}/{scenario}/{measure}/{draw} placeholders stay
    # literal so that they can be substituted per task.
    command_template=(
        f"python {SCRIPT_ROOT}/create_raked_outcomes.py "
        "--cause {cause} "
        "--scenario {scenario} "
        "--measure {measure} "
        "--draw {draw} "
    ),
    node_args=["cause", "scenario", "measure", "draw"],
    task_args=[],
    op_args=[],
)
75+
76+
77+
def check_if_path_draw_exists(
    cause,
    scenario,
    measure,
    draw,
    out_dir="/mnt/team/rapidresponse/pub/malaria-denv/deliverables/2025_08_26_admin_2_counts/output/",
    output_date="2025_09_08",
):
    """Return whether the raked NetCDF file for one draw already exists.

    The file is looked up at
    ``<out_dir>/<result folder>/<output_date>/draw_<draw>.nc``, where the
    result folder name is built from the cause, measure and SSP scenario.

    Args:
        cause: Cause name. ``"malaria"`` folders carry an extra
            ``_dah_scenario_Baseline`` component; every other cause does not.
        scenario: Scenario id; must be one of 0, 75 or 76.
        measure: Measure name. ``"death"`` is spelled ``"mortality"`` in the
            folder name; other values are used unchanged.
        draw: Draw index; anything accepted by ``int()``.
        out_dir: Root directory of the raked outputs (str or Path). Defaults
            to the location that was previously hard-coded here.
        output_date: Name of the dated subfolder inside the result folder.
            Defaults to the value that was previously hard-coded here.

    Returns:
        True if the file exists, False otherwise.

    Raises:
        KeyError: If ``scenario`` is not one of the known scenario ids.
    """
    # Scenario id -> SSP label used in the folder names.
    scenario_map = {
        0: "ssp245",
        75: "ssp126",
        76: "ssp585",
    }
    ssp_scenario = scenario_map[scenario]
    measure_label = "mortality" if measure == "death" else measure

    dirname = (
        f"as_cause_{cause}_measure_{measure_label}_metric_count_"
        f"ssp_scenario_{ssp_scenario}"
    )
    if cause == "malaria":
        dirname += "_dah_scenario_Baseline"
    dirname += "_raked"

    output_name = Path(out_dir) / dirname / output_date / f"draw_{int(draw)}.nc"
    return output_name.exists()
106+
107+
108+
# Build one task for every (cause, scenario, measure, draw) combination.
tasks = []
for cause in CAUSES:
    for scenario in SCENARIOS:
        for measure in MEASURES:
            for draw in DRAWS:
                tasks.append(
                    task_template.create_task(
                        cause=cause,
                        scenario=scenario,
                        measure=measure,
                        draw=draw,
                    )
                )

print(f"Number of tasks to run: {len(tasks)}")

# Nothing to submit: report it and stop with a non-zero exit status instead of
# going on to bind and run an empty workflow.
if not tasks:
    print("⚠️ No tasks added to workflow. Check task generation.")
    raise SystemExit(1)

workflow.add_tasks(tasks)
print("✅ Tasks successfully added to workflow.")

# Bind first so that the workflow id can be reported before the run starts.
# A bind failure is fatal: the script stops here rather than attempting to
# run a workflow that could not be bound.
try:
    workflow.bind()
except Exception as e:
    print(f"❌ Workflow binding failed: {e}")
    raise SystemExit(1) from e

print("✅ Workflow successfully bound.")
print(f"Running workflow with ID {workflow.workflow_id}.")
print("For full information see the Jobmon GUI:")
print(f"https://jobmon-gui.ihme.washington.edu/#/workflow/{workflow.workflow_id}")

# Run the workflow; a failure is reported and turned into a non-zero exit
# status so that callers of this script can detect it.
try:
    status = workflow.run()
except Exception as e:
    print(f"❌ Workflow submission failed: {e}")
    raise SystemExit(1) from e

print(f"Workflow {workflow.workflow_id} completed with status {status}.")

src/idd_forecast_mbp/05_aggregation/02_cause_as_aggregation_by_draw_raked_parallel.py renamed to src/idd_forecast_mbp/05_aggregation/03_cause_as_aggregation_by_draw_raked_parallel.py

File renamed without changes.

src/idd_forecast_mbp/05_aggregation/03_create_as_dalys_by_draw_raked_parallel.py renamed to src/idd_forecast_mbp/05_aggregation/04_create_as_dalys_by_draw_raked_parallel.py

File renamed without changes.

src/idd_forecast_mbp/05_aggregation/04_make_population_hold_variables_by_draw_parallel.py renamed to src/idd_forecast_mbp/05_aggregation/05_make_population_hold_variables_by_draw_parallel.py

File renamed without changes.

src/idd_forecast_mbp/05_aggregation/cause_as_aggregation_by_draw_raked.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,15 @@
6161
for output_measure in output_measures:
6262
# Calculate multipliers
6363

64-
output_folder = folder_template_dict[cause].format(direction='output', measure=output_measure, ssp_scenario=ssp_scenario, suffix='_raked')
64+
output_folder = folder_template_dict[cause].format(direction='output/2025_09_08', measure=output_measure, ssp_scenario=ssp_scenario, suffix='_raked')
6565
draw_int = int(draw)
6666
output_ds_path = f'{raked_base}/{output_folder}/draw_{draw_int}.nc'
6767
output_ds = xr.open_dataset(output_ds_path)
6868
output_ds = output_ds.drop_vars(['draw', 'draw_id', 'scenario']).rename({'value': 'val'})
69-
output_ds = output_ds.squeeze('draw').squeeze('scenario')
69+
# Squeeze 'draw' and 'scenario' only when they are still dimensions of the
# dataset. The check has to look at .dims, not .coords: the drop_vars() call
# above has already removed the coordinate variables with these names, so a
# .coords membership test could never be true and nothing would be squeezed.
for dim in ['draw', 'scenario']:
    if dim in output_ds.dims:
        output_ds = output_ds.squeeze(dim)
7073

7174
input_folder = folder_template_dict[cause].format(direction='input', measure=measure, ssp_scenario=ssp_scenario, suffix='')
7275
input_ds_path = f'{raked_base}/{input_folder}/draw_{draw_int}.nc'

0 commit comments

Comments
 (0)