#
# Copyright (C) 2019 Marko Myllynen
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#

"""PCP netcheck PMDA urlget module"""

# pylint: disable=too-many-arguments,too-many-positional-arguments
# pylint: disable=invalid-name

try:
    from time import perf_counter as time
except ImportError:
    from time import time
try:
    from urllib import request as urlget
except ImportError:
    import urllib2 as urlget
import ssl
import sys

from re import search
from multiprocessing import Manager

from pcp.pmapi import pmUnits
from cpmapi import PM_TYPE_32, PM_TYPE_FLOAT, PM_SEM_INSTANT, PM_TIME_SEC
from cpmda import PMDA_FETCH_NOVALUES

from modules.pcpnetcheck import PCPNetcheckModuleBase

# Module constants
MODULE = 'url_get'
BASENS = 'url.'
units_none = pmUnits(0, 0, 0, 0, 0, 0)
units_secs = pmUnits(0, 1, 0, 0, PM_TIME_SEC, 0)


def _decode_body(raw, charset=None):
    """Return the response body as text usable with a str regex pattern.

    raw     -- body as returned by the response's read(); bytes on
               Python 3, already a str on Python 2
    charset -- character set announced by the server, or None

    Falls back to UTF-8 when no charset is given or the announced one is
    unknown to Python. Undecodable bytes are replaced, never raised on.
    """
    if isinstance(raw, str):
        return raw
    try:
        return raw.decode(charset or 'utf-8', 'replace')
    except LookupError:
        # The server announced a codec name Python does not know
        return raw.decode('utf-8', 'replace')


class PCPNetcheckModule(PCPNetcheckModuleBase): # pylint: disable=too-many-arguments
    """PCP netcheck url_get module"""
    def __init__(self, config, dbg, log, err, params):
        """Constructor

        All arguments are handed to the PCPNetcheckModuleBase constructor
        unchanged. Options are then read from the [url_get] section of
        config: 'url' is mandatory, 'string' is optional, and any option
        that is neither of these nor listed in self.common_opts is fatal.

        Exits the process with status 1 on an invalid or missing option.
        """
        PCPNetcheckModuleBase.__init__(self, MODULE, config, dbg, log, err, params)

        self.url = None
        self.string = None
        self.timeout = 3.0
        # Per-host search result, filled in by do_check()
        self.results = {}

        self.prereq_check()

        for opt in self.config.options(MODULE):
            if opt == 'url':
                self.url = self.config.get(MODULE, opt)
            elif opt == 'string':
                self.string = str(self.config.get(MODULE, opt))
            elif opt not in self.common_opts:
                self.err("Invalid directive '%s' in %s, aborting." % (opt, MODULE))
                sys.exit(1)

        if not self.url:
            self.err("'url' is mandatory, aborting.")
            sys.exit(1)

        self.log("Module parameters: url: '%s', timeout: %s." % (self.url, self.timeout))
        if self.string:
            self.log("Search string: '%s'" % self.string)
        else:
            self.log("Search string not set.")

        self.log("Initialized.")

    def prereq_check(self):
        """Check module prerequisites"""
        # Nothing

    def metrics(self):
        """Get metric definitions

        Stores the definitions in self.items and returns (True, self.items).
        The position of each entry is the item number netdata() expects:
        0 = status, 1 = duration, 2 = search_result.
        """
        self.items = (
            # Name - reserved - type - semantics - units - help
            (BASENS + 'status', None, PM_TYPE_32, PM_SEM_INSTANT, units_none,
             'url_get status'),
            (BASENS + 'duration', None, PM_TYPE_FLOAT, PM_SEM_INSTANT, units_secs,
             'url_get duration'),
            (BASENS + 'search_result', None, PM_TYPE_32, PM_SEM_INSTANT, units_none,
             'url_get search result'),
        )
        return True, self.items

    def netdata(self, item, inst):
        """Return net check result as PCP metric value

        item -- metric number: 0 status, 1 duration, 2 search result
        inst -- instance identifier, resolved to a name through
                self.pmdaIndom.inst_name_lookup()

        Returns [value, 1] on success and [PMDA_FETCH_NOVALUES, 0] when the
        lookup fails for any reason. An unknown item number terminates the
        process: sys.exit() raises SystemExit, which the handler below does
        not catch.
        """
        try:
            if item == 0:
                ref = self.hosts
            elif item == 1:
                ref = self.timings
            elif item == 2:
                ref = self.results
            else:
                self.err("Internal error, aborting!")
                sys.exit(1)
            key = self.pmdaIndom.inst_name_lookup(inst)
            return [ref[key], 1]
        except Exception:  # pylint: disable=broad-except
            return [PMDA_FETCH_NOVALUES, 0]

    def _do_url_get(self, hosts, timings, results, host):
        """Do URL get

        Fetches self.url, with self.HOST_TMPL_STR replaced by host, and
        records the outcome in the three mappings, keyed by host:

        hosts   -- set to 0 once the whole body has been read
        timings -- seconds from just before the request until the body
                   has been read
        results -- 0 if the self.string regex matches the body, 1 if it
                   does not; left unset when no search string is set

        Nothing is raised. On any failure the remaining entries are left
        unset, and do_check() substitutes failure values for them.
        """
        try:
            # Certificate and hostname verification are switched off
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE

            url = self.url.replace(self.HOST_TMPL_STR, host)
            req = urlget.Request(url, headers={'User-Agent': "PCP netcheck PMDA"})

            start_time = time()
            # Pass the advertised timeout so a stalled server cannot block
            # this check indefinitely
            handle = urlget.urlopen(req, timeout=self.timeout, context=ctx)
            try:
                # read() without a size returns the complete body
                raw = handle.read()
                end_time = time()
                # Only Python 3 headers offer get_content_charset()
                get_charset = getattr(handle.headers, 'get_content_charset', None)
                charset = get_charset() if get_charset else None
            finally:
                handle.close()

            hosts[host] = 0
            timings[host] = end_time - start_time

            # Without a search string there is nothing to report here;
            # do_check() then records -1 for this host
            if self.string is not None:
                # The body is bytes on Python 3 and must be decoded before
                # it can be searched with a str pattern
                content = _decode_body(raw, charset)
                results[host] = 0 if search(self.string, content) else 1
        except Exception:  # pylint: disable=broad-except
            # Deliberate best-effort: a failed fetch simply leaves this
            # host without entries
            pass

    def do_check(self):
        """Do net check

        Runs _do_url_get through self._run_check_methods with three shared
        dictionaries, then copies the outcome into self.hosts, self.timings
        and self.results. Hosts the check left without an entry get
        status 1, duration -2 and search result -1.
        """
        # One manager serves all three shared dictionaries
        manager = Manager()
        hosts = manager.dict()
        timings = manager.dict()
        results = manager.dict()

        self._run_check_methods(self._do_url_get, (hosts, timings, results))

        for host in self.hosts:
            self.hosts[host] = hosts.get(host, 1)
            self.timings[host] = timings.get(host, -2)
            self.results[host] = results.get(host, -1)