Source code for spack.util.compression

# Copyright 2013-2023 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import inspect
import io
import os
import re
import shutil
import sys
from itertools import product

from llnl.util import tty

import spack.util.path as spath
from spack.error import SpackError
from spack.util.executable import CommandNotFoundError, which

# Archive extensions Spack knows how to handle.
PRE_EXTS = ["tar", "TAR"]
EXTS = ["gz", "bz2", "xz", "Z"]
NOTAR_EXTS = ["zip", "tgz", "tbz2", "tbz", "txz"]

# Every extension made of a single component.
ALLOWED_SINGLE_EXT_ARCHIVE_TYPES = PRE_EXTS + EXTS + NOTAR_EXTS

# Compound extensions come first so that .tar.gz is matched *before* the
# single-component .tar or .gz entries that follow them.
ALLOWED_ARCHIVE_TYPES = [
    f"{pre}.{comp}" for pre, comp in product(PRE_EXTS, EXTS)
] + ALLOWED_SINGLE_EXT_ARCHIVE_TYPES

# Probe for the optional compression modules. Each probe records a module-level
# flag; the is_*_supported() helpers below simply return these flags.

# bz2: flag is True when the import succeeds.
try:
    import bz2  # noqa

    _bz2_support = True
except ImportError:
    _bz2_support = False


# gzip: flag is True when the import succeeds.
try:
    import gzip  # noqa

    _gzip_support = True
except ImportError:
    _gzip_support = False


# lzma (used for .xz files): flag is True when the import succeeds.
try:
    import lzma  # noqa # novermin

    _lzma_support = True
except ImportError:
    _lzma_support = False


def is_lzma_supported():
    """Return the flag recorded when the ``lzma`` module import was attempted."""
    return _lzma_support
def is_gzip_supported():
    """Return the flag recorded when the ``gzip`` module import was attempted."""
    return _gzip_support
def is_bz2_supported():
    """Return the flag recorded when the ``bz2`` module import was attempted."""
    return _bz2_support
def allowed_archive(path):
    """Return True if ``path`` ends with one of ``ALLOWED_ARCHIVE_TYPES``.

    A falsy ``path`` (``None`` or an empty string) yields False.
    """
    if not path:
        return False
    # str.endswith accepts a tuple of candidate suffixes
    return path.endswith(tuple(ALLOWED_ARCHIVE_TYPES))
def _system_untar(archive_file):
    """Untar an archive via the system ``tar`` executable.

    Args:
        archive_file (str): absolute path to the archive to be extracted.
            Can be one of .tar(.[gz|bz2|xz|Z]) or .(tgz|tbz|tbz2|txz).

    Returns:
        str: basename of ``archive_file`` with a trailing ``.tar`` stripped.
    """
    outfile = os.path.basename(strip_extension(archive_file, "tar"))

    tar = which("tar", required=True)
    tar.add_default_arg("-oxf")
    tar(archive_file)
    return outfile


def _bunzip2(archive_file):
    """Decompress a bz2 file, preferring Python's ``bz2`` module.

    Falls back to the system utility when the ``bz2`` module is unavailable.

    Args:
        archive_file (str): absolute path to the bz2 archive to be decompressed

    Returns:
        str: path to the decompressed file.
    """
    if is_bz2_supported():
        return _py_bunzip(archive_file)
    return _system_bunzip(archive_file)


def _py_bunzip(archive_file):
    """Decompress a bz2 file into the current working directory via ``bz2``.

    Returns:
        str: path to the decompressed file.
    """
    decompressed_file = os.path.basename(strip_compression_extension(archive_file, "bz2"))
    archive_out = os.path.join(os.getcwd(), decompressed_file)
    # Both handles are managed by the ``with`` statement so that the input is
    # closed even if opening the output or the copy itself raises.
    with bz2.BZ2File(archive_file, mode="rb") as f_bz, open(archive_out, "wb") as ar:
        shutil.copyfileobj(f_bz, ar)
    return archive_out


def _system_bunzip(archive_file):
    """Decompress a bz2 file via the system ``bunzip2`` utility.

    The archive is first copied into the current working directory and the
    utility is run on that copy.

    Returns:
        str: path to the decompressed file.
    """
    compressed_file_name = os.path.basename(archive_file)
    decompressed_file = os.path.basename(strip_compression_extension(archive_file, "bz2"))
    working_dir = os.getcwd()
    archive_out = os.path.join(working_dir, decompressed_file)
    copy_path = os.path.join(working_dir, compressed_file_name)
    shutil.copy(archive_file, copy_path)

    bunzip2 = which("bunzip2", required=True)
    bunzip2.add_default_arg("-q")
    bunzip2(copy_path)
    return archive_out


def _gunzip(archive_file):
    """Decompress a ``.gz`` file, preferring Python's ``gzip`` module.

    Falls back to the system gzip utility when the module is unavailable.
    Like gunzip, but extracts in the current working directory instead of
    in-place.

    Args:
        archive_file (str): absolute path of the file to be decompressed

    Returns:
        str: path to the decompressed file.
    """
    if is_gzip_supported():
        return _py_gunzip(archive_file)
    return _system_gunzip(archive_file)


def _py_gunzip(archive_file):
    """Decompress a ``.gz`` file into the current working directory via ``gzip``.

    Returns:
        str: path to the decompressed file.
    """
    decompressed_file = os.path.basename(strip_compression_extension(archive_file, "gz"))
    destination_abspath = os.path.join(os.getcwd(), decompressed_file)
    # Both handles are managed by the ``with`` statement so that the input is
    # closed even if opening the output or the copy itself raises.
    with gzip.open(archive_file, "rb") as f_in, open(destination_abspath, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    return destination_abspath


def _system_gunzip(archive_file):
    """Decompress a ``.gz`` file via the system ``gzip`` executable.

    The archive is first copied into the current working directory and
    ``gzip -d`` is run on that copy.

    Returns:
        str: path to the decompressed file.
    """
    decompressed_file = os.path.basename(strip_compression_extension(archive_file, "gz"))
    working_dir = os.getcwd()
    destination_abspath = os.path.join(working_dir, decompressed_file)
    compressed_file = os.path.basename(archive_file)
    copy_path = os.path.join(working_dir, compressed_file)
    shutil.copy(archive_file, copy_path)

    # Named gzip_exe so the local does not shadow the ``gzip`` module
    gzip_exe = which("gzip", required=True)
    gzip_exe.add_default_arg("-d")
    gzip_exe(copy_path)
    return destination_abspath


def _unzip(archive_file):
    """Extract a zip archive.

    Uses the ``unzip`` system executable, except on Windows where the
    archive is handed to the system ``tar`` instead.

    Args:
        archive_file (str): absolute path of the file to be decompressed

    Returns:
        str: basename of ``archive_file`` with ``.zip`` stripped (on Windows,
        whatever ``_system_untar`` returns).
    """
    extracted_file = os.path.basename(strip_extension(archive_file, "zip"))
    if sys.platform == "win32":
        return _system_untar(archive_file)

    unzip = which("unzip", required=True)
    unzip.add_default_arg("-q")
    unzip(archive_file)
    return extracted_file


def _system_unZ(archive_file):
    """Decompress UNIX ``compress`` style (``.Z``) files.

    Uses the system gzip helper on unix and 7zip on Windows.

    Returns:
        str: path to the decompressed file.
    """
    if sys.platform == "win32":
        return _system_7zip(archive_file)
    return _system_gunzip(archive_file)


def _lzma_decomp(archive_file):
    """Decompress an lzma compressed ``.xz`` file.

    Prefers Python's ``lzma`` module and falls back to the ``xz`` command
    line tool when the module is unavailable.

    Returns:
        str: path to the decompressed file.
    """
    if is_lzma_supported():
        return _py_lzma(archive_file)
    return _xz(archive_file)


def _win_compressed_tarball_handler(decompressor):
    """Build a two stage decompress-then-extract function for Windows.

    The returned callable first runs ``decompressor`` on the archive and
    then, if the result is a ``.tar`` file, extracts it with tar. The
    motivation for this method is Windows tar utility's lack of access to
    the xz tool (unsupported natively on Windows) but can be installed
    manually or via spack.

    Args:
        decompressor: callable taking an archive path and returning the
            path of the decompressed file.

    Returns:
        A function taking an archive path and returning the extraction
        result.
    """

    def unarchive(archive_file):
        # intermediate step: decompress and remember the name of the new
        # archive so it can be extracted and later cleaned up
        decomped_tarball = decompressor(archive_file)
        if check_extension(decomped_tarball, "tar"):
            # run tar on the newly decompressed archive
            outfile = _system_untar(decomped_tarball)
            # remove the intermediate archive to mimic the end result
            # produced by one shot decompression/extraction
            os.remove(decomped_tarball)
            return outfile
        return decomped_tarball

    return unarchive


def _py_lzma(archive_file):
    """Decompress an ``.xz`` file into the current working directory via ``lzma``.

    Returns:
        str: path to the decompressed file.
    """
    # NOTE(review): unlike the gz/bz2 helpers this uses strip_extension, which
    # does not expand contracted extensions such as .txz -- confirm intended.
    decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
    archive_out = os.path.join(os.getcwd(), decompressed_file)
    with open(archive_out, "wb") as ar:
        with lzma.open(archive_file) as lar:
            shutil.copyfileobj(lar, ar)
    return archive_out


def _xz(archive_file):
    """Decompress an ``.xz`` file via the ``xz`` command line tool.

    The archive is first copied into the current working directory and
    ``xz -d`` is run on that copy.

    Returns:
        str: path to the decompressed file.
    """
    decompressed_file = os.path.basename(strip_extension(archive_file, "xz"))
    working_dir = os.getcwd()
    destination_abspath = os.path.join(working_dir, decompressed_file)
    compressed_file = os.path.basename(archive_file)
    copy_path = os.path.join(working_dir, compressed_file)
    shutil.copy(archive_file, copy_path)

    xz = which("xz", required=True)
    xz.add_default_arg("-d")
    xz(copy_path)
    return destination_abspath


def _system_7zip(archive_file):
    """Unpack/decompress a file with the ``7z`` executable.

    7z is able to handle a number file extensions however it may not be
    available on system. Without 7z, Windows users with certain versions of
    Python may be unable to extract .xz files, and all Windows users will be
    unable to extract .Z files. If 7z cannot be found we fail, but inform
    the user that 7z can be installed via Spack.

    Args:
        archive_file (str): absolute path of file to be unarchived

    Returns:
        str: basename of ``archive_file`` with its compression extension
        stripped.

    Raises:
        CommandNotFoundError: if no ``7z`` executable is found.
    """
    outfile = os.path.basename(strip_compression_extension(archive_file))
    _7z = which("7z")
    if not _7z:
        # Adjacent literals instead of an in-string backslash continuation,
        # which leaked stray characters into the message.
        raise CommandNotFoundError(
            "7z unavailable, "
            "unable to extract %s files. 7z can be installed via Spack"
            % extension_from_path(archive_file)
        )
    _7z.add_default_arg("e")
    _7z(archive_file)
    return outfile
def decompressor_for(path, extension=None):
    """Return the decompression/extraction function for an archive.

    If ``extension`` is not given it is computed from ``path`` with
    ``extension_from_file``, and the decompression function is derived from
    that.

    Args:
        path (str): path of the archive file requiring decompression
        extension (str): optional archive extension overriding detection

    Returns:
        A callable taking the archive path.

    Raises:
        CommandNotFoundError: if the extension is not an allowed archive type.
    """
    if not extension:
        extension = extension_from_file(path, decompress=True)

    if not allowed_archive(extension):
        # Adjacent literals instead of an in-string backslash continuation,
        # which leaked stray characters into the message.
        raise CommandNotFoundError(
            "Cannot extract archive, "
            "unrecognized file extension: '%s'" % extension
        )

    if sys.platform == "win32":
        return decompressor_for_win(extension)
    return decompressor_for_nix(extension)
def decompressor_for_nix(extension):
    """Return the decompression function for ``extension`` on unix-like systems.

    Assumes a reasonable expectation that system utils like gzip, bzip2, and
    xz are available.

    Args:
        extension (str): archive extension requiring decompression

    Returns:
        The matching single-format handler, or ``_system_untar`` when none of
        the patterns below matches.
    """
    # Patterns are tried in order with re.match, i.e. anchored at the start of
    # the extension.
    handlers = (
        (r"zip$", _unzip),
        (r"gz$", _gunzip),
        (r"bz2$", _bunzip2),
        # Python does not have native support of any kind for .Z files.
        # In these cases, we rely on external tools such as tar,
        # 7z, or uncompressZ
        (r"Z$", _system_unZ),
        # Python and platform may not have support for lzma compression.
        # If no lzma support, use tools available on systems
        (r"xz$", _lzma_decomp),
    )
    for pattern, handler in handlers:
        if re.match(pattern, extension):
            return handler

    return _system_untar
def _determine_py_decomp_archive_strategy(extension):
    """Return the pure-Python decompression function for ``extension``.

    Only gz, bz2 and xz are handled; any other extension yields ``None``.
    """
    # Patterns are tried in order with re.match, i.e. anchored at the start of
    # the extension.
    python_strategies = (
        (r"gz$", _py_gunzip),
        (r"bz2$", _py_bunzip),
        (r"xz$", _py_lzma),
    )
    for pattern, strategy in python_strategies:
        if re.match(pattern, extension):
            return strategy
    return None
def decompressor_for_win(extension):
    """Return the decompression function for ``extension`` on Windows.

    Windows natively vendors *only* tar, no other archive/compression
    utilities. So we must rely exclusively on Python module support for all
    compression operations, tar for tarballs and zip files, and 7zip for Z
    compressed archives and files as Python does not provide support for the
    UNIX compress algorithm.

    Args:
        extension (str): archive extension; contracted forms such as ``tgz``
            are expanded first.

    Returns:
        A callable taking the archive path.

    Raises:
        SpackError: if no decompression strategy can be determined.
    """
    extension = expand_contracted_extension(extension)
    # Windows native tar can handle .zip extensions, use standard
    # unzip method
    if re.match(r"zip$", extension):
        return _unzip

    # if extension is standard tarball, invoke Windows native tar
    if re.match(r"tar$", extension):
        return _system_untar

    # Python does not have native support of any kind for .Z files.
    # In these cases, we rely on 7zip, which must be installed outside
    # of spack and added to the PATH or externally detected
    if re.match(r"Z$", extension):
        return _system_unZ

    # Windows vendors no native decompression tools, attempt to derive
    # python based decompression strategy
    compression_extension = compression_ext_from_compressed_archive(extension)
    # No compression component found: skip the lookup (it would feed None to
    # re.match) and report the failure through SpackError below.
    decompressor = (
        _determine_py_decomp_archive_strategy(compression_extension)
        if compression_extension
        else None
    )
    if not decompressor:
        raise SpackError(
            "Spack was unable to determine a proper decompression strategy for "
            f"valid extension: {extension}. "
            "This is a bug, please file an issue at https://github.com/spack/spack/issues"
        )
    if "tar" not in extension:
        return decompressor

    return _win_compressed_tarball_handler(decompressor)
class FileTypeInterface:
    """Base interface for describing and querying file type information.

    A file type bundles an extension string with one or more magic numbers
    (class attributes whose names start with ``_MAGIC_NUMBER``) and the byte
    ``OFFSET`` at which they are expected, and can check a byte stream
    against them.

    Note:
        This class should not be used directly: it defines no magic number
        or extension, so attempts to use it will fail. Subclass it once per
        file type and define the extension string, the magic number(s) and,
        if non zero, the header offset.
    """

    OFFSET = 0
    compressed = False

    @staticmethod
    def name():
        """Human readable name of the file type; subclasses must override."""
        raise NotImplementedError

    @classmethod
    def magic_number(cls):
        """Return a list of all potential magic numbers for a filetype"""
        return [
            value
            for attr, value in inspect.getmembers(cls)
            if attr.startswith("_MAGIC_NUMBER")
        ]

    @classmethod
    def header_size(cls):
        """Return size of largest magic number associated with file type"""
        return max(len(magic) for magic in cls.magic_number())

    @classmethod
    def _bytes_check(cls, magic_bytes):
        """Return True if ``magic_bytes`` starts with any of the magic numbers."""
        return any(magic_bytes.startswith(magic) for magic in cls.magic_number())

    @classmethod
    def is_file_of_type(cls, iostream):
        """Query byte stream for appropriate magic number

        Args:
            iostream: file byte stream

        Returns:
            Bool denoting whether file is of class file type
            based on magic number
        """
        if not iostream:
            return False
        # read the candidate magic bytes at the type's offset ...
        iostream.seek(cls.OFFSET)
        candidate = iostream.read(cls.header_size())
        # ... and rewind before reporting the result
        iostream.seek(0)
        return cls._bytes_check(candidate)
class CompressedFileTypeInterface(FileTypeInterface):
    """Interface class for FileTypes that include compression information"""

    compressed = True

    @staticmethod
    def decomp_in_memory(stream):
        """Decompress just the leading bytes of a compressed stream.

        Implementations load only the first 200 or so bytes of a compressed
        file so that it can be checked for a nested archive. This does not
        decompress the entire file and should not be used for direct
        expansion of archives/compressed files.
        """
        raise NotImplementedError("Implementation by compression subclass required")
class BZipFileType(CompressedFileTypeInterface):
    """File type descriptor for bzip2 compressed data."""

    _MAGIC_NUMBER = b"\x42\x5a\x68"
    extension = "bz2"

    @staticmethod
    def name():
        return "bzip2 compressed data"

    @staticmethod
    def decomp_in_memory(stream):
        """Return a BytesIO of the decompressed head of ``stream``.

        Returns ``None`` when the ``bz2`` module is unavailable.
        """
        if not is_bz2_supported():
            return None
        # checking for underlying archive, only read as many bytes
        # as is absolutely necessary for largest archive header (tar)
        head_size = TarFileType.OFFSET + TarFileType.header_size()
        comp_stream = stream.read(head_size)
        decompressed = bz2.BZ2Decompressor().decompress(comp_stream)
        return io.BytesIO(initial_bytes=decompressed)
class ZCompressedFileType(CompressedFileTypeInterface):
    """File type descriptor for UNIX ``compress`` (``.Z``) data."""

    _MAGIC_NUMBER_LZW = b"\x1f\x9d"
    _MAGIC_NUMBER_LZH = b"\x1f\xa0"
    extension = "Z"

    @staticmethod
    def name():
        return "compress'd data"

    @staticmethod
    def decomp_in_memory(stream):
        """Always return ``None``.

        python has no method of decompressing `.Z` files in memory
        """
        return None
class GZipFileType(CompressedFileTypeInterface):
    """File type descriptor for gzip compressed data."""

    _MAGIC_NUMBER = b"\x1f\x8b\x08"
    extension = "gz"

    @staticmethod
    def name():
        return "gzip compressed data"

    @staticmethod
    def decomp_in_memory(stream):
        """Return a BytesIO of the decompressed head of ``stream``.

        Returns ``None`` when the ``gzip`` module is unavailable.
        """
        if not is_gzip_supported():
            return None
        # checking for underlying archive, only decomp as many bytes
        # as is absolutely necessary for largest archive header (tar)
        head_size = TarFileType.OFFSET + TarFileType.header_size()
        decompressed = gzip.GzipFile(fileobj=stream).read(head_size)
        return io.BytesIO(initial_bytes=decompressed)
class LzmaFileType(CompressedFileTypeInterface):
    """File type descriptor for xz compressed data."""

    _MAGIC_NUMBER = b"\xfd7zXZ"
    extension = "xz"

    @staticmethod
    def name():
        return "xz compressed data"

    @staticmethod
    def decomp_in_memory(stream):
        """Return a BytesIO of the decompressed head of ``stream``.

        Returns ``None`` when the ``lzma`` module is unavailable.
        """
        if not is_lzma_supported():
            return None
        # checking for underlying archive, only decomp as many bytes
        # as is absolutely necessary for largest archive header (tar)
        max_size = TarFileType.OFFSET + TarFileType.header_size()
        compressed_head = stream.read(max_size)
        decompressed = lzma.LZMADecompressor().decompress(compressed_head, max_length=max_size)
        return io.BytesIO(initial_bytes=decompressed)
class TarFileType(FileTypeInterface):
    """File type descriptor for tar archives (GNU and POSIX ustar headers)."""

    # The magic bytes are looked up 257 bytes into the file
    OFFSET = 257
    # GNU tar: "ustar" followed by two spaces and a NUL. The spaces are
    # spelled as \x20 escapes so they cannot be silently collapsed.
    _MAGIC_NUMBER_GNU = b"ustar\x20\x20\x00"
    # POSIX ustar: "ustar", a NUL, then the two characters "00"
    _MAGIC_NUMBER_POSIX = b"ustar\x0000"
    extension = "tar"

    @staticmethod
    def name():
        return "tar archive"
class ZipFleType(FileTypeInterface):
    """File type descriptor for zip archives."""

    _MAGIC_NUMBER = b"PK\003\004"
    extension = "zip"

    @staticmethod
    def name():
        return "Zip archive data"
# File type identifier classes for every archive and compression format
# Spack recognizes; extension_from_stream tries them in this order.
VALID_FILETYPES = [
    BZipFileType,
    ZCompressedFileType,
    GZipFileType,
    LzmaFileType,
    TarFileType,
    ZipFleType,
]
def extension_from_stream(stream, decompress=False):
    """Return the archive extension represented by a file stream.

    The extension is derived by searching for identifying (magic) bytes in
    the stream. If the stream does not represent an archive type recognized
    by Spack (see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) the method
    returns None.

    Args:
        stream : stream representing a file on system
        decompress (bool) : if True, compressed files are checked
                            for archive types beneath compression i.e. tar.gz
                            default is False, otherwise, return top level type i.e. gz

    Return:
        A string representing the corresponding archive extension,
        or None as relevant.
    """
    for file_type in VALID_FILETYPES:
        if not file_type.is_file_of_type(stream):
            continue

        outer_ext = file_type.extension
        inner_ext = ""
        if file_type.compressed and decompress:
            # stream represents a compressed file:
            # get the decompressed stream (if possible) and inspect it
            inner_stream = file_type.decomp_in_memory(stream)
            inner_ext = extension_from_stream(inner_stream, decompress=decompress)
            if not inner_ext:
                # We were unable to decompress or unable to derive
                # a nested extension from the decompressed file.
                # Fall back to filename parsing to check for
                # potential nested extensions if there are any
                tty.debug(
                    "Cannot derive file extension from magic number;"
                    " falling back to regex path parsing."
                )
                return extension_from_path(stream.name)

        if inner_ext:
            resultant_ext = ".".join([inner_ext, outer_ext])
        else:
            resultant_ext = outer_ext
        tty.debug("File extension %s successfully derived by magic number." % resultant_ext)
        return resultant_ext
    return None
def extension_from_file(file, decompress=False):
    """Return the archive extension of a file on disk.

    The extension is derived from the file's magic number, similar to the
    `file` utility. When the magic number says "compressed tar" and the file
    name uses the matching abbreviated extension (such as `.tgz` or `.txz`),
    the abbreviated form is returned; that distinction is made by string
    parsing of the name.

    Args:
        file (os.PathLike): path describing file on system for which ext
            will be determined.
        decompress (bool): If True, method will peek into compressed
            files to check for archive file types. default is False.
            If false, method will be unable to distinguish `.tar.gz` from `.gz`
            or similar.

    Return:
        Spack recognized archive file extension as determined by file's magic
        number and file name. If file is not on system or is of a type not
        recognized by Spack as an archive or compression type, None is
        returned.
    """
    if not os.path.exists(file):
        return None

    with open(file, "rb") as handle:
        detected = extension_from_stream(handle, decompress)

    if not detected:
        # If unable to parse extension from stream,
        # fall back to string parsing
        return extension_from_path(file)

    if detected.startswith("tar."):
        # based on magic number, file is a compressed tar archive.
        # Check to see if the file name is abbreviated as "t" + compression
        abbreviated = "t" + detected.split(".")[1]
        if check_extension(file, abbreviated):
            return abbreviated
    return detected
def extension_from_path(path):
    """Return the allowed archive extension found at the end of ``path``.

    If path does not include a valid archive extension
    (see `spack.util.compression.ALLOWED_ARCHIVE_TYPES`) return None.

    Raises:
        ValueError: if ``path`` is None.
    """
    if path is None:
        raise ValueError("Can't call extension() on None")

    # first match wins; candidates are tried in ALLOWED_ARCHIVE_TYPES order
    matches = (t for t in ALLOWED_ARCHIVE_TYPES if check_extension(path, t))
    return next(matches, None)
def strip_compression_extension(path, ext=None):
    """Return ``path`` with its last compression extension stripped.

    Contracted extensions in the path are expanded first. If ``ext`` is
    given only that extension is tried, otherwise each entry of ``EXTS`` is
    tried in order. When nothing matches, the expanded path is returned.
    """
    expanded = expand_contracted_extension_in_path(path)
    candidates = [ext] if ext else EXTS
    for candidate in candidates:
        stripped = check_and_remove_ext(expanded, candidate)
        if stripped != expanded:
            return stripped
    return expanded
def strip_extension(path, ext=None):
    """Return the part of ``path`` that does not include the extension.

    If ``ext`` is given, only that extension is removed. Otherwise the first
    entry of ``ALLOWED_ARCHIVE_TYPES`` whose removal changes the path is
    stripped; if none applies the path is returned unchanged.
    """
    if ext:
        return check_and_remove_ext(path, ext)

    stripped_candidates = (check_and_remove_ext(path, t) for t in ALLOWED_ARCHIVE_TYPES)
    return next((candidate for candidate in stripped_candidates if candidate != path), path)
def check_extension(path, ext):
    r"""Return True if ``ext`` is present at the end of ``path``, else False.

    ``ext`` is used as-is when it already starts with an escaped dot
    (``\.``); otherwise it is wrapped into the pattern ``\.<ext>$``.
    """
    # Strip sourceforge suffix.
    prefix, _ = spath.find_sourceforge_suffix(path)
    pattern = ext if ext.startswith(r"\.") else r"\.%s$" % ext
    return re.search(pattern, prefix) is not None
def reg_remove_ext(path, ext):
    r"""Return ``path`` with a trailing ``.<ext>`` removed via regex.

    ``path`` is returned untouched when either argument is falsy.
    """
    if not (path and ext):
        return path
    return re.sub(r"\.%s$" % ext, "", path)
def check_and_remove_ext(path, ext):
    """Return ``path`` without ``ext`` if the extension is present.

    Otherwise ``path`` is returned unchanged.
    """
    return reg_remove_ext(path, ext) if check_extension(path, ext) else path
def _substitute_extension(path, old_ext, new_ext):
    """Return ``path`` with every match of ``old_ext`` replaced by ``new_ext``.

    old_ext and new_ext can be extension strings or regexs
    """
    pattern = rf"{old_ext}"
    replacement = rf"{new_ext}"
    return re.sub(pattern, replacement, path)
def expand_contracted_extension_in_path(path, ext=None):
    """Return ``path`` with a contracted extension (i.e. tgz) expanded
    (i.e. tar.gz).

    If ``ext`` is specified, only attempt to expand that extension;
    otherwise the extension is derived from ``path``.

    Args:
        path (str): path whose extension should be expanded
        ext (str): optional extension to expand

    Returns:
        str: the path with the extension expanded, or ``path`` unchanged when
        it carries no recognized extension or the extension is not a
        contraction.
    """
    if not ext:
        ext = extension_from_path(path)
    if not ext:
        # No recognized archive extension on the path: nothing to expand.
        # (Previously this fell through and crashed on ``None.strip``.)
        return path

    expanded_ext = expand_contracted_extension(ext)
    if expanded_ext != ext:
        return _substitute_extension(path, ext, expanded_ext)
    return path
def expand_contracted_extension(extension):
    """Return the expanded version of a contracted extension.

    i.e. .tgz -> .tar.gz, no op on non contracted extensions. Leading and
    trailing dots are stripped from the result.

    Args:
        extension (str): extension to expand; may be None or empty

    Returns:
        str: the expanded extension; a falsy ``extension`` (None or an empty
        string) is returned unchanged instead of raising.
    """
    if not extension:
        # Nothing to expand; avoids AttributeError on ``None.strip``
        return extension

    contraction_map = {"tgz": "tar.gz", "txz": "tar.xz", "tbz": "tar.bz2", "tbz2": "tar.bz2"}
    stripped = extension.strip(".")
    return contraction_map.get(stripped, stripped)
def compression_ext_from_compressed_archive(extension):
    """Return the compression extension for a compressed archive.

    The (expanded) extension is searched for each entry of ``EXTS`` in
    order; the first entry contained in it is returned, or None if there is
    no such entry.
    """
    expanded = expand_contracted_extension(extension)
    return next((comp for comp in EXTS if comp in expanded), None)