#!/usr/bin/env pmpython # # Copyright (c) 2017-2020 Red Hat. # Copyright (c) 2017 Ronak Jain. # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. # ''' Performance Metrics Domain Agent exporting OpenMetrics endpoint metrics. ''' # pylint: disable=bad-continuation, line-too-long, too-many-lines # pylint: disable=too-many-nested-blocks, too-many-return-statements # pylint: disable=broad-except, bare-except, missing-docstring # pylint: disable=too-many-arguments, too-many-positional-arguments # pylint: disable=expression-not-assigned import os import re import time import pickle import traceback import argparse import threading import subprocess import sys from ctypes import c_int from socket import gethostname from stat import ST_MODE, S_IXUSR, ST_CTIME import requests from pcp.pmapi import pmUnits, pmContext from pcp.pmda import PMDA, pmdaMetric, pmdaIndom, pmdaInstid import cpmapi as c_api import cpmda if sys.version[0] == '2': import Queue as queue else: import queue # Sort config file list - generally needed because os.walk() # returns a different order on different platforms and/or filesystems. # Config file list sorting is expensive with a large number of URLs # and/or scripts. See the --nosort option to turn it off. sort_conf_list = True # Number of seconds to wait between poll attempts on a source that # we've never been able to connect to & collect a list of metrics from. empty_source_pmns_poll = 10.0 MAX_CLUSTER = 0xfff # ~ max. number of openmetrics sources MAX_METRIC = 0x3ff # ~ max. 
# number of metrics per source
MAX_INDOM = 0x7fffffff # coincidentally, ~ product of above

# These numbers are combined to create unique numbers for several
# purposes.  The first two types are used only for internal
# pmdaopenmetrics purposes, and are not visible as indoms to pmapi
# clients.
#
# indom# 0: maps source nicknames to 12-bit "cluster" numbers, cluster#0 is not used
# indom# 1..4095: for each source=cluster, map metric names to pmid numbers
# indom# 4096 + cluster#*1024 ...+1023: actual pmns indom for each metric

# Human-readable names for the PM_LABEL_* flag values, used in diagnostics.
labeltype = {c_api.PM_LABEL_CONTEXT: "PM_LABEL_CONTEXT",
             c_api.PM_LABEL_DOMAIN: "PM_LABEL_DOMAIN",
             c_api.PM_LABEL_INDOM: "PM_LABEL_INDOM",
             c_api.PM_LABEL_CLUSTER: "PM_LABEL_CLUSTER",
             c_api.PM_LABEL_ITEM: "PM_LABEL_ITEM",
             c_api.PM_LABEL_INSTANCES: "PM_LABEL_INSTANCES"}

# Map the type keywords accepted in PCP/PCP5/METADATA lines to PM_TYPE_* codes.
typestrD = {'double': c_api.PM_TYPE_DOUBLE, 'float': c_api.PM_TYPE_FLOAT,
            'u64': c_api.PM_TYPE_U64, '64': c_api.PM_TYPE_64,
            'u32': c_api.PM_TYPE_U32, '32': c_api.PM_TYPE_32,
            'string': c_api.PM_TYPE_STRING, 'aggregate': c_api.PM_TYPE_AGGREGATE}

class Metric(object):
    ''' Metric information class.

    One instance per exported metric of a Source: holds the assigned pmid,
    indom (if any), PCP metadata (type/semantics/units) and the most recently
    ingested instance/value pairs for fetch callbacks.
    '''
    def __init__(self, source, name, metricnum, instances, optional_instances, pcpline, helpline, typeline):
        # source: owning Source; name: openmetrics leaf metric name;
        # metricnum: persistent per-source metric number (pmid item field);
        # instances/optional_instances: label dicts (None for singular metrics);
        # pcpline/helpline/typeline: metadata parsed from '# PCP'/'# HELP'/'# TYPE'
        # comment lines in the exposition text (each may be None).
        self.source = source
        self.name = name
        self.metricnum = metricnum # seen during fetch callbacks
        self.pmid = source.pmda.pmid(source.cluster, metricnum) # add domain/cluster#
        # per-metric indom number: 4096 + cluster#*1024 + metric# (see scheme above)
        self.indom_number = MAX_CLUSTER+1 + (source.cluster * (MAX_METRIC+1)) + metricnum
        self.pcpline = pcpline
        self.typeline = typeline
        self.munits = pmUnits(0, 0, 0, 0, 0, 0)
        self.multiplier = 1 # pmParseUnits returns a tuple of (pmUnits, multiplier)
        self.values = {} # instance-vector-to-value
        self.labels = {} # metric labels
        self.optional_labels = {} # optional labels
        self.inst_labels = {} # dict of instid:labels (where labels is a dict)
        if instances is not None: # dict is empty for singular metrics (no indom)
            for key, val in instances.items():
                self.labels[key] = val
        if optional_instances is not None:
            for key, val in optional_instances.items():
                self.optional_labels[key] = val
        # derive self.{mtype, mname, msem, munits} from pcpline/typeline/heuristics
        self.assign_metadata()
        self.singular = (instances is None) or (pcpline and "PM_INDOM_NULL" in pcpline.split(' '))
        if self.singular:
            # no labels in ingested data, *or* we have PCP5 (or config METADATA) metadata with PM_INDOM_NULL
            self.mindom = c_api.PM_INDOM_NULL
            self.indom_table = None
        else:
            self.mindom = self.source.pmda.indom(self.indom_number) # add domain#
            self.indom_table = PersistentNameTable(self.source.pmda, self.indom_number, MAX_INDOM)

        ## retained as an example
        # if self.source.pmda.dbg:
        #     c_units = c_api.pmUnits_int(self.munits.dimSpace, self.munits.dimTime, self.munits.dimCount,
        #                                 self.munits.scaleSpace, self.munits.scaleTime, self.munits.scaleCount)

        self.source.pmda.debug("Metric: adding metric %s pmid=%s type=%d sem=%d singular=%s mindom=0x%x labels=%s" %
            (name, pmContext.pmIDStr(self.pmid), self.mtype, self.msem, self.singular, self.mindom, self.labels))
        self.obj = pmdaMetric(self.pmid, self.mtype, self.mindom, self.msem, self.munits)
        if helpline: # it could be None!
            # undo the exposition-format escaping: '\\' -> '\', '\n' -> newline
            unescaped = helpline.replace('\\\\', '\\').replace('\\n', '\n')
            split = unescaped.split('\n')
            help_oneline = split[0] # must have at least one entry
            help_text = '\n'.join(split[1:]) # may have other entries
        else:
            help_oneline = ''
            help_text = ''
        try:
            self.source.pmda.add_metric(self.mname, self.obj, help_oneline, help_text)
            self.source.pmda.set_need_refresh()
        except Exception as e:
            self.source.pmda.err("Cannot add metric %s (%d): %s" % (self.mname, self.pmid, e))
        if self.source.pmda.dbg:
            self.source.pmda.debug("created metric %#x (%s) labels='%s' optional_labels='%s'" %
                (self.pmid, self.mname, self.labels, self.optional_labels))

    def decodeTypeStr(self, typestr):
        ''' Map a type keyword to its PM_TYPE_* code, PM_TYPE_DOUBLE on any
            unrecognised keyword. '''
        try:
            ret = typestrD[typestr]
        except: # NOTE(review): bare except; a KeyError is the only expected miss here
            ret = c_api.PM_TYPE_DOUBLE
        return ret

    def parse_pcp_units(self, s):
        ''' parse a PCP units string; returns (pmUnits, multiplier), defaulting
            to dimensionless units and multiplier 1 on 'none' or parse error '''
        mult = 1
        units = pmUnits(0, 0, 0, 0, 0, 0)
        if s != 'none':
            try:
                units, mult = pmContext.pmParseUnitsStr(s)
            except Exception as e:
                self.source.pmda.err('parse_pcp_units "%s" Error: %s' % (s, e))
        self.source.pmda.debug('parse_pcp_units "%s": units=%s mult=%f' % (s, str(units), mult)) if self.source.pmda.dbg else None
        return units, mult

    def assign_metadata(self):
        '''
        Compute metric metadata self.{mtype, mname, msem, munits}
        from the available information already stored in self.
        '''
        #
        # Default metadata, unless overridden by PCP line or heuristics
        #
        self.mtype = self.decodeTypeStr("double")
        self.munits = pmUnits(0, 0, 0, 0, 0, 0)
        self.msem = c_api.PM_SEM_INSTANT # aka gauge

        if self.pcpline:
            #
            # Use PCP or PCP5 line for metadata
            #
            metadata = self.pcpline.split(' ')
            len_metadata = len(metadata)
            self.mname = 'openmetrics.' + self.source.name + '.' + metadata[1]
            if metadata[0] == 'PCP5' or metadata[0] == 'METADATA':
                # PCPv5 metadata, e.g.:
                # PCP5 simple.time.user double PM_INDOM_NULL counter sec
                # or matching regex from config file, e.g.:
                # METADATA simple.time.user double PM_INDOM_NULL counter sec
                if self.source.pmda.dbg:
                    self.source.pmda.debug("assign_metadata: %s" % metadata)
                # NOTE(review): metadata[4] is read unconditionally; a short
                # PCP5/METADATA line would raise IndexError (caught upstream
                # by parse_metric_line's broad handler) — TODO confirm intent
                if metadata[4] == 'instant':
                    self.msem = c_api.PM_SEM_INSTANT
                elif metadata[4] == 'counter':
                    self.msem = c_api.PM_SEM_COUNTER
                elif metadata[4] == 'discrete':
                    self.msem = c_api.PM_SEM_DISCRETE
                # metric type (not semantics!)
                self.mtype = self.decodeTypeStr(metadata[2])
                if len_metadata >= 5:
                    # units extends to the end of the line
                    self.munits, self.multiplier = self.parse_pcp_units(' '.join(metadata[5:]))
            elif metadata[0] == 'PCP':
                # pre-PCPv5 metadata, e.g.:
                # PCP cgroup.memory.stat.inactive_file instant byte
                if self.source.pmda.dbg:
                    self.source.pmda.debug("assign_metadata: %s" % metadata)
                if len_metadata >= 3 and metadata[2] == 'instant':
                    self.msem = c_api.PM_SEM_INSTANT
                elif len_metadata >= 3 and metadata[2] == 'counter':
                    self.msem = c_api.PM_SEM_COUNTER
                elif len_metadata >= 3 and metadata[2] == 'discrete':
                    self.msem = c_api.PM_SEM_DISCRETE
                if len_metadata >= 4:
                    # units extends to the end of the line
                    self.munits, self.multiplier = self.parse_pcp_units(' '.join(metadata[3:]))
                # note: PCP metadata doesn't include indom (only PCP5 does)
        else:
            #
            # No PCP metadata - we have to apply heuristics
            #
            # Split the openmetrics metric name by "_", to help
            # decode convention for squishing unit/scale data.
            pieces = self.name.split('_')
            if self.source.pmda.dbg:
                self.source.pmda.debug("assign_metadata: using heuristics %s" % pieces)
            self.mname = 'openmetrics.' + self.source.name + '.' + self.name.replace(":", ".")
            if self.typeline == 'counter' or 'total' in pieces or 'count' in pieces or 'sum' in pieces:
                self.msem = c_api.PM_SEM_COUNTER
            elif self.typeline == 'histogram' and 'bucket' in pieces:
                self.msem = c_api.PM_SEM_COUNTER
            else:
                self.msem = c_api.PM_SEM_INSTANT
            if (self.typeline in ('histogram', 'summary') and 'count' in pieces):
                # regardless of UNIT; *_sum peer has proper type
                self.munits = pmUnits(0, 0, 1, 0, 0, 0) # simple count
            elif self.typeline == 'histogram' and 'bucket' in pieces: # ditto
                self.munits = pmUnits(0, 0, 1, 0, 0, 0) # simple count
            elif 'seconds' in pieces:
                self.munits = pmUnits(0, 1, 0, 0, 3, 0)
            elif 'microseconds' in pieces:
                self.munits = pmUnits(0, 1, 0, 0, 1, 0)
            elif 'bytes' in pieces:
                self.munits = pmUnits(1, 0, 0, 0, 0, 0)
            self.multiplier = 1
        if self.source.pmda.dbg:
            self.source.pmda.debug('assign_metadata: mname="%s" type="%s" msem="%s" munits="%s"' %
                (self.mname, self.mtype, self.msem, self.munits))

    def clear_values(self):
        ''' Erase all stored instance/value pairs, in anticipation of a new set. '''
        self.values.clear()

    def store_inst(self, labels, value):
        ''' Store given new instance/value pair. '''
        # assert (labels is None) == (self.indom_table is None) # no metric indom flipflop
        if self.singular:
            inst = c_api.PM_IN_NULL
            instname = "PM_IN_NULL"
            self.source.pmda.debug('store_inst mname="%s" singular=True value="%s" labels=%s' % (self.mname, value, labels))
        else:
            instname = None
            if self.pcpline: # pmproxy
                # NB: no quoting/transforms - preserve incoming value verbatim
                if 'instname' in labels.keys():
                    instname = labels['instname']
                elif 'instance' in labels.keys(): # back-compat
                    instname = labels['instance']
            if instname is None:
                # no explicit instance name label: synthesize one from the
                # sorted label key:value pairs
                self.indom_table.prefix_mode = True # Mark for instance# prefixed names
                instname = ""
                for key, val in sorted(labels.items()):
                    if instname:
                        instname += ' '
                    instname += key + ":" + val + ""
                    self.labels[key] = val
            inst = self.indom_table.intern_lookup_value(instname)
        self.values[inst] = value
        self.inst_labels[inst] = labels
        if self.source.pmda.dbg:
            self.source.pmda.debug('store_inst mname=%s inst=%d instname="%s" value="%s"' % (self.mname, inst, instname, value))
            self.source.pmda.debug('store_inst mname=%s inst_labels[%d]=%s' % (self.mname, inst, labels))

    def save(self):
        ''' Persist this metric's instance-name table, if it has one. '''
        if self.indom_table is not None:
            self.indom_table.save()

    def str2value(self, valstr):
        ''' Convert an ingested value string to the metric's declared PCP type. '''
        self.source.pmda.debug('str2value mname=%s type=%d valstr=%s' % (self.mname, self.mtype, valstr))
        if self.mtype in [c_api.PM_TYPE_DOUBLE, c_api.PM_TYPE_FLOAT]:
            return float(valstr) # float is actually a double in python3
        if self.mtype in [c_api.PM_TYPE_U64, c_api.PM_TYPE_64, c_api.PM_TYPE_U32, c_api.PM_TYPE_32]:
            return int(valstr) # int is actually a long in python3
        return valstr

    def fetch_inst(self, inst):
        # fetch instance: return [value, 1] on success, [PM_ERR_*, 0] otherwise
        self.source.pmda.debug('fetch_inst mname=%s inst=%d values=%s' % (self.mname, inst, self.values))
        if not self.values:
            # Metric may have disappeared
            if self.source.pmda.dbg:
                self.source.pmda.debug('fetch_inst inst=%d has no values' % inst)
            ret = [c_api.PM_ERR_AGAIN, 0]
        elif self.singular or inst in self.values:
            if self.source.pmda.dbg:
                self.source.pmda.debug('fetch_inst returning type=%s value=%s' % (type(self.values[inst]), self.values[inst]))
            ret = [self.str2value(self.values[inst]), 1]
        else:
            # Instance has disappeared so indicate a missing value.
            # This is the ideal libpcp_pmda code (not PM_ERR_INST).
            ret = [c_api.PM_ERR_VALUE, 0]
        self.source.pmda.debug('fetch_inst returning %s' % ret)
        return ret

class PersistentNameTable(object):
    '''Persistent name table.  Answers name-to-number queries by assigning
    persistent ids.  Updates pmda's indom table intermittently.  Persistent
    by saving dictionary in $PCP_VAR_DIR/pmdas/, similarly to how pmdaCache
    functions do.

    A table may be flagged to add an instance-number prefix to all its
    instance-names, for ensuring uniqueness of the sort approved by
    pmLookupIndom(3).
    '''
    def __init__(self, thispmda, indom, maxnum):
        # thispmda: owning PMDA; indom: serial number for this table;
        # maxnum: highest id that may be handed out by intern_lookup_value()
        self.pmda = thispmda
        self.indom = indom
        self.maxnum = maxnum
        self.need_save = False
        self.prefix_mode = False # set later for non-PCP metric instances
        self.store_file_name = '%s/config/pmda/%d.%d.py' % (os.environ['PCP_VAR_DIR'], thispmda.domain, indom)
        try: # slightly used!
            with open(self.store_file_name, 'rb') as f:
                self.instances = pickle.load(f, encoding="bytes")
                try:
                    # Fetch the prefixness of the mapping early, so we send
                    # the correct indom strings to the C code the first time.
                    self.prefix_mode = pickle.load(f, encoding="bytes")
                except:
                    pass
            self.pmda.debug("loaded %s%s, %d instances" %
                (self.store_file_name, (" (pfx)" if self.prefix_mode else ""), len(self.instances))) if self.pmda.dbg else None
            self.need_save = True # to push values down into c pmda layer
            # TODO need separate flag for 'push values to file'; too much i/o
        except Exception: # new!
self.instances = [] # won't be saved till nonempty self.names_to_instances = {} for i, n in enumerate(self.instances): self.names_to_instances[n] = i # prime the pmda indom table with an empty list; the pmdaIndom # object will be replaced shortly within .replace_indom() self.pmdaindom = pmdaIndom(thispmda.indom(self.indom), []) self.pmda.add_indom(self.pmdaindom) self.save() # replace it with real data now, if non-empty def save(self): '''Push values to the PMDA layer as well as the backing store file.''' if self.need_save: # save to the pmda C layer indom_array = [] # pmdaInstid array, computed on demand self.pmda.debug("indom %d:" % (self.pmda.indom(self.indom))) if self.pmda.dbg else None for i, n in enumerate(self.instances): if self.prefix_mode: instname = str(i) + " " + str(n) else: instname = str(n) self.pmda.debug("%4d:\t%s" % (i, instname)) if self.pmda.dbg else None indom_array.append(pmdaInstid(c_int(i), instname)) # NB: use replace_indom(int, [...]) overload, to pass indom_array, # else it gets ignored self.pmda.replace_indom(self.pmda.indom(self.indom), indom_array) self.pmda.set_need_refresh() # save to disk too try: # slightly used! 
with open(self.store_file_name, 'wb') as f: pickle.dump(self.instances, f, protocol=0) # 0: most compatible pickle.dump(self.prefix_mode, f, protocol=0) self.pmda.debug("saved %s%s, %d instances" % (self.store_file_name, (" (pfx)" if self.prefix_mode else ""), len(self.instances))) if self.pmda.dbg else None self.need_save = False # reset only on success except Exception as e: self.pmda.err("cannot save %s: %s" % (self.store_file_name, e)) def intern_lookup_value(self, name): '''Add/lookup given name, return its persistent identifier.''' if name in self.names_to_instances: # fast path # mapping the translated name to inst return self.names_to_instances[name] else: # new name num = len(self.instances) if num > self.maxnum: raise ValueError('Too many (%d) different names' % num) self.instances.append(name) self.names_to_instances[name] = num assert self.instances[num] == name self.need_save = True return num # the new inst number class SampleLineParser(object): '''A parser for one metric [{instance}] value [timestamp] line. State machine is required since we're lexing text with embedded quoted freeform strings with punctuation, etc., so can't simply do substring searching. 
    '''
    # Each parse_* method below is one state of the character-at-a-time
    # state machine: it consumes one character and sets self.state to the
    # next state's bound method.

    def parse_metric_name_start(self, char):
        # skip leading whitespace, then begin accumulating the metric name
        if char.isspace():
            pass
        else:
            self.state = self.parse_metric_name
            self.name = char

    def parse_metric_name(self, char):
        if char == '{': # labels follow
            self.state = self.parse_label_name_start
            self.labels = {}
        elif char.isspace():
            self.state = self.parse_post_metric_name
        else:
            self.name += char

    def parse_post_metric_name(self, char):
        # between metric name and either '{labels}' or the value
        if char.isspace():
            pass
        elif char == '{':
            self.state = self.parse_label_name_start
            self.labels = {}
        else:
            self.state = self.parse_value
            self.value = char

    def parse_label_name_start(self, char):
        if char.isspace():
            pass
        elif char == '}': # empty label set
            self.state = self.parse_post_labels
        else:
            self.state = self.parse_label_name
            self.lname = char

    def parse_label_name(self, char):
        if char.isspace():
            self.state = self.parse_label_equals
        elif char == '=':
            self.state = self.parse_label_value_start
        else:
            self.lname += char

    def parse_label_equals(self, char):
        if char.isspace():
            pass
        elif char == '=':
            self.state = self.parse_label_value_start
        else:
            raise ValueError("Expected =")

    def parse_label_value_start(self, char):
        if char.isspace():
            pass
        elif char == '"': # label values are always double-quoted
            self.state = self.parse_label_value
            self.lvalue = ""
        else:
            raise ValueError("Expected \"")

    def parse_label_value(self, char):
        if char == '\\': # escape sequence follows
            self.state = self.parse_label_value_escapechar
        elif char == '"': # closing quote: commit this label
            self.state = self.post_label_value
            self.labels[self.lname] = self.lvalue
            self.lname = None
            self.lvalue = None
        else:
            self.lvalue += char

    def parse_label_value_escapechar(self, char):
        # only \n and \" are translated; anything else passes through literally
        if char == 'n':
            self.state = self.parse_label_value
            self.lvalue += '\n'
        elif char == '"':
            self.state = self.parse_label_value
            self.lvalue += '\"'
        else:
            self.state = self.parse_label_value
            self.lvalue += '\\' + char # transcribe \XYZ literally

    def post_label_value(self, char):
        if char.isspace():
            pass
        # NOTE(review): the two-character string ', ' can never equal a single
        # char, so only ',' in this tuple is reachable — harmless but dead
        elif char in (', ', ','):
            self.state = self.parse_label_name_start
        elif char == '}':
            self.state = self.parse_post_labels
        else:
            raise ValueError("Expected , or }")

    def parse_post_labels(self, char):
        if char.isspace():
            pass
        else:
            self.state = self.parse_value
            self.value = char

    def parse_value(self, char):
        if char.isspace(): # timestamp possibly following
            self.state = self.parse_chomp
        else:
            self.value += char

    def parse_chomp(self, char): # ignored stuff
        pass

    def __init__(self, line):
        # mis-initialize output variables to force state
        # machine transitions to do it right
        self.name = None
        self.value = None
        self.labels = None
        self.lname = None
        self.lvalue = None
        # run state machine
        self.state = self.parse_metric_name_start
        for char in line:
            self.state(char)
        assert self.name
        assert self.value

class Source(object):
    '''An instance of this class represents a distinct OpenMetrics exporter,
    identified by a nickname (the next PMNS component beneath openmetrics.*),
    and a URL.  Metrics will show up at openmetrics.NICKNAME.METRIC as/when
    the source is online.
    '''
    def __init__(self, name, cluster, path, is_scripted, thispmda):
        # name: source nickname; cluster: persistent pmid cluster number;
        # path: config file (.url) or executable script; is_scripted: True
        # when path is a script to run rather than a URL config to GET
        self.name = name # source nickname
        self.cluster = cluster # unique/persistent id# for nickname
        self.path = path # pathname to .url or executable file
        self.url = None
        self.parse_error = False
        self.parse_url_time = 0 # timestamp of config file when it was last parsed
        self.is_scripted = is_scripted
        self.pmda = thispmda # the shared pmda
        self.requests = None
        self.headers = None
        self.filterlist = None
        self.metadatalist = None
        self.document = None
        self.refresh_time = 0 # "never"
        if not is_scripted:
            # source is a URL. Create a session for it and initialize a few things
            self.requests = self.pmda.requests # allow persistent connections etc.
            self.headers = {} # dict of headers for the http get
            self.filterlist = [] # list of filters from URL config file
            self.metadatalist = [] # list of metadata from URL config file.
        # persistently assign numeric id to our metrics
        self.pmids_table = PersistentNameTable(self.pmda, cluster, MAX_METRIC)
        self.metrics_by_name = {} # name -> Metric
        self.metrics_by_num = {} # number (last component of pmid) -> Metric

    def old_enough_for_refresh(self):
        '''But what is "old"?  If it is empty (no metrics), then it has
        probably never been connected to successfully.  OTOH, if it hasn't
        been fetched from "recently", there may be new metrics.  So we
        could track the last time a fetch was done to this source, and
        "time out" the PMNS from it.
        '''
        now = time.time()
        last_try_age = now - self.refresh_time
        # only re-poll never-successful (metric-less) sources, rate-limited
        # by empty_source_pmns_poll seconds
        return len(self.metrics_by_name) == 0 and last_try_age > empty_source_pmns_poll

    def check_filter(self, name, entrytype):
        '''
        return "INCLUDE" or "OPTIONAL" if name of type entrytype ("METRIC" or "LABEL")
        is included or optional (and not excluded) by filters for this source.
        First match prevails.
        '''
        if self.filterlist is None:
            return "INCLUDE"
        for f in self.filterlist:
            fs = f.split() # INCLUDE|EXCLUDE|OPTIONAL METRIC|LABEL regex
            if fs[1] == entrytype:
                pat = ' '.join(fs[2:]) # regex may have spaces
                rx = self.pmda.lookup_regex(pat)
                self.pmda.debug("check_filter(%s) fs=%s regex='%s'" % (name, fs, pat)) if self.pmda.dbg else None
                match = rx.match(name)
                self.pmda.debug(".... rx.match(%s, %s) -> %s" % (pat, name, match)) if self.pmda.dbg else None
                if match:
                    return fs[0]
        # no regex match => include by default
        return "INCLUDE"

    def filter_labelset(self, labelset):
        ''' return included and optional labelsets trimmed by filters '''
        self.pmda.debug("filter_labelset(labelset=%s)" % labelset) if self.pmda.dbg else None
        included_labels = {}
        optional_labels = {}
        if labelset is None:
            return None, None # singular; nothing to include or exclude
        for lname, lval in labelset.items():
            designation = self.check_filter(lname, "LABEL")
            self.pmda.debug("... filter_labelset lname=%s lval=%s => %s" % (lname, lval, designation)) if self.pmda.dbg else None
            if designation == "INCLUDE":
                included_labels[lname] = lval
            elif designation == "OPTIONAL":
                # all optional labels are also included labels
                included_labels[lname] = optional_labels[lname] = lval
        if included_labels == {}:
            return None, None # no labels => singular indom
        if optional_labels == {}:
            optional_labels = None
        return included_labels, optional_labels

    def instname_labels(self, included_labels, optional_labels):
        '''
        Return labels to be used to form the instance name
        i.e. included labels that are not optional
        '''
        self.pmda.debug("instname_labels(included=%s, optional=%s)" % (included_labels, optional_labels)) if self.pmda.dbg else None
        if optional_labels is None:
            naming_labels = included_labels
        else:
            naming_labels = {}
            if included_labels is not None:
                for name, val in included_labels.items():
                    if not name in optional_labels:
                        naming_labels[name] = val
        self.pmda.log("instname_labels returning %s" % naming_labels) if self.pmda.dbg else None
        return naming_labels

    def valid_metric_name(self, name):
        '''
        Check validity of given metric name component:
        - only contains alphanumerics and/or underscores
          (colon removed later, allowed through here);
        - only starts with an alphabetic character.
        '''
        if not name[0].isalpha():
            return False
        if not re.sub('[_.:]', '', name).isalnum():
            return False
        return True

    def parse_metric_line(self, line, pcpline, helpline, typeline):
        '''
        Parse the sample line, identify/create corresponding metric & instance.
        Returns True on success (or deliberate filter-exclusion), False on a
        parse/store failure.
        '''
        try:
            sp = SampleLineParser(line)
            self.pmda.debug("parsed '%s' -> %s %s %s" % (line, sp.name, sp.labels, sp.value)) if self.pmda.dbg else None
            self.pmda.debug("parse_metric_line sp.labels=%s" % sp.labels) if self.pmda.dbg else None
            included_labels, optional_labels = self.filter_labelset(sp.labels)
            naming_labels = self.instname_labels(included_labels, optional_labels) # not used if singular
            self.pmda.debug("included_labels '%s'" % (included_labels)) if self.pmda.dbg else None
            self.pmda.debug("optional_labels '%s'" % (optional_labels)) if self.pmda.dbg else None
            if sp.name in self.metrics_by_name:
                # existing metric: just store the new instance/value
                m = self.metrics_by_name[sp.name]
                assert self.metrics_by_num[m.metricnum] == m
                if m.singular: # singular metrics have no naming labels
                    m.store_inst(included_labels, sp.value)
                else:
                    m.store_inst(naming_labels, sp.value)
                    self.pmda.debug("naming_labels '%s'" % (naming_labels)) if self.pmda.dbg else None
            else:
                # check metric is not excluded by filters
                fullname = "openmetrics.%s.%s" % (self.name, sp.name)
                self.pmda.debug("Checking metric '%s'" % (fullname)) if self.pmda.dbg else None
                # Nb: filter pattern is applied only to the leaf component of the full metric name
                if self.check_filter(sp.name, "METRIC") != "INCLUDE":
                    self.pmda.log("Metric %s excluded by config filters" % fullname)
                    return True
                else:
                    if not self.valid_metric_name(sp.name):
                        raise ValueError('invalid metric name: ' + sp.name)
                    # new metric
                    metricnum = self.pmids_table.intern_lookup_value(sp.name)
                    # check if the config specifies metadata for this metric
                    if self.metadatalist:
                        for metadata in self.metadatalist:
                            rx = self.pmda.lookup_regex(metadata[0]) # no spaces allowed in metric name regex
                            if rx.match(sp.name):
                                # config metadata overrides the PCP or PCP5 line parsed from the inline metric data
                                pcpline = "METADATA %s %s" % (sp.name, ' '.join(metadata[1:])) # to EOL
                                self.pmda.log('metric "%s" config metadata matches "%s": pcpline = "%s"' % (sp.name, metadata[0], pcpline))
                                break
                    m = Metric(self, sp.name, metricnum, included_labels, optional_labels, pcpline, helpline, typeline)
                    self.metrics_by_name[sp.name] = m
                    self.metrics_by_num[metricnum] = m # not pmid!
                    if m.singular:
                        m.store_inst(included_labels, sp.value)
                    else:
                        m.store_inst(naming_labels, sp.value)
                    self.pmda.set_notify_change()
        except ValueError as e:
            if not self.parse_error:
                self.pmda.err("cannot parse name in %s: %s" % (line, e))
                self.parse_error = True # one-time-only error diagnostic
            return False
        except Exception as e:
            if self.pmda.dbg:
                traceback.print_exc() # traceback can be handy here
            self.pmda.err("cannot parse/store %s: %s" % (line, e))
            return False
        return True

    def parse_lines(self, text):
        '''
        Refresh all the metric metadata as it is found, including creating
        new metrics.  Store away metric values for subsequent fetch()es.
        Input parse errors result in exceptions and early termination.
        That's OK, we don't try heroics to parse non-compliant data.
        Return number of metrics extracted.
        '''
        num_metrics = 0
        lines = text.splitlines()
        pcpline = None
        helpline = None
        typeline = None
        badness = False
        state = "metadata"
        for line in lines:
            self.pmda.debug("line: %s state: %s" % (line, state)) if self.pmda.dbg else None
            l = line.strip() # whitespace
            if l == "": # blank line, ignore, no state change
                continue
            elif l.startswith("#"): # comment
                if state == "metrics":
                    state = "metadata"
                    pcpline = None # NB: throw away previous block's metadata
                    helpline = None
                    typeline = None
                lp = l.split()
                if len(lp) < 2:
                    continue
                # NB: for a well-formed exporter file,
                # the # metadata blocks must precede
                # the metric values; we can ignore lp[2].
                if lp[1] == 'PCP':
                    pcpline = ' '.join(lp[1:])
                elif lp[1] == 'PCP5':
                    pcpline = "%s %s %s" % (lp[1], lp[2], ' '.join(lp[4:])) # omit pmid
                elif lp[1] == 'HELP':
                    # assume lp[2] matches metric name
                    helpline = ' '.join(lp[3:])
                elif lp[1] == 'TYPE': # actually, semantics (counter/instant/discrete)
                    # assume lp[2] matches metric name
                    typeline = ' '.join(lp[3:])
                else:
                    pass # ignore other comment lines
            else: # metric{...} value line
                state = "metrics"
                # NB: could verify helpline/typeline lp[2] matches,
                # but we don't have to go out of our way to support
                # non-compliant exporters.
                if not self.parse_metric_line(l, pcpline, helpline, typeline):
                    badness = True
                    break # bad metric line, skip the remainder of this file
                num_metrics += 1
        # clear one-time-only error diagnostic if the situation is now resolved
        if not badness:
            self.parse_error = False
        # NB: this logic only ever -adds- Metrics to a Source.  If a source
        # stops supplying some metrics a PMAPI client will see PM_ERR_VALUE
        # returned when trying to fetch them.
        return num_metrics

    def parse_url_config(self, filepath):
        ''' Parse a URL config file. The first line is always the URL.
            Remaining lines are prefixed with a keyword. Supported keywords
            include '#' for a comment, 'HEADER:' to add to the header passed
            to the headers dict parameter to the get() call. Note the ':' are
            important, and spaces are collapsed. e.g.

            # here is an example header
            http://someserver/someplace/endpoint.html
            HEADER: authtoken: some auth token

            # filters are used to include/exclude metric names
            FILTER: {INCLUDE|EXCLUDE} METRIC regex

            # filters are also used to INCLUDE/EXCLUDE labels and to designate
            # OPTIONAL labels that should be included (unless explicitly excluded)
            # but NOT used in instance names. All labels that are not excluded or
            # optional are otherwise used in instance names.
            FILTER: {INCLUDE|OPTIONAL|EXCLUDE} LABEL regex

            # METADATA: regex type indom semantics units ... to EOL
            #
            # Specifies the PCP metadata to use for metric (leaf) names matching the regex.
            # Metric names matching the regex will use this metadata (overriding PCP or PCP5 comments).
            # If a metric name does not match any METADATA regex, it will use the '# PCP' or '# PCP5' line,
            # falling back to an heuristic if there is no '# PCP' or '# PCP5' line.
            #
            # The METADATA: fields are as follows:
            # regex - an extended regex that matches metric names returned by the URL
            # type - one of the PCP numeric data types: double, float, u64, 64, u32 or 32
            # indom - 'PM_INDOM_NULL' or a name that will be added as an indom_name=name optional label
            # semantics - 'counter', 'instant' or 'discrete'
            # units - 'none' or a string (to end-of line) parseable by pmParseUnitsStr(3)
            #
            # Note this is for URL configs only (scripted configs should generate PCP5 metadata).
        '''
        # NOTE(review): file handle is not explicitly closed (relies on GC)
        conf = open(filepath, 'r').read().strip().split('\n')
        self.url = conf[0]
        self.filterlist = [] # filters matching metric names or labels
        for line in conf[1:]:
            if not line or line.startswith('#'):
                continue
            elif line.startswith('HEADER:'):
                header = line.split(':', 2)
                key = ''.join(header[1].split())
                val = ' '.join(header[2:]).lstrip()
                self.headers[key] = val
            elif line.startswith('FILTER:'):
                # strip off 'FILTER:' and any leading space
                # each list entry is (uppercase are literal) :
                #     {INCLUDE|EXCLUDE} METRIC regex
                #     {INCLUDE|EXCLUDE|OPTIONAL} LABEL regex
                # These are extended grep -E style regexs, disjuncted by '|'
                # Full string matches must be anchored, e.g. ^matchme$|*endswith$
                header_filter = ''.join(line.split(':', 2)[1]).lstrip()
                self.filterlist.append(header_filter)
            elif line.startswith('METADATA:'):
                # strip off 'METADATA:' and any leading space
                # each list entry is (uppercase are literal) :
                #     METADATA: regex metadata ... to EOL
                # metadata = ''.join(line.split(':', 2)[1]).lstrip()
                metadata = line.split(' ')[1:]
                self.metadatalist.append(metadata)
            else:
                self.pmda.err('%s ignored unrecognised config entry "%s"' % (self.url, line))
        self.pmda.debug("DEBUG url: %s HEADERS: %s" % (self.url, self.headers)) if self.pmda.dbg else None
        self.pmda.debug("DEBUG url: %s FILTERS: %s" % (self.url, self.filterlist)) if self.pmda.dbg else None
        self.pmda.debug("DEBUG url: %s METADATA: %s" % (self.url, self.metadatalist)) if self.pmda.dbg else None

    def refresh1(self, timeout):
        '''
        If the Source config entry is a URL (with ".url" extension), find
        the target URL by reading the .url file, fetch (http GET) the target
        and then save resulting document. If the Source config entry is
        executable, run it, expecting openmetric formatted data on it's
        stdout, which is then saved.
        '''
        # clear cached values from all my metrics
        for _, m in self.metrics_by_name.items():
            m.clear_values()
            # TODO: ditch metrics no longer found in document
        # bump the fetch call counter for this source, and for the total (cluster 0)
        self.pmda.stats_fetch_calls[self.cluster] += 1
        self.pmda.stats_fetch_calls[0] += 1
        self.refresh_time = fetch_time = time.time()
        try:
            s = os.stat(self.path)
            if self.is_scripted:
                # check file still exists and is executable
                if not s[ST_MODE] & S_IXUSR:
                    self.pmda.err("cannot execute script '%s'" % self.path)
                    return
            elif self.parse_url_time < s[ST_CTIME]:
                # (re)parse the URL from given file
                self.parse_url_config(self.path)
                self.parse_url_time = s[ST_CTIME]
        except Exception as e:
            self.pmda.err("cannot read %s: %s" % (self.path, e))
            return
        # fetch the document
        status_code = 0
        try:
            if self.is_scripted:
                # Execute file, expecting openmetrics metric data on stdout.
                # stderr goes to the PMDA log. Failures are caught below.
                self.document = subprocess.check_output(self.path, shell=False).decode()
            else: # fetch the URL
                if self.url.startswith('file://'):
                    # NOTE(review): file handle is not explicitly closed (relies on GC)
                    self.document = open(self.url[7:], 'r').read()
                else:
                    r = self.requests.get(self.url, headers=self.headers, timeout=timeout)
                    status_code = r.status_code
                    r.raise_for_status() # non-200? ERROR
                    # NB: the requests package automatically enables http keep-alive and compression
                    self.document = r.text
            # update fetch time counter stats, in ms
            incr = int(1000 * (time.time() - fetch_time))
            self.pmda.stats_fetch_time[self.cluster] += incr
            self.pmda.stats_fetch_time[0] += incr # total for all sources
            self.pmda.stats_status[self.cluster] = "success"
            self.pmda.stats_status_code[self.cluster] = status_code
        except Exception as e:
            self.pmda.stats_status[self.cluster] = 'failed to fetch URL or execute script %s: %s' % (self.path, e)
            self.pmda.stats_status_code[self.cluster] = status_code
            self.pmda.debug('cannot fetch URL or execute script %s: %s' % (self.path, e))

    def refresh2(self, timeout):
        ''' Parse the saved document that was recently saved in refresh1().
            NOTE(review): the timeout parameter is unused here — kept,
            presumably, for signature symmetry with refresh1().
        '''
        if self.document is None: # error during fetch?
            return
        # parse and handle the openmetrics formatted metric data
        parse_time = time.time()
        s = self.parse_lines(self.document)
        # update parse time counter stats, in ms
        incr = int(1000 * (time.time() - parse_time))
        self.pmda.stats_parse_time[self.cluster] += incr
        self.pmda.stats_parse_time[0] += incr # total
        # save metric & indom lookup tables changes, if any
        for _, m in self.metrics_by_name.items():
            try: # NB: must process whole list even if exceptions escape
                m.save()
            except: # ... which they won't, this is just belt & suspenders
                pass
        self.pmids_table.save()
        self.pmda.debug("fetched %d bytes with %d metrics from URL or script %s" %
            (len(self.document), s, self.path)) if self.pmda.dbg else None
        self.document = None # don't hang onto it

    def fetch(self, item, inst):
        '''
        Retrieve metric/instance values that ought to have been found by a
        recent refresh().
PM_ERR_AGAIN signals a no go.''' try: m = self.metrics_by_num[item] self.pmda.debug("fetch: item=%d inst=%d m.mname=%s" % (item, inst, m.mname)) if self.pmda.dbg else None return m.fetch_inst(inst) except Exception as e: self.pmda.debug("cannot fetch item %d inst %d: %s" % (item, inst, e)) return [c_api.PM_ERR_AGAIN, 0] class OpenMetricsPMDA(PMDA): def __init__(self, pmda_name, domain, config, timeout, user, debugflag, logfile): ''' Initialize the PMDA. This can take a while for large configurations. The openmetrics entry in pmcd.conf specifies to start up in "notready" mode. Once startup and init is complete, we call pmda.pmda_ready() to tell PMCD we are ready to process requests, just prior to calling run() to enter the main loop. See below. ''' # must first set user and connect to the invoking pmcd PMDA.__init__(self, pmda_name, domain, logfile) if user is not None: self.set_user(user) self.log('Note: running as user "%s"' % user) self.connect_pmcd() # Write debugging messages to log, see --debug cmdline option # and the storable metric $(pmda_name).control.debug self.dbg = debugflag # now everything else may take time self.pmda_name = pmda_name self.config_dir = os.path.normpath(config) self.config_dir_ctime = None self.timeout = timeout # a single central Session that all our sources can concurrently reuse self.requests = requests.Session() # allow persistent connections # the list of configured sources self.source_by_name = {} # persistently assign numeric id to source names -> pmid "cluster" numbers self.cluster_table = PersistentNameTable(self, 0, MAX_CLUSTER) reserved_cluster = self.cluster_table.intern_lookup_value("control") assert reserved_cluster == 0 self.source_by_cluster = {} # compiled regex cache self.regex_cache = {} # Add a IS_DYNAMIC_ROOT metric that serves as a reminder to # pmcd to delegate all pmns requests to us. Do it early, before # other metrics may populate beneath the $(pmda_name).* prefix. 
dynamic_pmid = (0 << 31) | (511 << 22) | (domain << 10) | 0 dynamic_root = pmdaMetric(dynamic_pmid, 0, 0, 0, pmUnits()) self.add_metric(self.pmda_name, dynamic_root, 'dynamic root for %s metrics' % self.pmda_name) # Add statistical metrics self.sources_indom = self.indom(0) # fetch call counter, per-source end-point self.stats_fetch_calls = {0:0} # counter, keyed by cluster number self.add_metric('%s.control.calls' % self.pmda_name, pmdaMetric(self.pmid(0, 1), c_api.PM_TYPE_U64, self.sources_indom, c_api.PM_SEM_COUNTER, pmUnits(0, 0, 1, 0, 0, c_api.PM_COUNT_ONE)), 'per-end-point source call counter') # fetch time counter, per-source end-point self.stats_fetch_time = {0:0} # time counter in msec, keyed by cluster number self.add_metric('%s.control.fetch_time' % self.pmda_name, pmdaMetric(self.pmid(0, 2), c_api.PM_TYPE_U64, self.sources_indom, c_api.PM_SEM_COUNTER, pmUnits(0, 1, 0, 0, c_api.PM_TIME_MSEC, 0)), # millisecond counter 'per-end-point source fetch time counter, excluding parse time') # parse time counter, per-source end-point self.stats_parse_time = {0:0} # time counter in msec, keyed by cluster number self.add_metric('%s.control.parse_time' % self.pmda_name, pmdaMetric(self.pmid(0, 3), c_api.PM_TYPE_U64, self.sources_indom, c_api.PM_SEM_COUNTER, pmUnits(0, 1, 0, 0, c_api.PM_TIME_MSEC, 0)), # millisecond counter 'per-end-point source parse time counter, excluding fetch time') # verbose/debug messages metric self.add_metric('%s.control.debug' % self.pmda_name, pmdaMetric(self.pmid(0, 4), c_api.PM_TYPE_U32, c_api.PM_INDOM_NULL, c_api.PM_SEM_DISCRETE, pmUnits(0, 0, 0, 0, 0, 0)), 'debug flag to enable verbose log messages, to enable: pmstore %s.control.debug 1' % self.pmda_name) # response status string, per-source end-point self.stats_status = {0:"none"} # status string, keyed by cluster number self.add_metric('%s.control.status' % self.pmda_name, pmdaMetric(self.pmid(0, 5), c_api.PM_TYPE_STRING, self.sources_indom, c_api.PM_SEM_INSTANT, pmUnits(0, 0, 0, 0, 
0, 0)), # no units 'per-end-point source URL response status after the most recent fetch') # response status code, per-source end-point self.stats_status_code = {0:0} # status code, keyed by cluster number self.add_metric('%s.control.status_code' % self.pmda_name, pmdaMetric(self.pmid(0, 6), c_api.PM_TYPE_32, self.sources_indom, c_api.PM_SEM_DISCRETE, pmUnits(0, 0, 0, 0, 0, 0)), # no units 'per-end-point source URL response status code after the most recent fetch') # schedule a refresh self.set_need_refresh() # store callback for openmetrics.control.debug self.set_store_callback(self.openmetrics_store_callback) # used frequently, but we welcome it only for initial pmns population purposes # NB: If exceptions propagate out to cpmda, the log will contain entries such as: # Error: fetch_callback: callback failed self.set_refresh_metrics(self.refresh_metrics_for_pmns) self.set_refresh_all(self.refresh_some_clusters_for_fetch) # "all" is a misnomer self.set_fetch_callback(self.fetch_callback) # label callbacks self.set_label(self.openmetrics_label) self.set_label_callback(self.openmetrics_label_callback) # notes callbacks (for optional/extrinsic labels) self.set_notes(self.openmetrics_notes) # ITEM level optional labels def lookup_regex(self, pat): ''' cache of compiled regex ''' if pat not in self.regex_cache: self.regex_cache[pat] = re.compile(r"%s" % pat) self.debug("lookup_regex: added '%s'" % pat) if self.dbg else None return self.regex_cache[pat] def assert_source_invariants(self, name=None, cluster=None): ''' Assert some invariants about the known sources ''' if name: s = self.source_by_name[name] assert s == self.source_by_cluster[s.cluster] if cluster: s = self.source_by_cluster[cluster] assert s == self.source_by_name[s.name] def traverse(self, directory, ctime): ''' Return list of files below dir, recursively ''' ret = [] m = os.path.getctime(directory) if ctime is None or m > ctime: ctime = m for path, subdirs, files in os.walk(directory): for f in files: if 
not f.startswith("."): fname = os.path.join(path, f) m = os.path.getctime(fname) if ctime is None or m > ctime: ctime = m ret.append(fname) for d in subdirs: m, _ = self.traverse(os.path.join(path, d), ctime) if ctime is None or m > ctime: ctime = m return ctime, ret def rescan_confdir(self): '''Scan the configuration directories for any new .url files or scripts. Ensure there is a Source registered in the self.source_by_name dictionary for each one. First check if anything in the config dirs has changed lately, else do nothing. This is important because this callback is invoked frequently by src/python/pmda.c. ''' traverse_time = time.time() dir_ctime, conf_filelist = self.traverse(self.config_dir, self.config_dir_ctime) traverse_time = time.time() - traverse_time if self.config_dir_ctime is None or self.config_dir_ctime < dir_ctime: self.config_dir_ctime = dir_ctime else: # no new or changed conf files, don't rescan directory return self.log("Config change detected, traversed %d config entries in %.04fs, rescanning ..." % (len(conf_filelist), traverse_time)) nickname_regexp = self.lookup_regex(r"^[A-Za-z][A-Za-z0-9_.]*$") # TODO: maybe nuke sources related to removed files save_cluster_table = False if sort_conf_list: # sorted for indom cluster consistency conf_filelist = sorted(conf_filelist) for file in conf_filelist: # compute nickname for source: # the part of the filename before .url file_split = os.path.splitext(file) self.debug("found %s => %s" % (file, file_split)) if self.dbg else None # check if it's executable, or has ".url" suffix is_scripted = os.stat(file)[ST_MODE] & S_IXUSR != 0 if not is_scripted and (len(file_split) != 2 or file_split[1] != ".url"): # ignore this file - not executable and doesn't end in ".url" self.debug("Warning: ignored config file '%s', doesn't end in '.url' and not executable." 
% file) if self.dbg else None continue # convert file path name into a PCP metric name name = file_split[0].replace(self.config_dir + "/", "").replace("/", ".") if name == "control": self.err("Warning: ignored config file '%s', '%s.control' is a reserved PMNS subtree for PMDA statistics" % (file, self.pmda_name)) continue if not nickname_regexp.match(name): self.err("Warning: ignored config file '%s', unsuitable for PCP namespace" % file) continue if name in self.source_by_name: # this source is already known self.assert_source_invariants(name=name) else: try: path = file cluster = self.cluster_table.intern_lookup_value(name) source = Source(name, cluster, path, is_scripted, self) self.source_by_name[source.name] = source self.source_by_cluster[source.cluster] = source # initialize statistics self.stats_fetch_calls[cluster] = 0 self.stats_fetch_time[cluster] = 0 self.stats_parse_time[cluster] = 0 self.stats_status[cluster] = "unknown" self.stats_status_code[cluster] = 0 save_cluster_table = True self.log("Found source %s cluster %d" % (name, cluster)) except Exception as e: self.err("Error allocating new cluster/source %s (%s)" % (name, e)) if save_cluster_table: self.cluster_table.save() self.set_notify_change() def refresh_metrics_for_pmns(self): '''Refresh our list of Sources. Then have each "old" Source do a fetch, so as to populate/refresh the PMNS. 
''' self.rescan_confdir() # get our Source list up to date # do a batch fetch of all empty sources to ensure pmns is populated clusters = [] for k, v in self.source_by_cluster.items(): if v.old_enough_for_refresh(): clusters.append(k) if clusters: self.refresh_some_clusters_for_fetch(clusters) def fetch_callback(self, cluster, item, inst): ''' Main fetch callback which returns the value of the metric ''' if cluster == 0: # The reserved 'control' cluster: statistics if item == 1: # per-source calls counter return [self.stats_fetch_calls[inst], 1] if inst in self.stats_fetch_calls else [c_api.PM_ERR_VALUE, 0] elif item == 2: # per-source fetch time counter return [self.stats_fetch_time[inst], 1] if inst in self.stats_fetch_time else [c_api.PM_ERR_VALUE, 0] elif item == 3: # per-source parse time counter return [self.stats_parse_time[inst], 1] if inst in self.stats_parse_time else [c_api.PM_ERR_VALUE, 0] elif item == 4: # $(pmda_name).control.debug return [self.dbg, 1] elif item == 5: # per-source status string return [self.stats_status[inst], 1] if inst in self.stats_status else [c_api.PM_ERR_VALUE, 0] elif item == 6: # per-source status code return [self.stats_status_code[inst], 1] if inst in self.stats_status_code else [c_api.PM_ERR_VALUE, 0] return [c_api.PM_ERR_PMID, 0] self.assert_source_invariants(cluster=cluster) self.debug("fetch_callback: cluster=%d item=%d inst=%d .. 
about to fetch" % (cluster, item, inst)) try: if cluster in self.source_by_cluster: # end-points return self.source_by_cluster[cluster].fetch(item, inst) else: return [c_api.PM_ERR_PMID, 0] except Exception as e: self.err("Error: fetch_callback failed for cluster=%d item=%d inst=%d: %s" % (cluster, item, inst, e)) return [c_api.PM_ERR_AGAIN, 0] # was there before def refresh1_worker(self, inqueue, outqueue): while not inqueue.empty(): try: cluster = inqueue.get() if cluster > 0: self.assert_source_invariants(cluster=cluster) self.source_by_cluster[cluster].refresh1(self.timeout) except Exception as e: self.err("Error: Cannot refresh1 cluster %d: %s" % (cluster, e)) finally: outqueue.put(cluster) def refresh2_worker(self, cluster): try: if cluster > 0: self.assert_source_invariants(cluster=cluster) self.source_by_cluster[cluster].refresh2(self.timeout) except Exception as e: self.err("Error: Cannot refresh2 cluster %d: %s" % (cluster, e)) def refresh_some_clusters_for_fetch(self, _clusters): '''Called once per pmFetch batch handling, before openmetrics_fetch_callback calls. Creates threads to fetch data in parallel. ''' clusters = [int(l) for l in _clusters] # convert from PyLong inpqueue = queue.Queue() workqueue = queue.Queue() self.debug("refreshing clusters %s" % clusters) if self.dbg else None for c in clusters: inpqueue.put(c) # We start up only a limited number of concurrent fetcher threads. We don't # want to open an unlimited number of .url/.conf files nor sockets. num_threads = min(100, len(clusters)) threads = [] for _ in range(num_threads): t = threading.Thread(target=self.refresh1_worker, args=(inpqueue, workqueue)) threads.append(t) t.daemon = True # allow shutdown if some straggler is still running t.start() # Each will exit when the input workqueue becomes empty. # Consume the documents in the main thread, as they arrive in # the workqueue. 
Do this single-threaded only because the # python vm is effectively single-threaded for computations # anyway, so cpu-bound multithreaded apps get bogged down by # the big interpreter lock. The exception handler in # refresh1_worker() tries to guarantee that we will get all # the cluster numbers show up eventually. for c in clusters: # expected eventual size of workqueue cluster = workqueue.get() self.refresh2_worker(cluster) # TODO: timeout on this too? def set_need_refresh(self): cpmda.set_need_refresh() self.pmns_refresh() def jsonb_labelset(self, labelset): ''' Convert a dict of labels into a jsonb string. ''' ret = None if labelset is not None: for name, val in labelset.items(): if ret is None: ret = '"%s":"%s"' % (name, val) else: ret = ret + ',"%s":"%s"' % (name, val) if ret is None: return '{}' return '{%s}' % ret def openmetrics_label(self, ident, typeid): ''' return a JSONb formatted string (labelset) for given ident of given type ''' ret = "{}" self.debug('openmetrics_label(ident=%d (%#x), type=%s)' % (ident, ident, labeltype[typeid])) if self.dbg else None if typeid == c_api.PM_LABEL_INDOM: # labels for indom=ident indom = ident for _, v in self.source_by_cluster.items(): for _, mv in v.metrics_by_name.items(): if mv.mindom == indom: ret = '{"source":"%s"}' % v.name break elif typeid == c_api.PM_LABEL_CLUSTER: # Labels for a cluster, i.e. an individual source. # Note: if using dbpmda, test this with e.g. label cluster 144.1 # (you need the domain (default 144) and the cluster number (1, 2, ..) try: s = self.source_by_cluster[ident] if s.is_scripted: # The script should insert a hostname="somewhere" label if it # fetches from a remote host. This will override the hostname # label inserted by pmcd (at the context label level). Details # of the label hierarchy rules are in the "PRECEDENCE" section # of the pmLookupLabels(3) man page. 
ret = '{"script":"%s", "source":"%s"}' % (s.path, s.name) else: # extract hostname label from the URL if s.url.startswith("http"): host = s.url.replace("http://", "").replace("https://", "").split("/")[0] elif s.url.startswith("file://"): host = gethostname() else: host = "localhost" if host == "localhost": host = gethostname() ret = '{"hostname":"%s", "source":"%s", "url":"%s"}' % (host, s.name, s.url) except: pass # ignore key error for unknown cluster number elif typeid == c_api.PM_LABEL_ITEM: # item level (metric) labels, parsed from end-point URLs self.debug('openmetrics_label(ident=%d pmid=%s PM_LABEL_ITEM)' % (ident, pmContext.pmIDStr(ident))) if self.dbg else None for _, v in self.source_by_cluster.items(): for _, mv in v.metrics_by_name.items(): if mv.pmid == ident: self.debug(' FOUND pmid=%s labels=%s' % (pmContext.pmIDStr(mv.pmid), mv.labels)) if self.dbg else None self.debug(' FOUND pmid=%s optional_labels=%s' % (pmContext.pmIDStr(mv.pmid), mv.optional_labels)) if self.dbg else None if mv.labels is not None: intrinsic_labels = {} for ln, lv in mv.labels.items(): if ln not in mv.optional_labels: intrinsic_labels[ln] = lv ret = self.jsonb_labelset(intrinsic_labels) break else: # empty label set for all other types self.debug('openmetrics_label callback type=%d ("%s") NOT HANDLED' % (typeid, labeltype[typeid])) if self.dbg else None self.debug('openmetrics_label callback type=%d ("%s") returning "%s"' % (typeid, labeltype[typeid], ret)) return ret def openmetrics_label_callback(self, indom, inst): ''' return label for given instance ID in given indom ''' instlabels = None try: self.debug('openmetrics_label_callback(indom=%#x, inst=%d)' % (indom, inst)) if self.dbg else None for _, v in self.source_by_cluster.items(): for _, mv in v.metrics_by_name.items(): if indom == mv.mindom: for i, nm in enumerate(mv.indom_table.instances): if inst == i: self.debug('openmetrics_label_callback: found inst label "%s"' % nm) if self.dbg else None if i in mv.inst_labels: 
for l, val in mv.inst_labels[i].items(): if l != "instname": if instlabels is None: instlabels = '"%s":"%s"' % (l, val) else: instlabels = instlabels + "," + '"%s":"%s"' % (l, val) except Exception as e: self.debug("openmetrics_label_callback: exception %s" % e) self.debug(traceback.format_exc()) self.debug('openmetrics_label_callback returning "{%s}"' % instlabels) if self.dbg else None if instlabels is None: return '{}' return '{%s}' % instlabels def openmetrics_notes(self, ident, typeid): ''' return a JSONb formatted string (labelset) for given ident of given type for OPTIONAL labels (aka "notes") ''' ret = "{}" self.debug('openmetrics_notes(ident=%d (%#x), type=%s)' % (ident, ident, labeltype[typeid])) if self.dbg else None if typeid == c_api.PM_LABEL_ITEM: # item level (metric) optional labels, parsed from end-point URLs self.debug('openmetrics_notes(ident=%d pmid=%s PM_LABEL_ITEM)' % (ident, pmContext.pmIDStr(ident))) if self.dbg else None for _, v in self.source_by_cluster.items(): for _, mv in v.metrics_by_name.items(): if mv.pmid == ident: self.debug(' FOUND pmid=%s optional_labels=%s' % (pmContext.pmIDStr(mv.pmid), mv.optional_labels)) if self.dbg else None if mv.optional_labels is not None: ret = self.jsonb_labelset(mv.optional_labels) break else: # empty label set for all other types self.debug('openmetrics_notes callback type=%d ("%s") NOT HANDLED' % (typeid, labeltype[typeid])) self.debug('openmetrics_notes callback type=%d ("%s") returning "%s"' % (typeid, labeltype[typeid], ret)) return ret def openmetrics_store_callback(self, cluster, item, inst, val): ''' To enable verbose log messages: pmstore openmetrics.control.debug 1 ''' if cluster == 0 and item == 4: if val < 0: return c_api.PM_ERR_BADSTORE self.dbg = val != 0 self.log('%s.control.debug: set to %d' % (self.pmda_name, self.dbg)) return 0 if cluster < MAX_CLUSTER and item < MAX_METRIC: return c_api.PM_ERR_PERMISSION return c_api.PM_ERR_PMID def debug(self, s): if self.dbg: 
super(OpenMetricsPMDA, self).dbg(s) @staticmethod def log(message): PMDA.log(message) @staticmethod def err(message): PMDA.err(message) if __name__ == '__main__': parser = argparse.ArgumentParser( description='OpenMetrics PMDA.', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument( '-D', '--debug', default=False, help='enable debug messages in log (default disabled).', action='store_true') parser.add_argument( '-c', '--config', type=str, default='config.d', help='configuration directory') parser.add_argument( '-d', '--domain', type=int, default=144, help='PMDA domain number (default 144)') parser.add_argument( '-l', '--log', type=str, default='openmetrics.log', help='log filename') parser.add_argument( '-n', '--nosort', default=False, help='do not sort the config file list (default is to sort)', action='store_true') parser.add_argument( '-r', '--root', type=str, default='openmetrics', help='dynamic PMNS root name') parser.add_argument( '-t', '--timeout', type=int, default=2, help='HTTP GET timeout for each end-point URL (default 2 seconds)') parser.add_argument( '-u', '--user', type=str, default='pcp', help='set the username to run under (default is the pcp account)') args = parser.parse_args() if args.nosort: sort_conf_list = False if not args.config.startswith("/"): pmdadir = os.getenv('PCP_PMDAS_DIR') or '/' pmdadir = os.path.join(pmdadir, args.root) args.config = os.path.join(pmdadir, args.config) # This PMDA starts up in the "notready" state, see the Install script where # the IPC protocol is ipc_prot="binary notready". See also pmcd(1) man page. # The "binary notready" setting can also be manually configured in pmcd.conf. # Default domain number is PMDA(144), see -d option. pmda = OpenMetricsPMDA(args.root, args.domain, args.config, args.timeout, args.user, args.debug, args.log) # Uncomment to force -D or use: pmstore openmetrics.control.debug 1 # pmda.dbg = True # Scan initial config pmda.log("Initializing ... 
currently in notready state.") pmda.timeout = args.timeout * 10 pmda.refresh_metrics_for_pmns() pmda.timeout = args.timeout # Tell PMCD that we are now ready to process requests. pmda.pmda_ready() pmda.log("Ready to process requests") # Now enter the main loop pmda.run()