Source code for debinterface.interfacesWriter

# -*- coding: utf-8 -*-
# Write interface
from __future__ import print_function, with_statement, absolute_import
import shutil
from collections import defaultdict
import os
from string import Template

from . import toolutils
from .adapter import NetworkAdapter

try:
    import typing as tp
except ImportError:
    pass


[docs]class InterfacesWriter(object): """ Short lived class to write interfaces file """ # Define templetes for blocks used in /etc/network/interfaces. _auto = Template('auto $name\n') _hotplug = Template('allow-hotplug $name\n') _iface = Template('iface $name $addrFam $source\n') _cmd = Template('\t$varient $value\n') _comment = Template('# $line\n') _addressFields = [ 'address', 'network', 'netmask', 'broadcast', 'gateway', 'dns-nameservers', 'dns-search' ] _prepFields = ['pre-up', 'pre-down', 'up', 'down', 'post-up', 'post-down'] _bridgeFields = ['ports', 'fd', 'hello', 'maxage', 'stp', 'maxwait'] _plugins = ['hostapd', 'wpa-conf'] def __init__(self, adapters, interfaces_path, backup_path=None, header_comment=None): # type: (tp.List[NetworkAdapter], str, tp.Optional[str], tp.Optional[str])->None """ if backup_path is None => no backup """ self._adapters = adapters self._interfaces_path = interfaces_path self._backup_path = backup_path try: is_str = isinstance(header_comment, basestring) except NameError: is_str = isinstance(header_comment, str) if is_str: self._header_comment = header_comment else: self._header_comment = None @property def adapters(self): # type: ()->tp.List[NetworkAdapter] return self._adapters @adapters.setter def adapters(self, value): # type: (tp.List[NetworkAdapter])->None self._adapters = value
[docs] def write_interfaces(self): # type: ()->None adapters_by_path = defaultdict(list) for adapter in self._adapters: # retrocompat check path = adapter.interfaces_path or self._interfaces_path adapters_by_path[adapter.interfaces_path].append(adapter) self._backup_interfaces(adapters_by_path.keys()) try: for path, adapters in adapters_by_path.items(): with toolutils.atomic_write(path) as interfaces: # Write any header comments for main file. if path == self._interfaces_path: self._write_header_comment(interfaces) # Loop through the provided networkAdapters and # write the new file. for adapter in adapters: # Get dict of details about the adapter. self._write_adapter(interfaces, adapter) self._check_interfaces(self._interfaces_path) except Exception: # Any error, let's roll back self._restore_interfaces(adapters_by_path.keys()) raise
[docs] def _check_interfaces(self, interfaces_path): # type: (str)->None """Uses ifup to check interfaces file. If it is not in the default place, each interface must be checked one by one. Args: interfaces_path (string) : the path to interfaces file Raises: ValueError : if invalid network interfaces """ ret = False output = "" if not self._adapters: return if interfaces_path == "/etc/network/interfaces": # Do not use long form to increase portability with Busybox # -n : print out what would happen, but don't do it # -i : interfaces file ret, output = toolutils.safe_subprocess([ "/sbin/ifup", "-a", "-n" ]) else: for adapter in self._adapters: # Do not use long form to increase portability with Busybox # -n : print out what would happen, but don't do it # -i : interfaces file ret, output = toolutils.safe_subprocess([ "/sbin/ifup", "-n", "-i{0}".format(interfaces_path), adapter.attributes["name"] ]) if not ret: break if not ret: raise ValueError("Invalid network interfaces file " "written to disk, restoring to previous " "one : {0}".format(output))
def _write_header_comment(self, interfaces): # type: (tp.IO[str])->None if self._header_comment: for line in self._header_comment.split('\n'): # Check the beginning of the line for a comment field # if it does not exist, add it. if line[:2] != "# ": line = self._comment.substitute(line=line) else: # split strips the newline, add it back line = line + '\n' interfaces.write(line) # Create a blank line between comment and start of interfaces interfaces.write('\n') def _write_adapter(self, interfaces, adapter): # type: (tp.IO[str], NetworkAdapter)->None try: adapter.validateAll() except ValueError as e: print(repr(e)) raise ifAttributes = adapter.export() self._write_auto(interfaces, adapter, ifAttributes) self._write_hotplug(interfaces, adapter, ifAttributes) self._write_addrFam(interfaces, adapter, ifAttributes) self._write_addressing(interfaces, adapter, ifAttributes) self._write_bridge(interfaces, adapter, ifAttributes) self._write_plugins(interfaces, adapter, ifAttributes) self._write_callbacks(interfaces, adapter, ifAttributes) self._write_unknown(interfaces, adapter, ifAttributes) interfaces.write("\n")
[docs] def _write_auto(self, interfaces, adapter, ifAttributes): # type: (tp.IO[str], NetworkAdapter, tp.Dict[str, tp.Any])->None """ Write if applicable """ try: if adapter.attributes['auto'] is True: d = dict(name=ifAttributes['name']) interfaces.write(self._auto.substitute(d)) except KeyError: pass
[docs] def _write_hotplug(self, interfaces, adapter, ifAttributes): # type: (tp.IO[str], NetworkAdapter, tp.Dict[str, tp.Any])->None """ Write if applicable """ try: if ifAttributes['hotplug'] is True: d = dict(name=ifAttributes['name']) interfaces.write(self._hotplug.substitute(d)) except KeyError: pass
[docs] def _write_addrFam(self, interfaces, adapter, ifAttributes): # type: (tp.IO[str], NetworkAdapter, tp.Dict[str, tp.Any])->None """ Construct and write the iface declaration. The addrFam clause needs a little more processing. """ # Write the source clause. # Will not error if omitted. Maybe not the best plan. try: if (not ifAttributes["name"] or not ifAttributes["addrFam"] or not ifAttributes["source"]): raise ValueError("Invalid field content") d = dict(name=ifAttributes['name'], addrFam=ifAttributes['addrFam'], source=ifAttributes['source']) interfaces.write(self._iface.substitute(d)) except KeyError: pass
def _write_addressing(self, interfaces, adapter, ifAttributes): # type: (tp.IO[str], NetworkAdapter, tp.Dict[str, tp.Any])->None for field in self._addressFields: try: value = ifAttributes[field] if value and value != 'None': if isinstance(value, list): d = dict(varient=field, value=" ".join(ifAttributes[field])) else: d = dict(varient=field, value=ifAttributes[field]) interfaces.write(self._cmd.substitute(d)) # Keep going if a field isn't provided. except KeyError: pass
[docs] def _write_bridge(self, interfaces, adapter, ifAttributes): # type: (tp.IO[str], NetworkAdapter, tp.Dict[str, tp.Any])->None """ Write the bridge information. """ for field in self._bridgeFields: try: value = ifAttributes['bridge-opts'][field] if value and value != 'None': d = dict(varient="bridge_" + field, value=value) interfaces.write(self._cmd.substitute(d)) # Keep going if a field isn't provided. except KeyError: pass
[docs] def _write_callbacks(self, interfaces, adapter, ifAttributes): # type: (tp.IO[str], NetworkAdapter, tp.Dict[str, tp.Any])->None """ Write the up, down, pre-up, pre-down, post-up, and post-down clauses. """ for field in self._prepFields: try: for item in ifAttributes[field]: if item and item != 'None': d = dict(varient=field, value=item) interfaces.write(self._cmd.substitute(d)) except KeyError: # Keep going if a field isn't provided. pass
[docs] def _write_plugins(self, interfaces, adapter, ifAttributes): # type: (tp.IO[str], NetworkAdapter, tp.Dict[str, tp.Any])->None """ Write plugins options, currently hostapd. """ for field in self._plugins: try: if field in ifAttributes and ifAttributes[field] != 'None': d = dict(varient=field, value=ifAttributes[field]) interfaces.write(self._cmd.substitute(d)) # Keep going if a field isn't provided. except KeyError: pass
[docs] def _write_unknown(self, interfaces, adapter, ifAttributes): # type: (tp.IO[str], NetworkAdapter, tp.Dict[str, tp.Any])->None """ Write unknowns options """ try: for k, v in ifAttributes['unknown'].items(): if v: d = dict(varient=k, value=str(v)) interfaces.write(self._cmd.substitute(d)) except (KeyError, ValueError): pass
[docs] def _write_sourced_paths(self, interfaces, adapter, ifAttributes): # type: (tp.IO[str], NetworkAdapter, tp.Dict[str, tp.Any])->None """ Write sourced paths """ for path in ifAttributes.get('sourced_paths', []): d = dict(varient="source", value=str(path)) interfaces.write(self._cmd.substitute(d))
[docs] def _backup_interfaces(self, adapters_paths): # type: (tp.List[str])->None """Backup interfaces file is the file exists Returns: True/False, command output Raises: IOError : if the copy fails and the source file exists """ if not self._backup_path: return backup_path = self._backup_path if os.path.isfile(backup_path): backup_path = os.path.dirname(backup_path) # For better backward compatibility if not os.path.isdir(backup_path): os.mkdir(backup_path) for adapter_path in adapters_paths: try: src_path = os.path.join(backup_path, os.path.basename(adapter_path) + ".bak") shutil.copy(adapter_path, src_path) except IOError as ex: # Only raise if source actually exists if os.path.exists(adapter_path): raise ex
[docs] def _restore_interfaces(self, adapters_paths): # type: (tp.List[str])->None """Restore interfaces file is the file exists Returns: True/False, command output Raises: IOError : if the copy fails and the source file exists """ if not self._backup_path: return for adapter_path in adapters_paths: src_path = '' try: backup_path = self._backup_path if os.path.isfile(backup_path): backup_path = os.path.dirname(backup_path) src_path = os.path.join(backup_path, os.path.basename(adapter_path) + ".bak") shutil.copy(src_path, adapter_path) except IOError as ex: # Only raise if source actually exists if os.path.exists(src_path): raise ex