# Copyright 2013-2024 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
import errno
import inspect
import io
import os
import shutil
import sys
from typing import Any, BinaryIO, Callable, Dict, List, Optional
import llnl.url
from llnl.util import tty
from spack.error import SpackError
from spack.util.executable import CommandNotFoundError, which
try:
import bz2 # noqa
BZ2_SUPPORTED = True
except ImportError:
BZ2_SUPPORTED = False
try:
import gzip # noqa
GZIP_SUPPORTED = True
except ImportError:
GZIP_SUPPORTED = False
try:
import lzma # noqa # novermin
LZMA_SUPPORTED = True
except ImportError:
LZMA_SUPPORTED = False
def _system_untar(archive_file: str, remove_archive_file: bool = False) -> str:
"""Returns path to unarchived tar file. Untars archive via system tar.
Args:
archive_file (str): absolute path to the archive to be extracted.
Can be one of .tar(.[gz|bz2|xz|Z]) or .(tgz|tbz|tbz2|txz).
"""
archive_file_no_ext = llnl.url.strip_extension(archive_file)
outfile = os.path.basename(archive_file_no_ext)
if archive_file_no_ext == archive_file:
# the archive file has no extension. Tar on windows cannot untar onto itself
# archive_file can be a tar file (which causes the problem on windows) but it can
# also have other extensions (on Unix) such as tgz, tbz2, ...
archive_file = archive_file_no_ext + "-input"
shutil.move(archive_file_no_ext, archive_file)
tar = which("tar", required=True)
# GNU tar's --no-same-owner is not as portable, -o works for BSD tar too. This flag is relevant
# when extracting archives as root, where tar attempts to set original ownership of files. This
# is redundant when distributing tarballs, as the tarballs are created on different systems
# than where they are extracted. In certain cases like rootless containers, setting original
# ownership is known to fail, so we need to disable it.
tar.add_default_arg("-oxf")
tar(archive_file)
if remove_archive_file:
# remove input file to prevent two stage
# extractions from being treated as exploding
# archives by the fetcher
os.remove(archive_file)
return outfile
def _bunzip2(archive_file: str) -> str:
"""Returns path to decompressed file.
Uses Python's bz2 module to decompress bz2 compressed archives
Fall back to system utility failing to find Python module `bz2`
Args:
archive_file: absolute path to the bz2 archive to be decompressed
"""
if BZ2_SUPPORTED:
return _py_bunzip(archive_file)
else:
return _system_bunzip(archive_file)
def _py_bunzip(archive_file: str) -> str:
"""Returns path to decompressed file.
Decompresses bz2 compressed archives/files via python's bz2 module"""
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "bz2"))
working_dir = os.getcwd()
archive_out = os.path.join(working_dir, decompressed_file)
f_bz = bz2.BZ2File(archive_file, mode="rb")
with open(archive_out, "wb") as ar:
shutil.copyfileobj(f_bz, ar)
f_bz.close()
return archive_out
def _system_bunzip(archive_file: str) -> str:
"""Returns path to decompressed file.
Decompresses bz2 compressed archives/files via system bzip2 utility"""
compressed_file_name = os.path.basename(archive_file)
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "bz2"))
working_dir = os.getcwd()
archive_out = os.path.join(working_dir, decompressed_file)
copy_path = os.path.join(working_dir, compressed_file_name)
shutil.copy(archive_file, copy_path)
bunzip2 = which("bunzip2", required=True)
bunzip2.add_default_arg("-q")
bunzip2(copy_path)
return archive_out
def _gunzip(archive_file: str) -> str:
"""Returns path to gunzip'd file. Decompresses `.gz` extensions. Prefer native Python
`gzip` module. Falling back to system utility gunzip. Like gunzip, but extracts in the current
working directory instead of in-place.
Args:
archive_file: absolute path of the file to be decompressed
"""
return _py_gunzip(archive_file) if GZIP_SUPPORTED else _system_gunzip(archive_file)
def _py_gunzip(archive_file: str) -> str:
"""Returns path to gunzip'd file. Decompresses `.gz` compressed archvies via python gzip
module"""
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "gz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
f_in = gzip.open(archive_file, "rb")
with open(destination_abspath, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
f_in.close()
return destination_abspath
def _system_gunzip(archive_file: str) -> str:
"""Returns path to gunzip'd file. Decompresses `.gz` compressed files via system gzip"""
archive_file_no_ext = llnl.url.strip_compression_extension(archive_file)
if archive_file_no_ext == archive_file:
# the zip file has no extension. On Unix gunzip cannot unzip onto itself
archive_file = archive_file + ".gz"
shutil.move(archive_file_no_ext, archive_file)
decompressed_file = os.path.basename(archive_file_no_ext)
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
compressed_file = os.path.basename(archive_file)
copy_path = os.path.join(working_dir, compressed_file)
shutil.copy(archive_file, copy_path)
gzip = which("gzip", required=True)
gzip.add_default_arg("-d")
gzip(copy_path)
return destination_abspath
def _do_nothing(archive_file: str) -> None:
return None
def _unzip(archive_file: str) -> str:
"""Returns path to extracted zip archive. Extract Zipfile, searching for unzip system
executable. If unavailable, search for 'tar' executable on system and use instead.
Args:
archive_file: absolute path of the file to be decompressed
"""
if sys.platform == "win32":
return _system_untar(archive_file)
unzip = which("unzip", required=True)
unzip.add_default_arg("-q")
unzip(archive_file)
return os.path.basename(llnl.url.strip_extension(archive_file, extension="zip"))
def _system_unZ(archive_file: str) -> str:
"""Returns path to decompressed file
Decompress UNIX compress style compression
Utilizes gunzip on unix and 7zip on Windows
"""
if sys.platform == "win32":
return _system_7zip(archive_file)
return _system_gunzip(archive_file)
def _lzma_decomp(archive_file):
"""Returns path to decompressed xz file. Decompress lzma compressed files. Prefer Python native
lzma module, but fall back on command line xz tooling to find available Python support."""
return _py_lzma(archive_file) if LZMA_SUPPORTED else _xz(archive_file)
def _win_compressed_tarball_handler(decompressor: Callable[[str], str]) -> Callable[[str], str]:
"""Returns function pointer to two stage decompression
and extraction method
Decompress and extract compressed tarballs on Windows.
This method uses a decompression method in conjunction with
the tar utility to perform decompression and extraction in
a two step process first using decompressor to decompress,
and tar to extract.
The motivation for this method is Windows tar utility's lack
of access to the xz tool (unsupported natively on Windows) but
can be installed manually or via spack
"""
def unarchive(archive_file: str):
# perform intermediate extraction step
# record name of new archive so we can extract
decomped_tarball = decompressor(archive_file)
# run tar on newly decomped archive
outfile = _system_untar(decomped_tarball, remove_archive_file=True)
return outfile
return unarchive
def _py_lzma(archive_file: str) -> str:
"""Returns path to decompressed .xz files. Decompress lzma compressed .xz files via Python
lzma module."""
decompressed_file = os.path.basename(llnl.url.strip_compression_extension(archive_file, "xz"))
archive_out = os.path.join(os.getcwd(), decompressed_file)
with open(archive_out, "wb") as ar:
with lzma.open(archive_file) as lar:
shutil.copyfileobj(lar, ar)
return archive_out
def _xz(archive_file):
"""Returns path to decompressed xz files. Decompress lzma compressed .xz files via xz command
line tool."""
decompressed_file = os.path.basename(llnl.url.strip_extension(archive_file, extension="xz"))
working_dir = os.getcwd()
destination_abspath = os.path.join(working_dir, decompressed_file)
compressed_file = os.path.basename(archive_file)
copy_path = os.path.join(working_dir, compressed_file)
shutil.copy(archive_file, copy_path)
xz = which("xz", required=True)
xz.add_default_arg("-d")
xz(copy_path)
return destination_abspath
def _system_7zip(archive_file):
"""Returns path to decompressed file
Unpack/decompress with 7z executable
7z is able to handle a number file extensions however
it may not be available on system.
Without 7z, Windows users with certain versions of Python may
be unable to extract .xz files, and all Windows users will be unable
to extract .Z files. If we cannot find 7z either externally or a
Spack installed copy, we fail, but inform the user that 7z can
be installed via `spack install 7zip`
Args:
archive_file (str): absolute path of file to be unarchived
"""
outfile = os.path.basename(llnl.url.strip_compression_extension(archive_file))
_7z = which("7z")
if not _7z:
raise CommandNotFoundError(
"7z unavailable,\
unable to extract %s files. 7z can be installed via Spack"
% llnl.url.extension_from_path(archive_file)
)
_7z.add_default_arg("e")
_7z(archive_file)
return outfile
[docs]
def decompressor_for(path: str, extension: Optional[str] = None):
"""Returns appropriate decompression/extraction algorithm function pointer
for provided extension. If extension is none, it is computed
from the `path` and the decompression function is derived
from that information."""
if not extension:
extension = extension_from_magic_numbers(path, decompress=True)
if not extension or not llnl.url.allowed_archive(extension):
raise CommandNotFoundError(
f"Cannot extract {path}, unrecognized file extension: '{extension}'"
)
if sys.platform == "win32":
return decompressor_for_win(extension)
else:
return decompressor_for_nix(extension)
[docs]
def decompressor_for_nix(extension: str) -> Callable[[str], Any]:
"""Returns a function pointer to appropriate decompression algorithm based on extension type
and unix specific considerations i.e. a reasonable expectation system utils like gzip, bzip2,
and xz are available
Args:
extension: path of the archive file requiring decompression
"""
extension_to_decompressor: Dict[str, Callable[[str], Any]] = {
"zip": _unzip,
"gz": _gunzip,
"bz2": _bunzip2,
"Z": _system_unZ, # no builtin support for .Z files
"xz": _lzma_decomp,
"whl": _do_nothing,
}
return extension_to_decompressor.get(extension, _system_untar)
def _determine_py_decomp_archive_strategy(extension: str) -> Optional[Callable[[str], Any]]:
"""Returns appropriate python based decompression strategy
based on extension type"""
extension_to_decompressor: Dict[str, Callable[[str], str]] = {
"gz": _py_gunzip,
"bz2": _py_bunzip,
"xz": _py_lzma,
}
return extension_to_decompressor.get(extension, None)
[docs]
def decompressor_for_win(extension: str) -> Callable[[str], Any]:
"""Returns a function pointer to appropriate decompression
algorithm based on extension type and Windows specific considerations
Windows natively vendors *only* tar, no other archive/compression utilities
So we must rely exclusively on Python module support for all compression
operations, tar for tarballs and zip files, and 7zip for Z compressed archives
and files as Python does not provide support for the UNIX compress algorithm
"""
extension = llnl.url.expand_contracted_extension(extension)
extension_to_decompressor: Dict[str, Callable[[str], Any]] = {
# Windows native tar can handle .zip extensions, use standard unzip method
"zip": _unzip,
# if extension is standard tarball, invoke Windows native tar
"tar": _system_untar,
# Python does not have native support of any kind for .Z files. In these cases, we rely on
# 7zip, which must be installed outside of Spack and added to the PATH or externally
# detected
"Z": _system_unZ,
"xz": _lzma_decomp,
"whl": _do_nothing,
}
decompressor = extension_to_decompressor.get(extension)
if decompressor:
return decompressor
# Windows vendors no native decompression tools, attempt to derive Python based decompression
# strategy. Expand extension from abbreviated ones, i.e. tar.gz from .tgz
compression_extension = llnl.url.compression_ext_from_compressed_archive(extension)
decompressor = (
_determine_py_decomp_archive_strategy(compression_extension)
if compression_extension
else None
)
if not decompressor:
raise SpackError(
"Spack was unable to determine a proper decompression strategy for"
f"valid extension: {extension}"
"This is a bug, please file an issue at https://github.com/spack/spack/issues"
)
if "tar" not in extension:
return decompressor
return _win_compressed_tarball_handler(decompressor)
[docs]
class FileTypeInterface:
"""Base interface class for describing and querying file type information. FileType describes
information about a single file type such as typical extension and byte header properties,
and provides an interface to check a given file against said type based on magic number.
This class should be subclassed each time a new type is to be described.
Subclasses should each describe a different type of file. In order to do so, they must define
the extension string, magic number, and header offset (if non zero). If a class has multiple
magic numbers, it will need to override the method describing that file type's magic numbers
and the method that checks a types magic numbers against a given file's."""
OFFSET = 0
extension: str
name: str
[docs]
@classmethod
def magic_numbers(cls) -> List[bytes]:
"""Return a list of all potential magic numbers for a filetype"""
return [
value for name, value in inspect.getmembers(cls) if name.startswith("_MAGIC_NUMBER")
]
[docs]
def matches_magic(self, stream: BinaryIO) -> bool:
"""Returns true if the stream matches the current file type by any of its magic numbers.
Resets stream to original position.
Args:
stream: file byte stream
"""
# move to location of magic bytes
offset = stream.tell()
stream.seek(self.OFFSET)
magic_bytes = stream.read(self.header_size())
stream.seek(offset)
return any(magic_bytes.startswith(magic) for magic in self.magic_numbers())
[docs]
class CompressedFileTypeInterface(FileTypeInterface):
"""Interface class for FileTypes that include compression information"""
[docs]
def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
"""This method returns the first num_bytes of a decompressed stream. Returns None if no
builtin support for decompression."""
return None
def _decompressed_peek(
decompressed_stream: io.BufferedIOBase, stream: BinaryIO, num_bytes: int
) -> io.BytesIO:
# Read the first num_bytes of the decompressed stream, do not advance the stream position.
pos = stream.tell()
data = decompressed_stream.read(num_bytes)
stream.seek(pos)
return io.BytesIO(data)
[docs]
class BZipFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER = b"\x42\x5a\x68"
extension = "bz2"
name = "bzip2 compressed data"
[docs]
def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
if BZ2_SUPPORTED:
return _decompressed_peek(bz2.BZ2File(stream), stream, num_bytes)
return None
[docs]
class ZCompressedFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER_LZW = b"\x1f\x9d"
_MAGIC_NUMBER_LZH = b"\x1f\xa0"
extension = "Z"
name = "compress'd data"
[docs]
class GZipFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER = b"\x1f\x8b\x08"
extension = "gz"
name = "gzip compressed data"
[docs]
def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
if GZIP_SUPPORTED:
return _decompressed_peek(gzip.GzipFile(fileobj=stream), stream, num_bytes)
return None
[docs]
class LzmaFileType(CompressedFileTypeInterface):
_MAGIC_NUMBER = b"\xfd7zXZ"
extension = "xz"
name = "xz compressed data"
[docs]
def peek(self, stream: BinaryIO, num_bytes: int) -> Optional[io.BytesIO]:
if LZMA_SUPPORTED:
return _decompressed_peek(lzma.LZMAFile(stream), stream, num_bytes)
return None
[docs]
class TarFileType(FileTypeInterface):
OFFSET = 257
_MAGIC_NUMBER_GNU = b"ustar \0"
_MAGIC_NUMBER_POSIX = b"ustar\x0000"
extension = "tar"
name = "tar archive"
[docs]
class ZipFleType(FileTypeInterface):
_MAGIC_NUMBER = b"PK\003\004"
extension = "zip"
name = "Zip archive data"
#: Maximum number of bytes to read from a file to determine any archive type. Tar is the largest.
MAX_BYTES_ARCHIVE_HEADER = TarFileType.OFFSET + TarFileType.header_size()
#: Collection of supported archive and compression file type identifier classes.
SUPPORTED_FILETYPES: List[FileTypeInterface] = [
BZipFileType(),
ZCompressedFileType(),
GZipFileType(),
LzmaFileType(),
TarFileType(),
ZipFleType(),
]
def _extension_of_compressed_file(
file_type: CompressedFileTypeInterface, stream: BinaryIO
) -> Optional[str]:
"""Retrieves the extension of a file after decompression from its magic numbers, if it can be
decompressed."""
# To classify the file we only need to decompress the first so many bytes.
decompressed_magic = file_type.peek(stream, MAX_BYTES_ARCHIVE_HEADER)
if not decompressed_magic:
return None
return extension_from_magic_numbers_by_stream(decompressed_magic, decompress=False)
[docs]
def extension_from_magic_numbers_by_stream(
stream: BinaryIO, decompress: bool = False
) -> Optional[str]:
"""Returns the typical extension for the opened file, without leading ``.``, based on its magic
numbers.
If the stream does not represent file type recongized by Spack (see
:py:data:`SUPPORTED_FILETYPES`), the method will return None
Args:
stream: stream representing a file on system
decompress: if True, compressed files are checked for archive types beneath compression.
For example tar.gz if True versus only gz if False."""
for file_type in SUPPORTED_FILETYPES:
if not file_type.matches_magic(stream):
continue
ext = file_type.extension
if decompress and isinstance(file_type, CompressedFileTypeInterface):
uncompressed_ext = _extension_of_compressed_file(file_type, stream)
if not uncompressed_ext:
tty.debug(
"Cannot derive file extension from magic number;"
" falling back to original file name."
)
return llnl.url.extension_from_path(stream.name)
ext = f"{uncompressed_ext}.{ext}"
tty.debug(f"File extension {ext} successfully derived by magic number.")
return ext
return None
def _maybe_abbreviate_extension(path: str, extension: str) -> str:
"""If the file is a compressed tar archive, return the abbreviated extension t[xz|gz|bz2|bz]
instead of tar.[xz|gz|bz2|bz] if the file's original name also has an abbreviated extension."""
if not extension.startswith("tar."):
return extension
abbr = f"t{extension[4:]}"
return abbr if llnl.url.has_extension(path, abbr) else extension
[docs]
def extension_from_magic_numbers(path: str, decompress: bool = False) -> Optional[str]:
"""Return typical extension without leading ``.`` of a compressed file or archive at the given
path, based on its magic numbers, similar to the `file` utility. Notice that the extension
returned from this function may not coincide with the file's given extension.
Args:
path: file to determine extension of
decompress: If True, method will peek into decompressed file to check for archive file
types. If False, the method will return only the top-level extension (for example
``gz`` and not ``tar.gz``).
Returns:
Spack recognized archive file extension as determined by file's magic number and file name.
If file is not on system or is of a type not recognized by Spack as an archive or
compression type, None is returned. If the file is classified as a compressed tarball, the
extension is abbreviated (for instance ``tgz`` not ``tar.gz``) if that matches the file's
given extension.
"""
try:
with open(path, "rb") as f:
ext = extension_from_magic_numbers_by_stream(f, decompress)
except OSError as e:
if e.errno == errno.ENOENT:
return None
raise
# Return the extension derived from the magic number if possible.
if ext:
return _maybe_abbreviate_extension(path, ext)
# Otherwise, use the extension from the file name.
return llnl.url.extension_from_path(path)