#
# Copyright (C) 2019 Marko Myllynen
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#

"""PCP netcheck PMDA module base class"""

# pylint: disable=too-many-arguments,too-many-positional-arguments

try:
    from time import sleep, perf_counter as time
except ImportError:
    from time import sleep, time
import sys
import socket

from ctypes import c_int
from collections import OrderedDict
from subprocess import Popen, PIPE
from multiprocessing import Process

from pcp.pmapi import pmContext as PCP
from cpmda import PMDA_FETCH_NOVALUES

# Special config strings
DGW = "DGW"
DNS = "DNS"
NTP = "NTP"


# pylint: disable=too-many-instance-attributes, too-few-public-methods
class PCPNetcheckModuleParams(object):
    """PCP Netcheck Module Parameters Class

    Plain container for the settings handed to a module's constructor;
    PCPNetcheckModuleBase copies each attribute onto itself.
    """
    def __init__(self):
        """Constructor: set every parameter to its default"""
        self.hosts = None
        self.background_check = True
        self.check_hosts_parallel = True
        self.check_interval = 60
        self.align_interval = True
        self.dgw = None
        self.dns = None
        self.ntp = None


# pylint: disable=too-many-instance-attributes, too-many-arguments
class PCPNetcheckModuleBase(object):
    """PCP Netcheck Module Base Class

    Holds the per-module configuration and provides helpers for running
    checks against every configured host, either as external commands
    (subprocess.Popen) or as Python callables (multiprocessing.Process).
    Subclasses must implement metrics() and do_check().
    """

    # Placeholder in command templates, replaced by the host being checked
    HOST_TMPL_STR = '%HOST'
    HOST_FAIL_STR = ""

    def __init__(self, module, config, dbg, log, err, params):
        """Constructor

        module -- module name; also the config section that is read and
                  the prefix of every log message
        config -- config parser object providing options(), get() and
                  getboolean()
        dbg, log, err -- callables taking one message string
        params -- PCPNetcheckModuleParams-like object with the defaults,
                  which options in the module's config section override

        Exits the process if 'check_interval' cannot be parsed and raises
        ValueError if 'timeout' is not a positive number.
        """
        self._who = module
        self._dbg = dbg
        self._log = log
        self._err = err

        # A missing host list is treated as "no hosts" rather than crashing
        # on the first iteration over it.
        self.hosts = params.hosts if params.hosts is not None else OrderedDict()
        self.background_check = params.background_check
        self.check_hosts_parallel = params.check_hosts_parallel
        self.check_interval = params.check_interval
        self.align_interval = params.align_interval
        # Per-host count of failed commands; filled in below once the
        # final host list is known.
        self.errcnt = OrderedDict()
        self.dgw = params.dgw
        self.dns = params.dns
        self.ntp = params.ntp

        self.count = 1
        self.onetrip = 1
        self.timeout = 1.0
        self.timings = {}

        self.insts = {}
        self.items = ()
        self.pmdaIndom = None  # pylint: disable=invalid-name

        self.config = config
        self.debug = False
        self.common_opts = ['cluster', 'pmda_indom_cache', 'debug',
                            'hosts', 'background_check',
                            'check_hosts_parallel', 'check_interval',
                            'align_interval', 'timeout']

        for opt in self.config.options(self._who):
            if opt == 'debug':
                self.debug = self.config.getboolean(self._who, opt)
            elif opt == 'hosts':
                if self.config.get(self._who, opt):
                    self.read_hosts(opt)
                    self.log("Using module-specific hosts:")
                    self.log(str(list(self.hosts.keys())))
            elif opt == 'background_check':
                self.background_check = self.config.getboolean(self._who, opt)
                self.log("Using module-specific background check: %s." %
                         str(self.background_check))
            elif opt == 'check_hosts_parallel':
                self.check_hosts_parallel = self.config.getboolean(self._who, opt)
                self.log("Using module-specific parallel setting: %s." %
                         str(self.check_hosts_parallel))
            elif opt == 'check_interval':
                try:
                    self.check_interval = \
                        int(PCP.pmParseInterval(self.config.get(self._who, opt))[0])
                    self.log("Using module-specific interval: %d s." %
                             self.check_interval)
                except Exception:  # pylint: disable=broad-except
                    self.err("Invalid time format for 'check_interval', aborting.")
                    sys.exit(1)
            elif opt == 'align_interval':
                self.align_interval = self.config.getboolean(self._who, opt)
                self.log("Using module-specific align interval: %s." %
                         str(self.align_interval))
            elif opt == 'timeout':
                self.timeout = float(self.config.get(self._who, opt))
                self.assert_positive(opt, self.timeout)

        # Built only now: a module-specific 'hosts' option replaces
        # self.hosts above, and every final host needs a counter.
        self.errcnt = OrderedDict((host, 0) for host in self.hosts)
        self.insts = {host: c_int(1) for host in self.hosts}

        self.dbg("Debug logging enabled.")

    def read_hosts(self, opt):
        """Replace the host list with the comma-separated config option opt

        Every host starts with the value -1. The special DGW, DNS and NTP
        entries are then substituted using self.dgw, self.dns and self.ntp
        (see update_hosts()).
        """
        self.hosts = OrderedDict()
        for host in self.config.get(self._who, opt).split(","):
            self.hosts[host] = -1
        self.update_hosts(DGW, self.dgw)
        self.update_hosts(DNS, self.dns)
        self.update_hosts(NTP, self.ntp)

    def dbg(self, msg):
        """Log a debug message, only when debugging is enabled"""
        if self.debug:
            self._dbg(self._who + ": " + msg)

    def log(self, msg):
        """Log a message prefixed with the module name"""
        self._log(self._who + ": " + msg)

    def err(self, msg):
        """Log an error prefixed with the module name"""
        self._err(self._who + ": " + msg)

    def log_command_once(self, cmd_template):
        """Log command line to be used, once only at startup"""
        if self.onetrip == 1:
            self.log(' '.join(['shell command:'] + list(cmd_template)))
            self.onetrip = 0

    def metrics(self):
        """Get metric definitions; must be implemented by subclasses"""
        raise NotImplementedError

    def helpers(self, pmdaIndom):  # pylint: disable=invalid-name
        """Register helper function references"""
        self.pmdaIndom = pmdaIndom

    def do_check(self):
        """Do net check; must be implemented by subclasses"""
        raise NotImplementedError

    def refresh(self):
        """Refresh cluster

        Runs the check inline unless background checking is enabled, and
        returns the instance dictionary.
        """
        if not self.background_check:
            self.do_check()
        return self.insts

    def netdata(self, item, inst):
        """Return net check result as PCP metric value

        Item 0 is looked up in self.hosts, any other item in self.timings.
        Returns [value, 1] on success and [PMDA_FETCH_NOVALUES, 0] when the
        instance or its value cannot be found.
        """
        try:
            ref = self.hosts if item == 0 else self.timings
            key = self.pmdaIndom.inst_name_lookup(inst)
            return [ref[key], 1]
        except Exception:  # pylint: disable=broad-except
            return [PMDA_FETCH_NOVALUES, 0]

    def label_cluster(self):  # pylint: disable=no-self-use
        """Cluster labels"""
        return '{"netcheck_module":"' + self._who + '"}'

    def label_indom(self):  # pylint: disable=no-self-use
        """Instance domain labels"""
        return '{}'

    def label_instance(self, inst):  # pylint: disable=no-self-use, unused-argument
        """Instance labels"""
        return '{}'

    #
    # Helpers for modules
    #
    @staticmethod
    def assert_positive(option, value):
        """Assert positive value; raise ValueError naming option otherwise"""
        if value <= 0:
            raise ValueError("Positive value expected for '%s', aborting." % option)

    @staticmethod
    def is_ipv6(host):
        """Test if IPv6 address

        True when host itself contains ':' or when the first address
        returned by socket.getaddrinfo() does. Any lookup failure yields
        False.
        """
        try:
            return ':' in host or ':' in socket.getaddrinfo(host, 0)[0][4][0]
        except Exception:  # pylint: disable=broad-except
            return False

    def update_hosts(self, old, new):
        """Update hosts

        Removes the entry old (if present) and adds every host of the
        comma-separated string new (if non-empty) with the value -1.
        """
        if old and old in self.hosts:
            del self.hosts[old]
        if new:
            for host in new.split(","):
                self.hosts[host] = -1

    @staticmethod
    def _generate_command(cmd_template, host):
        """Generate command: substitute host for HOST_TMPL_STR in each argument"""
        return [item.replace(PCPNetcheckModuleBase.HOST_TMPL_STR, host)
                for item in cmd_template]

    def _run_check_commands(self, cmd_template, timed=False):
        """Run check commands

        Runs cmd_template for every host, in parallel or sequentially
        depending on check_hosts_parallel, and returns the tuple
        (outputs, return_codes, timings), each keyed by host. The first
        three non-zero exit statuses of each host are logged.
        """
        if self.check_hosts_parallel:
            results = self._run_check_commands_parallel(cmd_template, timed)
        else:
            results = self._run_check_commands_sequential(cmd_template, timed)
        # results[0]: output, results[1]: exit status, results[2]: timings
        for host in self.hosts:
            if results[1][host] != 0:
                # command had non-zero exit status; hosts added after
                # construction may not have a counter yet
                self.errcnt[host] = self.errcnt.get(host, 0) + 1
                if self.errcnt[host] <= 3:
                    # log first 3 such errors for each host
                    self.log(host + ' failed, exit=' + str(results[1][host]) +
                             ', output=' + str(list(results[0][host])))
        return results

    @staticmethod
    def _collect_results(proc):
        """Collect results

        Returns ((stdout, stderr), return_code). A return code of -15
        (process ended by signal 15, i.e. terminated after the timeout in
        _wait_for_checks) is reported as -2.
        """
        output = proc.communicate()
        return_code = -2 if proc.returncode == -15 else proc.returncode
        return output, return_code

    def _run_check_commands_sequential(self, cmd_template, timed):
        """Run check commands sequentially, one host at a time"""
        outputs = OrderedDict()
        return_codes = OrderedDict()
        timings = OrderedDict() if timed else {}
        for host in self.hosts:
            cmd = self._generate_command(cmd_template, host)
            if timed:
                timings[host] = time()
            proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
            # Only this host's process is waited for; timings of hosts
            # checked earlier are left untouched.
            self._wait_for_checks({host: proc}, timings)
            outputs[host], return_codes[host] = self._collect_results(proc)
        return outputs, return_codes, timings

    def _run_check_commands_parallel(self, cmd_template, timed):
        """Run check commands in parallel: start all, then wait for all"""
        procs = OrderedDict()
        outputs = OrderedDict()
        return_codes = OrderedDict()
        timings = OrderedDict() if timed else {}
        for host in self.hosts:
            cmd = self._generate_command(cmd_template, host)
            if timed:
                timings[host] = time()
            procs[host] = Popen(cmd, stdout=PIPE, stderr=PIPE)
        self._wait_for_checks(procs, timings)
        for host in self.hosts:
            outputs[host], return_codes[host] = self._collect_results(procs[host])
        return outputs, return_codes, timings

    def _start_method(self, method, params):
        """Start method in a daemon process and return the Process object"""
        proc = Process(target=method, name=self._who, args=params)
        proc.daemon = True
        proc.start()
        return proc

    def _run_check_methods(self, method, params):
        """Run check methods

        Calls method(*params, host) in a separate process for every host,
        in parallel or sequentially depending on check_hosts_parallel.
        """
        if self.check_hosts_parallel:
            return self._run_check_methods_parallel(method, params)
        return self._run_check_methods_sequential(method, params)

    def _run_check_methods_sequential(self, method, params):
        """Run check methods sequentially"""
        for host in self.hosts:
            proc = self._start_method(method, params + (host,))
            self._wait_for_checks({host: proc})

    def _run_check_methods_parallel(self, method, params):
        """Run check methods in parallel"""
        procs = OrderedDict()
        for host in self.hosts:
            procs[host] = self._start_method(method, params + (host,))
        self._wait_for_checks(procs)

    @staticmethod
    def _check_alive(proc, subproc):
        """Test if check is alive

        proc is a Popen object when subproc is true, otherwise an object
        providing is_alive() (a multiprocessing.Process).
        """
        if subproc:
            return proc.poll() is None
        return proc.is_alive()

    def _wait_for_checks(self, procs, timings=None):
        """Wait for checks to complete

        procs   -- dict of host -> Popen or Process, all of the same kind
        timings -- optional dict of host -> start time; on return each
                   host in procs holds its elapsed time instead, or -2 if
                   it had not finished within timeout * count seconds

        Whatever is still running at the deadline is terminated.
        """
        if not procs:
            return
        end_time = time() + float(self.timeout) * self.count
        start_times = dict(timings) if timings else {}
        # Poll more often when timing, to keep measurements accurate
        pause = 0.2 if not timings else 0.02
        subproc = isinstance(list(procs.values())[0], Popen)
        while True:
            checks_alive = False
            for host, proc in procs.items():
                if self._check_alive(proc, subproc):
                    checks_alive = True
                elif timings and timings[host] == start_times[host]:
                    # An unchanged start time marks "not recorded yet"
                    timings[host] = time() - start_times[host]
            # Stop at once when everything has finished, no extra sleep
            if not checks_alive or time() > end_time:
                break
            sleep(pause)
        for host, proc in procs.items():
            try:
                proc.terminate()
            except Exception:  # pylint: disable=broad-except
                # Best effort: the process may already be gone
                pass
            if timings and timings[host] == start_times[host]:
                timings[host] = -2
            if not subproc:
                proc.join()