Source code for spack.new_installer_base

# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

"""Abstract base classes for new_installer:
TUI terminal state, IPC channels, and job scheduling."""

import abc
import codecs
import io
import os
import re
import selectors
import socket
import sys
import threading
from multiprocessing.connection import Connection
from typing import TYPE_CHECKING, Any, Callable, Optional, Tuple, Union

import spack.spec

if TYPE_CHECKING:
    from spack.new_installer import BuildStatus

# Inter-process communication type
if sys.platform == "win32":
    IpcChannel = socket.socket
else:
    IpcChannel = Connection

#: Size of the output buffer for child processes
OUTPUT_BUFFER_SIZE = 32768


[docs] class StdinReader: """Non-blocking stdin reading with UTF-8 decoding, on top of a platform-specific function that reads raw bytes. Raw bytes are read from the backing file descriptor or socket for stdin (instead of the TextIOWrapper) to avoid double buffering issues: the event loop triggers when the fd is ready to read, and if we do a partial read from the TextIOWrapper, it will likely drain the fd and buffer the remainder internally, which the event loop is not aware of, and user input doesn't come through.""" def __init__(self, read_raw: Callable[[], bytes]) -> None: #: Platform-specific function that reads available raw bytes from stdin self.read_raw = read_raw #: Handle multi-byte UTF-8 characters self.decoder = codecs.getincrementaldecoder("utf-8")(errors="replace") #: For stripping out arrow and navigation keys self.ansi_escape_re = re.compile(r"\x1b\[[0-9;]*[A-Za-z~]") def _decode(self, raw: bytes) -> str: return self.ansi_escape_re.sub("", self.decoder.decode(raw))
[docs] def read(self) -> str: try: return self._decode(self.read_raw()) except OSError: return ""
[docs] class BaseTerminalState(abc.ABC): """Abstract base for platform-specific terminal state management.""" def __init__( self, selector: selectors.BaseSelector, build_status: "BuildStatus", on_suspend: Optional[Callable[[], None]] = None, on_resume: Optional[Callable[[], None]] = None, ) -> None: self.selector = selector self.build_status = build_status self.on_suspend = on_suspend self.on_resume = on_resume
[docs] @classmethod def stdout_is_interactive(cls) -> bool: return sys.stdout.isatty()
[docs] @classmethod def stdin_is_interactive(cls) -> bool: return sys.stdin.isatty()
[docs] @abc.abstractmethod def create_stdin_reader(self) -> StdinReader: pass
[docs] @abc.abstractmethod def setup(self) -> None: pass
[docs] @abc.abstractmethod def teardown_input(self) -> None: """Restore input settings and signal handlers. Called before the final UI render.""" pass
[docs] def teardown_output(self) -> None: """Restore output settings. Called after the final UI render."""
[docs] def teardown(self) -> None: self.teardown_input() self.teardown_output()
[docs] @abc.abstractmethod def enter_foreground(self) -> None: pass
[docs] @abc.abstractmethod def drain_sigwinch(self) -> None: """Drain the platform-specific sigwinch notification channel.""" pass
# The methods below are job-control hooks: a platform with suspend/resume support (SIGTSTP/ # SIGCONT on POSIX) overrides them to transition between foreground and headless mode. The # defaults are for platforms without job control, where the process never goes headless after # setup().
[docs] def enter_background(self) -> None: pass
[docs] def handle_continue(self) -> None: pass
[docs] def should_enter_foreground(self) -> bool: """Return True if the process should switch from headless to foreground mode.""" return False
[docs] class FdInfo: """Information about a file descriptor mapping.""" __slots__ = ("pid", "name") def __init__(self, pid: int, name: str) -> None: self.pid = pid self.name = name
[docs] class ProcessExitNotifier(abc.ABC): """Selector-watchable handle that becomes readable when the child process exits.""" @property @abc.abstractmethod def fileobj(self) -> Union[int, socket.socket]: """Object/fd to register with the selector to detect process exit."""
[docs] def close(self) -> None: """Release any resources. Default: nothing to release."""
[docs] class JobServerBase(abc.ABC): """Abstract base for controlling build concurrency.""" def __init__(self, num_jobs: int) -> None: #: The number of jobs to run concurrently self.num_jobs = num_jobs #: The target number of jobs to run concurrently, which may differ from num_jobs if the #: user has requested a decrease in parallelism, but we haven't consumed enough tokens to #: reflect that yet. This value is used in the UI. The value self.target_jobs can only be #: modified if Spack owns the jobserver, and not when it's attached to a parent jobserver. self.target_jobs = num_jobs
[docs] def has_target_parallelism(self) -> bool: return self.num_jobs == self.target_jobs
[docs] @abc.abstractmethod def makeflags_and_data(self, gmake: Optional[spack.spec.Spec]) -> Tuple[Optional[str], Any]: """Return a tuple of (makeflags, data) to be passed to the child process. The makeflags are meant to be set in the child process's environment, and the data is implementation specific data serialized and sent to the child process for jobserver support."""
[docs] @abc.abstractmethod def update_selector(self, selector: selectors.BaseSelector, wake: bool) -> None: """Listen or stop listening for jobserver events on the given selector."""
[docs] @abc.abstractmethod def increase_parallelism(self) -> None: """Increase the target parallelism by one."""
[docs] @abc.abstractmethod def decrease_parallelism(self) -> None: """Decrease the target parallelism by one."""
[docs] @abc.abstractmethod def acquire(self, jobs: int) -> int: """Try and acquire at most 'jobs' tokens from the jobserver. Returns the number of tokens actually acquired (may be less than requested, or zero)."""
[docs] @abc.abstractmethod def release(self) -> None: """Release a token back to the jobserver."""
[docs] @abc.abstractmethod def close(self) -> None: """Close any resources associated with the jobserver."""
[docs] class NoopJobServer(JobServerBase): """Dummy jobserver for platforms lacking jobserver support."""
[docs] def makeflags_and_data(self, gmake: Optional[spack.spec.Spec]) -> Tuple[Optional[str], Any]: return (None, None)
[docs] def update_selector(self, selector: selectors.BaseSelector, wake: bool) -> None: ...
[docs] def increase_parallelism(self) -> None: ...
[docs] def decrease_parallelism(self) -> None: ...
[docs] def acquire(self, jobs: int) -> int: return 0
[docs] def release(self) -> None: ...
[docs] def close(self) -> None: ...
[docs] class Tee(abc.ABC): """Emulates ./build 2>&1 | tee build.log. Output is sent to a log file and the parent process (if echoing is enabled). The control socket is used to enable/disable echoing.""" def __init__(self, control: IpcChannel, parent: IpcChannel, log_path: str) -> None: self.control = control self.parent = parent # sys.stdout and sys.stderr may have been replaced with file objects under pytest, so # redirect their file descriptors in addition to the original fds 1 and 2. fds = {sys.stdout.fileno(), sys.stderr.fileno(), 1, 2} self.saved_fds = {fd: os.dup(fd) for fd in fds} #: The path of the log file self.log_path = log_path log_file = open(self.log_path, "ab") r, w = os.pipe() self.tee_thread = threading.Thread(target=self.run, args=(r, log_file), daemon=True) self.tee_thread.start() for fd in fds: os.dup2(w, fd) self._setup_handles() os.close(w) def _setup_handles(self) -> None: pass def _restore_handles(self) -> None: pass
[docs] @abc.abstractmethod def run(self, log_r: int, log_file: io.BufferedWriter) -> None: """Read from log_r, write to log_file; echo to parent when enabled. Runs in a thread.""" pass
[docs] def close(self) -> None: # Closing stdout and stderr should close the last reference to the write end of the pipe, # causing the tee thread to wake up, flush the last data, and exit. We restore stdout and # stderr, because between sys.exit and the actual process exit buffers may be flushed, and # can cause exit code 120 (witnessed under pytest+coverage on macOS). sys.stdout.flush() sys.stderr.flush() for fd, saved_fd in self.saved_fds.items(): os.dup2(saved_fd, fd) os.close(saved_fd) self.tee_thread.join() # Only then close the other fds. self.control.close() self.parent.close() self._restore_handles()