import gzip
import ipaddress
import json
import re
import tarfile
from collections import OrderedDict
from copy import deepcopy
from io import BytesIO
from jsonschema import Draft4Validator, draft4_format_checker
from jsonschema.exceptions import ValidationError as JsonSchemaError
from ...exceptions import ValidationError
from ...schema import DEFAULT_FILE_MODE
from ...utils import evaluate_vars, merge_config
_host_name_re = re.compile(r"^[A-Za-z0-9][A-Za-z0-9\.\-]{1,255}$")
class BaseBackend(object):
"""
Base Backend class
"""
schema = None
FILE_SECTION_DELIMITER = '# ---------- files ---------- #'
list_identifiers = []
def __init__(self, config=None, native=None, templates=None, context=None):
"""
:param config: ``dict`` containing a valid **NetJSON** configuration dictionary
:param native: ``str`` or file object representing a native configuration that will
be parsed and converted to a **NetJSON** configuration dictionary
:param templates: ``list`` containing **NetJSON** configuration dictionaries that
will be used as a base for the main config
:param context: ``dict`` containing configuration variables
:raises TypeError: raised if ``config`` is not of type ``dict`` or if
``templates`` is not of type ``list``
"""
# initialize empty instance attributes
self.config = None
self.intermediate_data = None
# forward conversion (NetJSON > native configuration)
if config is not None:
# perform deepcopy to avoid modifying the original config argument
config = deepcopy(self._load(config))
self.config = self._merge_config(config, templates)
self.config = self._evaluate_vars(self.config, context)
# backward conversion (native configuration > NetJSON)
elif native is not None:
self.parse(native)
else:
raise ValueError(
'Expecting either config or native argument to be '
'passed during the initialization of the backend'
)
def _load(self, config):
"""
Loads config from string or dict
"""
if isinstance(config, str):
try:
config = json.loads(config)
except ValueError:
pass
if not isinstance(config, dict):
raise TypeError(
'config block must be an instance of dict or a valid NetJSON string'
)
return config
def _merge_config(self, config, templates):
"""
Merges config with templates
"""
if not templates:
return config
# type check
if not isinstance(templates, list):
raise TypeError('templates argument must be an instance of list')
# merge templates with main configuration
result = {}
config_list = templates + [config]
for merging in config_list:
result = merge_config(result, self._load(merging), self.list_identifiers)
return result
def _evaluate_vars(self, config, context):
"""
Evaluates configuration variables
"""
# return immediately if context is empty
if not context:
return config
# only if variables are found perform evaluation
return evaluate_vars(config, context)
def _render_files(self):
"""
Renders additional files specified in ``self.config['files']``
"""
output = ''
# render files
files = self.config.get('files', [])
# add delimiter
if files:
output += '\n{0}\n\n'.format(self.FILE_SECTION_DELIMITER)
for f in files:
mode = f.get('mode', DEFAULT_FILE_MODE)
# add file to output
file_output = (
'# path: {0}\n'
'# mode: {1}\n\n'
'{2}\n\n'.format(f['path'], mode, f['contents'])
)
output += file_output
return output
def _deduplicate_files(self):
files = self.config.get('files', [])
if not files:
return
files_dict = OrderedDict()
for file in files:
files_dict[file['path']] = file
self.config['files'] = list(files_dict.values())
@draft4_format_checker.checks('cidr', AssertionError)
def _cidr_notation(value):
try:
ipaddress.ip_network(value)
except ValueError as e:
assert False, str(e)
return True
@draft4_format_checker.checks('hostname', JsonSchemaError)
def _is_hostname(value):
"""
The hostname validation has been taken from jsonschema~=3.2.0
(jsonschema._format.is_host_name). The newer versions of
jsonschema enforces FQDN validation which is not always
required in OpenWISP. E.g. setting up hostname of a device.
"""
if not isinstance(value, str):
return True
if not _host_name_re.match(value):
return False
components = value.split(".")
for component in components:
if len(component) > 63:
return False
return True
def validate(self):
try:
Draft4Validator(self.schema, format_checker=draft4_format_checker).validate(
self.config
)
except JsonSchemaError as e:
raise ValidationError(e)
def render(self, files=True):
"""
Converts the configuration dictionary into the corresponding configuration format
:param files: whether to include "additional files" in the output or not;
defaults to ``True``
:returns: string with output
"""
self.validate()
# convert NetJSON config to intermediate data structure
if self.intermediate_data is None:
self.to_intermediate()
self._deduplicate_files()
# support multiple renderers
renderers = getattr(self, 'renderers', None) or [self.renderer]
# convert intermediate data structure to native configuration
output = ''
for renderer_class in renderers:
renderer = renderer_class(self)
output += renderer.render()
# remove reference to renderer instance (not needed anymore)
del renderer
# are we required to include
# additional files?
if files:
# render additional files
files_output = self._render_files()
if files_output:
# max 2 new lines
output += files_output.replace('\n\n\n', '\n\n')
# return the configuration
return output
def json(self, validate=True, *args, **kwargs):
"""
returns a string formatted as **NetJSON DeviceConfiguration**;
performs validation before returning output;
``*args`` and ``*kwargs`` will be passed to ``json.dumps``;
:returns: string
"""
if validate:
self.validate()
# automatically adds NetJSON type
config = deepcopy(self.config)
config.update({'type': 'DeviceConfiguration'})
return json.dumps(config, *args, **kwargs)
def generate(self):
"""
Returns a ``BytesIO`` instance representing an in-memory tar.gz archive
containing the native router configuration.
:returns: in-memory tar.gz archive, instance of ``BytesIO``
"""
tar_bytes = BytesIO()
tar = tarfile.open(fileobj=tar_bytes, mode='w')
self._generate_contents(tar)
self._process_files(tar)
tar.close()
tar_bytes.seek(0) # set pointer to beginning of stream
# `mtime` parameter of gzip file must be 0, otherwise any checksum operation
# would return a different digest even when content is the same.
# to achieve this we must use the python `gzip` library because the `tarfile`
# library does not seem to offer the possibility to modify the gzip `mtime`.
gzip_bytes = BytesIO()
gz = gzip.GzipFile(fileobj=gzip_bytes, mode='wb', mtime=0)
gz.write(tar_bytes.getvalue())
gz.close()
gzip_bytes.seek(0) # set pointer to beginning of stream
return gzip_bytes
def _generate_contents(self, tar):
raise NotImplementedError()
def write(self, name, path='./'):
"""
Like ``generate`` but writes to disk.
:param name: file name, the tar.gz extension will be added automatically
:param path: directory where the file will be written to, defaults to ``./``
:returns: None
"""
byte_object = self.generate()
file_name = '{0}.tar.gz'.format(name)
if not path.endswith('/'):
path += '/'
f = open('{0}{1}'.format(path, file_name), 'wb')
f.write(byte_object.getvalue())
f.close()
def _process_files(self, tar):
"""
Adds files specified in self.config['files'] to tarfile instance.
:param tar: tarfile instance
:returns: None
"""
# insert additional files
for file_item in self.config.get('files', []):
path = file_item['path']
# remove leading slashes from path
if path.startswith('/'):
path = path[1:]
self._add_file(
tar=tar,
name=path,
contents=file_item['contents'],
mode=file_item.get('mode', DEFAULT_FILE_MODE),
)
def _add_file(self, tar, name, contents, mode=DEFAULT_FILE_MODE):
"""
Adds a single file in tarfile instance.
:param tar: tarfile instance
:param name: string representing filename or path
:param contents: string representing file contents
:param mode: string representing file mode, defaults to 644
:returns: None
"""
byte_contents = BytesIO(contents.encode('utf8'))
info = tarfile.TarInfo(name=name)
info.size = len(contents)
# mtime must be 0 or any checksum operation
# will return a different digest even when content is the same
info.mtime = 0
info.type = tarfile.REGTYPE
info.mode = int(mode, 8) # permissions converted to decimal notation
tar.addfile(tarinfo=info, fileobj=byte_contents)
def to_intermediate(self):
"""
Converts the NetJSON configuration dictionary (self.config)
to the intermediate data structure (self.intermediate_data) that will
be then used by the renderer class to generate the router configuration
"""
self.validate()
self.intermediate_data = OrderedDict()
for converter_class in self.converters:
# skip unnecessary loop cycles
if not converter_class.should_run_forward(self.config):
continue
converter = converter_class(self)
value = converter.to_intermediate()
# maintain backward compatibility with backends
# that are currently in development by GSoC students
# TODO for >= 0.6.2: remove once all backends have upgraded
if value and isinstance(value, (tuple, list)): # pragma: nocover
value = OrderedDict(value)
if value:
self.intermediate_data = merge_config(
self.intermediate_data, value, list_identifiers=['.name']
)
def parse(self, native):
"""
Parses a native configuration and converts
it to a NetJSON configuration dictionary
"""
if not hasattr(self, 'parser') or not self.parser:
raise NotImplementedError('Parser class not specified')
parser = self.parser(native)
self.intermediate_data = parser.intermediate_data
del parser
self.to_netjson()
def to_netjson(self):
"""
Converts the intermediate data structure (self.intermediate_data)
to the NetJSON configuration dictionary (self.config)
"""
self.__backup_intermediate_data()
self.config = OrderedDict()
for converter_class in self.converters:
if not converter_class.should_run_backward(self.intermediate_data):
continue
converter = converter_class(self)
value = converter.to_netjson()
if value:
self.config = merge_config(
self.config, value, list_identifiers=self.list_identifiers
)
self.__restore_intermediate_data()
self.validate()
def __backup_intermediate_data(self):
self._intermediate_copy = deepcopy(self.intermediate_data)
def __restore_intermediate_data(self):
del self.intermediate_data
self.intermediate_data = self._intermediate_copy
del self._intermediate_copy
class BaseVpnBackend(BaseBackend):
"""
Shared logic between VPN backends
Requires setting the following attributes:
- vpn_pattern
- config_suffix
"""
def _generate_contents(self, tar):
"""
Adds configuration files to tarfile instance.
:param tar: tarfile instance
:returns: None
"""
text = self.render(files=False)
# create a list with all the packages (and remove empty entries)
vpn_instances = self.vpn_pattern.split(text)
if '' in vpn_instances:
vpn_instances.remove('')
# create a file for each VPN
for vpn in vpn_instances:
lines = vpn.split('\n')
vpn_name = lines[0]
text_contents = '\n'.join(lines[2:])
# do not end with double new line
if text_contents.endswith('\n\n'):
text_contents = text_contents[0:-1]
self._add_file(
tar=tar,
name='{0}{1}'.format(vpn_name, self.config_suffix),
contents=text_contents,
)