# Multithread image tests #752
Changes from 3 commits
`@@ -1,3 +1,6 @@`

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from typing import Dict, List

from tests.integration.image_checker import (
```

`@@ -17,128 +20,268 @@ def intersect_tasks(`

```python
    return [task for task in requested_tasks if task in available_tasks]


def prepare_test_configs(
    expansions: Dict, diff_dir_suffix: str, requested_tasks: List[str]
) -> List[tuple]:
    """Prepare test configurations based on expansions."""
    test_configs = []

    # Weekly comprehensive tests
    print("Preparing weekly cfg tests")
    if "weekly_comprehensive_v2" in expansions["cfgs_to_run"]:
        available_tasks = ["e3sm_diags", "mpas_analysis", "global_time_series", "ilamb"]
        tasks_to_run = intersect_tasks(available_tasks, requested_tasks)
        test_configs.append(
            (
                "comprehensive_v2",
                V2_CASE_NAME,
                expansions,
                diff_dir_suffix,
                tasks_to_run,
            )
        )

    if "weekly_comprehensive_v3" in expansions["cfgs_to_run"]:
        # Adds pcmdi_diags
        available_tasks = [
            "e3sm_diags",
            "mpas_analysis",
            "global_time_series",
            "ilamb",
            "pcmdi_diags",
        ]
        tasks_to_run = intersect_tasks(available_tasks, requested_tasks)
        test_configs.append(
            (
                "comprehensive_v3",
                V3_CASE_NAME,
                expansions,
                diff_dir_suffix,
                tasks_to_run,
            )
        )

    if "weekly_bundles" in expansions["cfgs_to_run"]:
        # No mpas_analysis
        available_tasks = ["e3sm_diags", "global_time_series", "ilamb"]
        tasks_to_run = intersect_tasks(available_tasks, requested_tasks)
        test_configs.append(
            (
                "bundles",
                V3_CASE_NAME,
                expansions,
                diff_dir_suffix,
                tasks_to_run,
            )
        )

    # Legacy comprehensive tests
    print("Preparing legacy cfg tests")
    if "weekly_legacy_3.0.0_comprehensive_v2" in expansions["cfgs_to_run"]:
        available_tasks = ["e3sm_diags", "mpas_analysis", "global_time_series", "ilamb"]
        tasks_to_run = intersect_tasks(available_tasks, requested_tasks)
        test_configs.append(
            (
                "legacy_3.0.0_comprehensive_v2",
                V2_CASE_NAME,
                expansions,
                diff_dir_suffix,
                tasks_to_run,
            )
        )

    if "weekly_legacy_3.0.0_comprehensive_v3" in expansions["cfgs_to_run"]:
        available_tasks = ["e3sm_diags", "mpas_analysis", "global_time_series", "ilamb"]
        tasks_to_run = intersect_tasks(available_tasks, requested_tasks)
        test_configs.append(
            (
                "legacy_3.0.0_comprehensive_v3",
                V3_CASE_NAME,
                expansions,
                diff_dir_suffix,
                tasks_to_run,
            )
        )

    if "weekly_legacy_3.0.0_bundles" in expansions["cfgs_to_run"]:
        # No mpas_analysis
        available_tasks = ["e3sm_diags", "global_time_series", "ilamb"]
        tasks_to_run = intersect_tasks(available_tasks, requested_tasks)
        test_configs.append(
            (
                "legacy_3.0.0_bundles",
                V3_CASE_NAME,
                expansions,
                diff_dir_suffix,
                tasks_to_run,
            )
        )

    return test_configs


def map_cfg_to_test_name(cfg: str) -> str:
    """Map from weekly_* config names to actual test names."""
    return cfg.replace("weekly_", "")


def order_results(
    test_results_dict: Dict[str, Results],
    test_configs: List[tuple],
    expansions: Dict,
) -> Dict[str, Results]:
    """Reorder results to match the order in expansions."""
    ordered_results_dict: Dict[str, Results] = dict()

    # Get the order from expansions
    cfg_order = [map_cfg_to_test_name(cfg) for cfg in expansions["cfgs_to_run"]]
    task_order = list(expansions["tasks_to_run"])

    for cfg_name in cfg_order:
        # Find the test config that matches this cfg_name
        matching_config = None
        for config in test_configs:
            if config[0] == cfg_name:
                matching_config = config
                break

        if matching_config is None:
            continue

        test_name = matching_config[0]
        # Add results for each task in the order from expansions
        for task in task_order:
            # Look for exact key match: test_name_task
            expected_key = f"{test_name}_{task}"
            if expected_key in test_results_dict:
                ordered_results_dict[expected_key] = test_results_dict[expected_key]

    return ordered_results_dict


def run_test(
    test_name: str,
    case_name: str,
    expansions: Dict,
    diff_dir_suffix: str,
    tasks_to_run: List[str],
) -> tuple[str, Dict[str, Results], str]:
    """Run a single test and return its name, results dict, and captured output."""
    captured_output = StringIO()

    try:
        test_results_dict: Dict[str, Results] = dict()

        # Use context managers for thread-safe redirection
        with redirect_stdout(captured_output), redirect_stderr(captured_output):
            set_up_and_run_image_checker(
                test_name,
                case_name,
                expansions,
                diff_dir_suffix,
                tasks_to_run,
                test_results_dict,
            )

        output = captured_output.getvalue()

        # Write to individual log file
        log_filename = f"test_{test_name}.log"
        with open(log_filename, "w") as f:
            f.write(output)

        return test_name, test_results_dict, output
    except Exception as e:
        output = captured_output.getvalue()
        # Still write the partial output
        log_filename = f"test_{test_name}.log"
        with open(log_filename, "w") as f:
            f.write(output)
        raise e


def test_images():
    # To test a different branch, set this to True, and manually set the expansions.
    TEST_DIFFERENT_EXPANSIONS = False
    if TEST_DIFFERENT_EXPANSIONS:
        expansions = dict()
        # Example settings:
        expansions["expected_dir"] = "/lcrc/group/e3sm/public_html/zppy_test_resources/"
        expansions["user_www"] = (
            "/lcrc/group/e3sm/public_html/diagnostic_output/ac.forsyth2/"
        )
        expansions["unique_id"] = "test_zppy_20250401"
        diff_dir_suffix = "_test_pr699_try6"
    else:
        expansions = get_expansions()
        diff_dir_suffix = ""

    test_results_dict: Dict[str, Results] = dict()
    requested_tasks: List[str] = list(expansions["tasks_to_run"])

    # Prepare test configurations
    test_configs = prepare_test_configs(expansions, diff_dir_suffix, requested_tasks)

    try:
        # Run tests in parallel using ProcessPoolExecutor for isolated stdout/stderr
```
> **Collaborator (Author):** Switched from multi-threading to multi-processing. Claude's original note was in favor of multi-threading, but it turns out that complicates the log-writing (e.g., the log file for bundles might contain log lines for the v2 legacy test). Now, with multi-processing, Claude notes that we may want to consider running the test from a compute node instead.
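A note on why the capture only works cleanly per process: `contextlib.redirect_stdout` swaps the interpreter-wide `sys.stdout`, so prints from concurrently running threads land in whichever buffer happens to be installed at that moment, whereas each `ProcessPoolExecutor` worker has its own `sys.stdout`. A minimal sketch of the per-process capture pattern (the `run_one` helper and its output are illustrative, not code from this PR):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from contextlib import redirect_stdout
from io import StringIO


def run_one(name: str) -> str:
    # In a worker process, sys.stdout is private to that process, so this
    # redirection captures only this worker's prints, never another worker's.
    buf = StringIO()
    with redirect_stdout(buf):
        print(f"log line from {name}")
    return buf.getvalue()


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(run_one, n): n for n in ("a", "b")}
        for future in as_completed(futures):
            # Each future's result holds only its own captured output.
            print(futures[future], "->", future.result().strip())
```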
```python
        print(f"Running {len(test_configs)} tests in parallel")
        print("Individual test logs will be written to test_<name>.log files")
        with ProcessPoolExecutor(max_workers=6) as executor:
            # Submit all tests
            future_to_test = {
                executor.submit(run_test, *config): config[0] for config in test_configs
            }

            # Collect results as they complete
            for future in as_completed(future_to_test):
                test_name = future_to_test[future]
                try:
                    result_name, results_dict, output = future.result()
                    # Merge all results from this test into the main dict
                    test_results_dict.update(results_dict)
                    print(
                        f"✓ Completed: {test_name} ({len(results_dict)} tasks) (log: test_{test_name}.log)"
                    )
                except Exception as e:
                    print(f"✗ Test {test_name} generated an exception: {e}")
                    # Still try to write partial results
                    construct_markdown_summary_table(
                        test_results_dict, "early_test_images_summary.md"
                    )
                    raise e

    except Exception as e:
        construct_markdown_summary_table(
            test_results_dict, "early_test_images_summary.md"
        )
        raise e

    # Reorder results to match the order in expansions
    ordered_results_dict = order_results(test_results_dict, test_configs, expansions)

    construct_markdown_summary_table(ordered_results_dict, "test_images_summary.md")

    print("\nTest Summary:")
    print(f"{'Test':<50} {'Total':>10} {'Correct':>10} {'Status':>10}")
```
> **Collaborator (Author):** Add a comment about using alignment specifiers, for clarity.
```python
    print("-" * 82)

    all_passed = True
    for key, tr in ordered_results_dict.items():
        status = (
            "✓ PASS" if tr.image_count_total == tr.image_count_correct else "✗ FAIL"
        )
        if tr.image_count_total != tr.image_count_correct:
            all_passed = False
        print(
            f"{key:<50} {tr.image_count_total:>10} {tr.image_count_correct:>10} {status:>10}"
        )

    print("-" * 82)

    if not all_passed:
        print(
            "\n⚠ Some tests had mismatched or missing images. Check individual log files for details."
        )

    for tr in ordered_results_dict.values():
        assert tr.image_count_total == tr.image_count_correct
```
> Maybe make this `logs/...` so the logs are in a subdir for easy deletion (rather than `rm test_*.log`) or review.
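If that suggestion is adopted, the log-writing step in `run_test` could place the files under a `logs/` subdirectory; a rough sketch along those lines (the `write_test_log` helper is hypothetical, not code from this PR):

```python
import os


def write_test_log(test_name: str, output: str) -> str:
    # Keep per-test logs together in logs/ so they can be reviewed or
    # removed as a unit (e.g. `rm -r logs/`) instead of `rm test_*.log`.
    os.makedirs("logs", exist_ok=True)
    log_filename = os.path.join("logs", f"test_{test_name}.log")
    with open(log_filename, "w") as f:
        f.write(output)
    return log_filename
```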