From 76c6d465d86b9424db2df378ff55caa13aec6983 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Wed, 13 Nov 2013 17:08:47 +0100 Subject: [PATCH] Split collector The collector grew from doing one thing to doing too many. This encumber scalability and he's really a bad design as it is. Let's fix that. This patch take out the notification handling code to a new daemon called ceilometer-agent-notification. This daemon will be in charge of receiving notifications, storing them if needed, and building samples in from of it. Change-Id: I093f3b7855bd6ffff2018db9cd485ed2fc0f37a5 Blueprint: split-collector --- MAINTAINERS | 4 +- ceilometer/collector/service.py | 174 +----------------- ceilometer/notification.py | 195 ++++++++++++++++++++ ceilometer/notifier.py | 2 +- ceilometer/service.py | 31 +++- ceilometer/tests/collector/test_service.py | 167 ----------------- ceilometer/tests/test_notification.py | 201 +++++++++++++++++++++ doc/source/architecture.rst | 49 ++--- doc/source/install/manual.rst | 99 ++++++++-- etc/ceilometer/ceilometer.conf.sample | 38 +++- setup.cfg | 3 +- 11 files changed, 571 insertions(+), 392 deletions(-) create mode 100644 ceilometer/notification.py create mode 100644 ceilometer/tests/test_notification.py diff --git a/MAINTAINERS b/MAINTAINERS index 062bbff36..4b948071f 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -36,10 +36,12 @@ F: api/ == events == +M: Julien Danjou (jd__) M: Sandy Walsh (sandywalsh) M: Monsyne Dragon (dragondm) S: Maintained -F: collector/, storage/ +F: notification.py +F: storage/ == pipeline == diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index 2d92f6ce1..ebf67fedf 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -20,20 +20,13 @@ import socket import msgpack from oslo.config import cfg -from stevedore import extension -from stevedore import named -from ceilometer.openstack.common import context from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher from ceilometer.openstack.common.rpc import service as rpc_service from ceilometer.openstack.common import service as os_service -from ceilometer.openstack.common import timeutils -from ceilometer import pipeline from ceilometer import service -from ceilometer.storage import models -from ceilometer import transformer OPTS = [ cfg.StrOpt('udp_address', @@ -43,15 +36,6 @@ OPTS = [ cfg.IntOpt('udp_port', default=4952, help='port to bind the UDP socket to'), - cfg.BoolOpt('ack_on_event_error', - default=True, - help='Acknowledge message when event persistence fails'), - cfg.BoolOpt('store_events', - default=False, - help='Save event details'), - cfg.MultiStrOpt('dispatcher', - default=['database'], - help='dispatcher to process metering data'), ] cfg.CONF.register_opts(OPTS, group="collector") @@ -59,25 +43,7 @@ cfg.CONF.register_opts(OPTS, group="collector") LOG = log.getLogger(__name__) -class CollectorBase(object): - - DISPATCHER_NAMESPACE = 'ceilometer.dispatcher' - - def __init__(self, *args, **kwargs): - super(CollectorBase, self).__init__(*args, **kwargs) - LOG.debug('loading dispatchers from %s', - self.DISPATCHER_NAMESPACE) - self.dispatcher_manager = named.NamedExtensionManager( - namespace=self.DISPATCHER_NAMESPACE, - names=cfg.CONF.collector.dispatcher, - invoke_on_load=True, - invoke_args=[cfg.CONF]) - if not list(self.dispatcher_manager): - LOG.warning('Failed to load any dispatchers for %s', - self.DISPATCHER_NAMESPACE) - - -class UDPCollectorService(CollectorBase, os_service.Service): +class UDPCollectorService(service.DispatchedService, os_service.Service): """UDP listener for the collector service.""" def start(self): @@ -121,18 +87,7 @@ def udp_collector(): os_service.launch(UDPCollectorService()).wait() -class UnableToSaveEventException(Exception): - """Thrown when we want to requeue an event. - - Any exception is fine, but this one should make debugging - a little easier. - """ - pass - - -class CollectorService(CollectorBase, rpc_service.Service): - - COLLECTOR_NAMESPACE = 'ceilometer.collector' +class CollectorService(service.DispatchedService, rpc_service.Service): def start(self): super(CollectorService, self).start() @@ -141,26 +96,6 @@ class CollectorService(CollectorBase, rpc_service.Service): def initialize_service_hook(self, service): '''Consumers must be declared before consume_thread start.''' - LOG.debug('initialize_service_hooks') - self.pipeline_manager = pipeline.setup_pipeline( - transformer.TransformerExtensionManager( - 'ceilometer.transformer', - ), - ) - - LOG.debug('loading notification handlers from %s', - self.COLLECTOR_NAMESPACE) - self.notification_manager = \ - extension.ExtensionManager( - namespace=self.COLLECTOR_NAMESPACE, - invoke_on_load=True, - ) - - if not list(self.notification_manager): - LOG.warning('Failed to load any notification handlers for %s', - self.COLLECTOR_NAMESPACE) - self.notification_manager.map(self._setup_subscription) - # Set ourselves up as a separate worker for the metering data, # since the default for service is to use create_consumer(). self.conn.create_worker( @@ -169,38 +104,8 @@ class CollectorService(CollectorBase, rpc_service.Service): 'ceilometer.collector.' + cfg.CONF.publisher_rpc.metering_topic, ) - def _setup_subscription(self, ext, *args, **kwds): - """Connect to message bus to get notifications - - Configure the RPC connection to listen for messages on the - right exchanges and topics so we receive all of the - notifications. - - Use a connection pool so that multiple collector instances can - run in parallel to share load and without competing with each - other for incoming messages. - """ - handler = ext.obj - ack_on_error = cfg.CONF.collector.ack_on_event_error - LOG.debug('Event types from %s: %s (ack_on_error=%s)', - ext.name, ', '.join(handler.event_types), - ack_on_error) - - for exchange_topic in handler.get_exchange_topics(cfg.CONF): - for topic in exchange_topic.topics: - try: - self.conn.join_consumer_pool( - callback=self.process_notification, - pool_name=topic, - topic=topic, - exchange_name=exchange_topic.exchange, - ack_on_error=ack_on_error) - except Exception: - LOG.exception('Could not join consumer pool %s/%s' % - (topic, exchange_topic.exchange)) - def record_metering_data(self, context, data): - """RPC endpoint for messages we send to ourself + """RPC endpoint for messages we send to ourselves. When the notification messages are re-published through the RPC publisher, this method receives them for processing. @@ -209,67 +114,6 @@ class CollectorService(CollectorBase, rpc_service.Service): context=context, data=data) - def process_notification(self, notification): - """RPC endpoint for notification messages - - When another service sends a notification over the message - bus, this method receives it. See _setup_subscription(). - - """ - LOG.debug('notification %r', notification.get('event_type')) - self.notification_manager.map(self._process_notification_for_ext, - notification=notification) - - if cfg.CONF.collector.store_events: - self._message_to_event(notification) - - @staticmethod - def _extract_when(body): - """Extract the generated datetime from the notification. - """ - when = body.get('timestamp', body.get('_context_timestamp')) - if when: - return timeutils.normalize_time(timeutils.parse_isotime(when)) - - return timeutils.utcnow() - - def _message_to_event(self, body): - """Convert message to Ceilometer Event. - - NOTE: this is currently based on the Nova notification format. - We will need to make this driver-based to support other formats. - - NOTE: the rpc layer currently rips out the notification - delivery_info, which is critical to determining the - source of the notification. This will have to get added back later. - """ - message_id = body.get('message_id') - event_type = body['event_type'] - when = self._extract_when(body) - LOG.debug('Saving event "%s"', event_type) - - publisher = body.get('publisher_id') - request_id = body.get('_context_request_id') - tenant_id = body.get('_context_tenant') - - text = models.Trait.TEXT_TYPE - all_traits = [models.Trait('service', text, publisher), - models.Trait('request_id', text, request_id), - models.Trait('tenant_id', text, tenant_id), - ] - # Only store non-None value traits ... - traits = [trait for trait in all_traits if trait.value is not None] - - event = models.Event(message_id, event_type, when, traits) - - problem_events = [] - for dispatcher in self.dispatcher_manager: - problem_events.extend(dispatcher.obj.record_events(event)) - if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]: - # Don't ack the message, raise to requeue it - # if ack_on_error = False - raise UnableToSaveEventException() - @staticmethod def _record_metering_data_for_ext(ext, context, data): """Wrapper for calling dispatcher plugin when a sample arrives @@ -280,18 +124,6 @@ class CollectorService(CollectorBase, rpc_service.Service): """ ext.obj.record_metering_data(context, data) - def _process_notification_for_ext(self, ext, notification): - """Wrapper for calling pipelines when a notification arrives - - When a message is received by process_notification(), it calls - this method with each notification plugin to allow all the - plugins process the notification. - - """ - with self.pipeline_manager.publisher(context.get_admin_context()) as p: - # FIXME(dhellmann): Spawn green thread? - p(list(ext.obj.to_samples(notification))) - def collector(): service.prepare_service() diff --git a/ceilometer/notification.py b/ceilometer/notification.py new file mode 100644 index 000000000..69034d061 --- /dev/null +++ b/ceilometer/notification.py @@ -0,0 +1,195 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2012-2013 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.config import cfg +from stevedore import extension + +from ceilometer.openstack.common import context +from ceilometer.openstack.common import log +from ceilometer.openstack.common.rpc import service as rpc_service +from ceilometer.openstack.common import service as os_service +from ceilometer.openstack.common import timeutils +from ceilometer import pipeline +from ceilometer import service +from ceilometer.storage import models +from ceilometer import transformer + + +LOG = log.getLogger(__name__) + + +OPTS = [ + cfg.BoolOpt('ack_on_event_error', + default=True, + deprecated_group='collector', + help='Acknowledge message when event persistence fails'), + cfg.BoolOpt('store_events', + deprecated_group='collector', + default=False, + help='Save event details'), +] + +cfg.CONF.register_opts(OPTS, group="notification") + + +class UnableToSaveEventException(Exception): + """Thrown when we want to requeue an event. + + Any exception is fine, but this one should make debugging + a little easier. + """ + + +class NotificationService(service.DispatchedService, rpc_service.Service): + + NOTIFICATION_NAMESPACE = 'ceilometer.notification' + + def start(self): + super(NotificationService, self).start() + # Add a dummy thread to have wait() working + self.tg.add_timer(604800, lambda: None) + + def initialize_service_hook(self, service): + '''Consumers must be declared before consume_thread start.''' + self.pipeline_manager = pipeline.setup_pipeline( + transformer.TransformerExtensionManager( + 'ceilometer.transformer', + ), + ) + + self.notification_manager = \ + extension.ExtensionManager( + namespace=self.NOTIFICATION_NAMESPACE, + invoke_on_load=True, + ) + + if not list(self.notification_manager): + LOG.warning('Failed to load any notification handlers for %s', + self.NOTIFICATION_NAMESPACE) + self.notification_manager.map(self._setup_subscription) + + def _setup_subscription(self, ext, *args, **kwds): + """Connect to message bus to get notifications + + Configure the RPC connection to listen for messages on the + right exchanges and topics so we receive all of the + notifications. + + Use a connection pool so that multiple notification agent instances + can run in parallel to share load and without competing with each + other for incoming messages. + + """ + handler = ext.obj + ack_on_error = cfg.CONF.notification.ack_on_event_error + LOG.debug('Event types from %s: %s (ack_on_error=%s)', + ext.name, ', '.join(handler.event_types), + ack_on_error) + + for exchange_topic in handler.get_exchange_topics(cfg.CONF): + for topic in exchange_topic.topics: + try: + self.conn.join_consumer_pool( + callback=self.process_notification, + pool_name=topic, + topic=topic, + exchange_name=exchange_topic.exchange, + ack_on_error=ack_on_error) + except Exception: + LOG.exception('Could not join consumer pool %s/%s' % + (topic, exchange_topic.exchange)) + + def process_notification(self, notification): + """RPC endpoint for notification messages + + When another service sends a notification over the message + bus, this method receives it. See _setup_subscription(). + + """ + LOG.debug('notification %r', notification.get('event_type')) + self.notification_manager.map(self._process_notification_for_ext, + notification=notification) + + if cfg.CONF.notification.store_events: + self._message_to_event(notification) + + @staticmethod + def _extract_when(body): + """Extract the generated datetime from the notification. + """ + when = body.get('timestamp', body.get('_context_timestamp')) + if when: + return timeutils.normalize_time(timeutils.parse_isotime(when)) + + return timeutils.utcnow() + + def _message_to_event(self, body): + """Convert message to Ceilometer Event. + + NOTE: this is currently based on the Nova notification format. + We will need to make this driver-based to support other formats. + + NOTE: the rpc layer currently rips out the notification + delivery_info, which is critical to determining the + source of the notification. This will have to get added back later. + """ + message_id = body.get('message_id') + event_type = body['event_type'] + when = self._extract_when(body) + LOG.debug('Saving event "%s"', event_type) + + publisher = body.get('publisher_id') + request_id = body.get('_context_request_id') + tenant_id = body.get('_context_tenant') + + text = models.Trait.TEXT_TYPE + all_traits = [models.Trait('service', text, publisher), + models.Trait('request_id', text, request_id), + models.Trait('tenant_id', text, tenant_id), + ] + # Only store non-None value traits ... + traits = [trait for trait in all_traits if trait.value is not None] + + event = models.Event(message_id, event_type, when, traits) + + problem_events = [] + for dispatcher in self.dispatcher_manager: + problem_events.extend(dispatcher.obj.record_events(event)) + if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]: + # Don't ack the message, raise to requeue it + # if ack_on_error = False + raise UnableToSaveEventException() + + def _process_notification_for_ext(self, ext, notification): + """Wrapper for calling pipelines when a notification arrives + + When a message is received by process_notification(), it calls + this method with each notification plugin to allow all the + plugins process the notification. + + """ + with self.pipeline_manager.publisher(context.get_admin_context()) as p: + # FIXME(dhellmann): Spawn green thread? + p(list(ext.obj.to_samples(notification))) + + +def agent(): + service.prepare_service() + os_service.launch(NotificationService( + cfg.CONF.host, + 'ceilometer.agent.notification')).wait() diff --git a/ceilometer/notifier.py b/ceilometer/notifier.py index be7ef6e8a..cf4bdc32c 100644 --- a/ceilometer/notifier.py +++ b/ceilometer/notifier.py @@ -34,7 +34,7 @@ _pipeline_manager = None def _load_notification_manager(): global _notification_manager - namespace = 'ceilometer.collector' + namespace = 'ceilometer.notification' LOG.debug('loading notification handlers from %s', namespace) diff --git a/ceilometer/service.py b/ceilometer/service.py index 9cb51a7ec..26995690b 100644 --- a/ceilometer/service.py +++ b/ceilometer/service.py @@ -23,13 +23,14 @@ import socket import sys from oslo.config import cfg +from stevedore import named from ceilometer.openstack.common import gettextutils from ceilometer.openstack.common import log from ceilometer.openstack.common import rpc -cfg.CONF.register_opts([ +OPTS = [ cfg.StrOpt('host', default=socket.gethostname(), help='Name of this node. This can be an opaque identifier. ' @@ -37,7 +38,12 @@ cfg.CONF.register_opts([ 'However, the node name must be valid within ' 'an AMQP key, and if using ZeroMQ, a valid ' 'hostname, FQDN, or IP address'), -]) + cfg.MultiStrOpt('dispatcher', + deprecated_group="collector", + default=['database'], + help='dispatcher to process data'), +] +cfg.CONF.register_opts(OPTS) CLI_OPTIONS = [ cfg.StrOpt('os-username', @@ -81,6 +87,27 @@ CLI_OPTIONS = [ cfg.CONF.register_cli_opts(CLI_OPTIONS, group="service_credentials") +LOG = log.getLogger(__name__) + + +class DispatchedService(object): + + DISPATCHER_NAMESPACE = 'ceilometer.dispatcher' + + def __init__(self, *args, **kwargs): + super(DispatchedService, self).__init__(*args, **kwargs) + LOG.debug('loading dispatchers from %s', + self.DISPATCHER_NAMESPACE) + self.dispatcher_manager = named.NamedExtensionManager( + namespace=self.DISPATCHER_NAMESPACE, + names=cfg.CONF.dispatcher, + invoke_on_load=True, + invoke_args=[cfg.CONF]) + if not list(self.dispatcher_manager): + LOG.warning('Failed to load any dispatchers for %s', + self.DISPATCHER_NAMESPACE) + + def prepare_service(argv=None): eventlet.monkey_patch() gettextutils.install('ceilometer', lazy=False) diff --git a/ceilometer/tests/collector/test_service.py b/ceilometer/tests/collector/test_service.py index a3ec3c8bf..191e0f37c 100644 --- a/ceilometer/tests/collector/test_service.py +++ b/ceilometer/tests/collector/test_service.py @@ -18,7 +18,6 @@ """Tests for ceilometer/agent/service.py """ -import datetime import socket import mock @@ -28,64 +27,11 @@ from stevedore import extension from stevedore.tests import manager as test_manager from ceilometer.collector import service -from ceilometer.compute import notifications from ceilometer.openstack.common.fixture import config -from ceilometer.openstack.common import timeutils from ceilometer import sample -from ceilometer.storage import models from ceilometer.tests import base as tests_base -TEST_NOTICE = { - u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2', - u'_context_is_admin': True, - u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e', - u'_context_quota_class': None, - u'_context_read_deleted': u'no', - u'_context_remote_address': u'10.0.2.15', - u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66', - u'_context_roles': [u'admin'], - u'_context_timestamp': u'2012-05-08T20:23:41.425105', - u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2', - u'event_type': u'compute.instance.create.end', - u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451', - u'payload': {u'created_at': u'2012-05-08 20:23:41', - u'deleted_at': u'', - u'disk_gb': 0, - u'display_name': u'testme', - u'fixed_ips': [{u'address': u'10.0.0.2', - u'floating_ips': [], - u'meta': {}, - u'type': u'fixed', - u'version': 4}], - u'image_ref_url': u'http://10.0.2.15:9292/images/UUID', - u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1', - u'instance_type': u'm1.tiny', - u'instance_type_id': 2, - u'launched_at': u'2012-05-08 20:23:47.985999', - u'memory_mb': 512, - u'state': u'active', - u'state_description': u'', - u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e', - u'user_id': u'1e3ce043029547f1a61c1996d1a531a2', - u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3', - u'vcpus': 1, - u'root_gb': 0, - u'ephemeral_gb': 0, - u'host': u'compute-host-name', - u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4', - u'os_type': u'linux?', - u'architecture': u'x86', - u'image_ref': u'UUID', - u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5', - u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6', - }, - u'priority': u'INFO', - u'publisher_id': u'compute.vagrant-precise', - u'timestamp': u'2012-05-08 20:23:48.028195', -} - - class TestCollector(tests_base.BaseTestCase): def setUp(self): super(TestCollector, self).setUp() @@ -194,7 +140,6 @@ class TestCollectorService(TestCollector): def setUp(self): super(TestCollectorService, self).setUp() self.srv = service.CollectorService('the-host', 'the-topic') - self.ctx = None @patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def test_init_host(self): @@ -203,115 +148,3 @@ class TestCollectorService(TestCollector): # configuration. with patch('ceilometer.openstack.common.rpc.create_connection'): self.srv.start() - - @patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) - def test_process_notification(self): - # If we try to create a real RPC connection, init_host() never - # returns. Mock it out so we can establish the service - # configuration. - self.CONF.set_override("store_events", False, group="collector") - with patch('ceilometer.openstack.common.rpc.create_connection'): - self.srv.start() - self.srv.pipeline_manager.pipelines[0] = mock.MagicMock() - self.srv.notification_manager = test_manager.TestExtensionManager( - [extension.Extension('test', - None, - None, - notifications.Instance(), - ), - ]) - self.srv.process_notification(TEST_NOTICE) - self.assertTrue( - self.srv.pipeline_manager.publisher.called) - - def test_process_notification_no_events(self): - self.CONF.set_override("store_events", False, group="collector") - self.srv.notification_manager = mock.MagicMock() - with patch.object(self.srv, '_message_to_event') as fake_msg_to_event: - self.srv.process_notification({}) - self.assertFalse(fake_msg_to_event.called) - - def test_process_notification_with_events(self): - self.CONF.set_override("store_events", True, group="collector") - self.srv.notification_manager = mock.MagicMock() - with patch.object(self.srv, '_message_to_event') as fake_msg_to_event: - self.srv.process_notification({}) - self.assertTrue(fake_msg_to_event.called) - - def test_message_to_event_missing_keys(self): - now = timeutils.utcnow() - timeutils.set_time_override(now) - message = {'event_type': "foo", - 'message_id': "abc", - 'publisher_id': "1"} - - mock_dispatcher = mock.MagicMock() - self.srv.dispatcher_manager = test_manager.TestExtensionManager( - [extension.Extension('test', - None, - None, - mock_dispatcher - ), - ]) - - with patch('ceilometer.collector.service.LOG') as mylog: - self.srv._message_to_event(message) - self.assertFalse(mylog.exception.called) - events = mock_dispatcher.record_events.call_args[0] - self.assertEqual(1, len(events)) - event = events[0] - self.assertEqual("foo", event.event_type) - self.assertEqual(now, event.generated) - self.assertEqual(1, len(event.traits)) - - def test_message_to_event_duplicate(self): - self.CONF.set_override("store_events", True, group="collector") - mock_dispatcher = mock.MagicMock() - self.srv.dispatcher_manager = test_manager.TestExtensionManager( - [extension.Extension('test', - None, - None, - mock_dispatcher - ), - ]) - mock_dispatcher.record_events.return_value = [ - (models.Event.DUPLICATE, object())] - message = {'event_type': "foo", 'message_id': "abc"} - self.srv._message_to_event(message) # Should return silently. - - def test_message_to_event_bad_event(self): - self.CONF.set_override("store_events", True, group="collector") - mock_dispatcher = mock.MagicMock() - self.srv.dispatcher_manager = test_manager.TestExtensionManager( - [extension.Extension('test', - None, - None, - mock_dispatcher - ), - ]) - mock_dispatcher.record_events.return_value = [ - (models.Event.UNKNOWN_PROBLEM, object())] - message = {'event_type': "foo", 'message_id': "abc"} - self.assertRaises(service.UnableToSaveEventException, - self.srv._message_to_event, message) - - def test_extract_when(self): - now = timeutils.utcnow() - modified = now + datetime.timedelta(minutes=1) - timeutils.set_time_override(now) - - body = {"timestamp": str(modified)} - when = service.CollectorService._extract_when(body) - self.assertTimestampEqual(modified, when) - - body = {"_context_timestamp": str(modified)} - when = service.CollectorService._extract_when(body) - self.assertTimestampEqual(modified, when) - - then = now + datetime.timedelta(hours=1) - body = {"timestamp": str(modified), "_context_timestamp": str(then)} - when = service.CollectorService._extract_when(body) - self.assertTimestampEqual(modified, when) - - when = service.CollectorService._extract_when({}) - self.assertTimestampEqual(now, when) diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py new file mode 100644 index 000000000..81da5759e --- /dev/null +++ b/ceilometer/tests/test_notification.py @@ -0,0 +1,201 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2012 New Dream Network, LLC (DreamHost) +# +# Author: Doug Hellmann +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for Ceilometer notify daemon.""" + +import datetime +import mock + +from stevedore import extension +from stevedore.tests import manager as test_manager + +from ceilometer.compute import notifications +from ceilometer import notification +from ceilometer.openstack.common.fixture import config +from ceilometer.openstack.common import timeutils +from ceilometer.storage import models +from ceilometer.tests import base as tests_base + +TEST_NOTICE = { + u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2', + u'_context_is_admin': True, + u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e', + u'_context_quota_class': None, + u'_context_read_deleted': u'no', + u'_context_remote_address': u'10.0.2.15', + u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66', + u'_context_roles': [u'admin'], + u'_context_timestamp': u'2012-05-08T20:23:41.425105', + u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2', + u'event_type': u'compute.instance.create.end', + u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451', + u'payload': {u'created_at': u'2012-05-08 20:23:41', + u'deleted_at': u'', + u'disk_gb': 0, + u'display_name': u'testme', + u'fixed_ips': [{u'address': u'10.0.0.2', + u'floating_ips': [], + u'meta': {}, + u'type': u'fixed', + u'version': 4}], + u'image_ref_url': u'http://10.0.2.15:9292/images/UUID', + u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1', + u'instance_type': u'm1.tiny', + u'instance_type_id': 2, + u'launched_at': u'2012-05-08 20:23:47.985999', + u'memory_mb': 512, + u'state': u'active', + u'state_description': u'', + u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e', + u'user_id': u'1e3ce043029547f1a61c1996d1a531a2', + u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3', + u'vcpus': 1, + u'root_gb': 0, + u'ephemeral_gb': 0, + u'host': u'compute-host-name', + u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4', + u'os_type': u'linux?', + u'architecture': u'x86', + u'image_ref': u'UUID', + u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5', + u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6', + }, + u'priority': u'INFO', + u'publisher_id': u'compute.vagrant-precise', + u'timestamp': u'2012-05-08 20:23:48.028195', +} + + +class TestNotification(tests_base.BaseTestCase): + + def setUp(self): + super(TestNotification, self).setUp() + self.srv = notification.NotificationService('the-host', 'the-topic') + self.CONF = self.useFixture(config.Config()).conf + self.CONF.set_override("connection", "log://", group='database') + + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) + def test_process_notification(self): + # If we try to create a real RPC connection, init_host() never + # returns. Mock it out so we can establish the service + # configuration. + self.CONF.set_override("store_events", False, group="notification") + with mock.patch('ceilometer.openstack.common.rpc.create_connection'): + self.srv.start() + self.srv.pipeline_manager.pipelines[0] = mock.MagicMock() + self.srv.notification_manager = test_manager.TestExtensionManager( + [extension.Extension('test', + None, + None, + notifications.Instance(), + ), + ]) + self.srv.process_notification(TEST_NOTICE) + self.assertTrue( + self.srv.pipeline_manager.publisher.called) + + def test_process_notification_no_events(self): + self.CONF.set_override("store_events", False, group="notification") + self.srv.notification_manager = mock.MagicMock() + with mock.patch.object(self.srv, + '_message_to_event') as fake_msg_to_event: + self.srv.process_notification({}) + self.assertFalse(fake_msg_to_event.called) + + def test_process_notification_with_events(self): + self.CONF.set_override("store_events", True, group="notification") + self.srv.notification_manager = mock.MagicMock() + with mock.patch.object(self.srv, + '_message_to_event') as fake_msg_to_event: + self.srv.process_notification({}) + self.assertTrue(fake_msg_to_event.called) + + def test_message_to_event_missing_keys(self): + now = timeutils.utcnow() + timeutils.set_time_override(now) + message = {'event_type': "foo", + 'message_id': "abc", + 'publisher_id': "1"} + + mock_dispatcher = mock.MagicMock() + self.srv.dispatcher_manager = test_manager.TestExtensionManager( + [extension.Extension('test', + None, + None, + mock_dispatcher + ), + ]) + + self.srv._message_to_event(message) + events = mock_dispatcher.record_events.call_args[0] + self.assertEqual(1, len(events)) + event = events[0] + self.assertEqual("foo", event.event_type) + self.assertEqual(now, event.generated) + self.assertEqual(1, len(event.traits)) + + def test_message_to_event_duplicate(self): + self.CONF.set_override("store_events", True, group="notification") + mock_dispatcher = mock.MagicMock() + self.srv.dispatcher_manager = test_manager.TestExtensionManager( + [extension.Extension('test', + None, + None, + mock_dispatcher + ), + ]) + mock_dispatcher.record_events.return_value = [ + (models.Event.DUPLICATE, object())] + message = {'event_type': "foo", 'message_id': "abc"} + self.srv._message_to_event(message) # Should return silently. + + def test_message_to_event_bad_event(self): + self.CONF.set_override("store_events", True, group="notification") + mock_dispatcher = mock.MagicMock() + self.srv.dispatcher_manager = test_manager.TestExtensionManager( + [extension.Extension('test', + None, + None, + mock_dispatcher + ), + ]) + mock_dispatcher.record_events.return_value = [ + (models.Event.UNKNOWN_PROBLEM, object())] + message = {'event_type': "foo", 'message_id': "abc"} + self.assertRaises(notification.UnableToSaveEventException, + self.srv._message_to_event, message) + + def test_extract_when(self): + now = timeutils.utcnow() + modified = now + datetime.timedelta(minutes=1) + timeutils.set_time_override(now) + + body = {"timestamp": str(modified)} + when = notification.NotificationService._extract_when(body) + self.assertTimestampEqual(modified, when) + + body = {"_context_timestamp": str(modified)} + when = notification.NotificationService._extract_when(body) + self.assertTimestampEqual(modified, when) + + then = now + datetime.timedelta(hours=1) + body = {"timestamp": str(modified), "_context_timestamp": str(then)} + when = notification.NotificationService._extract_when(body) + self.assertTimestampEqual(modified, when) + + when = notification.NotificationService._extract_when({}) + self.assertTimestampEqual(now, when) diff --git a/doc/source/architecture.rst b/doc/source/architecture.rst index 40d3c4de4..499ba0ef5 100644 --- a/doc/source/architecture.rst +++ b/doc/source/architecture.rst @@ -370,34 +370,35 @@ Handling Notifications .. index:: double: notifications; architecture -The heart of the system is the collector, which monitors the message -bus for data being provided by the pollsters via the agent as well as -notification messages from other OpenStack components such as nova, -glance, neutron, and swift. +The heart of the system are the notification daemon (agent-notification) and +the collector, which monitor the message bus for data being provided by the +pollsters via the agent as well as notification messages from other +OpenStack components such as nova, glance, neutron, and swift. -The collector loads one or more *listener* plugins, using the namespace -``ceilometer.collector``. Each plugin can listen to any topics, but by -default it will listen to ``notifications.info``. +The notification daemon loads one or more *listener* plugins, using the +namespace ``ceilometer.notification``. Each plugin can listen to any topics, +but by default it will listen to ``notifications.info``. -The plugin provides a method to list the event types it wants and a -callback for processing incoming messages. The registered name of the -callback is used to enable or disable it using the global -configuration option of the collector daemon. The incoming messages -are filtered based on their event type value before being passed to -the callback so the plugin only receives events it has expressed an -interest in seeing. For example, a callback asking for +The plugin provides a method to list the event types it wants and a callback +for processing incoming messages. The registered name of the callback is +used to enable or disable it using the pipeline of the notification daemon. +The incoming messages are filtered based on their event type value before +being passed to the callback so the plugin only receives events it has +expressed an interest in seeing. For example, a callback asking for ``compute.instance.create.end`` events under -``ceilometer.collector.compute`` would be invoked for those -notification events on the ``nova`` exchange using the -``notifications.info`` topic. +``ceilometer.collector.compute`` would be invoked for those notification +events on the ``nova`` exchange using the ``notifications.info`` topic. -The listener plugin returns an iterable with zero or more Counter -instances based on the data in the incoming message. The collector -framework code converts the Counter instances to metering messages and -publishes them on the metering message bus. Although ceilomter -includes a default storage solution to work with the API service, by -republishing on the metering message bus we can support installations -that want to handle their own data storage. +The listener plugin returns an iterable with zero or more Sample instances +based on the data in the incoming message. The collector framework code +converts the Sample instances to metering messages and publishes them on the +metering message bus. Although Ceilometer includes a default storage +solution to work with the API service, by republishing on the metering +message bus we can support installations that want to handle their own data +storage. + +The Ceilometer collector daemon then receives this Sample on the bus and +stores them into a database. Handling Metering Messages -------------------------- diff --git a/doc/source/install/manual.rst b/doc/source/install/manual.rst index 8530ec319..4f1fe0bd9 100644 --- a/doc/source/install/manual.rst +++ b/doc/source/install/manual.rst @@ -20,11 +20,10 @@ Installing Manually ===================== -Installing the Collector -======================== - +Installing the notification agent +====================================== .. index:: - double: installing; collector + double: installing; agent-notification 1. If you want to be able to retrieve image samples, you need to instruct Glance to send notifications to the bus by changing ``notifier_strategy`` @@ -71,23 +70,18 @@ Installing the Collector use = egg:ceilometer#swift metadata_headers = X-FOO, X-BAR -4. Install MongoDB. - - Follow the instructions to install the MongoDB_ package for your - operating system, then start the service. - -5. Clone the ceilometer git repository to the management server:: +4. Clone the ceilometer git repository to the management server:: $ cd /opt/stack $ git clone https://git.openstack.org/openstack/ceilometer.git -6. As a user with ``root`` permissions or ``sudo`` privileges, run the +5. As a user with ``root`` permissions or ``sudo`` privileges, run the ceilometer installer:: $ cd ceilometer $ sudo python setup.py install -7. Copy the sample configuration files from the source tree +6. Copy the sample configuration files from the source tree to their final location. :: @@ -97,7 +91,7 @@ Installing the Collector $ cp etc/ceilometer/*.yaml /etc/ceilometer $ cp etc/ceilometer/ceilometer.conf.sample /etc/ceilometer/ceilometer.conf -8. Edit ``/etc/ceilometer/ceilometer.conf`` +7. Edit ``/etc/ceilometer/ceilometer.conf`` 1. Configure RPC @@ -126,13 +120,88 @@ Installing the Collector Refer to :doc:`/configuration` for details about any other options you might want to modify before starting the service. -9. Start the collector. +8. Start the notification daemon. + + :: + + $ ceilometer-agent-notification + + .. note:: + + The default development configuration of the collector logs to + stderr, so you may want to run this step using a screen session + or other tool for maintaining a long-running program in the + background. + + +Installing the collector +======================== + +.. index:: + double: installing; collector + +1. Install MongoDB. + + Follow the instructions to install the MongoDB_ package for your + operating system, then start the service. + +2. Clone the ceilometer git repository to the management server:: + + $ cd /opt/stack + $ git clone https://git.openstack.org/openstack/ceilometer.git + +3. As a user with ``root`` permissions or ``sudo`` privileges, run the + ceilometer installer:: + + $ cd ceilometer + $ sudo python setup.py install + +4. Copy the sample configuration files from the source tree + to their final location. + + :: + + $ mkdir -p /etc/ceilometer + $ cp etc/ceilometer/*.json /etc/ceilometer + $ cp etc/ceilometer/*.yaml /etc/ceilometer + $ cp etc/ceilometer/ceilometer.conf.sample /etc/ceilometer/ceilometer.conf + +5. Edit ``/etc/ceilometer/ceilometer.conf`` + + 1. Configure RPC + + Set the RPC-related options correctly so ceilometer's daemons + can communicate with each other and receive notifications from + the other projects. + + In particular, look for the ``*_control_exchange`` options and + make sure the names are correct. If you did not change the + ``control_exchange`` settings for the other components, the + defaults should be correct. + + .. note:: + + Ceilometer makes extensive use of the messaging bus, but has + not yet been tested with ZeroMQ. We recommend using Rabbit or + qpid for now. + + 2. Set the ``metering_secret`` value. + + Set the ``metering_secret`` value to a large, random, value. Use + the same value in all ceilometer configuration files, on all + nodes, so that messages passing between the nodes can be + validated. + + Refer to :doc:`/configuration` for details about any other options + you might want to modify before starting the service. + +6. Start the collector. :: $ ceilometer-collector - .. note:: + .. note:: The default development configuration of the collector logs to stderr, so you may want to run this step using a screen session diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample index 8977e95d5..159b76af4 100644 --- a/etc/ceilometer/ceilometer.conf.sample +++ b/etc/ceilometer/ceilometer.conf.sample @@ -27,6 +27,20 @@ #sample_source=openstack +# +# Options defined in ceilometer.service +# + +# Name of this node. This can be an opaque identifier. It is +# not necessarily a hostname, FQDN, or IP address. However, +# the node name must be valid within an AMQP key, and if using +# ZeroMQ, a valid hostname, FQDN, or IP address (string value) +#host=ceilometer + +# dispatcher to process data (multi valued) +#dispatcher=database + + # # Options defined in ceilometer.api.app # @@ -602,6 +616,20 @@ #time_to_live=-1 +[notification] + +# +# Options defined in ceilometer.notification +# + +# Acknowledge message when event persistence fails (boolean +# value) +#ack_on_event_error=true + +# Save event details (boolean value) +#store_events=false + + [alarm] # @@ -858,16 +886,6 @@ # port to bind the UDP socket to (integer value) #udp_port=4952 -# Acknowledge message when event persistence fails (boolean -# value) -#ack_on_event_error=true - -# Save event details (boolean value) -#store_events=false - -# dispatcher to process metering data (multi valued) -#dispatcher=database - [matchmaker_ring] diff --git a/setup.cfg b/setup.cfg index b9fadad29..9d4d18a8a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,7 +28,7 @@ packages = ceilometer [entry_points] -ceilometer.collector = +ceilometer.notification = instance = ceilometer.compute.notifications:Instance instance_flavor = ceilometer.compute.notifications:InstanceFlavor instance_delete = ceilometer.compute.notifications:InstanceDelete @@ -119,6 +119,7 @@ console_scripts = ceilometer-api = ceilometer.api.app:start ceilometer-agent-central = ceilometer.central.manager:agent_central ceilometer-agent-compute = ceilometer.compute.manager:agent_compute + ceilometer-agent-notification = ceilometer.notification:agent ceilometer-dbsync = ceilometer.storage:dbsync ceilometer-expirer = ceilometer.storage:expirer ceilometer-collector = ceilometer.collector.service:collector