Source code for debinterface.dnsmasqRange

# -*- coding: utf-8 -*-
from __future__ import print_function, with_statement, absolute_import
import copy
import os
import shutil
from socket import inet_aton

from . import toolutils
try:
    import typing as tp
except ImportError:
    pass


DEFAULT_CONFIG = {
    'dhcp-range': [
        {
            'interface': 'wlan0',
            'start': '10.1.10.11',
            'end': '10.1.10.250',
            'lease_time': '24h'
        },
        {
            'interface': 'eth1',
            'start': '10.1.20.10',
            'end': '10.1.20.250',
            'lease_time': '24h'
        }
    ],
    'dhcp-leasefile': '/var/tmp/dnsmasq.leases'
}  # type: tp.Dict[str, tp.Any]


[docs]class DnsmasqRange(object): """ Basic dnsmasq conf of the more file which holds the ip ranges per interface. Made for handling very basic dhcp-range options """ _config = None # type: tp.Dict[str, tp.Any] _path = None # type: str backup_path = None # type: str _leases_path = None # type: str def __init__(self, path, backup_path=None, leases_path='/var/tmp/dnsmasq.leases'): # type: (str, tp.Optional[str], str)->None self._config = {} self._path = path if not backup_path: self.backup_path = path + ".bak" else: self.backup_path = backup_path self._leases_path = leases_path @property def config(self): # type: ()->tp.Dict[str, tp.Any] return self._config
[docs] def set(self, key, value): # type: (str, tp.Union[str, tp.Dict[str, tp.Any]])->None if key == "dhcp-range": if "dhcp-range" not in self._config: self._config["dhcp-range"] = [] if isinstance(value, str): value = self._extract_range_info(value) if value: if self.get_itf_range(value["interface"]): self.update_range( interface=value["interface"], start=value["start"], end=value["end"], lease_time=value["lease_time"] ) else: self._config["dhcp-range"].append(value) else: self._config[str(key).strip()] = value
[docs] def validate(self): # type: ()->bool try: required = ["interface", "start", "end", "lease_time"] for rng in self._config["dhcp-range"]: for key in required: if key not in rng: raise ValueError("Missing option : {0}".format(key)) if inet_aton(rng["end"]) < inet_aton(rng["start"]): raise ValueError("Start IP range must be before end IP") itf_names = [ data["interface"] for data in self._config["dhcp-range"] ] if len(itf_names) != len(set(itf_names)): msg = "Multiple interfaces with the same name" raise ValueError(msg) except KeyError: pass # dhcp-range is not mandatory return True
[docs] def update_range(self, interface, start, end, lease_time): # type: (str, str, str, str)->bool """Update existing range based on the interface name If does not exist will be created Args: interface (str): interface name start (str) : start ip of range end (str) : end ip of range lease_time (str) : lease_time Returns: bool: True if configuration was updated or created, False otherwise """ current_range = self.get_itf_range(interface) new_range = { 'interface': interface, 'lease_time': lease_time, "start": start, "end": end } if current_range and (current_range == new_range): return False self.rm_itf_range(interface) self.set("dhcp-range", new_range) return True
[docs] def get_itf_range(self, if_name): # type: (str)->tp.Optional[tp.Dict[str, tp.Any]] """ If no interface, return None """ if "dhcp-range" not in self._config: return None for v in self._config['dhcp-range']: if v["interface"] == if_name: return v # type: ignore return None
[docs] def rm_itf_range(self, if_name): # type: (str)->bool ''' Rm range info for the given interface Args: if_name (str) : interface name Returns: bool: True if configuration was updated, False otherwise ''' if "dhcp-range" in self._config: current_len = len(self._config['dhcp-range']) self._config['dhcp-range'][:] = [ x for x in self._config['dhcp-range'] if x["interface"] != if_name ] if len(self._config['dhcp-range']) < current_len: return True return False
[docs] def set_defaults(self): # type: ()->None """ Defaults for my needs, you should probably override this one """ self._config = copy.deepcopy(DEFAULT_CONFIG)
[docs] def read(self, path=None): # type: (tp.Optional[str])->None if path is None: path = self._path self._config = {} with open(path, "r") as dnsmasq: for line in dnsmasq: if line.startswith('#') is True or line == "\n" or line == "": continue # No \n allowed here key, value = line.replace("\n", '').split("=") if key and value: self.set(key, value)
[docs] def write(self, path=None): # type: (tp.Optional[str])->None self.validate() if path is None: path = self._path self.backup() with toolutils.atomic_write(path) as dnsmasq: for k, v in self._config.items(): if k == "dhcp-range": if not v: continue for r in v: line = "dhcp-range=interface:{0},{1},{2},{3}\n".format( r["interface"], r["start"], r["end"], r["lease_time"] ) dnsmasq.write(line) else: key = str(k).strip() value = str(v).strip() dnsmasq.write("{0}={1}\n".format(key, value))
@staticmethod
[docs] def controlService(action): # type: (str)->tp.Tuple[bool, str] """ return True/False, command output """ if action not in ["start", "stop", "restart"]: return False, "Invalid action" return toolutils.safe_subprocess(["/etc/init.d/dnsmasq", action])
[docs] def clear_leases(self): # type: ()->None """rm /var/tmp/dnsmasq.leases""" try: os.remove(self._leases_path) except Exception: pass
[docs] def backup(self): # type: ()->None """ return True/False, command output """ if self.backup_path: shutil.copy(self._path, self.backup_path)
[docs] def restore(self): # type: ()->None """ return True/False, command output """ if self.backup_path: shutil.copy(self.backup_path, self._path)
[docs] def delete(self): # type: ()->None """ return True/False, command output """ if self.backup_path: os.remove(self._path)
@staticmethod def _extract_range_info(value): # type: (str)->tp.Dict[str, tp.Any] ret = {} try: breaked = value.split(",") ret["interface"] = breaked[0].split(":")[1] ret["start"] = breaked[1] ret["end"] = breaked[2] ret["lease_time"] = breaked[3] except Exception: pass return ret
[docs] def apply_changes(self): # type: ()->None """Write changes to fs and restart daemon""" self.controlService("stop") self.write() self.clear_leases() self.controlService("start")