Source code for spack.util.file_cache

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

import errno
import math
import os
import pathlib
import shutil
from typing import IO, Dict, Optional, Tuple, Union

from llnl.util.filesystem import rename

from spack.error import SpackError
from spack.util.lock import Lock, ReadTransaction, WriteTransaction


def _maybe_open(path: Union[str, pathlib.Path]) -> Optional[IO[str]]:
    """Open ``path`` for reading as UTF-8 text, or return ``None`` if it is missing.

    Only a "no such file or directory" failure (``errno.ENOENT``) is mapped to
    ``None``; any other ``OSError`` raised by ``open`` propagates to the caller.
    """
    try:
        handle = open(path, "r", encoding="utf-8")
    except OSError as err:
        if err.errno == errno.ENOENT:
            return None
        raise
    return handle


class ReadContextManager:
    """Context manager that opens a cache file for reading when it exists.

    Entering yields the open text file, or ``None`` if there is no file at
    ``path``. Leaving closes the file if one was opened.
    """

    def __init__(self, path: Union[str, pathlib.Path]) -> None:
        # Location of the cache file; it is only opened on __enter__.
        self.path = path

    def __enter__(self) -> Optional[IO[str]]:
        """Return a file object for the cache if it exists."""
        handle = _maybe_open(self.path)
        self.cache_file = handle
        return handle

    def __exit__(self, type, value, traceback):
        # Nothing to clean up when the cache file did not exist.
        if self.cache_file is None:
            return
        self.cache_file.close()
class WriteContextManager:
    """Context manager that stages a cache update in a temporary file.

    Entering opens the existing cache file (if any) for reading and a sibling
    ``<path>.tmp`` file for writing. Leaving closes both; the temporary file is
    then renamed over ``path`` on success, or deleted if the ``with`` body raised.
    """

    def __init__(self, path: Union[str, pathlib.Path]) -> None:
        """Record the cache file location and derive the temporary file name.

        Args:
            path: location of the cache file; a ``str`` or a ``pathlib.Path``.
        """
        self.path = path
        # Formatting through an f-string works for both str and Path inputs.
        self.tmp_path = f"{self.path}.tmp"

    def __enter__(self) -> Tuple[Optional[IO[str]], IO[str]]:
        """Return (old_file, new_file) file objects, where old_file is optional."""
        self.old_file = _maybe_open(self.path)
        try:
            self.new_file = open(self.tmp_path, "w", encoding="utf-8")
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the file that
            # was already opened has to be closed here to avoid leaking it.
            if self.old_file is not None:
                self.old_file.close()
            raise
        return self.old_file, self.new_file

    def __exit__(self, type, value, traceback):
        """Close both files, then commit or discard the temporary file.

        The parameters are the usual exception triple passed to ``__exit__``;
        ``value`` is ``None`` when the ``with`` body finished without raising.
        """
        try:
            if self.old_file is not None:
                self.old_file.close()
        finally:
            # Close the new file even if closing the old one failed.
            self.new_file.close()

        # Compare against None rather than relying on truthiness: an exception
        # instance may be falsy (e.g. it defines __bool__ or __len__), and that
        # must not cause a partially written file to be moved into place.
        if value is not None:
            os.remove(self.tmp_path)
        else:
            rename(self.tmp_path, self.path)
class FileCache:
    """This class manages cached data in the filesystem.

    - Cache files are fetched and stored by unique keys. Keys can be relative
      paths, so that there can be some hierarchy in the cache.

    - The FileCache handles locking cache files for reading and writing, so
      client code need not manage locks for cache entries.
    """

    def __init__(self, root: Union[str, pathlib.Path], timeout=120):
        """Create a file cache object.

        This will create the cache directory if it does not exist yet.

        Args:
            root: specifies the root directory where the cache stores files

            timeout: when there is contention among multiple Spack processes
                for cache files, this specifies how long Spack should wait
                before assuming that there is a deadlock.
        """
        if isinstance(root, str):
            root = pathlib.Path(root)
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)

        # Lock objects created so far, indexed by the key they were requested for.
        self._locks: Dict[Union[pathlib.Path, str], Lock] = {}
        self.lock_timeout = timeout

    def destroy(self):
        """Remove all files under the cache root."""
        for entry in self.root.iterdir():
            # Symlinks are unlinked rather than followed: shutil.rmtree refuses
            # to operate on a symlink, and because its errors are ignored here
            # a link to a directory would otherwise be silently left behind.
            if entry.is_dir() and not entry.is_symlink():
                shutil.rmtree(entry, True)
            else:
                entry.unlink()

    def cache_path(self, key: Union[str, pathlib.Path]):
        """Path to the file in the cache for a particular key."""
        return self.root / key

    def _lock_path(self, key: Union[str, pathlib.Path]):
        """Path to the lock file for a particular key.

        The lock file sits in the same directory as the cache file and is
        named ``.<basename>.lock``.
        """
        keyfile = os.path.basename(key)
        keydir = os.path.dirname(key)

        return self.root / keydir / ("." + keyfile + ".lock")

    def _get_lock(self, key: Union[str, pathlib.Path]):
        """Create a lock for a key, if necessary, and return a lock object."""
        if key not in self._locks:
            self._locks[key] = Lock(str(self._lock_path(key)), default_timeout=self.lock_timeout)
        return self._locks[key]

    def init_entry(self, key: Union[str, pathlib.Path]):
        """Ensure we can access a cache file. Create a lock for it if needed.

        Return whether the cache file exists yet or not.

        Raises:
            CacheError: if the cache path exists but is not a regular file or is
                not readable, or if its parent directory is not readable and
                writable.
        """
        cache_path = self.cache_path(key)
        # Avoid using pathlib here to allow the logic below to
        # function as is
        # TODO: Maybe refactor the following logic for pathlib
        exists = os.path.exists(cache_path)
        if exists:
            if not cache_path.is_file():
                raise CacheError("Cache file is not a file: %s" % cache_path)

            if not os.access(cache_path, os.R_OK):
                raise CacheError("Cannot access cache file: %s" % cache_path)
        else:
            # if the file is hierarchical, make parent directories
            parent = cache_path.parent
            if parent != self.root:
                parent.mkdir(parents=True, exist_ok=True)

            if not os.access(parent, os.R_OK | os.W_OK):
                raise CacheError("Cannot access cache directory: %s" % parent)

        # ensure lock is created for this key
        self._get_lock(key)
        return exists

    def read_transaction(self, key: Union[str, pathlib.Path]):
        """Get a read transaction on a file cache item.

        Returns a ReadTransaction context manager and opens the cache file for
        reading.  You can use it like this::

            with file_cache_object.read_transaction(key) as cache_file:
                cache_file.read()
        """
        path = self.cache_path(key)
        return ReadTransaction(
            self._get_lock(key), acquire=lambda: ReadContextManager(path)  # type: ignore
        )

    def write_transaction(self, key: Union[str, pathlib.Path]):
        """Get a write transaction on a file cache item.

        Returns a WriteTransaction context manager that opens a temporary file
        for writing.  Once the context manager finishes, if nothing went wrong,
        moves the file into place on top of the old file atomically.

        Raises:
            CacheError: if the cache file already exists and is not writable.
        """
        path = self.cache_path(key)
        if os.path.exists(path) and not os.access(path, os.W_OK):
            raise CacheError(f"Insufficient permissions to write to file cache at {path}")

        return WriteTransaction(
            self._get_lock(key), acquire=lambda: WriteContextManager(path)  # type: ignore
        )

    def mtime(self, key: Union[str, pathlib.Path]) -> float:
        """Return modification time of cache file, or -inf if it does not exist.

        Time is in units returned by os.stat in the mtime field, which is
        platform-dependent.
        """
        if not self.init_entry(key):
            return -math.inf
        else:
            return self.cache_path(key).stat().st_mtime

    def remove(self, key: Union[str, pathlib.Path]):
        """Delete the cache file for ``key`` while holding its write lock.

        A cache file that does not exist is not an error.
        """
        file = self.cache_path(key)
        lock = self._get_lock(key)
        # Acquire before entering the ``try`` so that a failed acquisition never
        # reaches the ``finally`` and tries to release a lock that is not held.
        lock.acquire_write()
        try:
            file.unlink()
        except OSError as e:
            # File not found is OK, so remove is idempotent.
            if e.errno != errno.ENOENT:
                raise
        finally:
            lock.release_write()
class CacheError(SpackError):
    """Error raised when a cache file or cache directory cannot be used."""