diff --git a/ceilometer/agent/plugin_base.py b/ceilometer/agent/plugin_base.py index 43d606be8..3c97192fb 100644 --- a/ceilometer/agent/plugin_base.py +++ b/ceilometer/agent/plugin_base.py @@ -27,7 +27,6 @@ import six from ceilometer.i18n import _ from ceilometer import messaging from ceilometer.openstack.common import log -from ceilometer.publisher import utils cfg.CONF.import_group('service_credentials', 'ceilometer.service') @@ -90,16 +89,13 @@ class PluginBase(object): @six.add_metaclass(abc.ABCMeta) class NotificationBase(PluginBase): """Base class for plugins that support the notification API.""" - def __init__(self, transporter): + def __init__(self, manager): super(NotificationBase, self).__init__() # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch # messages to an endpoint. self.filter_rule = oslo_messaging.NotificationFilter( event_type='|'.join(self.event_types)) - self.transporter = transporter - # NOTE(gordc): if no publisher, this isn't a PipelineManager and - # data should be requeued. - self.requeue = False if hasattr(transporter, 'publisher') else True + self.manager = manager @abc.abstractproperty def event_types(self): @@ -159,19 +155,8 @@ class NotificationBase(PluginBase): :param context: Execution context from the service or RPC call :param notification: The notification to process. """ - if self.requeue: - meters = [ - utils.meter_message_from_counter( - sample, cfg.CONF.publisher.telemetry_secret) - for sample in self.process_notification(notification) - ] - for notifier in self.transporter: - notifier.sample(context.to_dict(), - event_type='ceilometer.pipeline', - payload=meters) - else: - with self.transporter.publisher(context) as p: - p(list(self.process_notification(notification))) + with self.manager.publisher(context) as p: + p(list(self.process_notification(notification))) class NonMetricNotificationBase(object): diff --git a/ceilometer/event/endpoint.py b/ceilometer/event/endpoint.py index aca525781..e7162ba43 100644 --- a/ceilometer/event/endpoint.py +++ b/ceilometer/event/endpoint.py @@ -23,23 +23,19 @@ from stevedore import extension from ceilometer.event import converter as event_converter from ceilometer.i18n import _ from ceilometer import messaging -from ceilometer.publisher import utils LOG = logging.getLogger(__name__) class EventsNotificationEndpoint(object): - def __init__(self, transporter): + def __init__(self, manager): super(EventsNotificationEndpoint, self).__init__() LOG.debug(_('Loading event definitions')) self.ctxt = context.get_admin_context() self.event_converter = event_converter.setup_events( extension.ExtensionManager( namespace='ceilometer.event.trait_plugin')) - self.transporter = transporter - # NOTE(gordc): if no publisher, this isn't a PipelineManager and - # data should be requeued. - self.requeue = not hasattr(transporter, 'publisher') + self.manager = manager def info(self, ctxt, publisher_id, event_type, payload, metadata): """Convert message to Ceilometer Event. @@ -79,17 +75,8 @@ class EventsNotificationEndpoint(object): try: event = self.event_converter.to_event(notification) if event is not None: - if self.requeue: - serialized_event = utils.message_from_event( - event, cfg.CONF.publisher.telemetry_secret) - for notifier in self.transporter: - notifier.sample( - self.ctxt.to_dict(), - event_type='pipeline.event', - payload=[serialized_event]) - else: - with self.transporter.publisher(self.ctxt) as p: - p(event) + with self.manager.publisher(self.ctxt) as p: + p(event) except Exception: if not cfg.CONF.notification.ack_on_event_error: return oslo.messaging.NotificationResult.REQUEUE diff --git a/ceilometer/notification.py b/ceilometer/notification.py index d6643a44f..a87649580 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -86,11 +86,11 @@ class NotificationService(os_service.Service): self.group_id = None @classmethod - def _get_notifications_manager(cls, transporter): + def _get_notifications_manager(cls, pm): return extension.ExtensionManager( namespace=cls.NOTIFICATION_NAMESPACE, invoke_on_load=True, - invoke_args=(transporter, ) + invoke_args=(pm, ) ) def _get_notifier(self, transport, pipe): @@ -110,16 +110,19 @@ class NotificationService(os_service.Service): self.partition_coordinator = coordination.PartitionCoordinator() self.partition_coordinator.start() - event_transporter = None + event_pipe_manager = None if cfg.CONF.notification.workload_partitioning: - transporter = [] + pipe_manager = pipeline.SamplePipelineTransportManager() for pipe in self.pipeline_manager.pipelines: - transporter.append(self._get_notifier(transport, pipe)) + pipe_manager.add_transporter( + (pipe.source.support_meter, + self._get_notifier(transport, pipe))) if cfg.CONF.notification.store_events: - event_transporter = [] + event_pipe_manager = pipeline.EventPipelineTransportManager() for pipe in self.event_pipeline_manager.pipelines: - event_transporter.append(self._get_notifier(transport, - pipe)) + event_pipe_manager.add_transporter( + (pipe.source.support_event, + self._get_notifier(transport, pipe))) self.ctxt = context.get_admin_context() self.group_id = self.NOTIFICATION_NAMESPACE @@ -131,13 +134,13 @@ class NotificationService(os_service.Service): # we must create a transport to ensure the option have # beeen registered by oslo.messaging messaging.get_notifier(transport, '') - transporter = self.pipeline_manager + pipe_manager = self.pipeline_manager if cfg.CONF.notification.store_events: - event_transporter = self.event_pipeline_manager + event_pipe_manager = self.event_pipeline_manager self.group_id = None self.listeners, self.pipeline_listeners = [], [] - self._configure_main_queue_listeners(transporter, event_transporter) + self._configure_main_queue_listeners(pipe_manager, event_pipe_manager) if cfg.CONF.notification.workload_partitioning: self.partition_coordinator.join_group(self.group_id) @@ -157,8 +160,9 @@ class NotificationService(os_service.Service): # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None) - def _configure_main_queue_listeners(self, transporter, event_transporter): - notification_manager = self._get_notifications_manager(transporter) + def _configure_main_queue_listeners(self, pipe_manager, + event_pipe_manager): + notification_manager = self._get_notifications_manager(pipe_manager) if not list(notification_manager): LOG.warning(_('Failed to load any notification handlers for %s'), self.NOTIFICATION_NAMESPACE) @@ -168,7 +172,7 @@ class NotificationService(os_service.Service): endpoints = [] if cfg.CONF.notification.store_events: endpoints.append( - event_endpoint.EventsNotificationEndpoint(event_transporter)) + event_endpoint.EventsNotificationEndpoint(event_pipe_manager)) targets = [] for ext in notification_manager: diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index c6200acb7..46ff1a25e 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -110,6 +110,57 @@ class EventPipelineEndpoint(PipelineEndpoint): p(events) +class _PipelineTransportManager(object): + def __init__(self): + self.transporters = [] + + def add_transporter(self, transporter): + self.transporters.append(transporter) + + def publisher(self, context): + serializer = self.serializer + transporters = self.transporters + filter_attr = self.filter_attr + event_type = self.event_type + + class PipelinePublishContext(object): + def __enter__(self): + def p(data): + serialized_data = serializer(data) + for d_filter, notifier in transporters: + if any(d_filter(d[filter_attr]) + for d in serialized_data): + notifier.sample(context.to_dict(), + event_type=event_type, + payload=serialized_data) + return p + + def __exit__(self, exc_type, exc_value, traceback): + pass + + return PipelinePublishContext() + + +class SamplePipelineTransportManager(_PipelineTransportManager): + filter_attr = 'counter_name' + event_type = 'ceilometer.pipeline' + + @staticmethod + def serializer(data): + return [publisher_utils.meter_message_from_counter( + sample, cfg.CONF.publisher.telemetry_secret) for sample in data] + + +class EventPipelineTransportManager(_PipelineTransportManager): + filter_attr = 'event_type' + event_type = 'pipeline.event' + + @staticmethod + def serializer(data): + return [publisher_utils.message_from_event( + data, cfg.CONF.publisher.telemetry_secret)] + + class PublishContext(object): def __init__(self, context, pipelines=None): diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index f4713f52a..071729b94 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -194,7 +194,7 @@ class BaseRealNotification(tests_base.BaseTestCase): ev_pipeline = yaml.dump({ 'sources': [{ 'name': 'test_event', - 'events': '*', + 'events': ['compute.instance.*'], 'sinks': ['test_sink'] }], 'sinks': [{ @@ -326,3 +326,34 @@ class TestRealNotificationHA(BaseRealNotification): self.srv._refresh_agent(None) self.assertEqual(2, len(self.srv.pipeline_listeners)) self.srv.stop() + + @mock.patch('oslo.messaging.Notifier.sample') + def test_broadcast_to_relevant_pipes_only(self, mock_notifier): + self.srv.start() + for endpoint in self.srv.listeners[0].dispatcher.endpoints: + if (hasattr(endpoint, 'filter_rule') and + not endpoint.filter_rule.match(None, None, 'nonmatching.end', + None, None)): + continue + endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', + 'nonmatching.end', + TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) + self.assertFalse(mock_notifier.called) + for endpoint in self.srv.listeners[0].dispatcher.endpoints: + if (hasattr(endpoint, 'filter_rule') and + not endpoint.filter_rule.match(None, None, + 'compute.instance.create.end', + None, None)): + continue + endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', + 'compute.instance.create.end', + TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) + self.assertTrue(mock_notifier.called) + self.assertEqual(3, mock_notifier.call_count) + self.assertEqual('pipeline.event', + mock_notifier.call_args_list[0][1]['event_type']) + self.assertEqual('ceilometer.pipeline', + mock_notifier.call_args_list[1][1]['event_type']) + self.assertEqual('ceilometer.pipeline', + mock_notifier.call_args_list[2][1]['event_type']) + self.srv.stop()