# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""Utility classes for logging the output of blocks of code."""
import atexit
import ctypes
import errno
import io
import multiprocessing
import os
import re
import select
import signal
import sys
import traceback
from contextlib import contextmanager
from multiprocessing.connection import Connection
from threading import Thread
from typing import IO, Callable, Dict, List, Optional, TextIO, Tuple
import spack.llnl.util.tty as tty
# Windows-only setup: bind the kernel32 functions that get/set the process-wide standard
# handles and declare their ctypes signatures.
if sys.platform == "win32":
    import ctypes.wintypes as wintypes
    import msvcrt

    kernel32 = ctypes.windll.kernel32
    # Declare argument/return types explicitly so handles are passed as HANDLE values rather
    # than ctypes' default C int.
    kernel32.GetStdHandle.argtypes = [wintypes.DWORD]
    kernel32.GetStdHandle.restype = wintypes.HANDLE
    kernel32.SetStdHandle.argtypes = [wintypes.DWORD, wintypes.HANDLE]
    # Identifiers of the standard output / standard error devices, see
    # https://docs.microsoft.com/en-us/windows/console/getstdhandle
    WIN_STD_OUTPUT_HANDLE = -11
    WIN_STD_ERROR_HANDLE = -12

# The termios import is allowed to fail; in that case the name is bound to None and code that
# needs it checks for that first.
try:
    import termios
except ImportError:
    termios = None  # type: ignore[assignment]
esc, bell, lbracket, bslash, newline = r"\x1b", r"\x07", r"\[", r"\\", r"\n"
# Ansi Control Sequence Introducers (CSI) are a well-defined format
# Standard ECMA-48: Control Functions for Character-Imaging I/O Devices, section 5.4
# https://www.ecma-international.org/wp-content/uploads/ECMA-48_5th_edition_june_1991.pdf
csi_pre = f"{esc}{lbracket}"
csi_param, csi_inter, csi_post = r"[0-?]", r"[ -/]", r"[@-~]"
ansi_csi = f"{csi_pre}{csi_param}*{csi_inter}*{csi_post}"
# General ansi escape sequences have well-defined prefixes,
# but content and suffixes are less reliable.
# Conservatively assume they end with either "<ESC>\" or "<BELL>",
# with no intervening "<ESC>"/"<BELL>" keys or newlines
esc_pre = f"{esc}[@-_]"
esc_content = f"[^{esc}{bell}{newline}]"
esc_post = f"(?:{esc}{bslash}|{bell})"
ansi_esc = f"{esc_pre}{esc_content}*{esc_post}"
# Use this to strip escape sequences
_escape = re.compile(f"{ansi_csi}|{ansi_esc}")
# control characters for enabling/disabling echo
#
# We use control characters to ensure that echo enable/disable are inline
# with the other output. We always follow these with a newline to ensure
# one per line the following newline is ignored in output.
xon, xoff = "\x11\n", "\x13\n"
control = re.compile("(\x11\n|\x13\n)")
[docs]
@contextmanager
def ignore_signal(signum):
    """Ignore ``signum`` for the duration of the ``with`` block.

    The handler that was installed before entering is reinstalled on exit, whether the block
    finishes normally or raises.
    """
    previous_handler = signal.signal(signum, signal.SIG_IGN)
    try:
        yield
    finally:
        # put back whatever was installed before we switched to SIG_IGN
        signal.signal(signum, previous_handler)
def _is_background_tty(stdin: IO[str]) -> bool:
"""True if the stream is a tty and calling process is in the background."""
return stdin.isatty() and os.getpgrp() != os.tcgetpgrp(stdin.fileno())
def _strip(line: str) -> str:
    """Return ``line`` with everything matched by the ``_escape`` pattern removed."""
    cleaned = _escape.sub("", line)
    return cleaned
[docs]
class preserve_terminal_settings:
    """Context manager to preserve terminal settings on a stream.

    Stores terminal settings before the context and ensures they are restored after.
    Ensures that things like echo and canonical line mode are not left disabled if
    terminal settings in the context are not properly restored.
    """

    def __init__(self, stdin: Optional[IO[str]]) -> None:
        """Create a context manager that preserves terminal settings on a stream.

        Args:
            stdin: keyboard input stream, typically sys.stdin. May be ``None``, in which
                case entering and leaving the context does nothing.
        """
        self.stdin = stdin
        # Saved termios attributes. Stays None unless __enter__ actually captured settings.
        # Defined here (not only in __enter__) so that __exit__ and attribute access are safe
        # on an instance that was never entered.
        self.old_cfg = None

    def _restore_default_terminal_settings(self) -> None:
        """Restore the original input configuration on ``self.stdin``."""
        # Can be called in foreground or background. When called in the background, tcsetattr
        # triggers SIGTTOU, which we must ignore, or the process will be stopped.
        assert self.stdin is not None and self.old_cfg is not None and termios is not None
        with ignore_signal(signal.SIGTTOU):
            termios.tcsetattr(self.stdin, termios.TCSANOW, self.old_cfg)

    def __enter__(self) -> "preserve_terminal_settings":
        """Store terminal settings, if ``stdin`` is a tty and termios is available."""
        self.old_cfg = None

        # Ignore all this if the input stream is not a tty.
        if not self.stdin or not self.stdin.isatty() or not termios:
            return self

        # save old termios settings to restore later
        self.old_cfg = termios.tcgetattr(self.stdin)

        # add an atexit handler to ensure the terminal is restored even if the interpreter
        # exits before __exit__ runs
        atexit.register(self._restore_default_terminal_settings)
        return self

    def __exit__(self, exc_type, exception, traceback):
        """If settings were saved on entry, restore them and drop the atexit handler."""
        if self.old_cfg is not None:
            self._restore_default_terminal_settings()
            atexit.unregister(self._restore_default_terminal_settings)
[docs]
class Unbuffered:
    """Wrapper for Python streams that forces them to be unbuffered.

    This is implemented by forcing a flush after each write. Every attribute that is not
    defined here is looked up on the wrapped stream.
    """

    def __init__(self, stream):
        """Wrap ``stream``, a file-like object providing ``write``, ``writelines``, ``flush``."""
        self.stream = stream

    def write(self, data):
        """Write ``data`` to the wrapped stream, flush, and return what its ``write`` returned."""
        # Hand the result through so callers get the usual file-object return value (the
        # number of characters written) instead of None.
        written = self.stream.write(data)
        self.stream.flush()
        return written

    def writelines(self, datas):
        """Write all items of ``datas`` to the wrapped stream, then flush it."""
        self.stream.writelines(datas)
        self.stream.flush()

    def __getattr__(self, attr):
        """Delegate attributes not found on the wrapper to the wrapped stream."""
        # __getattr__ only runs when normal lookup fails. If ``stream`` itself is missing (an
        # instance created without __init__), ``self.stream`` below would re-enter this
        # method forever, so fail with a plain AttributeError instead.
        if attr == "stream":
            raise AttributeError(attr)
        return getattr(self.stream, attr)
[docs]
def log_output(*args, **kwargs):
    """Create a context manager that logs the output of its block to a file.

    The simplest use sends everything printed inside the block to ``logfile.txt``::

        with log_output('logfile.txt'):
            # do things ... output will be logged

    Pass ``echo`` to also have the output echoed to ``stdout``::

        with log_output('logfile.txt', echo=True):
            # do things ... output will be logged and printed out

    To echo only *some* of the output from the parent, use ``force_echo`` on the returned
    logger::

        with log_output('logfile.txt', echo=False) as logger:
            # do things ... output will be logged

            with logger.force_echo():
                # things here will be echoed *and* logged

    This function is a per-platform factory: it forwards all arguments unchanged to
    ``threadlog`` on Windows and to ``nixlog`` everywhere else. See those classes for
    more information.
    """
    logger_cls = threadlog if sys.platform == "win32" else nixlog
    return logger_cls(*args, **kwargs)
@contextmanager
def _force_echo(active: bool):
"""Bracket a region with in-band ``xon``/``xoff`` so it is echoed even when echo is off.
We use these control characters rather than, say, a separate pipe, because they're in-band and
assured to appear exactly before and after the text we want to echo.
"""
if not active:
raise RuntimeError("Can't call force_echo() outside log_output region!")
sys.stdout.write(xon)
sys.stdout.flush()
try:
yield
finally:
sys.stdout.write(xoff)
sys.stdout.flush()
[docs]
def redirect_stdio(write_fd: int) -> Dict[int, int]:
    """Point the stdout/stderr file descriptors at ``write_fd`` and return the saved fds.

    ``sys.stdout`` and ``sys.stderr`` are flushed first, so that anything still buffered ends
    up on the original streams. Because they may have been replaced with other file objects
    (e.g. under pytest), their current file descriptors are redirected in addition to the
    original 1 and 2.

    Returns:
        A mapping from each redirected fd to a duplicate of what it referred to before.
    """
    for stream in (sys.stdout, sys.stderr):
        stream.flush()

    targets = {sys.stdout.fileno(), sys.stderr.fileno(), 1, 2}
    backups: Dict[int, int] = {}
    for target in targets:
        # keep a duplicate of the current destination, then overwrite the fd itself
        backups[target] = os.dup(target)
        os.dup2(write_fd, target)
    return backups
[docs]
def restore_stdio(saved_fds: Dict[int, int]) -> None:
    """Reverse ``redirect_stdio``.

    Flushes ``sys.stdout`` and ``sys.stderr``, then copies every saved descriptor back onto
    the fd it was taken from and closes the saved duplicate.
    """
    for stream in (sys.stdout, sys.stderr):
        stream.flush()

    for target, backup in saved_fds.items():
        os.dup2(backup, target)
        os.close(backup)
def _process_line(
    line: str,
    log_file: IO[str],
    stdout: TextIO,
    echo: bool,
    force_echo: bool,
    filter_fn: Optional[Callable[[str], str]],
) -> bool:
    """Log one line of output to ``log_file`` and echo it to ``stdout`` when enabled.

    The in-band ``xon``/``xoff`` control characters are removed from the line before it goes
    anywhere. The echoed copy is passed through ``filter_fn`` (if given); the logged copy
    additionally has ansi escape sequences stripped. Neither stream is flushed here; that is
    the caller's job.

    Returns:
        The force-echo state after this line, as toggled by any ``xon``/``xoff`` in ``line``.
    """
    # drop the control characters, remembering how many there were
    payload, control_count = control.subn("", line)

    # Echo to stdout if requested or forced.
    if echo or force_echo:
        shown = payload if not filter_fn else filter_fn(payload)
        encoding = stdout.encoding
        if encoding != "utf-8":
            # On Python 3.6 and 3.7-3.14 with non-{utf-8,C} locale stdout may not be able to
            # handle utf-8 output. We do an inefficient dance of re-encoding with errors
            # replaced, so stdout.write does not raise.
            shown = shown.encode(encoding, "replace").decode(encoding)
        stdout.write(shown)

    # Stripped output to log file.
    log_file.write(_strip(payload))

    if not control_count:
        return force_echo
    return force_echo_on(force_echo, control.findall(line))
[docs]
class nixlog:
    """Process-based logger that logs to a file and optionally echoes to ``stdout``.

    Under the hood, we spawn a daemon and set up a pipe between this
    process and the daemon. The daemon writes our output to both the
    file and to stdout (if echoing). The parent process can communicate
    with the daemon to tell it when and when not to echo; this is what
    force_echo does. You can also enable/disable echoing by typing ``v``.

    We use OS-level file descriptors to do the redirection, which
    redirects output for subprocesses and system calls.
    """

    def __init__(
        self, filename: str, echo=False, debug=0, buffer=False, filter_fn=None, append=False
    ):
        """Create a new output log context manager.

        Args:
            filename (str): path to file where output should be logged
            echo (bool): whether to echo output in addition to logging it
            debug (int): positive to enable tty debug mode during logging
            buffer (bool): pass buffer=True to skip unbuffering output; note
                this doesn't set up any *new* buffering
            filter_fn (callable, optional): Callable[str] -> str to filter each
                line of output
            append (bool): whether to append to file ('a' mode)

        The filename will be opened and closed entirely within ``__enter__``
        and ``__exit__``.

        By default, we unbuffer sys.stdout and sys.stderr because the
        logger will include output from executed programs and from python
        calls. If stdout and stderr are buffered, their output won't be
        printed in the right place w.r.t. output from commands.

        Logger daemon is not started until ``__enter__()``.
        """
        self.filename = filename
        self.echo = echo
        self.debug = debug
        self.buffer = buffer
        self.filter_fn = filter_fn
        self.append = append
        self._active = False  # used to prevent re-entry

    def __enter__(self):
        """Start the writer daemon and redirect stdout/stderr into the pipe it reads from.

        Returns:
            This object, so the caller can use ``force_echo()`` inside the block.

        Raises:
            RuntimeError: if this logger is already active.
        """
        if self._active:
            raise RuntimeError("Can't re-enter the same log_output!")

        # record parent color settings before redirecting. We do this
        # because color output depends on whether the *original* stdout
        # is a TTY. New stdout won't be a TTY so we force colorization.
        self._saved_color = tty.color._force_color
        forced_color = tty.color.get_color_when()

        # also record parent debug settings -- in case the logger is
        # forcing debug output.
        self._saved_debug = tty._debug

        # Pipe for redirecting output to logger
        read_fd, self.write_fd = multiprocessing.Pipe(duplex=False)

        # Pipe for communication back from the daemon
        # Currently only used to save echo value between uses
        self.parent_pipe, child_pipe = multiprocessing.Pipe(duplex=False)

        stdin_fd = None
        stdout_fd = None
        try:
            # need to pass this b/c multiprocessing closes stdin in child.
            try:
                if sys.stdin.isatty():
                    stdin_fd = Connection(os.dup(sys.stdin.fileno()))
            except BaseException:
                # just don't forward input if this fails
                pass

            # If our process has redirected stdout after the forkserver was started, we need to
            # make the forked processes use the new file descriptors.
            if multiprocessing.get_start_method() == "forkserver":
                stdout_fd = Connection(os.dup(sys.stdout.fileno()))

            self.process = multiprocessing.Process(
                target=_writer_daemon,
                args=(
                    stdin_fd,
                    stdout_fd,
                    read_fd,
                    self.write_fd,
                    self.echo,
                    self.filename,
                    self.append,
                    child_pipe,
                    self.filter_fn,
                ),
            )
            self.process.daemon = True  # must set before start()
            self.process.start()

        finally:
            # The parent's copies of everything handed to the daemon are closed here, whether
            # or not starting the daemon succeeded.
            if stdin_fd:
                stdin_fd.close()
            if stdout_fd:
                stdout_fd.close()
            read_fd.close()

        # Now do the actual output redirection. We use OS-level file descriptors, as this
        # redirects output for subprocesses and system calls.
        self._redirected_fds = redirect_stdio(self.write_fd.fileno())
        # The redirected fds now refer to the pipe, so the Connection itself can be closed.
        self.write_fd.close()

        # Unbuffer stdout and stderr at the Python level
        # NOTE(review): __exit__ does not put the previous sys.stdout/sys.stderr objects back,
        # so these wrappers stay installed after the context ends -- confirm this is intended.
        if not self.buffer:
            sys.stdout = Unbuffered(sys.stdout)
            sys.stderr = Unbuffered(sys.stderr)

        # Force color and debug settings now that we have redirected.
        tty.color.set_color_when(forced_color)
        tty._debug = self.debug

        # track whether we're currently inside this log_output
        self._active = True

        # return this log_output object so that the user can do things
        # like temporarily echo some output.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Undo the redirection, collect the final echo setting and wait for the daemon."""
        # Flush buffered output to the logger daemon and restore the original fds.
        restore_stdio(self._redirected_fds)

        # recover and store echo settings from the child before it dies
        try:
            self.echo = self.parent_pipe.recv()
        except EOFError:
            # This may occur if some exception prematurely terminates the
            # _writer_daemon. An exception will have already been generated.
            pass

        # now that the write pipe is closed (in this __exit__, when we restore
        # stdout with dup2), the logger daemon process loop will terminate. We
        # wait for that here.
        self.process.join()

        # restore old color and debug settings
        tty.color._force_color = self._saved_color
        tty._debug = self._saved_debug

        self._active = False  # safe to enter again

    def force_echo(self):
        """Context manager to force local echo, even if echo is off."""
        return _force_echo(self._active)
def _thread_log_writer(
read_fd: int,
logfile: str,
stdout: io.TextIOWrapper,
append: bool,
echo: bool,
filter_fn: Optional[Callable],
):
"""Writer loop for the thread-based ``threadlog``. Reads lines from ``read_fd`` until EOF and
hands them to ``_process_line``. Runs in a thread, so it spawns no process."""
force_echo = False
read_file = os.fdopen(read_fd, "r", encoding="utf-8", errors="replace", buffering=1)
try:
with open(logfile, mode="a" if append else "w", encoding="utf-8") as log_file:
while True:
line = read_file.readline(4096)
if not line:
break
force_echo = _process_line(line, log_file, stdout, echo, force_echo, filter_fn)
log_file.flush()
stdout.flush()
except Exception as e:
tty.error(f"Exception in log writer thread! {e}", stream=stdout)
traceback.print_exc(file=stdout)
finally:
read_file.close()
stdout.close()
[docs]
class threadlog:
    """Thread-based logger that logs to a file and optionally echoes to the terminal.

    Keeps ``nixlog``'s in-band ``xon``/``xoff`` protocol -- which is what makes ``force_echo`` and
    output ordering exact -- but runs the writer in a *thread* instead of a *process*, keeping the
    logged code free of ``multiprocessing`` machinery (e.g. lazily started forkserver processes).
    Unlike ``nixlog`` it does not read stdin for the ``v`` echo toggle; that is left to the
    topmost logger.
    """

    def __init__(
        self, filename: str, echo=False, debug=0, buffer=False, filter_fn=None, append=False
    ):
        """Store the logging options; the writer thread is not started until ``__enter__``.

        Args:
            filename: path to the file where output is logged
            echo: whether the writer thread echoes output in addition to logging it
            debug: value assigned to ``tty._debug`` while the logger is active
            buffer: pass True to skip wrapping sys.stdout/sys.stderr in ``Unbuffered``
            filter_fn: optional callable handed to the writer thread along with the other
                options
            append: open the log file in append ('a') mode instead of overwriting it
        """
        self.filename = filename
        self.echo = echo
        self.debug = debug
        self.buffer = buffer
        self.filter_fn = filter_fn
        self.append = append
        self._active = False  # True while inside the context; prevents re-entry

    def __enter__(self):
        """Start the writer thread and redirect stdout/stderr into the pipe it reads from.

        Returns:
            This object, so the caller can use ``force_echo()`` inside the block.

        Raises:
            RuntimeError: if this logger is already active.
        """
        if self._active:
            raise RuntimeError("Can't re-enter the same log_output!")

        # Record parent color/debug settings before redirecting. Colorization depends on whether
        # the *original* stdout is a TTY; the redirected stdout won't be, so force colorization.
        self._saved_color = tty.color._force_color
        forced_color = tty.color.get_color_when()
        self._saved_debug = tty._debug

        # Pipe carrying the redirected output to the writer thread.
        read_fd, write_fd = os.pipe()
        # Dup the original stdout so the writer thread can still echo to the terminal after the
        # redirection below replaces fds 1/2.
        echo_writer = os.fdopen(os.dup(sys.stdout.fileno()), "w", encoding="utf-8", newline="\n")

        self._active = True
        self._thread = Thread(
            target=_thread_log_writer,
            args=(read_fd, self.filename, echo_writer, self.append, self.echo, self.filter_fn),
        )
        self._thread.start()

        self._saved_fds = redirect_stdio(write_fd)
        # The redirected fds keep the write end open; this extra reference is no longer needed.
        os.close(write_fd)

        if sys.platform == "win32":
            # Point the console std handles at the pipe too: native subprocesses inherit those
            # rather than fds 1/2.
            self._saved_handles = (
                kernel32.GetStdHandle(WIN_STD_OUTPUT_HANDLE),
                kernel32.GetStdHandle(WIN_STD_ERROR_HANDLE),
            )
            h_write = msvcrt.get_osfhandle(1)
            os.set_handle_inheritable(h_write, True)
            kernel32.SetStdHandle(WIN_STD_OUTPUT_HANDLE, h_write)
            kernel32.SetStdHandle(WIN_STD_ERROR_HANDLE, h_write)

            # Re-create sys.stdout/stderr now that their fds are pipes, so Python picks FileIO
            # (WriteFile) rather than ConsoleIO (WriteConsoleW fails on pipe handles).
            self._saved_streams = (sys.stdout, sys.stderr)
            sys.stdout = os.fdopen(
                sys.stdout.fileno(), "w", encoding="utf-8", buffering=1, closefd=False
            )
            sys.stderr = os.fdopen(
                sys.stderr.fileno(), "w", encoding="utf-8", buffering=1, closefd=False
            )

        # Unbuffer stdout and stderr at the Python level unless the caller opted out.
        if not self.buffer:
            sys.stdout = Unbuffered(sys.stdout)
            sys.stderr = Unbuffered(sys.stderr)

        # Force color and debug settings now that we have redirected.
        tty.color.set_color_when(forced_color)
        tty._debug = self.debug
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Undo the redirection, wait for the writer thread and restore the saved settings."""
        # Restoring the fds closes the last write ends of the pipe, so the writer thread reaches
        # EOF and returns.
        restore_stdio(self._saved_fds)
        self._thread.join()

        if sys.platform == "win32":
            # Put back the std handles and stream objects that __enter__ replaced.
            kernel32.SetStdHandle(WIN_STD_OUTPUT_HANDLE, self._saved_handles[0])
            kernel32.SetStdHandle(WIN_STD_ERROR_HANDLE, self._saved_handles[1])
            sys.stdout, sys.stderr = self._saved_streams

        tty.color._force_color = self._saved_color
        tty._debug = self._saved_debug
        self._active = False  # safe to enter again

    def force_echo(self):
        """Context manager to force local echo, even if echo is off."""
        return _force_echo(self._active)
def _writer_daemon(
    stdin_fd: Optional[Connection],
    stdout_fd: Optional[Connection],
    read_fd: Connection,
    write_fd: Connection,
    echo: bool,
    log_filename: str,
    append: bool,
    control_fd: Connection,
    filter_fn: Optional[Callable[[str], str]],
) -> None:
    """Daemon used by ``log_output`` to write to a log file and to ``stdout``.

    The daemon receives output from the parent process and writes it both
    to a log and, optionally, to ``stdout``. The relationship looks like
    this::

        Terminal
           |
           |          +-------------------------+
           |          | Parent Process          |
           +--------> |   with log_output():    |
           | stdin    |     ...                 |
           |          +-------------------------+
           |            ^             | write_fd (parent's redirected stdout)
           |            | control     |
           |            | pipe        |
           |            |             v read_fd
           |          +-------------------------+   stdout
           |          | Writer daemon           |------------>
           +--------> |   read from read_fd     |   log_file
             stdin    |   write to out and log  |------------>
                      +-------------------------+

    Within the ``log_output`` handler, the parent's output is redirected
    to a pipe from which the daemon reads. The daemon writes each line
    from the pipe to a log file and (optionally) to ``stdout``. The user
    can hit ``v`` to toggle output on ``stdout``.

    In addition to the input and output file descriptors, the daemon
    interacts with the parent via ``control_fd``. It reports whether
    ``stdout`` was enabled or disabled when it finished.

    Arguments:
        stdin_fd: optional input from the terminal
        stdout_fd: optional connection whose file descriptor is duplicated onto this process's
            ``sys.stdout`` file descriptor before any output is written
        read_fd: pipe for reading from parent's redirected stdout
        write_fd: this process's copy of the write end of that pipe; closed right away
        echo: initial echo setting -- controlled by user and preserved across multiple writer
            daemons
        log_filename: filename where output should be logged
        append: whether to append to the file or overwrite it
        control_fd: multiprocessing pipe on which to send control information to the parent
        filter_fn: optional function to filter each line of output
    """
    # This process depends on closing all instances of write_fd to terminate the reading loop
    write_fd.close()

    # 1. Use line buffering (3rd param = 1) since Python 3 has a bug
    #    that prevents unbuffered text I/O. [needs citation]
    # 2. Enforce a UTF-8 interpretation of build process output with errors replaced by '?'.
    #    The downside is that the log file will not contain the exact output of the build process.
    # 3. closefd=False because Connection has "ownership"
    read_file = os.fdopen(
        read_fd.fileno(), "r", 1, encoding="utf-8", errors="replace", closefd=False
    )

    if stdin_fd:
        stdin_file = os.fdopen(stdin_fd.fileno(), closefd=False)
    else:
        stdin_file = None

    # Make this process's stdout refer to the descriptor the parent passed in, if any.
    if stdout_fd:
        os.dup2(stdout_fd.fileno(), sys.stdout.fileno())
        stdout_fd.close()

    # list of streams to select from
    istreams = [read_file, stdin_file] if stdin_file else [read_file]
    force_echo = False  # parent can force echo for certain output

    log_file = open(log_filename, mode="a" if append else "w", encoding="utf-8")

    try:
        with keyboard_input(stdin_file) as kb:
            while True:
                # fix the terminal settings if we recently came to
                # the foreground
                kb.check_fg_bg()

                # wait for input from any stream. use a coarse timeout to
                # allow other checks while we wait for input
                rlist, _, _ = select.select(istreams, [], [], 0.1)

                # Allow user to toggle echo with 'v' key.
                # Currently ignores other chars.
                # only read stdin if we're in the foreground
                if stdin_file and stdin_file in rlist and not _is_background_tty(stdin_file):
                    # it's possible to be backgrounded between the above
                    # check and the read, so we ignore SIGTTIN here.
                    with ignore_signal(signal.SIGTTIN):
                        try:
                            if stdin_file.read(1) == "v":
                                echo = not echo
                        except OSError as e:
                            # If SIGTTIN is ignored, the system gives EIO
                            # to let the caller know the read failed b/c it
                            # was in the bg. Ignore that too.
                            if e.errno != errno.EIO:
                                raise

                if read_file in rlist:
                    # Handle at most 100 lines per pass, so the checks at the top of the
                    # outer loop run again even when output keeps arriving.
                    line_count = 0
                    try:
                        while line_count < 100:
                            # Handle output from the calling process.
                            line = read_file.readline()

                            # An empty read means EOF: end the daemon. The ``finally`` clauses
                            # still flush, close and report back to the parent.
                            if not line:
                                return
                            line_count += 1

                            force_echo = _process_line(
                                line, log_file, sys.stdout, echo, force_echo, filter_fn
                            )

                            # Stop early once nothing more is immediately readable.
                            if not _input_available(read_file):
                                break
                    finally:
                        # Flush once per batch rather than once per line.
                        if line_count > 0:
                            if echo or force_echo:
                                sys.stdout.flush()
                            log_file.flush()

    except BaseException:
        # Report any failure here instead of propagating it out of the daemon.
        tty.error("Exception occurred in writer daemon!")
        traceback.print_exc()

    finally:
        log_file.close()
        read_fd.close()
        if stdin_fd:
            stdin_fd.close()

        # send echo value back to the parent so it can be preserved.
        control_fd.send(echo)
[docs]
def force_echo_on(force_echo: bool, controls: List[str]):
    """Compute the force-echo state after a set of control sequences was seen.

    Returns ``True`` if ``xon`` is among ``controls``, otherwise ``False`` if ``xoff`` is;
    with neither present, the current ``force_echo`` value is returned unchanged.
    """
    # checked in this order, so xon takes precedence when both are present
    for marker, state in ((xon, True), (xoff, False)):
        if marker in controls:
            return state
    return force_echo
def _input_available(f):
return f in select.select([f], [], [], 0)[0]