Storing events via dispatchers

Removes the database handle from CollectorService and
moves the logic for storing events to Dispatcher classes.

Change-Id: I22cf02685e3a33a9014d65d8d32a5b585dabe185
Fixes: bug #1204520
This commit is contained in:
Nejc Saje 2013-08-08 13:39:47 +02:00
parent 4e99c1392e
commit fd61f630e9
5 changed files with 63 additions and 34 deletions

View File

@ -29,3 +29,7 @@ class Base(object):
@abc.abstractmethod
def record_metering_data(self, context, data):
"""Recording metering data interface."""
@abc.abstractmethod
def record_events(self, events):
"""Recording events interface."""

View File

@ -70,3 +70,8 @@ class DatabaseDispatcher(dispatcher.Base):
LOG.warning(
'message signature invalid, discarding message: %r',
meter)
def record_events(self, events):
if not isinstance(events, list):
events = [events]
self.storage_conn.record_events(events)

View File

@ -79,3 +79,7 @@ class FileDispatcher(dispatcher.Base):
def record_metering_data(self, context, data):
if self.log:
self.log.info(data)
def record_events(self, events):
if self.log:
self.log.info(events)

View File

@ -21,6 +21,7 @@ from oslo.config import cfg
import socket
from stevedore import extension
from stevedore import named
import sys
from ceilometer.service import prepare_service
from ceilometer.openstack.common import context
@ -112,10 +113,6 @@ class CollectorService(rpc_service.Service):
COLLECTOR_NAMESPACE = 'ceilometer.collector'
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
def __init__(self, host, topic, manager=None):
super(CollectorService, self).__init__(host, topic, manager)
self.storage_conn = storage.get_connection(cfg.CONF)
def start(self):
super(CollectorService, self).start()
# Add a dummy thread to have wait() working
@ -143,17 +140,16 @@ class CollectorService(rpc_service.Service):
self.COLLECTOR_NAMESPACE)
self.notification_manager.map(self._setup_subscription)
# Load all configured dispatchers
self.dispatchers = []
for dispatcher in named.NamedExtensionManager(
LOG.debug('loading dispatchers from %s',
self.DISPATCHER_NAMESPACE)
self.dispatcher_manager = named.NamedExtensionManager(
namespace=self.DISPATCHER_NAMESPACE,
names=cfg.CONF.collector.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF]):
if dispatcher.obj:
self.dispatchers.append(dispatcher.obj)
LOG.info('dispatchers loaded %s' % str(self.dispatchers))
invoke_args=[cfg.CONF])
if not list(self.dispatcher_manager):
LOG.warning('Failed to load any dispatchers for %s',
self.DISPATCHER_NAMESPACE)
# Set ourselves up as a separate worker for the metering data,
# since the default for service is to use create_consumer().
@ -184,8 +180,9 @@ class CollectorService(rpc_service.Service):
(topic, exchange_topic.exchange))
def record_metering_data(self, context, data):
for dispatcher in self.dispatchers:
dispatcher.record_metering_data(context, data)
self.dispatcher_manager.map(self._record_metering_data_for_ext,
context=context,
data=data)
def process_notification(self, notification):
"""Make a notification processed by an handler."""
@ -239,12 +236,22 @@ class CollectorService(rpc_service.Service):
traits = [trait for trait in all_traits if trait.value is not None]
event = models.Event(event_name, when, traits)
exc_info = None
for dispatcher in self.dispatcher_manager:
try:
self.storage_conn.record_events([event, ])
except Exception as err:
LOG.exception(_("Unable to store events: %s"), err)
# By re-raising we avoid ack()'ing the message.
raise
dispatcher.obj.record_events(event)
except Exception:
LOG.exception('Error while saving events with dispatcher %s',
dispatcher)
exc_info = sys.exc_info()
# Don't ack the message if any of the dispatchers fail
if exc_info:
raise exc_info[1], None, exc_info[2]
@staticmethod
def _record_metering_data_for_ext(ext, context, data):
ext.obj.record_metering_data(context, data)
def _process_notification_for_ext(self, ext, notification):
with self.pipeline_manager.publisher(context.get_admin_context()) as p:

View File

@ -179,11 +179,6 @@ class TestCollectorService(TestCollector):
self.srv = service.CollectorService('the-host', 'the-topic')
self.ctx = None
def test_service_has_storage_conn(self):
# Test an unmocked default CollectorService
srv = service.CollectorService('the-host', 'the-topic')
self.assertIsNotNone(srv.storage_conn)
@patch('ceilometer.pipeline.setup_pipeline', MagicMock())
def test_init_host(self):
# If we try to create a real RPC connection, init_host() never
@ -231,27 +226,41 @@ class TestCollectorService(TestCollector):
timeutils.set_time_override(now)
message = {'event_type': "foo", 'message_id': "abc"}
self.srv.storage_conn = MagicMock()
mock_dispatcher = MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
with patch('ceilometer.collector.service.LOG') as mylog:
self.srv._message_to_event(message)
self.assertFalse(mylog.exception.called)
events = self.srv.storage_conn.record_events.call_args[0]
events = mock_dispatcher.record_events.call_args[0]
self.assertEqual(1, len(events))
event = events[0][0]
event = events[0]
self.assertEqual("foo", event.event_name)
self.assertEqual(now, event.generated)
self.assertEqual(1, len(event.traits))
def test_message_to_event_bad_save(self):
cfg.CONF.set_override("store_events", True, group="collector")
self.srv.storage_conn = MagicMock()
self.srv.storage_conn.record_events.side_effect = MyException("Boom")
mock_dispatcher = MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_events.side_effect = MyException("Boom")
message = {'event_type': "foo", 'message_id': "abc"}
try:
self.srv._message_to_event(message)
self.fail("failing save should raise")
except MyException:
except Exception:
pass
def test_extract_when(self):