Source code for netjsonconfig.utils

import re
from collections import OrderedDict
from copy import deepcopy

from jsonschema import ValidationError as JsonSchemaError

from .exceptions import ValidationError


def merge_config(template, config, list_identifiers=None):
    """
    Merges ``config`` on top of ``template``.

    Conflicting keys are handled in the following way:

    * simple values (eg: ``str``, ``int``, ``float``, etc.) in ``config``
      overwrite the ones in ``template``
    * values of type ``list`` in both ``config`` and ``template`` are
      merged using the ``merge_list`` function
    * values of type ``dict`` in both ``config`` and ``template`` are
      merged recursively

    ``template`` is deep-copied and never modified. Values taken over
    from ``config`` are stored by reference, not copied.

    :param template: template ``dict``
    :param config: config ``dict``
    :param list_identifiers: ``list`` or ``None``
    :returns: merged ``dict``
    :raises ValidationError: if incompatible types are found
    """
    containers = (dict, list)
    merged = deepcopy(template)
    for name, incoming in config.items():
        current = merged.get(name)
        # dict on both sides: merge key by key
        if isinstance(incoming, dict) and isinstance(current, dict):
            merged[name] = merge_config(current, incoming, list_identifiers)
            continue
        # list on both sides: delegate to merge_list
        if isinstance(incoming, list) and isinstance(current, list):
            merged[name] = merge_list(current, incoming, list_identifiers)
            continue
        # a dict meeting a list (or vice versa) cannot be merged
        mismatch = (
            isinstance(current, containers)
            and isinstance(incoming, containers)
            and type(incoming) is not type(current)
        )
        if mismatch:
            message = (
                f"Incompatible type for '{name}': expected {type(current).__name__}, "
                f"got {type(incoming).__name__}."
            )
            raise ValidationError(JsonSchemaError(message))
        # anything else: the value from config wins
        merged[name] = incoming
    return merged
def merge_list(list1, list2, identifiers=None):
    """
    Merges ``list2`` on top of ``list1``.

    If both lists contain dictionaries which have keys specified
    in ``identifiers`` which have equal values, those dicts will
    be merged (dicts in ``list2`` will override dicts in ``list1``).
    The remaining elements will be summed in order to create a list
    which contains elements of both lists.

    Elements are deep-copied before merging. ``identifiers`` is not
    forwarded to the final ``merge_config`` call.

    :param list1: ``list`` from template
    :param list2: ``list`` from config
    :param identifiers: ``list`` or ``None``
    :returns: merged ``list``
    """
    id_keys = identifiers or []
    template_items = OrderedDict()
    config_items = OrderedDict()
    sources = (
        (list1, template_items, False),
        (list2, config_items, True),
    )
    for elements, bucket, is_config in sources:
        for element in elements:
            # Skip config elements that are equal to one already collected
            # from the template: some templates may share one or multiple
            # common files and these do not have to be duplicated.
            if is_config and element in template_items.values():
                continue
            # merge by internal python id by default
            slot = id(element)
            # dicts are merged by the first key listed in ``identifiers``
            # that they contain
            if isinstance(element, dict):
                slot = next(
                    (element[name] for name in id_keys if name in element),
                    slot,
                )
            # a list is not hashable, a tuple can be used as a dictionary key
            if isinstance(slot, list):
                slot = tuple(slot)
            bucket[slot] = deepcopy(element)
    combined = merge_config(template_items, config_items)
    return list(combined.values())
def sorted_dict(dict_):
    """
    Returns an ``OrderedDict`` with the items of ``dict_`` sorted by key
    """
    return OrderedDict(sorted(dict_.items()))


# matches ``{{ name }}`` placeholders and captures the variable name
var_pattern = re.compile(r"\{\{\s*(\w*)\s*\}\}")


def evaluate_vars(data, context=None):
    """
    Evaluates variables in ``data``

    ``dict`` and ``list`` containers are walked recursively and
    updated in place; strings get every ``{{ var }}`` placeholder whose
    name is in ``context`` replaced by ``str(context[var])``.
    Placeholders with unknown names are left untouched.

    :param data: data structure containing variables, may be
                 ``str``, ``dict`` or ``list``
    :param context: ``dict`` containing variables
    :returns: modified data structure
    """
    context = context or {}
    if isinstance(data, dict):
        for key, value in data.items():
            data[key] = evaluate_vars(value, context)
    elif isinstance(data, list):
        for index, value in enumerate(data):
            data[index] = evaluate_vars(value, context)
    elif isinstance(data, str):
        vars_found = var_pattern.findall(data)
        for var in vars_found:
            var = var.strip()
            # if found multiple variables, create a new regexp pattern for each
            # variable, otherwise different variables would get the same value
            # (see https://github.com/openwisp/netjsonconfig/issues/55)
            if len(vars_found) > 1:
                pattern = r"\{\{(\s*%s\s*)\}\}" % var
            # in case of single variables, use the precompiled
            # regexp pattern to save computation
            else:
                pattern = var_pattern
            if var in context:
                replacement = str(context[var])
                # Use a callable so the value is inserted literally: a plain
                # string would be parsed as a replacement template, where
                # backslashes are escapes / group references (``\n`` turns
                # into a newline, ``\d`` raises ``re.error``).
                data = re.sub(pattern, lambda _match: replacement, data)
    return data


def get_copy(dict_, key, default=None):
    """
    Looks for a key in a dictionary and returns a deepcopy of its value,
    or a deepcopy of ``default`` if the key is missing.

    Falsy containers (eg: empty ``list`` or ``dict``) are copied too, so
    the caller can never mutate the object stored in ``dict_``.
    """
    return deepcopy(dict_.get(key, default))


class _TabsMixin:  # pragma: nocover
    """
    mixin that adds _tabs method to test classes
    """

    def _tabs(self, string):
        """
        replace 4 spaces with 1 tab
        """
        # spelled as a product so the run of 4 spaces cannot be
        # collapsed by whitespace normalisation
        return string.replace(" " * 4, "\t")