Add heart beat report for polling agents

This patch adds possibility to start child service for heart beat reports
of polling agents. Such data can be leveraged in service health check
scripts for monitoring purposes.

Change-Id: I721a1ef997c5dfb59cdd0e759fcd7d511c24bbb0
This commit is contained in:
Martin Mágr 2024-07-03 14:21:21 +02:00
parent 31cf6dc8c7
commit 48f4089e39
4 changed files with 261 additions and 11 deletions

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import multiprocessing
import shlex
import cotyledon
@ -79,11 +80,20 @@ def _prepare_config():
return conf
def create_polling_service(worker_id, conf=None):
def create_polling_service(worker_id, conf=None, queue=None):
if conf is None:
conf = _prepare_config()
conf.log_opt_values(LOG, log.DEBUG)
return manager.AgentManager(worker_id, conf, conf.polling_namespaces)
return manager.AgentManager(worker_id, conf,
conf.polling_namespaces, queue)
def create_heartbeat_service(worker_id, conf, queue=None):
if conf is None:
conf = _prepare_config()
conf.log_opt_values(LOG, log.DEBUG)
return manager.AgentHeartBeatManager(worker_id, conf,
conf.polling_namespaces, queue)
def main():
@ -91,5 +101,11 @@ def main():
conf = _prepare_config()
priv_context.init(root_helper=shlex.split(utils._get_root_helper()))
oslo_config_glue.setup(sm, conf)
sm.add(create_polling_service, args=(conf,))
if conf.polling.heartbeat_socket_dir is not None:
queue = multiprocessing.Queue()
sm.add(create_heartbeat_service, args=(conf, queue))
else:
queue = None
sm.add(create_polling_service, args=(conf, queue))
sm.run()

View File

@ -19,7 +19,10 @@ import glob
import itertools
import logging
import os
import queue
import random
import socket
import threading
import uuid
from concurrent import futures
@ -51,6 +54,10 @@ POLLING_OPTS = [
default="polling.yaml",
help="Configuration file for polling definition."
),
cfg.StrOpt('heartbeat_socket_dir',
default=None,
help="Path to directory where socket file for polling "
"heartbeat will be created."),
cfg.StrOpt('partitioning_group_prefix',
deprecated_group='central',
help='Work-load partitioning group prefix. Use only if you '
@ -89,6 +96,11 @@ class PollingException(agent.ConfigException):
super(PollingException, self).__init__('Polling', message, cfg)
class HeartBeatException(agent.ConfigException):
def __init__(self, message, cfg):
super(HeartBeatException, self).__init__('Polling', message, cfg)
class Resources(object):
def __init__(self, agent_manager):
self.agent_manager = agent_manager
@ -207,6 +219,8 @@ class PollingTask(object):
)
sample_batch = []
self.manager.heartbeat(pollster.name, polling_timestamp)
for sample in samples:
# Note(yuywz): Unify the timestamp of polled samples
sample.set_timestamp(polling_timestamp)
@ -289,15 +303,100 @@ class PollingTask(object):
)
class AgentHeartBeatManager(cotyledon.Service):
def __init__(self, worker_id, conf, namespaces=None, queue=None):
super(AgentHeartBeatManager, self).__init__(worker_id)
self.conf = conf
if conf.polling.heartbeat_socket_dir is None:
raise HeartBeatException("path to a directory containing "
"heart beat sockets is required", conf)
if type(namespaces) is not list:
if namespaces is None:
namespaces = ""
namespaces = [namespaces]
self._lock = threading.Lock()
self._queue = queue
self._status = dict()
self._sock_pth = os.path.join(
conf.polling.heartbeat_socket_dir,
f"ceilometer-{'-'.join(sorted(namespaces))}.socket"
)
self._delete_socket()
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
self._sock.bind(self._sock_pth)
self._sock.listen(1)
except socket.error as err:
raise HeartBeatException("Failed to open socket file "
f"({self._sock_pth}): {err}", conf)
LOG.info("Starting heartbeat child service. Listening"
f" on {self._sock_pth}")
def _delete_socket(self):
try:
os.remove(self._sock_pth)
except OSError:
pass
def terminate(self):
self._tpe.shutdown(wait=False, cancel_futures=True)
self._sock.close()
self._delete_socket()
def _update_status(self):
hb = self._queue.get()
with self._lock:
self._status[hb['pollster']] = hb['timestamp']
LOG.debug(f"Updated heartbeat for {hb['pollster']} "
f"({hb['timestamp']})")
def _send_heartbeat(self):
s, addr = self._sock.accept()
LOG.debug("Heartbeat status report requested "
f"at {self._sock_pth}")
with self._lock:
out = '\n'.join([f"{k} {v}"
for k, v in self._status.items()])
s.sendall(out.encode('utf-8'))
s.close()
LOG.debug(f"Reported heartbeat status:\n{out}")
def run(self):
super(AgentHeartBeatManager, self).run()
LOG.debug("Started heartbeat child process.")
def _read_queue():
LOG.debug("Started heartbeat update thread")
while True:
self._update_status()
def _report_status():
LOG.debug("Started heartbeat reporting thread")
while True:
self._send_heartbeat()
with futures.ThreadPoolExecutor(max_workers=2) as executor:
self._tpe = executor
executor.submit(_read_queue)
executor.submit(_report_status)
class AgentManager(cotyledon.Service):
def __init__(self, worker_id, conf, namespaces=None):
def __init__(self, worker_id, conf, namespaces=None, queue=None):
namespaces = namespaces or ['compute', 'central']
group_prefix = conf.polling.partitioning_group_prefix
super(AgentManager, self).__init__(worker_id)
self.conf = conf
self._queue = queue
if type(namespaces) is not list:
namespaces = [namespaces]
@ -350,6 +449,19 @@ class AgentManager(cotyledon.Service):
self._keystone = None
self._keystone_last_exception = None
def heartbeat(self, name, timestamp):
"""Send heartbeat data if the agent is configured to do so."""
if self._queue is not None:
try:
hb = {
'timestamp': timestamp,
'pollster': name
}
self._queue.put_nowait(hb)
LOG.debug(f"Polster heartbeat update: {name}")
except queue.Full:
LOG.warning(f"Heartbeat queue full. Update failed: {hb}")
def create_dynamic_pollsters(self, namespaces):
"""Creates dynamic pollsters

View File

@ -0,0 +1,113 @@
#
# Copyright 2024 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer polling heartbeat process"""
import multiprocessing
import shutil
import tempfile
from oslo_utils import timeutils
from unittest import mock
from ceilometer.polling import manager
from ceilometer import service
from ceilometer.tests import base
class TestHeartBeatManagert(base.BaseTestCase):
def setUp(self):
super(TestHeartBeatManagert, self).setUp()
self.conf = service.prepare_service([], [])
self.tmpdir = tempfile.mkdtemp()
self.queue = multiprocessing.Queue()
self.mgr = manager.AgentManager(0, self.conf, namespaces='central',
queue=self.queue)
def tearDown(self):
super(TestHeartBeatManagert, self).tearDown()
shutil.rmtree(self.tmpdir)
def setup_polling(self, poll_cfg=None):
name = self.cfg2file(poll_cfg or self.polling_cfg)
self.conf.set_override('cfg_file', name, group='polling')
self.mgr.polling_manager = manager.PollingManager(self.conf)
def test_hb_not_configured(self):
self.assertRaises(manager.HeartBeatException,
manager.AgentHeartBeatManager,
0, self.conf,
namespaces='ipmi',
queue=self.queue)
@mock.patch('ceilometer.polling.manager.LOG')
def test_hb_startup(self, LOG):
# activate heartbeat agent
self.conf.set_override('heartbeat_socket_dir', self.tmpdir,
group='polling')
manager.AgentHeartBeatManager(0, self.conf, namespaces='compute',
queue=self.queue)
calls = [mock.call("Starting heartbeat child service. Listening"
f" on {self.tmpdir}/ceilometer-compute.socket")]
LOG.info.assert_has_calls(calls)
@mock.patch('ceilometer.polling.manager.LOG')
def test_hb_update(self, LOG):
self.conf.set_override('heartbeat_socket_dir', self.tmpdir,
group='polling')
hb = manager.AgentHeartBeatManager(0, self.conf, namespaces='central',
queue=self.queue)
timestamp = timeutils.utcnow().isoformat()
self.queue.put_nowait({'timestamp': timestamp, 'pollster': 'test'})
hb._update_status()
calls = [mock.call(f"Updated heartbeat for test ({timestamp})")]
LOG.debug.assert_has_calls(calls)
@mock.patch('ceilometer.polling.manager.LOG')
def test_hb_send(self, LOG):
with mock.patch('socket.socket') as FakeSocket:
sub_skt = mock.Mock()
sub_skt.sendall.return_value = None
sub_skt.sendall.return_value = None
skt = FakeSocket.return_value
skt.bind.return_value = mock.Mock()
skt.listen.return_value = mock.Mock()
skt.accept.return_value = (sub_skt, "")
self.conf.set_override('heartbeat_socket_dir', self.tmpdir,
group='polling')
hb = manager.AgentHeartBeatManager(0, self.conf,
namespaces='central',
queue=self.queue)
timestamp = timeutils.utcnow().isoformat()
self.queue.put_nowait({'timestamp': timestamp,
'pollster': 'test1'})
hb._update_status()
self.queue.put_nowait({'timestamp': timestamp,
'pollster': 'test2'})
hb._update_status()
# test status report
hb._send_heartbeat()
calls = [mock.call("Heartbeat status report requested "
f"at {self.tmpdir}/ceilometer-central.socket"),
mock.call("Reported heartbeat status:\n"
f"test1 {timestamp}\n"
f"test2 {timestamp}")]
LOG.debug.assert_has_calls(calls)

View File

@ -18,6 +18,7 @@
"""Tests for ceilometer agent manager"""
import copy
import datetime
import multiprocessing
import shutil
import tempfile
from unittest import mock
@ -92,7 +93,8 @@ class TestManager(base.BaseTestCase):
self.assertNotEqual(manager.hash_of_set(y), manager.hash_of_set(z))
def test_load_plugins(self):
mgr = manager.AgentManager(0, self.conf)
mgr = manager.AgentManager(0, self.conf,
queue=multiprocessing.Queue())
self.assertIsNotNone(list(mgr.extensions))
# Test plugin load behavior based on Node Manager pollsters.
@ -101,8 +103,8 @@ class TestManager(base.BaseTestCase):
@mock.patch('ceilometer.ipmi.pollsters.sensor.SensorPollster.__init__',
mock.Mock(return_value=None))
def test_load_normal_plugins(self):
mgr = manager.AgentManager(0, self.conf,
namespaces=['ipmi'])
mgr = manager.AgentManager(0, self.conf, namespaces=['ipmi'],
queue=multiprocessing.Queue())
# 8 pollsters for Node Manager
self.assertEqual(13, len(mgr.extensions))
@ -114,7 +116,8 @@ class TestManager(base.BaseTestCase):
def test_load_failed_plugins(self, LOG):
# Here we additionally check that namespaces will be converted to the
# list if param was not set as a list.
manager.AgentManager(0, self.conf, namespaces='ipmi')
manager.AgentManager(0, self.conf, namespaces='ipmi',
queue=multiprocessing.Queue())
err_msg = 'Skip loading extension for %s: %s'
pollster_names = [
'power', 'temperature', 'outlet_temperature',
@ -132,7 +135,8 @@ class TestManager(base.BaseTestCase):
@mock.patch('ceilometer.polling.manager.LOG')
def test_import_error_in_plugin(self, LOG):
namespaces = ['ipmi']
manager.AgentManager(0, self.conf, namespaces=namespaces)
manager.AgentManager(0, self.conf, namespaces=namespaces,
queue=multiprocessing.Queue())
LOG.warning.assert_called_with(
'No valid pollsters can be loaded from %s namespaces', namespaces)
@ -282,7 +286,8 @@ class BaseAgent(base.BaseTestCase):
self.mgr.polling_manager = manager.PollingManager(self.CONF)
def create_manager(self):
return manager.AgentManager(0, self.CONF)
queue = multiprocessing.Queue()
return manager.AgentManager(0, self.CONF, queue=queue)
def fake_notifier_sample(self, ctxt, event_type, payload):
for m in payload['samples']:
@ -301,7 +306,8 @@ class BaseAgent(base.BaseTestCase):
self.CONF = service.prepare_service([], [])
self.CONF.set_override(
'cfg_file',
self.path_get('etc/ceilometer/polling_all.yaml'), group='polling'
self.path_get('etc/ceilometer/polling_all.yaml'),
group='polling'
)
self.polling_cfg = {
'sources': [{
@ -703,6 +709,9 @@ class TestPollingAgent(BaseAgent):
mock.call('Finished polling pollster %(poll)s in the context '
'of %(src)s', {'poll': 'test', 'src': 'test_polling'})
])
LOG.debug.assert_has_calls([
mock.call('Polster heartbeat update: test')
])
@mock.patch('ceilometer.polling.manager.LOG')
def test_skip_polling_and_notify_with_no_resources(self, LOG):