Source code for spack.new_installer

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""New installer that will ultimately replace installer.py. It features an event loop, non-blocking
I/O, and a POSIX jobserver to limit concurrency. It also has a more advanced terminal UI. It's
mostly self-contained to avoid interfering with the rest of Spack too much while it's being
developed and tested.

The installer consists of a UI process that manages multiple build processes and handles updates
to the database. It detects or creates a jobserver, and then kicks off an event loop in which it
runs through a build queue, always running at least one build. Concurrent builds run as jobserver
tokens are obtained. This means only one -j flag is needed to control concurrency.

The UI process has two modes: an overview mode where it shows the status of all builds, and a
mode where it follows the logs of a specific build. It listens to keyboard input to switch between
modes.

The build process does an ordinary install, but also spawns a "tee" thread that forwards its build
output to both a log file and the UI process (if the UI process has requested it). This thread also
runs an event loop to listen for control messages from the UI process (to enable/disable echoing
of logs), and for output from the build process."""

import fcntl
import io
import json
import os
import re
import selectors
import shutil
import sys
import tempfile
import termios
import threading
import time
import traceback
import tty
from gzip import GzipFile
from multiprocessing import Pipe, Process
from multiprocessing.connection import Connection
from typing import TYPE_CHECKING, Callable, Dict, Generator, List, Optional, Set, Tuple, Union

from spack.vendor.typing_extensions import Literal

import spack.binary_distribution
import spack.build_environment
import spack.builder
import spack.config
import spack.database
import spack.deptypes as dt
import spack.error
import spack.hooks
import spack.llnl.util.lock
import spack.llnl.util.tty
import spack.paths
import spack.report
import spack.spec
import spack.stage
import spack.store
import spack.traverse
import spack.url_buildcache
import spack.util.lock

if TYPE_CHECKING:
    import spack.package_base

#: Type for specifying installation source modes
InstallPolicy = Literal["auto", "cache_only", "source_only"]

#: How often to update a spinner in seconds
SPINNER_INTERVAL = 0.1

#: How long to display finished packages before graying them out
CLEANUP_TIMEOUT = 2.0

#: Size of the output buffer for child processes
OUTPUT_BUFFER_SIZE = 4096

#: Suffix for temporary backup during overwrite install
OVERWRITE_BACKUP_SUFFIX = ".old"

#: Suffix for temporary cleanup during failed install
OVERWRITE_GARBAGE_SUFFIX = ".garbage"


[docs] class ChildInfo: """Information about a child process.""" __slots__ = ("proc", "spec", "output_r_conn", "state_r_conn", "control_w_conn", "explicit") def __init__( self, proc: Process, spec: spack.spec.Spec, output_r_conn: Connection, state_r_conn: Connection, control_w_conn: Connection, explicit: bool = False, ) -> None: self.proc = proc self.spec = spec self.output_r_conn = output_r_conn self.state_r_conn = state_r_conn self.control_w_conn = control_w_conn self.explicit = explicit
[docs] def cleanup(self, selector: selectors.BaseSelector) -> None: """Unregister and close file descriptors, and join the child process.""" try: selector.unregister(self.output_r_conn.fileno()) except KeyError: pass try: selector.unregister(self.state_r_conn.fileno()) except KeyError: pass try: selector.unregister(self.proc.sentinel) except (KeyError, ValueError): pass self.output_r_conn.close() self.state_r_conn.close() self.control_w_conn.close() self.proc.join()
[docs] def send_state(state: str, state_pipe: io.TextIOWrapper) -> None: """Send a state update message.""" json.dump({"state": state}, state_pipe, separators=(",", ":")) state_pipe.write("\n")
[docs] def send_progress(current: int, total: int, state_pipe: io.TextIOWrapper) -> None: """Send a progress update message.""" json.dump({"progress": current, "total": total}, state_pipe, separators=(",", ":")) state_pipe.write("\n")
[docs] def tee(control_r: int, log_r: int, file_w: int, parent_w: int) -> None: """Forward log_r to file_w and parent_w (if echoing is enabled). Echoing is enabled and disabled by reading from control_r.""" echo_on = False selector = selectors.DefaultSelector() selector.register(log_r, selectors.EVENT_READ) selector.register(control_r, selectors.EVENT_READ) try: while True: for key, _ in selector.select(): if key.fd == log_r: data = os.read(log_r, OUTPUT_BUFFER_SIZE) if not data: # EOF: exit the thread return os.write(file_w, data) if echo_on: os.write(parent_w, data) elif key.fd == control_r: control_data = os.read(control_r, 1) if not control_data: return else: echo_on = control_data == b"1" except OSError: # do not raise pass finally: os.close(log_r)
[docs] class Tee: """Emulates ./build 2>&1 | tee build.log. The output is sent both to a log file and the parent process (if echoing is enabled). The control_fd is used to enable/disable echoing.""" def __init__(self, control: Connection, parent: Connection, log_fd: int) -> None: self.control = control self.parent = parent #: The file descriptor of the log file self.log_fd = log_fd r, w = os.pipe() self.tee_thread = threading.Thread( target=tee, args=(self.control.fileno(), r, self.log_fd, self.parent.fileno()), daemon=True, ) self.tee_thread.start() os.dup2(w, sys.stdout.fileno()) os.dup2(w, sys.stderr.fileno()) os.close(w)
[docs] def close(self) -> None: # Closing stdout and stderr should close the last reference to the write end of the pipe, # causing the tee thread to wake up, flush the last data, and exit. sys.stdout.flush() sys.stderr.flush() os.close(sys.stdout.fileno()) os.close(sys.stderr.fileno()) self.tee_thread.join() # Only then close the other fds. self.control.close() self.parent.close() os.close(self.log_fd)
[docs] def install_from_buildcache( mirrors: List[spack.url_buildcache.MirrorURLAndVersion], spec: spack.spec.Spec, unsigned: Optional[bool], state_stream: io.TextIOWrapper, ) -> bool: send_state("fetching from build cache", state_stream) tarball_stage = spack.binary_distribution.download_tarball(spec.build_spec, unsigned, mirrors) if tarball_stage is None: return False send_state("relocating", state_stream) spack.binary_distribution.extract_tarball(spec, tarball_stage, force=False) if spec.spliced: # overwrite old metadata with new spack.store.STORE.layout.write_spec(spec, spack.store.STORE.layout.spec_file_path(spec)) # now a block of curious things follow that should be fixed. pkg = spec.package if hasattr(pkg, "_post_buildcache_install_hook"): pkg._post_buildcache_install_hook() pkg.installed_from_binary_cache = True return True
[docs] class PrefixPivoter: """Manages the installation prefix during overwrite installations.""" def __init__(self, prefix: str, overwrite: bool, keep_prefix: bool = False) -> None: """Initialize the prefix pivoter. Args: prefix: The installation prefix path overwrite: Whether to allow overwriting an existing prefix keep_prefix: Whether to keep a failed installation prefix (when not overwriting) """ self.prefix = prefix #: Whether to allow installation when the prefix exists self.overwrite = overwrite #: Whether to keep a failed installation prefix self.keep_prefix = keep_prefix #: Temporary location for the original prefix during overwrite self.tmp_prefix: Optional[str] = None self.parent = os.path.dirname(prefix) def __enter__(self) -> "PrefixPivoter": """Enter the context: move existing prefix to temporary location if needed.""" if not self._lexists(self.prefix): return self if not self.overwrite: raise spack.error.InstallError(f"Install prefix {self.prefix} already exists") # Move the existing prefix to a temporary location self.tmp_prefix = self._mkdtemp( dir=self.parent, prefix=".", suffix=OVERWRITE_BACKUP_SUFFIX ) self._rename(self.prefix, self.tmp_prefix) return self def __exit__( self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[object] ) -> None: """Exit the context: cleanup on success, restore on failure.""" if exc_type is None: # Success: remove the backup in case of overwrite if self.tmp_prefix is not None: self._rmtree_ignore_errors(self.tmp_prefix) return # Failure handling: # Priority 1: If we're overwriting, always restore the original prefix # Priority 2: If keep_prefix is False, remove the failed installation if self.overwrite and self.tmp_prefix is not None: # Overwrite case: restore the original prefix if it existed # The highest priority is to restore the original prefix, so we try to: # rename prefix -> garbage: move failed dir out of the way # rename tmp_prefix -> prefix: restore original prefix # remove garbage (this is allowed to fail) garbage = self._mkdtemp(dir=self.parent, prefix=".", suffix=OVERWRITE_GARBAGE_SUFFIX) try: self._rename(self.prefix, garbage) has_failed_prefix = True except FileNotFoundError: # prefix dir does not exist, so we don't have to delete it. has_failed_prefix = False self._rename(self.tmp_prefix, self.prefix) if has_failed_prefix: self._rmtree_ignore_errors(garbage) elif not self.keep_prefix and self._lexists(self.prefix): # Not overwriting, keep_prefix is False: remove the failed installation garbage = self._mkdtemp(dir=self.parent, prefix=".", suffix=OVERWRITE_GARBAGE_SUFFIX) self._rename(self.prefix, garbage) self._rmtree_ignore_errors(garbage) # else: keep_prefix is True, leave the failed prefix in place def _lexists(self, path: str) -> bool: return os.path.lexists(path) def _rename(self, src: str, dst: str) -> None: os.rename(src, dst) def _mkdtemp(self, dir: str, prefix: str, suffix: str) -> str: return tempfile.mkdtemp(dir=dir, prefix=prefix, suffix=suffix) def _rmtree_ignore_errors(self, path: str) -> None: shutil.rmtree(path, ignore_errors=True)
[docs] def worker_function( spec: spack.spec.Spec, explicit: bool, mirrors: List[spack.url_buildcache.MirrorURLAndVersion], unsigned: Optional[bool], install_policy: InstallPolicy, dirty: bool, keep_stage: bool, restage: bool, overwrite: bool, keep_prefix: bool, skip_patch: bool, state: Connection, parent: Connection, echo_control: Connection, makeflags: str, js1: Optional[Connection], js2: Optional[Connection], store: spack.store.Store, config: spack.config.Configuration, ): """ Function run in the build child process. Installs the specified spec, sending state updates and build output back to the parent process. Args: spec: Spec to install explicit: Whether the spec was explicitly requested by the user mirrors: List of buildcache mirrors to try unsigned: Whether to allow unsigned buildcache entries install_policy: ``"auto"``, ``"cache_only"``, or ``"source_only"`` dirty: Whether to preserve user environment in the build environment keep_stage: Whether to keep the build stage after installation restage: Whether to restage the source before building overwrite: Whether to overwrite the existing install prefix keep_prefix: Whether to keep a failed installation prefix skip_patch: Whether to skip the patch phase state: Connection to send state updates to parent: Connection to send build output to echo_control: Connection to receive echo control messages from makeflags: MAKEFLAGS to set, so that the build process uses the POSIX jobserver js1: Connection for old style jobserver read fd (if any). Unused, just to inherit fd. js2: Connection for old style jobserver write fd (if any). Unused, just to inherit fd. store: global store instance from parent config: global config instance from parent """ # TODO: don't start a build for external packages if spec.external: return os.environ["MAKEFLAGS"] = makeflags spack.store.STORE = store spack.config.CONFIG = config spack.paths.set_working_dir() # Create a log file in the root of the stage dir. log_fd, log_path = tempfile.mkstemp( prefix=f"spack-stage-{spec.name}-{spec.dag_hash()}-", suffix=".log", dir=spack.stage.get_stage_root(), ) tee = Tee(echo_control, parent, log_fd) # Use closedfd=false because of the connection objects. Use line buffering. state_stream = os.fdopen(state.fileno(), "w", buffering=1, closefd=False) exit_code = 0 try: with PrefixPivoter(spec.prefix, overwrite, keep_prefix): _install( spec, explicit, mirrors, unsigned, install_policy, dirty, keep_stage, restage, skip_patch, state_stream, log_path, store, ) except Exception: traceback.print_exc() # log the traceback to the log file exit_code = 1 finally: tee.close() state_stream.close() if exit_code == 0 and not os.path.lexists(spec.package.install_log_path): # Try to install the compressed log file try: with open(log_path, "rb") as f, open(spec.package.install_log_path, "wb") as g: # Use GzipFile directly so we can omit filename / mtime in header gzip_file = GzipFile(filename="", mode="wb", compresslevel=6, mtime=0, fileobj=g) shutil.copyfileobj(f, gzip_file) gzip_file.close() os.unlink(log_path) except Exception: pass # don't fail the build just because log compression failed sys.exit(exit_code)
def _install( spec: spack.spec.Spec, explicit: bool, mirrors: List[spack.url_buildcache.MirrorURLAndVersion], unsigned: Optional[bool], install_policy: InstallPolicy, dirty: bool, keep_stage: bool, restage: bool, skip_patch: bool, state_stream: io.TextIOWrapper, log_path: str, store: spack.store.Store = spack.store.STORE, ) -> None: """Install a spec from build cache or source.""" # Create the stage and log file before starting the tee thread. pkg = spec.package # Try to install from buildcache, unless user asked for source only if install_policy != "source_only": if mirrors and install_from_buildcache(mirrors, spec, unsigned, state_stream): spack.hooks.post_install(spec, explicit) return elif install_policy == "cache_only": # Binary required but not available send_state("no binary available", state_stream) raise spack.error.InstallError(f"No binary available for {spec}") spack.build_environment.setup_package(pkg, dirty=dirty) store.layout.create_install_directory(spec) stage = pkg.stage stage.keep = keep_stage # Then try a source build. with stage: if restage: stage.destroy() stage.create() os.symlink(log_path, pkg.log_path) send_state("staging", state_stream) if not skip_patch: pkg.do_patch() else: pkg.do_stage() os.chdir(stage.source_path) spack.hooks.pre_install(spec) for phase in spack.builder.create(pkg): send_state(phase.name, state_stream) phase.execute() spack.hooks.post_install(spec, explicit)
[docs] class JobServer: """Attach to an existing POSIX jobserver or create a FIFO-based one.""" def __init__(self, num_jobs: int) -> None: #: Keep track of how many tokens Spack itself has acquired, which is used to release them. self.tokens_acquired = 0 self.num_jobs = num_jobs self.fifo_path: Optional[str] = None self.created = False self._setup() # Ensure that Executable()(...) in build processes ultimately inherit jobserver fds. os.set_inheritable(self.r, True) os.set_inheritable(self.w, True) # r_conn and w_conn are used to make build processes inherit the jobserver fds if needed. # Connection objects close the fd as they are garbage collected, so store them. self.r_conn = Connection(self.r) self.w_conn = Connection(self.w) def _setup(self) -> None: fifo_config = get_jobserver_config() if type(fifo_config) is str: # FIFO-based jobserver. Try to open the FIFO. open_attempt = open_existing_jobserver_fifo(fifo_config) if open_attempt: self.r, self.w = open_attempt self.fifo_path = fifo_config return elif type(fifo_config) is tuple: # Old style pipe-based jobserver. Validate the fds before using them. r, w = fifo_config if fcntl.fcntl(r, fcntl.F_GETFD) != -1 and fcntl.fcntl(w, fcntl.F_GETFD) != -1: self.r, self.w = r, w return # No existing jobserver we can connect to: create a FIFO-based one. self.r, self.w, self.fifo_path = create_jobserver_fifo(self.num_jobs) self.created = True
[docs] def makeflags(self, gmake: Optional[spack.spec.Spec]) -> str: """Return the MAKEFLAGS for a build process, depending on its gmake build dependency.""" if self.fifo_path and (not gmake or gmake.satisfies("@4.4:")): return f" -j{self.num_jobs} --jobserver-auth=fifo:{self.fifo_path}" elif not gmake or gmake.satisfies("@4.0:"): return f" -j{self.num_jobs} --jobserver-auth={self.r},{self.w}" else: return f" -j{self.num_jobs} --jobserver-fds={self.r},{self.w}"
[docs] def acquire(self, jobs: int) -> int: """Try and acquire at most 'jobs' tokens from the jobserver. Returns the number of tokens actually acquired (may be less than requested, or zero).""" try: num_acquired = len(os.read(self.r, jobs)) self.tokens_acquired += num_acquired return num_acquired except BlockingIOError: return 0
[docs] def release(self) -> None: """Release a token back to the jobserver.""" # The last job to quit has an implicit token, so don't release if we have none. if self.tokens_acquired == 0: return os.write(self.w, b"+") self.tokens_acquired -= 1
[docs] def close(self) -> None: # Remove the FIFO if we created it. if self.created and self.fifo_path: try: os.unlink(self.fifo_path) except OSError: pass try: os.rmdir(os.path.dirname(self.fifo_path)) except OSError: pass # TODO: implement a sanity check here: # 1. did we release all tokens we acquired? # 2. if we created the jobserver, did the children return all tokens? self.r_conn.close() self.w_conn.close()
[docs] def start_build( spec: spack.spec.Spec, explicit: bool, mirrors: List[spack.url_buildcache.MirrorURLAndVersion], unsigned: Optional[bool], install_policy: InstallPolicy, dirty: bool, keep_stage: bool, restage: bool, overwrite: bool, keep_prefix: bool, skip_patch: bool, jobserver: JobServer, ) -> ChildInfo: """Start a new build.""" # Create pipes for the child's output, state reporting, and control. state_r_conn, state_w_conn = Pipe(duplex=False) output_r_conn, output_w_conn = Pipe(duplex=False) control_r_conn, control_w_conn = Pipe(duplex=False) # Obtain the MAKEFLAGS to be set in the child process, and determine whether it's necessary # for the child process to inherit our jobserver fds. gmake = next(iter(spec.dependencies("gmake")), None) makeflags = jobserver.makeflags(gmake) fifo = "--jobserver-auth=fifo:" in makeflags proc = Process( target=worker_function, args=( spec, explicit, mirrors, unsigned, install_policy, dirty, keep_stage, restage, overwrite, keep_prefix, skip_patch, state_w_conn, output_w_conn, control_r_conn, makeflags, None if fifo else jobserver.r_conn, None if fifo else jobserver.w_conn, spack.store.STORE, spack.config.CONFIG, ), ) proc.start() # The parent process does not need the write ends of the main pipes or the read end of control. state_w_conn.close() output_w_conn.close() control_r_conn.close() # Set the read ends to non-blocking: in principle redundant with epoll/kqueue, but safer. os.set_blocking(output_r_conn.fileno(), False) os.set_blocking(state_r_conn.fileno(), False) return ChildInfo(proc, spec, output_r_conn, state_r_conn, control_w_conn, explicit)
[docs] def get_jobserver_config(makeflags: Optional[str] = None) -> Optional[Union[str, Tuple[int, int]]]: """Parse MAKEFLAGS for jobserver. Either it's a FIFO or (r, w) pair of file descriptors. Args: makeflags: MAKEFLAGS string to parse. If None, reads from os.environ. """ makeflags = os.environ.get("MAKEFLAGS", "") if makeflags is None else makeflags if not makeflags: return None # We can have the following flags: # --jobserver-fds=R,W (before GNU make 4.2) # --jobserver-auth=fifo:PATH or --jobserver-auth=R,W (after GNU make 4.2) # In case of multiple, the last one wins. matches = re.findall(r" --jobserver-[^=]+=([^ ]+)", makeflags) if not matches: return None last_match: str = matches[-1] assert isinstance(last_match, str) if last_match.startswith("fifo:"): return last_match[5:] parts = last_match.split(",", 1) if len(parts) != 2: return None try: return int(parts[0]), int(parts[1]) except ValueError: return None
[docs] def create_jobserver_fifo(num_jobs: int) -> Tuple[int, int, str]: """Create a new jobserver FIFO with the specified number of job tokens.""" tmpdir = tempfile.mkdtemp() fifo_path = os.path.join(tmpdir, "jobserver_fifo") try: os.mkfifo(fifo_path, 0o600) read_fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK) write_fd = os.open(fifo_path, os.O_WRONLY) # write num_jobs - 1 tokens, because the first job is implicit os.write(write_fd, b"+" * (num_jobs - 1)) return read_fd, write_fd, fifo_path except Exception: try: os.unlink(fifo_path) except OSError as e: spack.llnl.util.tty.debug(f"Failed to remove POSIX jobserver FIFO: {e}", level=3) pass try: os.rmdir(tmpdir) except OSError as e: spack.llnl.util.tty.debug(f"Failed to remove POSIX jobserver FIFO dir: {e}", level=3) pass raise
[docs] def open_existing_jobserver_fifo(fifo_path: str) -> Optional[Tuple[int, int]]: """Open an existing jobserver FIFO for reading and writing.""" try: read_fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK) write_fd = os.open(fifo_path, os.O_WRONLY) return read_fd, write_fd except OSError: return None
[docs] class FdInfo: """Information about a file descriptor mapping.""" __slots__ = ("pid", "name") def __init__(self, pid: int, name: str) -> None: self.pid = pid self.name = name
[docs] class BuildInfo: """Information about a package being built.""" __slots__ = ( "state", "explicit", "version", "hash", "name", "external", "prefix", "finished_time", "progress_percent", "control_w_conn", ) def __init__(self, spec: spack.spec.Spec, explicit: bool, control_w_conn: Connection) -> None: self.state: str = "starting" self.explicit: bool = explicit self.version: str = str(spec.version) self.hash: str = spec.dag_hash(7) self.name: str = spec.name self.external: bool = spec.external self.prefix: str = spec.prefix self.finished_time: Optional[float] = None self.progress_percent: Optional[int] = None self.control_w_conn = control_w_conn
[docs] class BuildStatus: """Tracks the build status display for terminal output.""" def __init__( self, total: int, stdout: io.TextIOWrapper = sys.stdout, # type: ignore[assignment] get_terminal_size: Callable[[], Tuple[int, int]] = os.get_terminal_size, get_time: Callable[[], float] = time.monotonic, is_tty: Optional[bool] = None, ) -> None: #: Ordered dict of build ID -> info self.total = total self.completed = 0 self.builds: Dict[str, BuildInfo] = {} self.finished_builds: List[BuildInfo] = [] self.spinner_chars = ["|", "/", "-", "\\"] self.spinner_index = 0 self.dirty = True # Start dirty to draw initial state self.active_area_rows = 0 self.total_lines = 0 self.next_spinner_update = 0.0 self.next_update = 0.0 self.overview_mode = True # Whether to draw the package overview self.tracked_build_id = "" # identifier of the package whose logs we follow self.search_term = "" self.search_mode = False self.stdout = stdout self.get_terminal_size = get_terminal_size self.get_time = get_time self.is_tty = is_tty if is_tty is not None else self.stdout.isatty()
[docs] def add_build(self, spec: spack.spec.Spec, explicit: bool, control_w_conn: Connection) -> None: """Add a new build to the display and mark the display as dirty.""" self.builds[spec.dag_hash()] = BuildInfo(spec, explicit, control_w_conn) self.dirty = True
[docs] def toggle(self) -> None: """Toggle between overview mode and following a specific build.""" if self.overview_mode: self.next() else: self.active_area_rows = 0 self.search_term = "" self.search_mode = False self.overview_mode = True self.dirty = True try: os.write(self.builds[self.tracked_build_id].control_w_conn.fileno(), b"0") except (KeyError, OSError): pass self.tracked_build_id = ""
[docs] def search_input(self, input: str) -> None: """Handle keyboard input when in search mode""" if input in ("\r", "\n"): self.next(1) elif input == "\x1b": # Escape self.search_mode = False self.search_term = "" self.dirty = True elif input in ("\x7f", "\b"): # Backspace self.search_term = self.search_term[:-1] self.dirty = True elif input.isprintable(): self.search_term += input self.dirty = True
def _is_displayed(self, build: BuildInfo) -> bool: """Returns true if the build matches the search term, or when no search term is set.""" # When not in search mode, the search_term is "", which always evaluates to True below return self.search_term in build.name or build.hash.startswith(self.search_term) def _get_next(self, direction: int) -> Optional[str]: """Returns the next or previous unfinished build ID matching the search term, or None if none found. Direction should be 1 for next, -1 for previous.""" matching = [ build_id for build_id, build in self.builds.items() if build.finished_time is None and self._is_displayed(build) ] if not matching: return None try: idx = matching.index(self.tracked_build_id) except ValueError: return matching[0] if direction == 1 else matching[-1] return matching[(idx + direction) % len(matching)]
[docs] def next(self, direction: int = 1) -> None: """Follow the logs of the next build in the list.""" new_build_id = self._get_next(direction) if not new_build_id or self.tracked_build_id == new_build_id: return new_build = self.builds[new_build_id] if self.overview_mode: self.overview_mode = False # Stop following the previous and start following the new build. if self.tracked_build_id: try: os.write(self.builds[self.tracked_build_id].control_w_conn.fileno(), b"0") except (KeyError, OSError): pass self.tracked_build_id = new_build_id # Tell the user we're following new logs, and instruct the child to start sending them. self.stdout.write( f"\n==> Following logs of {new_build.name}" f"\033[0;36m@{new_build.version}\033[0m\n" ) self.stdout.flush() try: os.write(new_build.control_w_conn.fileno(), b"1") except (KeyError, OSError): pass
[docs] def update_state(self, build_id: str, state: str) -> None: """Update the state of a package and mark the display as dirty.""" build_info = self.builds[build_id] build_info.state = state build_info.progress_percent = None if state in ("finished", "failed"): self.completed += 1 build_info.finished_time = self.get_time() + CLEANUP_TIMEOUT if build_id == self.tracked_build_id and not self.overview_mode: self.toggle() self.dirty = True # For non-TTY output, print state changes immediately without colors if not self.is_tty: self.stdout.write( f"{build_info.hash} {build_info.name}@{build_info.version}: {state}\n" ) self.stdout.flush()
[docs] def update_progress(self, build_id: str, current: int, total: int) -> None: """Update the progress of a package and mark the display as dirty.""" percent = int((current / total) * 100) build_info = self.builds[build_id] if build_info.progress_percent != percent: build_info.progress_percent = percent self.dirty = True
[docs] def update(self, finalize: bool = False) -> None: """Redraw the interactive display.""" if not self.is_tty or not self.overview_mode: return now = self.get_time() # Avoid excessive redraws if not finalize and now < self.next_update: return # Only update the spinner if there are still running packages if now >= self.next_spinner_update and any( pkg.finished_time is None for pkg in self.builds.values() ): self.spinner_index = (self.spinner_index + 1) % len(self.spinner_chars) self.dirty = True self.next_spinner_update = now + SPINNER_INTERVAL for build_id in list(self.builds): build_info = self.builds[build_id] if build_info.state == "failed" or build_info.finished_time is None: continue if finalize or now >= build_info.finished_time: self.finished_builds.append(build_info) del self.builds[build_id] self.dirty = True if not self.dirty: return # Build the overview output in a buffer and print all at once to avoid flickering. buffer = io.StringIO() # Move cursor up to the start of the display area if self.active_area_rows > 0: buffer.write(f"\033[{self.active_area_rows}F") max_width, max_height = self.get_terminal_size() self.total_lines = 0 total_finished = len(self.finished_builds) # First flush the finished builds. These are "persisted" in terminal history. for build in self.finished_builds: self._render_build(build, buffer, max_width) self.finished_builds.clear() # Then a header followed by the active builds. This is the "mutable" part of the display. long_header_len = len( f"Progress: {self.completed}/{self.total} /: filter v: logs n/p: next/prev" ) if long_header_len < max_width: self._println( buffer, f"\033[1mProgress:\033[0m {self.completed}/{self.total}" " \033[36m/\033[0m: filter \033[36mv\033[0m: logs" " \033[36mn\033[0m/\033[36mp\033[0m: next/prev", ) else: self._println(buffer, f"\033[1mProgress:\033[0m {self.completed}/{self.total}") displayed_builds = ( [b for b in self.builds.values() if self._is_displayed(b)] if self.search_term else self.builds.values() ) len_builds = len(displayed_builds) # Truncate if we have more builds than fit on the screen. In that case we have to reserve # an additional line for the "N more..." message. truncate_at = max_height - 3 if len_builds + 2 > max_height else len_builds for i, build in enumerate(displayed_builds, 1): if i > truncate_at: self._println(buffer, f"{len_builds - i + 1} more...") break self._render_build(build, buffer, max_width) if self.search_mode: buffer.write(f"filter> {self.search_term}\033[K") # Clear any remaining lines from previous display buffer.write("\033[0J") # Print everything at once to avoid flickering self.stdout.write(buffer.getvalue()) self.stdout.flush() # Update the number of lines drawn for next time. It reflects the number of active builds. self.active_area_rows = self.total_lines - total_finished self.dirty = False # Schedule next UI update self.next_update = now + SPINNER_INTERVAL / 2
def _println(self, buffer: io.StringIO, line: str = "") -> None: """Print a line to the buffer, handling line clearing and cursor movement.""" self.total_lines += 1 if line: buffer.write(line) if self.total_lines > self.active_area_rows: buffer.write("\033[0m\033[K\n") # reset, clear to EOL, newline else: buffer.write("\033[0m\033[K\033[1E") # reset, clear to EOL, move down 1 line
[docs] def print_logs(self, build_id: str, data: bytes) -> None: # Discard logs we are not following. Generally this should not happen as we tell the child # to only send logs when we are following it. It could maybe happen while transitioning # between builds. if self.overview_mode or build_id != self.tracked_build_id: return # TODO: drop initial bytes from data until first newline (?) self.stdout.buffer.write(data) self.stdout.flush()
def _render_build(self, build_info: BuildInfo, buffer: io.StringIO, max_width: int) -> None: line_width = 0 for component in self._generate_line_components(build_info): # ANSI escape sequence(s), does not contribute to width if not component.startswith("\033"): line_width += len(component) if line_width > max_width: break buffer.write(component) self._println(buffer) def _generate_line_components(self, build_info: BuildInfo) -> Generator[str, None, None]: """Yield formatted line components for a package. Escape sequences are yielded as separate strings so they do not contribute to the line width.""" if build_info.external: indicator = "[e]" elif build_info.state == "finished": indicator = "[+]" elif build_info.state == "failed": indicator = "[x]" else: indicator = f"[{self.spinner_chars[self.spinner_index]}]" if build_info.state == "failed": yield "\033[31m" # red elif build_info.state == "finished": yield "\033[32m" # green yield indicator yield "\033[0m" # reset yield " " yield "\033[0;90m" # dark gray yield build_info.hash yield "\033[0m" # reset yield " " # Package name in bold white if explicit, default otherwise if build_info.explicit: yield "\033[1;37m" # bold white yield build_info.name yield "\033[0m" # reset else: yield build_info.name yield "\033[0;36m" # cyan yield f"@{build_info.version}" yield "\033[0m" # reset # progress or state if build_info.progress_percent is not None: yield " fetching" yield f": {build_info.progress_percent}%" elif build_info.state == "finished": yield f" {build_info.prefix}" else: yield f" {build_info.state}"
Nodes = Dict[str, spack.spec.Spec] Edges = Dict[str, Set[str]]
[docs] class BuildGraph: """Represents the dependency graph for package installation.""" def __init__( self, specs: List[spack.spec.Spec], root_policy: InstallPolicy, dependencies_policy: InstallPolicy, include_build_deps: bool, install_package: bool, install_deps: bool, database: spack.database.Database, overwrite_set: Optional[Set[str]] = None, ): """Construct a build graph from the given specs. This includes only packages that need to be installed. Installed packages are pruned from the graph, and build dependencies are only included when necessary.""" self.roots = {s.dag_hash() for s in specs} self.nodes = {s.dag_hash(): s for s in specs} self.parent_to_child: Dict[str, Set[str]] = {} self.child_to_parent: Dict[str, Set[str]] = {} overwrite_set = overwrite_set or set() specs_to_prune: Set[str] = set() stack: List[Tuple[spack.spec.Spec, InstallPolicy]] = [ (s, root_policy) for s in self.nodes.values() ] with database.read_transaction(): # Set the install prefix for each spec based on the db record or store layout for s in spack.traverse.traverse_nodes(specs): _, record = database.query_by_spec_hash(s.dag_hash()) if record and record.path: s.set_prefix(record.path) else: s.set_prefix(spack.store.STORE.layout.path_for_spec(s)) # Build the graph and determine which specs to prune while stack: spec, install_policy = stack.pop() key = spec.dag_hash() _, record = database.query_by_spec_hash(key) # Conditionally include build dependencies if record and record.installed and key not in overwrite_set: specs_to_prune.add(key) dependencies = spec.dependencies(deptype=dt.LINK | dt.RUN) elif install_policy == "cache_only" and not include_build_deps: dependencies = spec.dependencies(deptype=dt.LINK | dt.RUN) else: dependencies = spec.dependencies(deptype=dt.BUILD | dt.LINK | dt.RUN) self.parent_to_child[key] = {d.dag_hash() for d in dependencies} # Enqueue new dependencies for d in dependencies: if d.dag_hash() in self.nodes: continue self.nodes[d.dag_hash()] = d stack.append((d, dependencies_policy)) # Construct reverse lookup from child to parent for parent, children in self.parent_to_child.items(): for child in children: if child in self.child_to_parent: self.child_to_parent[child].add(parent) else: self.child_to_parent[child] = {parent} # If we're not installing the package itself, mark root specs for pruning too if not install_package: specs_to_prune.update(s.dag_hash() for s in specs) # Prune specs from the build graph. Their parents become parents of their children and # their children become children of their parents. for key in specs_to_prune: for parent in self.child_to_parent.get(key, ()): self.parent_to_child[parent].remove(key) self.parent_to_child[parent].update(self.parent_to_child.get(key, ())) for child in self.parent_to_child.get(key, ()): self.child_to_parent[child].remove(key) self.child_to_parent[child].update(self.child_to_parent.get(key, ())) self.parent_to_child.pop(key, None) self.child_to_parent.pop(key, None) self.nodes.pop(key, None) # If we're not installing dependencies, verify that all remaining nodes in the build graph # after pruning are roots. If there are any non-root nodes, it means there are uninstalled # dependencies that we're not supposed to install. if not install_deps: non_root_spec = next((v for k, v in self.nodes.items() if k not in self.roots), None) if non_root_spec is not None: raise spack.error.InstallError( f"Failed to install in package only mode: dependency {non_root_spec} is not " "installed" )
[docs] def enqueue_parents(self, dag_hash: str, pending_builds: List[str]) -> None: """After a spec is installed, remove it from the graph and enqueue any parents that are now ready to install. Args: dag_hash: The dag_hash of the spec that was just installed pending_builds: List to append parent specs that are ready to build """ # Remove node and edges from the node in the build graph self.parent_to_child.pop(dag_hash, None) self.nodes.pop(dag_hash, None) parents = self.child_to_parent.pop(dag_hash, None) if not parents: return # Enqueue any parents and remove edges to the installed child for parent in parents: children = self.parent_to_child[parent] children.remove(dag_hash) if not children: pending_builds.append(parent)
[docs] class PackageInstaller: def __init__( self, packages: List["spack.package_base.PackageBase"], *, dirty: bool = False, explicit: Union[Set[str], bool] = False, overwrite: Optional[Union[List[str], Set[str]]] = None, fail_fast: bool = False, fake: bool = False, include_build_deps: bool = False, install_deps: bool = True, install_package: bool = True, install_source: bool = False, keep_prefix: bool = False, keep_stage: bool = False, restage: bool = True, skip_patch: bool = False, stop_at: Optional[str] = None, stop_before: Optional[str] = None, tests: Union[bool, List[str], Set[str]] = False, unsigned: Optional[bool] = None, verbose: bool = False, concurrent_packages: Optional[int] = None, root_policy: InstallPolicy = "auto", dependencies_policy: InstallPolicy = "auto", ) -> None: assert install_package or install_deps, "Must install package, dependencies or both" if fail_fast: raise NotImplementedError("Fail-fast installs are not implemented") elif fake: raise NotImplementedError("Fake installs are not implemented") elif install_source: raise NotImplementedError("Installing sources is not implemented") elif stop_at is not None: raise NotImplementedError("Stopping at an install phase is not implemented") elif stop_before is not None: raise NotImplementedError("Stopping before an install phase is not implemented") elif tests is not False: raise NotImplementedError("Tests during install are not implemented") # verbose and concurrent_packages are not worth erroring out for specs = [pkg.spec for pkg in packages] self.root_policy: InstallPolicy = root_policy self.dependencies_policy: InstallPolicy = dependencies_policy self.include_build_deps = include_build_deps #: Set of DAG hashes to overwrite (if already installed) self.overwrite: Set[str] = set(overwrite) if overwrite else set() self.keep_prefix = keep_prefix # Buffer for incoming, partially received state data from child processes self.state_buffers: Dict[int, str] = {} # Build the dependency graph self.build_graph = BuildGraph( specs, root_policy, dependencies_policy, include_build_deps, install_package, install_deps, spack.store.STORE.db, self.overwrite, ) #: check what specs we could fetch from binaries (checks against cache, not remotely) spack.binary_distribution.BINARY_INDEX.update() self.binary_cache_for_spec = { s.dag_hash(): spack.binary_distribution.BINARY_INDEX.find_by_hash(s.dag_hash()) for s in self.build_graph.nodes.values() } self.unsigned = unsigned self.dirty = dirty self.restage = restage self.keep_stage = keep_stage self.skip_patch = skip_patch #: queue of packages ready to install (no children) self.pending_builds = [ parent for parent, children in self.build_graph.parent_to_child.items() if not children ] if explicit is True: self.explicit = {spec.dag_hash() for spec in specs} elif explicit is False: self.explicit = set() else: self.explicit = explicit self.running_builds: Dict[int, ChildInfo] = {} self.build_status = BuildStatus(len(self.build_graph.nodes)) self.jobs = spack.config.determine_number_of_jobs(parallel=True) self.reports: Dict[str, spack.report.RequestRecord] = {}
[docs] def install(self) -> None: # This installer has not implemented the per-spec exclusive locks during installation. # Instead, take an exclusive lock on the entire range to avoid that other Spack install # process start installing the same specs. lock = spack.util.lock.Lock( str(spack.store.STORE.prefix_locker.lock_path), desc="prefix lock" ) lock.acquire_write() try: self._installer() finally: lock.release_write()
def _installer(self) -> None: jobserver = JobServer(self.jobs) # Set stdin to non-blocking for key press detection if sys.stdin.isatty(): old_stdin_settings = termios.tcgetattr(sys.stdin) tty.setcbreak(sys.stdin.fileno()) else: old_stdin_settings = None selector = selectors.DefaultSelector() selector.register(sys.stdin.fileno(), selectors.EVENT_READ, "stdin") # Setup the database write lock. TODO: clean this up if isinstance(spack.store.STORE.db.lock, spack.util.lock.Lock): spack.store.STORE.db.lock._ensure_parent_directory() spack.store.STORE.db.lock._file = spack.llnl.util.lock.FILE_TRACKER.get_fh( spack.store.STORE.db.lock.path ) to_insert_in_database: List[ChildInfo] = [] failures: List[spack.spec.Spec] = [] try: # Start the first job immediately, as it does not require a jobserver token. if self.pending_builds and not self.running_builds: self._start(selector, jobserver) while self.pending_builds or self.running_builds or to_insert_in_database: # Only monitor the jobserver if we have pending builds. if self.pending_builds and jobserver.r not in selector.get_map(): selector.register(jobserver.r, selectors.EVENT_READ, "jobserver") elif not self.pending_builds and jobserver.r in selector.get_map(): selector.unregister(jobserver.r) jobserver_token_available = False stdin_ready = False events = selector.select(timeout=SPINNER_INTERVAL) finished_pids = [] for key, _ in events: data = key.data if isinstance(data, FdInfo): # Child output (logs and state updates) child_info = self.running_builds[data.pid] if data.name == "output": self._handle_child_logs(key.fd, child_info, selector) elif data.name == "state": self._handle_child_state(key.fd, child_info, selector) elif data.name == "sentinel": finished_pids.append(data.pid) elif data == "jobserver": jobserver_token_available = True elif data == "stdin": stdin_ready = True for pid in finished_pids: build = self.running_builds.pop(pid) jobserver.release() build.cleanup(selector) if build.proc.exitcode == 0: to_insert_in_database.append(build) self.build_status.update_state(build.spec.dag_hash(), "finished") else: failures.append(build.spec) self.build_status.update_state(build.spec.dag_hash(), "failed") if stdin_ready: try: char = sys.stdin.read(1) except OSError: continue overview = self.build_status.overview_mode if overview and self.build_status.search_mode: self.build_status.search_input(char) elif overview and char == "/": self.build_status.enter_search() elif char == "v" or char in ("q", "\x1b") and not overview: self.build_status.toggle() elif char == "n": self.build_status.next(1) elif char == "p" or char == "N": self.build_status.next(-1) # Flush installed packages to the database and enqueue any parents that are now # ready. if to_insert_in_database and self._save_to_db(to_insert_in_database): for entry in to_insert_in_database: self.build_graph.enqueue_parents( entry.spec.dag_hash(), self.pending_builds ) to_insert_in_database.clear() # Again, the first job should start immediately and does not require a token. if self.pending_builds and not self.running_builds: self._start(selector, jobserver) # For the rest we try to obtain tokens from the jobserver. if self.pending_builds and jobserver_token_available: # Then we try to schedule as many jobs as we can acquire tokens for. max_new_jobs = len(self.pending_builds) for _ in range(jobserver.acquire(max_new_jobs)): self._start(selector, jobserver) # Finally update the UI self.build_status.update() except KeyboardInterrupt: # Cleanup running builds. for child in self.running_builds.values(): child.proc.join() raise finally: # Restore terminal settings if old_stdin_settings: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_stdin_settings) # Clean up resources # Final cleanup of any remaining finished packages before exit self.build_status.overview_mode = True self.build_status.update(finalize=True) selector.close() jobserver.close() if failures: lines = [f"{s}: {s.package.log_path}" for s in failures] raise spack.error.InstallError( "The following packages failed to install:\n" + "\n".join(lines) ) def _save_to_db(self, to_insert_in_database: List[ChildInfo]) -> bool: db = spack.store.STORE.db try: # Only try to get the lock once (non-blocking). If it fails, try it next time. if db.lock.acquire_write(timeout=1e-9): db._read() except spack.util.lock.LockTimeoutError: return False try: for entry in to_insert_in_database: db._add(entry.spec, explicit=entry.explicit) return True finally: db.lock.release_write(db._write) def _start(self, selector: selectors.BaseSelector, jobserver: JobServer) -> None: dag_hash = self.pending_builds.pop() explicit = dag_hash in self.explicit spec = self.build_graph.nodes[dag_hash] is_develop = spec.is_develop child_info = start_build( spec, explicit=explicit, mirrors=self.binary_cache_for_spec[dag_hash], unsigned=self.unsigned, install_policy=( self.root_policy if dag_hash in self.build_graph.roots else self.dependencies_policy ), dirty=self.dirty, # keep_stage/restage logic taken from installer.py keep_stage=self.keep_stage or is_develop, restage=self.restage and not is_develop, overwrite=dag_hash in self.overwrite, keep_prefix=self.keep_prefix, skip_patch=self.skip_patch, jobserver=jobserver, ) pid = child_info.proc.pid assert type(pid) is int self.running_builds[pid] = child_info selector.register( child_info.output_r_conn.fileno(), selectors.EVENT_READ, FdInfo(pid, "output") ) selector.register( child_info.state_r_conn.fileno(), selectors.EVENT_READ, FdInfo(pid, "state") ) selector.register(child_info.proc.sentinel, selectors.EVENT_READ, FdInfo(pid, "sentinel")) self.build_status.add_build( child_info.spec, explicit=explicit, control_w_conn=child_info.control_w_conn ) def _handle_child_logs( self, r_fd: int, child_info: ChildInfo, selector: selectors.BaseSelector ) -> None: """Handle reading output logs from a child process pipe.""" try: # There might be more data than OUTPUT_BUFFER_SIZE, but we will read that in the next # iteration of the event loop to keep things responsive. data = os.read(r_fd, OUTPUT_BUFFER_SIZE) except OSError: data = None if not data: # EOF or error try: selector.unregister(r_fd) except KeyError: pass return self.build_status.print_logs(child_info.spec.dag_hash(), data) def _handle_child_state( self, r_fd: int, child_info: ChildInfo, selector: selectors.BaseSelector ) -> None: """Handle reading state updates from a child process pipe.""" try: # There might be more data than OUTPUT_BUFFER_SIZE, but we will read that in the next # iteration of the event loop to keep things responsive. data = os.read(r_fd, OUTPUT_BUFFER_SIZE) except OSError: data = None if not data: # EOF or error try: selector.unregister(r_fd) except KeyError: pass self.state_buffers.pop(r_fd, None) return # Append new data to the buffer for this fd and process it buffer = self.state_buffers.get(r_fd, "") + data.decode(errors="replace") lines = buffer.split("\n") # The last element of split() will be a partial line or an empty string. # We store it back in the buffer for the next read. self.state_buffers[r_fd] = lines.pop() for line in lines: if not line: continue message = json.loads(line) if "state" in message: self.build_status.update_state(child_info.spec.dag_hash(), message["state"]) elif "progress" in message and "total" in message: self.build_status.update_progress( child_info.spec.dag_hash(), message["progress"], message["total"] )