# Source code for llnl.util.tty

# Copyright 2013-2022 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

from __future__ import unicode_literals

import contextlib
import os
import struct
import sys
import textwrap
import traceback
from datetime import datetime
from sys import platform as _platform

import six
from six import StringIO
from six.moves import input

if _platform != "win32":
    import fcntl
    import termios

from llnl.util.tty.color import cescape, clen, cprint, cwrite

# Globals
# Current debug level; 0 disables debug output (see set_debug/is_debug).
_debug = 0
# Whether verbose() messages are printed.
_verbose = False
# Whether messages are prefixed with the caller's file:line location.
_stacktrace = False
# Whether messages are prefixed with a timestamp even when not debugging.
_timestamp = False
# Per-category switches consulted by msg(), warn() and error().
_msg_enabled = True
_warn_enabled = True
_error_enabled = True
# Callable applied to every message string before it is written; the
# default is the identity function (see output_filter()).
_output_filter = lambda s: s
# Prefix used for the extra argument lines printed below a message.
indent = "  "

def debug_level():
    """Return the current debug level."""
    return _debug
def is_verbose():
    """Return the current verbose flag."""
    return _verbose
def is_debug(level=1):
    """Return True if the current debug level is at least ``level``."""
    return level <= _debug
def is_stacktrace():
    """Return the current stacktrace flag."""
    return _stacktrace
def set_debug(level=0):
    """Set the module-wide debug level.

    Args:
        level (int): new debug level; must not be negative.
    """
    global _debug
    assert level >= 0, 'Debug level must be a positive value'
    _debug = level
def set_verbose(flag):
    """Store ``flag`` as the module-wide verbose setting."""
    global _verbose
    _verbose = flag
def set_timestamp(flag):
    """Store ``flag`` as the module-wide timestamp setting."""
    global _timestamp
    _timestamp = flag
def set_msg_enabled(flag):
    """Store ``flag`` as the switch checked by ``msg()``."""
    global _msg_enabled
    _msg_enabled = flag
def set_warn_enabled(flag):
    """Store ``flag`` as the switch checked by ``warn()``."""
    global _warn_enabled
    _warn_enabled = flag
def set_error_enabled(flag):
    """Store ``flag`` as the switch checked by ``error()``."""
    global _error_enabled
    _error_enabled = flag
def msg_enabled():
    """Return the switch that controls whether ``msg()`` prints."""
    return _msg_enabled
def warn_enabled():
    """Return the switch that controls whether ``warn()`` prints."""
    return _warn_enabled
def error_enabled():
    """Return the switch that controls whether ``error()`` prints."""
    return _error_enabled
[docs]@contextlib.contextmanager def output_filter(filter_fn): """Context manager that applies a filter to all output.""" global _output_filter saved_filter = _output_filter try: _output_filter = filter_fn yield finally: _output_filter = saved_filter
class SuppressOutput:
    """Class for disabling output in a scope using 'with' keyword"""

    def __init__(self,
                 msg_enabled=True,
                 warn_enabled=True,
                 error_enabled=True):
        # Remember the settings in effect when the object is created;
        # these are what __exit__ puts back.
        self._msg_enabled_initial = _msg_enabled
        self._warn_enabled_initial = _warn_enabled
        self._error_enabled_initial = _error_enabled

        # Settings to apply while inside the 'with' block.
        self._msg_enabled = msg_enabled
        self._warn_enabled = warn_enabled
        self._error_enabled = error_enabled

    def __enter__(self):
        set_msg_enabled(self._msg_enabled)
        set_warn_enabled(self._warn_enabled)
        set_error_enabled(self._error_enabled)

    def __exit__(self, exc_type, exc_val, exc_tb):
        set_msg_enabled(self._msg_enabled_initial)
        set_warn_enabled(self._warn_enabled_initial)
        set_error_enabled(self._error_enabled_initial)
def set_stacktrace(flag):
    """Store ``flag`` as the module-wide stacktrace setting."""
    global _stacktrace
    _stacktrace = flag
def process_stacktrace(countback):
    """Gives file and line frame 'countback' frames from the bottom

    Args:
        countback (int): number of frames to step back from the
            innermost frame (this function's own frame is 0).

    Returns:
        str: ``"<file>:<line> "`` for the selected frame, with the
        common prefix of all spack files in the stack removed from
        the file name.
    """
    stack = traceback.extract_stack()

    # Not every frame belongs to spack; only spack files take part in
    # working out what the spack 'root' directory is.
    marker = os.path.sep + "spack"
    spack_files = [entry[0] for entry in stack if marker in entry[0]]
    prefix_len = len(os.path.commonprefix(spack_files))

    target = stack[len(stack) - countback - 1]
    return "%s:%i " % (target[0][prefix_len:], target[1])
def show_pid():
    """Return True when the debug level is at least 2."""
    return is_debug(2)
def get_timestamp(force=False):
    """Get a string timestamp

    Args:
        force (bool): produce a timestamp even when neither debugging
            nor timestamps are enabled.

    Returns:
        str: ``'[<timestamp>] '`` or ``'[<timestamp>, <pid>] '`` when
        ``show_pid()`` is true; the empty string when timestamps are
        not wanted.
    """
    if not (_debug or _timestamp or force):
        return ''

    # Note inclusion of the PID is useful for parallel builds.
    pid = ', {0}'.format(os.getpid()) if show_pid() else ''
    now = datetime.now().strftime("%Y-%m-%d-%H:%M:%S.%f")
    return '[{0}{1}] '.format(now, pid)
def msg(message, *args, **kwargs):
    """Print a '==>' message followed by one indented line per extra arg.

    Does nothing when messages are disabled. An Exception passed as
    ``message`` is rendered as ``ClassName: text``.

    Keyword Arguments:
        newline (bool): when True (default) the message is emitted with
            ``cprint``; otherwise with ``cwrite``.
    """
    if not msg_enabled():
        return

    if isinstance(message, Exception):
        message = "%s: %s" % (message.__class__.__name__, str(message))

    emit = cprint if kwargs.get('newline', True) else cwrite

    # Must be called from this frame: the countback of 2 is relative to
    # the call stack as seen from directly inside msg().
    location = process_stacktrace(2) if _stacktrace else ""

    emit(
        "@*b{%s==>} %s%s" % (
            location, get_timestamp(), cescape(_output_filter(message))
        )
    )

    for extra in args:
        print(indent + _output_filter(six.text_type(extra)))
def info(message, *args, **kwargs):
    """Print a '==>' message to a stream, then each extra arg indented.

    An Exception passed as ``message`` is rendered as
    ``ClassName: text``.

    Keyword Arguments:
        format (str): color format placed after '@' in the marker.
            Default '*b'.
        stream: file-like object to write to. Default ``sys.stdout``.
        wrap (bool): wrap each extra arg with ``textwrap``.
            Default False.
        break_long_words (bool): passed to ``textwrap.wrap`` when
            wrapping. Default False.
        countback (int): frame offset handed to ``process_stacktrace``
            when stack traces are enabled. Default 3.
    """
    if isinstance(message, Exception):
        message = "%s: %s" % (message.__class__.__name__, str(message))

    color_fmt = kwargs.get('format', '*b')
    stream = kwargs.get('stream', sys.stdout)
    wrap = kwargs.get('wrap', False)
    break_long_words = kwargs.get('break_long_words', False)
    st_countback = kwargs.get('countback', 3)

    # Must be called from this frame so that countback stays meaningful.
    location = process_stacktrace(st_countback) if _stacktrace else ""

    cprint(
        "@%s{%s==>} %s%s" % (
            color_fmt,
            location,
            get_timestamp(),
            cescape(_output_filter(six.text_type(message)))
        ),
        stream=stream
    )

    for extra in args:
        text = _output_filter(six.text_type(extra))
        if not wrap:
            stream.write(indent + text + '\n')
            continue

        wrapped = textwrap.wrap(
            text,
            initial_indent=indent,
            subsequent_indent=indent,
            break_long_words=break_long_words
        )
        for line in wrapped:
            stream.write(line + '\n')
def verbose(message, *args, **kwargs):
    """Forward to ``info()`` with format 'c', but only in verbose mode."""
    if not _verbose:
        return
    kwargs.setdefault('format', 'c')
    info(message, *args, **kwargs)
def debug(message, *args, **kwargs):
    """Forward to ``info()`` when the debug level is high enough.

    Keyword Arguments:
        level (int): minimum debug level required to print. Default 1.

    Unless overridden by the caller, the message uses format 'g' and is
    written to ``sys.stderr``.
    """
    if not is_debug(kwargs.get('level', 1)):
        return
    kwargs.setdefault('format', 'g')
    kwargs.setdefault('stream', sys.stderr)
    info(message, *args, **kwargs)
def error(message, *args, **kwargs):
    """Print ``message`` prefixed with 'Error: ' unless errors are disabled.

    Unless overridden by the caller, the message uses format '*r' and is
    written to ``sys.stderr``.
    """
    if error_enabled():
        kwargs.setdefault('format', '*r')
        kwargs.setdefault('stream', sys.stderr)
        info("Error: " + six.text_type(message), *args, **kwargs)
def warn(message, *args, **kwargs):
    """Print ``message`` prefixed with 'Warning: ' unless warnings are disabled.

    Unless overridden by the caller, the message uses format '*Y' and is
    written to ``sys.stderr``.
    """
    if warn_enabled():
        kwargs.setdefault('format', '*Y')
        kwargs.setdefault('stream', sys.stderr)
        info("Warning: " + six.text_type(message), *args, **kwargs)
def die(message, *args, **kwargs):
    """Report ``message`` through ``error()`` and exit with status 1."""
    # One more frame than error() would use by default, since die()
    # adds a level to the call stack.
    kwargs.setdefault('countback', 4)
    error(message, *args, **kwargs)
    sys.exit(1)
def get_number(prompt, **kwargs):
    """Prompt repeatedly until the user enters an integer >= 1.

    Keyword Arguments:
        default: value returned when the user enters nothing.
            Default None (an empty answer re-prompts).
        abort: answer that cancels the prompt; compared to the input
            as text. Default None (no abort answer).

    Returns:
        The entered number, ``default`` on empty input when a default
        is set, or None if the user typed the abort answer.
    """
    default = kwargs.get('default', None)
    abort = kwargs.get('abort', None)

    if default is not None and abort is not None:
        prompt += ' (default is %s, %s to abort) ' % (default, abort)
    elif default is not None:
        prompt += ' (default is %s) ' % default
    elif abort is not None:
        prompt += ' (%s to abort) ' % abort

    number = None
    while number is None:
        msg(prompt, newline=False)
        ans = input()

        # Only honor the abort answer when one was configured; otherwise
        # the literal text "None" would be treated as an abort request.
        if abort is not None and ans == six.text_type(abort):
            return None

        if ans:
            try:
                number = int(ans)
                if number < 1:
                    msg("Please enter a valid number.")
                    number = None
            except ValueError:
                msg("Please enter a valid number.")
        elif default is not None:
            number = default
    return number
def get_yes_or_no(prompt, **kwargs):
    """Prompt repeatedly until the user answers yes or no.

    Keyword Arguments:
        default: True, False, or None. The value returned when the user
            enters nothing; with None an empty answer re-prompts.

    Returns:
        bool: True for 'y'/'yes', False for 'n'/'no' (case-insensitive),
        or ``default`` on empty input.

    Raises:
        ValueError: if ``default`` is not True, False, or None.
    """
    default_value = kwargs.get('default', None)

    if default_value is None:
        prompt += ' [y/n] '
    elif default_value is True:
        prompt += ' [Y/n] '
    elif default_value is False:
        prompt += ' [y/N] '
    else:
        raise ValueError(
            "default for get_yes_or_no() must be True, False, or None.")

    result = None
    while result is None:
        msg(prompt, newline=False)
        ans = input().lower()
        if not ans:
            result = default_value
            if result is None:
                print("Please enter yes or no.")
        elif ans in ('y', 'yes'):
            result = True
        elif ans in ('n', 'no'):
            result = False
    return result
def hline(label=None, **kwargs):
    """Draw a labeled horizontal line.

    Keyword Arguments:
        char (str): Char to draw the line with.  Default '-'
        max_width (int): Maximum width of the line.  Default is 64 chars.

    Raises:
        TypeError: if any other keyword argument is passed.
    """
    char = kwargs.pop('char', '-')
    max_width = kwargs.pop('max_width', 64)
    if kwargs:
        # next(iter(...)) works on both Python 2 and 3; dict.iterkeys()
        # does not exist on Python 3.
        raise TypeError(
            "'%s' is an invalid keyword argument for this function."
            % next(iter(kwargs)))

    rows, cols = terminal_size()
    if not cols:
        cols = max_width
    else:
        cols -= 2
    cols = min(max_width, cols)

    label = six.text_type(label)
    prefix = char * 2 + " "
    # clen() is used for the label so the fill is sized by what clen
    # reports rather than the raw string length.
    suffix = " " + (cols - len(prefix) - clen(label)) * char

    print(prefix + label + suffix)
def terminal_size():
    """Gets the dimensions of the console: (rows, cols).

    On non-Windows platforms the size is queried with the TIOCGWINSZ
    ioctl on file descriptors 0, 1 and 2, then on the controlling
    terminal. If none of those work, or on Windows, the LINES and
    COLUMNS environment variables are used, defaulting to 25 and 80.

    Returns:
        tuple: ``(rows, cols)`` as ints.

    Raises:
        RuntimeError: on Windows with a Python version older than 3.
    """
    if _platform != "win32":
        def ioctl_gwinsz(fd):
            # Best effort: any failure means "size unknown for this fd".
            # Exception (not BaseException) so Ctrl-C still propagates.
            try:
                rc = struct.unpack('hh', fcntl.ioctl(
                    fd, termios.TIOCGWINSZ, '1234'))
            except Exception:
                return
            return rc

        rc = ioctl_gwinsz(0) or ioctl_gwinsz(1) or ioctl_gwinsz(2)
        if not rc:
            # Fall back to the controlling terminal.
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
                try:
                    rc = ioctl_gwinsz(fd)
                finally:
                    os.close(fd)
            except Exception:
                pass
        if not rc:
            rc = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))

        return int(rc[0]), int(rc[1])
    else:
        if sys.version_info[0] < 3:
            raise RuntimeError(
                "Terminal size not obtainable on Windows with a "
                "Python version older than 3")
        rc = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
        return int(rc[0]), int(rc[1])