Source code for spack.patch

# Copyright 2013-2024 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import hashlib
import inspect
import os
import os.path
import pathlib
import sys
from typing import Any, Dict, Optional, Tuple, Type

import llnl.util.filesystem
from llnl.url import allowed_archive

import spack
import spack.error
import spack.fetch_strategy as fs
import spack.mirror
import spack.repo
import spack.stage
import spack.util.spack_json as sjson
from spack.util.crypto import Checker, checksum
from spack.util.executable import which, which_string


def apply_patch(
    stage: "spack.stage.Stage",
    patch_path: str,
    level: int = 1,
    working_dir: str = ".",
    reverse: bool = False,
) -> None:
    """Apply the patch at patch_path to code in the stage.

    Args:
        stage: stage with code that will be patched
        patch_path: filesystem location for the patch to apply
        level: patch level
        working_dir: relative path *within* the stage to change to
        reverse: reverse the patch
    """
    git_utils_path = os.environ.get("PATH", "")
    if sys.platform == "win32":
        git = which_string("git")
        if git:
            git = pathlib.Path(git)
            git_root = git.parent.parent
            git_root = git_root / "usr" / "bin"
            git_utils_path = os.pathsep.join([str(git_root), git_utils_path])

    args = ["-s", "-p", str(level), "-i", patch_path, "-d", working_dir]
    if reverse:
        args.append("-R")

    # TODO: Decouple Spack's patch support on Windows from Git for Windows,
    # and instead have Spack directly fetch, install, and utilize that patch.
    # Note for future developers: The GNU port of patch to windows
    # has issues handling CRLF line endings unless the --binary
    # flag is passed.
    patch = which("patch", required=True, path=git_utils_path)
    with llnl.util.filesystem.working_dir(stage.source_path):
        patch(*args)
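

# A minimal usage sketch (hypothetical helper, not part of the module API):
# applying a standalone patch file to an already-expanded stage.  The patch
# path, level, and working directory below are assumptions for illustration.
def _example_apply_local_patch(stage: "spack.stage.Stage") -> None:
    # equivalent to running `patch -s -p1 -i /tmp/fix-build.patch -d .`
    # from the stage's source directory
    apply_patch(stage, "/tmp/fix-build.patch", level=1, working_dir=".")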


class Patch:
    """Base class for patches.

    The owning package is not necessarily the package to apply the patch
    to -- in the case where a dependent package patches its dependency,
    it is the dependent's fullname.
    """

    sha256: str

    def __init__(
        self,
        pkg: "spack.package_base.PackageBase",
        path_or_url: str,
        level: int,
        working_dir: str,
        reverse: bool = False,
    ) -> None:
        """Initialize a new Patch instance.

        Args:
            pkg: the package that owns the patch
            path_or_url: the relative path or URL to a patch file
            level: patch level
            working_dir: relative path *within* the stage to change to
            reverse: reverse the patch
        """
        # validate level (must be an integer >= 0)
        if not isinstance(level, int) or not level >= 0:
            raise ValueError("Patch level needs to be a non-negative integer.")

        # Attributes shared by all patch subclasses
        self.owner = pkg.fullname
        self.path_or_url = path_or_url  # needed for debug output
        self.path: Optional[str] = None  # must be set before apply()
        self.level = level
        self.working_dir = working_dir
        self.reverse = reverse

    def apply(self, stage: "spack.stage.Stage") -> None:
        """Apply a patch to source in a stage.

        Args:
            stage: stage where source code lives
        """
        if not self.path or not os.path.isfile(self.path):
            raise NoSuchPatchError(f"No such patch: {self.path}")

        apply_patch(stage, self.path, self.level, self.working_dir, self.reverse)

    # TODO: Use TypedDict once Spack supports Python 3.8+ only
    def to_dict(self) -> Dict[str, Any]:
        """Dictionary representation of the patch.

        Returns:
            A dictionary representation.
        """
        return {
            "owner": self.owner,
            "sha256": self.sha256,
            "level": self.level,
            "working_dir": self.working_dir,
            "reverse": self.reverse,
        }

    def __eq__(self, other: object) -> bool:
        """Equality check.

        Args:
            other: another patch

        Returns:
            True if both patches have the same checksum, else False
        """
        if not isinstance(other, Patch):
            return NotImplemented
        return self.sha256 == other.sha256

    def __hash__(self) -> int:
        """Unique hash.

        Returns:
            A unique hash based on the sha256.
        """
        return hash(self.sha256)
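

# Sketch: because __eq__ and __hash__ key on sha256, patch objects that
# describe the same content collapse in hash-based containers, regardless of
# subclass.  `patch_a` and `patch_b` below are assumed to be any two Patch
# instances carrying identical sha256 values.
def _example_patch_identity(patch_a: Patch, patch_b: Patch) -> bool:
    # True whenever the two patches have the same checksum, even if one is a
    # FilePatch and the other a UrlPatch
    return len({patch_a, patch_b}) == 1 and patch_a == patch_b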


class FilePatch(Patch):
    """Describes a patch that is retrieved from a file in the repository."""

    _sha256: Optional[str] = None

    def __init__(
        self,
        pkg: "spack.package_base.PackageBase",
        relative_path: str,
        level: int,
        working_dir: str,
        reverse: bool = False,
        ordering_key: Optional[Tuple[str, int]] = None,
    ) -> None:
        """Initialize a new FilePatch instance.

        Args:
            pkg: the class object for the package that owns the patch
            relative_path: path to patch, relative to the repository directory for a package.
            level: level to pass to patch command
            working_dir: path within the source directory where patch should be applied
            reverse: reverse the patch
            ordering_key: key used to ensure patches are applied in a consistent order
        """
        self.relative_path = relative_path

        # patches may be defined by relative paths to parent classes
        # search mro to look for the file
        abs_path: Optional[str] = None
        # At different times we call FilePatch on instances and classes
        pkg_cls = pkg if inspect.isclass(pkg) else pkg.__class__
        for cls in inspect.getmro(pkg_cls):
            if not hasattr(cls, "module"):
                # We've gone too far up the MRO
                break

            # Cannot use pkg.package_dir because it's a property and we have
            # classes, not instances.
            pkg_dir = os.path.abspath(os.path.dirname(cls.module.__file__))

            path = os.path.join(pkg_dir, self.relative_path)
            if os.path.exists(path):
                abs_path = path
                break

        if abs_path is None:
            msg = "FilePatch: Patch file %s for " % relative_path
            msg += "package %s.%s does not exist." % (pkg.namespace, pkg.name)
            raise ValueError(msg)

        super().__init__(pkg, abs_path, level, working_dir, reverse)
        self.path = abs_path
        self.ordering_key = ordering_key

    @property
    def sha256(self) -> str:
        """Get the patch checksum.

        Returns:
            The sha256 of the patch file.
        """
        if self._sha256 is None and self.path is not None:
            self._sha256 = checksum(hashlib.sha256, self.path)
        assert isinstance(self._sha256, str)
        return self._sha256

    @sha256.setter
    def sha256(self, value: str) -> None:
        """Set the patch checksum.

        Args:
            value: the sha256
        """
        self._sha256 = value

    def to_dict(self) -> Dict[str, Any]:
        """Dictionary representation of the patch.

        Returns:
            A dictionary representation.
        """
        data = super().to_dict()
        data["relative_path"] = self.relative_path
        return data
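

# Sketch of how a `patch("foo.patch")` directive in a package.py maps onto a
# FilePatch.  The package class and patch file name are assumptions; the patch
# file must exist next to the package's package.py (or a parent class's).
def _example_file_patch(pkg_cls: Type["spack.package_base.PackageBase"]) -> FilePatch:
    # the constructor resolves "foo.patch" against the package directory of
    # pkg_cls and its base classes, and raises ValueError if it is missing
    return FilePatch(pkg_cls, "foo.patch", level=1, working_dir=".")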


class UrlPatch(Patch):
    """Describes a patch that is retrieved from a URL."""

    def __init__(
        self,
        pkg: "spack.package_base.PackageBase",
        url: str,
        level: int = 1,
        *,
        working_dir: str = ".",
        reverse: bool = False,
        sha256: str,  # This is required for UrlPatch
        ordering_key: Optional[Tuple[str, int]] = None,
        archive_sha256: Optional[str] = None,
    ) -> None:
        """Initialize a new UrlPatch instance.

        Arguments:
            pkg: the package that owns the patch
            url: URL where the patch can be fetched
            level: level to pass to patch command
            working_dir: path within the source directory where patch should be applied
            reverse: reverse the patch
            ordering_key: key used to ensure patches are applied in a consistent order
            sha256: sha256 sum of the patch, used to verify the patch
            archive_sha256: sha256 sum of the *archive*, if the patch is compressed
                (only required for compressed URL patches)
        """
        super().__init__(pkg, url, level, working_dir, reverse)

        self.url = url
        self._stage: Optional["spack.stage.Stage"] = None

        self.ordering_key = ordering_key

        if allowed_archive(self.url) and not archive_sha256:
            raise PatchDirectiveError(
                "Compressed patches require 'archive_sha256' "
                "and patch 'sha256' attributes: %s" % self.url
            )
        self.archive_sha256 = archive_sha256

        if not sha256:
            raise PatchDirectiveError("URL patches require a sha256 checksum")
        self.sha256 = sha256

    def apply(self, stage: "spack.stage.Stage") -> None:
        """Apply a patch to source in a stage.

        Args:
            stage: stage where source code lives
        """
        assert self.stage.expanded, "Stage must be expanded before applying patches"

        # Get the patch file.
        files = os.listdir(self.stage.source_path)
        assert len(files) == 1, "Expected one file in stage source path, found %s" % files
        self.path = os.path.join(self.stage.source_path, files[0])

        return super().apply(stage)

    @property
    def stage(self) -> "spack.stage.Stage":
        """The stage in which to download (and unpack) the URL patch.

        Returns:
            The stage object.
        """
        if self._stage:
            return self._stage

        fetch_digest = self.archive_sha256 or self.sha256

        # Two checksums, one for compressed file, one for its contents
        if self.archive_sha256 and self.sha256:
            fetcher: fs.FetchStrategy = fs.FetchAndVerifyExpandedFile(
                self.url, archive_sha256=self.archive_sha256, expanded_sha256=self.sha256
            )
        else:
            fetcher = fs.URLFetchStrategy(self.url, sha256=self.sha256, expand=False)

        # The same package can have multiple patches with the same name but
        # with different contents, therefore apply a subset of the hash.
        name = "{0}-{1}".format(os.path.basename(self.url), fetch_digest[:7])

        per_package_ref = os.path.join(self.owner.split(".")[-1], name)
        mirror_ref = spack.mirror.mirror_archive_paths(fetcher, per_package_ref)

        self._stage = spack.stage.Stage(
            fetcher,
            name=f"{spack.stage.stage_prefix}patch-{fetch_digest}",
            mirror_paths=mirror_ref,
        )
        return self._stage

    def to_dict(self) -> Dict[str, Any]:
        """Dictionary representation of the patch.

        Returns:
            A dictionary representation.
        """
        data = super().to_dict()
        data["url"] = self.url
        if self.archive_sha256:
            data["archive_sha256"] = self.archive_sha256
        return data
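

# Sketch: constructing a UrlPatch by hand.  The URL and checksum below are
# placeholders; in practice both come from the package's `patch()` directive,
# and `archive_sha256` is also required when the URL points at an archive.
def _example_url_patch(pkg_cls: Type["spack.package_base.PackageBase"]) -> UrlPatch:
    return UrlPatch(
        pkg_cls,
        "https://example.com/fix-build.patch",
        level=1,
        sha256="0" * 64,  # placeholder checksum, not a real digest
    )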


def from_dict(
    dictionary: Dict[str, Any], repository: Optional["spack.repo.RepoPath"] = None
) -> Patch:
    """Create a patch from a JSON dictionary.

    Args:
        dictionary: dictionary representation of a patch
        repository: repository containing package

    Returns:
        A patch object.

    Raises:
        ValueError: If *owner* or *url*/*relative_path* are missing in the dictionary.
    """
    repository = repository or spack.repo.PATH
    owner = dictionary.get("owner")
    if "owner" not in dictionary:
        raise ValueError("Invalid patch dictionary: %s" % dictionary)
    pkg_cls = repository.get_pkg_class(owner)

    if "url" in dictionary:
        return UrlPatch(
            pkg_cls,
            dictionary["url"],
            dictionary["level"],
            working_dir=dictionary["working_dir"],
            # Added in v0.22, fallback required for backwards compatibility
            reverse=dictionary.get("reverse", False),
            sha256=dictionary["sha256"],
            archive_sha256=dictionary.get("archive_sha256"),
        )

    elif "relative_path" in dictionary:
        patch = FilePatch(
            pkg_cls,
            dictionary["relative_path"],
            dictionary["level"],
            dictionary["working_dir"],
            # Added in v0.22, fallback required for backwards compatibility
            dictionary.get("reverse", False),
        )

        # If the patch in the repo changes, we cannot get it back, so we
        # just check it and fail here.
        # TODO: handle this more gracefully.
        sha256 = dictionary["sha256"]
        checker = Checker(sha256)
        if patch.path and not checker.check(patch.path):
            raise fs.ChecksumError(
                "sha256 checksum failed for %s" % patch.path,
                "Expected %s but got %s " % (sha256, checker.sum)
                + "Patch may have changed since concretization.",
            )
        return patch
    else:
        raise ValueError("Invalid patch dictionary: %s" % dictionary)
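

# Round-trip sketch: `to_dict` output feeds straight back into `from_dict`.
# The patch object below is assumed to exist; FilePatch entries are
# re-checksummed on the way back in, so a modified patch file raises
# fs.ChecksumError.
def _example_roundtrip(patch: Patch) -> Patch:
    data = patch.to_dict()  # includes "owner" and "sha256"
    return from_dict(data, repository=spack.repo.PATH)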


class PatchCache:
    """Index of patches used in a repository, by sha256 hash.

    This allows us to look up patches without loading all packages.  It's
    also needed to properly implement dependency patching, as we need a way
    to look up patches that come from packages not in the Spec sub-DAG.

    The patch index is structured like this in a file (this is YAML, but
    we write JSON)::

        patches:
            sha256:
                namespace1.package1:
                    <patch json>
                namespace2.package2:
                    <patch json>
                ... etc. ...
    """

    def __init__(
        self, repository: "spack.repo.RepoPath", data: Optional[Dict[str, Any]] = None
    ) -> None:
        """Initialize a new PatchCache instance.

        Args:
            repository: repository containing package
            data: nested dictionary of patches
        """
        if data is None:
            self.index = {}
        else:
            if "patches" not in data:
                raise IndexError("invalid patch index; try `spack clean -m`")
            self.index = data["patches"]

        self.repository = repository

    @classmethod
    def from_json(cls, stream: Any, repository: "spack.repo.RepoPath") -> "PatchCache":
        """Initialize a new PatchCache instance from JSON.

        Args:
            stream: stream of data
            repository: repository containing package

        Returns:
            A new PatchCache instance.
        """
        return PatchCache(repository=repository, data=sjson.load(stream))

    def to_json(self, stream: Any) -> None:
        """Dump a JSON representation to a stream.

        Args:
            stream: stream of data
        """
        sjson.dump({"patches": self.index}, stream)

    def patch_for_package(self, sha256: str, pkg: "spack.package_base.PackageBase") -> Patch:
        """Look up a patch in the index and build a patch object for it.

        We build patch objects lazily because building them requires that
        we have information about the package's location in its repo.

        Args:
            sha256: sha256 hash to look up
            pkg: Package object to get patch for.

        Returns:
            The patch object.
        """
        sha_index = self.index.get(sha256)
        if not sha_index:
            raise PatchLookupError(
                f"Couldn't find patch for package {pkg.fullname} with sha256: {sha256}"
            )

        # Find patches for this class or any class it inherits from
        for fullname in pkg.fullnames:
            patch_dict = sha_index.get(fullname)
            if patch_dict:
                break
        else:
            raise PatchLookupError(
                f"Couldn't find patch for package {pkg.fullname} with sha256: {sha256}"
            )

        # add the sha256 back (we take it out on write to save space,
        # because it's the index key)
        patch_dict = dict(patch_dict)
        patch_dict["sha256"] = sha256
        return from_dict(patch_dict, repository=self.repository)

    def update_package(self, pkg_fullname: str) -> None:
        """Update the patch cache.

        Args:
            pkg_fullname: package to update.
        """
        # remove this package from any patch entries that reference it.
        empty = []
        for sha256, package_to_patch in self.index.items():
            remove = []
            for fullname, patch_dict in package_to_patch.items():
                if patch_dict["owner"] == pkg_fullname:
                    remove.append(fullname)

            for fullname in remove:
                package_to_patch.pop(fullname)

            if not package_to_patch:
                empty.append(sha256)

        # remove any entries that are now empty
        for sha256 in empty:
            del self.index[sha256]

        # update the index with per-package patch indexes
        pkg_cls = self.repository.get_pkg_class(pkg_fullname)
        partial_index = self._index_patches(pkg_cls, self.repository)
        for sha256, package_to_patch in partial_index.items():
            p2p = self.index.setdefault(sha256, {})
            p2p.update(package_to_patch)

    def update(self, other: "PatchCache") -> None:
        """Update this cache with the contents of another.

        Args:
            other: another patch cache to merge
        """
        for sha256, package_to_patch in other.index.items():
            p2p = self.index.setdefault(sha256, {})
            p2p.update(package_to_patch)

    @staticmethod
    def _index_patches(
        pkg_class: Type["spack.package_base.PackageBase"], repository: "spack.repo.RepoPath"
    ) -> Dict[Any, Any]:
        """Patch index for a specific package class.

        Args:
            pkg_class: package object to get patches for
            repository: repository containing the package

        Returns:
            The patch index for that package.
        """
        index = {}

        # Add patches from the class
        for cond, patch_list in pkg_class.patches.items():
            for patch in patch_list:
                patch_dict = patch.to_dict()
                patch_dict.pop("sha256")  # save some space
                index[patch.sha256] = {pkg_class.fullname: patch_dict}

        # Add patches that this package applies to its dependencies
        for deps_by_name in pkg_class.dependencies.values():
            for dependency in deps_by_name.values():
                for patch_list in dependency.patches.values():
                    for patch in patch_list:
                        dspec_cls = repository.get_pkg_class(dependency.spec.name)
                        patch_dict = patch.to_dict()
                        patch_dict.pop("sha256")  # save some space
                        index[patch.sha256] = {dspec_cls.fullname: patch_dict}

        return index
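

# Sketch of the typical index round trip: read a PatchCache back from a JSON
# stream, then resolve a concrete Patch object by sha256.  The repo, stream,
# package, and checksum arguments are assumptions for illustration.
def _example_patch_lookup(
    repo: "spack.repo.RepoPath", stream: Any, pkg: "spack.package_base.PackageBase", sha256: str
) -> Patch:
    cache = PatchCache.from_json(stream, repository=repo)
    # raises PatchLookupError if no patch with this checksum is indexed
    # for pkg or any of its base classes
    return cache.patch_for_package(sha256, pkg)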


class NoSuchPatchError(spack.error.SpackError):
    """Raised when a patch file doesn't exist."""


class PatchLookupError(NoSuchPatchError):
    """Raised when a patch file cannot be located from sha256."""


class PatchDirectiveError(spack.error.SpackError):
    """Raised when the wrong arguments are supplied to the patch directive."""