# Copyright Spack Project Developers. See COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)
"""Debug signal handler: prints a stack trace and enters interpreter.
``register_interrupt_handler()`` enables a ctrl-C handler that prints
a stack trace and drops the user into an interpreter.
"""
import collections
import sys
import time
from contextlib import contextmanager
from typing import Callable, Dict, List
from llnl.util.lang import pretty_seconds_formatter
import spack.util.spack_json as sjson
TimerEvent = collections.namedtuple("TimerEvent", ("time", "running", "label"))
TimeTracker = collections.namedtuple("TimeTracker", ("total", "start", "count", "path"))
#: name for the global timer (used in start(), stop(), duration() without arguments)
global_timer_name = "_global"
[docs]
class BaseTimer:
[docs]
def start(self, name=None):
pass
[docs]
def stop(self, name=None):
pass
[docs]
def duration(self, name=None):
return 0.0
[docs]
@contextmanager
def measure(self, name):
yield self
@property
def phases(self):
return []
[docs]
def write_json(self, out=sys.stdout):
pass
[docs]
def write_tty(self, out=sys.stdout):
pass
[docs]
class NullTimer(BaseTimer):
"""Timer interface that does nothing, useful in for "tell
don't ask" style code when timers are optional."""
pass
[docs]
class Timer(BaseTimer):
"""Simple interval timer"""
def __init__(self, now: Callable[[], float] = time.time):
"""
Arguments:
now: function that gives the seconds since e.g. epoch
"""
self._now = now
self._timers: Dict[str, TimeTracker] = {}
self._timer_stack: List[str] = []
self._events: List[TimerEvent] = []
# Push start event
self._events.append(TimerEvent(self._now(), True, global_timer_name))
[docs]
def start(self, name=global_timer_name):
"""
Start or restart a named timer, or the global timer when no name is given.
Arguments:
name (str): Optional name of the timer. When no name is passed, the
global timer is started.
"""
self._events.append(TimerEvent(self._now(), True, name))
[docs]
def stop(self, name=global_timer_name):
"""
Stop a named timer, or all timers when no name is given. Stopping a
timer that has not started has no effect.
Arguments:
name (str): Optional name of the timer. When no name is passed, all
timers are stopped.
"""
self._events.append(TimerEvent(self._now(), False, name))
[docs]
def duration(self, name=global_timer_name):
"""
Get the time in seconds of a named timer, or the total time if no
name is passed. The duration is always 0 for timers that have not been
started, no error is raised.
Arguments:
name (str): (Optional) name of the timer
Returns:
float: duration of timer.
"""
self._flatten()
if name in self._timers:
if name in self._timer_stack:
return self._timers[name].total + (self._now() - self._timers[name].start)
return self._timers[name].total
else:
return 0.0
[docs]
@contextmanager
def measure(self, name):
"""
Context manager that allows you to time a block of code.
Arguments:
name (str): Name of the timer
"""
self.start(name)
yield self
self.stop(name)
@property
def phases(self):
"""Get all named timers (excluding the global/total timer)"""
self._flatten()
return [k for k in self._timers.keys() if not k == global_timer_name]
def _flatten(self):
for event in self._events:
if event.running:
if event.label not in self._timer_stack:
self._timer_stack.append(event.label)
# Only start the timer if it is on top of the stack
# restart doesn't work after a subtimer is started
if event.label == self._timer_stack[-1]:
timer_path = "/".join(self._timer_stack[1:])
tracker = self._timers.get(
event.label, TimeTracker(0.0, event.time, 0, timer_path)
)
assert tracker.path == timer_path
self._timers[event.label] = TimeTracker(
tracker.total, event.time, tracker.count, tracker.path
)
else: # if not event.running:
if event.label in self._timer_stack:
index = self._timer_stack.index(event.label)
for label in self._timer_stack[index:]:
tracker = self._timers[label]
self._timers[label] = TimeTracker(
tracker.total + (event.time - tracker.start),
None,
tracker.count + 1,
tracker.path,
)
self._timer_stack = self._timer_stack[: max(0, index)]
# clear events
self._events = []
[docs]
def write_json(self, out=sys.stdout, extra_attributes={}):
"""Write a json object with times to file"""
self._flatten()
data = {
"total": self._timers[global_timer_name].total,
"phases": [
{
"name": phase,
"path": self._timers[phase].path,
"seconds": self._timers[phase].total,
"count": self._timers[phase].count,
}
for phase in self.phases
],
}
if extra_attributes:
data.update(extra_attributes)
if out:
out.write(sjson.dump(data))
else:
return data
[docs]
def write_tty(self, out=sys.stdout):
"""Write a human-readable summary of timings (depth is 1)"""
self._flatten()
times = [self.duration(p) for p in self.phases]
# Get a consistent unit for the time
pretty_seconds = pretty_seconds_formatter(max(times))
# Tuples of (phase, time) including total.
formatted = list(zip(self.phases, times))
formatted.append(("total", self.duration()))
# Write to out
for name, duration in formatted:
out.write(f" {name:10s} {pretty_seconds(duration):>10s}\n")
#: instance of a do-nothing timer
NULL_TIMER = NullTimer()