#!/usr/bin/env pmpython
#
# Copyright (C) 2019 Marko Myllynen
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
"""PCP netcheck Performance Metrics Domain Agent"""

try:
    import configparser
except ImportError:
    import ConfigParser as configparser

import importlib
import traceback
import hashlib
import socket
import struct
import sys
import os

from collections import OrderedDict
from copy import deepcopy

from pcp.pmapi import pmContext as PCP
from pcp.pmda import PMDA, pmdaInstid, pmdaIndom, pmdaMetric
from cpmapi import PM_INDOM_NULL, PM_ERR_VALUE, PM_LABEL_CLUSTER, PM_LABEL_INDOM

from pmdautil import CheckRunner
from modules.pcpnetcheck import PCPNetcheckModuleParams, DGW, DNS, NTP


# pylint: disable=too-few-public-methods, too-many-instance-attributes
class PCPNetcheckModuleConfig(object):
    """PCP Netcheck Module Config Class

    Plain per-module record: the PMDA fills it in while reading the
    configuration, initializing modules, and registering metrics.
    """
    def __init__(self, module):
        """Constructor"""
        self.module = module        # module name as listed in the config
        self.config = None          # parsed config object, set when read
        self.cached = None          # 'pmda_indom_cache' option; defaults to True
        self.prefix = None
        self.cluster = -1           # -1 until configured or derived from the name
        # Callables copied from the module object in init_modules()
        self.metrics = None
        self.helpers = None
        self.refresh = None
        self.netdata = None
        # Instance domain id and pmdaIndom object, set in register_metrics()
        self.indom = None
        self.insts = None
        # Label callables copied from the module object in init_modules()
        self.label_indom = None
        self.label_cluster = None
        self.label_instance = None


# pylint: disable=too-many-instance-attributes
class NetcheckPMDA(PMDA):
    """PCP netcheck PMDA"""
    def __init__(self, name, domain):
        """Constructor

        Reads the configuration, loads the enabled modules, registers
        their metrics, and installs the refresh/fetch/label callbacks.
        """
        PMDA.__init__(self, name, domain)

        # See Install
        self.log("Initializing, currently in 'notready' state.")

        self.check_runner = None
        self.modules = OrderedDict()    # module name -> PCPNetcheckModuleConfig
        self.clusters = OrderedDict()   # cluster id -> module name
        self.indoms = {}                # indom id -> module name

        # Defaults, each overridable from the [pmda] config section
        self.hosts = OrderedDict([(DGW, -1), (DNS, -1)])
        self.background_check = True
        self.check_hosts_parallel = True
        self.check_interval = 60
        self.align_interval = True
        self.module_fail_fatal = True

        # Comma-separated addresses resolved from the DGW/DNS/NTP placeholders
        self.dgw = None
        self.dns = None
        self.ntp = None

        self.read_config()
        self.connect_pmcd()
        self.set_user(PCP.pmGetConfig('PCP_USER'))

        self.init_modules()
        self.register_metrics()

        if not self.in_pmda_setup() and self.background_check:
            self.check_runner.start()

        self.set_refresh(self.netcheck_refresh)
        self.set_fetch_callback(self.netcheck_fetch_callback)
        self.set_label(self.netcheck_label)
        self.set_label_callback(self.netcheck_label_callback)

        self.pmda_ready()
        if not self.in_pmda_setup():
            self.log("Ready to process requests.")

    @classmethod
    def in_pmda_setup(cls):
        """Check if PMDA is in setup state

        Returns a truthy value when either PCP_PYTHON_DOMAIN or
        PCP_PYTHON_PMNS is set in the environment.
        """
        return os.environ.get('PCP_PYTHON_DOMAIN') or os.environ.get('PCP_PYTHON_PMNS')

    @staticmethod
    def _split_list(value):
        """Split a comma-separated string into stripped, non-empty items"""
        if not value:
            return []
        items = (item.strip() for item in value.split(","))
        return [item for item in items if item]

    @staticmethod
    def _read_directive_values(path, directives):
        """Collect the first argument of matching directives in a file

        Returns the unique values, in file order, as a comma-separated
        string. Returns an empty string if the file cannot be read or
        contains no matching line, so callers can treat both the same.
        """
        # OrderedDict keys act as an insertion-ordered set (Python 2 compat)
        values = OrderedDict()
        try:
            with open(path) as conf:
                for line in conf:
                    fields = line.split()
                    # A directive without an argument is ignored
                    if len(fields) > 1 and fields[0] in directives:
                        values[fields[1]] = True
        except (IOError, OSError):
            return ""
        return ",".join(values)

    # pylint: disable=too-many-branches
    def read_config(self):
        """Read PMDA configuration file

        Exits the process with status 1 on a missing or unparsable file,
        a missing [pmda] section, an unknown directive, or an invalid
        'check_interval' value.
        """
        self.log("Reading configuration.")

        conffile = PCP.pmGetConfig('PCP_PMDAS_DIR')
        conffile += '/' + self.read_name() + '/' + self.read_name() + '.conf'

        if not os.path.isfile(conffile):
            self.err("Configuration file %s not found, aborting." % conffile)
            sys.exit(1)

        # Python < 3.2 compat
        if sys.version_info >= (3, 2):
            config = configparser.ConfigParser()
        else:
            config = configparser.SafeConfigParser()  # pylint: disable=deprecated-class
        try:
            config.read(conffile)
        except Exception as error:  # pylint: disable=broad-except
            self.err(str(error))
            sys.exit(1)

        if config.has_section('pmda'):
            for opt in config.options('pmda'):
                if opt == 'modules':
                    for module in self._split_list(config.get('pmda', opt)):
                        self.modules[module] = PCPNetcheckModuleConfig(module)
                elif opt == 'hosts':
                    # An explicit 'hosts' option replaces the built-in defaults
                    self.hosts = OrderedDict()
                    for host in self._split_list(config.get('pmda', opt)):
                        self.hosts[host] = -1
                elif opt == 'background_check':
                    self.background_check = config.getboolean('pmda', opt)
                elif opt == 'check_hosts_parallel':
                    self.check_hosts_parallel = config.getboolean('pmda', opt)
                elif opt == 'check_interval':
                    try:
                        self.check_interval = int(PCP.pmParseInterval(config.get('pmda', opt))[0])
                    except Exception:  # pylint: disable=broad-except
                        self.err("Invalid time format for 'check_interval', aborting.")
                        sys.exit(1)
                elif opt == 'align_interval':
                    self.align_interval = config.getboolean('pmda', opt)
                elif opt == 'module_failure_fatal':
                    self.module_fail_fatal = config.getboolean('pmda', opt)
                else:
                    self.err("Invalid directive '%s' in %s, aborting." % (opt, conffile))
                    sys.exit(1)
        else:
            self.err("No [pmda] section found, aborting.")
            sys.exit(1)

        self.verify_config()

        self.log("Reading module setup configuration:")
        for module in self.modules:
            self.log(module)
            self.read_module_setup_config(config, module)
        self.log("Module setup configurations read.")

    def verify_config(self):
        """Verify configuration

        Expands the DGW/DNS/NTP placeholder hosts into real addresses and
        exits with status 1 if no modules or no hosts remain.
        """
        if not self.modules:
            self.err("No modules enabled, aborting.")
            sys.exit(1)
        self.log("Enabled modules:")
        self.log(str(list(self.modules)))

        self.log("Configured hosts:")
        self.log(str(list(self.hosts.keys())))

        if DGW in self.hosts:
            self.set_default_gateway()
        if DNS in self.hosts:
            self.set_nameservers()
        if NTP in self.hosts:
            self.set_timeservers()

        if self.dgw or self.dns or self.ntp:
            self.log("Configured and determined hosts:")
            self.log(str(list(self.hosts.keys())))

        if not self.hosts:
            self.err("No hosts configured, aborting.")
            sys.exit(1)

        self.log("Configured background check: %s." % str(self.background_check))
        self.log("Configured parallel setting: %s." % str(self.check_hosts_parallel))
        self.log("Configured check interval: %ds." % self.check_interval)
        self.log("Configured align interval: %s." % str(self.align_interval))

    def update_hosts(self, old, new):
        """Update hosts

        Removes the entry `old` (if present) and adds every host in the
        comma-separated string `new`.
        """
        if old and old in self.hosts:
            del self.hosts[old]
        if new:
            for host in new.split(","):
                self.hosts[host] = -1

    def set_default_gateway(self):
        """Set default gateway

        Replaces the DGW placeholder host with the gateway address, or
        drops the placeholder if no default route can be determined.
        """
        def read_default_gateway():
            """Read default gateway from /proc/net/route, None if not found"""
            try:
                with open("/proc/net/route") as route:
                    for line in route:
                        fields = line.split()
                        if len(fields) < 4 or fields[1] != '00000000':
                            continue
                        try:
                            # Only accept rows with flag bit 0x2 set
                            if not int(fields[3], 16) & 2:
                                continue
                            # "=L" packs the hex value in native byte order
                            return socket.inet_ntoa(struct.pack("=L", int(fields[2], 16)))
                        except (ValueError, struct.error):
                            # Malformed row, keep looking
                            continue
            except (IOError, OSError):
                return None
            return None

        self.dgw = read_default_gateway()
        if not self.dgw:
            self.err("Failed to determine default gateway, disabling.")
            del self.hosts[DGW]
            return
        self.log("Determined default gateway: %s." % str([self.dgw]))
        self.update_hosts(DGW, self.dgw)

    def set_nameservers(self):
        """Set nameservers

        Replaces the DNS placeholder host with the 'nameserver' entries
        from /etc/resolv.conf, or drops the placeholder if none are found.
        """
        self.dns = self._read_directive_values("/etc/resolv.conf", ('nameserver',))
        if not self.dns:
            self.err("Failed to determine nameservers, disabling.")
            del self.hosts[DNS]
            return
        self.log("Determined nameservers: %s." % self.dns.split(","))
        self.update_hosts(DNS, self.dns)

    def set_timeservers(self):
        """Set timeservers

        Replaces the NTP placeholder host with the 'server' and 'peer'
        entries from /etc/chrony.conf, or drops the placeholder if none
        are found.
        """
        self.ntp = self._read_directive_values("/etc/chrony.conf", ('server', 'peer'))
        if not self.ntp:
            self.err("Failed to determine timeservers, disabling.")
            del self.hosts[NTP]
            return
        self.log("Determined timeservers: %s." % self.ntp.split(","))
        self.update_hosts(NTP, self.ntp)

    def read_module_setup_config(self, config, module):
        """Read module setup configuration

        Reads 'cluster' and 'pmda_indom_cache' from the module's own
        section. Exits with status 1 if the section is missing, the
        cluster value is invalid, or a derived cluster ID collides with
        another module's.
        """
        if not config.has_section(module):
            self.err("No [%s] section found, aborting." % module)
            sys.exit(1)

        modconf = self.modules[module]
        modconf.config = config
        for opt in config.options(module):
            if opt == 'cluster':
                try:
                    modconf.cluster = int(config.get(module, opt))
                    if modconf.cluster < 0:
                        raise ValueError("Positive integer expected")
                except ValueError:
                    self.err("Positive integer expected for 'cluster', aborting.")
                    sys.exit(1)
            elif opt == 'pmda_indom_cache':
                modconf.cached = config.getboolean(module, opt)

        if modconf.cluster < 0:
            # No explicit cluster: derive a stable ID in 0..4095 from the name
            cluster = int(hashlib.sha1(module.encode('UTF-8')).hexdigest(), 16) % 4096
            if any(other.cluster == cluster for other in self.modules.values()):
                self.err("Hash collision for module '%s' cluster ID, aborting." % module)
                self.err("'cluster' must be set for this module explicitly.")
                sys.exit(1)
            modconf.cluster = cluster

        if modconf.cached is None:
            modconf.cached = True

    def init_modules(self):
        """Initialize modules

        Imports 'modules.<name>' for each enabled module and instantiates
        its PCPNetcheckModule. A failing module aborts the PMDA or is
        dropped, depending on 'module_failure_fatal'.
        """
        self.log("Initializing modules:")

        if self.background_check:
            self.check_runner = CheckRunner(self.dbg, self.log, self.err)

        params = PCPNetcheckModuleParams()
        params.background_check = self.background_check
        params.check_hosts_parallel = self.check_hosts_parallel
        params.check_interval = self.check_interval
        params.align_interval = self.align_interval
        params.dgw = self.dgw
        params.dns = self.dns
        params.ntp = self.ntp

        # Iterate over a copy: failed modules are deleted inside the loop
        for module in list(self.modules):
            self.log("%s, cluster ID: %d" % (module, self.modules[module].cluster))
            try:
                mod = importlib.import_module('modules.%s' % module)
            except ImportError as error:
                self.err(str(error))
                self.err("Module '%s' not found, aborting." % module)
                sys.exit(1)

            try:
                # Each module gets its own copy of the hosts
                params.hosts = deepcopy(self.hosts)
                obj = mod.PCPNetcheckModule(self.modules[module].config,
                                            self.dbg, self.log, self.err, params)
            except Exception as error:  # pylint: disable=broad-except
                self.err(str(error))
                if self.module_fail_fatal:
                    self.err("Module '%s' failed to initialize, aborting." % module)
                    sys.exit(1)
                self.err("Module '%s' failed to initialize, disabling." % module)
                del self.modules[module]
                continue

            modconf = self.modules[module]
            modconf.metrics = obj.metrics
            modconf.helpers = obj.helpers
            modconf.refresh = obj.refresh
            modconf.netdata = obj.netdata
            modconf.label_cluster = obj.label_cluster
            modconf.label_indom = obj.label_indom
            modconf.label_instance = obj.label_instance

            # NOTE(review): assumes obj.background_check is only true when
            # the runner was created above - confirm against the modules.
            if obj.background_check:
                self.check_runner.add_module(module, obj, obj.check_interval, obj.align_interval)

        if not self.modules:
            self.err("No functional modules available, aborting.")
            sys.exit(1)

        self.log("Modules initialized.")

    def register_metrics(self):
        """Register metrics

        Registers every module's metrics under the 'netcheck.' prefix and
        records which module owns each cluster and instance domain.
        """
        self.log("Registering metrics:")
        for module in self.modules:
            self.log(module)
            modconf = self.modules[module]
            cluster = modconf.cluster
            indom, metrics = modconf.metrics()
            # A dict selects the cached pmdaIndom form, a list the plain one
            caching = {} if modconf.cached else []
            modconf.indom = PM_INDOM_NULL
            if indom:
                modconf.indom = self.indom(cluster)
                modconf.insts = pmdaIndom(modconf.indom, caching)
                modconf.helpers(modconf.insts)
                self.add_indom(modconf.insts)
            for item, metric in enumerate(metrics):
                # metric[5] serves as both one-line and long help text
                self.add_metric('netcheck.' + metric[0],
                                pmdaMetric(self.pmid(cluster, item), metric[2],
                                           modconf.indom, metric[3], metric[4]),
                                metric[5], metric[5])
            self.indoms[modconf.indom] = module
            self.clusters[cluster] = module
        self.log("Metrics registered.")

    def netcheck_refresh(self, cluster):
        """Refresh cluster

        Calls the owning module's refresh() and pushes the returned
        instances to its instance domain. Module errors are logged and
        leave the instance set empty.
        """
        module = self.clusters[cluster]
        modconf = self.modules[module]
        insts = {}
        try:
            insts = modconf.refresh()
        except Exception as error:  # pylint: disable=broad-except
            self.err(str(module))
            self.err(str(error))
            self.err(traceback.format_exc())
        if modconf.indom != PM_INDOM_NULL:
            if not modconf.cached:
                # Uncached indoms take a list of pmdaInstid, not a dict
                insts_array = []
                if insts:
                    for name, i in insts.items():
                        insts_array.append(pmdaInstid(i, name))
                insts = insts_array
            modconf.insts.set_instances(modconf.indom, insts)
            self.replace_indom(modconf.indom, insts)
            if not modconf.cached:
                self.pmns_refresh()

    def netcheck_fetch_callback(self, cluster, item, inst):
        """Fetch callback

        Returns the owning module's netdata(item, inst) result, or
        [PM_ERR_VALUE, 0] if the module raises.
        """
        module = self.clusters[cluster]
        try:
            return self.modules[module].netdata(item, inst)
        except Exception as error:  # pylint: disable=broad-except
            self.err(str(module))
            self.err(str(error))
            self.err(traceback.format_exc())
            return [PM_ERR_VALUE, 0]

    def netcheck_label(self, ident, label_type):
        """Cluster and instance domain labels

        Returns the owning module's label string, or '{}' for any other
        label type.
        """
        if label_type == PM_LABEL_CLUSTER:
            module = self.clusters[ident]
            return self.modules[module].label_cluster()
        if label_type == PM_LABEL_INDOM:
            module = self.indoms[ident]
            return self.modules[module].label_indom()
        return '{}'

    def netcheck_label_callback(self, indom, inst):
        """Instance labels"""
        module = self.indoms[indom]
        return self.modules[module].label_instance(inst)


if __name__ == '__main__':
    NetcheckPMDA('netcheck', 152).run()