From fd61f630e9db7589b01a89c7cb4f7d6e5e62ee3a Mon Sep 17 00:00:00 2001 From: Nejc Saje Date: Thu, 8 Aug 2013 13:39:47 +0200 Subject: [PATCH] Storing events via dispatchers Removes the database handle from CollectorService and moves the logic for storing events to Dispatcher classes. Change-Id: I22cf02685e3a33a9014d65d8d32a5b585dabe185 Fixes: bug #1204520 --- ceilometer/collector/dispatcher/__init__.py | 4 ++ ceilometer/collector/dispatcher/database.py | 5 ++ ceilometer/collector/dispatcher/file.py | 4 ++ ceilometer/collector/service.py | 53 ++++++++++++--------- tests/collector/test_service.py | 31 +++++++----- 5 files changed, 63 insertions(+), 34 deletions(-) diff --git a/ceilometer/collector/dispatcher/__init__.py b/ceilometer/collector/dispatcher/__init__.py index 94003b697..f9011053e 100644 --- a/ceilometer/collector/dispatcher/__init__.py +++ b/ceilometer/collector/dispatcher/__init__.py @@ -29,3 +29,7 @@ class Base(object): @abc.abstractmethod def record_metering_data(self, context, data): """Recording metering data interface.""" + + @abc.abstractmethod + def record_events(self, events): + """Recording events interface.""" diff --git a/ceilometer/collector/dispatcher/database.py b/ceilometer/collector/dispatcher/database.py index 732a26e79..767393ea6 100644 --- a/ceilometer/collector/dispatcher/database.py +++ b/ceilometer/collector/dispatcher/database.py @@ -70,3 +70,8 @@ class DatabaseDispatcher(dispatcher.Base): LOG.warning( 'message signature invalid, discarding message: %r', meter) + + def record_events(self, events): + if not isinstance(events, list): + events = [events] + self.storage_conn.record_events(events) diff --git a/ceilometer/collector/dispatcher/file.py b/ceilometer/collector/dispatcher/file.py index 1c818df4b..295f88e5e 100644 --- a/ceilometer/collector/dispatcher/file.py +++ b/ceilometer/collector/dispatcher/file.py @@ -79,3 +79,7 @@ class FileDispatcher(dispatcher.Base): def record_metering_data(self, context, data): if self.log: self.log.info(data) + + def record_events(self, events): + if self.log: + self.log.info(events) diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index 9fd3cdacd..f4a49f40b 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -21,6 +21,7 @@ from oslo.config import cfg import socket from stevedore import extension from stevedore import named +import sys from ceilometer.service import prepare_service from ceilometer.openstack.common import context @@ -112,10 +113,6 @@ class CollectorService(rpc_service.Service): COLLECTOR_NAMESPACE = 'ceilometer.collector' DISPATCHER_NAMESPACE = 'ceilometer.dispatcher' - def __init__(self, host, topic, manager=None): - super(CollectorService, self).__init__(host, topic, manager) - self.storage_conn = storage.get_connection(cfg.CONF) - def start(self): super(CollectorService, self).start() # Add a dummy thread to have wait() working @@ -143,17 +140,16 @@ class CollectorService(rpc_service.Service): self.COLLECTOR_NAMESPACE) self.notification_manager.map(self._setup_subscription) - # Load all configured dispatchers - self.dispatchers = [] - for dispatcher in named.NamedExtensionManager( - namespace=self.DISPATCHER_NAMESPACE, - names=cfg.CONF.collector.dispatcher, - invoke_on_load=True, - invoke_args=[cfg.CONF]): - if dispatcher.obj: - self.dispatchers.append(dispatcher.obj) - - LOG.info('dispatchers loaded %s' % str(self.dispatchers)) + LOG.debug('loading dispatchers from %s', + self.DISPATCHER_NAMESPACE) + self.dispatcher_manager = named.NamedExtensionManager( + namespace=self.DISPATCHER_NAMESPACE, + names=cfg.CONF.collector.dispatcher, + invoke_on_load=True, + invoke_args=[cfg.CONF]) + if not list(self.dispatcher_manager): + LOG.warning('Failed to load any dispatchers for %s', + self.DISPATCHER_NAMESPACE) # Set ourselves up as a separate worker for the metering data, # since the default for service is to use create_consumer(). @@ -184,8 +180,9 @@ class CollectorService(rpc_service.Service): (topic, exchange_topic.exchange)) def record_metering_data(self, context, data): - for dispatcher in self.dispatchers: - dispatcher.record_metering_data(context, data) + self.dispatcher_manager.map(self._record_metering_data_for_ext, + context=context, + data=data) def process_notification(self, notification): """Make a notification processed by an handler.""" @@ -239,12 +236,22 @@ class CollectorService(rpc_service.Service): traits = [trait for trait in all_traits if trait.value is not None] event = models.Event(event_name, when, traits) - try: - self.storage_conn.record_events([event, ]) - except Exception as err: - LOG.exception(_("Unable to store events: %s"), err) - # By re-raising we avoid ack()'ing the message. - raise + + exc_info = None + for dispatcher in self.dispatcher_manager: + try: + dispatcher.obj.record_events(event) + except Exception: + LOG.exception('Error while saving events with dispatcher %s', + dispatcher) + exc_info = sys.exc_info() + # Don't ack the message if any of the dispatchers fail + if exc_info: + raise exc_info[1], None, exc_info[2] + + @staticmethod + def _record_metering_data_for_ext(ext, context, data): + ext.obj.record_metering_data(context, data) def _process_notification_for_ext(self, ext, notification): with self.pipeline_manager.publisher(context.get_admin_context()) as p: diff --git a/tests/collector/test_service.py b/tests/collector/test_service.py index 64ec75f25..949d9b921 100644 --- a/tests/collector/test_service.py +++ b/tests/collector/test_service.py @@ -179,11 +179,6 @@ class TestCollectorService(TestCollector): self.srv = service.CollectorService('the-host', 'the-topic') self.ctx = None - def test_service_has_storage_conn(self): - # Test an unmocked default CollectorService - srv = service.CollectorService('the-host', 'the-topic') - self.assertIsNotNone(srv.storage_conn) - @patch('ceilometer.pipeline.setup_pipeline', MagicMock()) def test_init_host(self): # If we try to create a real RPC connection, init_host() never @@ -231,27 +226,41 @@ class TestCollectorService(TestCollector): timeutils.set_time_override(now) message = {'event_type': "foo", 'message_id': "abc"} - self.srv.storage_conn = MagicMock() + mock_dispatcher = MagicMock() + self.srv.dispatcher_manager = test_manager.TestExtensionManager( + [extension.Extension('test', + None, + None, + mock_dispatcher + ), + ]) with patch('ceilometer.collector.service.LOG') as mylog: self.srv._message_to_event(message) self.assertFalse(mylog.exception.called) - events = self.srv.storage_conn.record_events.call_args[0] + events = mock_dispatcher.record_events.call_args[0] self.assertEqual(1, len(events)) - event = events[0][0] + event = events[0] self.assertEqual("foo", event.event_name) self.assertEqual(now, event.generated) self.assertEqual(1, len(event.traits)) def test_message_to_event_bad_save(self): cfg.CONF.set_override("store_events", True, group="collector") - self.srv.storage_conn = MagicMock() - self.srv.storage_conn.record_events.side_effect = MyException("Boom") + mock_dispatcher = MagicMock() + self.srv.dispatcher_manager = test_manager.TestExtensionManager( + [extension.Extension('test', + None, + None, + mock_dispatcher + ), + ]) + mock_dispatcher.record_events.side_effect = MyException("Boom") message = {'event_type': "foo", 'message_id': "abc"} try: self.srv._message_to_event(message) self.fail("failing save should raise") - except MyException: + except Exception: pass def test_extract_when(self):