diff --git a/ceilometer/collector/dispatcher.py b/ceilometer/collector/dispatcher.py deleted file mode 100644 index bcf23c637..000000000 --- a/ceilometer/collector/dispatcher.py +++ /dev/null @@ -1,78 +0,0 @@ -# -*- encoding: utf-8 -*- -# -# Copyright © 2012 New Dream Network, LLC (DreamHost) -# -# Author: Doug Hellmann -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Given an incoming message, process it through the registered converters -and publish the results. -""" - -import pkg_resources - -from ceilometer.openstack.common import log - -LOG = log.getLogger(__name__) - - -class NotificationDispatcher(object): - """Manages invoking plugins to convert notification messages to counters. - """ - - def __init__(self, plugin_namespace, publish_func): - self.plugin_namespace = plugin_namespace - self.publish_func = publish_func - self.handlers = {} - self.topics = set() - self._load_plugins() - - def _load_plugins(self): - # Listen for notifications from nova - for ep in pkg_resources.iter_entry_points(self.plugin_namespace): - LOG.info('attempting to load notification handler for %s:%s', - self.plugin_namespace, ep.name) - try: - # FIXME(dhellmann): Currently assumes all plugins are - # enabled when they are discovered and - # importable. Need to add check against global - # configuration flag and check that asks the plugin if - # it should be enabled. - plugin_class = ep.load() - plugin = plugin_class() - self.topics.update(plugin.topics) - for event_type in plugin.get_event_types(): - LOG.info('subscribing %s handler to %s events', - ep.name, event_type) - self.handlers.setdefault(event_type, []).append(plugin) - except Exception as err: - LOG.warning('Failed to load notification handler %s: %s', - ep.name, err) - LOG.exception(err) - if not self.handlers: - LOG.warning('Failed to load any notification handlers for %s', - self.plugin_namespace) - - def notify(self, topic, body): - """Dispatch the notification to the appropriate handler - and publish the counters returned. - """ - event_type = body.get('event_type') - LOG.info('NOTIFICATION: %s', event_type) - for handler in self.handlers.get(event_type, []): - if topic in handler.topics: - for c in handler.process_notification(body): - LOG.info('COUNTER: %s', c) - # FIXME(dhellmann): Spawn green thread? - self.publish_func(c) - return diff --git a/ceilometer/collector/manager.py b/ceilometer/collector/manager.py index 85b033cb5..ab3d37c8b 100644 --- a/ceilometer/collector/manager.py +++ b/ceilometer/collector/manager.py @@ -17,6 +17,8 @@ # under the License. import functools +import itertools +import pkg_resources from nova import context from nova import manager @@ -24,28 +26,47 @@ from nova import manager from ceilometer import meter from ceilometer import publish from ceilometer import storage -from ceilometer.collector import dispatcher from ceilometer.openstack.common import cfg from ceilometer.openstack.common import log from ceilometer.openstack.common import timeutils from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher -# Import rabbit_notifier to register notification_topics flag -import ceilometer.openstack.common.notifier.rabbit_notifier try: import ceilometer.openstack.common.rpc as rpc except ImportError: # For Essex import nova.rpc as rpc + LOG = log.getLogger(__name__) -COLLECTOR_NAMESPACE = 'ceilometer.collector' - - class CollectorManager(manager.Manager): + COLLECTOR_NAMESPACE = 'ceilometer.collector' + + @staticmethod + def _load_plugins(plugin_namespace): + handlers = [] + # Listen for notifications from nova + for ep in pkg_resources.iter_entry_points(plugin_namespace): + LOG.info('attempting to load notification handler for %s:%s', + plugin_namespace, ep.name) + try: + # FIXME(dhellmann): Currently assumes all plugins are + # enabled when they are discovered and + # importable. Need to add check against global + # configuration flag and check that asks the plugin if + # it should be enabled. + plugin_class = ep.load() + plugin = plugin_class() + handlers.append(plugin) + except Exception as err: + LOG.warning('Failed to load notification handler %s: %s', + ep.name, err) + LOG.exception(err) + return handlers + def init_host(self): # Use the nova configuration flags to get # a connection to the RPC mechanism nova @@ -56,19 +77,26 @@ class CollectorManager(manager.Manager): self.storage_engine = storage.get_engine(cfg.CONF) self.storage_conn = self.storage_engine.get_connection(cfg.CONF) - self.handler = dispatcher.NotificationDispatcher( - COLLECTOR_NAMESPACE, - self._publish_counter, - ) + self.handlers = self._load_plugins(self.COLLECTOR_NAMESPACE) + + if not self.handlers: + LOG.warning('Failed to load any notification handlers for %s', + self.plugin_namespace) + # FIXME(dhellmann): Should be using create_worker(), except # that notification messages do not conform to the RPC # invocation protocol (they do not include a "method" # parameter). - for topic in self.handler.topics: - self.connection.declare_topic_consumer( - topic=topic, - queue_name="ceilometer.notifications", - callback=functools.partial(self.handler.notify, topic)) + for handler in self.handlers: + for exchange_topic in handler.get_exchange_topics(cfg.CONF): + for topic in exchange_topic.topics: + self.connection.declare_topic_consumer( + queue_name="ceilometer.notifications", + topic=topic, + exchange_name=exchange_topic.exchange, + callback=functools.partial( + self.process_notification, + handler)) # Set ourselves up as a separate worker for the metering data, # since the default for manager is to use create_consumer(). @@ -80,7 +108,16 @@ class CollectorManager(manager.Manager): self.connection.consume_in_thread() - def _publish_counter(self, counter): + def process_notification(self, handler, notification): + """Make a notification processed by an handler.""" + if notification['event_type'] in handler.get_event_types(): + for c in handler.process_notification(notification): + LOG.info('COUNTER: %s', c) + # FIXME(dhellmann): Spawn green thread? + self.publish_counter(c) + + @staticmethod + def publish_counter(counter): """Create a metering message for the counter and publish it.""" ctxt = context.get_admin_context() publish.publish_counter(ctxt, counter) diff --git a/ceilometer/compute/notifications.py b/ceilometer/compute/notifications.py index 6836fcdac..c10a5ae0d 100644 --- a/ceilometer/compute/notifications.py +++ b/ceilometer/compute/notifications.py @@ -21,6 +21,17 @@ from ceilometer import counter from ceilometer import plugin from ceilometer.compute import instance +from ceilometer.openstack.common import cfg + + +OPTS = [ + cfg.StrOpt('nova_control_exchange', + default='nova', + help="Exchange name for Cinder notifications"), +] + + +cfg.CONF.register_opts(OPTS) class _Base(plugin.NotificationBase): @@ -40,6 +51,17 @@ class _Base(plugin.NotificationBase): 'compute.instance.delete.start', ] + @staticmethod + def get_exchange_topics(conf): + """Return a sequence of ExchangeTopics defining the exchange and + topics to be connected for this plugin.""" + return [ + plugin.ExchangeTopics( + exchange=conf.nova_control_exchange, + topics=set(topic + ".info" + for topic in conf.notification_topics)), + ] + class Instance(_Base): diff --git a/ceilometer/image/notifications.py b/ceilometer/image/notifications.py index 674513521..2a3eb0486 100644 --- a/ceilometer/image/notifications.py +++ b/ceilometer/image/notifications.py @@ -22,6 +22,18 @@ from ceilometer import counter from ceilometer import plugin +from ceilometer.openstack.common import cfg + + +OPTS = [ + cfg.StrOpt('glance_control_exchange', + default='glance_notifications', + help="Exchange name for Cinder notifications"), +] + + +cfg.CONF.register_opts(OPTS) + class ImageBase(plugin.NotificationBase): """ @@ -33,6 +45,17 @@ class ImageBase(plugin.NotificationBase): def get_event_types(): return ['image.send'] + @staticmethod + def get_exchange_topics(conf): + """Return a sequence of ExchangeTopics defining the exchange and + topics to be connected for this plugin.""" + return [ + plugin.ExchangeTopics( + exchange=conf.glance_control_exchange, + topics=set(topic + ".info" + for topic in conf.notification_topics)), + ] + def _counter(self, message, name, user_id, project_id): metadata = self.notification_to_metadata(message) return counter.Counter( diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py index 5348ad121..1edf2012c 100644 --- a/ceilometer/plugin.py +++ b/ceilometer/plugin.py @@ -19,8 +19,15 @@ """ import abc +from collections import namedtuple from ceilometer.openstack.common import cfg +# Import rabbit_notifier to register notification_topics flag so that +# plugins can use it +import ceilometer.openstack.common.notifier.rabbit_notifier + + +ExchangeTopics = namedtuple('ExchangeTopics', ['exchange', 'topics']) class NotificationBase(object): @@ -28,15 +35,16 @@ class NotificationBase(object): __metaclass__ = abc.ABCMeta - def __init__(self): - self.topics = set(topic + ".info" - for topic in cfg.CONF.notification_topics) - @abc.abstractmethod def get_event_types(self): """Return a sequence of strings defining the event types to be given to this plugin.""" + @abc.abstractmethod + def get_exchange_topics(self, conf): + """Return a sequence of ExchangeTopics defining the exchange and + topics to be connected for this plugin.""" + @abc.abstractmethod def process_notification(self, message): """Return a sequence of Counter instances for the given message.""" diff --git a/ceilometer/volume/notifications.py b/ceilometer/volume/notifications.py index ea138ae6c..054a5d478 100644 --- a/ceilometer/volume/notifications.py +++ b/ceilometer/volume/notifications.py @@ -22,6 +22,18 @@ events. from ceilometer import counter from ceilometer import plugin +from ceilometer.openstack.common import cfg + + +OPTS = [ + cfg.StrOpt('cinder_control_exchange', + default='cinder', + help="Exchange name for Cinder notifications"), +] + + +cfg.CONF.register_opts(OPTS) + class _Base(plugin.NotificationBase): """Convert compute.instance.* notifications into Counters @@ -34,6 +46,17 @@ class _Base(plugin.NotificationBase): "size", ] + @staticmethod + def get_exchange_topics(conf): + """Return a sequence of ExchangeTopics defining the exchange and + topics to be connected for this plugin.""" + return [ + plugin.ExchangeTopics( + exchange=conf.cinder_control_exchange, + topics=set(topic + ".info" + for topic in conf.notification_topics)), + ] + @staticmethod def get_event_types(): return ['volume.exists', diff --git a/tests/collector/test_dispatcher.py b/tests/collector/test_dispatcher.py deleted file mode 100644 index 82aff2a48..000000000 --- a/tests/collector/test_dispatcher.py +++ /dev/null @@ -1,129 +0,0 @@ -# -*- encoding: utf-8 -*- -# -# Copyright © 2012 New Dream Network, LLC (DreamHost) -# -# Author: Doug Hellmann -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Tests for ceilometer/nova/dispatcher.py -""" - -from ceilometer.compute import notifications -from ceilometer.collector import dispatcher - - -class StubDispatcher(dispatcher.NotificationDispatcher): - def _load_plugins(self): - self.handlers['compute.instance.create.end'] = [ - notifications.Instance(), - ] - - -TEST_NOTICE = { - u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2', - u'_context_is_admin': True, - u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e', - u'_context_quota_class': None, - u'_context_read_deleted': u'no', - u'_context_remote_address': u'10.0.2.15', - u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66', - u'_context_roles': [u'admin'], - u'_context_timestamp': u'2012-05-08T20:23:41.425105', - u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2', - u'event_type': u'compute.instance.create.end', - u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451', - u'payload': {u'created_at': u'2012-05-08 20:23:41', - u'deleted_at': u'', - u'disk_gb': 0, - u'display_name': u'testme', - u'fixed_ips': [{u'address': u'10.0.0.2', - u'floating_ips': [], - u'meta': {}, - u'type': u'fixed', - u'version': 4}], - u'image_ref_url': u'http://10.0.2.15:9292/images/UUID', - u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1', - u'instance_type': u'm1.tiny', - u'instance_type_id': 2, - u'launched_at': u'2012-05-08 20:23:47.985999', - u'memory_mb': 512, - u'state': u'active', - u'state_description': u'', - u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e', - u'user_id': u'1e3ce043029547f1a61c1996d1a531a2', - u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3', - u'vcpus': 1, - u'root_gb': 0, - u'ephemeral_gb': 0, - u'host': u'compute-host-name', - u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4', - u'os_type': u'linux?', - u'architecture': u'x86', - u'image_ref': u'UUID', - u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5', - u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6', - }, - u'priority': u'INFO', - u'publisher_id': u'compute.vagrant-precise', - u'timestamp': u'2012-05-08 20:23:48.028195', - } - - -def test_notify(): - results = [] - d = StubDispatcher(None, lambda x: results.append(x)) - d.notify("notifications.info", TEST_NOTICE) - assert len(results) >= 1 - counter = results[0] - assert counter.name == 'instance' - - -def test_load_plugins(): - results = [] - d = dispatcher.NotificationDispatcher( - 'ceilometer.collector', - lambda x: results.append(x) - ) - assert d.handlers, 'No handlers were loaded' - - -def test_load_no_plugins(): - results = [] - d = dispatcher.NotificationDispatcher( - 'ceilometer.collector.none', - lambda x: results.append(x) - ) - assert not d.handlers, 'Handlers were loaded' - - -def test_notify_through_plugin(): - results = [] - d = dispatcher.NotificationDispatcher( - 'ceilometer.collector', - lambda x: results.append(x) - ) - d.notify("notifications.info", TEST_NOTICE) - assert len(results) >= 1 - results_name = [result.name for result in results] - assert 'instance' in results_name - assert 'memory' in results_name - - -def test_notify_topics(): - results = [] - d = dispatcher.NotificationDispatcher( - 'ceilometer.collector', - lambda x: results.append(x) - ) - d.notify("dont.care", TEST_NOTICE) - assert len(results) == 0 diff --git a/tests/collector/test_manager.py b/tests/collector/test_manager.py index e257acc75..ba5994f1b 100644 --- a/tests/collector/test_manager.py +++ b/tests/collector/test_manager.py @@ -23,7 +23,71 @@ import datetime from ceilometer import meter from ceilometer.collector import manager from ceilometer.storage import base +from ceilometer.openstack.common import rpc +from ceilometer.openstack.common import cfg from ceilometer.tests import base as tests_base +from ceilometer.compute import notifications + + +TEST_NOTICE = { + u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2', + u'_context_is_admin': True, + u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e', + u'_context_quota_class': None, + u'_context_read_deleted': u'no', + u'_context_remote_address': u'10.0.2.15', + u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66', + u'_context_roles': [u'admin'], + u'_context_timestamp': u'2012-05-08T20:23:41.425105', + u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2', + u'event_type': u'compute.instance.create.end', + u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451', + u'payload': {u'created_at': u'2012-05-08 20:23:41', + u'deleted_at': u'', + u'disk_gb': 0, + u'display_name': u'testme', + u'fixed_ips': [{u'address': u'10.0.0.2', + u'floating_ips': [], + u'meta': {}, + u'type': u'fixed', + u'version': 4}], + u'image_ref_url': u'http://10.0.2.15:9292/images/UUID', + u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1', + u'instance_type': u'm1.tiny', + u'instance_type_id': 2, + u'launched_at': u'2012-05-08 20:23:47.985999', + u'memory_mb': 512, + u'state': u'active', + u'state_description': u'', + u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e', + u'user_id': u'1e3ce043029547f1a61c1996d1a531a2', + u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3', + u'vcpus': 1, + u'root_gb': 0, + u'ephemeral_gb': 0, + u'host': u'compute-host-name', + u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4', + u'os_type': u'linux?', + u'architecture': u'x86', + u'image_ref': u'UUID', + u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5', + u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6', + }, + u'priority': u'INFO', + u'publisher_id': u'compute.vagrant-precise', + u'timestamp': u'2012-05-08 20:23:48.028195', + } + + +class StubConnection(object): + def declare_topic_consumer(*args, **kwargs): + pass + + def create_worker(*args, **kwargs): + pass + + def consume_in_thread(self): + pass class TestCollectorManager(tests_base.TestCase): @@ -33,6 +97,11 @@ class TestCollectorManager(tests_base.TestCase): self.mgr = manager.CollectorManager() self.ctx = None + def test_init_host(self): + self.stubs.Set(rpc, 'create_connection', lambda: StubConnection()) + cfg.CONF.database_connection = 'log://localhost' + self.mgr.init_host() + def test_valid_message(self): msg = {'counter_name': 'test', 'resource_id': self.id(), @@ -86,3 +155,18 @@ class TestCollectorManager(tests_base.TestCase): self.mgr.record_metering_data(self.ctx, msg) self.mox.VerifyAll() + + def test_load_plugins(self): + results = self.mgr._load_plugins(self.mgr.COLLECTOR_NAMESPACE) + self.assert_(len(results) > 0) + + def test_load_no_plugins(self): + results = self.mgr._load_plugins("foobar.namespace") + self.assertEqual(results, []) + + def test_process_notification(self): + results = [] + self.stubs.Set(self.mgr, 'publish_counter', + lambda counter: results.append(counter)) + self.mgr.process_notification(notifications.Instance(), TEST_NOTICE) + self.assert_(len(results) >= 1)