# Copyright 2013-2022 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
from __future__ import unicode_literals
import contextlib
import os
import struct
import sys
import textwrap
import traceback
from datetime import datetime
from sys import platform as _platform
import six
from six import StringIO
from six.moves import input
if _platform != "win32":
import fcntl
import termios
from llnl.util.tty.color import cescape, clen, cprint, cwrite
# Globals
_debug = 0
_verbose = False
_stacktrace = False
_timestamp = False
_msg_enabled = True
_warn_enabled = True
_error_enabled = True
_output_filter = lambda s: s
indent = " "
[docs]def debug_level():
return _debug
[docs]def is_verbose():
return _verbose
[docs]def is_debug(level=1):
return _debug >= level
[docs]def is_stacktrace():
return _stacktrace
[docs]def set_debug(level=0):
global _debug
assert level >= 0, 'Debug level must be a positive value'
_debug = level
[docs]def set_verbose(flag):
global _verbose
_verbose = flag
[docs]def set_timestamp(flag):
global _timestamp
_timestamp = flag
[docs]def set_msg_enabled(flag):
global _msg_enabled
_msg_enabled = flag
[docs]def set_warn_enabled(flag):
global _warn_enabled
_warn_enabled = flag
[docs]def set_error_enabled(flag):
global _error_enabled
_error_enabled = flag
[docs]def msg_enabled():
return _msg_enabled
[docs]def warn_enabled():
return _warn_enabled
[docs]def error_enabled():
return _error_enabled
[docs]@contextlib.contextmanager
def output_filter(filter_fn):
"""Context manager that applies a filter to all output."""
global _output_filter
saved_filter = _output_filter
try:
_output_filter = filter_fn
yield
finally:
_output_filter = saved_filter
[docs]class SuppressOutput:
"""Class for disabling output in a scope using 'with' keyword"""
def __init__(self,
msg_enabled=True,
warn_enabled=True,
error_enabled=True):
self._msg_enabled_initial = _msg_enabled
self._warn_enabled_initial = _warn_enabled
self._error_enabled_initial = _error_enabled
self._msg_enabled = msg_enabled
self._warn_enabled = warn_enabled
self._error_enabled = error_enabled
def __enter__(self):
set_msg_enabled(self._msg_enabled)
set_warn_enabled(self._warn_enabled)
set_error_enabled(self._error_enabled)
def __exit__(self, exc_type, exc_val, exc_tb):
set_msg_enabled(self._msg_enabled_initial)
set_warn_enabled(self._warn_enabled_initial)
set_error_enabled(self._error_enabled_initial)
[docs]def set_stacktrace(flag):
global _stacktrace
_stacktrace = flag
[docs]def process_stacktrace(countback):
"""Gives file and line frame 'countback' frames from the bottom"""
st = traceback.extract_stack()
# Not all entries may be spack files, we have to remove those that aren't.
file_list = []
for frame in st:
# Check that the file is a spack file
if frame[0].find(os.path.sep + "spack") >= 0:
file_list.append(frame[0])
# We use commonprefix to find what the spack 'root' directory is.
root_dir = os.path.commonprefix(file_list)
root_len = len(root_dir)
st_idx = len(st) - countback - 1
st_text = "%s:%i " % (st[st_idx][0][root_len:], st[st_idx][1])
return st_text
[docs]def show_pid():
return is_debug(2)
[docs]def get_timestamp(force=False):
"""Get a string timestamp"""
if _debug or _timestamp or force:
# Note inclusion of the PID is useful for parallel builds.
pid = ', {0}'.format(os.getpid()) if show_pid() else ''
return '[{0}{1}] '.format(
datetime.now().strftime("%Y-%m-%d-%H:%M:%S.%f"), pid)
else:
return ''
[docs]def msg(message, *args, **kwargs):
if not msg_enabled():
return
if isinstance(message, Exception):
message = "%s: %s" % (message.__class__.__name__, str(message))
newline = kwargs.get('newline', True)
st_text = ""
if _stacktrace:
st_text = process_stacktrace(2)
if newline:
cprint(
"@*b{%s==>} %s%s" % (
st_text,
get_timestamp(),
cescape(_output_filter(message))
)
)
else:
cwrite(
"@*b{%s==>} %s%s" % (
st_text,
get_timestamp(),
cescape(_output_filter(message))
)
)
for arg in args:
print(indent + _output_filter(six.text_type(arg)))
[docs]def info(message, *args, **kwargs):
if isinstance(message, Exception):
message = "%s: %s" % (message.__class__.__name__, str(message))
format = kwargs.get('format', '*b')
stream = kwargs.get('stream', sys.stdout)
wrap = kwargs.get('wrap', False)
break_long_words = kwargs.get('break_long_words', False)
st_countback = kwargs.get('countback', 3)
st_text = ""
if _stacktrace:
st_text = process_stacktrace(st_countback)
cprint(
"@%s{%s==>} %s%s" % (
format,
st_text,
get_timestamp(),
cescape(_output_filter(six.text_type(message)))
),
stream=stream
)
for arg in args:
if wrap:
lines = textwrap.wrap(
_output_filter(six.text_type(arg)),
initial_indent=indent,
subsequent_indent=indent,
break_long_words=break_long_words
)
for line in lines:
stream.write(line + '\n')
else:
stream.write(
indent + _output_filter(six.text_type(arg)) + '\n'
)
[docs]def verbose(message, *args, **kwargs):
if _verbose:
kwargs.setdefault('format', 'c')
info(message, *args, **kwargs)
[docs]def debug(message, *args, **kwargs):
level = kwargs.get('level', 1)
if is_debug(level):
kwargs.setdefault('format', 'g')
kwargs.setdefault('stream', sys.stderr)
info(message, *args, **kwargs)
[docs]def error(message, *args, **kwargs):
if not error_enabled():
return
kwargs.setdefault('format', '*r')
kwargs.setdefault('stream', sys.stderr)
info("Error: " + six.text_type(message), *args, **kwargs)
[docs]def warn(message, *args, **kwargs):
if not warn_enabled():
return
kwargs.setdefault('format', '*Y')
kwargs.setdefault('stream', sys.stderr)
info("Warning: " + six.text_type(message), *args, **kwargs)
[docs]def die(message, *args, **kwargs):
kwargs.setdefault('countback', 4)
error(message, *args, **kwargs)
sys.exit(1)
[docs]def get_number(prompt, **kwargs):
default = kwargs.get('default', None)
abort = kwargs.get('abort', None)
if default is not None and abort is not None:
prompt += ' (default is %s, %s to abort) ' % (default, abort)
elif default is not None:
prompt += ' (default is %s) ' % default
elif abort is not None:
prompt += ' (%s to abort) ' % abort
number = None
while number is None:
msg(prompt, newline=False)
ans = input()
if ans == six.text_type(abort):
return None
if ans:
try:
number = int(ans)
if number < 1:
msg("Please enter a valid number.")
number = None
except ValueError:
msg("Please enter a valid number.")
elif default is not None:
number = default
return number
[docs]def get_yes_or_no(prompt, **kwargs):
default_value = kwargs.get('default', None)
if default_value is None:
prompt += ' [y/n] '
elif default_value is True:
prompt += ' [Y/n] '
elif default_value is False:
prompt += ' [y/N] '
else:
raise ValueError(
"default for get_yes_no() must be True, False, or None.")
result = None
while result is None:
msg(prompt, newline=False)
ans = input().lower()
if not ans:
result = default_value
if result is None:
print("Please enter yes or no.")
else:
if ans == 'y' or ans == 'yes':
result = True
elif ans == 'n' or ans == 'no':
result = False
return result
[docs]def hline(label=None, **kwargs):
"""Draw a labeled horizontal line.
Keyword Arguments:
char (str): Char to draw the line with. Default '-'
max_width (int): Maximum width of the line. Default is 64 chars.
"""
char = kwargs.pop('char', '-')
max_width = kwargs.pop('max_width', 64)
if kwargs:
raise TypeError(
"'%s' is an invalid keyword argument for this function."
% next(kwargs.iterkeys()))
rows, cols = terminal_size()
if not cols:
cols = max_width
else:
cols -= 2
cols = min(max_width, cols)
label = six.text_type(label)
prefix = char * 2 + " "
suffix = " " + (cols - len(prefix) - clen(label)) * char
out = StringIO()
out.write(prefix)
out.write(label)
out.write(suffix)
print(out.getvalue())
[docs]def terminal_size():
"""Gets the dimensions of the console: (rows, cols)."""
if _platform != "win32":
def ioctl_gwinsz(fd):
try:
rc = struct.unpack('hh', fcntl.ioctl(
fd, termios.TIOCGWINSZ, '1234'))
except BaseException:
return
return rc
rc = ioctl_gwinsz(0) or ioctl_gwinsz(1) or ioctl_gwinsz(2)
if not rc:
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
rc = ioctl_gwinsz(fd)
os.close(fd)
except BaseException:
pass
if not rc:
rc = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
return int(rc[0]), int(rc[1])
else:
if sys.version_info[0] < 3:
raise RuntimeError("Terminal size not obtainable on Windows with a\
Python version older than 3")
rc = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
return int(rc[0]), int(rc[1])