#!/usr/bin/env pmpython # Copyright (C) 2020 Ashwin Nayak # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. # pylint: disable=bad-whitespace, line-too-long, too-many-return-statements # pylint: disable=broad-except, too-many-branches, too-many-statements, inconsistent-return-statements # pylint: disable=no-name-in-module, too-many-instance-attributes, c-extension-no-member """ Performance Metrics Domain Agent exporting rabbitmq metrics. """ import sys from pcp.pmda import PMDA, pmdaMetric, pmdaIndom, pmdaInstid from pcp.pmapi import pmUnits from pcp.pmapi import pmContext as PCP from cpmapi import PM_TYPE_U64, PM_TYPE_STRING, PM_TYPE_FLOAT from cpmapi import PM_SPACE_BYTE, PM_COUNT_ONE from cpmapi import PM_SEM_COUNTER, PM_SEM_INSTANT, PM_SEM_DISCRETE from cpmapi import PM_ERR_APPVERSION from cpmda import PMDA_FETCH_NOVALUES import cpmapi as c_api import requests try: import ConfigParser except ImportError: import configparser as ConfigParser DEFAULT_USERNAME = 'guest' DEFAULT_PASSWORD = 'guest' DEFAULT_TIMEOUT = 2.5 # seconds DEFAULT_PORT = 15672 class RabbitmqPMDA(PMDA): """ PCP rabbitmq PMDA """ def __init__(self, name, domain): """ (Constructor) Initialisation - register metrics, callbacks, drop privileges """ PMDA.__init__(self, name, domain) self.username = DEFAULT_USERNAME self.password = DEFAULT_PASSWORD self.timeout = DEFAULT_TIMEOUT self.port = DEFAULT_PORT self.url = None self.queue_names = [] self.stats = {} self.read_config() self.build_url() self.rabbit_fetch() self.connect_pmcd() self.queue_indom = self.indom(0) self.rabbitmq_instances() self.queue_cluster = 0 self.queue_metrics = [ # Name - type - semantics - units - help [ 'queue.autodelete', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'queue autodeletable?: boolean value'], # 0 [ 'queue.backing_queue_status.avg_ack_egress_rate', PM_TYPE_FLOAT, PM_SEM_INSTANT, pmUnits(), 'Rate at which unacknowledged message records leave RAM'], # 1 [ 'queue.backing_queue_status.avg_ack_ingress_rate', PM_TYPE_FLOAT, PM_SEM_INSTANT, pmUnits(), 'Rate at which unacknowledged message records enter RAM'], # 2 [ 'queue.backing_queue_status.avg_egress_rate', PM_TYPE_FLOAT, PM_SEM_INSTANT, pmUnits(), 'Average egress (outbound) rate, not including messages that are sent straight through to auto-acking consumers'], # 3 [ 'queue.backing_queue_status.avg_ingress_rate', PM_TYPE_FLOAT, PM_SEM_INSTANT, pmUnits(), 'Average ingress (inbound) rate, not including messages that are sent straight through to auto-acking consumers'], # 4 [ 'queue.backing_queue_status.delta', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'message classification, followed by 4 values; type: array'], # 5 [ 'queue.backing_queue_status.len', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Total backing queue length, in messages'], # 6 [ 'queue.backing_queue_status.mode', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'Mode of backing queue'], # 7 [ 'queue.backing_queue_status.next_seq_id', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(), 'The next sequence ID to be used in the backing queue'], # 8 [ 'queue.backing_queue_status.q1', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Number of messages in backing queue q1'], # 9 [ 'queue.backing_queue_status.q2', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Number of messages in backing queue q2'], # 10 [ 'queue.backing_queue_status.q3', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Number of messages in backing queue q3'], # 11 [ 'queue.backing_queue_status.q4', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Number of messages in backing queue q4'], # 12 [ 'queue.backing_queue_status.target_ram_count', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'backiing queue target ram count'], # 13 [ 'queue.consumer_utilization', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers'], # 14 [ 'queue.consumers', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Number of consumers of the queue'], # 15 [ 'queue.durable', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'Durable queues are persisted to disk and thus survive broker restarts. Boolean value'], # 16 [ 'queue.effective_policy_definition', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'dict value: effective policy definitions'], # 17 [ 'queue.exclusive', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'boolean value. Request the consumer to be the only one on the target queue.'], # 18 [ 'queue.exclusive_consumer_tag', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'Tag of exclusive consumer'], # 19 [ 'queue.garbage_collection.fullsweep_after', PM_TYPE_U64, PM_SEM_COUNTER, pmUnits(), 'fullsweep after N, where N is the number of gc to do before forcing a gc of both young and old heap'], # 20 [ 'queue.garbage_collection.max_heap_size', PM_TYPE_U64, PM_SEM_COUNTER, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'erlang gc max heap size'], # 21 [ 'queue.garbage_collection.min_bin_vheap_size', PM_TYPE_U64, PM_SEM_COUNTER, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'erlang gc min_bin_vheap_size'], # 22 [ 'queue.garbage_collection.min_heap_size', PM_TYPE_U64, PM_SEM_COUNTER, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'erlang gc min heap size'], # 23 [ 'queue.garbage_collection.minor_gcs', PM_TYPE_U64, PM_SEM_COUNTER, pmUnits(), 'The generational(minor) gc just collects the young heap'], # 24 [ 'queue.head_message_timestamp', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'Timestamp of head message'], # 25 [ 'queue.idle_since', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'Queue idle since'], # 26 [ 'queue.memory', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures'], # 27 [ 'queue.message_bytes', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'Sum of the size of all message bodies in the queue'], # 28 [ 'queue.message_bytes_paged_out', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'All message bodies paged out'], # 29 [ 'queue.message_bytes_persistent', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'Total number of persistent messages in the queue (will always be 0 for transient queues)'], # 30 [ 'queue.message_bytes_ram', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'Like message_bytes but counting only those messages which are in RAM'], # 31 [ 'queue.message_bytes_ready', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'Like message_bytes but counting only those messages ready to be delivered to clients'], # 32 [ 'queue.message_bytes_unacknowledged', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'Like message_bytes but counting only those messages delivered to clients but not yet acknowledged'], # 33 [ 'queue.messages', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Total number of messages (ready plus unacknowledged)'], # 34 [ 'queue.messages_unacknowledged_details.rate', PM_TYPE_FLOAT, PM_SEM_INSTANT, pmUnits(), 'How much the queue depth has changed per second in the most recent sampling interval in regards messages_unacknowledged'], # 35 [ 'queue.messages_unacknowledged_ram', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Number of unacknowledged messages in ram'], # 36 [ 'queue.name', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'name of queue'], # 37 [ 'queue.node', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'A RabbitMQ broker is a logical grouping of one or several Erlang nodes, each running the RabbitMQ application and sharing users, virtual hosts, queues, exchanges, etc. Sometimes we refer to the collection of nodes as a cluster'], # 38 [ 'queue.operator_policy', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'operator_policy'], # 39 [ 'queue.policy', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'queue policy'], # 40 [ 'queue.recoverable_slaves', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'list of slaves'], # 41 [ 'queue.reductions', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(), 'Reductions (unit of scheduler/CPU consumption)'], # 42 [ 'queue.reductions_details.rate', PM_TYPE_FLOAT, PM_SEM_INSTANT, pmUnits(), 'Reductions per second'], # 43 [ 'queue.single_active_consumer_tag', PM_TYPE_STRING, PM_SEM_INSTANT, pmUnits(), 'single active consumer tag'], # 44 [ 'queue.state', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'state of the queue(ex: running,idle etc.'], # 45 [ 'queue.type', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'Queue type'], # 46 [ 'queue.vhost', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'name of vhost'], # 47 [ 'queue.messages_details.rate', PM_TYPE_FLOAT, PM_SEM_INSTANT, pmUnits(), 'How much the queue depth has changed per second in the most recent sampling interval'], # 48 [ 'queue.messages_paged_out', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Total number of messages paged out in the queue'], # 49 [ 'queue.messages_persistent', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(), 'Total number of persistent messages in the queue (will always be 0 for transient queues)'], # 50 [ 'queue.messages_ram', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(1,0,0,PM_SPACE_BYTE,0,0), 'Total number of messages which are resident in RAM'], # 51 [ 'queue.messages_ready', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Number of messages ready for delivery'], # 52 [ 'queue.messages_ready_details.rate', PM_TYPE_FLOAT, PM_SEM_INSTANT, pmUnits(), 'How much the queue depth has changed per second in the most recent sampling interval in regards of messages ready'], # 53 [ 'queue.messages_ready_ram', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Total number of messages_ready which are resident in RAM'], # 54 [ 'queue.messages_unacknowledged', PM_TYPE_U64, PM_SEM_INSTANT, pmUnits(0,0,1,0,0,PM_COUNT_ONE), 'Number of unacknowledged messages'], # 55 [ 'queue.arguments', PM_TYPE_STRING, PM_SEM_DISCRETE, pmUnits(), 'arguments given to http request'], # 56 ] for item, metric in enumerate(self.queue_metrics): self.add_metric(name + '.' + metric[0], pmdaMetric(self.pmid(self.queue_cluster, item), metric[1], self.queue_indom, metric[2], metric[3]), metric[4], metric[4]) self.set_fetch_callback(self.rabbit_fetch_callback) self.set_refresh(self.rabbitmq_refresh) self.set_label(self.rabbitmq_label) self.set_label_callback(self.rabbitmq_label_callback) self.set_user(PCP.pmGetConfig('PCP_USER')) def read_config(self): """ Read Configuration file""" configfile = PCP.pmGetConfig('PCP_PMDAS_DIR') configfile += '/rabbitmq/rabbitmq.conf' # Python < 3.2 compatible if sys.version_info[0] >=3 and sys.version_info[1] >= 2: config = ConfigParser.ConfigParser() else: config = ConfigParser.SafeConfigParser() # Read the config file if config.has_section('pmda'): self.username = config.get('pmda','username') self.password = config.get('pmda','password') self.port = config.get('pmda','port') def build_url(self): """Build up url needed for HTTP GET request""" self.url = 'http://'+self.username+':'+self.password+'@localhost:'+str(self.port)+'/api/queues' def rabbit_fetch(self): """ fetches result from HTTP GET request """ self.stats = {} self.queue_names = [] try: temp = requests.get(self.url, timeout=self.timeout).json() for _, val in enumerate(temp): self.stats[val['name']] = val self.queue_names.append(val['name']) except Exception as e: self.log("Failed to get info from rabbitmq: %s" % (str(e))) def rabbitmq_instances(self): """ set names for rabbitmq instances """ insts = [] for idx, val in enumerate(self.queue_names): insts.append(pmdaInstid(idx, val)) self.add_indom(pmdaIndom(self.queue_indom, insts)) def rabbitmq_refresh(self, cluster): """rabbitmq refresh function""" if cluster == self.queue_cluster: self.rabbit_fetch() insts = [] for idx, val in enumerate(self.queue_names): insts.append(pmdaInstid(idx, val)) self.replace_indom(self.queue_indom, insts) def rabbitmq_label(self, ident, target): ''' Return JSONB format labelset for identifier of given type, as a string ''' if target == c_api.PM_LABEL_INDOM: indom = ident if indom == self.queue_indom: ret = '"indom_name":"queue"' else: # no labels added for PM_LABEL_{DOMAIN,CLUSTER,ITEM} ret = '' # default is an empty labelset string return '{%s}' % ret def rabbitmq_label_callback(self, indom, inst): ''' Return JSONB format labelset for an inst in given indom, as a string ''' if indom == self.queue_indom: ret = '"queue":"%s"' % (self.inst_name_lookup(self.queue_indom, inst)) else: ret = '' return '{%s}' % ret def rabbit_fetch_callback(self, cluster, item, inst): """ fetch callback method""" if cluster == self.queue_cluster: if len(self.stats) == 0: return [PMDA_FETCH_NOVALUES] try: queue = self.inst_name_lookup(self.queue_indom,inst) if item == 0: return [str(self.stats[queue]['auto_delete']),1] if item == 1: return [self.stats[queue]['backing_queue_status']['avg_ack_egress_rate'],1] if item == 2: return [self.stats[queue]['backing_queue_status']['avg_ack_ingress_rate'],1] if item == 3: return [self.stats[queue]['backing_queue_status']['avg_egress_rate'],1] if item == 4: return [self.stats[queue]['backing_queue_status']['avg_ingress_rate'],1] if item == 5: return [str(self.stats[queue]['backing_queue_status']['delta']),1] if item == 6: return [self.stats[queue]['backing_queue_status']['len'],1] if item == 7: return [str(self.stats[queue]['backing_queue_status']['mode']),1] if item == 8: return [self.stats[queue]['backing_queue_status']['next_seq_id'],1] if item == 9: return [self.stats[queue]['backing_queue_status']['q1'],1] if item == 10: return [self.stats[queue]['backing_queue_status']['q2'],1] if item == 11: return [self.stats[queue]['backing_queue_status']['q3'],1] if item == 12: return [self.stats[queue]['backing_queue_status']['q4'],1] if item == 13: return [str(self.stats[queue]['backing_queue_status']['target_ram_count']),1] if item == 14: return [str(self.stats[queue]['consumer_utilisation']),1] if item == 15: return [self.stats[queue]['consumers'],1] if item == 16: return [str(self.stats[queue]['durable']),1] if item == 17: return [str(self.stats[queue]['effective_policy_definition']),1] if item == 18: return [str(self.stats[queue]['exclusive']),1] if item == 19: return [str(self.stats[queue]['exclusive_consumer_tag']),1] if item == 20: return [self.stats[queue]['garbage_collection']['fullsweep_after'],1] if item == 21: return [self.stats[queue]['garbage_collection']['max_heap_size'],1] if item == 22: return [self.stats[queue]['garbage_collection']['min_bin_vheap_size'],1] if item == 23: return [self.stats[queue]['garbage_collection']['min_heap_size'],1] if item == 24: return [self.stats[queue]['garbage_collection']['minor_gcs'],1] if item == 25: return [str(self.stats[queue]['head_message_timestamp']),1] if item == 26: return [str(self.stats[queue]['idle_since']),1] if item == 27: return [self.stats[queue]['memory'],1] if item == 28: return [self.stats[queue]['message_bytes'],1] if item == 29: return [self.stats[queue]['message_bytes_paged_out'],1] if item == 30: return [self.stats[queue]['message_bytes_persistent'],1] if item == 31: return [self.stats[queue]['message_bytes_ram'],1] if item == 32: return [self.stats[queue]['message_bytes_ready'],1] if item == 33: return [self.stats[queue]['message_bytes_unacknowledged'],1] if item == 34: return [self.stats[queue]['messages'],1] if item == 35: return [self.stats[queue]['messages_unacknowledged_details']['rate'],1] if item == 36: return [self.stats[queue]['messages_unacknowledged_ram'],1] if item == 37: return [str(self.stats[queue]['name']),1] if item == 38: return [str(self.stats[queue]['node']),1] if item == 39: return [str(self.stats[queue]['operator_policy']),1] if item == 40: return [str(self.stats[queue]['policy']),1] if item == 41: return [str(self.stats[queue]['recoverable_slaves']),1] if item == 42: return [self.stats[queue]['reductions'],1] if item == 43: return [self.stats[queue]['reductions_details']['rate'],1] if item == 44: return [str(self.stats[queue]['single_active_consumer_tag']),1] if item == 45: return [str(self.stats[queue]['state']),1] if item == 46: return [str(self.stats[queue]['type']),1] if item == 47: return [str(self.stats[queue]['vhost']),1] if item == 48: return [self.stats[queue]['messages_details']['rate'],1] if item == 49: return [self.stats[queue]['messages_paged_out'],1] if item == 50: return [self.stats[queue]['messages_persistent'],1] if item == 51: return [self.stats[queue]['messages_ram'],1] if item == 52: return [self.stats[queue]['messages_ready'],1] if item == 53: return [self.stats[queue]['messages_ready_details']['rate'],1] if item == 54: return [self.stats[queue]['messages_ready_ram'],1] if item == 55: return [self.stats[queue]['messages_unacknowledged'],1] if item == 56: return [str(self.stats[queue]['arguments']),1] except Exception: return [PM_ERR_APPVERSION,0] if __name__ == '__main__': RabbitmqPMDA('rabbitmq', 143).run()