From 8d3d84053fb3af4457c0ca3c4c326591ca13ab26 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Mon, 17 Nov 2014 13:54:23 -0500 Subject: [PATCH] implement notification coordination This patch enables coordination of notification agents by splitting pipelines across agents. A single-worker agent is provided that skips the overhead of IPC. A multi-worker agent is added that uses an internal queue to redirect samples to the correct pipeline. Implements: blueprint notification-coordination Change-Id: If8503ee70217025106a188f40a6e4ea8c49a89f4 --- ceilometer/agent/plugin_base.py | 23 +++++- ceilometer/coordination.py | 21 ++++- ceilometer/notification.py | 108 ++++++++++++++++++++++---- ceilometer/pipeline.py | 25 ++++++ ceilometer/tests/test_notification.py | 49 ++++++++++-- 5 files changed, 199 insertions(+), 27 deletions(-) diff --git a/ceilometer/agent/plugin_base.py b/ceilometer/agent/plugin_base.py index 8e418c27b..a2125af9d 100644 --- a/ceilometer/agent/plugin_base.py +++ b/ceilometer/agent/plugin_base.py @@ -30,6 +30,7 @@ from ceilometer.i18n import _ from ceilometer import messaging from ceilometer.openstack.common import context from ceilometer.openstack.common import log +from ceilometer.publisher import utils cfg.CONF.import_group('service_credentials', 'ceilometer.service') @@ -92,9 +93,12 @@ class PluginBase(object): @six.add_metaclass(abc.ABCMeta) class NotificationBase(PluginBase): """Base class for plugins that support the notification API.""" - def __init__(self, pipeline_manager): + def __init__(self, transporter): super(NotificationBase, self).__init__() - self.pipeline_manager = pipeline_manager + self.transporter = transporter + # NOTE(gordc): if no publisher, this isn't a PipelineManager and + # data should be requeued. 
+ self.requeue = False if hasattr(transporter, 'publisher') else True @abc.abstractproperty def event_types(self): @@ -170,8 +174,19 @@ class NotificationBase(PluginBase): self.event_types): return - with self.pipeline_manager.publisher(context) as p: - p(list(self.process_notification(notification))) + if self.requeue: + meters = [ + utils.meter_message_from_counter( + sample, cfg.CONF.publisher.metering_secret) + for sample in self.process_notification(notification) + ] + for notifier in self.transporter: + notifier.sample(context.to_dict(), + event_type='ceilometer.pipeline', + payload=meters) + else: + with self.transporter.publisher(context) as p: + p(list(self.process_notification(notification))) @six.add_metaclass(abc.ABCMeta) diff --git a/ceilometer/coordination.py b/ceilometer/coordination.py index db3c82f0d..f189fd62c 100644 --- a/ceilometer/coordination.py +++ b/ceilometer/coordination.py @@ -37,7 +37,12 @@ OPTS = [ cfg.FloatOpt('heartbeat', default=1.0, help='Number of seconds between heartbeats for distributed ' - 'coordination.') + 'coordination.'), + cfg.FloatOpt('check_watchers', + default=10.0, + help='Number of seconds between checks to see if group ' + 'membership has changed') + ] cfg.CONF.register_opts(OPTS, group='coordination') @@ -89,6 +94,15 @@ class PartitionCoordinator(object): LOG.exception(_LE('Error sending a heartbeat to coordination ' 'backend.')) + def watch_group(self, namespace, callback): + if self._coordinator: + self._coordinator.watch_join_group(namespace, callback) + self._coordinator.watch_leave_group(namespace, callback) + + def run_watchers(self): + if self._coordinator: + self._coordinator.run_watchers() + def join_group(self, group_id): if not self._coordinator or not self._started or not group_id: return @@ -108,6 +122,11 @@ class PartitionCoordinator(object): pass self._groups.add(group_id) + def leave_group(self, group_id): + if self._coordinator: + self._coordinator.leave_group(group_id) + LOG.info(_LI('Left 
partitioning group %s'), group_id) + def _get_members(self, group_id): if not self._coordinator: return [self._my_id] diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 274b363de..99630008e 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -16,11 +16,14 @@ # under the License. from oslo.config import cfg +import oslo.messaging from stevedore import extension +from ceilometer import coordination from ceilometer.event import endpoint as event_endpoint from ceilometer.i18n import _ from ceilometer import messaging +from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import pipeline @@ -38,6 +41,10 @@ OPTS = [ deprecated_group='collector', default=False, help='Save event details.'), + cfg.BoolOpt('workload_partitioning', + default=False, + help='Enable workload partitioning, allowing multiple ' + 'notification agents to be run simultaneously.'), cfg.MultiStrOpt('messaging_urls', default=[], help="Messaging URLs to listen for notifications. " @@ -50,32 +57,76 @@ cfg.CONF.register_opts(OPTS, group="notification") class NotificationService(os_service.Service): + """Notification service. + + When running multiple agents, additional queuing sequence is required for + inter process communication. Each agent has two listeners: one to listen + to the main OpenStack queue and another listener(and notifier) for IPC to + divide pipeline sink endpoints. Coordination should be enabled to have + proper active/active HA. 
+ """ NOTIFICATION_NAMESPACE = 'ceilometer.notification' + NOTIFICATION_IPC = 'ceilometer-pipe' @classmethod - def _get_notifications_manager(cls, pm): + def _get_notifications_manager(cls, transporter): return extension.ExtensionManager( namespace=cls.NOTIFICATION_NAMESPACE, invoke_on_load=True, - invoke_args=(pm, ) + invoke_args=(transporter, ) ) def start(self): super(NotificationService, self).start() - # FIXME(sileht): endpoint use notification_topics option - # and it should not because this is oslo.messaging option - # not a ceilometer, until we have a something to get - # the notification_topics in an other way - # we must create a transport to ensure the option have - # beeen registered by oslo.messaging - transport = messaging.get_transport() - messaging.get_notifier(transport, '') - self.pipeline_manager = pipeline.setup_pipeline() + transport = messaging.get_transport() + self.partition_coordinator = coordination.PartitionCoordinator() + self.partition_coordinator.start() + + if cfg.CONF.notification.workload_partitioning: + transporter = [] + for pipe in self.pipeline_manager.pipelines: + transporter.append(oslo.messaging.Notifier( + transport, + driver=cfg.CONF.publisher_notifier.metering_driver, + publisher_id='ceilometer.notification', + topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))) + + self.ctxt = context.get_admin_context() + self.group_id = self.NOTIFICATION_NAMESPACE + else: + # FIXME(sileht): endpoint uses the notification_topics option + # and it should not, because this is an oslo.messaging option, + # not a ceilometer one. Until we have another way to get + # the notification_topics, we must create a transport + # to ensure the option has been registered by + # oslo.messaging. + messaging.get_notifier(transport, '') + transporter = self.pipeline_manager + self.group_id = None + + self.listeners = self.pipeline_listeners = [] + self._configure_main_queue_listeners(transporter) + + if cfg.CONF.notification.workload_partitioning: + 
self.partition_coordinator.join_group(self.group_id) + self._configure_pipeline_listeners() + self.partition_coordinator.watch_group(self.group_id, + self._refresh_agent) + + self.tg.add_timer(cfg.CONF.coordination.heartbeat, + self.partition_coordinator.heartbeat) + self.tg.add_timer(cfg.CONF.coordination.check_watchers, + self.partition_coordinator.run_watchers) + + # Add a dummy thread to have wait() working + self.tg.add_timer(604800, lambda: None) + + def _configure_main_queue_listeners(self, transporter): self.notification_manager = self._get_notifications_manager( - self.pipeline_manager) + transporter) if not list(self.notification_manager): LOG.warning(_('Failed to load any notification handlers for %s'), self.NOTIFICATION_NAMESPACE) @@ -103,7 +154,6 @@ class NotificationService(os_service.Service): endpoints.append(handler) urls = cfg.CONF.notification.messaging_urls or [None] - self.listeners = [] for url in urls: transport = messaging.get_transport(url) listener = messaging.get_notification_listener( @@ -111,9 +161,35 @@ class NotificationService(os_service.Service): listener.start() self.listeners.append(listener) - # Add a dummy thread to have wait() working - self.tg.add_timer(604800, lambda: None) + @staticmethod + def _kill_listeners(listeners): + # NOTE(gordc): correct usage of oslo.messaging listener is to stop(), + # which stops new messages, and wait(), which processes remaining + # messages and closes connection + for listener in listeners: + listener.stop() + listener.wait() + + def _refresh_agent(self, event): + self._kill_listeners(self.pipeline_listeners) + self._configure_pipeline_listeners() + + def _configure_pipeline_listeners(self): + if cfg.CONF.notification.workload_partitioning: + partitioned = self.partition_coordinator.extract_my_subset( + self.group_id, self.pipeline_manager.pipelines) + transport = messaging.get_transport() + for pipe in partitioned: + LOG.debug(_('Pipeline endpoint: %s'), pipe.name) + listener = 
messaging.get_notification_listener( + transport, + [oslo.messaging.Target( + topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))], + [pipeline.PipelineEndpoint(self.ctxt, pipe)]) + listener.start() + self.pipeline_listeners.append(listener) def stop(self): - map(lambda x: x.stop(), self.listeners) + self.partition_coordinator.leave_group(self.group_id) + self._kill_listeners(self.listeners + self.pipeline_listeners) super(NotificationService, self).stop() diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 9c9e1b55c..8181726b3 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -26,6 +26,7 @@ import yaml from ceilometer.i18n import _ from ceilometer.openstack.common import log from ceilometer import publisher +from ceilometer import sample as sample_util from ceilometer import transformer as xformer @@ -50,6 +51,30 @@ class PipelineException(Exception): return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg) +class PipelineEndpoint(object): + + def __init__(self, context, pipeline): + self.publish_context = PublishContext(context, [pipeline]) + + def sample(self, ctxt, publisher_id, event_type, payload, metadata): + """RPC endpoint for pipeline messages.""" + samples = [ + sample_util.Sample(name=s['counter_name'], + type=s['counter_type'], + unit=s['counter_unit'], + volume=s['counter_volume'], + user_id=s['user_id'], + project_id=s['project_id'], + resource_id=s['resource_id'], + timestamp=s['timestamp'], + resource_metadata=s['resource_metadata'], + source=s.get('source')) + for s in payload + ] + with self.publish_context as p: + p(samples) + + class PublishContext(object): def __init__(self, context, pipelines=None): diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index a13e745ba..d62f24752 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -182,9 +182,9 @@ class TestNotification(tests_base.BaseTestCase): self.assertEqual(1, 
len(self.srv.listeners[0].dispatcher.targets)) -class TestRealNotification(tests_base.BaseTestCase): +class BaseRealNotification(tests_base.BaseTestCase): def setUp(self): - super(TestRealNotification, self).setUp() + super(BaseRealNotification, self).setUp() self.CONF = self.useFixture(fixture_config.Config()).conf self.setup_messaging(self.CONF, 'nova') @@ -202,12 +202,9 @@ class TestRealNotification(tests_base.BaseTestCase): prefix="pipeline", suffix="yaml") self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) - self.srv = notification.NotificationService() self.publisher = test_publisher.TestPublisher("") - @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_notification_service(self, fake_publisher_cls): - fake_publisher_cls.return_value = self.publisher + def _check_notification_service(self): self.srv.start() notifier = messaging.get_notifier(self.transport, @@ -225,3 +222,43 @@ class TestRealNotification(tests_base.BaseTestCase): resources = list(set(s.resource_id for s in self.publisher.samples)) self.assertEqual(self.expected_samples, len(self.publisher.samples)) self.assertEqual(["9f9d01b9-4a58-4271-9e27-398b21ab20d1"], resources) + + +class TestRealNotification(BaseRealNotification): + + def setUp(self): + super(TestRealNotification, self).setUp() + self.srv = notification.NotificationService() + + @mock.patch('ceilometer.publisher.test.TestPublisher') + def test_notification_service(self, fake_publisher_cls): + fake_publisher_cls.return_value = self.publisher + self._check_notification_service() + + @mock.patch('ceilometer.coordination.PartitionCoordinator') + @mock.patch('ceilometer.publisher.test.TestPublisher') + def test_ha_configured_agent_coord_disabled(self, fake_publisher_cls, + fake_coord): + fake_publisher_cls.return_value = self.publisher + fake_coord1 = mock.MagicMock() + fake_coord1.extract_my_subset.side_effect = lambda x, y: y + fake_coord.return_value = fake_coord1 + self._check_notification_service() + + +class 
TestRealNotificationHA(BaseRealNotification): + + def setUp(self): + super(TestRealNotificationHA, self).setUp() + self.CONF.set_override('workload_partitioning', True, + group='notification') + self.srv = notification.NotificationService() + + @mock.patch('ceilometer.coordination.PartitionCoordinator') + @mock.patch('ceilometer.publisher.test.TestPublisher') + def test_notification_service(self, fake_publisher_cls, fake_coord): + fake_publisher_cls.return_value = self.publisher + fake_coord1 = mock.MagicMock() + fake_coord1.extract_my_subset.side_effect = lambda x, y: y + fake_coord.return_value = fake_coord1 + self._check_notification_service()