From 80c3a1a23d00a0f53f7f7d310b208606cae13160 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Tue, 7 Apr 2015 15:23:10 -0400 Subject: [PATCH] use oslo.messaging dispatch filter oslo.messaging has the ability to filter notifications prior to dispatching message to endpoints. we should use it. Change-Id: Icafa81cce25e173e109b221010402ceb02023abb --- ceilometer/agent/plugin_base.py | 25 ++----- ceilometer/tests/agent/test_plugin.py | 96 +-------------------------- 2 files changed, 9 insertions(+), 112 deletions(-) diff --git a/ceilometer/agent/plugin_base.py b/ceilometer/agent/plugin_base.py index e52103a65..43d606be8 100644 --- a/ceilometer/agent/plugin_base.py +++ b/ceilometer/agent/plugin_base.py @@ -17,12 +17,11 @@ import abc import collections -import fnmatch from keystoneclient.v2_0 import client as ksclient -import oslo.messaging from oslo_config import cfg from oslo_context import context +import oslo_messaging import six from ceilometer.i18n import _ @@ -93,6 +92,10 @@ class NotificationBase(PluginBase): """Base class for plugins that support the notification API.""" def __init__(self, transporter): super(NotificationBase, self).__init__() + # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch + # messages to an endpoint. + self.filter_rule = oslo_messaging.NotificationFilter( + event_type='|'.join(self.event_types)) self.transporter = transporter # NOTE(gordc): if no publisher, this isn't a PipelineManager and # data should be requeued. @@ -120,7 +123,7 @@ class NotificationBase(PluginBase): targets = [] for exchange, topics in self.get_exchange_topics(conf): - targets.extend(oslo.messaging.Target(topic=topic, + targets.extend(oslo_messaging.Target(topic=topic, exchange=exchange) for topic in topics) return targets @@ -132,15 +135,6 @@ class NotificationBase(PluginBase): :param message: Message to process. """ - @staticmethod - def _handle_event_type(event_type, event_type_to_handle): - """Check whether event_type should be handled. - - It is according to event_type_to_handle. - """ - return any(map(lambda e: fnmatch.fnmatch(event_type, e), - event_type_to_handle)) - def info(self, ctxt, publisher_id, event_type, payload, metadata): """RPC endpoint for notification messages @@ -165,13 +159,6 @@ class NotificationBase(PluginBase): :param context: Execution context from the service or RPC call :param notification: The notification to process. """ - - # TODO(sileht): this will be moved into oslo.messaging - # see oslo.messaging bp notification-dispatcher-filter - if not self._handle_event_type(notification['event_type'], - self.event_types): - return - if self.requeue: meters = [ utils.meter_message_from_counter( diff --git a/ceilometer/tests/agent/test_plugin.py b/ceilometer/tests/agent/test_plugin.py index de5d53352..ad5d9057c 100644 --- a/ceilometer/tests/agent/test_plugin.py +++ b/ceilometer/tests/agent/test_plugin.py @@ -20,80 +20,14 @@ from oslotest import base from ceilometer.agent import plugin_base -TEST_NOTIFICATION = { - u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2', - u'_context_is_admin': True, - u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e', - u'_context_quota_class': None, - u'_context_read_deleted': u'no', - u'_context_remote_address': u'10.0.2.15', - u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66', - u'_context_roles': [u'admin'], - u'_context_timestamp': u'2012-05-08T20:23:41.425105', - u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2', - u'event_type': u'compute.instance.create.end', - u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451', - u'payload': {u'created_at': u'2012-05-08 20:23:41', - u'deleted_at': u'', - u'disk_gb': 0, - u'display_name': u'testme', - u'fixed_ips': [{u'address': u'10.0.0.2', - u'floating_ips': [], - u'meta': {}, - u'type': u'fixed', - u'version': 4}], - u'image_ref_url': u'http://10.0.2.15:9292/images/UUID', - u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1', - u'instance_type': u'm1.tiny', - u'instance_type_id': 2, - u'launched_at': u'2012-05-08 20:23:47.985999', - u'memory_mb': 512, - u'state': u'active', - u'state_description': u'', - u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e', - u'user_id': u'1e3ce043029547f1a61c1996d1a531a2', - u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3', - u'vcpus': 1, - u'root_gb': 0, - u'ephemeral_gb': 0, - u'host': u'compute-host-name', - u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4', - u'os_type': u'linux?', - u'architecture': u'x86', - u'image_ref': u'UUID', - u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5', - u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6', - }, - u'priority': u'INFO', - u'publisher_id': u'compute.vagrant-precise', - u'timestamp': u'2012-05-08 20:23:48.028195', -} - - class NotificationBaseTestCase(base.BaseTestCase): def setUp(self): super(NotificationBaseTestCase, self).setUp() self.CONF = self.useFixture(fixture_config.Config()).conf - def test_handle_event_type(self): - self.assertFalse(plugin_base.NotificationBase._handle_event_type( - 'compute.instance.start', ['compute'])) - self.assertFalse(plugin_base.NotificationBase._handle_event_type( - 'compute.instance.start', ['compute.*.foobar'])) - self.assertFalse(plugin_base.NotificationBase._handle_event_type( - 'compute.instance.start', ['compute.*.*.foobar'])) - self.assertTrue(plugin_base.NotificationBase._handle_event_type( - 'compute.instance.start', ['compute.*'])) - self.assertTrue(plugin_base.NotificationBase._handle_event_type( - 'compute.instance.start', ['*'])) - self.assertTrue(plugin_base.NotificationBase._handle_event_type( - 'compute.instance.start', ['compute.*.start'])) - self.assertTrue(plugin_base.NotificationBase._handle_event_type( - 'compute.instance.start', ['*.start'])) - self.assertTrue(plugin_base.NotificationBase._handle_event_type( - 'compute.instance.start', ['compute.*.*.foobar', 'compute.*'])) - class FakePlugin(plugin_base.NotificationBase): + event_types = ['compute.*'] + @staticmethod def get_exchange_topics(conf): return [plugin_base.ExchangeTopics(exchange="exchange1", @@ -104,32 +38,8 @@ class NotificationBaseTestCase(base.BaseTestCase): def process_notification(self, message): return message - class FakeComputePlugin(FakePlugin): - event_types = ['compute.*'] - - class FakeNetworkPlugin(FakePlugin): - event_types = ['network.*'] - - def _do_test_to_samples(self, plugin_class, match): - pm = mock.MagicMock() - plug = plugin_class(pm) - publish = pm.publisher.return_value.__enter__.return_value - - plug.to_samples_and_publish(mock.Mock(), TEST_NOTIFICATION) - - if match: - publish.assert_called_once_with(list(TEST_NOTIFICATION)) - else: - self.assertEqual(0, publish.call_count) - - def test_to_samples_match(self): - self._do_test_to_samples(self.FakeComputePlugin, True) - - def test_to_samples_no_match(self): - self._do_test_to_samples(self.FakeNetworkPlugin, False) - def test_get_targets_compat(self): - targets = self.FakeComputePlugin(mock.Mock()).get_targets(self.CONF) + targets = self.FakePlugin(mock.Mock()).get_targets(self.CONF) self.assertEqual(3, len(targets)) self.assertEqual('t1', targets[0].topic) self.assertEqual('exchange1', targets[0].exchange)