# Copyright 2013-2024 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import base64
import contextlib
import enum
import hashlib
import inspect
import io
import os
import re
import shutil
import sys
from collections import Counter, OrderedDict
from typing import Callable, List, Optional, Tuple, Type, TypeVar, Union
import llnl.util.filesystem as fs
import llnl.util.tty as tty
from llnl.string import plural
from llnl.util.lang import nullcontext
from llnl.util.tty.color import colorize
import spack.error
import spack.paths
import spack.util.spack_json as sjson
from spack.installer import InstallError
from spack.spec import Spec
from spack.util.prefix import Prefix
#: Stand-alone test failure info type
TestFailureType = Tuple[BaseException, str]
#: Name of the test suite's (JSON) lock file
test_suite_filename = "test_suite.lock"
#: Name of the test suite results (summary) file
results_filename = "results.txt"
#: Name of the Spack install phase-time test log file
spack_install_test_log = "install-time-test-log.txt"
ListOrStringType = Union[str, List[str]]
LogType = Union["tty.log.nixlog", "tty.log.winlog"]
Pb = TypeVar("Pb", bound="spack.package_base.PackageBase")
PackageObjectOrClass = Union[Pb, Type[Pb]]
[docs]
class TestStatus(enum.Enum):
"""Names of different stand-alone test states."""
NO_TESTS = -1
SKIPPED = 0
FAILED = 1
PASSED = 2
def __str__(self):
return f"{self.name}"
[docs]
def lower(self):
name = f"{self.name}"
return name.lower()
[docs]
def get_escaped_text_output(filename: str) -> List[str]:
"""Retrieve and escape the expected text output from the file
Args:
filename: path to the file
Returns:
escaped text lines read from the file
"""
with open(filename) as f:
# Ensure special characters are escaped as needed
expected = f.read()
# Split the lines to make it easier to debug failures when there is
# a lot of output
return [re.escape(ln) for ln in expected.split("\n")]
[docs]
def get_test_stage_dir():
"""Retrieves the ``config:test_stage`` path to the configured test stage
root directory
Returns:
str: absolute path to the configured test stage root or, if none,
the default test stage path
"""
return spack.util.path.canonicalize_path(
spack.config.get("config:test_stage", spack.paths.default_test_path)
)
[docs]
def check_outputs(expected: Union[list, set, str], actual: str):
"""Ensure the expected outputs are contained in the actual outputs.
Args:
expected: expected raw output string(s)
actual: actual output string
Raises:
RuntimeError: the expected output is not found in the actual output
"""
expected = expected if isinstance(expected, (list, set)) else [expected]
errors = []
for check in expected:
if not re.search(check, actual):
errors.append(f"Expected '{check}' in output '{actual}'")
if errors:
raise RuntimeError("\n ".join(errors))
[docs]
def find_required_file(
root: str, filename: str, expected: int = 1, recursive: bool = True
) -> ListOrStringType:
"""Find the required file(s) under the root directory.
Args:
root: root directory for the search
filename: name of the file being located
expected: expected number of files to be found under the directory
(default is 1)
recursive: ``True`` if subdirectories are to be recursively searched,
else ``False`` (default is ``True``)
Returns: the path(s), relative to root, to the required file(s)
Raises:
Exception: SkipTest when number of files detected does not match expected
"""
paths = fs.find(root, filename, recursive=recursive)
num_paths = len(paths)
if num_paths != expected:
files = ": {}".format(", ".join(paths)) if num_paths else ""
raise SkipTest(
"Expected {} of {} under {} but {} found{}".format(
plural(expected, "copy", "copies"),
filename,
root,
plural(num_paths, "copy", "copies"),
files,
)
)
return paths[0] if expected == 1 else paths
[docs]
def install_test_root(pkg: Pb):
"""The install test root directory.
Args:
pkg: package being tested
"""
return os.path.join(pkg.metadata_dir, "test")
[docs]
def print_message(logger: LogType, msg: str, verbose: bool = False):
"""Print the message to the log, optionally echoing.
Args:
logger: instance of the output logger (e.g. nixlog or winlog)
msg: message being output
verbose: ``True`` displays verbose output, ``False`` suppresses
it (``False`` is default)
"""
if verbose:
with logger.force_echo():
tty.info(msg, format="g")
else:
tty.info(msg, format="g")
[docs]
def overall_status(current_status: "TestStatus", substatuses: List["TestStatus"]) -> "TestStatus":
"""Determine the overall status based on the current and associated sub status values.
Args:
current_status: current overall status, assumed to default to PASSED
substatuses: status of each test part or overall status of each test spec
Returns:
test status encompassing the main test and all subtests
"""
if current_status in [TestStatus.SKIPPED, TestStatus.NO_TESTS, TestStatus.FAILED]:
return current_status
skipped = 0
for status in substatuses:
if status == TestStatus.FAILED:
return status
elif status == TestStatus.SKIPPED:
skipped += 1
if skipped and skipped == len(substatuses):
return TestStatus.SKIPPED
return current_status
[docs]
class PackageTest:
"""The class that manages stand-alone (post-install) package tests."""
def __init__(self, pkg: Pb):
"""
Args:
pkg: package being tested
Raises:
ValueError: if the package is not concrete
"""
if not pkg.spec.concrete:
raise ValueError("Stand-alone tests require a concrete package")
self.counts: "Counter" = Counter() # type: ignore[attr-defined]
self.pkg = pkg
self.test_failures: List[TestFailureType] = []
self.test_parts: OrderedDict[str, "TestStatus"] = OrderedDict()
self.test_log_file: str
self.pkg_id: str
if pkg.test_suite:
# Running stand-alone tests
self.test_log_file = pkg.test_suite.log_file_for_spec(pkg.spec)
self.tested_file = pkg.test_suite.tested_file_for_spec(pkg.spec)
self.pkg_id = pkg.test_suite.test_pkg_id(pkg.spec)
else:
# Running phase-time tests for a single package whose results are
# retained in the package's stage directory.
pkg.test_suite = TestSuite([pkg.spec])
self.test_log_file = fs.join_path(pkg.stage.path, spack_install_test_log)
self.pkg_id = pkg.spec.format("{name}-{version}-{hash:7}")
# Internal logger for test part processing
self._logger = None
@property
def logger(self) -> Optional[LogType]:
"""The current logger or, if none, sets to one."""
if not self._logger:
self._logger = tty.log.log_output(self.test_log_file)
return self._logger
[docs]
@contextlib.contextmanager
def test_logger(self, verbose: bool = False, externals: bool = False):
"""Context manager for setting up the test logger
Args:
verbose: Display verbose output, including echoing to stdout,
otherwise suppress it
externals: ``True`` for performing tests if external package,
``False`` to skip them
"""
fs.touch(self.test_log_file) # Otherwise log_parse complains
fs.set_install_permissions(self.test_log_file)
with tty.log.log_output(self.test_log_file, verbose) as self._logger:
with self.logger.force_echo(): # type: ignore[union-attr]
tty.msg("Testing package " + colorize(r"@*g{" + self.pkg_id + r"}"))
# use debug print levels for log file to record commands
old_debug = tty.is_debug()
tty.set_debug(True)
try:
yield self.logger
finally:
# reset debug level
tty.set_debug(old_debug)
@property
def archived_install_test_log(self) -> str:
return fs.join_path(self.pkg.metadata_dir, spack_install_test_log)
[docs]
def archive_install_test_log(self, dest_dir: str):
if os.path.exists(self.test_log_file):
fs.install(self.test_log_file, self.archived_install_test_log)
[docs]
def add_failure(self, exception: Exception, msg: str):
"""Add the failure details to the current list."""
self.test_failures.append((exception, msg))
[docs]
def status(self, name: str, status: "TestStatus", msg: Optional[str] = None):
"""Track and print the test status for the test part name."""
part_name = f"{self.pkg.__class__.__name__}::{name}"
extra = "" if msg is None else f": {msg}"
# Handle the special case of a test part consisting of subparts.
# The containing test part can be PASSED while sub-parts (assumed
# to start with the same name) may not have PASSED. This extra
# check is used to ensure the containing test part is not claiming
# to have passed when at least one subpart failed.
substatuses = []
for pname, substatus in self.test_parts.items():
if pname != part_name and pname.startswith(part_name):
substatuses.append(substatus)
if substatuses:
status = overall_status(status, substatuses)
print(f"{status}: {part_name}{extra}")
self.test_parts[part_name] = status
self.counts[status] += 1
[docs]
def phase_tests(
self, builder: spack.builder.Builder, phase_name: str, method_names: List[str]
):
"""Execute the builder's package phase-time tests.
Args:
builder: builder for package being tested
phase_name: the name of the build-time phase (e.g., ``build``, ``install``)
method_names: phase-specific callback method names
"""
verbose = tty.is_verbose()
fail_fast = spack.config.get("config:fail_fast", False)
with self.test_logger(verbose=verbose, externals=False) as logger:
# Report running each of the methods in the build log
print_message(logger, f"Running {phase_name}-time tests", verbose)
builder.pkg.test_suite.current_test_spec = builder.pkg.spec
builder.pkg.test_suite.current_base_spec = builder.pkg.spec
# TODO (post-34236): "test"->"test_" once remove deprecated methods
have_tests = any(name.startswith("test") for name in method_names)
if have_tests:
copy_test_files(builder.pkg, builder.pkg.spec)
for name in method_names:
try:
# Prefer the method in the package over the builder's.
# We need this primarily to pick up arbitrarily named test
# methods but also some build-time checks.
fn = getattr(builder.pkg, name, getattr(builder, name))
msg = f"RUN-TESTS: {phase_name}-time tests [{name}]"
print_message(logger, msg, verbose)
fn()
except AttributeError as e:
msg = f"RUN-TESTS: method not implemented [{name}]"
print_message(logger, msg, verbose)
self.add_failure(e, msg)
if fail_fast:
break
if have_tests:
print_message(logger, "Completed testing", verbose)
# Raise any collected failures here
if self.test_failures:
raise TestFailure(self.test_failures)
[docs]
def stand_alone_tests(self, kwargs):
"""Run the package's stand-alone tests.
Args:
kwargs (dict): arguments to be used by the test process
"""
import spack.build_environment
spack.build_environment.start_build_process(self.pkg, test_process, kwargs)
[docs]
def parts(self) -> int:
"""The total number of (checked) test parts."""
try:
# New in Python 3.10
total = self.counts.total() # type: ignore[attr-defined]
except AttributeError:
nums = [n for _, n in self.counts.items()]
total = sum(nums)
return total
[docs]
def print_log_path(self):
"""Print the test log file path."""
log = self.archived_install_test_log
if not os.path.isfile(log):
log = self.test_log_file
if not (log and os.path.isfile(log)):
tty.debug("There is no test log file (staged or installed)")
return
print(f"\nSee test results at:\n {log}")
[docs]
def ran_tests(self) -> bool:
"""``True`` if ran tests, ``False`` otherwise."""
return self.parts() > self.counts[TestStatus.NO_TESTS]
[docs]
def summarize(self):
"""Collect test results summary lines for this spec."""
lines = []
lines.append("{:=^80}".format(f" SUMMARY: {self.pkg_id} "))
for name, status in self.test_parts.items():
msg = f"{name} .. {status}"
lines.append(msg)
summary = [f"{n} {s.lower()}" for s, n in self.counts.items() if n > 0]
totals = " {} of {} parts ".format(", ".join(summary), self.parts())
lines.append(f"{totals:=^80}")
return lines
[docs]
def write_tested_status(self):
"""Write the overall status to the tested file.
If there any test part failures, then the tests failed. If all test
parts are skipped, then the tests were skipped. If any tests passed
then the tests passed; otherwise, there were not tests executed.
"""
status = TestStatus.NO_TESTS
if self.counts[TestStatus.FAILED] > 0:
status = TestStatus.FAILED
else:
skipped = self.counts[TestStatus.SKIPPED]
if skipped and self.parts() == skipped:
status = TestStatus.SKIPPED
elif self.counts[TestStatus.PASSED] > 0:
status = TestStatus.PASSED
with open(self.tested_file, "w") as f:
f.write(f"{status.value}\n")
[docs]
@contextlib.contextmanager
def test_part(pkg: Pb, test_name: str, purpose: str, work_dir: str = ".", verbose: bool = False):
wdir = "." if work_dir is None else work_dir
tester = pkg.tester
# TODO (post-34236): "test"->"test_" once remove deprecated methods
assert test_name and test_name.startswith(
"test"
), f"Test name must start with 'test' but {test_name} was provided"
if test_name == "test":
tty.warn(
"{}: the 'test' method is deprecated. Convert stand-alone "
"test(s) to methods with names starting 'test_'.".format(pkg.name)
)
title = "test: {}: {}".format(test_name, purpose or "unspecified purpose")
with fs.working_dir(wdir, create=True):
try:
context = tester.logger.force_echo if verbose else nullcontext
with context():
tty.info(title, format="g")
yield
tester.status(test_name, TestStatus.PASSED)
except SkipTest as e:
tester.status(test_name, TestStatus.SKIPPED, str(e))
except (AssertionError, BaseException) as e:
# print a summary of the error to the log file
# so that cdash and junit reporters know about it
exc_type, _, tb = sys.exc_info()
tester.status(test_name, TestStatus.FAILED, str(e))
import traceback
# remove the current call frame to exclude the extract_stack
# call from the error
stack = traceback.extract_stack()[:-1]
# Package files have a line added at import time, so we re-read
# the file to make line numbers match. We have to subtract two
# from the line number because the original line number is
# inflated once by the import statement and the lines are
# displaced one by the import statement.
for i, entry in enumerate(stack):
filename, lineno, function, text = entry
if spack.repo.is_package_file(filename):
with open(filename) as f:
lines = f.readlines()
new_lineno = lineno - 2
text = lines[new_lineno]
if isinstance(entry, tuple):
new_entry = (filename, new_lineno, function, text)
stack[i] = new_entry # type: ignore[call-overload]
elif isinstance(entry, list):
stack[i][1] = new_lineno # type: ignore[index]
# Format and print the stack
out = traceback.format_list(stack)
for line in out:
print(line.rstrip("\n"))
if exc_type is spack.util.executable.ProcessError or exc_type is TypeError:
iostr = io.StringIO()
spack.build_environment.write_log_summary(
iostr, "test", tester.test_log_file, last=1
) # type: ignore[assignment]
m = iostr.getvalue()
else:
# We're below the package context, so get context from
# stack instead of from traceback.
# The traceback is truncated here, so we can't use it to
# traverse the stack.
m = "\n".join(spack.build_environment.get_package_context(tb))
exc = e # e is deleted after this block
# If we fail fast, raise another error
if spack.config.get("config:fail_fast", False):
raise TestFailure([(exc, m)])
else:
tester.add_failure(exc, m)
[docs]
def copy_test_files(pkg: Pb, test_spec: spack.spec.Spec):
"""Copy the spec's cached and custom test files to the test stage directory.
Args:
pkg: package being tested
test_spec: spec being tested, where the spec may be virtual
Raises:
TestSuiteError: package must be part of an active test suite
"""
if pkg is None or pkg.test_suite is None:
base = "Cannot copy test files"
msg = (
f"{base} without a package"
if pkg is None
else f"{pkg.name}: {base}: test suite is missing"
)
raise TestSuiteError(msg)
# copy installed test sources cache into test stage dir
if test_spec.concrete:
cache_source = install_test_root(test_spec.package)
cache_dir = pkg.test_suite.current_test_cache_dir
if os.path.isdir(cache_source) and not os.path.exists(cache_dir):
fs.install_tree(cache_source, cache_dir)
# copy test data into test stage data dir
try:
pkg_cls = test_spec.package_class
except spack.repo.UnknownPackageError:
tty.debug(f"{test_spec.name}: skipping test data copy since no package class found")
return
data_source = Prefix(pkg_cls.package_dir).test
data_dir = pkg.test_suite.current_test_data_dir
if os.path.isdir(data_source) and not os.path.exists(data_dir):
# We assume data dir is used read-only
# maybe enforce this later
shutil.copytree(data_source, data_dir)
[docs]
def test_function_names(pkg: PackageObjectOrClass, add_virtuals: bool = False) -> List[str]:
"""Grab the names of all non-empty test functions.
Args:
pkg: package or package class of interest
add_virtuals: ``True`` adds test methods of provided package
virtual, ``False`` only returns test functions of the package
Returns:
names of non-empty test functions
Raises:
ValueError: occurs if pkg is not a package class
"""
fns = test_functions(pkg, add_virtuals)
return [f"{cls_name}.{fn.__name__}" for (cls_name, fn) in fns]
[docs]
def test_functions(
pkg: PackageObjectOrClass, add_virtuals: bool = False
) -> List[Tuple[str, Callable]]:
"""Grab all non-empty test functions.
Args:
pkg: package or package class of interest
add_virtuals: ``True`` adds test methods of provided package
virtual, ``False`` only returns test functions of the package
Returns:
list of non-empty test functions' (name, function)
Raises:
ValueError: occurs if pkg is not a package class
"""
instance = isinstance(pkg, spack.package_base.PackageBase)
if not (instance or issubclass(pkg, spack.package_base.PackageBase)): # type: ignore[arg-type]
raise ValueError(f"Expected a package (class), not {pkg} ({type(pkg)})")
pkg_cls = pkg.__class__ if instance else pkg
classes = [pkg_cls]
if add_virtuals:
vpkgs = virtuals(pkg)
for vname in vpkgs:
try:
classes.append((Spec(vname)).package_class)
except spack.repo.UnknownPackageError:
tty.debug(f"{vname}: virtual does not appear to have a package file")
# TODO (post-34236): Remove if removing empty test method check
def skip(line):
# This should match the lines in the deprecated test() method
ln = line.strip()
return ln.startswith("#") or ("warn" in ln and "deprecated" in ln)
doc_regex = r'\s+("""[\w\s\(\)\-\,\;\:]+""")'
tests = []
for clss in classes:
methods = inspect.getmembers(clss, predicate=lambda x: inspect.isfunction(x))
for name, test_fn in methods:
# TODO (post-34236): "test"->"test_" once remove deprecated methods
if not name.startswith("test"):
continue
# TODO (post-34236): Could remove empty method check once remove
# TODO (post-34236): deprecated methods though some use cases,
# TODO (post-34236): such as checking packages have actual, non-
# TODO (post-34236): empty tests, may want this check to remain.
source = re.sub(doc_regex, r"", inspect.getsource(test_fn)).splitlines()[1:]
lines = [ln.strip() for ln in source if not skip(ln)]
if not lines:
continue
tests.append((clss.__name__, test_fn)) # type: ignore[union-attr]
return tests
[docs]
def process_test_parts(pkg: Pb, test_specs: List[spack.spec.Spec], verbose: bool = False):
"""Process test parts associated with the package.
Args:
pkg: package being tested
test_specs: list of test specs
verbose: Display verbose output (suppress by default)
Raises:
TestSuiteError: package must be part of an active test suite
"""
if pkg is None or pkg.test_suite is None:
base = "Cannot process tests"
msg = (
f"{base} without a package"
if pkg is None
else f"{pkg.name}: {base}: test suite is missing"
)
raise TestSuiteError(msg)
test_suite = pkg.test_suite
tester = pkg.tester
try:
work_dir = test_suite.test_dir_for_spec(pkg.spec)
for spec in test_specs:
test_suite.current_test_spec = spec
# grab test functions associated with the spec, which may be virtual
try:
tests = test_functions(spec.package_class)
except spack.repo.UnknownPackageError:
# Some virtuals don't have a package so we don't want to report
# them as not having tests when that isn't appropriate.
continue
if len(tests) == 0:
tester.status(spec.name, TestStatus.NO_TESTS)
continue
# copy custom and cached test files to the test stage directory
copy_test_files(pkg, spec)
# Run the tests
for _, test_fn in tests:
with test_part(
pkg,
test_fn.__name__,
purpose=getattr(test_fn, "__doc__"),
work_dir=work_dir,
verbose=verbose,
):
test_fn(pkg)
# If fail-fast was on, we error out above
# If we collect errors, raise them in batch here
if tester.test_failures:
raise TestFailure(tester.test_failures)
finally:
if tester.ran_tests():
tester.write_tested_status()
# log one more test message to provide a completion timestamp
# for CDash reporting
tty.msg("Completed testing")
lines = tester.summarize()
tty.msg("\n{}".format("\n".join(lines)))
if tester.test_failures:
# Print the test log file path
tty.msg(f"\n\nSee test results at:\n {tester.test_log_file}")
else:
tty.msg("No tests to run")
[docs]
def test_process(pkg: Pb, kwargs):
verbose = kwargs.get("verbose", True)
externals = kwargs.get("externals", False)
with pkg.tester.test_logger(verbose, externals) as logger:
if pkg.spec.external and not externals:
print_message(logger, "Skipped tests for external package", verbose)
pkg.tester.status(pkg.spec.name, TestStatus.SKIPPED)
return
if not pkg.spec.installed:
print_message(logger, "Skipped not installed package", verbose)
pkg.tester.status(pkg.spec.name, TestStatus.SKIPPED)
return
# run test methods from the package and all virtuals it provides
v_names = virtuals(pkg)
test_specs = [pkg.spec] + [spack.spec.Spec(v_name) for v_name in sorted(v_names)]
process_test_parts(pkg, test_specs, verbose)
[docs]
def virtuals(pkg):
"""Return a list of unique virtuals for the package.
Args:
pkg: package of interest
Returns: names of unique virtual packages
"""
# provided virtuals have to be deduped by name
v_names = list({vspec.name for vspec in pkg.virtuals_provided})
# hack for compilers that are not dependencies (yet)
# TODO: this all eventually goes away
c_names = ("gcc", "intel", "intel-parallel-studio", "pgi")
if pkg.name in c_names:
v_names.extend(["c", "cxx", "fortran"])
if pkg.spec.satisfies("llvm+clang"):
v_names.extend(["c", "cxx"])
return v_names
[docs]
def get_all_test_suites():
"""Retrieves all validly staged TestSuites
Returns:
list: a list of TestSuite objects, which may be empty if there are none
"""
stage_root = get_test_stage_dir()
if not os.path.isdir(stage_root):
return []
def valid_stage(d):
dirpath = os.path.join(stage_root, d)
return os.path.isdir(dirpath) and test_suite_filename in os.listdir(dirpath)
candidates = [
os.path.join(stage_root, d, test_suite_filename)
for d in os.listdir(stage_root)
if valid_stage(d)
]
test_suites = [TestSuite.from_file(c) for c in candidates]
return test_suites
[docs]
def get_named_test_suites(name):
"""Retrieves test suites with the provided name.
Returns:
list: a list of matching TestSuite instances, which may be empty if none
Raises:
Exception: TestSuiteNameError if no name is provided
"""
if not name:
raise TestSuiteNameError("Test suite name is required.")
test_suites = get_all_test_suites()
return [ts for ts in test_suites if ts.name == name]
[docs]
def get_test_suite(name: str) -> Optional["TestSuite"]:
"""Ensure there is only one matching test suite with the provided name.
Returns:
the name if one matching test suite, else None
Raises:
TestSuiteNameError: If there are more than one matching TestSuites
"""
suites = get_named_test_suites(name)
if len(suites) > 1:
raise TestSuiteNameError(f"Too many suites named '{name}'. May shadow hash.")
if not suites:
return None
return suites[0]
[docs]
def write_test_suite_file(suite):
"""Write the test suite to its (JSON) lock file."""
with open(suite.stage.join(test_suite_filename), "w") as f:
sjson.dump(suite.to_dict(), stream=f)
[docs]
def write_test_summary(counts: "Counter"):
"""Write summary of the totals for each relevant status category.
Args:
counts: counts of the occurrences of relevant test status types
"""
summary = [f"{n} {s.lower()}" for s, n in counts.items() if n > 0]
try:
# New in Python 3.10
total = counts.total() # type: ignore[attr-defined]
except AttributeError:
nums = [n for _, n in counts.items()]
total = sum(nums)
if total:
print("{:=^80}".format(" {} of {} ".format(", ".join(summary), plural(total, "spec"))))
[docs]
class TestSuite:
"""The class that manages specs for ``spack test run`` execution."""
def __init__(self, specs, alias=None):
# copy so that different test suites have different package objects
# even if they contain the same spec
self.specs = [spec.copy() for spec in specs]
self.current_test_spec = None # spec currently tested, can be virtual
self.current_base_spec = None # spec currently running do_test
self.alias = alias
self._hash = None
self._stage = None
self.counts: "Counter" = Counter()
@property
def name(self):
"""The name (alias or, if none, hash) of the test suite."""
return self.alias if self.alias else self.content_hash
@property
def content_hash(self):
"""The hash used to uniquely identify the test suite."""
if not self._hash:
json_text = sjson.dump(self.to_dict())
sha = hashlib.sha1(json_text.encode("utf-8"))
b32_hash = base64.b32encode(sha.digest()).lower()
b32_hash = b32_hash.decode("utf-8")
self._hash = b32_hash
return self._hash
def __call__(self, *args, **kwargs):
self.write_reproducibility_data()
remove_directory = kwargs.get("remove_directory", True)
dirty = kwargs.get("dirty", False)
fail_first = kwargs.get("fail_first", False)
externals = kwargs.get("externals", False)
for spec in self.specs:
try:
if spec.package.test_suite:
raise TestSuiteSpecError(
"Package {} cannot be run in two test suites at once".format(
spec.package.name
)
)
# Set up the test suite to know which test is running
spec.package.test_suite = self
self.current_base_spec = spec
self.current_test_spec = spec
# setup per-test directory in the stage dir
test_dir = self.test_dir_for_spec(spec)
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
fs.mkdirp(test_dir)
# run the package tests
spec.package.do_test(dirty=dirty, externals=externals)
# Clean up on success
if remove_directory:
shutil.rmtree(test_dir)
status = self.test_status(spec, externals)
self.counts[status] += 1
self.write_test_result(spec, status)
except SkipTest:
status = TestStatus.SKIPPED
self.counts[status] += 1
self.write_test_result(spec, TestStatus.SKIPPED)
except BaseException as exc:
status = TestStatus.FAILED
self.counts[status] += 1
tty.debug(f"Test failure: {str(exc)}")
if isinstance(exc, (SyntaxError, TestSuiteSpecError)):
# Create the test log file and report the error.
self.ensure_stage()
msg = f"Testing package {self.test_pkg_id(spec)}\n{str(exc)}"
_add_msg_to_file(self.log_file_for_spec(spec), msg)
msg = f"Test failure: {str(exc)}"
_add_msg_to_file(self.log_file_for_spec(spec), msg)
self.write_test_result(spec, TestStatus.FAILED)
if fail_first:
break
finally:
spec.package.test_suite = None
self.current_test_spec = None
self.current_base_spec = None
write_test_summary(self.counts)
if self.counts[TestStatus.FAILED]:
for spec in self.specs:
print(
"\nSee {} test results at:\n {}".format(
spec.format("{name}-{version}-{hash:7}"), self.log_file_for_spec(spec)
)
)
failures = self.counts[TestStatus.FAILED]
if failures:
raise TestSuiteFailure(failures)
[docs]
def test_status(self, spec: spack.spec.Spec, externals: bool) -> Optional[TestStatus]:
"""Determine the overall test results status for the spec.
Args:
spec: instance of the spec under test
externals: ``True`` if externals are to be tested, else ``False``
Returns:
the spec's test status if available or ``None``
"""
tests_status_file = self.tested_file_for_spec(spec)
if not os.path.exists(tests_status_file):
self.ensure_stage()
if spec.external and not externals:
status = TestStatus.SKIPPED
elif not spec.installed:
status = TestStatus.SKIPPED
else:
status = TestStatus.NO_TESTS
return status
with open(tests_status_file, "r") as f:
value = (f.read()).strip("\n")
return TestStatus(int(value)) if value else TestStatus.NO_TESTS
[docs]
def ensure_stage(self):
"""Ensure the test suite stage directory exists."""
if not os.path.exists(self.stage):
fs.mkdirp(self.stage)
@property
def stage(self):
"""The root test suite stage directory.
Returns:
str: the spec's test stage directory path
"""
if not self._stage:
self._stage = Prefix(fs.join_path(get_test_stage_dir(), self.content_hash))
return self._stage
@stage.setter
def stage(self, value):
"""Set the value of a non-default stage directory."""
self._stage = value if isinstance(value, Prefix) else Prefix(value)
@property
def results_file(self):
"""The path to the results summary file."""
return self.stage.join(results_filename)
[docs]
@classmethod
def test_pkg_id(cls, spec):
"""The standard install test package identifier.
Args:
spec: instance of the spec under test
Returns:
str: the install test package identifier
"""
return spec.format_path("{name}-{version}-{hash:7}")
[docs]
@classmethod
def test_log_name(cls, spec):
"""The standard log filename for a spec.
Args:
spec (spack.spec.Spec): instance of the spec under test
Returns:
str: the spec's log filename
"""
return "%s-test-out.txt" % cls.test_pkg_id(spec)
[docs]
def log_file_for_spec(self, spec):
"""The test log file path for the provided spec.
Args:
spec (spack.spec.Spec): instance of the spec under test
Returns:
str: the path to the spec's log file
"""
return self.stage.join(self.test_log_name(spec))
[docs]
def test_dir_for_spec(self, spec):
"""The path to the test stage directory for the provided spec.
Args:
spec (spack.spec.Spec): instance of the spec under test
Returns:
str: the spec's test stage directory path
"""
return Prefix(self.stage.join(self.test_pkg_id(spec)))
[docs]
@classmethod
def tested_file_name(cls, spec):
"""The standard test status filename for the spec.
Args:
spec (spack.spec.Spec): instance of the spec under test
Returns:
str: the spec's test status filename
"""
return "%s-tested.txt" % cls.test_pkg_id(spec)
[docs]
def tested_file_for_spec(self, spec):
"""The test status file path for the spec.
Args:
spec (spack.spec.Spec): instance of the spec under test
Returns:
str: the spec's test status file path
"""
return fs.join_path(self.stage, self.tested_file_name(spec))
@property
def current_test_cache_dir(self):
"""Path to the test stage directory where the current spec's cached
build-time files were automatically copied.
Returns:
str: path to the current spec's staged, cached build-time files.
Raises:
TestSuiteSpecError: If there is no spec being tested
"""
if not (self.current_test_spec and self.current_base_spec):
raise TestSuiteSpecError("Unknown test cache directory: no specs being tested")
test_spec = self.current_test_spec
base_spec = self.current_base_spec
return self.test_dir_for_spec(base_spec).cache.join(test_spec.name)
@property
def current_test_data_dir(self):
"""Path to the test stage directory where the current spec's custom
package (data) files were automatically copied.
Returns:
str: path to the current spec's staged, custom package (data) files
Raises:
TestSuiteSpecError: If there is no spec being tested
"""
if not (self.current_test_spec and self.current_base_spec):
raise TestSuiteSpecError("Unknown test data directory: no specs being tested")
test_spec = self.current_test_spec
base_spec = self.current_base_spec
return self.test_dir_for_spec(base_spec).data.join(test_spec.name)
[docs]
def write_test_result(self, spec, result):
"""Write the spec's test result to the test suite results file.
Args:
spec (spack.spec.Spec): instance of the spec under test
result (str): result from the spec's test execution (e.g, PASSED)
"""
msg = f"{self.test_pkg_id(spec)} {result}"
_add_msg_to_file(self.results_file, msg)
[docs]
def write_reproducibility_data(self):
for spec in self.specs:
repo_cache_path = self.stage.repo.join(spec.name)
spack.repo.PATH.dump_provenance(spec, repo_cache_path)
for vspec in spec.package.virtuals_provided:
repo_cache_path = self.stage.repo.join(vspec.name)
if not os.path.exists(repo_cache_path):
try:
spack.repo.PATH.dump_provenance(vspec, repo_cache_path)
except spack.repo.UnknownPackageError:
pass # not all virtuals have package files
write_test_suite_file(self)
[docs]
def to_dict(self):
"""Build a dictionary for the test suite.
Returns:
dict: The dictionary contains entries for up to two keys:
specs: list of the test suite's specs in dictionary form
alias: the alias, or name, given to the test suite if provided
"""
specs = [s.to_dict() for s in self.specs]
d = {"specs": specs}
if self.alias:
d["alias"] = self.alias
return d
[docs]
@staticmethod
def from_dict(d):
"""Instantiates a TestSuite based on a dictionary specs and an
optional alias:
specs: list of the test suite's specs in dictionary form
alias: the test suite alias
Returns:
TestSuite: Instance created from the specs
"""
specs = [Spec.from_dict(spec_dict) for spec_dict in d["specs"]]
alias = d.get("alias", None)
return TestSuite(specs, alias)
[docs]
@staticmethod
def from_file(filename):
"""Instantiate a TestSuite using the specs and optional alias
provided in the given file.
Args:
filename (str): The path to the JSON file containing the test
suite specs and optional alias.
Raises:
BaseException: sjson.SpackJSONError if problem parsing the file
"""
try:
with open(filename) as f:
data = sjson.load(f)
test_suite = TestSuite.from_dict(data)
content_hash = os.path.basename(os.path.dirname(filename))
test_suite._hash = content_hash
return test_suite
except Exception as e:
raise sjson.SpackJSONError("error parsing JSON TestSuite:", e)
def _add_msg_to_file(filename, msg):
"""Append the message to the specified file.
Args:
filename (str): path to the file
msg (str): message to be appended to the file
"""
with open(filename, "a+") as f:
f.write(f"{msg}\n")
[docs]
class SkipTest(Exception):
"""Raised when a test (part) is being skipped."""
[docs]
class TestFailure(spack.error.SpackError):
"""Raised when package tests have failed for an installation."""
def __init__(self, failures: List[TestFailureType]):
# Failures are all exceptions
num = len(failures)
msg = "{} failed.\n".format(plural(num, "test"))
for failure, message in failures:
msg += "\n\n%s\n" % str(failure)
msg += "\n%s\n" % message
super().__init__(msg)
[docs]
class TestSuiteError(spack.error.SpackError):
"""Raised when there is an error with the test suite."""
[docs]
class TestSuiteFailure(spack.error.SpackError):
"""Raised when one or more tests in a suite have failed."""
def __init__(self, num_failures):
msg = "%d test(s) in the suite failed.\n" % num_failures
super().__init__(msg)
[docs]
class TestSuiteSpecError(spack.error.SpackError):
"""Raised when there is an issue associated with the spec being tested."""
[docs]
class TestSuiteNameError(spack.error.SpackError):
"""Raised when there is an issue with the naming of the test suite."""