diff --git a/src/python/clawutil/test.py b/src/python/clawutil/test.py index 95e836f..651e598 100644 --- a/src/python/clawutil/test.py +++ b/src/python/clawutil/test.py @@ -8,37 +8,231 @@ class ClawpackRegressionTest(unittest.TestCase) results and looking for errors. """ -from __future__ import print_function - -from __future__ import absolute_import +from pathlib import Path import os import sys -import tempfile import subprocess -import unittest +import importlib import shutil import inspect +import random +import string +from collections.abc import Iterable +from typing import Optional + +import numpy as np + +import clawpack.clawutil.runclaw as runclaw +import clawpack.clawutil.claw_git_status as claw_git_status +import clawpack.pyclaw.solution as solution +import clawpack.pyclaw.gauges as gauges + + +# Support for ClawpackRegressionTest +import tempfile +import unittest import time import glob -import numpy +# TODO: Update documentation +class ClawpackTestRunner: + r"""Base Clawpcak regression test runner -import clawpack.geoclaw.util -import clawpack.pyclaw.gauges as gauges -import clawpack.pyclaw.solution as solution -import clawpack.clawutil.claw_git_status as claw_git_status -from clawpack.clawutil import runclaw + + Hints on use of pytest + - *-s* will not capture output and allows for pdb use + - *--basetemp=* sets the output directory + - + """ + + def __init__(self, path: Path, test_path: Optional[Path]=None): + r"""""" + + self.temp_path = path + # This works if the originating caller is in the right spot in the stack + # If this is not the case, we provide a way to set it manually + if test_path: + self.test_path = test_path + else: + self.test_path = Path(Path(inspect.stack()[2].filename).absolute() + ).parent + self.executable_name = 'xclaw' + + # Do we want to set this? + self.verbose = False + + + def set_data(self, setrun_path: Optional[Path]=None): + r"""Set the rundata for the test. + + :Input: + - setrun_path (Path) - path to setrun file to be run + """ + + if not setrun_path: + setrun_path = Path(self.test_path) / "setrun.py" + + mod_name = '_'.join(("setrun", + "".join(random.choices(string.ascii_letters + + string.digits, k=32)))) + spec = importlib.util.spec_from_file_location(mod_name, setrun_path) + setrun_module = importlib.util.module_from_spec(spec) + sys.modules[mod_name] = setrun_module + spec.loader.exec_module(setrun_module) + self.rundata = setrun_module.setrun() + + + def write_data(self, path: Optional[Path]=None): + r"""Write out the data contained in *rundata*.""" + + if not path: + path = self.temp_path + self.rundata.write(out_dir=path) + + + def build_executable(self, make_level: str='default', + FFLAGS: Optional[str]=None, + LFLAGS: Optional[str]=None, + verbose: bool=False): + r"""Build executable for test""" + + # Assumes GCC CLI + if not FFLAGS: + FFLAGS = os.environ.get('FFLAGS', "-O2 -fopenmp") + if not LFLAGS: + LFLAGS = os.environ.get('LFLAGS', FFLAGS) + + if make_level.lower() == "new": + cmd = "".join((f"cd {self.test_path} ; make new ", + f"FFLAGS='{FFLAGS}' LFLAGS='{LFLAGS}'")) + elif make_level.lower() == "default": + # clean up *.o and *.mod files in test path only + for path in self.test_path.glob("*.o"): + path.unlink() + for path in self.test_path.glob("*.mod"): + path.unlink() + cmd = "".join((f"cd {self.test_path} ; make .exe ", + f"FFLAGS='{FFLAGS}' LFLAGS='{LFLAGS}'")) + elif make_level.lower() == "exe": + cmd = "".join((f"cd {self.test_path} ; make .exe ", + f"FFLAGS='{FFLAGS}' LFLAGS='{LFLAGS}'")) + else: + raise ValueError(f"Invaled make_level={make_level} given.") + + try: + if verbose: + print(f"Build command: {cmd}") + subprocess.run(cmd, shell=True, check=True) + except subprocess.CalledProcessError as e: + self.clean() + raise e + + shutil.move(self.test_path / self.executable_name, self.temp_path) + + + def run_code(self): + r"""""" + runclaw.runclaw(xclawcmd=self.temp_path / self.executable_name, + rundir=self.temp_path, + outdir=self.temp_path, + overwrite=True, + restart=False) + + def clean(self): + """""" + pass + + + def check_frame(self, frame: int, indices: Iterable=(0,), + regression_path: Optional[Path]=None, + save: bool=False, **kwargs): + r"""""" + + if not regression_path: + regression_path = self.test_path / "regression_data" + + # Load test output data + sol = solution.Solution(frame, path=self.temp_path) + sol_sums = [sol.q[i, ...].sum() for i in indices] + + # Load regression data + regression_data = regression_path / f"frame{str(frame).zfill(4)}.txt" + if save: + np.savetxt(regression_data, sol_sums) + claw_git_status.make_git_status_file(outdir=regression_path) + regression_sum = np.loadtxt(regression_data) + + # Compare data + kwargs.setdefault('rtol', 1e-14) + kwargs.setdefault('atol', 1e-8) + np.testing.assert_allclose(sol_sums, regression_sum, **kwargs) + + + def check_gauge(self, gauge_id: int, + indices: Iterable=(0,), + regression_path: Optional[Path]=None, + save: bool=False, **kwargs): + r"""Basic test to assert gauge equality + + :Input: + - *save* (bool) - If *True* will save the output from this test to + the file *regresion_data.txt*. Default is *False*. + - *indices* (tuple) - Contains indices to compare in the gague + comparison. Defaults to *(0)*. + - *rtol* (float) - Relative tolerance used in the comparison, default + is *1e-14*. Note that the old *tolerance* input is now synonymous + with this parameter. + - *atol* (float) - Absolute tolerance used in the comparison, default + is *1e-08*. + """ + + if not(isinstance(indices, tuple) or isinstance(indices, list)): + indices = tuple(indices) + + if not regression_path: + regression_path = self.test_path / "regression_data" + + # Load test output data + gauge = gauges.GaugeSolution(gauge_id, path=self.temp_path) + if gauge.q.shape[1] == 0: + raise AssertionError(f"Empty gauge {gauge_id}.") + + # Load regression data + if save: + shutil.copy(self.temp_path / f"gauge{str(gauge_id).zfill(5)}.txt", + regression_path) + claw_git_status.make_git_status_file(outdir=regression_path) + regression_gauge = gauges.GaugeSolution(gauge_id, path=regression_path) + + if gauge.q.shape[1] != regression_gauge.q.shape[1]: + raise AssertionError( "Gauges have different sizes, regression" + + f" gauge = {regression_gauge.q.shape}, " + + f"test gauge = {gauge.q.shape}") + + # Compare data + kwargs.setdefault('rtol', 1e-14) + kwargs.setdefault('atol', 1e-8) + try: + for n in indices: + np.testing.assert_allclose(gauge.q[n, :], + regression_gauge.q[n, :], + **kwargs) + except AssertionError as e: + err_msg = "\n".join((e.args[0], + "Gauge Match Failed for gauge = %s" % gauge_id)) + err_msg = "\n".join((err_msg, " failures in fields:")) + failure_indices = [] + for n in indices: + if not np.allclose(gauge.q[n, :], regression_gauge.q[n, :], + **kwargs): + failure_indices.append(str(n)) + index_str = ", ".join(failure_indices) + raise AssertionError(" ".join((err_msg, index_str))) -# Support for WIP decorator removed -# It did not seem to be used in any examples, so simplify for converting -# from nose to pytest. Note that for pytest one can use one of these -# decorators instead: -# import pytest -# @pytest.mark.xfail(reason='WIP') # for 'expected to fail' -# @pytest.mark.skip(reason='WIP') # to skip entirely -# +# Old unittest based framework - works with PyTest, but is being replaced by +# the runner above class ClawpackRegressionTest(unittest.TestCase): r"""Base Clawpcak regression test setup @@ -283,13 +477,13 @@ def check_frame(self, save=False, indices=[0], frame_num=1, regression_data_file = os.path.join(self.test_path, "regression_data", file_name) if save: - numpy.savetxt(regression_data_file, data_sum) + np.savetxt(regression_data_file, data_sum) claw_git_status.make_git_status_file( outdir=os.path.join(self.test_path, "regression_data")) - regression_sum = numpy.loadtxt(regression_data_file) + regression_sum = np.loadtxt(regression_data_file) - assert numpy.allclose(data_sum, regression_sum, rtol=rtol, atol=atol), \ + assert np.allclose(data_sum, regression_sum, rtol=rtol, atol=atol), \ "\n new_data: %s, \n expected: %s" % (data_sum, regression_sum) @@ -332,7 +526,7 @@ def check_gauges(self, save=False, gauge_id=1, indices=[0], # Compare data try: for n in indices: - numpy.testing.assert_allclose(gauge.q[n, :], + np.testing.assert_allclose(gauge.q[n, :], regression_gauge.q[n, :], rtol=rtol, atol=atol, verbose=False) @@ -342,7 +536,7 @@ def check_gauges(self, save=False, gauge_id=1, indices=[0], err_msg = "\n".join((err_msg, " failures in fields:")) failure_indices = [] for n in indices: - if not numpy.allclose(gauge.q[n, :], regression_gauge.q[n, :], + if not np.allclose(gauge.q[n, :], regression_gauge.q[n, :], rtol=rtol, atol=atol): failure_indices.append(str(n)) index_str = ", ".join(failure_indices)