Source code for spack.cmd.config

# Copyright 2013-2024 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import collections
import os
import shutil
import sys
from typing import List

import llnl.util.filesystem as fs
import llnl.util.tty as tty

import spack.config
import spack.environment as ev
import spack.repo
import spack.schema.env
import spack.schema.packages
import spack.store
import spack.util.spack_yaml as syaml
from spack.cmd.common import arguments
from spack.util.editor import editor

description = "get and set configuration options"
section = "config"
level = "long"


[docs] def setup_parser(subparser): # User can only choose one subparser.add_argument( "--scope", action=arguments.ConfigScope, help="configuration scope to read/modify" ) sp = subparser.add_subparsers(metavar="SUBCOMMAND", dest="config_command") get_parser = sp.add_parser("get", help="print configuration values") get_parser.add_argument( "section", help="configuration section to print\n\noptions: %(choices)s", nargs="?", metavar="section", choices=spack.config.SECTION_SCHEMAS, ) blame_parser = sp.add_parser( "blame", help="print configuration annotated with source file:line" ) blame_parser.add_argument( "section", help="configuration section to print\n\noptions: %(choices)s", nargs="?", metavar="section", choices=spack.config.SECTION_SCHEMAS, ) edit_parser = sp.add_parser("edit", help="edit configuration file") edit_parser.add_argument( "section", help="configuration section to edit\n\noptions: %(choices)s", metavar="section", nargs="?", choices=spack.config.SECTION_SCHEMAS, ) edit_parser.add_argument( "--print-file", action="store_true", help="print the file name that would be edited" ) sp.add_parser("list", help="list configuration sections") add_parser = sp.add_parser("add", help="add configuration parameters") add_parser.add_argument( "path", nargs="?", help="colon-separated path to config that should be added, e.g. 'config:default:true'", ) add_parser.add_argument("-f", "--file", help="file from which to set all config values") change_parser = sp.add_parser("change", help="swap variants etc. on specs in config") change_parser.add_argument("path", help="colon-separated path to config section with specs") change_parser.add_argument("--match-spec", help="only change constraints that match this") prefer_upstream_parser = sp.add_parser( "prefer-upstream", help="set package preferences from upstream" ) prefer_upstream_parser.add_argument( "--local", action="store_true", default=False, help="set packages preferences based on local installs, rather than upstream", ) remove_parser = sp.add_parser("remove", aliases=["rm"], help="remove configuration parameters") remove_parser.add_argument( "path", help="colon-separated path to config that should be removed," " e.g. 'config:default:true'", ) # Make the add parser available later setup_parser.add_parser = add_parser update = sp.add_parser("update", help="update configuration files to the latest format") arguments.add_common_arguments(update, ["yes_to_all"]) update.add_argument("section", help="section to update") revert = sp.add_parser( "revert", help="revert configuration files to their state before update" ) arguments.add_common_arguments(revert, ["yes_to_all"]) revert.add_argument("section", help="section to update")
def _get_scope_and_section(args): """Extract config scope and section from arguments.""" scope = args.scope section = getattr(args, "section", None) path = getattr(args, "path", None) # w/no args and an active environment, point to env manifest if not section and not scope: env = ev.active_environment() if env: scope = env.scope_name # set scope defaults elif not scope: scope = spack.config.default_modify_scope(section) # special handling for commands that take value instead of section if path: section = path[: path.find(":")] if ":" in path else path if not scope: scope = spack.config.default_modify_scope(section) return scope, section
[docs] def config_get(args): """Dump merged YAML configuration for a specific section. With no arguments and an active environment, print the contents of the environment's manifest file (spack.yaml). """ print_configuration(args, blame=False)
[docs] def config_blame(args): """Print out line-by-line blame of merged YAML.""" print_configuration(args, blame=True)
[docs] def config_edit(args): """Edit the configuration file for a specific scope and config section. With no arguments and an active environment, edit the spack.yaml for the active environment. """ spack_env = os.environ.get(ev.spack_env_var) if spack_env and not args.scope: # Don't use the scope object for envs, as `config edit` can be called # for a malformed environment. Use SPACK_ENV to find spack.yaml. config_file = ev.manifest_file(spack_env) else: # If we aren't editing a spack.yaml file, get config path from scope. scope, section = _get_scope_and_section(args) if not scope and not section: tty.die("`spack config edit` requires a section argument or an active environment.") config_file = spack.config.CONFIG.get_config_filename(scope, section) if args.print_file: print(config_file) else: editor(config_file)
[docs] def config_list(args): """List the possible configuration sections. Used primarily for shell tab completion scripts. """ print(" ".join(list(spack.config.SECTION_SCHEMAS)))
[docs] def config_add(args): """Add the given configuration to the specified config scope This is a stateful operation that edits the config files.""" if not (args.file or args.path): tty.error("No changes requested. Specify a file or value.") setup_parser.add_parser.print_help() exit(1) scope, section = _get_scope_and_section(args) if args.file: spack.config.add_from_file(args.file, scope=scope) if args.path: spack.config.add(args.path, scope=scope)
[docs] def config_remove(args): """Remove the given configuration from the specified config scope This is a stateful operation that edits the config files.""" scope, _ = _get_scope_and_section(args) path, _, value = args.path.rpartition(":") existing = spack.config.get(path, scope=scope) if not isinstance(existing, (list, dict)): path, _, value = path.rpartition(":") existing = spack.config.get(path, scope=scope) value = syaml.load(value) if isinstance(existing, list): values = value if isinstance(value, list) else [value] for v in values: existing.remove(v) elif isinstance(existing, dict): existing.pop(value, None) else: # This should be impossible to reach raise spack.config.ConfigError("Config has nested non-dict values") spack.config.set(path, existing, scope)
def _can_update_config_file(scope: spack.config.ConfigScope, cfg_file): if isinstance(scope, spack.config.SingleFileScope): return fs.can_access(cfg_file) return fs.can_write_to_dir(scope.path) and fs.can_access(cfg_file) def _config_change_requires_scope(path, spec, scope, match_spec=None): """Return whether or not anything changed.""" require = spack.config.get(path, scope=scope) if not require: return False changed = False def override_cfg_spec(spec_str): nonlocal changed init_spec = spack.spec.Spec(spec_str) # Overridden spec cannot be anonymous init_spec.name = spec.name if match_spec and not init_spec.satisfies(match_spec): # If there is a match_spec, don't change constraints that # don't match it return spec_str elif not init_spec.intersects(spec): changed = True return str(spack.spec.Spec.override(init_spec, spec)) else: # Don't override things if they intersect, otherwise we'd # be e.g. attaching +debug to every single version spec return spec_str if isinstance(require, str): new_require = override_cfg_spec(require) else: new_require = [] for item in require: if "one_of" in item: item["one_of"] = [override_cfg_spec(x) for x in item["one_of"]] elif "any_of" in item: item["any_of"] = [override_cfg_spec(x) for x in item["any_of"]] elif "spec" in item: item["spec"] = override_cfg_spec(item["spec"]) elif isinstance(item, str): item = override_cfg_spec(item) else: raise ValueError(f"Unexpected requirement: ({type(item)}) {str(item)}") new_require.append(item) spack.config.set(path, new_require, scope=scope) return changed def _config_change(config_path, match_spec_str=None): all_components = spack.config.process_config_path(config_path) key_components = all_components[:-1] key_path = ":".join(key_components) spec = spack.spec.Spec(syaml.syaml_str(all_components[-1])) match_spec = None if match_spec_str: match_spec = spack.spec.Spec(match_spec_str) if key_components[-1] == "require": # Extract the package name from the config path, which allows # args.spec to be anonymous if desired pkg_name = key_components[1] spec.name = pkg_name changed = False for scope in spack.config.writable_scope_names(): changed |= _config_change_requires_scope(key_path, spec, scope, match_spec=match_spec) if not changed: existing_requirements = spack.config.get(key_path) if isinstance(existing_requirements, str): raise spack.config.ConfigError( "'config change' needs to append a requirement," " but existing require: config is not a list" ) ideal_scope_to_modify = None for scope in spack.config.writable_scope_names(): if spack.config.get(key_path, scope=scope): ideal_scope_to_modify = scope break update_path = f"{key_path}:[{str(spec)}]" spack.config.add(update_path, scope=ideal_scope_to_modify) else: raise ValueError("'config change' can currently only change 'require' sections")
[docs] def config_change(args): _config_change(args.path, args.match_spec)
[docs] def config_update(args): # Read the configuration files spack.config.CONFIG.get_config(args.section, scope=args.scope) updates: List[spack.config.ConfigScope] = list( filter( lambda s: not isinstance( s, (spack.config.InternalConfigScope, spack.config.ImmutableConfigScope) ), spack.config.CONFIG.format_updates[args.section], ) ) cannot_overwrite, skip_system_scope = [], False for scope in updates: cfg_file = spack.config.CONFIG.get_config_filename(scope.name, args.section) can_be_updated = _can_update_config_file(scope, cfg_file) if not can_be_updated: if scope.name == "system": skip_system_scope = True tty.warn( 'Not enough permissions to write to "system" scope. ' f"Skipping update at that location [cfg={cfg_file}]" ) continue cannot_overwrite.append((scope, cfg_file)) if cannot_overwrite: msg = "Detected permission issues with the following scopes:\n\n" for scope, cfg_file in cannot_overwrite: msg += "\t[scope={0}, cfg={1}]\n".format(scope.name, cfg_file) msg += ( "\nEither ensure that you have sufficient permissions to " "modify these files or do not include these scopes in the " "update." ) tty.die(msg) if skip_system_scope: updates = [x for x in updates if x.name != "system"] # Report if there are no updates to be done if not updates: msg = 'No updates needed for "{0}" section.' tty.msg(msg.format(args.section)) return proceed = True if not args.yes_to_all: msg = ( "The following configuration files are going to be updated to" " the latest schema format:\n\n" ) for scope in updates: cfg_file = spack.config.CONFIG.get_config_filename(scope.name, args.section) msg += "\t[scope={0}, file={1}]\n".format(scope.name, cfg_file) msg += ( "\nIf the configuration files are updated, versions of Spack " "that are older than this version may not be able to read " "them. Spack stores backups of the updated files which can " 'be retrieved with "spack config revert"' ) tty.msg(msg) proceed = tty.get_yes_or_no("Do you want to proceed?", default=False) if not proceed: tty.die("Operation aborted.") # Get a function to update the format update_fn = spack.config.ensure_latest_format_fn(args.section) for scope in updates: data = scope.get_section(args.section).pop(args.section) update_fn(data) # Make a backup copy and rewrite the file bkp_file = cfg_file + ".bkp" shutil.copy(cfg_file, bkp_file) spack.config.CONFIG.update_config(args.section, data, scope=scope.name, force=True) tty.msg(f'File "{cfg_file}" update [backup={bkp_file}]')
def _can_revert_update(scope_dir, cfg_file, bkp_file): dir_ok = fs.can_write_to_dir(scope_dir) cfg_ok = not os.path.exists(cfg_file) or fs.can_access(cfg_file) bkp_ok = fs.can_access(bkp_file) return dir_ok and cfg_ok and bkp_ok
[docs] def config_revert(args): scopes = [args.scope] if args.scope else [x.name for x in spack.config.CONFIG.file_scopes] # Search for backup files in the configuration scopes Entry = collections.namedtuple("Entry", ["scope", "cfg", "bkp"]) to_be_restored, cannot_overwrite = [], [] for scope in scopes: cfg_file = spack.config.CONFIG.get_config_filename(scope, args.section) bkp_file = cfg_file + ".bkp" # If the backup files doesn't exist move to the next scope if not os.path.exists(bkp_file): continue # If it exists and we don't have write access in this scope # keep track of it and report a comprehensive error later entry = Entry(scope, cfg_file, bkp_file) scope_dir = os.path.dirname(bkp_file) can_be_reverted = _can_revert_update(scope_dir, cfg_file, bkp_file) if not can_be_reverted: cannot_overwrite.append(entry) continue to_be_restored.append(entry) # Report errors if we can't revert a configuration if cannot_overwrite: msg = "Detected permission issues with the following scopes:\n\n" for e in cannot_overwrite: msg += "\t[scope={0.scope}, cfg={0.cfg}, bkp={0.bkp}]\n".format(e) msg += ( "\nEither ensure to have the right permissions before retrying" " or be more specific on the scope to revert." ) tty.die(msg) proceed = True if not args.yes_to_all: msg = "The following scopes will be restored from the corresponding backup files:\n" for entry in to_be_restored: msg += "\t[scope={0.scope}, bkp={0.bkp}]\n".format(entry) msg += "This operation cannot be undone." tty.msg(msg) proceed = tty.get_yes_or_no("Do you want to proceed?", default=False) if not proceed: tty.die("Operation aborted.") for _, cfg_file, bkp_file in to_be_restored: shutil.copy(bkp_file, cfg_file) os.unlink(bkp_file) msg = 'File "{0}" reverted to old state' tty.msg(msg.format(cfg_file))
[docs] def config_prefer_upstream(args): """Generate a packages config based on the configuration of all upstream installs.""" scope = args.scope if scope is None: scope = spack.config.default_modify_scope("packages") all_specs = set(spack.store.STORE.db.query(installed=True)) local_specs = set(spack.store.STORE.db.query_local(installed=True)) pref_specs = local_specs if args.local else all_specs - local_specs conflicting_variants = set() pkgs = {} for spec in pref_specs: # Collect all the upstream compilers and versions for this package. pkg = pkgs.get(spec.name, {"version": []}) all = pkgs.get("all", {"compiler": []}) pkgs["all"] = all pkgs[spec.name] = pkg # We have no existing variant if this is our first added version. existing_variants = pkg.get("variants", None if not pkg["version"] else "") version = spec.version.string if version not in pkg["version"]: pkg["version"].append(version) compiler = str(spec.compiler) if compiler not in all["compiler"]: all["compiler"].append(compiler) # Get and list all the variants that differ from the default. variants = [] for var_name, variant in spec.variants.items(): if var_name in ["patches"] or var_name not in spec.package.variants: continue variant_desc, _ = spec.package.variants[var_name] if variant.value != variant_desc.default: variants.append(str(variant)) variants.sort() variants = " ".join(variants) if spec.name not in conflicting_variants: # Only specify the variants if there's a single variant # set across all versions/compilers. if existing_variants is not None and existing_variants != variants: conflicting_variants.add(spec.name) pkg.pop("variants", None) elif variants: pkg["variants"] = variants if conflicting_variants: tty.warn( "The following packages have multiple conflicting upstream " "specs. You may have to specify, by " "concretized hash, which spec you want when building " "packages that depend on them:\n - {0}".format( "\n - ".join(sorted(conflicting_variants)) ) ) # Simply write the config to the specified file. existing = spack.config.get("packages", scope=scope) new = spack.config.merge_yaml(existing, pkgs) spack.config.set("packages", new, scope) config_file = spack.config.CONFIG.get_config_filename(scope, section) tty.msg("Updated config at {0}".format(config_file))
[docs] def config(parser, args): action = { "get": config_get, "blame": config_blame, "edit": config_edit, "list": config_list, "add": config_add, "rm": config_remove, "remove": config_remove, "update": config_update, "revert": config_revert, "prefer-upstream": config_prefer_upstream, "change": config_change, } action[args.config_command](args)