Source code for spack.util.gpg

# Copyright 2013-2022 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import contextlib
import errno
import functools
import os
import re

import spack.bootstrap
import spack.error
import spack.paths
import spack.util.executable
import spack.version

#: Executable instance for "gpg", initialized lazily
GPG = None
#: Executable instance for "gpgconf", initialized lazily
GPGCONF = None
#: Socket directory required if a non default home directory is used
SOCKET_DIR = None
#: GNUPGHOME environment variable in the context of this Python module
GNUPGHOME = None


[docs]def clear(): """Reset the global state to uninitialized.""" global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME GPG, GPGCONF, SOCKET_DIR, GNUPGHOME = None, None, None, None
[docs]def init(gnupghome=None, force=False): """Initialize the global objects in the module, if not set. When calling any gpg executable, the GNUPGHOME environment variable is set to: 1. The value of the `gnupghome` argument, if not None 2. The value of the "SPACK_GNUPGHOME" environment variable, if set 3. The default gpg path for Spack otherwise Args: gnupghome (str): value to be used for GNUPGHOME when calling GnuPG executables force (bool): if True forces the re-initialization even if the global objects are set already """ global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME if force: clear() # If the executables are already set, there's nothing to do if GPG and GNUPGHOME: return # Set the value of GNUPGHOME to be used in this module GNUPGHOME = (gnupghome or os.getenv('SPACK_GNUPGHOME') or spack.paths.gpg_path) # Set the executable objects for "gpg" and "gpgconf" with spack.bootstrap.ensure_bootstrap_configuration(): spack.bootstrap.ensure_gpg_in_path_or_raise() GPG, GPGCONF = _gpg(), _gpgconf() GPG.add_default_env('GNUPGHOME', GNUPGHOME) if GPGCONF: GPGCONF.add_default_env('GNUPGHOME', GNUPGHOME) # Set the socket dir if not using GnuPG defaults SOCKET_DIR = _socket_dir(GPGCONF) # Make sure that the GNUPGHOME exists if not os.path.exists(GNUPGHOME): os.makedirs(GNUPGHOME) os.chmod(GNUPGHOME, 0o700) if not os.path.isdir(GNUPGHOME): msg = 'GNUPGHOME "{0}" exists and is not a directory'.format(GNUPGHOME) raise SpackGPGError(msg) if SOCKET_DIR is not None: GPGCONF('--create-socketdir')
def _autoinit(func): """Decorator to ensure that global variables have been initialized before running the decorated function. Args: func (callable): decorated function """ @functools.wraps(func) def _wrapped(*args, **kwargs): init() return func(*args, **kwargs) return _wrapped
[docs]@contextlib.contextmanager def gnupghome_override(dir): """Set the GNUPGHOME to a new location for this context. Args: dir (str): new value for GNUPGHOME """ global GPG, GPGCONF, SOCKET_DIR, GNUPGHOME # Store backup values _GPG, _GPGCONF = GPG, GPGCONF _SOCKET_DIR, _GNUPGHOME = SOCKET_DIR, GNUPGHOME clear() # Clear global state init(gnupghome=dir, force=True) yield clear() GPG, GPGCONF = _GPG, _GPGCONF SOCKET_DIR, GNUPGHOME = _SOCKET_DIR, _GNUPGHOME
def _parse_secret_keys_output(output): keys = [] found_sec = False for line in output.split('\n'): if found_sec: if line.startswith('fpr'): keys.append(line.split(':')[9]) found_sec = False elif line.startswith('ssb'): found_sec = False elif line.startswith('sec'): found_sec = True return keys def _parse_public_keys_output(output): """ Returns a list of public keys with their fingerprints """ keys = [] found_pub = False current_pub_key = '' for line in output.split('\n'): if found_pub: if line.startswith('fpr'): keys.append((current_pub_key, line.split(':')[9])) found_pub = False elif line.startswith('ssb'): found_pub = False elif line.startswith('pub'): current_pub_key = line.split(':')[4] found_pub = True return keys def _get_unimported_public_keys(output): keys = [] for line in output.split('\n'): if line.startswith('pub'): keys.append(line.split(':')[4]) return keys
[docs]class SpackGPGError(spack.error.SpackError): """Class raised when GPG errors are detected."""
[docs]@_autoinit def create(**kwargs): """Create a new key pair.""" r, w = os.pipe() with contextlib.closing(os.fdopen(r, 'r')) as r: with contextlib.closing(os.fdopen(w, 'w')) as w: w.write(''' Key-Type: rsa Key-Length: 4096 Key-Usage: sign Name-Real: %(name)s Name-Email: %(email)s Name-Comment: %(comment)s Expire-Date: %(expires)s %%no-protection %%commit ''' % kwargs) GPG('--gen-key', '--batch', input=r)
[docs]@_autoinit def signing_keys(*args): """Return the keys that can be used to sign binaries.""" output = GPG( '--list-secret-keys', '--with-colons', '--fingerprint', *args, output=str ) return _parse_secret_keys_output(output)
[docs]@_autoinit def public_keys_to_fingerprint(*args): """Return the keys that can be used to verify binaries.""" output = GPG( '--list-public-keys', '--with-colons', '--fingerprint', *args, output=str ) return _parse_public_keys_output(output)
[docs]@_autoinit def public_keys(*args): """Return a list of fingerprints""" keys_and_fpr = public_keys_to_fingerprint(*args) return [key_and_fpr[1] for key_and_fpr in keys_and_fpr]
[docs]@_autoinit def export_keys(location, keys, secret=False): """Export public keys to a location passed as argument. Args: location (str): where to export the keys keys (list): keys to be exported secret (bool): whether to export secret keys or not """ if secret: GPG("--export-secret-keys", "--armor", "--output", location, *keys) else: GPG("--batch", "--yes", "--armor", "--export", "--output", location, *keys)
[docs]@_autoinit def trust(keyfile): """Import a public key from a file and trust it. Args: keyfile (str): file with the public key """ # Get the public keys we are about to import output = GPG('--with-colons', keyfile, output=str, error=str) keys = _get_unimported_public_keys(output) # Import them GPG('--import', keyfile) # Set trust to ultimate key_to_fpr = dict(public_keys_to_fingerprint()) for key in keys: # Skip over keys we cannot find a fingerprint for. if key not in key_to_fpr: continue fpr = key_to_fpr[key] r, w = os.pipe() with contextlib.closing(os.fdopen(r, 'r')) as r: with contextlib.closing(os.fdopen(w, 'w')) as w: w.write("{0}:6:\n".format(fpr)) GPG('--import-ownertrust', input=r)
[docs]@_autoinit def untrust(signing, *keys): """Delete known keys. Args: signing (bool): if True deletes the secret keys *keys: keys to be deleted """ if signing: skeys = signing_keys(*keys) GPG('--batch', '--yes', '--delete-secret-keys', *skeys) pkeys = public_keys(*keys) GPG('--batch', '--yes', '--delete-keys', *pkeys)
[docs]@_autoinit def sign(key, file, output, clearsign=False): """Sign a file with a key. Args: key: key to be used to sign file (str): file to be signed output (str): output file (either the clearsigned file or the detached signature) clearsign (bool): if True wraps the document in an ASCII-armored signature, if False creates a detached signature """ signopt = '--clearsign' if clearsign else '--detach-sign' GPG(signopt, '--armor', '--default-key', key, '--output', output, file)
[docs]@_autoinit def verify(signature, file=None, suppress_warnings=False): """Verify the signature on a file. Args: signature (str): signature of the file (or clearsigned file) file (str): file to be verified. If None, then signature is assumed to be a clearsigned file. suppress_warnings (bool): whether or not to suppress warnings from GnuPG """ args = [signature] if file: args.append(file) kwargs = {'error': str} if suppress_warnings else {} GPG('--verify', *args, **kwargs)
[docs]@_autoinit def list(trusted, signing): """List known keys. Args: trusted (bool): if True list public keys signing (bool): if True list private keys """ if trusted: GPG('--list-public-keys') if signing: GPG('--list-secret-keys')
def _verify_exe_or_raise(exe): msg = ( 'Spack requires gpgconf version >= 2\n' ' To install a suitable version using Spack, run\n' ' spack install gnupg@2:\n' ' and load it by running\n' ' spack load gnupg@2:' ) if not exe: raise SpackGPGError(msg) output = exe('--version', output=str) match = re.search(r"^gpg(conf)? \(GnuPG\) (.*)$", output, re.M) if not match: raise SpackGPGError( 'Could not determine "{0}" version'.format(exe.name) ) if spack.version.Version(match.group(2)) < spack.version.Version('2'): raise SpackGPGError(msg) def _gpgconf(): exe = spack.util.executable.which('gpgconf', 'gpg2conf', 'gpgconf2') _verify_exe_or_raise(exe) # ensure that the gpgconf we found can run "gpgconf --create-socketdir" try: exe('--dry-run', '--create-socketdir', output=os.devnull, error=os.devnull) except spack.util.executable.ProcessError: # no dice exe = None return exe def _gpg(): exe = spack.util.executable.which('gpg2', 'gpg') _verify_exe_or_raise(exe) return exe def _socket_dir(gpgconf): # Try to ensure that (/var)/run/user/$(id -u) exists so that # `gpgconf --create-socketdir` can be run later. # # NOTE(opadron): This action helps prevent a large class of # "file-name-too-long" errors in gpg. # If there is no suitable gpgconf, don't even bother trying to # pre-create a user run dir. if not gpgconf: return None result = None for var_run in ('/run', '/var/run'): if not os.path.exists(var_run): continue var_run_user = os.path.join(var_run, 'user') try: if not os.path.exists(var_run_user): os.mkdir(var_run_user) os.chmod(var_run_user, 0o777) user_dir = os.path.join(var_run_user, str(os.getuid())) if not os.path.exists(user_dir): os.mkdir(user_dir) os.chmod(user_dir, 0o700) # If the above operation fails due to lack of permissions, then # just carry on without running gpgconf and hope for the best. # # NOTE(opadron): Without a dir in which to create a socket for IPC, # gnupg may fail if GNUPGHOME is set to a path that # is too long, where "too long" in this context is # actually quite short; somewhere in the # neighborhood of more than 100 characters. # # TODO(opadron): Maybe a warning should be printed in this case? except OSError as exc: if exc.errno not in (errno.EPERM, errno.EACCES): raise user_dir = None # return the last iteration that provides a usable user run dir if user_dir is not None: result = user_dir return result