Initialize dispatcher manager in event endpoint

Dispatcher Manager is not a service, so moved it into the correct
namespace, this allows the event handler to initialize it correctly.

Closes-bug: #1314080
Change-Id: I02792eef2d49f1c6b15d144d97c8a4a280ca150f
This commit is contained in:
Mehdi Abaakouk 2014-04-29 10:54:43 +02:00
parent 1779020170
commit c929bf36fd
6 changed files with 66 additions and 38 deletions

View File

@ -21,12 +21,12 @@ import socket
import msgpack import msgpack
from oslo.config import cfg from oslo.config import cfg
from ceilometer import dispatcher
from ceilometer import messaging from ceilometer import messaging
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common import service as os_service
from ceilometer.openstack.common import units from ceilometer.openstack.common import units
from ceilometer import service
OPTS = [ OPTS = [
cfg.StrOpt('udp_address', cfg.StrOpt('udp_address',
@ -46,7 +46,7 @@ cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.rpc',
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class CollectorService(service.DispatchedService, os_service.Service): class CollectorService(os_service.Service):
"""Listener for the collector service.""" """Listener for the collector service."""
@staticmethod @staticmethod
@ -56,6 +56,8 @@ class CollectorService(service.DispatchedService, os_service.Service):
def start(self): def start(self):
"""Bind the UDP socket and handle incoming data.""" """Bind the UDP socket and handle incoming data."""
# ensure dispatcher is configured before starting other services
self.dispatcher_manager = dispatcher.load_dispatcher_manager()
super(CollectorService, self).start() super(CollectorService, self).start()
if cfg.CONF.collector.udp_address: if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp) self.tg.add_thread(self.start_udp)

View File

@ -19,6 +19,40 @@
import abc import abc
import six import six
from oslo.config import cfg
from stevedore import named
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
LOG = log.getLogger(__name__)
OPTS = [
cfg.MultiStrOpt('dispatcher',
deprecated_group="collector",
default=['database'],
help='Dispatcher to process data.'),
]
cfg.CONF.register_opts(OPTS)
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
def load_dispatcher_manager():
LOG.debug(_('loading dispatchers from %s'),
DISPATCHER_NAMESPACE)
dispatcher_manager = named.NamedExtensionManager(
namespace=DISPATCHER_NAMESPACE,
names=cfg.CONF.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF])
if not list(dispatcher_manager):
LOG.warning(_('Failed to load any dispatchers for %s'),
DISPATCHER_NAMESPACE)
return dispatcher_manager
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Base(object): class Base(object):

View File

@ -22,17 +22,19 @@ from oslo.config import cfg
import oslo.messaging import oslo.messaging
from stevedore import extension from stevedore import extension
from ceilometer import dispatcher
from ceilometer.event import converter as event_converter from ceilometer.event import converter as event_converter
from ceilometer import messaging from ceilometer import messaging
from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common.gettextutils import _
from ceilometer import service
from ceilometer.storage import models from ceilometer.storage import models
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class EventsNotificationEndpoint(service.DispatchedService): class EventsNotificationEndpoint(object):
def __init__(self): def __init__(self):
super(EventsNotificationEndpoint, self).__init__()
self.dispatcher_manager = dispatcher.load_dispatcher_manager()
LOG.debug(_('Loading event definitions')) LOG.debug(_('Loading event definitions'))
self.event_converter = event_converter.setup_events( self.event_converter = event_converter.setup_events(
extension.ExtensionManager( extension.ExtensionManager(
@ -48,9 +50,9 @@ class EventsNotificationEndpoint(service.DispatchedService):
:param metadata: metadata about the notification :param metadata: metadata about the notification
""" """
#NOTE: the rpc layer currently rips out the notification # NOTE: the rpc layer currently rips out the notification
#delivery_info, which is critical to determining the # delivery_info, which is critical to determining the
#source of the notification. This will have to get added back later. # source of the notification. This will have to get added back later.
notification = messaging.convert_to_old_notification_format( notification = messaging.convert_to_old_notification_format(
'info', ctxt, publisher_id, event_type, payload, metadata) 'info', ctxt, publisher_id, event_type, payload, metadata)
self.process_notification(notification) self.process_notification(notification)
@ -61,8 +63,8 @@ class EventsNotificationEndpoint(service.DispatchedService):
if event is not None: if event is not None:
LOG.debug(_('Saving event "%s"'), event.event_type) LOG.debug(_('Saving event "%s"'), event.event_type)
problem_events = [] problem_events = []
for dispatcher in self.dispatcher_manager: for dispatcher_ext in self.dispatcher_manager:
problem_events.extend(dispatcher.obj.record_events(event)) problem_events.extend(dispatcher_ext.obj.record_events(event))
if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]: if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]:
if not cfg.CONF.notification.ack_on_event_error: if not cfg.CONF.notification.ack_on_event_error:
return oslo.messaging.NotificationResult.REQUEUE return oslo.messaging.NotificationResult.REQUEUE

View File

@ -22,7 +22,6 @@ import socket
import sys import sys
from oslo.config import cfg from oslo.config import cfg
from stevedore import named
from ceilometer import messaging from ceilometer import messaging
from ceilometer.openstack.common import gettextutils from ceilometer.openstack.common import gettextutils
@ -37,10 +36,6 @@ OPTS = [
help='Name of this node, which must be valid in an AMQP ' help='Name of this node, which must be valid in an AMQP '
'key. Can be an opaque identifier. For ZeroMQ only, must ' 'key. Can be an opaque identifier. For ZeroMQ only, must '
'be a valid host name, FQDN, or IP address.'), 'be a valid host name, FQDN, or IP address.'),
cfg.MultiStrOpt('dispatcher',
deprecated_group="collector",
default=['database'],
help='Dispatcher to process data.'),
cfg.IntOpt('collector_workers', cfg.IntOpt('collector_workers',
default=1, default=1,
help='Number of workers for collector service. A single ' help='Number of workers for collector service. A single '
@ -102,25 +97,6 @@ class WorkerException(Exception):
""" """
class DispatchedService(object):
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
def start(self):
LOG.debug(_('loading dispatchers from %s'),
self.DISPATCHER_NAMESPACE)
self.dispatcher_manager = named.NamedExtensionManager(
namespace=self.DISPATCHER_NAMESPACE,
names=cfg.CONF.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF])
if not list(self.dispatcher_manager):
LOG.warning(_('Failed to load any dispatchers for %s'),
self.DISPATCHER_NAMESPACE)
# ensure dispatcher is configured before starting other services
super(DispatchedService, self).start()
def get_workers(name): def get_workers(name):
workers = (cfg.CONF.get('%s_workers' % name) or workers = (cfg.CONF.get('%s_workers' % name) or
utils.cpu_count()) utils.cpu_count())

View File

@ -91,16 +91,15 @@ class TestEventEndpoint(tests_base.BaseTestCase):
self.CONF.set_override("connection", "log://", group='database') self.CONF.set_override("connection", "log://", group='database')
self.CONF.set_override("store_events", True, group="notification") self.CONF.set_override("store_events", True, group="notification")
self.endpoint = event_endpoint.EventsNotificationEndpoint()
self.mock_dispatcher = mock.MagicMock() self.mock_dispatcher = mock.MagicMock()
self.endpoint.event_converter = mock.MagicMock() self.endpoint = event_endpoint.EventsNotificationEndpoint()
self.endpoint.event_converter.to_event.return_value = mock.MagicMock(
event_type='test.test')
self.endpoint.dispatcher_manager = \ self.endpoint.dispatcher_manager = \
extension.ExtensionManager.make_test_instance([ extension.ExtensionManager.make_test_instance([
extension.Extension('test', None, None, self.mock_dispatcher) extension.Extension('test', None, None, self.mock_dispatcher)
]) ])
self.endpoint.event_converter = mock.MagicMock()
self.endpoint.event_converter.to_event.return_value = mock.MagicMock(
event_type='test.test')
def test_message_to_event(self): def test_message_to_event(self):
self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',

View File

@ -143,3 +143,18 @@ class TestNotification(tests_base.BaseTestCase):
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints)) self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
self.assertEqual(self.fake_event_endpoint, self.assertEqual(self.fake_event_endpoint,
self.srv.listeners[0].dispatcher.endpoints[0]) self.srv.listeners[0].dispatcher.endpoints[0])
@mock.patch('ceilometer.event.converter.get_config_file',
mock.MagicMock(return_value=None))
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
mock.MagicMock())
def test_event_dispatcher_loaded(self):
self.CONF.set_override("store_events", True, group="notification")
with mock.patch.object(self.srv, '_get_notifications_manager') \
as get_nm:
get_nm.side_effect = self.fake_get_notifications_manager
self.srv.start()
self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints))
event_endpoint = self.srv.listeners[0].dispatcher.endpoints[0]
self.assertEqual(1, len(list(event_endpoint.dispatcher_manager)))