#!/usr/bin/env pmpython # # Copyright (C) 2024 Red Hat. # Copyright (C) 2024 Nikhil Jain # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. # # pylint: disable=too-many-arguments, too-many-positional-arguments # pylint: disable=line-too-long, broad-except, too-many-lines """ Performance Metrics Domain Agent exporting uwsgi metrics. """ try: import ConfigParser except ImportError: import configparser as ConfigParser from cpmapi import ( PM_TYPE_STRING, PM_INDOM_NULL, PM_TYPE_DOUBLE, PM_TYPE_U64, PM_TYPE_U32, PM_SEM_COUNTER, PM_SEM_INSTANT, PM_ERR_AGAIN, PM_ERR_INST, PM_ERR_PMID ) from ctypes import c_uint, c_float, c_wchar_p, POINTER, cast, Structure from pcp.pmapi import pmUnits from pcp.pmapi import pmContext as PCP from pcp.pmda import PMDA, pmdaMetric, pmdaIndom import os import requests import sys DEFAULT_TIMEOUT = 2.5 # seconds DEFAULT_HOST = '127.0.0.1' DEFAULT_PORT = 9051 def _is_pmda_setup(): """checks if PMDA is in setup state""" return os.environ.get('PCP_PYTHON_DOMAIN') or os.environ.get('PCP_PYTHON_PMNS') class UWSGISUMMARY(Structure): """ Define the structure for the attributes that make up the uwsgi workers summary """ _fields_ = [ ("total_workers", c_uint), ("avg_response_time_msec", c_float), ("total_requests_served", c_uint), ("total_workers_accepting_requests", c_uint), ("total_exceptions", c_uint), ("total_harakiri_count", c_uint), ("total_busy_worker_count", c_uint), ("total_idle_worker_count", c_uint), ("total_pause_worker_count", c_uint) ] def __init__(self, workers=0, avg_rt=0.0, req=0, acc_req=0, exception=0, harakiri=0, busy_workers=0, idle_workers=0, pause_workers=0): Structure.__init__(self) self.total_workers = workers self.avg_response_time_msec = avg_rt self.total_requests_served = req self.total_workers_accepting_requests = acc_req self.total_exceptions = exception self.total_harakiri_count = harakiri self.total_busy_worker_count = busy_workers self.total_idle_worker_count = idle_workers self.total_pause_worker_count = pause_workers class WORKER(Structure): """ Define the structure for the attributes that make up a uwsgi worker object """ _fields_ = [ ("avg_rt", c_float), ("requests_served", c_uint), ("accepting_requests", c_uint), ("exceptions", c_uint), ("harakiri_count", c_uint), ("status", c_wchar_p) ] def __init__(self, avg_rt=0.0, req_served=0, acc_req=0, exceptions=0, harakiri=0, status=''): Structure.__init__(self) self.avg_rt = avg_rt self.requests_served = req_served self.accepting_requests = acc_req self.exceptions = exceptions self.harakiri_count = harakiri self.status = status class UwsgiPMDA(PMDA): """ PMDA class for uwsgi performance metrics """ UWSGI_SUMMARY = 0 WORKER_STATS = 1 summary = {} workers = {} def __init__(self, name, domain): ''' Initialisation - register metrics, callbacks, drop privileges ''' PMDA.__init__(self, name, domain) self.timeout = DEFAULT_TIMEOUT self.summary = UWSGISUMMARY() self.host = DEFAULT_HOST self.port = DEFAULT_PORT self.get_failed = False self.read_config() self.build_url() self.connect_pmcd() # Define the instance domains pointing to the dicts self.workers_indom = self.indom(0) self.add_indom(pmdaIndom(self.workers_indom, self.workers)) # define the Summary (roll up, at-a-glance numbers) # uwsgi.summary. # .total_workers # .avg_response_time_msec # .total_requests_served # .total_workers_accepting_requests # .total_exceptions # .total_harakiri_count # .total_busy_worker_count # .total_idle_worker_count # .total_pause_worker_count self.add_metric(name + '.summary.total_workers', pmdaMetric(PMDA.pmid(UwsgiPMDA.UWSGI_SUMMARY, 0), PM_TYPE_U32, PM_INDOM_NULL, PM_SEM_INSTANT, pmUnits()), "Total number of uwsgi workers") self.add_metric(name + '.summary.avg_response_time_msec', pmdaMetric(PMDA.pmid(UwsgiPMDA.UWSGI_SUMMARY, 1), PM_TYPE_DOUBLE, PM_INDOM_NULL, PM_SEM_INSTANT, pmUnits()), "Average response time across all current workers") self.add_metric(name + '.summary.total_requests_served', pmdaMetric(PMDA.pmid(UwsgiPMDA.UWSGI_SUMMARY, 2), PM_TYPE_U64, PM_INDOM_NULL, PM_SEM_COUNTER, pmUnits()), "Total requests served by all workers since starting") self.add_metric(name + '.summary.total_workers_accepting_requests', pmdaMetric(PMDA.pmid(UwsgiPMDA.UWSGI_SUMMARY, 3), PM_TYPE_U32, PM_INDOM_NULL, PM_SEM_INSTANT, pmUnits()), "Number of current workers accepting requests") self.add_metric(name + '.summary.total_exceptions', pmdaMetric(PMDA.pmid(UwsgiPMDA.UWSGI_SUMMARY, 4), PM_TYPE_U64, PM_INDOM_NULL, PM_SEM_COUNTER, pmUnits()), "Total exceptions across all workers since starting") self.add_metric(name + '.summary.total_harakiri_count', pmdaMetric(PMDA.pmid(UwsgiPMDA.UWSGI_SUMMARY, 5), PM_TYPE_U64, PM_INDOM_NULL, PM_SEM_COUNTER, pmUnits()), "Total harakiri count across all workers since starting") self.add_metric(name + '.summary.total_busy_worker_count', pmdaMetric(PMDA.pmid(UwsgiPMDA.UWSGI_SUMMARY, 6), PM_TYPE_U32, PM_INDOM_NULL, PM_SEM_INSTANT, pmUnits()), "Number of busy workers currently") self.add_metric(name + '.summary.total_idle_worker_count', pmdaMetric(PMDA.pmid(UwsgiPMDA.UWSGI_SUMMARY, 7), PM_TYPE_U32, PM_INDOM_NULL, PM_SEM_INSTANT, pmUnits()), "Number of idle workers currently") self.add_metric(name + '.summary.total_pause_worker_count', pmdaMetric(PMDA.pmid(UwsgiPMDA.UWSGI_SUMMARY, 8), PM_TYPE_U32, PM_INDOM_NULL, PM_SEM_INSTANT, pmUnits()), "Number of pause workers currently") # Add the uwsgi worker metrics (metrics per worker) # uwsgi.worker # .avg_rt # .requests_served # .accepting_requests # .exceptions # .harakiri_count # .status self.add_metric(name + '.worker.avg_rt', pmdaMetric(PMDA.pmid(UwsgiPMDA.WORKER_STATS, 0), PM_TYPE_DOUBLE, self.workers_indom, PM_SEM_COUNTER, pmUnits()), "Average response time") self.add_metric(name + '.worker.requests_served', pmdaMetric(PMDA.pmid(UwsgiPMDA.WORKER_STATS, 1), PM_TYPE_U64, self.workers_indom, PM_SEM_COUNTER, pmUnits()), "Requests served") self.add_metric(name + '.worker.accepting_requests', pmdaMetric(PMDA.pmid(UwsgiPMDA.WORKER_STATS, 2), PM_TYPE_U64, self.workers_indom, PM_SEM_COUNTER, pmUnits()), "Accepting requests") self.add_metric(name + '.worker.exceptions', pmdaMetric(PMDA.pmid(UwsgiPMDA.WORKER_STATS, 3), PM_TYPE_U64, self.workers_indom, PM_SEM_COUNTER, pmUnits()), "Exceptions count") self.add_metric(name + '.worker.harakiri_count', pmdaMetric(PMDA.pmid(UwsgiPMDA.WORKER_STATS, 4), PM_TYPE_U64, self.workers_indom, PM_SEM_COUNTER, pmUnits()), "Harakiti count") self.add_metric(name + '.worker.status', pmdaMetric(PMDA.pmid(UwsgiPMDA.WORKER_STATS, 5), PM_TYPE_STRING, self.workers_indom, PM_SEM_COUNTER, pmUnits()), "Worker status") # declare the callbacks self.set_fetch(self.uwsgi_fetch) self.set_instance(self.uwsgi_instance) self.set_fetch_callback(self.uwsgi_fetch_callback) self.set_user(PCP.pmGetConfig('PCP_USER')) def refresh_all(self): stats = {} stats['workers'] = [] try: stats = requests.get(self.url, timeout=self.timeout).json() #self.log("refresh_all stats:", str(stats)) self.get_failed = False except Exception: self.get_failed = True avg_response_time_msec = 0 # pass any persistent long-running counters into the constructor summary = UWSGISUMMARY(req=self.summary.total_requests_served, harakiri=self.summary.total_harakiri_count, exception=self.summary.total_exceptions) for w in stats['workers']: worker_id = w['id'] worker = WORKER() worker.avg_rt = w['avg_rt'] worker.requests_served = w['requests'] worker.accepting_requests = w['accepting'] worker.exceptions = w['exceptions'] worker.harakiri_count = w['harakiri_count'] worker.status = w['status'] # update the summary with the this worker data summary.total_workers += 1 summary.total_requests_served += worker.requests_served summary.total_workers_accepting_requests += worker.accepting_requests summary.total_exceptions += worker.exceptions summary.total_harakiri_count += worker.harakiri_count if worker.status == 'busy': summary.total_busy_worker_count += 1 elif worker.status =='idle': summary.total_idle_worker_count += 1 elif worker.status == 'pause': summary.total_pause_worker_count += 1 avg_response_time_msec += worker.avg_rt self.workers['worker-'+str(worker_id)] = worker summary.avg_response_time_msec = avg_response_time_msec if summary.total_workers > 1: # divide-by-zero guard summary.avg_response_time_msec /= summary.total_workers self.summary = summary def read_config(self): """ Read configuration """ conffile = PCP.pmGetConfig('PCP_PMDAS_DIR') conffile += '/' + self.read_name() + '/' + self.read_name() + '.conf' # Silently ignore missing file/section # Python < 3.2 compat if sys.version_info[0] >= 3 and sys.version_info[1] >= 2: config = ConfigParser.ConfigParser() else: config = ConfigParser.SafeConfigParser() config.read(conffile) if config.has_section('pmda'): for opt in config.options('pmda'): if opt == 'host': self.host = config.get('pmda', opt) elif opt == 'port': self.port = config.get('pmda', opt) else: self.err("Invalid directive '%s' in %s." % (opt, conffile)) sys.exit(1) def build_url(self): """Build up url needed for HTTP GET request""" self.url = 'http://'+str(self.host)+':'+str(self.port) def refresh(self): self.refresh_all() def uwsgi_instance(self, serial): self.refresh() def uwsgi_fetch(self): """ Called once per "fetch" PDU, before callbacks """ self.workers.clear() # refresh the internal 'cache' describing summary and workers self.refresh() # replace the instance domain cache with these updated object(s) self.replace_indom(self.workers_indom, self.workers) def uwsgi_fetch_summary_callback(self, item, inst): """ provide the summary information to the caller :param item: item number :param inst: instance of this object :return: value of a given instance/attribute """ item_lookup = [summary_attr[0] for summary_attr in UWSGISUMMARY._fields_] if 0 <= item <= (len(UWSGISUMMARY._fields_) - 1): try: return [getattr(self.summary, item_lookup[item]), 1] except AttributeError: # most likely GET failed return [PM_ERR_AGAIN, 0] else: return [PM_ERR_INST, 0] def uwsgi_fetch_worker_callback(self, item, inst): """ provide the worker statistics to the caller :param item: item number :param inst: instance of this object :return: value of a given instance/attribute """ voidp = self.inst_lookup(self.workers_indom, inst) if voidp is None: return [PM_ERR_INST, 0] cache = cast(voidp, POINTER(WORKER)) item_lookup = [worker_attr[0] for worker_attr in WORKER._fields_] worker = cache.contents if 0 <= item <= (len(WORKER._fields_) - 1): return [getattr(worker, item_lookup[item]), 1] return [PM_ERR_INST, 0] def uwsgi_fetch_callback(self, cluster, item, inst): """ fetch helper - look at request and send to other helpers based on cluster number """ if cluster == UwsgiPMDA.UWSGI_SUMMARY: return self.uwsgi_fetch_summary_callback(item, inst) elif cluster == UwsgiPMDA.WORKER_STATS: return self.uwsgi_fetch_worker_callback(item, inst) return [PM_ERR_PMID, 0] def domain_probe(self): """ check: return True if server is available and monitorable """ self.refresh_all() return self.get_failed if __name__ == '__main__': UwsgiPMDA('uwsgi', 161).run()