implement notification coordination

this patch enables coordination of notification agents by splitting
pipelines across agents. a single-worker agent is provided that
skips the overhead of IPC. a multi-worker agent is added that uses
an internal queue to redirect samples to the correct pipeline.

Implements: blueprint notification-coordination
Change-Id: If8503ee70217025106a188f40a6e4ea8c49a89f4
gordon chung 2014-11-17 13:54:23 -05:00
parent d68f8afcc9
commit 8d3d84053f
5 changed files with 199 additions and 27 deletions
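The split hinges on one dispatch rule in NotificationBase (first file below): a transporter exposing a `publisher` attribute is a local PipelineManager, anything else is treated as a list of per-pipeline IPC notifiers. A minimal standalone sketch of that rule, using stub classes rather than the real ceilometer ones:

class PipelineManagerStub(object):
    """Single-worker path: samples are published in-process, no IPC."""
    def publisher(self, context):
        return lambda samples: print('published %d sample(s) locally'
                                     % len(samples))

class NotifierStub(object):
    """Multi-worker path: one notifier per pipeline IPC topic."""
    def __init__(self, topic):
        self.topic = topic

    def sample(self, ctxt, event_type, payload):
        print('requeued %d meter(s) on %s' % (len(payload), self.topic))

def needs_requeue(transporter):
    # same hasattr() test as the patched NotificationBase.__init__
    return not hasattr(transporter, 'publisher')

print(needs_requeue(PipelineManagerStub()))                  # False
print(needs_requeue([NotifierStub('ceilometer-pipe-cpu')]))  # True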

View File

@@ -30,6 +30,7 @@ from ceilometer.i18n import _
 from ceilometer import messaging
 from ceilometer.openstack.common import context
 from ceilometer.openstack.common import log
+from ceilometer.publisher import utils

 cfg.CONF.import_group('service_credentials', 'ceilometer.service')
@@ -92,9 +93,12 @@ class PluginBase(object):
 @six.add_metaclass(abc.ABCMeta)
 class NotificationBase(PluginBase):
     """Base class for plugins that support the notification API."""

-    def __init__(self, pipeline_manager):
+    def __init__(self, transporter):
         super(NotificationBase, self).__init__()
-        self.pipeline_manager = pipeline_manager
+        self.transporter = transporter
+        # NOTE(gordc): if no publisher, this isn't a PipelineManager and
+        # data should be requeued.
+        self.requeue = False if hasattr(transporter, 'publisher') else True

     @abc.abstractproperty
     def event_types(self):
@@ -170,8 +174,19 @@ class NotificationBase(PluginBase):
                 self.event_types):
             return

-        with self.pipeline_manager.publisher(context) as p:
-            p(list(self.process_notification(notification)))
+        if self.requeue:
+            meters = [
+                utils.meter_message_from_counter(
+                    sample, cfg.CONF.publisher.metering_secret)
+                for sample in self.process_notification(notification)
+            ]
+            for notifier in self.transporter:
+                notifier.sample(context.to_dict(),
+                                event_type='ceilometer.pipeline',
+                                payload=meters)
+        else:
+            with self.transporter.publisher(context) as p:
+                p(list(self.process_notification(notification)))


 @six.add_metaclass(abc.ABCMeta)
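In the requeue branch above, each sample is flattened and signed by utils.meter_message_from_counter() with the shared metering secret, so the receiving agent can verify the payload. A simplified standalone stand-in for that signing step; this is an illustrative HMAC scheme, not ceilometer's exact wire format:

import hashlib
import hmac

def sign_meter(meter, secret):
    # sign every field except the signature itself, in a stable key order
    digest = hmac.new(secret.encode(), digestmod=hashlib.sha256)
    for name in sorted(meter):
        if name == 'message_signature':
            continue
        digest.update(name.encode())
        digest.update(str(meter[name]).encode())
    return digest.hexdigest()

meter = {'counter_name': 'cpu', 'counter_volume': 12345,
         'resource_id': 'res-1'}
meter['message_signature'] = sign_meter(meter, 'metering_secret')
print(meter['message_signature'])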

View File

@@ -37,7 +37,12 @@ OPTS = [
     cfg.FloatOpt('heartbeat',
                  default=1.0,
                  help='Number of seconds between heartbeats for distributed '
-                      'coordination.')
+                      'coordination.'),
+    cfg.FloatOpt('check_watchers',
+                 default=10.0,
+                 help='Number of seconds between checks to see if group '
+                      'membership has changed')
 ]

 cfg.CONF.register_opts(OPTS, group='coordination')
@@ -89,6 +94,15 @@ class PartitionCoordinator(object):
             LOG.exception(_LE('Error sending a heartbeat to coordination '
                               'backend.'))

+    def watch_group(self, namespace, callback):
+        if self._coordinator:
+            self._coordinator.watch_join_group(namespace, callback)
+            self._coordinator.watch_leave_group(namespace, callback)
+
+    def run_watchers(self):
+        if self._coordinator:
+            self._coordinator.run_watchers()
+
     def join_group(self, group_id):
         if not self._coordinator or not self._started or not group_id:
             return
@@ -108,6 +122,11 @@ class PartitionCoordinator(object):
                 pass
         self._groups.add(group_id)

+    def leave_group(self, group_id):
+        if self._coordinator:
+            self._coordinator.leave_group(group_id)
+            LOG.info(_LI('Left partitioning group %s'), group_id)
+
     def _get_members(self, group_id):
         if not self._coordinator:
             return [self._my_id]
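The new watch_group()/run_watchers()/leave_group() methods are thin guards around the tooz coordinator. A minimal standalone sketch of the underlying watcher pattern, using tooz directly; the backend URL and member/group names are illustrative assumptions:

import time

from tooz import coordination

coordinator = coordination.get_coordinator('memcached://localhost:11211',
                                           b'notification-agent-1')
coordinator.start()

group = b'ceilometer.notification'
try:
    coordinator.create_group(group).get()
except coordination.GroupAlreadyExist:
    pass
coordinator.join_group(group).get()

def rebalance(event):
    # in the agent this is _refresh_agent(): tear down and rebuild the
    # per-pipeline IPC listeners for the new membership
    print('membership changed:', event)

coordinator.watch_join_group(group, rebalance)
coordinator.watch_leave_group(group, rebalance)

for _ in range(60):
    coordinator.heartbeat()     # maps to the coordination.heartbeat timer
    coordinator.run_watchers()  # maps to the check_watchers timer
    time.sleep(1.0)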

View File

@@ -16,11 +16,14 @@
 # under the License.

 from oslo.config import cfg
+import oslo.messaging
 from stevedore import extension

+from ceilometer import coordination
 from ceilometer.event import endpoint as event_endpoint
 from ceilometer.i18n import _
 from ceilometer import messaging
+from ceilometer.openstack.common import context
 from ceilometer.openstack.common import log
 from ceilometer.openstack.common import service as os_service
 from ceilometer import pipeline
@@ -38,6 +41,10 @@ OPTS = [
                 deprecated_group='collector',
                 default=False,
                 help='Save event details.'),
+    cfg.BoolOpt('workload_partitioning',
+                default=False,
+                help='Enable workload partitioning, allowing multiple '
+                     'notification agents to be run simultaneously.'),
     cfg.MultiStrOpt('messaging_urls',
                     default=[],
                     help="Messaging URLs to listen for notifications. "
@@ -50,32 +57,76 @@ cfg.CONF.register_opts(OPTS, group="notification")

 class NotificationService(os_service.Service):
+    """Notification service.
+
+    When running multiple agents, an additional queuing sequence is required
+    for inter-process communication. Each agent has two listeners: one for
+    the main OpenStack queue and another listener (and notifier) for the IPC
+    queue that divides the pipeline sink endpoints. Coordination should be
+    enabled for proper active/active HA.
+    """
+
     NOTIFICATION_NAMESPACE = 'ceilometer.notification'
+    NOTIFICATION_IPC = 'ceilometer-pipe'

     @classmethod
-    def _get_notifications_manager(cls, pm):
+    def _get_notifications_manager(cls, transporter):
         return extension.ExtensionManager(
             namespace=cls.NOTIFICATION_NAMESPACE,
             invoke_on_load=True,
-            invoke_args=(pm, )
+            invoke_args=(transporter, )
         )

     def start(self):
         super(NotificationService, self).start()
-        # FIXME(sileht): endpoint use notification_topics option
-        # and it should not because this is oslo.messaging option
-        # not a ceilometer, until we have a something to get
-        # the notification_topics in an other way
-        # we must create a transport to ensure the option have
-        # beeen registered by oslo.messaging
-        transport = messaging.get_transport()
-        messaging.get_notifier(transport, '')
         self.pipeline_manager = pipeline.setup_pipeline()
+
+        transport = messaging.get_transport()
+        self.partition_coordinator = coordination.PartitionCoordinator()
+        self.partition_coordinator.start()
+
+        if cfg.CONF.notification.workload_partitioning:
+            transporter = []
+            for pipe in self.pipeline_manager.pipelines:
+                transporter.append(oslo.messaging.Notifier(
+                    transport,
+                    driver=cfg.CONF.publisher_notifier.metering_driver,
+                    publisher_id='ceilometer.notification',
+                    topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name)))
+            self.ctxt = context.get_admin_context()
+            self.group_id = self.NOTIFICATION_NAMESPACE
+        else:
+            # FIXME(sileht): the endpoint uses the notification_topics
+            # option, which is an oslo.messaging option, not a ceilometer
+            # one. Until we have another way to get notification_topics, we
+            # must create a transport to ensure the option has been
+            # registered by oslo.messaging.
+            messaging.get_notifier(transport, '')
+            transporter = self.pipeline_manager
+            self.group_id = None
+
+        self.listeners = self.pipeline_listeners = []
+        self._configure_main_queue_listeners(transporter)
+
+        if cfg.CONF.notification.workload_partitioning:
+            self.partition_coordinator.join_group(self.group_id)
+            self._configure_pipeline_listeners()
+            self.partition_coordinator.watch_group(self.group_id,
+                                                   self._refresh_agent)
+            self.tg.add_timer(cfg.CONF.coordination.heartbeat,
+                              self.partition_coordinator.heartbeat)
+            self.tg.add_timer(cfg.CONF.coordination.check_watchers,
+                              self.partition_coordinator.run_watchers)
+
+        # Add a dummy thread to have wait() working
+        self.tg.add_timer(604800, lambda: None)
+
+    def _configure_main_queue_listeners(self, transporter):
         self.notification_manager = self._get_notifications_manager(
-            self.pipeline_manager)
+            transporter)
         if not list(self.notification_manager):
             LOG.warning(_('Failed to load any notification handlers for %s'),
                         self.NOTIFICATION_NAMESPACE)
@@ -103,7 +154,6 @@ class NotificationService(os_service.Service):
             endpoints.append(handler)

         urls = cfg.CONF.notification.messaging_urls or [None]
-        self.listeners = []
         for url in urls:
             transport = messaging.get_transport(url)
             listener = messaging.get_notification_listener(
@@ -111,9 +161,35 @@ class NotificationService(os_service.Service):
             listener.start()
             self.listeners.append(listener)

-        # Add a dummy thread to have wait() working
-        self.tg.add_timer(604800, lambda: None)
+    @staticmethod
+    def _kill_listeners(listeners):
+        # NOTE(gordc): the correct way to use an oslo.messaging listener is
+        # to call stop(), which stops new messages, then wait(), which
+        # processes the remaining messages and closes the connection
+        for listener in listeners:
+            listener.stop()
+            listener.wait()
+
+    def _refresh_agent(self, event):
+        self._kill_listeners(self.pipeline_listeners)
+        self._configure_pipeline_listeners()
+
+    def _configure_pipeline_listeners(self):
+        if cfg.CONF.notification.workload_partitioning:
+            partitioned = self.partition_coordinator.extract_my_subset(
+                self.group_id, self.pipeline_manager.pipelines)
+            transport = messaging.get_transport()
+            for pipe in partitioned:
+                LOG.debug(_('Pipeline endpoint: %s'), pipe.name)
+                listener = messaging.get_notification_listener(
+                    transport,
+                    [oslo.messaging.Target(
+                        topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))],
+                    [pipeline.PipelineEndpoint(self.ctxt, pipe)])
+                listener.start()
+                self.pipeline_listeners.append(listener)

     def stop(self):
-        map(lambda x: x.stop(), self.listeners)
+        self.partition_coordinator.leave_group(self.group_id)
+        self._kill_listeners(self.listeners + self.pipeline_listeners)
         super(NotificationService, self).stop()
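Each agent listens only on the IPC topics ('ceilometer-pipe-<pipeline>') for its share of pipelines, as returned by extract_my_subset(). A minimal standalone sketch (hypothetical helper, not ceilometer's implementation) of how such a deterministic split can work:

import hashlib

def _stable_hash(value):
    # hash() is randomized per process in Python, so all agents must use a
    # deterministic digest to agree on the same partitioning
    return int(hashlib.md5(value.encode()).hexdigest(), 16)

def extract_my_subset(my_id, members, universal_set):
    members = sorted(members)
    my_position = members.index(my_id)
    return [v for v in universal_set
            if _stable_hash(v) % len(members) == my_position]

pipelines = ['meter_pipeline', 'cpu_pipeline', 'disk_pipeline']
agents = ['agent-1', 'agent-2']
for agent in agents:
    mine = extract_my_subset(agent, agents, pipelines)
    print(agent, ['ceilometer-pipe-%s' % p for p in mine])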

View File

@@ -26,6 +26,7 @@ import yaml
 from ceilometer.i18n import _
 from ceilometer.openstack.common import log
 from ceilometer import publisher
+from ceilometer import sample as sample_util
 from ceilometer import transformer as xformer
@@ -50,6 +51,30 @@ class PipelineException(Exception):
         return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)


+class PipelineEndpoint(object):
+
+    def __init__(self, context, pipeline):
+        self.publish_context = PublishContext(context, [pipeline])
+
+    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
+        """RPC endpoint for pipeline messages."""
+        samples = [
+            sample_util.Sample(name=s['counter_name'],
+                               type=s['counter_type'],
+                               unit=s['counter_unit'],
+                               volume=s['counter_volume'],
+                               user_id=s['user_id'],
+                               project_id=s['project_id'],
+                               resource_id=s['resource_id'],
+                               timestamp=s['timestamp'],
+                               resource_metadata=s['resource_metadata'],
+                               source=s.get('source'))
+            for s in payload
+        ]
+        with self.publish_context as p:
+            p(samples)
+
+
 class PublishContext(object):

     def __init__(self, context, pipelines=None):
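PipelineEndpoint.sample() is the receiving half of the IPC hop: it rebuilds Sample objects from the 'counter_*' fields that meter_message_from_counter() produced on the sending side. A minimal standalone sketch of that round trip, using a namedtuple stand-in for ceilometer's Sample:

import collections

Sample = collections.namedtuple(
    'Sample', ['name', 'type', 'unit', 'volume', 'user_id', 'project_id',
               'resource_id', 'timestamp', 'resource_metadata', 'source'])

def to_payload(s):
    # mirrors the 'counter_*' field naming used on the wire
    return {'counter_name': s.name, 'counter_type': s.type,
            'counter_unit': s.unit, 'counter_volume': s.volume,
            'user_id': s.user_id, 'project_id': s.project_id,
            'resource_id': s.resource_id, 'timestamp': s.timestamp,
            'resource_metadata': s.resource_metadata, 'source': s.source}

def from_payload(d):
    # the inverse transform performed by PipelineEndpoint.sample()
    return Sample(name=d['counter_name'], type=d['counter_type'],
                  unit=d['counter_unit'], volume=d['counter_volume'],
                  user_id=d['user_id'], project_id=d['project_id'],
                  resource_id=d['resource_id'], timestamp=d['timestamp'],
                  resource_metadata=d['resource_metadata'],
                  source=d.get('source'))

s = Sample('cpu', 'cumulative', 'ns', 12345, 'user', 'proj',
           'res-1', '2014-11-17T13:54:23', {}, 'openstack')
assert from_payload(to_payload(s)) == s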

View File

@@ -182,9 +182,9 @@ class TestNotification(tests_base.BaseTestCase):
         self.assertEqual(1, len(self.srv.listeners[0].dispatcher.targets))


-class TestRealNotification(tests_base.BaseTestCase):
+class BaseRealNotification(tests_base.BaseTestCase):
     def setUp(self):
-        super(TestRealNotification, self).setUp()
+        super(BaseRealNotification, self).setUp()
         self.CONF = self.useFixture(fixture_config.Config()).conf
         self.setup_messaging(self.CONF, 'nova')
@@ -202,12 +202,9 @@ class TestRealNotification(tests_base.BaseTestCase):
                                                      prefix="pipeline",
                                                      suffix="yaml")
         self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
-        self.srv = notification.NotificationService()
         self.publisher = test_publisher.TestPublisher("")

-    @mock.patch('ceilometer.publisher.test.TestPublisher')
-    def test_notification_service(self, fake_publisher_cls):
-        fake_publisher_cls.return_value = self.publisher
+    def _check_notification_service(self):
         self.srv.start()

         notifier = messaging.get_notifier(self.transport,
@@ -225,3 +222,43 @@ class TestRealNotification(tests_base.BaseTestCase):
         resources = list(set(s.resource_id for s in self.publisher.samples))
         self.assertEqual(self.expected_samples, len(self.publisher.samples))
         self.assertEqual(["9f9d01b9-4a58-4271-9e27-398b21ab20d1"], resources)
+
+
+class TestRealNotification(BaseRealNotification):
+    def setUp(self):
+        super(TestRealNotification, self).setUp()
+        self.srv = notification.NotificationService()
+
+    @mock.patch('ceilometer.publisher.test.TestPublisher')
+    def test_notification_service(self, fake_publisher_cls):
+        fake_publisher_cls.return_value = self.publisher
+        self._check_notification_service()
+
+    @mock.patch('ceilometer.coordination.PartitionCoordinator')
+    @mock.patch('ceilometer.publisher.test.TestPublisher')
+    def test_ha_configured_agent_coord_disabled(self, fake_publisher_cls,
+                                                fake_coord):
+        fake_publisher_cls.return_value = self.publisher
+        fake_coord1 = mock.MagicMock()
+        fake_coord1.extract_my_subset.side_effect = lambda x, y: y
+        fake_coord.return_value = fake_coord1
+        self._check_notification_service()
+
+
+class TestRealNotificationHA(BaseRealNotification):
+    def setUp(self):
+        super(TestRealNotificationHA, self).setUp()
+        self.CONF.set_override('workload_partitioning', True,
+                               group='notification')
+        self.srv = notification.NotificationService()
+
+    @mock.patch('ceilometer.coordination.PartitionCoordinator')
+    @mock.patch('ceilometer.publisher.test.TestPublisher')
+    def test_notification_service(self, fake_publisher_cls, fake_coord):
+        fake_publisher_cls.return_value = self.publisher
+        fake_coord1 = mock.MagicMock()
+        fake_coord1.extract_my_subset.side_effect = lambda x, y: y
+        fake_coord.return_value = fake_coord1
+        self._check_notification_service()