Skip to content

Commit e4e1ed2

Browse files
authored
Multithread image tests (#752)
* Attempt to multithread the image tests * Working multiprocessing * Reset test utils * Address comments * Prevent deadlocks by using spawn * Use forkserver for better performance
1 parent fd7f39a commit e4e1ed2

File tree

1 file changed

+232
-81
lines changed

1 file changed

+232
-81
lines changed

tests/integration/test_images.py

Lines changed: 232 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
import os
2+
from concurrent.futures import ProcessPoolExecutor, as_completed
3+
from contextlib import redirect_stderr, redirect_stdout
4+
from io import StringIO
5+
from multiprocessing import get_context
16
from typing import Dict, List
27

38
from tests.integration.image_checker import (
@@ -17,128 +22,274 @@ def intersect_tasks(
1722
return [task for task in requested_tasks if task in available_tasks]
1823

1924

20-
def test_images():
21-
# To test a different branch, set this to True, and manually set the expansions.
22-
TEST_DIFFERENT_EXPANSIONS = False
23-
if TEST_DIFFERENT_EXPANSIONS:
24-
expansions = dict()
25-
# Example settings:
26-
expansions["expected_dir"] = "/lcrc/group/e3sm/public_html/zppy_test_resources/"
27-
expansions["user_www"] = (
28-
"/lcrc/group/e3sm/public_html/diagnostic_output/ac.forsyth2/"
29-
)
30-
expansions["unique_id"] = "test_zppy_20250401"
31-
diff_dir_suffix = "_test_pr699_try6"
32-
else:
33-
expansions = get_expansions()
34-
diff_dir_suffix = ""
35-
test_results_dict: Dict[str, Results] = dict()
36-
requested_tasks: List[str] = list(expansions["tasks_to_run"])
37-
try:
38-
# TODO: these could be run in parallel, if easy to implement
39-
40-
# Weekly comprehensive tests
41-
print("Checking weekly cfg output")
42-
if "weekly_comprehensive_v2" in expansions["cfgs_to_run"]:
43-
available_tasks = [
44-
"e3sm_diags",
45-
"mpas_analysis",
46-
"global_time_series",
47-
"ilamb",
48-
]
49-
tasks_to_run = intersect_tasks(available_tasks, requested_tasks)
50-
set_up_and_run_image_checker(
25+
def prepare_test_configs(
    expansions: Dict, diff_dir_suffix: str, requested_tasks: List[str]
) -> List[tuple]:
    """Build the argument tuples for every cfg test that should run.

    Parameters
    ----------
    expansions : Dict
        Test settings; ``expansions["cfgs_to_run"]`` selects which cfgs to test.
    diff_dir_suffix : str
        Suffix appended to the image-diff output directory.
    requested_tasks : List[str]
        Tasks the user asked for; intersected with each cfg's available tasks.

    Returns
    -------
    List[tuple]
        One ``(test_name, case_name, expansions, diff_dir_suffix, tasks_to_run)``
        tuple per selected cfg, suitable for ``run_test(*config)``.
    """
    base_tasks = ["e3sm_diags", "mpas_analysis", "global_time_series", "ilamb"]
    bundle_tasks = ["e3sm_diags", "global_time_series", "ilamb"]  # No mpas_analysis

    # Each entry: (cfg key in expansions["cfgs_to_run"], test name, case name,
    # tasks available for that cfg). Table-driven to avoid six copies of the
    # same append logic.
    weekly_table = [
        ("weekly_comprehensive_v2", "comprehensive_v2", V2_CASE_NAME, base_tasks),
        # Adds pcmdi_diags
        (
            "weekly_comprehensive_v3",
            "comprehensive_v3",
            V3_CASE_NAME,
            base_tasks + ["pcmdi_diags"],
        ),
        ("weekly_bundles", "bundles", V3_CASE_NAME, bundle_tasks),
    ]
    # Legacy comprehensive tests: the cfgs remain unchanged, but we test the
    # latest zppy code on them to check for backwards-compatibility issues.
    legacy_table = [
        (
            "weekly_legacy_3.0.0_comprehensive_v2",
            "legacy_3.0.0_comprehensive_v2",
            V2_CASE_NAME,
            base_tasks,
        ),
        (
            "weekly_legacy_3.0.0_comprehensive_v3",
            "legacy_3.0.0_comprehensive_v3",
            V3_CASE_NAME,
            base_tasks,
        ),
        ("weekly_legacy_3.0.0_bundles", "legacy_3.0.0_bundles", V3_CASE_NAME, bundle_tasks),
    ]

    test_configs: List[tuple] = []

    def _append_selected(table: List[tuple]) -> None:
        # Add a config tuple for each cfg in the table that was requested.
        for cfg_key, test_name, case_name, available_tasks in table:
            if cfg_key in expansions["cfgs_to_run"]:
                tasks_to_run = intersect_tasks(available_tasks, requested_tasks)
                test_configs.append(
                    (test_name, case_name, expansions, diff_dir_suffix, tasks_to_run)
                )

    # Weekly comprehensive tests
    print("Preparing weekly cfg tests")
    _append_selected(weekly_table)
    # Legacy comprehensive tests
    print("Preparing legacy cfg tests")
    _append_selected(legacy_table)

    return test_configs
123+
124+
125+
def map_cfg_to_test_name(cfg: str) -> str:
    """Map a weekly_* config name to its test name by dropping the prefix.

    Uses ``str.removeprefix`` so only a *leading* ``"weekly_"`` is removed;
    ``str.replace`` would strip the substring anywhere it appeared in the name.
    """
    return cfg.removeprefix("weekly_")
128+
129+
130+
def order_results(
    test_results_dict: Dict[str, Results],
    test_configs: List[tuple],
    expansions: Dict,
) -> Dict[str, Results]:
    """Return the results re-keyed in the cfg/task order given by expansions."""
    ordered: Dict[str, Results] = dict()

    # Test names that actually ran in this session.
    known_test_names = {config[0] for config in test_configs}
    task_order = list(expansions["tasks_to_run"])

    for cfg in expansions["cfgs_to_run"]:
        test_name = map_cfg_to_test_name(cfg)
        # Skip cfgs that produced no test config.
        if test_name not in known_test_names:
            continue
        # Copy over each task's result, in the task order from expansions;
        # result keys follow the "<test_name>_<task>" convention.
        for task in task_order:
            key = f"{test_name}_{task}"
            if key in test_results_dict:
                ordered[key] = test_results_dict[key]

    return ordered
162+
163+
164+
def _write_test_log(test_name: str, output: str) -> str:
    """Write captured test output under images_logs/ and return the log path."""
    os.makedirs("images_logs", exist_ok=True)  # Creates directory if it doesn't exist
    log_filename = f"images_logs/test_{test_name}.log"
    with open(log_filename, "w") as f:
        f.write(output)
    return log_filename


def run_test(
    test_name: str,
    case_name: str,
    expansions: Dict,
    diff_dir_suffix: str,
    tasks_to_run: List[str],
) -> tuple[str, Dict[str, Results], str]:
    """Run a single test and return its name, results dict, and captured output.

    Intended to run inside a worker process: stdout/stderr are redirected to an
    in-memory buffer so parallel tests do not interleave their output. The
    buffer is always persisted to ``images_logs/test_<name>.log`` — on failure
    too, so partial output is not lost. (Previously the failure path wrote the
    log to the working directory, inconsistent with the success path.)
    """
    captured_output = StringIO()

    try:
        test_results_dict: Dict[str, Results] = dict()

        # Use context managers for process-safe redirection that is always
        # undone, even if the checker raises.
        with redirect_stdout(captured_output), redirect_stderr(captured_output):
            set_up_and_run_image_checker(
                test_name,
                case_name,
                expansions,
                diff_dir_suffix,
                tasks_to_run,
                test_results_dict,
            )

        output = captured_output.getvalue()
        _write_test_log(test_name, output)
        return test_name, test_results_dict, output
    except Exception:
        # Still write the partial output, to the same place as a successful run.
        _write_test_log(test_name, captured_output.getvalue())
        raise
206+
207+
208+
def _print_results_summary(ordered_results_dict: Dict[str, Results]) -> None:
    """Print an aligned pass/fail table of image counts per test/task."""
    print("\nTest Summary:")
    # Using alignment specifiers:
    print(f"{'Test':<50} {'Total':>10} {'Correct':>10} {'Status':>10}")
    print("-" * 82)

    all_passed = True
    for key, tr in ordered_results_dict.items():
        passed = tr.image_count_total == tr.image_count_correct
        status = "✓ PASS" if passed else "✗ FAIL"
        if not passed:
            all_passed = False
        print(
            f"{key:<50} {tr.image_count_total:>10} {tr.image_count_correct:>10} {status:>10}"
        )

    print("-" * 82)

    if not all_passed:
        print(
            "\n⚠ Some tests had mismatched or missing images. Check individual log files for details."
        )


def test_images():
    """Run every configured image-checker cfg in parallel and assert all images match."""
    # To test a different branch, set this to True, and manually set the expansions.
    TEST_DIFFERENT_EXPANSIONS = False
    if TEST_DIFFERENT_EXPANSIONS:
        expansions = dict()
        # Example settings:
        expansions["expected_dir"] = "/lcrc/group/e3sm/public_html/zppy_test_resources/"
        expansions["user_www"] = (
            "/lcrc/group/e3sm/public_html/diagnostic_output/ac.forsyth2/"
        )
        expansions["unique_id"] = "test_zppy_20250401"
        diff_dir_suffix = "_test_pr699_try6"
    else:
        expansions = get_expansions()
        diff_dir_suffix = ""

    test_results_dict: Dict[str, Results] = dict()
    requested_tasks: List[str] = list(expansions["tasks_to_run"])

    # Prepare test configurations
    test_configs = prepare_test_configs(expansions, diff_dir_suffix, requested_tasks)

    try:
        # Run tests in parallel using ProcessPoolExecutor for isolated stdout/stderr.
        print(f"Running {len(test_configs)} tests in parallel")
        # run_test writes per-test logs under images_logs/ — report that path,
        # not the working directory.
        print("Individual test logs will be written to images_logs/test_<name>.log files")
        # forkserver avoids the deadlocks fork can cause in a threaded parent,
        # with better startup performance than spawn.
        with ProcessPoolExecutor(
            max_workers=6, mp_context=get_context("forkserver")
        ) as executor:
            # Submit all tests
            future_to_test = {
                executor.submit(run_test, *config): config[0] for config in test_configs
            }

            # Collect results as they complete
            for future in as_completed(future_to_test):
                test_name = future_to_test[future]
                try:
                    result_name, results_dict, output = future.result()
                    # Merge all results from this test into the main dict
                    test_results_dict.update(results_dict)
                    print(
                        f"✓ Completed: {test_name} ({len(results_dict)} tasks) (log: images_logs/test_{test_name}.log)"
                    )
                except Exception as e:
                    print(f"✗ Test {test_name} generated an exception: {e}")
                    # Re-raise; the outer handler writes the early summary
                    # exactly once (previously it was written twice on failure).
                    raise e
    except Exception as e:
        # Persist whatever partial results we have so a failed run still
        # yields a summary table.
        construct_markdown_summary_table(
            test_results_dict, "early_test_images_summary.md"
        )
        raise e

    # Reorder results to match the order in expansions
    ordered_results_dict = order_results(test_results_dict, test_configs, expansions)

    construct_markdown_summary_table(ordered_results_dict, "test_images_summary.md")

    _print_results_summary(ordered_results_dict)

    for tr in ordered_results_dict.values():
        assert tr.image_count_total == tr.image_count_correct

0 commit comments

Comments
 (0)