# Copyright 2013-2022 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import base64
import hashlib
import os
import re
import shutil
import sys
import six
import llnl.util.filesystem as fs
import spack.error
import spack.paths
import spack.util.prefix
import spack.util.spack_json as sjson
from spack.spec import Spec
test_suite_filename = 'test_suite.lock'
results_filename = 'results.txt'
[docs]def get_escaped_text_output(filename):
"""Retrieve and escape the expected text output from the file
Args:
filename (str): path to the file
Returns:
list: escaped text lines read from the file
"""
with open(filename, 'r') as f:
# Ensure special characters are escaped as needed
expected = f.read()
# Split the lines to make it easier to debug failures when there is
# a lot of output
return [re.escape(ln) for ln in expected.split('\n')]
[docs]def get_test_stage_dir():
return spack.util.path.canonicalize_path(
spack.config.get('config:test_stage', spack.paths.default_test_path)
)
[docs]def get_all_test_suites():
stage_root = get_test_stage_dir()
if not os.path.isdir(stage_root):
return []
def valid_stage(d):
dirpath = os.path.join(stage_root, d)
return (os.path.isdir(dirpath) and
test_suite_filename in os.listdir(dirpath))
candidates = [
os.path.join(stage_root, d, test_suite_filename)
for d in os.listdir(stage_root)
if valid_stage(d)
]
test_suites = [TestSuite.from_file(c) for c in candidates]
return test_suites
[docs]def get_named_test_suites(name):
"""Return a list of the names of any test suites with that name."""
if not name:
raise TestSuiteNameError('Test suite name is required.')
test_suites = get_all_test_suites()
return [ts for ts in test_suites if ts.name == name]
[docs]def get_test_suite(name):
names = get_named_test_suites(name)
if len(names) > 1:
raise TestSuiteNameError(
'Too many suites named "{0}". May shadow hash.'.format(name)
)
if not names:
return None
return names[0]
[docs]def write_test_suite_file(suite):
"""Write the test suite to its lock file."""
with open(suite.stage.join(test_suite_filename), 'w') as f:
sjson.dump(suite.to_dict(), stream=f)
[docs]def write_test_summary(num_failed, num_skipped, num_untested, num_specs):
failed = "{0} failed, ".format(num_failed) if num_failed else ''
skipped = "{0} skipped, ".format(num_skipped) if num_skipped else ''
no_tests = "{0} no-tests, ".format(num_untested) if num_untested else ''
num_passed = num_specs - num_failed - num_untested - num_skipped
print("{:=^80}".format(" {0}{1}{2}{3} passed of {4} specs "
.format(failed, no_tests, skipped, num_passed, num_specs)))
[docs]class TestSuite(object):
def __init__(self, specs, alias=None):
# copy so that different test suites have different package objects
# even if they contain the same spec
self.specs = [spec.copy() for spec in specs]
self.current_test_spec = None # spec currently tested, can be virtual
self.current_base_spec = None # spec currently running do_test
self.alias = alias
self._hash = None
self.fails = 0
@property
def name(self):
return self.alias if self.alias else self.content_hash
@property
def content_hash(self):
if not self._hash:
json_text = sjson.dump(self.to_dict())
sha = hashlib.sha1(json_text.encode('utf-8'))
b32_hash = base64.b32encode(sha.digest()).lower()
if sys.version_info[0] >= 3:
b32_hash = b32_hash.decode('utf-8')
self._hash = b32_hash
return self._hash
def __call__(self, *args, **kwargs):
self.write_reproducibility_data()
remove_directory = kwargs.get('remove_directory', True)
dirty = kwargs.get('dirty', False)
fail_first = kwargs.get('fail_first', False)
externals = kwargs.get('externals', False)
skipped, untested = 0, 0
for spec in self.specs:
try:
if spec.package.test_suite:
raise TestSuiteSpecError(
"Package {0} cannot be run in two test suites at once"
.format(spec.package.name)
)
# Set up the test suite to know which test is running
spec.package.test_suite = self
self.current_base_spec = spec
self.current_test_spec = spec
# setup per-test directory in the stage dir
test_dir = self.test_dir_for_spec(spec)
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
fs.mkdirp(test_dir)
# run the package tests
spec.package.do_test(dirty=dirty, externals=externals)
# Clean up on success
if remove_directory:
shutil.rmtree(test_dir)
# Log test status based on whether any non-pass-only test
# functions were called
tested = os.path.exists(self.tested_file_for_spec(spec))
if tested:
status = 'PASSED'
else:
self.ensure_stage()
if spec.external and not externals:
status = 'SKIPPED'
skipped += 1
else:
status = 'NO-TESTS'
untested += 1
self.write_test_result(spec, status)
except BaseException as exc:
self.fails += 1
if isinstance(exc, (SyntaxError, TestSuiteSpecError)):
# Create the test log file and report the error.
self.ensure_stage()
msg = 'Testing package {0}\n{1}'\
.format(self.test_pkg_id(spec), str(exc))
_add_msg_to_file(self.log_file_for_spec(spec), msg)
self.write_test_result(spec, 'FAILED')
if fail_first:
break
finally:
spec.package.test_suite = None
self.current_test_spec = None
self.current_base_spec = None
write_test_summary(self.fails, skipped, untested, len(self.specs))
if self.fails:
raise TestSuiteFailure(self.fails)
[docs] def ensure_stage(self):
if not os.path.exists(self.stage):
fs.mkdirp(self.stage)
@property
def stage(self):
return spack.util.prefix.Prefix(
os.path.join(get_test_stage_dir(), self.content_hash))
@property
def results_file(self):
return self.stage.join(results_filename)
[docs] @classmethod
def test_pkg_id(cls, spec):
"""Build the standard install test package identifier
Args:
spec (Spec): instance of the spec under test
Returns:
(str): the install test package identifier
"""
return spec.format('{name}-{version}-{hash:7}')
[docs] @classmethod
def test_log_name(cls, spec):
return '%s-test-out.txt' % cls.test_pkg_id(spec)
[docs] def log_file_for_spec(self, spec):
return self.stage.join(self.test_log_name(spec))
[docs] def test_dir_for_spec(self, spec):
return self.stage.join(self.test_pkg_id(spec))
[docs] @classmethod
def tested_file_name(cls, spec):
return '%s-tested.txt' % cls.test_pkg_id(spec)
[docs] def tested_file_for_spec(self, spec):
return self.stage.join(self.tested_file_name(spec))
@property
def current_test_cache_dir(self):
if not (self.current_test_spec and self.current_base_spec):
raise TestSuiteSpecError(
"Unknown test cache directory: no specs being tested"
)
test_spec = self.current_test_spec
base_spec = self.current_base_spec
return self.test_dir_for_spec(base_spec).cache.join(test_spec.name)
@property
def current_test_data_dir(self):
if not (self.current_test_spec and self.current_base_spec):
raise TestSuiteSpecError(
"Unknown test data directory: no specs being tested"
)
test_spec = self.current_test_spec
base_spec = self.current_base_spec
return self.test_dir_for_spec(base_spec).data.join(test_spec.name)
[docs] def add_failure(self, exc, msg):
current_hash = self.current_base_spec.dag_hash()
current_failures = self.failures.get(current_hash, [])
current_failures.append((exc, msg))
self.failures[current_hash] = current_failures
[docs] def write_test_result(self, spec, result):
msg = "{0} {1}".format(self.test_pkg_id(spec), result)
_add_msg_to_file(self.results_file, msg)
[docs] def write_reproducibility_data(self):
for spec in self.specs:
repo_cache_path = self.stage.repo.join(spec.name)
spack.repo.path.dump_provenance(spec, repo_cache_path)
for vspec in spec.package.virtuals_provided:
repo_cache_path = self.stage.repo.join(vspec.name)
if not os.path.exists(repo_cache_path):
try:
spack.repo.path.dump_provenance(vspec, repo_cache_path)
except spack.repo.UnknownPackageError:
pass # not all virtuals have package files
write_test_suite_file(self)
[docs] def to_dict(self):
specs = [s.to_dict() for s in self.specs]
d = {'specs': specs}
if self.alias:
d['alias'] = self.alias
return d
[docs] @staticmethod
def from_dict(d):
specs = [Spec.from_dict(spec_dict) for spec_dict in d['specs']]
alias = d.get('alias', None)
return TestSuite(specs, alias)
[docs] @staticmethod
def from_file(filename):
try:
with open(filename, 'r') as f:
data = sjson.load(f)
test_suite = TestSuite.from_dict(data)
content_hash = os.path.basename(os.path.dirname(filename))
test_suite._hash = content_hash
return test_suite
except Exception as e:
raise six.raise_from(
sjson.SpackJSONError("error parsing JSON TestSuite:", str(e)),
e,
)
def _add_msg_to_file(filename, msg):
"""Add the message to the specified file
Args:
filename (str): path to the file
msg (str): message to be appended to the file
"""
with open(filename, 'a+') as f:
f.write('{0}\n'.format(msg))
[docs]class TestFailure(spack.error.SpackError):
"""Raised when package tests have failed for an installation."""
def __init__(self, failures):
# Failures are all exceptions
msg = "%d tests failed.\n" % len(failures)
for failure, message in failures:
msg += '\n\n%s\n' % str(failure)
msg += '\n%s\n' % message
super(TestFailure, self).__init__(msg)
[docs]class TestSuiteFailure(spack.error.SpackError):
"""Raised when one or more tests in a suite have failed."""
def __init__(self, num_failures):
msg = "%d test(s) in the suite failed.\n" % num_failures
super(TestSuiteFailure, self).__init__(msg)
[docs]class TestSuiteSpecError(spack.error.SpackError):
"""Raised when there is an issue associated with the spec being tested."""
[docs]class TestSuiteNameError(spack.error.SpackError):
"""Raised when there is an issue with the naming of the test suite."""