# Source code for spack.patch

# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import hashlib
import inspect
import os
import os.path
import sys

import llnl.util.filesystem
import llnl.util.lang

import spack
import spack.config
import spack.error
import spack.fetch_strategy as fs
import spack.mirror
import spack.repo
import spack.stage
import spack.util.spack_json as sjson
from spack.util.compression import allowed_archive
from spack.util.crypto import Checker, checksum
from spack.util.executable import which, which_string


def apply_patch(stage, patch_path, level=1, working_dir="."):
    """Apply the patch at patch_path to code in the stage.

    Args:
        stage (spack.stage.Stage): stage with code that will be patched
        patch_path (str): filesystem location for the patch to apply
        level (int or None): patch level (default 1)
        working_dir (str): relative path *within* the stage to change to
            (default '.')
    """
    patch_search_path = os.environ.get("PATH", "")
    if sys.platform == "win32":
        # On Windows, the GNU patch executable ships with Git; locate Git's
        # usr/bin directory relative to the git executable itself.
        git_exe = which_string("git", required=True)
        usr_bin_parts = git_exe.split("\\")[:-2]
        usr_bin_parts.extend(["usr", "bin"])
        patch_search_path = os.sep.join(usr_bin_parts)
        # TODO: Decouple Spack's patch support on Windows from Git
        # for Windows, and instead have Spack directly fetch, install, and
        # utilize that patch.
        # Note for future developers: The GNU port of patch to windows
        # has issues handling CRLF line endings unless the --binary
        # flag is passed.
    patch_exe = which("patch", required=True, path=patch_search_path)
    with llnl.util.filesystem.working_dir(stage.source_path):
        patch_exe("-s", "-p", str(level), "-i", patch_path, "-d", working_dir)
class Patch(object):
    """Base class for patches.

    Arguments:
        pkg (str): the package that owns the patch

    The owning package is not necessarily the package to apply the patch
    to -- in the case where a dependent package patches its dependency,
    it is the dependent's fullname.
    """

    def __init__(self, pkg, path_or_url, level, working_dir):
        """Initialize attributes shared by all patch subclasses.

        Args:
            pkg: package (or package class) that owns the patch; must
                expose a ``fullname`` attribute
            path_or_url (str): filesystem path or URL of the patch
            level (int): patch level passed to the ``patch`` command
            working_dir (str): relative path within the stage to patch in

        Raises:
            ValueError: if ``level`` is not a non-negative integer
        """
        # validate level (must be an integer >= 0)
        if not isinstance(level, int) or not level >= 0:
            raise ValueError("Patch level needs to be a non-negative integer.")

        # Attributes shared by all patch subclasses
        self.owner = pkg.fullname
        self.path_or_url = path_or_url  # needed for debug output
        self.path = None  # must be set before apply()
        self.level = level
        self.working_dir = working_dir

    def fetch(self):
        """Fetch the patch in case of a UrlPatch."""

    def clean(self):
        """Clean up the patch stage in case of a UrlPatch."""

    def apply(self, stage):
        """Apply a patch to source in a stage.

        Arguments:
            stage (spack.stage.Stage): stage where source code lives

        Raises:
            NoSuchPatchError: if ``self.path`` does not point to a file
        """
        assert self.path, "Path for patch not set in apply: %s" % self.path_or_url

        if not os.path.isfile(self.path):
            raise NoSuchPatchError("No such patch: %s" % self.path)

        apply_patch(stage, self.path, self.level, self.working_dir)

    @property
    def stage(self):
        # Base patches have no download stage; UrlPatch overrides this.
        return None

    def to_dict(self):
        """Partial dictionary -- subclasses should add to this."""
        return {
            "owner": self.owner,
            "sha256": self.sha256,
            "level": self.level,
            "working_dir": self.working_dir,
        }

    def __eq__(self, other):
        # Patches are equal if their content hashes match.  Delegate to
        # the other operand for non-Patch objects instead of raising
        # AttributeError on a missing ``sha256``.
        if not isinstance(other, Patch):
            return NotImplemented
        return self.sha256 == other.sha256

    def __hash__(self):
        return hash(self.sha256)
class FilePatch(Patch):
    """Describes a patch that is retrieved from a file in the repository.

    Arguments:
        pkg (str): the class object for the package that owns the patch
        relative_path (str): path to patch, relative to the repository
            directory for a package.
        level (int): level to pass to patch command
        working_dir (str): path within the source directory where patch
            should be applied
    """

    def __init__(self, pkg, relative_path, level, working_dir, ordering_key=None):
        self.relative_path = relative_path

        # At different times we call FilePatch on instances and classes
        pkg_cls = pkg if inspect.isclass(pkg) else pkg.__class__

        # Patches may be defined by relative paths to parent classes, so
        # search the MRO for a directory containing the file.
        abs_path = None
        for cls in inspect.getmro(pkg_cls):
            if not hasattr(cls, "module"):
                # We've gone too far up the MRO
                break

            # Cannot use pkg.package_dir because it's a property and we have
            # classes, not instances.
            pkg_dir = os.path.abspath(os.path.dirname(cls.module.__file__))
            candidate = os.path.join(pkg_dir, self.relative_path)
            if os.path.exists(candidate):
                abs_path = candidate
                break

        if abs_path is None:
            msg = "FilePatch: Patch file %s for " % relative_path
            msg += "package %s.%s does not exist." % (pkg.namespace, pkg.name)
            raise ValueError(msg)

        super(FilePatch, self).__init__(pkg, abs_path, level, working_dir)
        self.path = abs_path
        self._sha256 = None
        self.ordering_key = ordering_key

    @property
    def sha256(self):
        # Hash lazily and cache: computing the checksum reads the file.
        if self._sha256 is None:
            self._sha256 = checksum(hashlib.sha256, self.path)
        return self._sha256

    def to_dict(self):
        """Add the repo-relative path to the base patch dictionary."""
        base = super(FilePatch, self).to_dict()
        return llnl.util.lang.union_dicts(base, {"relative_path": self.relative_path})
class UrlPatch(Patch):
    """Describes a patch that is retrieved from a URL.

    Arguments:
        pkg (str): the package that owns the patch
        url (str): URL where the patch can be fetched
        level (int): level to pass to patch command
        working_dir (str): path within the source directory where patch
            should be applied

    Keyword Arguments:
        sha256 (str): sha256 checksum of the raw patch file (required)
        archive_sha256 (str): checksum of the downloaded archive; required
            when (and only useful when) the URL points at a compressed patch
    """

    def __init__(self, pkg, url, level=1, working_dir=".", ordering_key=None, **kwargs):
        super(UrlPatch, self).__init__(pkg, url, level, working_dir)

        self.url = url
        self._stage = None  # created lazily by the ``stage`` property

        self.ordering_key = ordering_key

        self.archive_sha256 = kwargs.get("archive_sha256")
        if allowed_archive(self.url) and not self.archive_sha256:
            raise PatchDirectiveError(
                "Compressed patches require 'archive_sha256' "
                "and patch 'sha256' attributes: %s" % self.url
            )

        self.sha256 = kwargs.get("sha256")
        if not self.sha256:
            raise PatchDirectiveError("URL patches require a sha256 checksum")

    def fetch(self):
        """Retrieve the patch in a temporary stage and set ``self.path``.

        Raises:
            NoSuchPatchError: if the download or expansion produced no
                patch file
            spack.fetch_strategy.ChecksumError: if the expanded patch fails
                its sha256 check
        """
        self.stage.create()
        self.stage.fetch()
        self.stage.check()

        root = self.stage.path
        if self.archive_sha256:
            self.stage.expand_archive()
            root = self.stage.source_path

        files = os.listdir(root)
        if not files:
            if self.archive_sha256:
                raise NoSuchPatchError("Archive was empty: %s" % self.url)
            else:
                raise NoSuchPatchError("Patch failed to download: %s" % self.url)

        self.path = os.path.join(root, files.pop())

        if not os.path.isfile(self.path):
            raise NoSuchPatchError("Archive %s contains no patch file!" % self.url)

        # for a compressed archive, Need to check the patch sha256 again
        # and the patch is in a directory, not in the same place
        if self.archive_sha256 and spack.config.get("config:checksum"):
            checker = Checker(self.sha256)
            if not checker.check(self.path):
                raise fs.ChecksumError(
                    "sha256 checksum failed for %s" % self.path,
                    "Expected %s but got %s" % (self.sha256, checker.sum),
                )

    @property
    def stage(self):
        """Lazily create and cache the stage used to fetch this patch."""
        if self._stage:
            return self._stage

        # use archive digest for compressed archives
        fetch_digest = self.sha256
        if self.archive_sha256:
            fetch_digest = self.archive_sha256

        fetcher = fs.URLFetchStrategy(self.url, fetch_digest, expand=bool(self.archive_sha256))

        # The same package can have multiple patches with the same name but
        # with different contents, therefore apply a subset of the hash.
        name = "{0}-{1}".format(os.path.basename(self.url), fetch_digest[:7])

        per_package_ref = os.path.join(self.owner.split(".")[-1], name)
        # Reference starting with "spack." is required to avoid cyclic imports
        mirror_ref = spack.mirror.mirror_archive_paths(fetcher, per_package_ref)

        self._stage = spack.stage.Stage(fetcher, mirror_paths=mirror_ref)
        self._stage.create()
        return self._stage

    def clean(self):
        """Destroy the stage used to fetch this patch."""
        self.stage.destroy()

    def to_dict(self):
        """Add URL (and archive checksum, if any) to the base dictionary."""
        data = super(UrlPatch, self).to_dict()
        data["url"] = self.url
        if self.archive_sha256:
            data["archive_sha256"] = self.archive_sha256
        return data
def from_dict(dictionary, repository=None):
    """Create a patch from json dictionary."""
    repository = repository or spack.repo.path

    # Every serialized patch carries its owning package.
    if "owner" not in dictionary:
        raise ValueError("Invalid patch dictionary: %s" % dictionary)
    pkg_cls = repository.get_pkg_class(dictionary["owner"])

    if "url" in dictionary:
        return UrlPatch(
            pkg_cls,
            dictionary["url"],
            dictionary["level"],
            dictionary["working_dir"],
            sha256=dictionary["sha256"],
            archive_sha256=dictionary.get("archive_sha256"),
        )

    if "relative_path" in dictionary:
        patch = FilePatch(
            pkg_cls, dictionary["relative_path"], dictionary["level"], dictionary["working_dir"]
        )

        # If the patch in the repo changes, we cannot get it back, so we
        # just check it and fail here.
        # TODO: handle this more gracefully.
        expected = dictionary["sha256"]
        checker = Checker(expected)
        if not checker.check(patch.path):
            raise fs.ChecksumError(
                "sha256 checksum failed for %s" % patch.path,
                "Expected %s but got %s " % (expected, checker.sum)
                + "Patch may have changed since concretization.",
            )
        return patch

    raise ValueError("Invalid patch dictionary: %s" % dictionary)
class PatchCache(object):
    """Index of patches used in a repository, by sha256 hash.

    This allows us to look up patches without loading all packages.  It's
    also needed to properly implement dependency patching, as need a way
    to look up patches that come from packages not in the Spec sub-DAG.

    The patch index is structured like this in a file (this is YAML, but
    we write JSON)::

        patches:
            sha256:
                namespace1.package1:
                    <patch json>
                namespace2.package2:
                    <patch json>
                ... etc. ...
    """

    def __init__(self, repository, data=None):
        """Initialize the cache.

        Args:
            repository: repository used to look up package classes
            data (dict): previously parsed index data, keyed under
                "patches"; an empty index is created when omitted

        Raises:
            IndexError: if ``data`` is given but lacks the "patches" key
        """
        if data is None:
            self.index = {}
        else:
            if "patches" not in data:
                raise IndexError("invalid patch index; try `spack clean -m`")
            self.index = data["patches"]

        self.repository = repository

    @classmethod
    def from_json(cls, stream, repository):
        """Construct a PatchCache from a JSON stream."""
        return PatchCache(repository=repository, data=sjson.load(stream))

    def to_json(self, stream):
        """Write this cache's index to a JSON stream."""
        sjson.dump({"patches": self.index}, stream)

    def patch_for_package(self, sha256, pkg):
        """Look up a patch in the index and build a patch object for it.

        Arguments:
            sha256 (str): sha256 hash to look up
            pkg (spack.package_base.PackageBase): Package object to get patch
                for.

        We build patch objects lazily because building them requires that
        we have information about the package's location in its repo.
        """
        sha_index = self.index.get(sha256)
        if not sha_index:
            raise NoSuchPatchError(
                "Couldn't find patch for package %s with sha256: %s" % (pkg.fullname, sha256)
            )

        # Find patches for this class or any class it inherits from
        for fullname in pkg.fullnames:
            patch_dict = sha_index.get(fullname)
            if patch_dict:
                break
        else:
            raise NoSuchPatchError(
                "Couldn't find patch for package %s with sha256: %s" % (pkg.fullname, sha256)
            )

        # add the sha256 back (we take it out on write to save space,
        # because it's the index key)
        patch_dict = dict(patch_dict)
        patch_dict["sha256"] = sha256
        return from_dict(patch_dict, repository=self.repository)

    def update_package(self, pkg_fullname):
        """Rebuild the index entries contributed by one package.

        All existing entries owned by ``pkg_fullname`` are dropped first,
        then the package's patches (and its dependency patches) are
        re-indexed.
        """
        # remove this package from any patch entries that reference it.
        empty = []
        for sha256, package_to_patch in self.index.items():
            remove = []
            for fullname, patch_dict in package_to_patch.items():
                if patch_dict["owner"] == pkg_fullname:
                    remove.append(fullname)

            for fullname in remove:
                package_to_patch.pop(fullname)

            if not package_to_patch:
                empty.append(sha256)

        # remove any entries that are now empty
        for sha256 in empty:
            del self.index[sha256]

        # update the index with per-package patch indexes
        pkg_cls = self.repository.get_pkg_class(pkg_fullname)
        partial_index = self._index_patches(pkg_cls, self.repository)
        for sha256, package_to_patch in partial_index.items():
            p2p = self.index.setdefault(sha256, {})
            p2p.update(package_to_patch)

    def update(self, other):
        """Update this cache with the contents of another."""
        for sha256, package_to_patch in other.index.items():
            p2p = self.index.setdefault(sha256, {})
            p2p.update(package_to_patch)

    @staticmethod
    def _index_patches(pkg_class, repository):
        """Build a patch index for one package class.

        Returns a dict of the form ``{sha256: {fullname: patch_dict}}``
        covering the class's own patches and patches it applies to its
        dependencies.
        """
        index = {}

        # Add patches from the class
        for cond, patch_list in pkg_class.patches.items():
            for patch in patch_list:
                patch_dict = patch.to_dict()
                patch_dict.pop("sha256")  # save some space
                # setdefault: the same sha256 may already be indexed for
                # another package; don't clobber its entry.
                index.setdefault(patch.sha256, {})[pkg_class.fullname] = patch_dict

        # and patches on dependencies
        for name, conditions in pkg_class.dependencies.items():
            for cond, dependency in conditions.items():
                for pcond, patch_list in dependency.patches.items():
                    for patch in patch_list:
                        dspec_cls = repository.get_pkg_class(dependency.spec.name)
                        patch_dict = patch.to_dict()
                        patch_dict.pop("sha256")  # save some space
                        index.setdefault(patch.sha256, {})[dspec_cls.fullname] = patch_dict

        return index
class NoSuchPatchError(spack.error.SpackError):
    """Raised when a patch file doesn't exist."""
class PatchDirectiveError(spack.error.SpackError):
    """Raised when the wrong arguments are supplied to the patch directive."""