# Copyright 2013-2022 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""The pty module handles pseudo-terminals.
Currently, the infrastructure here is only used to test llnl.util.tty.log.
If this is used outside a testing environment, we will want to reconsider
things like timeouts in ``ProcessController.wait()``, which are set to
get tests done quickly, not to avoid high CPU usage.
Note: The functionality in this module is unsupported on Windows
"""
from __future__ import print_function
import multiprocessing
import os
import re
import signal
import sys
import time
import traceback
import llnl.util.tty.log as log
from spack.util.executable import which
termios = None
try:
import termios as term_mod
termios = term_mod
except ImportError:
pass
[docs]class ProcessController(object):
"""Wrapper around some fundamental process control operations.
This allows one process (the controller) to drive another (the
minion) similar to the way a shell would, by sending signals and I/O.
"""
def __init__(self, pid, controller_fd,
timeout=1, sleep_time=1e-1, debug=False):
"""Create a controller to manipulate the process with id ``pid``
Args:
pid (int): id of process to control
controller_fd (int): controller fd attached to pid's stdin
timeout (int): time in seconds for wait operations to time out
(default 1 second)
sleep_time (int): time to sleep after signals, to control the
signal rate of the controller (default 1e-1)
debug (bool): whether ``horizontal_line()`` and ``status()`` should
produce output when called (default False)
``sleep_time`` allows the caller to insert delays after calls
that signal or modify the controlled process. Python behaves very
poorly if signals arrive too fast, and drowning a Python process
with a Python handler with signals can kill the process and hang
our tests, so we throttle this a closer-to-interactive rate.
"""
self.pid = pid
self.pgid = os.getpgid(pid)
self.controller_fd = controller_fd
self.timeout = timeout
self.sleep_time = sleep_time
self.debug = debug
# we need the ps command to wait for process statuses
self.ps = which("ps", required=True)
[docs] def get_canon_echo_attrs(self):
"""Get echo and canon attributes of the terminal of controller_fd."""
cfg = termios.tcgetattr(self.controller_fd)
return (
bool(cfg[3] & termios.ICANON),
bool(cfg[3] & termios.ECHO),
)
[docs] def horizontal_line(self, name):
"""Labled horizontal line for debugging."""
if self.debug:
sys.stderr.write(
"------------------------------------------- %s\n" % name
)
[docs] def status(self):
"""Print debug message with status info for the minion."""
if self.debug:
canon, echo = self.get_canon_echo_attrs()
sys.stderr.write("canon: %s, echo: %s\n" % (
"on" if canon else "off",
"on" if echo else "off",
))
sys.stderr.write("input: %s\n" % self.input_on())
sys.stderr.write("bg: %s\n" % self.background())
sys.stderr.write("\n")
[docs] def background(self):
"""True if pgid is in a background pgroup of controller_fd's tty."""
return self.pgid != os.tcgetpgrp(self.controller_fd)
[docs] def tstp(self):
"""Send SIGTSTP to the controlled process."""
self.horizontal_line("tstp")
os.killpg(self.pgid, signal.SIGTSTP)
time.sleep(self.sleep_time)
[docs] def cont(self):
self.horizontal_line("cont")
os.killpg(self.pgid, signal.SIGCONT)
time.sleep(self.sleep_time)
[docs] def fg(self):
self.horizontal_line("fg")
with log.ignore_signal(signal.SIGTTOU):
os.tcsetpgrp(self.controller_fd, os.getpgid(self.pid))
time.sleep(self.sleep_time)
[docs] def bg(self):
self.horizontal_line("bg")
with log.ignore_signal(signal.SIGTTOU):
os.tcsetpgrp(self.controller_fd, os.getpgrp())
time.sleep(self.sleep_time)
[docs] def write(self, byte_string):
self.horizontal_line("write '%s'" % byte_string.decode("utf-8"))
os.write(self.controller_fd, byte_string)
[docs] def wait(self, condition):
start = time.time()
while (((time.time() - start) < self.timeout) and not condition()):
time.sleep(1e-2)
assert condition()
[docs] def wait_enabled(self):
self.wait(lambda: self.input_on() and not self.background())
[docs] def wait_disabled(self):
self.wait(lambda: not self.input_on() and self.background())
[docs] def wait_disabled_fg(self):
self.wait(lambda: not self.input_on() and not self.background())
[docs] def proc_status(self):
status = self.ps("-p", str(self.pid), "-o", "stat", output=str)
status = re.split(r"\s+", status.strip(), re.M)
return status[1]
[docs] def wait_stopped(self):
self.wait(lambda: "T" in self.proc_status())
[docs] def wait_running(self):
self.wait(lambda: "T" not in self.proc_status())
[docs]class PseudoShell(object):
"""Sets up controller and minion processes with a PTY.
You can create a ``PseudoShell`` if you want to test how some
function responds to terminal input. This is a pseudo-shell from a
job control perspective; ``controller_function`` and ``minion_function``
are set up with a pseudoterminal (pty) so that the controller can drive
the minion through process control signals and I/O.
The two functions should have signatures like this::
def controller_function(proc, ctl, **kwargs)
def minion_function(**kwargs)
``controller_function`` is spawned in its own process and passed three
arguments:
proc
the ``multiprocessing.Process`` object representing the minion
ctl
a ``ProcessController`` object tied to the minion
kwargs
keyword arguments passed from ``PseudoShell.start()``.
``minion_function`` is only passed ``kwargs`` delegated from
``PseudoShell.start()``.
The ``ctl.controller_fd`` will have its ``controller_fd`` connected to
``sys.stdin`` in the minion process. Both processes will share the
same ``sys.stdout`` and ``sys.stderr`` as the process instantiating
``PseudoShell``.
Here are the relationships between processes created::
._________________________________________________________.
| Minion Process | pid 2
| - runs minion_function | pgroup 2
|_________________________________________________________| session 1
^
| create process with controller_fd connected to stdin
| stdout, stderr are the same as caller
._________________________________________________________.
| Controller Process | pid 1
| - runs controller_function | pgroup 1
| - uses ProcessController and controller_fd to | session 1
| control minion |
|_________________________________________________________|
^
| create process
| stdin, stdout, stderr are the same as caller
._________________________________________________________.
| Caller | pid 0
| - Constructs, starts, joins PseudoShell | pgroup 0
| - provides controller_function, minion_function | session 0
|_________________________________________________________|
"""
def __init__(self, controller_function, minion_function):
self.proc = None
self.controller_function = controller_function
self.minion_function = minion_function
# these can be optionally set to change defaults
self.controller_timeout = 1
self.sleep_time = 0
[docs] def start(self, **kwargs):
"""Start the controller and minion processes.
Arguments:
kwargs (dict): arbitrary keyword arguments that will be
passed to controller and minion functions
The controller process will create the minion, then call
``controller_function``. The minion process will call
``minion_function``.
"""
self.proc = multiprocessing.Process(
target=PseudoShell._set_up_and_run_controller_function,
args=(self.controller_function, self.minion_function,
self.controller_timeout, self.sleep_time),
kwargs=kwargs,
)
self.proc.start()
[docs] def join(self):
"""Wait for the minion process to finish, and return its exit code."""
self.proc.join()
return self.proc.exitcode
@staticmethod
def _set_up_and_run_minion_function(
tty_name, stdout_fd, stderr_fd, ready, minion_function, **kwargs):
"""Minion process wrapper for PseudoShell.
Handles the mechanics of setting up a PTY, then calls
``minion_function``.
"""
# new process group, like a command or pipeline launched by a shell
os.setpgrp()
# take controlling terminal and set up pty IO
stdin_fd = os.open(tty_name, os.O_RDWR)
os.dup2(stdin_fd, sys.stdin.fileno())
os.dup2(stdout_fd, sys.stdout.fileno())
os.dup2(stderr_fd, sys.stderr.fileno())
os.close(stdin_fd)
if kwargs.get("debug"):
sys.stderr.write(
"minion: stdin.isatty(): %s\n" % sys.stdin.isatty())
# tell the parent that we're really running
if kwargs.get("debug"):
sys.stderr.write("minion: ready!\n")
ready.value = True
try:
minion_function(**kwargs)
except BaseException:
traceback.print_exc()
@staticmethod
def _set_up_and_run_controller_function(
controller_function, minion_function, controller_timeout,
sleep_time, **kwargs):
"""Set up a pty, spawn a minion process, execute controller_function.
Handles the mechanics of setting up a PTY, then calls
``controller_function``.
"""
os.setsid() # new session; this process is the controller
controller_fd, minion_fd = os.openpty()
pty_name = os.ttyname(minion_fd)
# take controlling terminal
pty_fd = os.open(pty_name, os.O_RDWR)
os.close(pty_fd)
ready = multiprocessing.Value('i', False)
minion_process = multiprocessing.Process(
target=PseudoShell._set_up_and_run_minion_function,
args=(pty_name, sys.stdout.fileno(), sys.stderr.fileno(),
ready, minion_function),
kwargs=kwargs,
)
minion_process.start()
# wait for subprocess to be running and connected.
while not ready.value:
time.sleep(1e-5)
pass
if kwargs.get("debug"):
sys.stderr.write("pid: %d\n" % os.getpid())
sys.stderr.write("pgid: %d\n" % os.getpgrp())
sys.stderr.write("sid: %d\n" % os.getsid(0))
sys.stderr.write("tcgetpgrp: %d\n" % os.tcgetpgrp(controller_fd))
sys.stderr.write("\n")
minion_pgid = os.getpgid(minion_process.pid)
sys.stderr.write("minion pid: %d\n" % minion_process.pid)
sys.stderr.write("minion pgid: %d\n" % minion_pgid)
sys.stderr.write(
"minion sid: %d\n" % os.getsid(minion_process.pid))
sys.stderr.write("\n")
sys.stderr.flush()
# set up controller to ignore SIGTSTP, like a shell
signal.signal(signal.SIGTSTP, signal.SIG_IGN)
# call the controller function once the minion is ready
try:
controller = ProcessController(
minion_process.pid, controller_fd, debug=kwargs.get("debug"))
controller.timeout = controller_timeout
controller.sleep_time = sleep_time
error = controller_function(minion_process, controller, **kwargs)
except BaseException:
error = 1
traceback.print_exc()
minion_process.join()
# return whether either the parent or minion failed
return error or minion_process.exitcode