Rework RPC notification mechanism

We allow plugin to specify clearly which exchange and topic there are
interested in, and we call directly the handler process notification without
looking in a dict to be faster.

Change-Id: I867f0b2568d140bfc0dae63163fd02c8c080d8d1
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2012-09-28 10:47:06 +02:00
parent cafeb43f28
commit 6ddde20759
8 changed files with 217 additions and 227 deletions

View File

@ -1,78 +0,0 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Given an incoming message, process it through the registered converters
and publish the results.
"""
import pkg_resources
from ceilometer.openstack.common import log
LOG = log.getLogger(__name__)
class NotificationDispatcher(object):
"""Manages invoking plugins to convert notification messages to counters.
"""
def __init__(self, plugin_namespace, publish_func):
self.plugin_namespace = plugin_namespace
self.publish_func = publish_func
self.handlers = {}
self.topics = set()
self._load_plugins()
def _load_plugins(self):
# Listen for notifications from nova
for ep in pkg_resources.iter_entry_points(self.plugin_namespace):
LOG.info('attempting to load notification handler for %s:%s',
self.plugin_namespace, ep.name)
try:
# FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and
# importable. Need to add check against global
# configuration flag and check that asks the plugin if
# it should be enabled.
plugin_class = ep.load()
plugin = plugin_class()
self.topics.update(plugin.topics)
for event_type in plugin.get_event_types():
LOG.info('subscribing %s handler to %s events',
ep.name, event_type)
self.handlers.setdefault(event_type, []).append(plugin)
except Exception as err:
LOG.warning('Failed to load notification handler %s: %s',
ep.name, err)
LOG.exception(err)
if not self.handlers:
LOG.warning('Failed to load any notification handlers for %s',
self.plugin_namespace)
def notify(self, topic, body):
"""Dispatch the notification to the appropriate handler
and publish the counters returned.
"""
event_type = body.get('event_type')
LOG.info('NOTIFICATION: %s', event_type)
for handler in self.handlers.get(event_type, []):
if topic in handler.topics:
for c in handler.process_notification(body):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread?
self.publish_func(c)
return

View File

@ -17,6 +17,8 @@
# under the License.
import functools
import itertools
import pkg_resources
from nova import context
from nova import manager
@ -24,27 +26,46 @@ from nova import manager
from ceilometer import meter
from ceilometer import publish
from ceilometer import storage
from ceilometer.collector import dispatcher
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
# Import rabbit_notifier to register notification_topics flag
import ceilometer.openstack.common.notifier.rabbit_notifier
try:
import ceilometer.openstack.common.rpc as rpc
except ImportError:
# For Essex
import nova.rpc as rpc
LOG = log.getLogger(__name__)
class CollectorManager(manager.Manager):
COLLECTOR_NAMESPACE = 'ceilometer.collector'
class CollectorManager(manager.Manager):
@staticmethod
def _load_plugins(plugin_namespace):
handlers = []
# Listen for notifications from nova
for ep in pkg_resources.iter_entry_points(plugin_namespace):
LOG.info('attempting to load notification handler for %s:%s',
plugin_namespace, ep.name)
try:
# FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and
# importable. Need to add check against global
# configuration flag and check that asks the plugin if
# it should be enabled.
plugin_class = ep.load()
plugin = plugin_class()
handlers.append(plugin)
except Exception as err:
LOG.warning('Failed to load notification handler %s: %s',
ep.name, err)
LOG.exception(err)
return handlers
def init_host(self):
# Use the nova configuration flags to get
@ -56,19 +77,26 @@ class CollectorManager(manager.Manager):
self.storage_engine = storage.get_engine(cfg.CONF)
self.storage_conn = self.storage_engine.get_connection(cfg.CONF)
self.handler = dispatcher.NotificationDispatcher(
COLLECTOR_NAMESPACE,
self._publish_counter,
)
self.handlers = self._load_plugins(self.COLLECTOR_NAMESPACE)
if not self.handlers:
LOG.warning('Failed to load any notification handlers for %s',
self.plugin_namespace)
# FIXME(dhellmann): Should be using create_worker(), except
# that notification messages do not conform to the RPC
# invocation protocol (they do not include a "method"
# parameter).
for topic in self.handler.topics:
for handler in self.handlers:
for exchange_topic in handler.get_exchange_topics(cfg.CONF):
for topic in exchange_topic.topics:
self.connection.declare_topic_consumer(
topic=topic,
queue_name="ceilometer.notifications",
callback=functools.partial(self.handler.notify, topic))
topic=topic,
exchange_name=exchange_topic.exchange,
callback=functools.partial(
self.process_notification,
handler))
# Set ourselves up as a separate worker for the metering data,
# since the default for manager is to use create_consumer().
@ -80,7 +108,16 @@ class CollectorManager(manager.Manager):
self.connection.consume_in_thread()
def _publish_counter(self, counter):
def process_notification(self, handler, notification):
"""Make a notification processed by an handler."""
if notification['event_type'] in handler.get_event_types():
for c in handler.process_notification(notification):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread?
self.publish_counter(c)
@staticmethod
def publish_counter(counter):
"""Create a metering message for the counter and publish it."""
ctxt = context.get_admin_context()
publish.publish_counter(ctxt, counter)

View File

@ -21,6 +21,17 @@
from ceilometer import counter
from ceilometer import plugin
from ceilometer.compute import instance
from ceilometer.openstack.common import cfg
OPTS = [
cfg.StrOpt('nova_control_exchange',
default='nova',
help="Exchange name for Cinder notifications"),
]
cfg.CONF.register_opts(OPTS)
class _Base(plugin.NotificationBase):
@ -40,6 +51,17 @@ class _Base(plugin.NotificationBase):
'compute.instance.delete.start',
]
@staticmethod
def get_exchange_topics(conf):
"""Return a sequence of ExchangeTopics defining the exchange and
topics to be connected for this plugin."""
return [
plugin.ExchangeTopics(
exchange=conf.nova_control_exchange,
topics=set(topic + ".info"
for topic in conf.notification_topics)),
]
class Instance(_Base):

View File

@ -22,6 +22,18 @@
from ceilometer import counter
from ceilometer import plugin
from ceilometer.openstack.common import cfg
OPTS = [
cfg.StrOpt('glance_control_exchange',
default='glance_notifications',
help="Exchange name for Cinder notifications"),
]
cfg.CONF.register_opts(OPTS)
class ImageBase(plugin.NotificationBase):
"""
@ -33,6 +45,17 @@ class ImageBase(plugin.NotificationBase):
def get_event_types():
return ['image.send']
@staticmethod
def get_exchange_topics(conf):
"""Return a sequence of ExchangeTopics defining the exchange and
topics to be connected for this plugin."""
return [
plugin.ExchangeTopics(
exchange=conf.glance_control_exchange,
topics=set(topic + ".info"
for topic in conf.notification_topics)),
]
def _counter(self, message, name, user_id, project_id):
metadata = self.notification_to_metadata(message)
return counter.Counter(

View File

@ -19,8 +19,15 @@
"""
import abc
from collections import namedtuple
from ceilometer.openstack.common import cfg
# Import rabbit_notifier to register notification_topics flag so that
# plugins can use it
import ceilometer.openstack.common.notifier.rabbit_notifier
ExchangeTopics = namedtuple('ExchangeTopics', ['exchange', 'topics'])
class NotificationBase(object):
@ -28,15 +35,16 @@ class NotificationBase(object):
__metaclass__ = abc.ABCMeta
def __init__(self):
self.topics = set(topic + ".info"
for topic in cfg.CONF.notification_topics)
@abc.abstractmethod
def get_event_types(self):
"""Return a sequence of strings defining the event types to be
given to this plugin."""
@abc.abstractmethod
def get_exchange_topics(self, conf):
"""Return a sequence of ExchangeTopics defining the exchange and
topics to be connected for this plugin."""
@abc.abstractmethod
def process_notification(self, message):
"""Return a sequence of Counter instances for the given message."""

View File

@ -22,6 +22,18 @@ events.
from ceilometer import counter
from ceilometer import plugin
from ceilometer.openstack.common import cfg
OPTS = [
cfg.StrOpt('cinder_control_exchange',
default='cinder',
help="Exchange name for Cinder notifications"),
]
cfg.CONF.register_opts(OPTS)
class _Base(plugin.NotificationBase):
"""Convert compute.instance.* notifications into Counters
@ -34,6 +46,17 @@ class _Base(plugin.NotificationBase):
"size",
]
@staticmethod
def get_exchange_topics(conf):
"""Return a sequence of ExchangeTopics defining the exchange and
topics to be connected for this plugin."""
return [
plugin.ExchangeTopics(
exchange=conf.cinder_control_exchange,
topics=set(topic + ".info"
for topic in conf.notification_topics)),
]
@staticmethod
def get_event_types():
return ['volume.exists',

View File

@ -1,129 +0,0 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/nova/dispatcher.py
"""
from ceilometer.compute import notifications
from ceilometer.collector import dispatcher
class StubDispatcher(dispatcher.NotificationDispatcher):
def _load_plugins(self):
self.handlers['compute.instance.create.end'] = [
notifications.Instance(),
]
TEST_NOTICE = {
u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
u'_context_is_admin': True,
u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
u'_context_quota_class': None,
u'_context_read_deleted': u'no',
u'_context_remote_address': u'10.0.2.15',
u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
u'_context_roles': [u'admin'],
u'_context_timestamp': u'2012-05-08T20:23:41.425105',
u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'event_type': u'compute.instance.create.end',
u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
u'payload': {u'created_at': u'2012-05-08 20:23:41',
u'deleted_at': u'',
u'disk_gb': 0,
u'display_name': u'testme',
u'fixed_ips': [{u'address': u'10.0.0.2',
u'floating_ips': [],
u'meta': {},
u'type': u'fixed',
u'version': 4}],
u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
u'instance_type': u'm1.tiny',
u'instance_type_id': 2,
u'launched_at': u'2012-05-08 20:23:47.985999',
u'memory_mb': 512,
u'state': u'active',
u'state_description': u'',
u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
u'vcpus': 1,
u'root_gb': 0,
u'ephemeral_gb': 0,
u'host': u'compute-host-name',
u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
u'os_type': u'linux?',
u'architecture': u'x86',
u'image_ref': u'UUID',
u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
},
u'priority': u'INFO',
u'publisher_id': u'compute.vagrant-precise',
u'timestamp': u'2012-05-08 20:23:48.028195',
}
def test_notify():
results = []
d = StubDispatcher(None, lambda x: results.append(x))
d.notify("notifications.info", TEST_NOTICE)
assert len(results) >= 1
counter = results[0]
assert counter.name == 'instance'
def test_load_plugins():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector',
lambda x: results.append(x)
)
assert d.handlers, 'No handlers were loaded'
def test_load_no_plugins():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector.none',
lambda x: results.append(x)
)
assert not d.handlers, 'Handlers were loaded'
def test_notify_through_plugin():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector',
lambda x: results.append(x)
)
d.notify("notifications.info", TEST_NOTICE)
assert len(results) >= 1
results_name = [result.name for result in results]
assert 'instance' in results_name
assert 'memory' in results_name
def test_notify_topics():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector',
lambda x: results.append(x)
)
d.notify("dont.care", TEST_NOTICE)
assert len(results) == 0

View File

@ -23,7 +23,71 @@ import datetime
from ceilometer import meter
from ceilometer.collector import manager
from ceilometer.storage import base
from ceilometer.openstack.common import rpc
from ceilometer.openstack.common import cfg
from ceilometer.tests import base as tests_base
from ceilometer.compute import notifications
TEST_NOTICE = {
u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
u'_context_is_admin': True,
u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
u'_context_quota_class': None,
u'_context_read_deleted': u'no',
u'_context_remote_address': u'10.0.2.15',
u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
u'_context_roles': [u'admin'],
u'_context_timestamp': u'2012-05-08T20:23:41.425105',
u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'event_type': u'compute.instance.create.end',
u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
u'payload': {u'created_at': u'2012-05-08 20:23:41',
u'deleted_at': u'',
u'disk_gb': 0,
u'display_name': u'testme',
u'fixed_ips': [{u'address': u'10.0.0.2',
u'floating_ips': [],
u'meta': {},
u'type': u'fixed',
u'version': 4}],
u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
u'instance_type': u'm1.tiny',
u'instance_type_id': 2,
u'launched_at': u'2012-05-08 20:23:47.985999',
u'memory_mb': 512,
u'state': u'active',
u'state_description': u'',
u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
u'vcpus': 1,
u'root_gb': 0,
u'ephemeral_gb': 0,
u'host': u'compute-host-name',
u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
u'os_type': u'linux?',
u'architecture': u'x86',
u'image_ref': u'UUID',
u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
},
u'priority': u'INFO',
u'publisher_id': u'compute.vagrant-precise',
u'timestamp': u'2012-05-08 20:23:48.028195',
}
class StubConnection(object):
def declare_topic_consumer(*args, **kwargs):
pass
def create_worker(*args, **kwargs):
pass
def consume_in_thread(self):
pass
class TestCollectorManager(tests_base.TestCase):
@ -33,6 +97,11 @@ class TestCollectorManager(tests_base.TestCase):
self.mgr = manager.CollectorManager()
self.ctx = None
def test_init_host(self):
self.stubs.Set(rpc, 'create_connection', lambda: StubConnection())
cfg.CONF.database_connection = 'log://localhost'
self.mgr.init_host()
def test_valid_message(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
@ -86,3 +155,18 @@ class TestCollectorManager(tests_base.TestCase):
self.mgr.record_metering_data(self.ctx, msg)
self.mox.VerifyAll()
def test_load_plugins(self):
results = self.mgr._load_plugins(self.mgr.COLLECTOR_NAMESPACE)
self.assert_(len(results) > 0)
def test_load_no_plugins(self):
results = self.mgr._load_plugins("foobar.namespace")
self.assertEqual(results, [])
def test_process_notification(self):
results = []
self.stubs.Set(self.mgr, 'publish_counter',
lambda counter: results.append(counter))
self.mgr.process_notification(notifications.Instance(), TEST_NOTICE)
self.assert_(len(results) >= 1)