# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import argparse
import ast
import collections
import os
import re
import sys
import warnings
from itertools import islice, zip_longest
from typing import Callable, Dict, List, Optional

import spack.llnl.util.tty as tty
import spack.llnl.util.tty.color as color
import spack.paths
import spack.repo
import spack.util.git
import spack.util.spack_yaml
from spack.llnl.util.filesystem import working_dir
from spack.spec_parser import NAME, VERSION_LIST, SpecTokens
from spack.tokenize import Token, TokenBase, Tokenizer
from spack.util.executable import Executable, which
# Command metadata: a one-line summary of the command plus the section and level it is
# filed under.
# NOTE(review): nothing in this file reads these names; presumably they are picked up by
# Spack's command/help machinery -- confirm.
description = "runs source code style checks on spack"
section = "developer"
level = "long"
[docs]
def grouper(iterable, n, fillvalue=None):
    """Split ``iterable`` into consecutive chunks of at most ``n`` items.

    Every chunk is yielded as a lazy ``filter`` object from which falsy entries are
    dropped; with the default ``fillvalue`` this removes the ``None`` padding that
    completes the last chunk.
    """
    # n references to the *same* iterator, so each zipped tuple consumes n successive
    # items: grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    shared_iterators = [iter(iterable)] * n
    for padded_chunk in zip_longest(*shared_iterators, fillvalue=fillvalue):
        yield filter(None, padded_chunk)
#: List of paths to exclude from checks -- relative to spack root
exclude_paths = [os.path.relpath(spack.paths.vendor_path, spack.paths.prefix)]

#: Names of the tools, in the order in which they should be run. flake8 comes after the
#: tools that can rewrite files (``import``, ``isort``, ``black``) so that it can
#: double-check their results (if, e.g., ``--fix`` was provided). Each name is a key of
#: the ``tools`` registry.
tool_names = ["import", "isort", "black", "flake8", "mypy"]

#: warnings to ignore in mypy; output lines containing any of these substrings are
#: dropped by ``rewrite_and_print_output``
mypy_ignores = [
    # same as `disable_error_code = "annotation-unchecked"` in pyproject.toml, which
    # doesn't exist in mypy 0.971 for Python 3.6
    "[annotation-unchecked]"
]
[docs]
def is_package(f):
    """Tell whether a root-relative path is a package recipe rather than a core file.

    flake8 is run with different exceptions for the core and for packages, since
    packages are allowed to use ``from spack.package import *`` and to poke globals
    into their namespace.
    """
    lives_under_var = f.startswith("var/spack/")
    return lives_under_var and f.endswith("package.py")
#: decorator for adding tools to the list
# NOTE(review): the ``tool`` decorator announced by the comment above is used throughout
# this file (``@tool(...)`` and the annotation below) but its definition is not part of
# the visible source -- confirm it is defined before this point.

#: tools we run in spack style, keyed by tool name; ``style`` looks entries up by the
#: names in ``tool_names`` and uses their ``fun`` and ``executable`` attributes
tools: Dict[str, tool] = {}
[docs]
def changed_files(base="develop", untracked=True, all_files=False, root=None):
    """Get list of changed files in the Spack repository.

    The git commands are run in the current working directory, so callers are expected
    to have changed into the repository they want to inspect.

    Arguments:
        base (str): name of base branch to evaluate differences with.
        untracked (bool): include untracked files in the list.
        all_files (bool): list all files in the repository.
        root (str): use this directory instead of the Spack prefix.

    Returns:
        list: sorted, de-duplicated paths (as printed by git) of ``*.py`` files and
        ``bin/spack`` that are not located under one of ``exclude_paths``.
    """
    if root is None:
        root = spack.paths.prefix

    git = spack.util.git.git(required=True)

    # ensure base is in the repo
    base_sha = git(
        "rev-parse", "--quiet", "--verify", "--revs-only", base, fail_on_error=False, output=str
    )
    if git.returncode != 0:
        tty.die(
            "This repository does not have a '%s' revision." % base,
            "spack style needs this branch to determine which files changed.",
            "Ensure that '%s' exists, or specify files to check explicitly." % base,
        )

    # named ``commit_range`` rather than ``range`` to avoid shadowing the builtin
    commit_range = "{0}...".format(base_sha.strip())

    git_args = [
        # Add changed files committed since branching off of develop
        ["diff", "--name-only", "--diff-filter=ACMR", commit_range],
        # Add changed files that have been staged but not yet committed
        ["diff", "--name-only", "--diff-filter=ACMR", "--cached"],
        # Add changed files that are unstaged
        ["diff", "--name-only", "--diff-filter=ACMR"],
    ]

    # Add new files that are untracked
    if untracked:
        git_args.append(["ls-files", "--exclude-standard", "--other"])

    # add everything if the user asked for it
    if all_files:
        git_args.append(["ls-files", "--exclude-standard"])

    excludes = [os.path.realpath(os.path.join(root, f)) for f in exclude_paths]

    def is_excluded(path):
        # Resolve the path against ``root``, exactly like the exclude list, instead of
        # against the current working directory, and only match whole path components so
        # that e.g. ``<excluded>_extra.py`` is not dropped by accident.
        resolved = os.path.realpath(os.path.join(root, path))
        return any(resolved == e or resolved.startswith(e + os.sep) for e in excludes)

    changed = set()
    for arg_list in git_args:
        for f in git(*arg_list, output=str).split("\n"):
            # Ignore non-Python files
            if not (f.endswith(".py") or f == "bin/spack"):
                continue

            # Ignore files in the exclude locations
            if is_excluded(f):
                continue

            changed.add(f)

    return sorted(changed)
[docs]
def setup_parser(subparser: argparse.ArgumentParser) -> None:
    """Register the command line options of ``spack style`` on ``subparser``."""
    known_tools = ", ".join(tool_names)
    add = subparser.add_argument

    # which files to look at
    add(
        "-b",
        "--base",
        action="store",
        default="develop",
        help="branch to compare against to determine changed files (default: develop)",
    )
    add("-a", "--all", action="store_true", help="check all files, not just changed files")
    add(
        "-r",
        "--root-relative",
        action="store_true",
        default=False,
        help="print root-relative paths (default: cwd-relative)",
    )
    add(
        "-U",
        "--no-untracked",
        dest="untracked",
        action="store_false",
        default=True,
        help="exclude untracked files from checks",
    )

    # how to treat them
    add(
        "-f",
        "--fix",
        action="store_true",
        default=False,
        help="format automatically if possible (e.g., with isort, black)",
    )
    add("--root", action="store", default=None, help="style check a different spack instance")

    # --tool and --skip cannot be combined
    tool_group = subparser.add_mutually_exclusive_group()
    tool_group.add_argument(
        "-t",
        "--tool",
        action="append",
        help="specify which tools to run (default: %s)" % known_tools,
    )
    tool_group.add_argument(
        "-s",
        "--skip",
        metavar="TOOL",
        action="append",
        help="specify tools to skip (choose from %s)" % known_tools,
    )

    add(
        "--spec-strings",
        action="store_true",
        help="upgrade spec strings in Python, JSON and YAML files for compatibility with Spack "
        "v1.0 and v0.x. Example: spack style ``--spec-strings $(git ls-files)``. Note: must be "
        "used only on specs from spack v0.X.",
    )

    add("files", nargs=argparse.REMAINDER, help="specific files to check")
[docs]
def cwd_relative(path, root, initial_working_dir):
    """Express the ``root``-relative ``path`` relative to ``initial_working_dir``."""
    anchored = os.path.join(root, path)
    return os.path.relpath(anchored, start=initial_working_dir)
[docs]
def rewrite_and_print_output(
    output, args, re_obj=re.compile(r"^(.+):([0-9]+):"), replacement=r"{0}:{1}:"
):
    """Print tool output line by line, rewriting file paths to respect the path arguments.

    Empty lines and lines containing one of ``mypy_ignores`` are dropped. Unless
    ``args.root_relative`` is set (or ``re_obj`` is falsy), every match of ``re_obj`` is
    replaced by ``replacement`` formatted with the first group -- a root-relative path
    made relative to ``args.initial_working_dir`` -- followed by the remaining groups.
    The default pattern handles the ``<file>:<line>:`` format.
    """

    def translate(match):
        # print results relative to current working directory
        relocated = cwd_relative(match.group(1), args.root, args.initial_working_dir)
        return replacement.format(relocated, *match.groups()[1:])

    for line in output.split("\n"):
        # some mypy annotations can't be disabled in older mypys (e.g. .971, which
        # is the only mypy that supports python 3.6), so we filter them here.
        if not line or any(ignored in line for ignored in mypy_ignores):
            continue

        if re_obj and not args.root_relative:
            line = re_obj.sub(translate, line)

        print(line)
[docs]
@tool("flake8", required=True)
def run_flake8(flake8_cmd, file_list, args):
    """Run flake8 on ``file_list`` and return the OR-ed return codes of all invocations."""
    # always run with config from running spack prefix
    config_flag = "--config=%s" % os.path.join(spack.paths.prefix, ".flake8")
    status = 0

    # run in chunks of 100 at a time to avoid line length limit
    # filename parameter in config *does not work* for this reliably
    for batch in grouper(file_list, 100):
        report = flake8_cmd(config_flag, *batch, fail_on_error=False, output=str)
        status |= flake8_cmd.returncode
        rewrite_and_print_output(report, args)

    print_tool_result("flake8", status)
    return status
[docs]
@tool("mypy")
def run_mypy(mypy_cmd, file_list, args):
    """Run mypy on the ``spack`` and ``llnl`` packages and return the OR-ed return codes.

    ``file_list`` is not used: mypy is always pointed at whole packages. When the
    ``SPACK_MYPY_CHECK_PACKAGES`` environment variable is set, a second run checks
    ``packages`` with the ``no-redef`` error code disabled.
    """
    # always run with config from running spack prefix
    shared_flags = [
        "--config-file",
        os.path.join(spack.paths.prefix, "pyproject.toml"),
        "--show-error-codes",
    ]

    invocations = [[*shared_flags, "--package", "spack", "--package", "llnl"]]
    if "SPACK_MYPY_CHECK_PACKAGES" in os.environ:
        invocations.append(
            [*shared_flags, "--package", "packages", "--disable-error-code", "no-redef"]
        )

    status = 0
    for flags in invocations:
        report = mypy_cmd(*flags, fail_on_error=False, output=str)
        status |= mypy_cmd.returncode
        rewrite_and_print_output(report, args)

    print_tool_result("mypy", status)
    return status
[docs]
@tool("isort")
def run_isort(isort_cmd, file_list, args):
    """Run isort on ``file_list`` and return the OR-ed return codes of all invocations.

    Without ``args.fix`` isort only checks and prints a diff. Package recipes and all
    other files are passed to isort in separate invocations.
    """
    # always run with config from running spack prefix
    isort_args = ("--settings-path", os.path.join(spack.paths.prefix, "pyproject.toml"))
    if not args.fix:
        isort_args += ("--check", "--diff")

    pat = re.compile("ERROR: (.*) Imports are incorrectly sorted")
    replacement = "ERROR: {0} Imports are incorrectly sorted"

    status = 0

    def process_files(files, is_args):
        nonlocal status
        for batch in grouper(files, 100):
            report = isort_cmd(*is_args, *batch, fail_on_error=False, output=str, error=str)
            status |= isort_cmd.returncode
            rewrite_and_print_output(report, args, pat, replacement)

    # packages first, then everything else
    process_files([f for f in file_list if is_package(f)], isort_args)
    process_files([f for f in file_list if not is_package(f)], isort_args)

    print_tool_result("isort", status)
    return status
[docs]
@tool("black")
def run_black(black_cmd, file_list, args):
    """Run black on ``file_list`` and return the OR-ed return codes of all invocations.

    Without ``args.fix`` black only checks and prints a diff, colored when Spack itself
    would use color.
    """
    # always run with config from running spack prefix
    black_args = ("--config", os.path.join(spack.paths.prefix, "pyproject.toml"))
    if not args.fix:
        black_args += ("--check", "--diff")
        if color.get_color_when():  # only show color when spack would
            black_args += ("--color",)

    pat = re.compile("would reformat +(.*)")
    replacement = "would reformat {0}"

    status = 0
    # run in chunks of 100 at a time to avoid line length limit
    # filename parameter in config *does not work* for this reliably
    for batch in grouper(file_list, 100):
        report = black_cmd(*black_args, *batch, fail_on_error=False, output=str, error=str)
        status |= black_cmd.returncode
        rewrite_and_print_output(report, args, pat, replacement)

    print_tool_result("black", status)
    return status
def _module_part(root: str, expr: str):
    """Return the longest dotted prefix of ``expr`` that is a module under ``root``.

    A prefix counts as a module when ``<root>/lib/spack/<prefix>.py`` exists (with an
    exact, case-sensitive file name) or ``<root>/lib/spack/<prefix>/__init__.py`` exists.
    Returns None if no prefix qualifies, or if ``expr`` refers to ``spack.pkg``.
    """
    # spack.pkg is for repositories, don't try to resolve it here.
    if expr.startswith(spack.repo.PKG_MODULE_PREFIX_V1) or expr == "spack.pkg":
        return None

    components = expr.split(".")
    lib_dir = os.path.join(root, "lib", "spack")

    # try the full expression first, then drop trailing components one by one
    for length in range(len(components), 0, -1):
        candidate = components[:length]
        module_file = os.path.join(lib_dir, *candidate) + ".py"
        package_init = os.path.join(lib_dir, *candidate, "__init__.py")

        is_module_file = (
            os.path.exists(module_file)
            # ensure case sensitive match
            and f"{candidate[-1]}.py" in os.listdir(os.path.dirname(module_file))
        )
        if is_module_file or os.path.exists(package_init):
            return ".".join(candidate)

    return None
def _run_import_check(
    file_list: List[str],
    *,
    fix: bool,
    root_relative: bool,
    root=spack.paths.prefix,
    working_dir=spack.paths.prefix,
    out=sys.stdout,
):
    """Report, and optionally fix, redundant and missing ``import spack.*`` statements.

    Arguments:
        file_list: files to check; they are opened as given, i.e. relative to the
            current working directory.
        fix: rewrite the files, adding missing and removing redundant imports.
        root_relative: print paths as given instead of relative to ``working_dir``.
        root: directory whose ``lib/spack`` tree is used to resolve module names.
        working_dir: directory that printed paths are made relative to.
        out: stream that receives the report.

    Returns:
        1 if any file could not be parsed or has redundant/missing imports, 0 otherwise
        (also 0 on Python older than 3.9, where the check is skipped).
    """
    if sys.version_info < (3, 9):
        print("import check requires Python 3.9 or later")
        return 0

    is_use = re.compile(r"(?<!from )(?<!import )spack\.[a-zA-Z0-9_\.]+")

    # redundant imports followed by a `# comment` are ignored, because there can be a legitimate
    # reason to import a module: execute module scope init code, or to deal with circular imports.
    is_abs_import = re.compile(r"^import (spack\.[a-zA-Z0-9_\.]+)$", re.MULTILINE)

    exit_code = 0

    for file in file_list:
        to_add = set()
        to_remove = []

        pretty_path = file if root_relative else cwd_relative(file, root, working_dir)

        try:
            with open(file, "r", encoding="utf-8") as f:
                contents = f.read()
            parsed = ast.parse(contents)
        except Exception:
            exit_code = 1
            print(f"{pretty_path}: could not parse", file=out)
            continue

        for m in is_abs_import.finditer(contents):
            # Find at most two occurrences: the first is the import itself, the second is its
            # usage.
            if len(list(islice(re.finditer(rf"{re.escape(m.group(1))}(?!\w)", contents), 2))) == 1:
                to_remove.append(m.group(0))
                exit_code = 1
                print(f"{pretty_path}: redundant import: {m.group(1)}", file=out)

        # Clear all strings to avoid matching comments/strings etc.
        for node in ast.walk(parsed):
            if isinstance(node, ast.Constant) and isinstance(node.value, str):
                node.value = ""

        filtered_contents = ast.unparse(parsed)  # novermin
        for m in is_use.finditer(filtered_contents):
            module = _module_part(root, m.group(0))
            if not module or module in to_add:
                continue
            if re.search(rf"import {re.escape(module)}(?!\w|\.)", contents):
                continue
            to_add.add(module)
            exit_code = 1
            print(f"{pretty_path}: missing import: {module} ({m.group(0)})", file=out)

        if not fix or not to_add and not to_remove:
            continue

        with open(file, "r", encoding="utf-8") as f:
            lines = f.readlines()

        if to_add:
            # Insert missing imports right after the first top-level import statement and
            # delegate ordering to isort. ``end_lineno`` is 1-based and inclusive, so it is
            # the 0-based index of the line that follows the statement; using it (rather
            # than ``lineno``) never splits a first import that spans several lines.
            for node in parsed.body:
                if isinstance(node, (ast.Import, ast.ImportFrom)):
                    insert_at = node.end_lineno or node.lineno
                    break
            else:
                print(f"{pretty_path}: could not fix", file=out)
                continue

            # sorted, so that the rewritten file does not depend on set iteration order
            lines.insert(insert_at, "\n".join(f"import {x}" for x in sorted(to_add)) + "\n")

        new_contents = "".join(lines)

        # remove redundant imports
        for statement in to_remove:
            new_contents = new_contents.replace(f"{statement}\n", "")

        with open(file, "w", encoding="utf-8") as f:
            f.write(new_contents)

    return exit_code
[docs]
@tool("import", external=False)
def run_import_check(import_check_cmd, file_list, args):
    """Run the built-in import check on ``file_list`` and return its exit code.

    ``import_check_cmd`` is accepted to match the signature of the other tools but is
    not used.
    """
    status = _run_import_check(
        file_list,
        fix=args.fix,
        root_relative=args.root_relative,
        root=args.root,
        working_dir=args.initial_working_dir,
    )
    print_tool_result("import", status)
    return status
def _bootstrap_dev_dependencies():
    """Ensure the environment dependencies, within the bootstrap configuration."""
    # imported at call time rather than at module import time
    import spack.bootstrap as bootstrap

    with bootstrap.ensure_bootstrap_configuration():
        bootstrap.ensure_environment_dependencies()
#: Cheap pre-filter for strings that may contain a legacy compiler constraint: a ``%``
#: followed by a letter or underscore and then a letter, digit or dash. Strings without
#: a match are never passed to the spec string formatter.
IS_PROBABLY_COMPILER = re.compile(r"%[a-zA-Z_][a-zA-Z0-9\-]")
class _LegacySpecTokens(TokenBase):
    """Reconstructs the tokens for previous specs, so we can reuse code to rotate them"""

    # Most members reuse the regexes of the current ``SpecTokens``; the dependency and
    # compiler tokens are spelled out here.
    # NOTE(review): member order presumably determines matching priority in the tokenizer
    # (e.g. COMPILER_AND_VERSION before COMPILER) -- confirm against ``spack.tokenize``
    # before reordering anything.

    # Dependency
    START_EDGE_PROPERTIES = r"(?:\^\[)"
    END_EDGE_PROPERTIES = r"(?:\])"
    DEPENDENCY = r"(?:\^)"
    # Version
    VERSION_HASH_PAIR = SpecTokens.VERSION_HASH_PAIR.regex
    GIT_VERSION = SpecTokens.GIT_VERSION.regex
    VERSION = SpecTokens.VERSION.regex
    # Variants
    PROPAGATED_BOOL_VARIANT = SpecTokens.PROPAGATED_BOOL_VARIANT.regex
    BOOL_VARIANT = SpecTokens.BOOL_VARIANT.regex
    PROPAGATED_KEY_VALUE_PAIR = SpecTokens.PROPAGATED_KEY_VALUE_PAIR.regex
    KEY_VALUE_PAIR = SpecTokens.KEY_VALUE_PAIR.regex
    # Compilers: ``%name`` optionally followed by ``@versions``, with optional whitespace
    # after the ``%`` and around the ``@``
    COMPILER_AND_VERSION = rf"(?:%\s*(?:{NAME})(?:[\s]*)@\s*(?:{VERSION_LIST}))"
    COMPILER = rf"(?:%\s*(?:{NAME}))"
    # FILENAME
    FILENAME = SpecTokens.FILENAME.regex
    # Package name
    FULLY_QUALIFIED_PACKAGE_NAME = SpecTokens.FULLY_QUALIFIED_PACKAGE_NAME.regex
    UNQUALIFIED_PACKAGE_NAME = SpecTokens.UNQUALIFIED_PACKAGE_NAME.regex
    # DAG hash
    DAG_HASH = SpecTokens.DAG_HASH.regex
    # White spaces
    WS = SpecTokens.WS.regex
    # Unexpected character(s)
    UNEXPECTED = SpecTokens.UNEXPECTED.regex
def _spec_str_reorder_compiler(idx: int, blocks: List[List[Token]]) -> None:
    """Move the compiler block at position ``idx`` to the end of ``blocks``, in place.

    Nothing happens when ``idx`` is out of range, when the compiler block is already the
    last one, or when only whitespace follows it.
    """
    # only move the compiler to the back if it exists and is not already at the end
    if idx < 0 or idx >= len(blocks) - 1:
        return

    # if there's only whitespace after the compiler, don't move it
    trailing_tokens = (tok for blk in blocks[idx + 1 :] for tok in blk)
    if all(tok.kind == _LegacySpecTokens.WS for tok in trailing_tokens):
        return

    # rotate left and always add at least one WS token between compiler and previous token
    compiler_block = blocks.pop(idx)
    if compiler_block[0].kind != _LegacySpecTokens.WS:
        compiler_block.insert(0, Token(_LegacySpecTokens.WS, " "))

    # delete the WS tokens from the new first block if it was at the very start, to prevent
    # leading WS tokens.
    if idx == 0:
        new_first = blocks[0]
        while new_first[0].kind == _LegacySpecTokens.WS:
            del new_first[0]

    blocks.append(compiler_block)
def _spec_str_format(spec_str: str) -> Optional[str]:
    """Try to parse any string as a legacy spec string and move the compiler of every spec
    it contains to the end of that spec.

    Returns the reformatted string, or None when nothing changed, when the string cannot
    be tokenized, or when a single spec has more than one compiler. Raises ValueError for
    a token kind that is not handled here."""
    # We parse blocks of tokens that include leading whitespace, and move the compiler block to
    # the end when we hit a dependency ^... or the end of a string.
    # [@3.1][ +foo][ +bar][ %gcc@3.1][ +baz]
    # [@3.1][ +foo][ +bar][ +baz][ %gcc@3.1]
    kinds = _LegacySpecTokens
    compiler_kinds = (kinds.COMPILER, kinds.COMPILER_AND_VERSION)
    node_start_kinds = (
        kinds.START_EDGE_PROPERTIES,
        kinds.DEPENDENCY,
        kinds.UNQUALIFIED_PACKAGE_NAME,
        kinds.FULLY_QUALIFIED_PACKAGE_NAME,
    )
    attribute_kinds = (
        kinds.VERSION_HASH_PAIR,
        kinds.GIT_VERSION,
        kinds.VERSION,
        kinds.PROPAGATED_BOOL_VARIANT,
        kinds.BOOL_VARIANT,
        kinds.PROPAGATED_KEY_VALUE_PAIR,
        kinds.KEY_VALUE_PAIR,
        kinds.DAG_HASH,
    )

    blocks: List[List[Token]] = []
    pending: List[Token] = []
    compiler_at = -1
    inside_edge_attrs = False

    for token in Tokenizer(kinds).tokenize(spec_str):
        kind = token.kind

        if kind == kinds.UNEXPECTED:
            # parsing error, we cannot fix this string.
            return None

        if kind in compiler_kinds:
            # multiple compilers are not supported in Spack v0.x, so early return
            if compiler_at != -1:
                return None
            # index that the block closed below is going to have
            compiler_at = len(blocks)
        elif kind in node_start_kinds:
            # a new node starts: settle the compiler of the previous one first
            _spec_str_reorder_compiler(compiler_at, blocks)
            compiler_at = -1
            if kind == kinds.START_EDGE_PROPERTIES:
                inside_edge_attrs = True
        elif kind == kinds.END_EDGE_PROPERTIES:
            inside_edge_attrs = False
        elif inside_edge_attrs or kind == kinds.WS:
            # edge attributes and whitespace accumulate without closing the block
            pending.append(token)
            continue
        elif kind not in attribute_kinds:
            raise ValueError(f"unexpected token {token}")

        # every other handled token terminates the current block
        pending.append(token)
        blocks.append(pending)
        pending = []

    if pending:
        blocks.append(pending)
    _spec_str_reorder_compiler(compiler_at, blocks)

    formatted = "".join(token.value for block in blocks for token in block)
    return None if formatted == spec_str else formatted
#: Callback invoked for every reformatted spec string, with the arguments
#: ``(path, line, col, old, new)``.
SpecStrHandler = Callable[[str, int, int, str, str], None]
def _spec_str_default_handler(path: str, line: int, col: int, old: str, new: str):
"""A SpecStrHandler that prints formatted spec strings and their locations."""
print(f"{path}:{line}:{col}: `{old}` -> `{new}`")
def _spec_str_fix_handler(path: str, line: int, col: int, old: str, new: str):
"""A SpecStrHandler that updates formatted spec strings in files."""
with open(path, "r", encoding="utf-8") as f:
lines = f.readlines()
new_line = lines[line - 1].replace(old, new)
if new_line == lines[line - 1]:
tty.warn(f"{path}:{line}:{col}: could not apply fix: `{old}` -> `{new}`")
return
lines[line - 1] = new_line
print(f"{path}:{line}:{col}: fixed `{old}` -> `{new}`")
with open(path, "w", encoding="utf-8") as f:
f.writelines(lines)
def _spec_str_ast(path: str, tree: ast.AST, handler: SpecStrHandler) -> None:
    """Visit every string literal in the AST of a Python file and call ``handler`` for
    each one that is changed by spec string formatting."""
    for node in ast.walk(tree):
        # extract the literal's value; string nodes are ast.Constant from Python 3.8 on
        # and ast.Str before that
        if sys.version_info >= (3, 8):
            if not (isinstance(node, ast.Constant) and isinstance(node.value, str)):
                continue
            literal = node.value
        elif isinstance(node, ast.Str):
            literal = node.s
        else:
            continue

        # cheap pre-filter before attempting to tokenize the string
        if IS_PROBABLY_COMPILER.search(literal) is None:
            continue

        formatted = _spec_str_format(literal)
        if formatted is None:
            continue

        handler(path, node.lineno, node.col_offset, literal, formatted)
def _spec_str_json_and_yaml(path: str, data: dict, handler: SpecStrHandler) -> None:
    """Walk a YAML or JSON data structure and apply handler to formatted spec strings.

    The structure is traversed breadth-first; dictionary values are visited before
    dictionary keys. Objects are tracked by identity, so shared or self-referencing
    containers are only visited once. The location passed to ``handler`` is taken from
    the ``_start_mark`` attribute of the string if it has one, and is ``0, 0`` otherwise.
    """
    # a deque gives O(1) pops from the left, unlike list.pop(0)
    queue = collections.deque([data])
    seen = set()

    while queue:
        current = queue.popleft()
        if id(current) in seen:
            continue
        seen.add(id(current))

        if isinstance(current, dict):
            queue.extend(current.values())
            queue.extend(current.keys())
        elif isinstance(current, list):
            queue.extend(current)
        elif isinstance(current, str) and IS_PROBABLY_COMPILER.search(current):
            new = _spec_str_format(current)
            if new is not None:
                mark = getattr(current, "_start_mark", None)
                if mark:
                    # marks are 0-based, reported locations are 1-based
                    line, col = mark.line + 1, mark.column + 1
                else:
                    line, col = 0, 0
                handler(path, line, col, current, new)
def _check_spec_strings(
    paths: List[str], handler: SpecStrHandler = _spec_str_default_handler
) -> None:
    """Open Python, JSON and YAML files, and format their string literals that look like spec
    strings. A handler is called for each formatting, which can be used to print or apply fixes.

    Files with other extensions are ignored. Files larger than 1 MiB, and files that
    cannot be read or parsed, are skipped with a warning."""
    size_limit = 1024 * 1024

    for path in paths:
        is_python = path.endswith(".py")
        is_json_or_yaml = path.endswith((".json", ".yaml", ".yml"))
        if not (is_json_or_yaml or is_python):
            continue

        try:
            with open(path, "r", encoding="utf-8") as f:
                # skip files that are likely too large to be user code or config
                if os.fstat(f.fileno()).st_size > size_limit:
                    warnings.warn(f"skipping {path}: too large.")
                    continue

                if is_json_or_yaml:
                    _spec_str_json_and_yaml(path, spack.util.spack_yaml.load_config(f), handler)
                else:
                    # only Python files are left at this point
                    _spec_str_ast(path, ast.parse(f.read()), handler)
        except (OSError, spack.util.spack_yaml.SpackYAMLError, SyntaxError, ValueError):
            warnings.warn(f"skipping {path}")
            continue
[docs]
def style(parser, args):
    """Entry point of ``spack style``.

    With ``--spec-strings`` only the given files are scanned for legacy spec strings
    (and fixed in place with ``--fix``). Otherwise the selected tools are run, in the
    order of ``tool_names``, from within the Spack instance at ``args.root``, on the
    given files or on the files reported by ``changed_files``.

    Returns:
        None in spec-strings mode or when no tool is selected; otherwise the OR-ed
        return codes of the tools that were run (0 means clean).
    """
    if args.spec_strings:
        if not args.files:
            tty.die("No files provided to check spec strings.")
        handler = _spec_str_fix_handler if args.fix else _spec_str_default_handler
        return _check_spec_strings(args.files, handler)

    # save initial working directory for relativizing paths later
    args.initial_working_dir = os.getcwd()

    # ensure that the config files we need actually exist in the spack prefix.
    # assertions b/c users should not ever see these errors -- they're checked in CI.
    assert os.path.isfile(os.path.join(spack.paths.prefix, "pyproject.toml"))
    assert os.path.isfile(os.path.join(spack.paths.prefix, ".flake8"))

    # validate spack root if the user provided one
    args.root = os.path.realpath(args.root) if args.root else spack.paths.prefix
    spack_script = os.path.join(args.root, "bin", "spack")
    if not os.path.exists(spack_script):
        tty.die("This does not look like a valid spack root.", "No such file: '%s'" % spack_script)

    file_list = args.files
    if file_list:

        def prefix_relative(path):
            return os.path.relpath(os.path.abspath(os.path.realpath(path)), args.root)

        file_list = [prefix_relative(p) for p in file_list]

    # process --tool and --skip arguments
    selected = set(tool_names)
    if args.tool is not None:
        selected = validate_toolset(args.tool)
    if args.skip is not None:
        selected -= validate_toolset(args.skip)

    if not selected:
        tty.msg("Nothing to run.")
        return

    tools_to_run = [t for t in tool_names if t in selected]
    if missing_tools(tools_to_run):
        _bootstrap_dev_dependencies()

    return_code = 0
    with working_dir(args.root):
        if not file_list:
            # pass the validated root along, so that excluded paths are resolved within
            # the instance being checked and not within the running Spack prefix
            file_list = changed_files(args.base, args.untracked, args.all, root=args.root)
        print_style_header(file_list, args, tools_to_run)
        for tool_name in tools_to_run:
            tool = tools[tool_name]
            print_tool_header(tool_name)
            return_code |= tool.fun(tool.executable, file_list, args)

    if return_code == 0:
        tty.msg(color.colorize("@*{spack style checks were clean}"))
    else:
        tty.error(color.colorize("@*{spack style found errors}"))

    return return_code