# Copyright 2013-2024 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import argparse
import os.path
import textwrap
from llnl.util.lang import stable_partition
import spack.cmd
import spack.config
import spack.deptypes as dt
import spack.environment as ev
import spack.mirror
import spack.modules
import spack.reporters
import spack.spec
import spack.store
from spack.util.pattern import Args
__all__ = ["add_common_arguments"]
#: dictionary of argument-generating functions, keyed by name
_arguments = {}
def arg(fn):
"""Decorator for a function that generates a common argument.
This ensures that argument bunches are created lazily. Decorate
argument-generating functions below with @arg so that
``add_common_arguments()`` can find them.
"""
_arguments[fn.__name__] = fn
return fn
[docs]
def add_common_arguments(parser, list_of_arguments):
"""Extend a parser with extra arguments
Args:
parser: parser to be extended
list_of_arguments: arguments to be added to the parser
"""
for argument in list_of_arguments:
if argument not in _arguments:
message = 'Trying to add non existing argument "{0}" to a command'
raise KeyError(message.format(argument))
x = _arguments[argument]()
parser.add_argument(*x.flags, **x.kwargs)
class ConstraintAction(argparse.Action):
"""Constructs a list of specs based on constraints from the command line
An instance of this class is supposed to be used as an argument action
in a parser. It will read a constraint and will attach a function to the
arguments that accepts optional keyword arguments.
To obtain the specs from a command the function must be called.
"""
def __call__(self, parser, namespace, values, option_string=None):
# Query specs from command line
self.constraint = namespace.constraint = values
self.constraint_specs = namespace.constraint_specs = []
namespace.specs = self._specs
def _specs(self, **kwargs):
# store parsed specs in spec.constraint after a call to specs()
self.constraint_specs[:] = spack.cmd.parse_specs(self.constraint)
# If an environment is provided, we'll restrict the search to
# only its installed packages.
env = ev.active_environment()
if env:
kwargs["hashes"] = set(env.all_hashes())
# return everything for an empty query.
if not self.constraint_specs:
return spack.store.STORE.db.query(**kwargs)
# Return only matching stuff otherwise.
specs = {}
for spec in self.constraint_specs:
for s in spack.store.STORE.db.query(spec, **kwargs):
# This is fast for already-concrete specs
specs[s.dag_hash()] = s
return sorted(specs.values())
class SetParallelJobs(argparse.Action):
"""Sets the correct value for parallel build jobs.
The value is is set in the command line configuration scope so that
it can be retrieved using the spack.config API.
"""
def __call__(self, parser, namespace, jobs, option_string):
# Jobs is a single integer, type conversion is already applied
# see https://docs.python.org/3/library/argparse.html#action-classes
if jobs < 1:
msg = 'invalid value for argument "{0}" ' '[expected a positive integer, got "{1}"]'
raise ValueError(msg.format(option_string, jobs))
spack.config.set("config:build_jobs", jobs, scope="command_line")
setattr(namespace, "jobs", jobs)
class DeptypeAction(argparse.Action):
"""Creates a flag of valid dependency types from a deptype argument."""
def __call__(self, parser, namespace, values, option_string=None):
if not values or values == "all":
deptype = dt.ALL
else:
deptype = dt.canonicalize(values.split(","))
setattr(namespace, self.dest, deptype)
class ConfigScope(argparse.Action):
"""Pick the currently configured config scopes."""
def __init__(self, *args, **kwargs) -> None:
kwargs.setdefault("metavar", spack.config.SCOPES_METAVAR)
super().__init__(*args, **kwargs)
@property
def default(self):
return self._default() if callable(self._default) else self._default
@default.setter
def default(self, value):
self._default = value
@property
def choices(self):
return spack.config.scopes().keys()
@choices.setter
def choices(self, value):
pass
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, values)
def _cdash_reporter(namespace):
"""Helper function to create a CDash reporter. This function gets an early reference to the
argparse namespace under construction, so it can later use it to create the object.
"""
def _factory():
def installed_specs(args):
if getattr(args, "spec", ""):
packages = args.spec
elif getattr(args, "specs", ""):
packages = args.specs
elif getattr(args, "package", ""):
# Ensure CI 'spack test run' can output CDash results
packages = args.package
else:
packages = []
for file in args.specfiles:
with open(file, "r") as f:
s = spack.spec.Spec.from_yaml(f)
packages.append(s.format())
return packages
configuration = spack.reporters.CDashConfiguration(
upload_url=namespace.cdash_upload_url,
packages=installed_specs(namespace),
build=namespace.cdash_build,
site=namespace.cdash_site,
buildstamp=namespace.cdash_buildstamp,
track=namespace.cdash_track,
)
return spack.reporters.CDash(configuration=configuration)
return _factory
class CreateReporter(argparse.Action):
"""Create the correct object to generate reports for installation and testing."""
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, values)
if values == "junit":
setattr(namespace, "reporter", spack.reporters.JUnit)
elif values == "cdash":
setattr(namespace, "reporter", _cdash_reporter(namespace))
@arg
def log_format():
return Args(
"--log-format",
default=None,
action=CreateReporter,
choices=("junit", "cdash"),
help="format to be used for log files",
)
# TODO: merge constraint and installed_specs
@arg
def constraint():
return Args(
"constraint",
nargs=argparse.REMAINDER,
action=ConstraintAction,
help="constraint to select a subset of installed packages",
metavar="installed_specs",
)
@arg
def package():
return Args("package", help="package name")
@arg
def packages():
return Args("packages", nargs="+", help="one or more package names", metavar="package")
# Specs must use `nargs=argparse.REMAINDER` because a single spec can
# contain spaces, and contain variants like '-mpi' that argparse thinks
# are a collection of optional flags.
@arg
def spec():
return Args("spec", nargs=argparse.REMAINDER, help="package spec")
@arg
def specs():
return Args("specs", nargs=argparse.REMAINDER, help="one or more package specs")
@arg
def installed_spec():
return Args(
"spec", nargs=argparse.REMAINDER, help="installed package spec", metavar="installed_spec"
)
@arg
def installed_specs():
return Args(
"specs",
nargs=argparse.REMAINDER,
help="one or more installed package specs",
metavar="installed_specs",
)
@arg
def yes_to_all():
return Args(
"-y",
"--yes-to-all",
action="store_true",
dest="yes_to_all",
help='assume "yes" is the answer to every confirmation request',
)
@arg
def recurse_dependencies():
return Args(
"-r",
"--dependencies",
action="store_true",
dest="recurse_dependencies",
help="recursively traverse spec dependencies",
)
@arg
def recurse_dependents():
return Args(
"-R",
"--dependents",
action="store_true",
dest="dependents",
help="also uninstall any packages that depend on the ones given via command line",
)
@arg
def clean():
return Args(
"--clean",
action="store_false",
default=spack.config.get("config:dirty"),
dest="dirty",
help="unset harmful variables in the build environment (default)",
)
@arg
def deptype():
return Args(
"--deptype",
action=DeptypeAction,
default=dt.ALL,
help="comma-separated list of deptypes to traverse (default=%s)" % ",".join(dt.ALL_TYPES),
)
@arg
def dirty():
return Args(
"--dirty",
action="store_true",
default=spack.config.get("config:dirty"),
dest="dirty",
help="preserve user environment in spack's build environment (danger!)",
)
@arg
def long():
return Args(
"-l", "--long", action="store_true", help="show dependency hashes as well as versions"
)
@arg
def very_long():
return Args(
"-L",
"--very-long",
action="store_true",
help="show full dependency hashes as well as versions",
)
@arg
def tags():
return Args(
"-t",
"--tag",
action="append",
dest="tags",
metavar="TAG",
help="filter a package query by tag (multiple use allowed)",
)
@arg
def namespaces():
return Args(
"-N",
"--namespaces",
action="store_true",
default=False,
help="show fully qualified package names",
)
@arg
def jobs():
return Args(
"-j",
"--jobs",
action=SetParallelJobs,
type=int,
dest="jobs",
help="explicitly set number of parallel jobs",
)
@arg
def install_status():
return Args(
"-I",
"--install-status",
action="store_true",
default=True,
help=(
"show install status of packages\n"
"[+] installed [^] installed in an upstream\n"
" - not installed [-] missing dep of installed package\n"
),
)
@arg
def no_install_status():
return Args(
"--no-install-status",
dest="install_status",
action="store_false",
default=True,
help="do not show install status annotations",
)
@arg
def no_checksum():
return Args(
"-n",
"--no-checksum",
action="store_true",
default=False,
help="do not use checksums to verify downloaded files (unsafe)",
)
@arg
def deprecated():
return Args(
"--deprecated",
action="store_true",
default=False,
help="fetch deprecated versions without warning",
)
def add_cdash_args(subparser, add_help):
cdash_help = {}
if add_help:
cdash_help["upload-url"] = "CDash URL where reports will be uploaded"
cdash_help["build"] = (
"name of the build that will be reported to CDash\n\n"
"defaults to spec of the package to operate on"
)
cdash_help["site"] = (
"site name that will be reported to CDash\n\n" "defaults to current system hostname"
)
cdash_help["track"] = (
"results will be reported to this group on CDash\n\n" "defaults to Experimental"
)
cdash_help["buildstamp"] = (
"use custom buildstamp\n\n"
"instead of letting the CDash reporter prepare the "
"buildstamp which, when combined with build name, site and project, "
"uniquely identifies the build, provide this argument to identify "
"the build yourself. format: %%Y%%m%%d-%%H%%M-[cdash-track]"
)
else:
cdash_help["upload-url"] = argparse.SUPPRESS
cdash_help["build"] = argparse.SUPPRESS
cdash_help["site"] = argparse.SUPPRESS
cdash_help["track"] = argparse.SUPPRESS
cdash_help["buildstamp"] = argparse.SUPPRESS
subparser.add_argument("--cdash-upload-url", default=None, help=cdash_help["upload-url"])
subparser.add_argument("--cdash-build", default=None, help=cdash_help["build"])
subparser.add_argument("--cdash-site", default=None, help=cdash_help["site"])
cdash_subgroup = subparser.add_mutually_exclusive_group()
cdash_subgroup.add_argument("--cdash-track", default="Experimental", help=cdash_help["track"])
cdash_subgroup.add_argument("--cdash-buildstamp", default=None, help=cdash_help["buildstamp"])
def print_cdash_help():
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent(
"""\
environment variables:
SPACK_CDASH_AUTH_TOKEN
authentication token to present to CDash
"""
),
)
add_cdash_args(parser, True)
parser.print_help()
def sanitize_reporter_options(namespace: argparse.Namespace):
"""Sanitize options that affect generation and configuration of reports, like
CDash or JUnit.
Args:
namespace: options parsed from cli
"""
has_any_cdash_option = (
namespace.cdash_upload_url or namespace.cdash_build or namespace.cdash_site
)
if namespace.log_format == "junit" and has_any_cdash_option:
raise argparse.ArgumentTypeError("cannot pass any cdash option when --log-format=junit")
# If any CDash option is passed, assume --log-format=cdash is implied
if namespace.log_format is None and has_any_cdash_option:
namespace.log_format = "cdash"
namespace.reporter = _cdash_reporter(namespace)
class ConfigSetAction(argparse.Action):
"""Generic action for setting spack config options from CLI.
This works like a ``store_const`` action but you can set the
``dest`` to some Spack configuration path (like ``concretizer:reuse``)
and the ``const`` will be stored there using ``spack.config.set()``
"""
def __init__(
self, option_strings, dest, const, default=None, required=False, help=None, metavar=None
):
# save the config option we're supposed to set
self.config_path = dest
# destination is translated to a legal python identifier by
# substituting '_' for ':'.
dest = dest.replace(":", "_")
super().__init__(
option_strings=option_strings,
dest=dest,
nargs=0,
const=const,
default=default,
required=required,
help=help,
)
def __call__(self, parser, namespace, values, option_string):
# Retrieve the name of the config option and set it to
# the const from the constructor or a value from the CLI.
# Note that this is only called if the argument is actually
# specified on the command line.
spack.config.set(self.config_path, self.const, scope="command_line")
def add_concretizer_args(subparser):
"""Add a subgroup of arguments for controlling concretization.
These will appear in a separate group called 'concretizer arguments'.
There's no need to handle them in your command logic -- they all use
``ConfigSetAction``, which automatically handles setting configuration
options.
If you *do* need to access a value passed on the command line, you can
get at, e.g., the ``concretizer:reuse`` via ``args.concretizer_reuse``.
Just substitute ``_`` for ``:``.
"""
subgroup = subparser.add_argument_group("concretizer arguments")
subgroup.add_argument(
"-U",
"--fresh",
action=ConfigSetAction,
dest="concretizer:reuse",
const=False,
default=None,
help="do not reuse installed deps; build newest configuration",
)
subgroup.add_argument(
"--reuse",
action=ConfigSetAction,
dest="concretizer:reuse",
const=True,
default=None,
help="reuse installed packages/buildcaches when possible",
)
subgroup.add_argument(
"--reuse-deps",
action=ConfigSetAction,
dest="concretizer:reuse",
const="dependencies",
default=None,
help="reuse installed dependencies only",
)
subgroup.add_argument(
"--deprecated",
action=ConfigSetAction,
dest="config:deprecated",
const=True,
default=None,
help="allow concretizer to select deprecated versions",
)
def add_connection_args(subparser, add_help):
subparser.add_argument(
"--s3-access-key-id", help="ID string to use to connect to this S3 mirror"
)
subparser.add_argument(
"--s3-access-key-secret", help="secret string to use to connect to this S3 mirror"
)
subparser.add_argument(
"--s3-access-token", help="access token to use to connect to this S3 mirror"
)
subparser.add_argument(
"--s3-profile", help="S3 profile name to use to connect to this S3 mirror", default=None
)
subparser.add_argument(
"--s3-endpoint-url", help="endpoint URL to use to connect to this S3 mirror"
)
subparser.add_argument("--oci-username", help="username to use to connect to this OCI mirror")
subparser.add_argument("--oci-password", help="password to use to connect to this OCI mirror")
def use_buildcache(cli_arg_value):
"""Translate buildcache related command line arguments into a pair of strings,
representing whether the root or its dependencies can use buildcaches.
Argument type that accepts comma-separated subargs:
1. auto|only|never
2. package:auto|only|never
3. dependencies:auto|only|never
Args:
cli_arg_value (str): command line argument value to be translated
Return:
Tuple of two strings
"""
valid_keys = frozenset(["package", "dependencies"])
valid_values = frozenset(["only", "never", "auto"])
# Split in args, split in key/value, and trim whitespace
args = [tuple(map(lambda x: x.strip(), part.split(":"))) for part in cli_arg_value.split(",")]
# Verify keys and values
def is_valid(arg):
if len(arg) == 1:
return arg[0] in valid_values
if len(arg) == 2:
return arg[0] in valid_keys and arg[1] in valid_values
return False
valid, invalid = stable_partition(args, is_valid)
# print first error
if invalid:
raise argparse.ArgumentTypeError("invalid argument `{}`".format(":".join(invalid[0])))
# Default values
package = "auto"
dependencies = "auto"
# Override in order.
for arg in valid:
if len(arg) == 1:
package = dependencies = arg[0]
continue
key, val = arg
if key == "package":
package = val
else:
dependencies = val
return package, dependencies
def mirror_name_or_url(m):
# Look up mirror by name or use anonymous mirror with path/url.
# We want to guard against typos in mirror names, to avoid pushing
# accidentally to a dir in the current working directory.
# If there's a \ or / in the name, it's interpreted as a path or url.
if "/" in m or "\\" in m:
return spack.mirror.Mirror(m)
# Otherwise, the named mirror is required to exist.
try:
return spack.mirror.require_mirror_name(m)
except ValueError as e:
raise argparse.ArgumentTypeError(
str(e) + ". Did you mean {}?".format(os.path.join(".", m))
)
def mirror_url(url):
try:
return spack.mirror.Mirror.from_url(url)
except ValueError as e:
raise argparse.ArgumentTypeError(str(e))
def mirror_directory(path):
try:
return spack.mirror.Mirror.from_local_path(path)
except ValueError as e:
raise argparse.ArgumentTypeError(str(e))
def mirror_name(name):
try:
return spack.mirror.require_mirror_name(name)
except ValueError as e:
raise argparse.ArgumentTypeError(str(e))