broadcast data to relevant queues only

system resources are required to broadcast messages to queues. currently,
when requeuing data for pipeline coordination, we broadcast to all
queues. this patch adds a check to broadcast only to relevant
pipelines.

Change-Id: If4cee643537fe136fdcf66acc35e2671dbd2ac74
Closes-Bug: #1436489
This commit is contained in:
gordon chung 2015-03-25 18:45:34 -04:00
parent 756ea8a453
commit 1d3c452b37
5 changed files with 109 additions and 51 deletions

View File

@ -27,7 +27,6 @@ import six
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer.openstack.common import log
from ceilometer.publisher import utils
cfg.CONF.import_group('service_credentials', 'ceilometer.service')
@ -90,16 +89,13 @@ class PluginBase(object):
@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
"""Base class for plugins that support the notification API."""
def __init__(self, transporter):
def __init__(self, manager):
super(NotificationBase, self).__init__()
# NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
# messages to an endpoint.
self.filter_rule = oslo_messaging.NotificationFilter(
event_type='|'.join(self.event_types))
self.transporter = transporter
# NOTE(gordc): if no publisher, this isn't a PipelineManager and
# data should be requeued.
self.requeue = False if hasattr(transporter, 'publisher') else True
self.manager = manager
@abc.abstractproperty
def event_types(self):
@ -159,19 +155,8 @@ class NotificationBase(PluginBase):
:param context: Execution context from the service or RPC call
:param notification: The notification to process.
"""
if self.requeue:
meters = [
utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret)
for sample in self.process_notification(notification)
]
for notifier in self.transporter:
notifier.sample(context.to_dict(),
event_type='ceilometer.pipeline',
payload=meters)
else:
with self.transporter.publisher(context) as p:
p(list(self.process_notification(notification)))
with self.manager.publisher(context) as p:
p(list(self.process_notification(notification)))
class NonMetricNotificationBase(object):

View File

@ -23,23 +23,19 @@ from stevedore import extension
from ceilometer.event import converter as event_converter
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer.publisher import utils
LOG = logging.getLogger(__name__)
class EventsNotificationEndpoint(object):
def __init__(self, transporter):
def __init__(self, manager):
super(EventsNotificationEndpoint, self).__init__()
LOG.debug(_('Loading event definitions'))
self.ctxt = context.get_admin_context()
self.event_converter = event_converter.setup_events(
extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin'))
self.transporter = transporter
# NOTE(gordc): if no publisher, this isn't a PipelineManager and
# data should be requeued.
self.requeue = not hasattr(transporter, 'publisher')
self.manager = manager
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""Convert message to Ceilometer Event.
@ -79,17 +75,8 @@ class EventsNotificationEndpoint(object):
try:
event = self.event_converter.to_event(notification)
if event is not None:
if self.requeue:
serialized_event = utils.message_from_event(
event, cfg.CONF.publisher.telemetry_secret)
for notifier in self.transporter:
notifier.sample(
self.ctxt.to_dict(),
event_type='pipeline.event',
payload=[serialized_event])
else:
with self.transporter.publisher(self.ctxt) as p:
p(event)
with self.manager.publisher(self.ctxt) as p:
p(event)
except Exception:
if not cfg.CONF.notification.ack_on_event_error:
return oslo.messaging.NotificationResult.REQUEUE

View File

@ -86,11 +86,11 @@ class NotificationService(os_service.Service):
self.group_id = None
@classmethod
def _get_notifications_manager(cls, transporter):
def _get_notifications_manager(cls, pm):
return extension.ExtensionManager(
namespace=cls.NOTIFICATION_NAMESPACE,
invoke_on_load=True,
invoke_args=(transporter, )
invoke_args=(pm, )
)
def _get_notifier(self, transport, pipe):
@ -110,16 +110,19 @@ class NotificationService(os_service.Service):
self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator.start()
event_transporter = None
event_pipe_manager = None
if cfg.CONF.notification.workload_partitioning:
transporter = []
pipe_manager = pipeline.SamplePipelineTransportManager()
for pipe in self.pipeline_manager.pipelines:
transporter.append(self._get_notifier(transport, pipe))
pipe_manager.add_transporter(
(pipe.source.support_meter,
self._get_notifier(transport, pipe)))
if cfg.CONF.notification.store_events:
event_transporter = []
event_pipe_manager = pipeline.EventPipelineTransportManager()
for pipe in self.event_pipeline_manager.pipelines:
event_transporter.append(self._get_notifier(transport,
pipe))
event_pipe_manager.add_transporter(
(pipe.source.support_event,
self._get_notifier(transport, pipe)))
self.ctxt = context.get_admin_context()
self.group_id = self.NOTIFICATION_NAMESPACE
@ -131,13 +134,13 @@ class NotificationService(os_service.Service):
# we must create a transport to ensure the option have
# beeen registered by oslo.messaging
messaging.get_notifier(transport, '')
transporter = self.pipeline_manager
pipe_manager = self.pipeline_manager
if cfg.CONF.notification.store_events:
event_transporter = self.event_pipeline_manager
event_pipe_manager = self.event_pipeline_manager
self.group_id = None
self.listeners, self.pipeline_listeners = [], []
self._configure_main_queue_listeners(transporter, event_transporter)
self._configure_main_queue_listeners(pipe_manager, event_pipe_manager)
if cfg.CONF.notification.workload_partitioning:
self.partition_coordinator.join_group(self.group_id)
@ -157,8 +160,9 @@ class NotificationService(os_service.Service):
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def _configure_main_queue_listeners(self, transporter, event_transporter):
notification_manager = self._get_notifications_manager(transporter)
def _configure_main_queue_listeners(self, pipe_manager,
event_pipe_manager):
notification_manager = self._get_notifications_manager(pipe_manager)
if not list(notification_manager):
LOG.warning(_('Failed to load any notification handlers for %s'),
self.NOTIFICATION_NAMESPACE)
@ -168,7 +172,7 @@ class NotificationService(os_service.Service):
endpoints = []
if cfg.CONF.notification.store_events:
endpoints.append(
event_endpoint.EventsNotificationEndpoint(event_transporter))
event_endpoint.EventsNotificationEndpoint(event_pipe_manager))
targets = []
for ext in notification_manager:

View File

@ -110,6 +110,57 @@ class EventPipelineEndpoint(PipelineEndpoint):
p(events)
class _PipelineTransportManager(object):
def __init__(self):
self.transporters = []
def add_transporter(self, transporter):
self.transporters.append(transporter)
def publisher(self, context):
serializer = self.serializer
transporters = self.transporters
filter_attr = self.filter_attr
event_type = self.event_type
class PipelinePublishContext(object):
def __enter__(self):
def p(data):
serialized_data = serializer(data)
for d_filter, notifier in transporters:
if any(d_filter(d[filter_attr])
for d in serialized_data):
notifier.sample(context.to_dict(),
event_type=event_type,
payload=serialized_data)
return p
def __exit__(self, exc_type, exc_value, traceback):
pass
return PipelinePublishContext()
class SamplePipelineTransportManager(_PipelineTransportManager):
filter_attr = 'counter_name'
event_type = 'ceilometer.pipeline'
@staticmethod
def serializer(data):
return [publisher_utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret) for sample in data]
class EventPipelineTransportManager(_PipelineTransportManager):
filter_attr = 'event_type'
event_type = 'pipeline.event'
@staticmethod
def serializer(data):
return [publisher_utils.message_from_event(
data, cfg.CONF.publisher.telemetry_secret)]
class PublishContext(object):
def __init__(self, context, pipelines=None):

View File

@ -194,7 +194,7 @@ class BaseRealNotification(tests_base.BaseTestCase):
ev_pipeline = yaml.dump({
'sources': [{
'name': 'test_event',
'events': '*',
'events': ['compute.instance.*'],
'sinks': ['test_sink']
}],
'sinks': [{
@ -326,3 +326,34 @@ class TestRealNotificationHA(BaseRealNotification):
self.srv._refresh_agent(None)
self.assertEqual(2, len(self.srv.pipeline_listeners))
self.srv.stop()
@mock.patch('oslo.messaging.Notifier.sample')
def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
self.srv.start()
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
if (hasattr(endpoint, 'filter_rule') and
not endpoint.filter_rule.match(None, None, 'nonmatching.end',
None, None)):
continue
endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'nonmatching.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
self.assertFalse(mock_notifier.called)
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
if (hasattr(endpoint, 'filter_rule') and
not endpoint.filter_rule.match(None, None,
'compute.instance.create.end',
None, None)):
continue
endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
'compute.instance.create.end',
TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
self.assertTrue(mock_notifier.called)
self.assertEqual(3, mock_notifier.call_count)
self.assertEqual('pipeline.event',
mock_notifier.call_args_list[0][1]['event_type'])
self.assertEqual('ceilometer.pipeline',
mock_notifier.call_args_list[1][1]['event_type'])
self.assertEqual('ceilometer.pipeline',
mock_notifier.call_args_list[2][1]['event_type'])
self.srv.stop()