From 37c46dcdc94dbed8ba01fa0baea4f40c7d483c2b Mon Sep 17 00:00:00 2001 From: rajeshP524 Date: Wed, 31 Jul 2024 00:00:50 +0530 Subject: [PATCH] Add RabbitMQ monitoring for RHOSO controlplane Change-Id: I6085017463567589e990c85243c3aca2efa19cae --- ansible/install/group_vars/all.yml | 9 + .../collectd-rhoso/files/collectd_deploy.yaml | 16 ++ .../roles/collectd-rhoso/tasks/main.yml | 21 ++ .../templates/controlplane.collectd.conf.j2 | 19 ++ browbeat-containers/collectd-rhoso/Dockerfile | 3 +- .../files/collectd_rabbitmq_monitoring.py | 205 ++++++++++++++++++ 6 files changed, 272 insertions(+), 1 deletion(-) create mode 100644 browbeat-containers/collectd-rhoso/files/collectd_rabbitmq_monitoring.py diff --git a/ansible/install/group_vars/all.yml b/ansible/install/group_vars/all.yml index 3d2fd95fb..82f2b1133 100644 --- a/ansible/install/group_vars/all.yml +++ b/ansible/install/group_vars/all.yml @@ -327,6 +327,15 @@ controller_monitored_queues: - "notifications.error" - "notifications.critical" +# Queues to monitor message count on RHOSO controlplane +rhoso_ctlplane_monitored_queues: + - "notifications.sample" + - "notifications.audit" + - "notifications.info" + - "notifications.warn" + - "notifications.error" + - "notifications.critical" + ######################## # ovsagent monitoring ######################## diff --git a/ansible/install/roles/collectd-rhoso/files/collectd_deploy.yaml b/ansible/install/roles/collectd-rhoso/files/collectd_deploy.yaml index 2d4fe5970..f78fd4537 100644 --- a/ansible/install/roles/collectd-rhoso/files/collectd_deploy.yaml +++ b/ansible/install/roles/collectd-rhoso/files/collectd_deploy.yaml @@ -91,6 +91,18 @@ spec: securityContext: privileged: true volumeMounts: + - mountPath: /etc/pki/rabbitmq/tls/certs/rabbitmq.crt + name: rabbitmq-tls + readOnly: true + subPath: tls.crt + - mountPath: /etc/pki/rabbitmq/tls/private/rabbitmq.key + name: rabbitmq-tls + readOnly: true + subPath: tls.key + - mountPath: /etc/pki/rabbitmq/tls/certs/rabbitmqca.crt + name: rabbitmq-tls + readOnly: true + subPath: ca.crt - mountPath: /etc/pki/ovnsb/tls/certs/ovndb.crt name: ovsdbserver-sb-tls-certs readOnly: true @@ -133,6 +145,10 @@ spec: - configMapRef: name: collectd-env-vars volumes: + - name: rabbitmq-tls + secret: + defaultMode: 256 + secretName: cert-rabbitmq-svc - name: ovsdbserver-sb-tls-certs secret: defaultMode: 256 diff --git a/ansible/install/roles/collectd-rhoso/tasks/main.yml b/ansible/install/roles/collectd-rhoso/tasks/main.yml index 2fa8c9ee4..00a4caf11 100644 --- a/ansible/install/roles/collectd-rhoso/tasks/main.yml +++ b/ansible/install/roles/collectd-rhoso/tasks/main.yml @@ -28,6 +28,16 @@ - name: Fetch all the worker node names command: oc get nodes -o jsonpath='{range .items[?(@.metadata.labels.node-role\.kubernetes\.io/worker)]}{.metadata.name}{"\n"}{end}' register: worker_nodes + + - name: Fetch Cluster IP and Ports for rabbitmq service + shell: | + oc get svc rabbitmq -n openstack -o jsonpath='{.spec.clusterIP} {.spec.ports[1].port}' + register: rabbitmq_svc + + - name: Fetch rabbitmq default username and password + shell: | + oc get secret rabbitmq-default-user -n openstack -o jsonpath='{.data.default_user\.conf}' | base64 --decode + register: rabbitmq_default_user_conf environment: KUBECONFIG: "{{ kubeconfig_path }}" @@ -51,6 +61,16 @@ ovsdbserver_sb_0_svc_ip: "{{ ovsdbserver_sb_0_svc.stdout.split(' ')[0] }}" ovsdbserver_sb_0_svc_port: "{{ ovsdbserver_sb_0_svc.stdout.split(' ')[1] }}" +- name: Parse rabbitmq service details + set_fact: + rabbitmq_svc_ip: "{{ rabbitmq_svc.stdout.split(' ')[0] }}" + rabbitmq_svc_port: "{{ rabbitmq_svc.stdout.split(' ')[1] }}" + +- name: Parse rabbitmq username and password + set_fact: + rabbitmq_username: "{{ rabbitmq_default_user_conf.stdout | regex_findall('default_user = (\\S+)') | first }}" + rabbitmq_password: "{{ rabbitmq_default_user_conf.stdout | regex_findall('default_pass = (\\S+)') | first }}" + - name: generate logfiles vars for each worker nodes include_tasks: gen_logfile_vars.yml loop: "{{ worker_nodes.stdout_lines }}" @@ -111,6 +131,7 @@ loop: - cert-ovndbcluster-sb-ovndbs - cert-ovndbcluster-nb-ovndbs + - cert-rabbitmq-svc - name: Create configmaps for collectd configs shell: | diff --git a/ansible/install/roles/collectd-rhoso/templates/controlplane.collectd.conf.j2 b/ansible/install/roles/collectd-rhoso/templates/controlplane.collectd.conf.j2 index 7142ae846..c46b2da3a 100644 --- a/ansible/install/roles/collectd-rhoso/templates/controlplane.collectd.conf.j2 +++ b/ansible/install/roles/collectd-rhoso/templates/controlplane.collectd.conf.j2 @@ -383,8 +383,27 @@ LoadPlugin unixsock +{%if rabbitmq_controller_collectd_plugin %} + + ModulePath "/usr/local/bin/" + LogTraces true + Interactive false + Import "collectd_rabbitmq_monitoring" + + interval {{rabbitmq_controller_collectd_interval}} + host "{{ rabbitmq_svc_ip }}" + port "15671" + username "{{ rabbitmq_username }}" + password "{{ rabbitmq_password }}" + message_count {% for a_queue in rhoso_ctlplane_monitored_queues %}"{{a_queue}}" {% endfor %} + + + {% endif %} +{% endif %} +# db_conf end + {%if iostat_controller_collectd_plugin %} ModulePath "/usr/local/bin/" diff --git a/browbeat-containers/collectd-rhoso/Dockerfile b/browbeat-containers/collectd-rhoso/Dockerfile index edbc4551a..3e30301b9 100644 --- a/browbeat-containers/collectd-rhoso/Dockerfile +++ b/browbeat-containers/collectd-rhoso/Dockerfile @@ -9,7 +9,7 @@ RUN dnf clean all && \ dnf install -y sysstat && \ dnf install -y python3-pip python3-devel && \ pip3 install --upgrade pip && \ - pip3 install pyrabbit && \ + pip3 install pyrabbit2 && \ dnf install -y collectd-dbi && \ dnf install -y https://dl.fedoraproject.org/pub/epel/8/Everything/x86_64/Packages/l/libdbi-drivers-0.9.0-14.el8.x86_64.rpm && \ dnf install -y https://dl.fedoraproject.org/pub/epel/8/Everything/x86_64/Packages/l/libdbi-dbd-mysql-0.9.0-14.el8.x86_64.rpm && \ @@ -31,5 +31,6 @@ RUN rm /etc/collectd.d/virt.conf ADD files/ovn_monitoring.sh /usr/local/bin/ovn_monitoring.sh ADD files/collectd_iostat_python.py /usr/local/bin/collectd_iostat_python.py ADD files/collectd_ovn_raft_monitoring.py /usr/local/bin/collectd_ovn_raft_monitoring.py +ADD files/collectd_rabbitmq_monitoring.py /usr/local/bin/collectd_rabbitmq_monitoring.py CMD ["collectd", "-f"] diff --git a/browbeat-containers/collectd-rhoso/files/collectd_rabbitmq_monitoring.py b/browbeat-containers/collectd-rhoso/files/collectd_rabbitmq_monitoring.py new file mode 100644 index 000000000..3e1d0a027 --- /dev/null +++ b/browbeat-containers/collectd-rhoso/files/collectd_rabbitmq_monitoring.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Collectd python plugin to read rabbitmq metrics from rabbitmq management +plugin. +""" +from pyrabbit2.api import Client +from pyrabbit2.http import HTTPError +import collectd +import os +import json +import time +try: + # python 2.x + from urllib import quote +except ImportError: + # python 3.x + from urllib.parse import quote + +class PyrabbitClient(Client): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def get_messages(self, vhost, qname, count=1, + requeue=False, truncate=None, encoding='auto'): + """ + Gets messages from the queue. + + :param string vhost: Name of vhost containing the queue + :param string qname: Name of the queue to consume from + :param int count: Number of messages to get. + :param bool requeue: Whether to requeue the message after getting it. + This will cause the 'redelivered' flag to be set in the message on + the queue. + :param int truncate: The length, in bytes, beyond which the server will + truncate the message before returning it. + :returns: list of dicts. messages[msg-index]['payload'] will contain + the message body. + """ + + vhost = quote(vhost, '') + base_body = {'count': count, 'requeue': requeue, 'encoding': encoding} + + # 3.7.X now uses ackmode to denote the requeuing capability + if requeue: + base_body['ackmode'] = 'ack_requeue_true' + else: + base_body['ackmode'] = 'ack_requeue_false' + + if truncate: + base_body['truncate'] = truncate + body = json.dumps(base_body) + + qname = quote(qname, '') + path = Client.urls['get_from_queue'] % (vhost, qname) + messages = self.http.do_call(path, 'POST', body, + headers=Client.json_headers) + return messages + + +def configure(configobj): + global INTERVAL + global cl + global queues_to_count + + config = {c.key: c.values for c in configobj.children} + INTERVAL = config['interval'][0] + host = config['host'][0] + port = int(config['port'][0]) + username = config['username'][0] + password = config['password'][0] + queues_to_count = [] + if 'message_count' in config: + queues_to_count = config['message_count'] + collectd.info('rabbitmq_monitoring: Interval: {}'.format(INTERVAL)) + + tls_cert_path = "/etc/pki/rabbitmq/tls/certs/rabbitmq.crt" + tls_key_path = "/etc/pki/rabbitmq/tls/private/rabbitmq.key" + + cl = PyrabbitClient( + '{}:{}'.format(host, port), + username, + password, + scheme='https', + verify=False, + cert=(tls_cert_path, tls_key_path) + ) + collectd.info( + 'rabbitmq_monitoring: Connecting to: {}:{} as user:{} password:{}' + .format(host, port, username, password)) + collectd.info( + 'rabbitmq_monitoring: Counting messages on: {}' + .format(queues_to_count)) + collectd.register_read(read, INTERVAL) + + +def read(data=None): + starttime = time.time() + + overview = cl.get_overview() + + # Object counts + for m_instance in \ + ['channels', 'connections', 'consumers', 'exchanges', 'queues']: + if m_instance in overview['object_totals']: + metric = collectd.Values() + metric.plugin = 'rabbitmq_monitoring' + metric.interval = INTERVAL + metric.type = 'gauge' + metric.type_instance = m_instance + metric.values = [overview['object_totals'][m_instance]] + metric.dispatch() + + # Aggregated Queue message stats + for m_instance in \ + ['messages', 'messages_ready', 'messages_unacknowledged']: + if m_instance in overview['queue_totals']: + metric = collectd.Values() + metric.plugin = 'rabbitmq_monitoring' + metric.interval = INTERVAL + metric.type = 'gauge' + metric.type_instance = 'queue_total-{}-count'.format(m_instance) + metric.values = [overview['queue_totals'][m_instance]] + metric.dispatch() + + metric = collectd.Values() + metric.plugin = 'rabbitmq_monitoring' + metric.interval = INTERVAL + metric.type = 'gauge' + metric.type_instance = 'queue_total-{}-rate'.format( + m_instance) + metric.values = \ + [ + overview['queue_totals']['{}_details'.format(m_instance)] + ['rate'] + ] + metric.dispatch() + + # Aggregated Message Stats + for m_instance in \ + [ + 'ack', 'confirm', 'deliver', 'deliver_get', 'deliver_no_ack', + 'get', 'get_no_ack', 'publish', 'publish_in', 'publish_out', + 'redeliver', 'return_unroutable' + ]: + if m_instance in overview['message_stats']: + metric = collectd.Values() + metric.plugin = 'rabbitmq_monitoring' + metric.interval = INTERVAL + metric.type = 'gauge' + metric.type_instance = 'message_total-{}-count'.format(m_instance) + metric.values = [overview['message_stats'][m_instance]] + metric.dispatch() + + metric = collectd.Values() + metric.plugin = 'rabbitmq_monitoring' + metric.interval = INTERVAL + metric.type = 'gauge' + metric.type_instance = 'message_total-{}-rate'.format(m_instance) + metric.values = \ + [ + overview['message_stats']['{}_details'.format(m_instance)] + ['rate'] + ] + metric.dispatch() + + # Configurable per-queue message counts + for queue_name in queues_to_count: + messages_detail = None + try: + messages_detail = cl.get_messages('/', queue_name, requeue=False) + except HTTPError as err: + collectd.error( + 'Error Opening Queue [{}] details: {}' + .format(queue_name, err)) + if messages_detail is not None and len(messages_detail) > 0: + count = messages_detail[0]['message_count'] + else: + count = 0 + metric = collectd.Values() + metric.plugin = 'rabbitmq_monitoring' + metric.interval = INTERVAL + metric.type = 'gauge' + metric.type_instance = 'msg_count-{}'.format(queue_name) + metric.values = [count] + metric.dispatch() + + timediff = time.time() - starttime + if timediff > INTERVAL: + collectd.warning( + 'rabbitmq_monitoring: Took: {} > {}'.format( + round(timediff, 2), + INTERVAL) + ) + +collectd.register_config(configure)