src/python/clawutil/test.py: 218 additions & 24 deletions
@@ -8,37 +8,231 @@ class ClawpackRegressionTest(unittest.TestCase)
results and looking for errors.
"""

from pathlib import Path
import os
import sys
import tempfile
import subprocess
import unittest
import importlib.util
import shutil
import inspect
import random
import string
from collections.abc import Iterable
from typing import Optional

import numpy as np

import clawpack.clawutil.runclaw as runclaw
import clawpack.clawutil.claw_git_status as claw_git_status
import clawpack.pyclaw.solution as solution
import clawpack.pyclaw.gauges as gauges


# TODO: Update documentation
class ClawpackTestRunner:
    r"""Base Clawpack regression test runner

    Hints on use of pytest:
    - *-s* does not capture output and allows for pdb use
    - *--basetemp=* sets the output directory
    """

def __init__(self, path: Path, test_path: Optional[Path]=None):
        r"""Initialize the runner with output *path* and optional *test_path*."""

self.temp_path = path
        # Deduce the test directory from the caller's stack frame. This only
        # works if the originating caller is at the expected depth in the
        # stack; if it is not, *test_path* can be set explicitly instead.
if test_path:
self.test_path = test_path
else:
self.test_path = Path(Path(inspect.stack()[2].filename).absolute()
).parent
self.executable_name = 'xclaw'

# Do we want to set this?
self.verbose = False


def set_data(self, setrun_path: Optional[Path]=None):
r"""Set the rundata for the test.

:Input:
- setrun_path (Path) - path to setrun file to be run
"""

if not setrun_path:
setrun_path = Path(self.test_path) / "setrun.py"

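        # Import setrun.py under a randomized module name so that repeated
        # imports of different tests' setrun.py files do not collide in
        # sys.modules within a single pytest session.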
mod_name = '_'.join(("setrun",
"".join(random.choices(string.ascii_letters
+ string.digits, k=32))))
spec = importlib.util.spec_from_file_location(mod_name, setrun_path)
setrun_module = importlib.util.module_from_spec(spec)
sys.modules[mod_name] = setrun_module
spec.loader.exec_module(setrun_module)
self.rundata = setrun_module.setrun()


def write_data(self, path: Optional[Path]=None):
r"""Write out the data contained in *rundata*."""

if not path:
path = self.temp_path
self.rundata.write(out_dir=path)


def build_executable(self, make_level: str='default',
FFLAGS: Optional[str]=None,
LFLAGS: Optional[str]=None,
verbose: bool=False):
        r"""Build the executable for the test via make.

        :Input:
         - *make_level* (str) - One of *'new'* (run ``make new``),
           *'default'* (remove ``*.o`` and ``*.mod`` files in the test path
           and then run ``make .exe``), or *'exe'* (run ``make .exe`` as-is).
           Default is *'default'*.
         - *FFLAGS*, *LFLAGS* (str) - Compiler and linker flags, defaulting
           to the environment variables of the same names and then to
           ``-O2 -fopenmp``.
        """

        # Assumes a GCC-style command-line interface for the compiler flags
if not FFLAGS:
FFLAGS = os.environ.get('FFLAGS', "-O2 -fopenmp")
if not LFLAGS:
LFLAGS = os.environ.get('LFLAGS', FFLAGS)

if make_level.lower() == "new":
cmd = "".join((f"cd {self.test_path} ; make new ",
f"FFLAGS='{FFLAGS}' LFLAGS='{LFLAGS}'"))
elif make_level.lower() == "default":
# clean up *.o and *.mod files in test path only
for path in self.test_path.glob("*.o"):
path.unlink()
for path in self.test_path.glob("*.mod"):
path.unlink()
cmd = "".join((f"cd {self.test_path} ; make .exe ",
f"FFLAGS='{FFLAGS}' LFLAGS='{LFLAGS}'"))
elif make_level.lower() == "exe":
cmd = "".join((f"cd {self.test_path} ; make .exe ",
f"FFLAGS='{FFLAGS}' LFLAGS='{LFLAGS}'"))
else:
            raise ValueError(f"Invalid make_level={make_level} given.")

try:
if verbose:
print(f"Build command: {cmd}")
subprocess.run(cmd, shell=True, check=True)
        except subprocess.CalledProcessError:
            self.clean()
            raise

shutil.move(self.test_path / self.executable_name, self.temp_path)


def run_code(self):
        r"""Run the built executable via *runclaw*, writing output to *temp_path*."""
runclaw.runclaw(xclawcmd=self.temp_path / self.executable_name,
rundir=self.temp_path,
outdir=self.temp_path,
overwrite=True,
restart=False)

def clean(self):
        r"""Clean up after a test run (currently a no-op)."""
pass


def check_frame(self, frame: int, indices: Iterable=(0,),
regression_path: Optional[Path]=None,
save: bool=False, **kwargs):
        r"""Check sums of the fields of frame *frame* against regression data.

        :Input:
         - *frame* (int) - Frame number of the test output to load.
         - *indices* (Iterable) - Indices of the fields of *q* to sum and
           compare. Defaults to *(0,)*.
         - *regression_path* (Path) - Directory containing the regression
           data. Defaults to *test_path / "regression_data"*.
         - *save* (bool) - If *True*, save this test's sums as the new
           regression data. Default is *False*.
         - Remaining keyword arguments are forwarded to
           *np.testing.assert_allclose* (defaults *rtol=1e-14*, *atol=1e-8*).
        """

if not regression_path:
regression_path = self.test_path / "regression_data"

# Load test output data
sol = solution.Solution(frame, path=self.temp_path)
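        # The per-field sums act as a compact fingerprint of the solution
        # for the regression comparison.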
sol_sums = [sol.q[i, ...].sum() for i in indices]

# Load regression data
regression_data = regression_path / f"frame{str(frame).zfill(4)}.txt"
if save:
np.savetxt(regression_data, sol_sums)
claw_git_status.make_git_status_file(outdir=regression_path)
regression_sum = np.loadtxt(regression_data)

# Compare data
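        # Tolerance defaults below can be overridden by passing *rtol* and
        # *atol* as keyword arguments.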
kwargs.setdefault('rtol', 1e-14)
kwargs.setdefault('atol', 1e-8)
np.testing.assert_allclose(sol_sums, regression_sum, **kwargs)


def check_gauge(self, gauge_id: int,
indices: Iterable=(0,),
regression_path: Optional[Path]=None,
save: bool=False, **kwargs):
        r"""Basic test to assert gauge equality

        :Input:
         - *gauge_id* (int) - ID of the gauge to compare.
         - *indices* (Iterable) - Indices to compare in the gauge
           comparison. Defaults to *(0,)*.
         - *regression_path* (Path) - Directory containing the regression
           data. Defaults to *test_path / "regression_data"*.
         - *save* (bool) - If *True*, copy this test's gauge output into the
           regression data directory. Default is *False*.
         - *rtol* (float) - Relative tolerance used in the comparison, passed
           as a keyword argument; default is *1e-14*. Note that the old
           *tolerance* input is synonymous with this parameter.
         - *atol* (float) - Absolute tolerance used in the comparison, passed
           as a keyword argument; default is *1e-08*.
        """

        if not isinstance(indices, (tuple, list)):
            indices = tuple(indices)

if not regression_path:
regression_path = self.test_path / "regression_data"

# Load test output data
gauge = gauges.GaugeSolution(gauge_id, path=self.temp_path)
if gauge.q.shape[1] == 0:
raise AssertionError(f"Empty gauge {gauge_id}.")

# Load regression data
if save:
shutil.copy(self.temp_path / f"gauge{str(gauge_id).zfill(5)}.txt",
regression_path)
claw_git_status.make_git_status_file(outdir=regression_path)
regression_gauge = gauges.GaugeSolution(gauge_id, path=regression_path)

if gauge.q.shape[1] != regression_gauge.q.shape[1]:
raise AssertionError( "Gauges have different sizes, regression" +
f" gauge = {regression_gauge.q.shape}, " +
f"test gauge = {gauge.q.shape}")

# Compare data
kwargs.setdefault('rtol', 1e-14)
kwargs.setdefault('atol', 1e-8)
try:
for n in indices:
np.testing.assert_allclose(gauge.q[n, :],
regression_gauge.q[n, :],
**kwargs)
        except AssertionError as e:
            err_msg = "\n".join((e.args[0],
                                 f"Gauge Match Failed for gauge = {gauge_id}",
                                 " failures in fields:"))
failure_indices = []
for n in indices:
if not np.allclose(gauge.q[n, :], regression_gauge.q[n, :],
**kwargs):
failure_indices.append(str(n))
index_str = ", ".join(failure_indices)
raise AssertionError(" ".join((err_msg, index_str)))
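

# A minimal sketch of how a pytest module might drive the runner above. This
# is illustrative only: the layout (a setrun.py and a regression_data
# directory next to the test file) and the use of pytest's built-in tmp_path
# fixture are assumptions, not requirements imposed by this module.
#
#     from pathlib import Path
#     from clawpack.clawutil.test import ClawpackTestRunner
#
#     def test_example(tmp_path):
#         runner = ClawpackTestRunner(tmp_path,
#                                     test_path=Path(__file__).parent)
#         runner.set_data()
#         runner.write_data()
#         runner.build_executable()
#         runner.run_code()
#         runner.check_frame(frame=1, indices=(0,))
#         runner.check_gauge(gauge_id=1, indices=(0,))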

# Support for the WIP decorator has been removed. It did not appear to be used
# in any examples, so it was dropped to simplify the conversion from nose to
# pytest. Note that with pytest one can use one of these decorators instead:
# import pytest
# @pytest.mark.xfail(reason='WIP') # for 'expected to fail'
# @pytest.mark.skip(reason='WIP') # to skip entirely
#


# Old unittest based framework - works with PyTest, but is being replaced by
# the runner above
class ClawpackRegressionTest(unittest.TestCase):

    r"""Base Clawpack regression test setup
@@ -283,13 +477,13 @@ def check_frame(self, save=False, indices=[0], frame_num=1,
regression_data_file = os.path.join(self.test_path, "regression_data",
file_name)
if save:
np.savetxt(regression_data_file, data_sum)
claw_git_status.make_git_status_file(
outdir=os.path.join(self.test_path, "regression_data"))

regression_sum = np.loadtxt(regression_data_file)

assert np.allclose(data_sum, regression_sum, rtol=rtol, atol=atol), \
"\n new_data: %s, \n expected: %s" % (data_sum, regression_sum)


@@ -332,7 +526,7 @@ def check_gauges(self, save=False, gauge_id=1, indices=[0],
# Compare data
try:
for n in indices:
np.testing.assert_allclose(gauge.q[n, :],
regression_gauge.q[n, :],
rtol=rtol, atol=atol,
verbose=False)
@@ -342,7 +536,7 @@
err_msg = "\n".join((err_msg, " failures in fields:"))
failure_indices = []
for n in indices:
if not np.allclose(gauge.q[n, :], regression_gauge.q[n, :],
rtol=rtol, atol=atol):
failure_indices.append(str(n))
index_str = ", ".join(failure_indices)