-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathtest_images.py
More file actions
337 lines (291 loc) · 11.3 KB
/
test_images.py
File metadata and controls
337 lines (291 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from multiprocessing import get_context
from typing import Dict, List
from tests.integration.image_checker import (
Results,
construct_markdown_summary_table,
set_up_and_run_image_checker,
)
from tests.integration.utils import get_expansions
# Simulation case names used by the test cfgs below: v3 cfgs run against the
# v3.LR historical case, v2 cfgs against the v2.LR historical case.
V3_CASE_NAME = "v3.LR.historical_0051"
V2_CASE_NAME = "v2.LR.historical_0201"
def intersect_tasks(
    available_tasks: List[str], requested_tasks: List[str]
) -> List[str]:
    """Return the requested tasks that are actually available.

    The result preserves the order of ``requested_tasks``; tasks not present
    in ``available_tasks`` are dropped.
    """
    available = set(available_tasks)
    return [task for task in requested_tasks if task in available]
def prepare_test_configs(
    expansions: Dict, diff_dir_suffix: str, requested_tasks: List[str]
) -> List[tuple]:
    """Prepare test configurations based on expansions.

    Parameters
    ----------
    expansions : Dict
        Expansion settings; only ``expansions["cfgs_to_run"]`` is read here.
    diff_dir_suffix : str
        Suffix passed through unchanged into every configuration tuple.
    requested_tasks : List[str]
        Tasks the caller wants to run; each cfg keeps only the subset it
        supports, preserving the requested order.

    Returns
    -------
    List[tuple]
        ``(test_name, case_name, expansions, diff_dir_suffix, tasks_to_run)``
        tuples, one per enabled cfg, in the fixed order of the table below.
    """
    # Task availability varies by cfg flavor:
    standard_tasks = ["e3sm_diags", "mpas_analysis", "global_time_series", "ilamb"]
    # comprehensive_v3 additionally runs pcmdi_diags.
    v3_tasks = standard_tasks + ["pcmdi_diags"]
    # Bundles cfgs do not run mpas_analysis.
    bundle_tasks = ["e3sm_diags", "global_time_series", "ilamb"]
    # Each group: (header printed before checking the group,
    #              [(cfg key in expansions, test name, case name, available tasks)]).
    # Group and entry order matter: they fix the order of the returned configs.
    groups = [
        (
            "Preparing weekly cfg tests",
            [
                ("weekly_comprehensive_v2", "comprehensive_v2", V2_CASE_NAME, standard_tasks),
                ("weekly_comprehensive_v3", "comprehensive_v3", V3_CASE_NAME, v3_tasks),
                ("weekly_bundles", "bundles", V3_CASE_NAME, bundle_tasks),
            ],
        ),
        (
            "Preparing legacy 3.1.0 cfg tests",
            [
                ("weekly_legacy_3.1.0_comprehensive_v2", "legacy_3.1.0_comprehensive_v2", V2_CASE_NAME, standard_tasks),
                ("weekly_legacy_3.1.0_comprehensive_v3", "legacy_3.1.0_comprehensive_v3", V3_CASE_NAME, standard_tasks),
                ("weekly_legacy_3.1.0_bundles", "legacy_3.1.0_bundles", V3_CASE_NAME, bundle_tasks),
            ],
        ),
        (
            "Preparing legacy 3.0.0 cfg tests",
            [
                ("weekly_legacy_3.0.0_comprehensive_v2", "legacy_3.0.0_comprehensive_v2", V2_CASE_NAME, standard_tasks),
                ("weekly_legacy_3.0.0_comprehensive_v3", "legacy_3.0.0_comprehensive_v3", V3_CASE_NAME, standard_tasks),
                ("weekly_legacy_3.0.0_bundles", "legacy_3.0.0_bundles", V3_CASE_NAME, bundle_tasks),
            ],
        ),
    ]
    test_configs = []
    for header, entries in groups:
        print(header)
        for cfg_key, test_name, case_name, available_tasks in entries:
            if cfg_key not in expansions["cfgs_to_run"]:
                continue
            # Keep requested order while dropping tasks this cfg lacks.
            tasks_to_run = [t for t in requested_tasks if t in available_tasks]
            test_configs.append(
                (test_name, case_name, expansions, diff_dir_suffix, tasks_to_run)
            )
    return test_configs
def map_cfg_to_test_name(cfg: str) -> str:
    """Map from weekly_* config names to actual test names.

    Uses ``str.removeprefix`` so that only a leading ``"weekly_"`` is
    stripped; ``str.replace`` would also delete the substring anywhere
    else in the name.
    """
    return cfg.removeprefix("weekly_")
def order_results(
    test_results_dict: Dict[str, Results],
    test_configs: List[tuple],
    expansions: Dict,
) -> Dict[str, Results]:
    """Reorder results to match the order in expansions.

    Iterates cfgs in ``expansions["cfgs_to_run"]`` order (outer) and tasks in
    ``expansions["tasks_to_run"]`` order (inner). Cfgs without a prepared
    configuration, and keys absent from ``test_results_dict``, are skipped.
    """
    ordered_results_dict: Dict[str, Results] = dict()
    # Test names that actually have a prepared configuration.
    configured_test_names = {config[0] for config in test_configs}
    # Materialize once so the task sequence can be re-walked per cfg.
    task_order = list(expansions["tasks_to_run"])
    for cfg in expansions["cfgs_to_run"]:
        test_name = map_cfg_to_test_name(cfg)
        if test_name not in configured_test_names:
            continue
        for task in task_order:
            # Result keys follow the "<test_name>_<task>" convention.
            expected_key = f"{test_name}_{task}"
            if expected_key in test_results_dict:
                ordered_results_dict[expected_key] = test_results_dict[expected_key]
    return ordered_results_dict
def run_test(
    test_name: str,
    case_name: str,
    expansions: Dict,
    diff_dir_suffix: str,
    tasks_to_run: List[str],
) -> tuple[str, Dict[str, Results], str]:
    """Run a single test and return its name, results dict, and captured output.

    All stdout/stderr from the image checker is captured and written to
    ``images_logs/test_<test_name>.log``. This now happens on failure too
    (previously the failure path wrote to an unprefixed ``test_<name>.log``
    in the working directory, inconsistent with the success path), so partial
    output is always preserved for debugging.

    Raises
    ------
    Exception
        Whatever ``set_up_and_run_image_checker`` raises propagates after the
        log file has been written, with its original traceback intact.
    """
    captured_output = StringIO()
    test_results_dict: Dict[str, Results] = dict()
    try:
        # Context managers give process-safe redirection of both streams.
        with redirect_stdout(captured_output), redirect_stderr(captured_output):
            set_up_and_run_image_checker(
                test_name,
                case_name,
                expansions,
                diff_dir_suffix,
                tasks_to_run,
                test_results_dict,
            )
    finally:
        # Always persist the (possibly partial) output, success or failure.
        output = captured_output.getvalue()
        os.makedirs("images_logs", exist_ok=True)  # no-op if it already exists
        with open(f"images_logs/test_{test_name}.log", "w") as f:
            f.write(output)
    # An exception raised in the try body re-raises itself after the finally
    # block runs (no explicit `raise e`, which would reset the raise site).
    return test_name, test_results_dict, output
def test_images():
    """Run every configured image-checker cfg in parallel, merge the results,
    write markdown summary tables, and assert that all images matched."""
    # To test a different branch, set this to True, and manually set the expansions.
    TEST_DIFFERENT_EXPANSIONS = False
    if TEST_DIFFERENT_EXPANSIONS:
        expansions = dict()
        # Example settings:
        expansions["expected_dir"] = "/lcrc/group/e3sm/public_html/zppy_test_resources/"
        expansions["user_www"] = (
            "/lcrc/group/e3sm/public_html/diagnostic_output/ac.forsyth2/"
        )
        expansions["unique_id"] = "test_zppy_20250401"
        diff_dir_suffix = "_test_pr699_try6"
    else:
        # Normal path: pull expansion settings from the test utilities.
        expansions = get_expansions()
        diff_dir_suffix = ""
    test_results_dict: Dict[str, Results] = dict()
    requested_tasks: List[str] = list(expansions["tasks_to_run"])
    # Prepare test configurations
    test_configs = prepare_test_configs(expansions, diff_dir_suffix, requested_tasks)
    try:
        # Run tests in parallel using ProcessPoolExecutor for isolated stdout/stderr
        print(f"Running {len(test_configs)} tests in parallel")
        # NOTE(review): run_test writes its success-path logs under
        # images_logs/, so this message's path looks stale — confirm.
        print("Individual test logs will be written to test_<name>.log files")
        # "forkserver" start method avoids inheriting this process's state
        # (threads, open handles) the way plain fork would.
        with ProcessPoolExecutor(
            max_workers=6, mp_context=get_context("forkserver")
        ) as executor:
            # Submit all tests
            future_to_test = {
                executor.submit(run_test, *config): config[0] for config in test_configs
            }
            # Collect results as they complete
            for future in as_completed(future_to_test):
                test_name = future_to_test[future]
                try:
                    result_name, results_dict, output = future.result()
                    # Merge all results from this test into the main dict
                    test_results_dict.update(results_dict)
                    print(
                        f"✓ Completed: {test_name} ({len(results_dict)} tasks) (log: test_{test_name}.log)"
                    )
                except Exception as e:
                    print(f"✗ Test {test_name} generated an exception: {e}")
                    # Still try to write partial results
                    construct_markdown_summary_table(
                        test_results_dict, "early_test_images_summary.md"
                    )
                    raise e
    except Exception as e:
        # Persist whatever results were gathered before propagating failure.
        construct_markdown_summary_table(
            test_results_dict, "early_test_images_summary.md"
        )
        raise e
    # Reorder results to match the order in expansions
    ordered_results_dict = order_results(test_results_dict, test_configs, expansions)
    construct_markdown_summary_table(ordered_results_dict, "test_images_summary.md")
    print("\nTest Summary:")
    # Using alignment specifiers:
    print(f"{'Test':<50} {'Total':>10} {'Correct':>10} {'Status':>10}")
    print("-" * 82)
    all_passed = True
    for key, tr in ordered_results_dict.items():
        # A test passes only if every expected image was found and correct.
        status = (
            "✓ PASS" if tr.image_count_total == tr.image_count_correct else "✗ FAIL"
        )
        if tr.image_count_total != tr.image_count_correct:
            all_passed = False
        print(
            f"{key:<50} {tr.image_count_total:>10} {tr.image_count_correct:>10} {status:>10}"
        )
    print("-" * 82)
    if not all_passed:
        print(
            "\n⚠ Some tests had mismatched or missing images. Check individual log files for details."
        )
    # Assert only after printing the full table so every mismatch is visible.
    for tr in ordered_results_dict.values():
        assert tr.image_count_total == tr.image_count_correct