Source code for spack.util.spack_yaml

# Copyright 2013-2022 Lawrence Livermore National Security, LLC and other
# Spack Project Developers. See the top-level COPYRIGHT file for details.
#
# SPDX-License-Identifier: (Apache-2.0 OR MIT)

"""Enhanced YAML parsing for Spack.

- ``load()`` preserves YAML Marks on returned objects -- this allows
  us to access file and line information later.

- Our load methods use an ``OrderedDict`` class instead of YAML's
  default unordered dict.

"""
import collections
import ctypes
import re
from typing import List  # novm

import ruamel.yaml as yaml
from ruamel.yaml import RoundTripDumper, RoundTripLoader
from six import StringIO, string_types

from llnl.util.compat import Mapping
from llnl.util.tty.color import cextra, clen, colorize

import spack.error

# Only export load and dump
__all__ = ['load', 'dump', 'SpackYAMLError']


# Make new classes so we can add custom attributes.
# Also, use OrderedDict instead of just dict.
class syaml_dict(collections.OrderedDict):
    def __repr__(self):
        mappings = ('%r: %r' % (k, v) for k, v in self.items())
        return '{%s}' % ', '.join(mappings)


class syaml_list(list):
    __repr__ = list.__repr__


class syaml_str(str):
    __repr__ = str.__repr__


class syaml_int(int):
    __repr__ = int.__repr__


#: mapping from syaml type -> primitive type
syaml_types = {
    syaml_str: string_types,
    syaml_int: int,
    syaml_dict: dict,
    syaml_list: list,
}


markable_types = set(syaml_types) | set([
    yaml.comments.CommentedSeq,
    yaml.comments.CommentedMap])


def syaml_type(obj):
    """Get the corresponding syaml wrapper type for a primitive type.

    Return:
        (object): syaml-typed copy of object, or the obj if no wrapper
    """
    for syaml_t, t in syaml_types.items():
        if type(obj) is not bool and isinstance(obj, t):
            return syaml_t(obj) if type(obj) != syaml_t else obj
    return obj
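
# A few hedged examples of the conversion above (values are illustrative):
#
#     syaml_type('foo')   # -> syaml_str('foo'), which can carry marks
#     syaml_type(3)       # -> syaml_int(3)
#     syaml_type(True)    # -> True (bools are deliberately left untouched)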


def markable(obj):
    """Whether an object can be marked."""
    return type(obj) in markable_types


def mark(obj, node):
    """Add start and end markers to an object."""
    if hasattr(node, 'start_mark'):
        obj._start_mark = node.start_mark
    elif hasattr(node, '_start_mark'):
        obj._start_mark = node._start_mark
    if hasattr(node, 'end_mark'):
        obj._end_mark = node.end_mark
    elif hasattr(node, '_end_mark'):
        obj._end_mark = node._end_mark


def marked(obj):
    """Whether an object has been marked by spack_yaml."""
    return (hasattr(obj, '_start_mark') and obj._start_mark or
            hasattr(obj, '_end_mark') and obj._end_mark)


class OrderedLineLoader(RoundTripLoader):
    """YAML loader specifically intended for reading Spack configuration
       files. It preserves order and line numbers. It also has special-purpose
       logic for handling dictionary keys that indicate a Spack config
       override: namely any key that contains an "extra" ':' character.

       Mappings read in by this loader behave like an ordered dict.
       Sequences, mappings, and strings also have new attributes,
       ``_start_mark`` and ``_end_mark``, that preserve YAML line
       information in the output data.

    """
    #
    # Override construct_yaml_* so that we can apply _start_mark/_end_mark to
    # them. The superclass returns CommentedMap/CommentedSeq objects that we
    # can add attributes to (and we depend on their behavior to preserve
    # comments).
    #
    # The inherited sequence/dictionary constructors return empty instances
    # and fill in with mappings later.  We preserve this behavior.
    #

    def construct_yaml_str(self, node):
        value = super(OrderedLineLoader, self).construct_yaml_str(node)
        # There is no specific marker to indicate that we are parsing a key,
        # so this assumes we are talking about a Spack config override key if
        # it ends with a ':' and does not contain a '@' (which can appear
        # in config values that refer to Spack specs)
        if value and value.endswith(':') and '@' not in value:
            value = syaml_str(value[:-1])
            value.override = True
        else:
            value = syaml_str(value)
        mark(value, node)
        return value

    def construct_yaml_seq(self, node):
        gen = super(OrderedLineLoader, self).construct_yaml_seq(node)
        data = next(gen)
        if markable(data):
            mark(data, node)
        yield data
        for x in gen:
            pass

    def construct_yaml_map(self, node):
        gen = super(OrderedLineLoader, self).construct_yaml_map(node)
        data = next(gen)
        if markable(data):
            mark(data, node)
        yield data
        for x in gen:
            pass


# register above new constructors
OrderedLineLoader.add_constructor(
    'tag:yaml.org,2002:map', OrderedLineLoader.construct_yaml_map)
OrderedLineLoader.add_constructor(
    'tag:yaml.org,2002:seq', OrderedLineLoader.construct_yaml_seq)
OrderedLineLoader.add_constructor(
    'tag:yaml.org,2002:str', OrderedLineLoader.construct_yaml_str)
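
# Illustrative sketch of the override syntax this loader recognizes; the
# YAML snippet is an assumption for demonstration, not a real config file.
# load_config() (defined below) wires this loader into yaml.load():
#
#     data = load_config("packages::\n  all:\n    compiler: [gcc]\n")
#     key = list(data)[0]     # syaml_str('packages')
#     key.override            # True -- the trailing ':' was consumed
#     key._start_mark.line    # 0   -- YAML marks are zero-based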


class OrderedLineDumper(RoundTripDumper):
    """Dumper that preserves ordering and formats ``syaml_*`` objects.

    This dumper preserves the insertion order of ``syaml_dict`` objects
    when they're written out.  It also has some custom formatters
    for ``syaml_*`` objects so that they are formatted like their
    regular Python equivalents, instead of ugly YAML pyobjects.

    """

    def ignore_aliases(self, _data):
        """Make the dumper NEVER print YAML aliases."""
        return True

    def represent_data(self, data):
        result = super(OrderedLineDumper, self).represent_data(data)
        if data is None:
            result.value = syaml_str("null")
        return result

    def represent_str(self, data):
        if hasattr(data, 'override') and data.override:
            data = data + ':'
        return super(OrderedLineDumper, self).represent_str(data)


class SafeDumper(RoundTripDumper):

    def ignore_aliases(self, _data):
        """Make the dumper NEVER print YAML aliases."""
        return True


# Make our special objects look like normal YAML ones.
RoundTripDumper.add_representer(syaml_dict, RoundTripDumper.represent_dict)
RoundTripDumper.add_representer(syaml_list, RoundTripDumper.represent_list)
RoundTripDumper.add_representer(syaml_int, RoundTripDumper.represent_int)
RoundTripDumper.add_representer(syaml_str, RoundTripDumper.represent_str)
OrderedLineDumper.add_representer(syaml_str, OrderedLineDumper.represent_str)
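
# With these representers registered, syaml_* values serialize like their
# built-in counterparts (a hedged sketch; exact formatting may differ):
#
#     yaml.dump(syaml_dict([('a', syaml_int(1))]), Dumper=OrderedLineDumper)
#     # -> 'a: 1\n'  (no '!!python/object' tags, insertion order preserved)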


#: Max integer helps avoid passing too large a value to cyaml.
maxint = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1) - 1


def dump(obj, default_flow_style=False, stream=None):
    return yaml.dump(obj, default_flow_style=default_flow_style,
                     width=maxint, Dumper=SafeDumper, stream=stream)


def file_line(mark):
    """Format a mark as <file>:<line> information."""
    result = mark.name
    if mark.line:
        result += ':' + str(mark.line)
    return result


#: Global for interactions between LineAnnotationDumper and dump_annotated().
#: This is nasty but YAML doesn't give us many ways to pass arguments --
#: yaml.dump() takes a class (not an instance) and instantiates the dumper
#: itself, so we can't just pass an instance
_annotations = []  # type: List[str]


class LineAnnotationDumper(OrderedLineDumper):
    """Dumper that generates per-line annotations.

    Annotations are stored in the ``_annotations`` global.  After one
    dump pass, the strings in ``_annotations`` will correspond one-to-one
    with the lines output by the dumper.

    LineAnnotationDumper records blame information after each line is
    generated. As each line is parsed, it saves file/line info for each
    object printed. At the end of each line, it creates an annotation
    based on the saved mark and stores it in ``_annotations``.

    For an example of how to use this, see ``dump_annotated()``, which
    writes to a ``StringIO`` then joins the lines from that with
    annotations.
    """
    saved = None

    def __init__(self, *args, **kwargs):
        super(LineAnnotationDumper, self).__init__(*args, **kwargs)
        del _annotations[:]
        self.colors = 'KgrbmcyGRBMCY'
        self.filename_colors = {}

    def process_scalar(self):
        super(LineAnnotationDumper, self).process_scalar()
        if marked(self.event.value):
            self.saved = self.event.value

    def represent_data(self, data):
        """Force syaml_str to be passed through with marks."""
        result = super(LineAnnotationDumper, self).represent_data(data)
        if data is None:
            result.value = syaml_str("null")
        elif isinstance(result.value, string_types):
            result.value = syaml_str(data)
        if markable(result.value):
            mark(result.value, data)
        return result

    def write_line_break(self):
        super(LineAnnotationDumper, self).write_line_break()
        if self.saved is None:
            _annotations.append(colorize('@K{---}'))
            return

        # append annotations at the end of each line
        if self.saved:
            mark = self.saved._start_mark
            color = self.filename_colors.get(mark.name)
            if not color:
                ncolors = len(self.colors)
                color = self.colors[len(self.filename_colors) % ncolors]
                self.filename_colors[mark.name] = color
            fmt = '@%s{%%s}' % color
            ann = fmt % mark.name
            if mark.line is not None:
                ann += ':@c{%s}' % (mark.line + 1)
            _annotations.append(colorize(ann))
        else:
            _annotations.append('')


def load_config(*args, **kwargs):
    """Load but modify the loader instance so that it will add __line__
       attributes to the returned object."""
    kwargs['Loader'] = OrderedLineLoader
    return yaml.load(*args, **kwargs)
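
# Hedged sketch of reading marks back after load_config(); the YAML text is
# illustrative and the reported stream name depends on what was passed in:
#
#     data = load_config("foo:\n  bar: [1, 2]\n")
#     data['foo']['bar']._start_mark.line    # 1 -- zero-based line of the list
#     file_line(data['foo']['bar']._start_mark)
#     # -> something like '<unicode string>:1'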


def load(*args, **kwargs):
    return yaml.load(*args, **kwargs)


def dump_config(*args, **kwargs):
    blame = kwargs.pop('blame', False)

    if blame:
        return dump_annotated(*args, **kwargs)
    else:
        kwargs['Dumper'] = OrderedLineDumper
        return yaml.dump(*args, **kwargs)


def dump_annotated(data, stream=None, *args, **kwargs):
    kwargs['Dumper'] = LineAnnotationDumper

    sio = StringIO()
    yaml.dump(data, sio, *args, **kwargs)

    # write_line_break() is not called by YAML for empty lines, so we
    # skip empty lines here with \n+.
    lines = re.split(r"\n+", sio.getvalue().rstrip())

    getvalue = None
    if stream is None:
        stream = StringIO()
        getvalue = stream.getvalue

    # write out annotations and lines, accounting for color
    width = max(clen(a) for a in _annotations)
    formats = ['%%-%ds %%s\n' % (width + cextra(a)) for a in _annotations]

    for f, a, l in zip(formats, _annotations, lines):
        stream.write(f % (a, l))

    if getvalue:
        return getvalue()


def sorted_dict(dict_like):
    """Return an ordered dict with all the fields sorted recursively.

    Args:
        dict_like (dict): dictionary to be sorted

    Returns:
        dictionary sorted recursively
    """
    result = syaml_dict(sorted(dict_like.items()))
    for key, value in result.items():
        if isinstance(value, Mapping):
            result[key] = sorted_dict(value)
    return result
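
# Hedged usage sketch for the helpers above (output is approximate):
#
#     sorted_dict({'b': {'d': 4, 'c': 3}, 'a': 1})
#     # -> {'a': 1, 'b': {'c': 3, 'd': 4}}  (a syaml_dict, sorted at every level)
#
#     dump_config(data, blame=True)
#     # returns the YAML for ``data`` with each line prefixed by a colorized
#     # <file>:<line> annotation built by LineAnnotationDumper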


class SpackYAMLError(spack.error.SpackError):
    """Raised when there are issues with YAML parsing."""
    def __init__(self, msg, yaml_error):
        super(SpackYAMLError, self).__init__(msg, str(yaml_error))