From 48f4089e39370d82f6e7e3637c31606e57905fcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20M=C3=A1gr?= Date: Wed, 3 Jul 2024 14:21:21 +0200 Subject: [PATCH] Add heart beat report for polling agents This patch adds possibility to start child service for heart beat reports of polling agents. Such data can be leveraged in service health check scripts for monitoring purposes. Change-Id: I721a1ef997c5dfb59cdd0e759fcd7d511c24bbb0 --- ceilometer/cmd/polling.py | 22 +++- ceilometer/polling/manager.py | 114 +++++++++++++++++- .../tests/unit/polling/test_heartbeat.py | 113 +++++++++++++++++ ceilometer/tests/unit/polling/test_manager.py | 23 ++-- 4 files changed, 261 insertions(+), 11 deletions(-) create mode 100644 ceilometer/tests/unit/polling/test_heartbeat.py diff --git a/ceilometer/cmd/polling.py b/ceilometer/cmd/polling.py index e61c099ad0..aa7e829bd7 100644 --- a/ceilometer/cmd/polling.py +++ b/ceilometer/cmd/polling.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import multiprocessing import shlex import cotyledon @@ -79,11 +80,20 @@ def _prepare_config(): return conf -def create_polling_service(worker_id, conf=None): +def create_polling_service(worker_id, conf=None, queue=None): if conf is None: conf = _prepare_config() conf.log_opt_values(LOG, log.DEBUG) - return manager.AgentManager(worker_id, conf, conf.polling_namespaces) + return manager.AgentManager(worker_id, conf, + conf.polling_namespaces, queue) + + +def create_heartbeat_service(worker_id, conf, queue=None): + if conf is None: + conf = _prepare_config() + conf.log_opt_values(LOG, log.DEBUG) + return manager.AgentHeartBeatManager(worker_id, conf, + conf.polling_namespaces, queue) def main(): @@ -91,5 +101,11 @@ def main(): conf = _prepare_config() priv_context.init(root_helper=shlex.split(utils._get_root_helper())) oslo_config_glue.setup(sm, conf) - sm.add(create_polling_service, args=(conf,)) + + if conf.polling.heartbeat_socket_dir is not None: + queue = multiprocessing.Queue() + sm.add(create_heartbeat_service, args=(conf, queue)) + else: + queue = None + sm.add(create_polling_service, args=(conf, queue)) sm.run() diff --git a/ceilometer/polling/manager.py b/ceilometer/polling/manager.py index c2543b9d42..58c9c5e3e8 100644 --- a/ceilometer/polling/manager.py +++ b/ceilometer/polling/manager.py @@ -19,7 +19,10 @@ import glob import itertools import logging import os +import queue import random +import socket +import threading import uuid from concurrent import futures @@ -51,6 +54,10 @@ POLLING_OPTS = [ default="polling.yaml", help="Configuration file for polling definition." ), + cfg.StrOpt('heartbeat_socket_dir', + default=None, + help="Path to directory where socket file for polling " + "heartbeat will be created."), cfg.StrOpt('partitioning_group_prefix', deprecated_group='central', help='Work-load partitioning group prefix. Use only if you ' @@ -89,6 +96,11 @@ class PollingException(agent.ConfigException): super(PollingException, self).__init__('Polling', message, cfg) +class HeartBeatException(agent.ConfigException): + def __init__(self, message, cfg): + super(HeartBeatException, self).__init__('Polling', message, cfg) + + class Resources(object): def __init__(self, agent_manager): self.agent_manager = agent_manager @@ -207,6 +219,8 @@ class PollingTask(object): ) sample_batch = [] + self.manager.heartbeat(pollster.name, polling_timestamp) + for sample in samples: # Note(yuywz): Unify the timestamp of polled samples sample.set_timestamp(polling_timestamp) @@ -289,15 +303,100 @@ class PollingTask(object): ) +class AgentHeartBeatManager(cotyledon.Service): + def __init__(self, worker_id, conf, namespaces=None, queue=None): + super(AgentHeartBeatManager, self).__init__(worker_id) + self.conf = conf + + if conf.polling.heartbeat_socket_dir is None: + raise HeartBeatException("path to a directory containing " + "heart beat sockets is required", conf) + + if type(namespaces) is not list: + if namespaces is None: + namespaces = "" + namespaces = [namespaces] + + self._lock = threading.Lock() + self._queue = queue + self._status = dict() + self._sock_pth = os.path.join( + conf.polling.heartbeat_socket_dir, + f"ceilometer-{'-'.join(sorted(namespaces))}.socket" + ) + + self._delete_socket() + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + self._sock.bind(self._sock_pth) + self._sock.listen(1) + except socket.error as err: + raise HeartBeatException("Failed to open socket file " + f"({self._sock_pth}): {err}", conf) + + LOG.info("Starting heartbeat child service. Listening" + f" on {self._sock_pth}") + + def _delete_socket(self): + try: + os.remove(self._sock_pth) + except OSError: + pass + + def terminate(self): + self._tpe.shutdown(wait=False, cancel_futures=True) + self._sock.close() + self._delete_socket() + + def _update_status(self): + hb = self._queue.get() + with self._lock: + self._status[hb['pollster']] = hb['timestamp'] + LOG.debug(f"Updated heartbeat for {hb['pollster']} " + f"({hb['timestamp']})") + + def _send_heartbeat(self): + s, addr = self._sock.accept() + LOG.debug("Heartbeat status report requested " + f"at {self._sock_pth}") + with self._lock: + out = '\n'.join([f"{k} {v}" + for k, v in self._status.items()]) + s.sendall(out.encode('utf-8')) + s.close() + LOG.debug(f"Reported heartbeat status:\n{out}") + + def run(self): + super(AgentHeartBeatManager, self).run() + + LOG.debug("Started heartbeat child process.") + + def _read_queue(): + LOG.debug("Started heartbeat update thread") + while True: + self._update_status() + + def _report_status(): + LOG.debug("Started heartbeat reporting thread") + while True: + self._send_heartbeat() + + with futures.ThreadPoolExecutor(max_workers=2) as executor: + self._tpe = executor + executor.submit(_read_queue) + executor.submit(_report_status) + + class AgentManager(cotyledon.Service): - def __init__(self, worker_id, conf, namespaces=None): + def __init__(self, worker_id, conf, namespaces=None, queue=None): namespaces = namespaces or ['compute', 'central'] group_prefix = conf.polling.partitioning_group_prefix super(AgentManager, self).__init__(worker_id) self.conf = conf + self._queue = queue if type(namespaces) is not list: namespaces = [namespaces] @@ -350,6 +449,19 @@ class AgentManager(cotyledon.Service): self._keystone = None self._keystone_last_exception = None + def heartbeat(self, name, timestamp): + """Send heartbeat data if the agent is configured to do so.""" + if self._queue is not None: + try: + hb = { + 'timestamp': timestamp, + 'pollster': name + } + self._queue.put_nowait(hb) + LOG.debug(f"Polster heartbeat update: {name}") + except queue.Full: + LOG.warning(f"Heartbeat queue full. Update failed: {hb}") + def create_dynamic_pollsters(self, namespaces): """Creates dynamic pollsters diff --git a/ceilometer/tests/unit/polling/test_heartbeat.py b/ceilometer/tests/unit/polling/test_heartbeat.py new file mode 100644 index 0000000000..b172de7742 --- /dev/null +++ b/ceilometer/tests/unit/polling/test_heartbeat.py @@ -0,0 +1,113 @@ +# +# Copyright 2024 Red Hat, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Tests for ceilometer polling heartbeat process""" + +import multiprocessing +import shutil +import tempfile + +from oslo_utils import timeutils +from unittest import mock + +from ceilometer.polling import manager +from ceilometer import service +from ceilometer.tests import base + + +class TestHeartBeatManagert(base.BaseTestCase): + def setUp(self): + super(TestHeartBeatManagert, self).setUp() + self.conf = service.prepare_service([], []) + self.tmpdir = tempfile.mkdtemp() + + self.queue = multiprocessing.Queue() + self.mgr = manager.AgentManager(0, self.conf, namespaces='central', + queue=self.queue) + + def tearDown(self): + super(TestHeartBeatManagert, self).tearDown() + shutil.rmtree(self.tmpdir) + + def setup_polling(self, poll_cfg=None): + name = self.cfg2file(poll_cfg or self.polling_cfg) + self.conf.set_override('cfg_file', name, group='polling') + self.mgr.polling_manager = manager.PollingManager(self.conf) + + def test_hb_not_configured(self): + self.assertRaises(manager.HeartBeatException, + manager.AgentHeartBeatManager, + 0, self.conf, + namespaces='ipmi', + queue=self.queue) + + @mock.patch('ceilometer.polling.manager.LOG') + def test_hb_startup(self, LOG): + # activate heartbeat agent + self.conf.set_override('heartbeat_socket_dir', self.tmpdir, + group='polling') + manager.AgentHeartBeatManager(0, self.conf, namespaces='compute', + queue=self.queue) + calls = [mock.call("Starting heartbeat child service. Listening" + f" on {self.tmpdir}/ceilometer-compute.socket")] + LOG.info.assert_has_calls(calls) + + @mock.patch('ceilometer.polling.manager.LOG') + def test_hb_update(self, LOG): + self.conf.set_override('heartbeat_socket_dir', self.tmpdir, + group='polling') + hb = manager.AgentHeartBeatManager(0, self.conf, namespaces='central', + queue=self.queue) + + timestamp = timeutils.utcnow().isoformat() + self.queue.put_nowait({'timestamp': timestamp, 'pollster': 'test'}) + + hb._update_status() + calls = [mock.call(f"Updated heartbeat for test ({timestamp})")] + LOG.debug.assert_has_calls(calls) + + @mock.patch('ceilometer.polling.manager.LOG') + def test_hb_send(self, LOG): + with mock.patch('socket.socket') as FakeSocket: + sub_skt = mock.Mock() + sub_skt.sendall.return_value = None + sub_skt.sendall.return_value = None + + skt = FakeSocket.return_value + skt.bind.return_value = mock.Mock() + skt.listen.return_value = mock.Mock() + skt.accept.return_value = (sub_skt, "") + + self.conf.set_override('heartbeat_socket_dir', self.tmpdir, + group='polling') + hb = manager.AgentHeartBeatManager(0, self.conf, + namespaces='central', + queue=self.queue) + timestamp = timeutils.utcnow().isoformat() + self.queue.put_nowait({'timestamp': timestamp, + 'pollster': 'test1'}) + hb._update_status() + self.queue.put_nowait({'timestamp': timestamp, + 'pollster': 'test2'}) + hb._update_status() + + # test status report + hb._send_heartbeat() + calls = [mock.call("Heartbeat status report requested " + f"at {self.tmpdir}/ceilometer-central.socket"), + mock.call("Reported heartbeat status:\n" + f"test1 {timestamp}\n" + f"test2 {timestamp}")] + LOG.debug.assert_has_calls(calls) diff --git a/ceilometer/tests/unit/polling/test_manager.py b/ceilometer/tests/unit/polling/test_manager.py index 564143774e..0a2c75febd 100644 --- a/ceilometer/tests/unit/polling/test_manager.py +++ b/ceilometer/tests/unit/polling/test_manager.py @@ -18,6 +18,7 @@ """Tests for ceilometer agent manager""" import copy import datetime +import multiprocessing import shutil import tempfile from unittest import mock @@ -92,7 +93,8 @@ class TestManager(base.BaseTestCase): self.assertNotEqual(manager.hash_of_set(y), manager.hash_of_set(z)) def test_load_plugins(self): - mgr = manager.AgentManager(0, self.conf) + mgr = manager.AgentManager(0, self.conf, + queue=multiprocessing.Queue()) self.assertIsNotNone(list(mgr.extensions)) # Test plugin load behavior based on Node Manager pollsters. @@ -101,8 +103,8 @@ class TestManager(base.BaseTestCase): @mock.patch('ceilometer.ipmi.pollsters.sensor.SensorPollster.__init__', mock.Mock(return_value=None)) def test_load_normal_plugins(self): - mgr = manager.AgentManager(0, self.conf, - namespaces=['ipmi']) + mgr = manager.AgentManager(0, self.conf, namespaces=['ipmi'], + queue=multiprocessing.Queue()) # 8 pollsters for Node Manager self.assertEqual(13, len(mgr.extensions)) @@ -114,7 +116,8 @@ class TestManager(base.BaseTestCase): def test_load_failed_plugins(self, LOG): # Here we additionally check that namespaces will be converted to the # list if param was not set as a list. - manager.AgentManager(0, self.conf, namespaces='ipmi') + manager.AgentManager(0, self.conf, namespaces='ipmi', + queue=multiprocessing.Queue()) err_msg = 'Skip loading extension for %s: %s' pollster_names = [ 'power', 'temperature', 'outlet_temperature', @@ -132,7 +135,8 @@ class TestManager(base.BaseTestCase): @mock.patch('ceilometer.polling.manager.LOG') def test_import_error_in_plugin(self, LOG): namespaces = ['ipmi'] - manager.AgentManager(0, self.conf, namespaces=namespaces) + manager.AgentManager(0, self.conf, namespaces=namespaces, + queue=multiprocessing.Queue()) LOG.warning.assert_called_with( 'No valid pollsters can be loaded from %s namespaces', namespaces) @@ -282,7 +286,8 @@ class BaseAgent(base.BaseTestCase): self.mgr.polling_manager = manager.PollingManager(self.CONF) def create_manager(self): - return manager.AgentManager(0, self.CONF) + queue = multiprocessing.Queue() + return manager.AgentManager(0, self.CONF, queue=queue) def fake_notifier_sample(self, ctxt, event_type, payload): for m in payload['samples']: @@ -301,7 +306,8 @@ class BaseAgent(base.BaseTestCase): self.CONF = service.prepare_service([], []) self.CONF.set_override( 'cfg_file', - self.path_get('etc/ceilometer/polling_all.yaml'), group='polling' + self.path_get('etc/ceilometer/polling_all.yaml'), + group='polling' ) self.polling_cfg = { 'sources': [{ @@ -703,6 +709,9 @@ class TestPollingAgent(BaseAgent): mock.call('Finished polling pollster %(poll)s in the context ' 'of %(src)s', {'poll': 'test', 'src': 'test_polling'}) ]) + LOG.debug.assert_has_calls([ + mock.call('Polster heartbeat update: test') + ]) @mock.patch('ceilometer.polling.manager.LOG') def test_skip_polling_and_notify_with_no_resources(self, LOG):