diff --git a/ansible/install/group_vars/all.yml b/ansible/install/group_vars/all.yml
index 3d2fd95fb..82f2b1133 100644
--- a/ansible/install/group_vars/all.yml
+++ b/ansible/install/group_vars/all.yml
@@ -327,6 +327,15 @@ controller_monitored_queues:
- "notifications.error"
- "notifications.critical"
+# Queues to monitor message count on RHOSO controlplane
+rhoso_ctlplane_monitored_queues:
+ - "notifications.sample"
+ - "notifications.audit"
+ - "notifications.info"
+ - "notifications.warn"
+ - "notifications.error"
+ - "notifications.critical"
+
########################
# ovsagent monitoring
########################
diff --git a/ansible/install/roles/collectd-rhoso/files/collectd_deploy.yaml b/ansible/install/roles/collectd-rhoso/files/collectd_deploy.yaml
index 2d4fe5970..f78fd4537 100644
--- a/ansible/install/roles/collectd-rhoso/files/collectd_deploy.yaml
+++ b/ansible/install/roles/collectd-rhoso/files/collectd_deploy.yaml
@@ -91,6 +91,18 @@ spec:
securityContext:
privileged: true
volumeMounts:
+ - mountPath: /etc/pki/rabbitmq/tls/certs/rabbitmq.crt
+ name: rabbitmq-tls
+ readOnly: true
+ subPath: tls.crt
+ - mountPath: /etc/pki/rabbitmq/tls/private/rabbitmq.key
+ name: rabbitmq-tls
+ readOnly: true
+ subPath: tls.key
+ - mountPath: /etc/pki/rabbitmq/tls/certs/rabbitmqca.crt
+ name: rabbitmq-tls
+ readOnly: true
+ subPath: ca.crt
- mountPath: /etc/pki/ovnsb/tls/certs/ovndb.crt
name: ovsdbserver-sb-tls-certs
readOnly: true
@@ -133,6 +145,10 @@ spec:
- configMapRef:
name: collectd-env-vars
volumes:
+ - name: rabbitmq-tls
+ secret:
+ defaultMode: 256
+ secretName: cert-rabbitmq-svc
- name: ovsdbserver-sb-tls-certs
secret:
defaultMode: 256
diff --git a/ansible/install/roles/collectd-rhoso/tasks/main.yml b/ansible/install/roles/collectd-rhoso/tasks/main.yml
index 2fa8c9ee4..00a4caf11 100644
--- a/ansible/install/roles/collectd-rhoso/tasks/main.yml
+++ b/ansible/install/roles/collectd-rhoso/tasks/main.yml
@@ -28,6 +28,16 @@
- name: Fetch all the worker node names
command: oc get nodes -o jsonpath='{range .items[?(@.metadata.labels.node-role\.kubernetes\.io/worker)]}{.metadata.name}{"\n"}{end}'
register: worker_nodes
+
+ - name: Fetch Cluster IP and Ports for rabbitmq service
+ shell: |
+ oc get svc rabbitmq -n openstack -o jsonpath='{.spec.clusterIP} {.spec.ports[1].port}'
+ register: rabbitmq_svc
+
+ - name: Fetch rabbitmq default username and password
+ shell: |
+ oc get secret rabbitmq-default-user -n openstack -o jsonpath='{.data.default_user\.conf}' | base64 --decode
+ register: rabbitmq_default_user_conf
environment:
KUBECONFIG: "{{ kubeconfig_path }}"
@@ -51,6 +61,16 @@
ovsdbserver_sb_0_svc_ip: "{{ ovsdbserver_sb_0_svc.stdout.split(' ')[0] }}"
ovsdbserver_sb_0_svc_port: "{{ ovsdbserver_sb_0_svc.stdout.split(' ')[1] }}"
+- name: Parse rabbitmq service details
+ set_fact:
+ rabbitmq_svc_ip: "{{ rabbitmq_svc.stdout.split(' ')[0] }}"
+ rabbitmq_svc_port: "{{ rabbitmq_svc.stdout.split(' ')[1] }}"
+
+- name: Parse rabbitmq username and password
+ set_fact:
+ rabbitmq_username: "{{ rabbitmq_default_user_conf.stdout | regex_findall('default_user = (\\S+)') | first }}"
+ rabbitmq_password: "{{ rabbitmq_default_user_conf.stdout | regex_findall('default_pass = (\\S+)') | first }}"
+
- name: generate logfiles vars for each worker nodes
include_tasks: gen_logfile_vars.yml
loop: "{{ worker_nodes.stdout_lines }}"
@@ -111,6 +131,7 @@
loop:
- cert-ovndbcluster-sb-ovndbs
- cert-ovndbcluster-nb-ovndbs
+ - cert-rabbitmq-svc
- name: Create configmaps for collectd configs
shell: |
diff --git a/ansible/install/roles/collectd-rhoso/templates/controlplane.collectd.conf.j2 b/ansible/install/roles/collectd-rhoso/templates/controlplane.collectd.conf.j2
index 7142ae846..c46b2da3a 100644
--- a/ansible/install/roles/collectd-rhoso/templates/controlplane.collectd.conf.j2
+++ b/ansible/install/roles/collectd-rhoso/templates/controlplane.collectd.conf.j2
@@ -383,8 +383,27 @@ LoadPlugin unixsock
+{%if rabbitmq_controller_collectd_plugin %}
+
+ ModulePath "/usr/local/bin/"
+ LogTraces true
+ Interactive false
+ Import "collectd_rabbitmq_monitoring"
+
+ interval {{rabbitmq_controller_collectd_interval}}
+ host "{{ rabbitmq_svc_ip }}"
+ port "15671"
+ username "{{ rabbitmq_username }}"
+ password "{{ rabbitmq_password }}"
+ message_count {% for a_queue in rhoso_ctlplane_monitored_queues %}"{{a_queue}}" {% endfor %}
+
+
+
{% endif %}
+{% endif %}
+# db_conf end
+
{%if iostat_controller_collectd_plugin %}
ModulePath "/usr/local/bin/"
diff --git a/browbeat-containers/collectd-rhoso/Dockerfile b/browbeat-containers/collectd-rhoso/Dockerfile
index edbc4551a..3e30301b9 100644
--- a/browbeat-containers/collectd-rhoso/Dockerfile
+++ b/browbeat-containers/collectd-rhoso/Dockerfile
@@ -9,7 +9,7 @@ RUN dnf clean all && \
dnf install -y sysstat && \
dnf install -y python3-pip python3-devel && \
pip3 install --upgrade pip && \
- pip3 install pyrabbit && \
+ pip3 install pyrabbit2 && \
dnf install -y collectd-dbi && \
dnf install -y https://dl.fedoraproject.org/pub/epel/8/Everything/x86_64/Packages/l/libdbi-drivers-0.9.0-14.el8.x86_64.rpm && \
dnf install -y https://dl.fedoraproject.org/pub/epel/8/Everything/x86_64/Packages/l/libdbi-dbd-mysql-0.9.0-14.el8.x86_64.rpm && \
@@ -31,5 +31,6 @@ RUN rm /etc/collectd.d/virt.conf
ADD files/ovn_monitoring.sh /usr/local/bin/ovn_monitoring.sh
ADD files/collectd_iostat_python.py /usr/local/bin/collectd_iostat_python.py
ADD files/collectd_ovn_raft_monitoring.py /usr/local/bin/collectd_ovn_raft_monitoring.py
+ADD files/collectd_rabbitmq_monitoring.py /usr/local/bin/collectd_rabbitmq_monitoring.py
CMD ["collectd", "-f"]
diff --git a/browbeat-containers/collectd-rhoso/files/collectd_rabbitmq_monitoring.py b/browbeat-containers/collectd-rhoso/files/collectd_rabbitmq_monitoring.py
new file mode 100644
index 000000000..3e1d0a027
--- /dev/null
+++ b/browbeat-containers/collectd-rhoso/files/collectd_rabbitmq_monitoring.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Collectd python plugin to read rabbitmq metrics from rabbitmq management
+plugin.
+"""
+from pyrabbit2.api import Client
+from pyrabbit2.http import HTTPError
+import collectd
+import os
+import json
+import time
+try:
+ # python 2.x
+ from urllib import quote
+except ImportError:
+ # python 3.x
+ from urllib.parse import quote
+
+class PyrabbitClient(Client):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ def get_messages(self, vhost, qname, count=1,
+ requeue=False, truncate=None, encoding='auto'):
+ """
+ Gets messages from the queue.
+
+ :param string vhost: Name of vhost containing the queue
+ :param string qname: Name of the queue to consume from
+ :param int count: Number of messages to get.
+ :param bool requeue: Whether to requeue the message after getting it.
+ This will cause the 'redelivered' flag to be set in the message on
+ the queue.
+ :param int truncate: The length, in bytes, beyond which the server will
+ truncate the message before returning it.
+ :returns: list of dicts. messages[msg-index]['payload'] will contain
+ the message body.
+ """
+
+ vhost = quote(vhost, '')
+ base_body = {'count': count, 'requeue': requeue, 'encoding': encoding}
+
+ # 3.7.X now uses ackmode to denote the requeuing capability
+ if requeue:
+ base_body['ackmode'] = 'ack_requeue_true'
+ else:
+ base_body['ackmode'] = 'ack_requeue_false'
+
+ if truncate:
+ base_body['truncate'] = truncate
+ body = json.dumps(base_body)
+
+ qname = quote(qname, '')
+ path = Client.urls['get_from_queue'] % (vhost, qname)
+ messages = self.http.do_call(path, 'POST', body,
+ headers=Client.json_headers)
+ return messages
+
+
+def configure(configobj):
+ global INTERVAL
+ global cl
+ global queues_to_count
+
+ config = {c.key: c.values for c in configobj.children}
+ INTERVAL = config['interval'][0]
+ host = config['host'][0]
+ port = int(config['port'][0])
+ username = config['username'][0]
+ password = config['password'][0]
+ queues_to_count = []
+ if 'message_count' in config:
+ queues_to_count = config['message_count']
+ collectd.info('rabbitmq_monitoring: Interval: {}'.format(INTERVAL))
+
+ tls_cert_path = "/etc/pki/rabbitmq/tls/certs/rabbitmq.crt"
+ tls_key_path = "/etc/pki/rabbitmq/tls/private/rabbitmq.key"
+
+ cl = PyrabbitClient(
+ '{}:{}'.format(host, port),
+ username,
+ password,
+ scheme='https',
+ verify=False,
+ cert=(tls_cert_path, tls_key_path)
+ )
+ collectd.info(
+ 'rabbitmq_monitoring: Connecting to: {}:{} as user:{} password:{}'
+ .format(host, port, username, password))
+ collectd.info(
+ 'rabbitmq_monitoring: Counting messages on: {}'
+ .format(queues_to_count))
+ collectd.register_read(read, INTERVAL)
+
+
+def read(data=None):
+ starttime = time.time()
+
+ overview = cl.get_overview()
+
+ # Object counts
+ for m_instance in \
+ ['channels', 'connections', 'consumers', 'exchanges', 'queues']:
+ if m_instance in overview['object_totals']:
+ metric = collectd.Values()
+ metric.plugin = 'rabbitmq_monitoring'
+ metric.interval = INTERVAL
+ metric.type = 'gauge'
+ metric.type_instance = m_instance
+ metric.values = [overview['object_totals'][m_instance]]
+ metric.dispatch()
+
+ # Aggregated Queue message stats
+ for m_instance in \
+ ['messages', 'messages_ready', 'messages_unacknowledged']:
+ if m_instance in overview['queue_totals']:
+ metric = collectd.Values()
+ metric.plugin = 'rabbitmq_monitoring'
+ metric.interval = INTERVAL
+ metric.type = 'gauge'
+ metric.type_instance = 'queue_total-{}-count'.format(m_instance)
+ metric.values = [overview['queue_totals'][m_instance]]
+ metric.dispatch()
+
+ metric = collectd.Values()
+ metric.plugin = 'rabbitmq_monitoring'
+ metric.interval = INTERVAL
+ metric.type = 'gauge'
+ metric.type_instance = 'queue_total-{}-rate'.format(
+ m_instance)
+ metric.values = \
+ [
+ overview['queue_totals']['{}_details'.format(m_instance)]
+ ['rate']
+ ]
+ metric.dispatch()
+
+ # Aggregated Message Stats
+ for m_instance in \
+ [
+ 'ack', 'confirm', 'deliver', 'deliver_get', 'deliver_no_ack',
+ 'get', 'get_no_ack', 'publish', 'publish_in', 'publish_out',
+ 'redeliver', 'return_unroutable'
+ ]:
+ if m_instance in overview['message_stats']:
+ metric = collectd.Values()
+ metric.plugin = 'rabbitmq_monitoring'
+ metric.interval = INTERVAL
+ metric.type = 'gauge'
+ metric.type_instance = 'message_total-{}-count'.format(m_instance)
+ metric.values = [overview['message_stats'][m_instance]]
+ metric.dispatch()
+
+ metric = collectd.Values()
+ metric.plugin = 'rabbitmq_monitoring'
+ metric.interval = INTERVAL
+ metric.type = 'gauge'
+ metric.type_instance = 'message_total-{}-rate'.format(m_instance)
+ metric.values = \
+ [
+ overview['message_stats']['{}_details'.format(m_instance)]
+ ['rate']
+ ]
+ metric.dispatch()
+
+ # Configurable per-queue message counts
+ for queue_name in queues_to_count:
+ messages_detail = None
+ try:
+ messages_detail = cl.get_messages('/', queue_name, requeue=False)
+ except HTTPError as err:
+ collectd.error(
+ 'Error Opening Queue [{}] details: {}'
+ .format(queue_name, err))
+ if messages_detail is not None and len(messages_detail) > 0:
+ count = messages_detail[0]['message_count']
+ else:
+ count = 0
+ metric = collectd.Values()
+ metric.plugin = 'rabbitmq_monitoring'
+ metric.interval = INTERVAL
+ metric.type = 'gauge'
+ metric.type_instance = 'msg_count-{}'.format(queue_name)
+ metric.values = [count]
+ metric.dispatch()
+
+ timediff = time.time() - starttime
+ if timediff > INTERVAL:
+ collectd.warning(
+ 'rabbitmq_monitoring: Took: {} > {}'.format(
+ round(timediff, 2),
+ INTERVAL)
+ )
+
+collectd.register_config(configure)