Split collector

The collector grew from doing one thing to doing too many. This encumber
scalability and he's really a bad design as it is. Let's fix that.

This patch take out the notification handling code to a new daemon
called ceilometer-agent-notification. This daemon will be in charge of
receiving notifications, storing them if needed, and building samples in
from of it.

Change-Id: I093f3b7855bd6ffff2018db9cd485ed2fc0f37a5
Blueprint: split-collector
This commit is contained in:
Julien Danjou 2013-11-13 17:08:47 +01:00
parent ef5351f806
commit 76c6d465d8
11 changed files with 571 additions and 392 deletions

View File

@ -36,10 +36,12 @@ F: api/
== events == == events ==
M: Julien Danjou (jd__)
M: Sandy Walsh (sandywalsh) M: Sandy Walsh (sandywalsh)
M: Monsyne Dragon (dragondm) M: Monsyne Dragon (dragondm)
S: Maintained S: Maintained
F: collector/, storage/ F: notification.py
F: storage/
== pipeline == == pipeline ==

View File

@ -20,20 +20,13 @@ import socket
import msgpack import msgpack
from oslo.config import cfg from oslo.config import cfg
from stevedore import extension
from stevedore import named
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
from ceilometer.openstack.common.rpc import service as rpc_service from ceilometer.openstack.common.rpc import service as rpc_service
from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common import service as os_service
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer import service from ceilometer import service
from ceilometer.storage import models
from ceilometer import transformer
OPTS = [ OPTS = [
cfg.StrOpt('udp_address', cfg.StrOpt('udp_address',
@ -43,15 +36,6 @@ OPTS = [
cfg.IntOpt('udp_port', cfg.IntOpt('udp_port',
default=4952, default=4952,
help='port to bind the UDP socket to'), help='port to bind the UDP socket to'),
cfg.BoolOpt('ack_on_event_error',
default=True,
help='Acknowledge message when event persistence fails'),
cfg.BoolOpt('store_events',
default=False,
help='Save event details'),
cfg.MultiStrOpt('dispatcher',
default=['database'],
help='dispatcher to process metering data'),
] ]
cfg.CONF.register_opts(OPTS, group="collector") cfg.CONF.register_opts(OPTS, group="collector")
@ -59,25 +43,7 @@ cfg.CONF.register_opts(OPTS, group="collector")
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class CollectorBase(object): class UDPCollectorService(service.DispatchedService, os_service.Service):
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
def __init__(self, *args, **kwargs):
super(CollectorBase, self).__init__(*args, **kwargs)
LOG.debug('loading dispatchers from %s',
self.DISPATCHER_NAMESPACE)
self.dispatcher_manager = named.NamedExtensionManager(
namespace=self.DISPATCHER_NAMESPACE,
names=cfg.CONF.collector.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF])
if not list(self.dispatcher_manager):
LOG.warning('Failed to load any dispatchers for %s',
self.DISPATCHER_NAMESPACE)
class UDPCollectorService(CollectorBase, os_service.Service):
"""UDP listener for the collector service.""" """UDP listener for the collector service."""
def start(self): def start(self):
@ -121,18 +87,7 @@ def udp_collector():
os_service.launch(UDPCollectorService()).wait() os_service.launch(UDPCollectorService()).wait()
class UnableToSaveEventException(Exception): class CollectorService(service.DispatchedService, rpc_service.Service):
"""Thrown when we want to requeue an event.
Any exception is fine, but this one should make debugging
a little easier.
"""
pass
class CollectorService(CollectorBase, rpc_service.Service):
COLLECTOR_NAMESPACE = 'ceilometer.collector'
def start(self): def start(self):
super(CollectorService, self).start() super(CollectorService, self).start()
@ -141,26 +96,6 @@ class CollectorService(CollectorBase, rpc_service.Service):
def initialize_service_hook(self, service): def initialize_service_hook(self, service):
'''Consumers must be declared before consume_thread start.''' '''Consumers must be declared before consume_thread start.'''
LOG.debug('initialize_service_hooks')
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
)
LOG.debug('loading notification handlers from %s',
self.COLLECTOR_NAMESPACE)
self.notification_manager = \
extension.ExtensionManager(
namespace=self.COLLECTOR_NAMESPACE,
invoke_on_load=True,
)
if not list(self.notification_manager):
LOG.warning('Failed to load any notification handlers for %s',
self.COLLECTOR_NAMESPACE)
self.notification_manager.map(self._setup_subscription)
# Set ourselves up as a separate worker for the metering data, # Set ourselves up as a separate worker for the metering data,
# since the default for service is to use create_consumer(). # since the default for service is to use create_consumer().
self.conn.create_worker( self.conn.create_worker(
@ -169,38 +104,8 @@ class CollectorService(CollectorBase, rpc_service.Service):
'ceilometer.collector.' + cfg.CONF.publisher_rpc.metering_topic, 'ceilometer.collector.' + cfg.CONF.publisher_rpc.metering_topic,
) )
def _setup_subscription(self, ext, *args, **kwds):
"""Connect to message bus to get notifications
Configure the RPC connection to listen for messages on the
right exchanges and topics so we receive all of the
notifications.
Use a connection pool so that multiple collector instances can
run in parallel to share load and without competing with each
other for incoming messages.
"""
handler = ext.obj
ack_on_error = cfg.CONF.collector.ack_on_event_error
LOG.debug('Event types from %s: %s (ack_on_error=%s)',
ext.name, ', '.join(handler.event_types),
ack_on_error)
for exchange_topic in handler.get_exchange_topics(cfg.CONF):
for topic in exchange_topic.topics:
try:
self.conn.join_consumer_pool(
callback=self.process_notification,
pool_name=topic,
topic=topic,
exchange_name=exchange_topic.exchange,
ack_on_error=ack_on_error)
except Exception:
LOG.exception('Could not join consumer pool %s/%s' %
(topic, exchange_topic.exchange))
def record_metering_data(self, context, data): def record_metering_data(self, context, data):
"""RPC endpoint for messages we send to ourself """RPC endpoint for messages we send to ourselves.
When the notification messages are re-published through the When the notification messages are re-published through the
RPC publisher, this method receives them for processing. RPC publisher, this method receives them for processing.
@ -209,67 +114,6 @@ class CollectorService(CollectorBase, rpc_service.Service):
context=context, context=context,
data=data) data=data)
def process_notification(self, notification):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it. See _setup_subscription().
"""
LOG.debug('notification %r', notification.get('event_type'))
self.notification_manager.map(self._process_notification_for_ext,
notification=notification)
if cfg.CONF.collector.store_events:
self._message_to_event(notification)
@staticmethod
def _extract_when(body):
"""Extract the generated datetime from the notification.
"""
when = body.get('timestamp', body.get('_context_timestamp'))
if when:
return timeutils.normalize_time(timeutils.parse_isotime(when))
return timeutils.utcnow()
def _message_to_event(self, body):
"""Convert message to Ceilometer Event.
NOTE: this is currently based on the Nova notification format.
We will need to make this driver-based to support other formats.
NOTE: the rpc layer currently rips out the notification
delivery_info, which is critical to determining the
source of the notification. This will have to get added back later.
"""
message_id = body.get('message_id')
event_type = body['event_type']
when = self._extract_when(body)
LOG.debug('Saving event "%s"', event_type)
publisher = body.get('publisher_id')
request_id = body.get('_context_request_id')
tenant_id = body.get('_context_tenant')
text = models.Trait.TEXT_TYPE
all_traits = [models.Trait('service', text, publisher),
models.Trait('request_id', text, request_id),
models.Trait('tenant_id', text, tenant_id),
]
# Only store non-None value traits ...
traits = [trait for trait in all_traits if trait.value is not None]
event = models.Event(message_id, event_type, when, traits)
problem_events = []
for dispatcher in self.dispatcher_manager:
problem_events.extend(dispatcher.obj.record_events(event))
if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]:
# Don't ack the message, raise to requeue it
# if ack_on_error = False
raise UnableToSaveEventException()
@staticmethod @staticmethod
def _record_metering_data_for_ext(ext, context, data): def _record_metering_data_for_ext(ext, context, data):
"""Wrapper for calling dispatcher plugin when a sample arrives """Wrapper for calling dispatcher plugin when a sample arrives
@ -280,18 +124,6 @@ class CollectorService(CollectorBase, rpc_service.Service):
""" """
ext.obj.record_metering_data(context, data) ext.obj.record_metering_data(context, data)
def _process_notification_for_ext(self, ext, notification):
"""Wrapper for calling pipelines when a notification arrives
When a message is received by process_notification(), it calls
this method with each notification plugin to allow all the
plugins process the notification.
"""
with self.pipeline_manager.publisher(context.get_admin_context()) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(ext.obj.to_samples(notification)))
def collector(): def collector():
service.prepare_service() service.prepare_service()

195
ceilometer/notification.py Normal file
View File

@ -0,0 +1,195 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012-2013 eNovance <licensing@enovance.com>
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from stevedore import extension
from ceilometer.openstack.common import context
from ceilometer.openstack.common import log
from ceilometer.openstack.common.rpc import service as rpc_service
from ceilometer.openstack.common import service as os_service
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer import service
from ceilometer.storage import models
from ceilometer import transformer
LOG = log.getLogger(__name__)
OPTS = [
cfg.BoolOpt('ack_on_event_error',
default=True,
deprecated_group='collector',
help='Acknowledge message when event persistence fails'),
cfg.BoolOpt('store_events',
deprecated_group='collector',
default=False,
help='Save event details'),
]
cfg.CONF.register_opts(OPTS, group="notification")
class UnableToSaveEventException(Exception):
"""Thrown when we want to requeue an event.
Any exception is fine, but this one should make debugging
a little easier.
"""
class NotificationService(service.DispatchedService, rpc_service.Service):
NOTIFICATION_NAMESPACE = 'ceilometer.notification'
def start(self):
super(NotificationService, self).start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def initialize_service_hook(self, service):
'''Consumers must be declared before consume_thread start.'''
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
)
self.notification_manager = \
extension.ExtensionManager(
namespace=self.NOTIFICATION_NAMESPACE,
invoke_on_load=True,
)
if not list(self.notification_manager):
LOG.warning('Failed to load any notification handlers for %s',
self.NOTIFICATION_NAMESPACE)
self.notification_manager.map(self._setup_subscription)
def _setup_subscription(self, ext, *args, **kwds):
"""Connect to message bus to get notifications
Configure the RPC connection to listen for messages on the
right exchanges and topics so we receive all of the
notifications.
Use a connection pool so that multiple notification agent instances
can run in parallel to share load and without competing with each
other for incoming messages.
"""
handler = ext.obj
ack_on_error = cfg.CONF.notification.ack_on_event_error
LOG.debug('Event types from %s: %s (ack_on_error=%s)',
ext.name, ', '.join(handler.event_types),
ack_on_error)
for exchange_topic in handler.get_exchange_topics(cfg.CONF):
for topic in exchange_topic.topics:
try:
self.conn.join_consumer_pool(
callback=self.process_notification,
pool_name=topic,
topic=topic,
exchange_name=exchange_topic.exchange,
ack_on_error=ack_on_error)
except Exception:
LOG.exception('Could not join consumer pool %s/%s' %
(topic, exchange_topic.exchange))
def process_notification(self, notification):
"""RPC endpoint for notification messages
When another service sends a notification over the message
bus, this method receives it. See _setup_subscription().
"""
LOG.debug('notification %r', notification.get('event_type'))
self.notification_manager.map(self._process_notification_for_ext,
notification=notification)
if cfg.CONF.notification.store_events:
self._message_to_event(notification)
@staticmethod
def _extract_when(body):
"""Extract the generated datetime from the notification.
"""
when = body.get('timestamp', body.get('_context_timestamp'))
if when:
return timeutils.normalize_time(timeutils.parse_isotime(when))
return timeutils.utcnow()
def _message_to_event(self, body):
"""Convert message to Ceilometer Event.
NOTE: this is currently based on the Nova notification format.
We will need to make this driver-based to support other formats.
NOTE: the rpc layer currently rips out the notification
delivery_info, which is critical to determining the
source of the notification. This will have to get added back later.
"""
message_id = body.get('message_id')
event_type = body['event_type']
when = self._extract_when(body)
LOG.debug('Saving event "%s"', event_type)
publisher = body.get('publisher_id')
request_id = body.get('_context_request_id')
tenant_id = body.get('_context_tenant')
text = models.Trait.TEXT_TYPE
all_traits = [models.Trait('service', text, publisher),
models.Trait('request_id', text, request_id),
models.Trait('tenant_id', text, tenant_id),
]
# Only store non-None value traits ...
traits = [trait for trait in all_traits if trait.value is not None]
event = models.Event(message_id, event_type, when, traits)
problem_events = []
for dispatcher in self.dispatcher_manager:
problem_events.extend(dispatcher.obj.record_events(event))
if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]:
# Don't ack the message, raise to requeue it
# if ack_on_error = False
raise UnableToSaveEventException()
def _process_notification_for_ext(self, ext, notification):
"""Wrapper for calling pipelines when a notification arrives
When a message is received by process_notification(), it calls
this method with each notification plugin to allow all the
plugins process the notification.
"""
with self.pipeline_manager.publisher(context.get_admin_context()) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(ext.obj.to_samples(notification)))
def agent():
service.prepare_service()
os_service.launch(NotificationService(
cfg.CONF.host,
'ceilometer.agent.notification')).wait()

View File

@ -34,7 +34,7 @@ _pipeline_manager = None
def _load_notification_manager(): def _load_notification_manager():
global _notification_manager global _notification_manager
namespace = 'ceilometer.collector' namespace = 'ceilometer.notification'
LOG.debug('loading notification handlers from %s', namespace) LOG.debug('loading notification handlers from %s', namespace)

View File

@ -23,13 +23,14 @@ import socket
import sys import sys
from oslo.config import cfg from oslo.config import cfg
from stevedore import named
from ceilometer.openstack.common import gettextutils from ceilometer.openstack.common import gettextutils
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import rpc from ceilometer.openstack.common import rpc
cfg.CONF.register_opts([ OPTS = [
cfg.StrOpt('host', cfg.StrOpt('host',
default=socket.gethostname(), default=socket.gethostname(),
help='Name of this node. This can be an opaque identifier. ' help='Name of this node. This can be an opaque identifier. '
@ -37,7 +38,12 @@ cfg.CONF.register_opts([
'However, the node name must be valid within ' 'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid ' 'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address'), 'hostname, FQDN, or IP address'),
]) cfg.MultiStrOpt('dispatcher',
deprecated_group="collector",
default=['database'],
help='dispatcher to process data'),
]
cfg.CONF.register_opts(OPTS)
CLI_OPTIONS = [ CLI_OPTIONS = [
cfg.StrOpt('os-username', cfg.StrOpt('os-username',
@ -81,6 +87,27 @@ CLI_OPTIONS = [
cfg.CONF.register_cli_opts(CLI_OPTIONS, group="service_credentials") cfg.CONF.register_cli_opts(CLI_OPTIONS, group="service_credentials")
LOG = log.getLogger(__name__)
class DispatchedService(object):
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
def __init__(self, *args, **kwargs):
super(DispatchedService, self).__init__(*args, **kwargs)
LOG.debug('loading dispatchers from %s',
self.DISPATCHER_NAMESPACE)
self.dispatcher_manager = named.NamedExtensionManager(
namespace=self.DISPATCHER_NAMESPACE,
names=cfg.CONF.dispatcher,
invoke_on_load=True,
invoke_args=[cfg.CONF])
if not list(self.dispatcher_manager):
LOG.warning('Failed to load any dispatchers for %s',
self.DISPATCHER_NAMESPACE)
def prepare_service(argv=None): def prepare_service(argv=None):
eventlet.monkey_patch() eventlet.monkey_patch()
gettextutils.install('ceilometer', lazy=False) gettextutils.install('ceilometer', lazy=False)

View File

@ -18,7 +18,6 @@
"""Tests for ceilometer/agent/service.py """Tests for ceilometer/agent/service.py
""" """
import datetime
import socket import socket
import mock import mock
@ -28,64 +27,11 @@ from stevedore import extension
from stevedore.tests import manager as test_manager from stevedore.tests import manager as test_manager
from ceilometer.collector import service from ceilometer.collector import service
from ceilometer.compute import notifications
from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import timeutils
from ceilometer import sample from ceilometer import sample
from ceilometer.storage import models
from ceilometer.tests import base as tests_base from ceilometer.tests import base as tests_base
TEST_NOTICE = {
u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
u'_context_is_admin': True,
u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
u'_context_quota_class': None,
u'_context_read_deleted': u'no',
u'_context_remote_address': u'10.0.2.15',
u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
u'_context_roles': [u'admin'],
u'_context_timestamp': u'2012-05-08T20:23:41.425105',
u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'event_type': u'compute.instance.create.end',
u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
u'payload': {u'created_at': u'2012-05-08 20:23:41',
u'deleted_at': u'',
u'disk_gb': 0,
u'display_name': u'testme',
u'fixed_ips': [{u'address': u'10.0.0.2',
u'floating_ips': [],
u'meta': {},
u'type': u'fixed',
u'version': 4}],
u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
u'instance_type': u'm1.tiny',
u'instance_type_id': 2,
u'launched_at': u'2012-05-08 20:23:47.985999',
u'memory_mb': 512,
u'state': u'active',
u'state_description': u'',
u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
u'vcpus': 1,
u'root_gb': 0,
u'ephemeral_gb': 0,
u'host': u'compute-host-name',
u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
u'os_type': u'linux?',
u'architecture': u'x86',
u'image_ref': u'UUID',
u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
},
u'priority': u'INFO',
u'publisher_id': u'compute.vagrant-precise',
u'timestamp': u'2012-05-08 20:23:48.028195',
}
class TestCollector(tests_base.BaseTestCase): class TestCollector(tests_base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestCollector, self).setUp() super(TestCollector, self).setUp()
@ -194,7 +140,6 @@ class TestCollectorService(TestCollector):
def setUp(self): def setUp(self):
super(TestCollectorService, self).setUp() super(TestCollectorService, self).setUp()
self.srv = service.CollectorService('the-host', 'the-topic') self.srv = service.CollectorService('the-host', 'the-topic')
self.ctx = None
@patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) @patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_init_host(self): def test_init_host(self):
@ -203,115 +148,3 @@ class TestCollectorService(TestCollector):
# configuration. # configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'): with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start() self.srv.start()
@patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_process_notification(self):
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the service
# configuration.
self.CONF.set_override("store_events", False, group="collector")
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start()
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
self.srv.notification_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
notifications.Instance(),
),
])
self.srv.process_notification(TEST_NOTICE)
self.assertTrue(
self.srv.pipeline_manager.publisher.called)
def test_process_notification_no_events(self):
self.CONF.set_override("store_events", False, group="collector")
self.srv.notification_manager = mock.MagicMock()
with patch.object(self.srv, '_message_to_event') as fake_msg_to_event:
self.srv.process_notification({})
self.assertFalse(fake_msg_to_event.called)
def test_process_notification_with_events(self):
self.CONF.set_override("store_events", True, group="collector")
self.srv.notification_manager = mock.MagicMock()
with patch.object(self.srv, '_message_to_event') as fake_msg_to_event:
self.srv.process_notification({})
self.assertTrue(fake_msg_to_event.called)
def test_message_to_event_missing_keys(self):
now = timeutils.utcnow()
timeutils.set_time_override(now)
message = {'event_type': "foo",
'message_id': "abc",
'publisher_id': "1"}
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
with patch('ceilometer.collector.service.LOG') as mylog:
self.srv._message_to_event(message)
self.assertFalse(mylog.exception.called)
events = mock_dispatcher.record_events.call_args[0]
self.assertEqual(1, len(events))
event = events[0]
self.assertEqual("foo", event.event_type)
self.assertEqual(now, event.generated)
self.assertEqual(1, len(event.traits))
def test_message_to_event_duplicate(self):
self.CONF.set_override("store_events", True, group="collector")
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_events.return_value = [
(models.Event.DUPLICATE, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.srv._message_to_event(message) # Should return silently.
def test_message_to_event_bad_event(self):
self.CONF.set_override("store_events", True, group="collector")
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_events.return_value = [
(models.Event.UNKNOWN_PROBLEM, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.assertRaises(service.UnableToSaveEventException,
self.srv._message_to_event, message)
def test_extract_when(self):
now = timeutils.utcnow()
modified = now + datetime.timedelta(minutes=1)
timeutils.set_time_override(now)
body = {"timestamp": str(modified)}
when = service.CollectorService._extract_when(body)
self.assertTimestampEqual(modified, when)
body = {"_context_timestamp": str(modified)}
when = service.CollectorService._extract_when(body)
self.assertTimestampEqual(modified, when)
then = now + datetime.timedelta(hours=1)
body = {"timestamp": str(modified), "_context_timestamp": str(then)}
when = service.CollectorService._extract_when(body)
self.assertTimestampEqual(modified, when)
when = service.CollectorService._extract_when({})
self.assertTimestampEqual(now, when)

View File

@ -0,0 +1,201 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for Ceilometer notify daemon."""
import datetime
import mock
from stevedore import extension
from stevedore.tests import manager as test_manager
from ceilometer.compute import notifications
from ceilometer import notification
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import timeutils
from ceilometer.storage import models
from ceilometer.tests import base as tests_base
TEST_NOTICE = {
u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
u'_context_is_admin': True,
u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
u'_context_quota_class': None,
u'_context_read_deleted': u'no',
u'_context_remote_address': u'10.0.2.15',
u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
u'_context_roles': [u'admin'],
u'_context_timestamp': u'2012-05-08T20:23:41.425105',
u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'event_type': u'compute.instance.create.end',
u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
u'payload': {u'created_at': u'2012-05-08 20:23:41',
u'deleted_at': u'',
u'disk_gb': 0,
u'display_name': u'testme',
u'fixed_ips': [{u'address': u'10.0.0.2',
u'floating_ips': [],
u'meta': {},
u'type': u'fixed',
u'version': 4}],
u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
u'instance_type': u'm1.tiny',
u'instance_type_id': 2,
u'launched_at': u'2012-05-08 20:23:47.985999',
u'memory_mb': 512,
u'state': u'active',
u'state_description': u'',
u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
u'vcpus': 1,
u'root_gb': 0,
u'ephemeral_gb': 0,
u'host': u'compute-host-name',
u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
u'os_type': u'linux?',
u'architecture': u'x86',
u'image_ref': u'UUID',
u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
},
u'priority': u'INFO',
u'publisher_id': u'compute.vagrant-precise',
u'timestamp': u'2012-05-08 20:23:48.028195',
}
class TestNotification(tests_base.BaseTestCase):
def setUp(self):
super(TestNotification, self).setUp()
self.srv = notification.NotificationService('the-host', 'the-topic')
self.CONF = self.useFixture(config.Config()).conf
self.CONF.set_override("connection", "log://", group='database')
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_process_notification(self):
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the service
# configuration.
self.CONF.set_override("store_events", False, group="notification")
with mock.patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start()
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
self.srv.notification_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
notifications.Instance(),
),
])
self.srv.process_notification(TEST_NOTICE)
self.assertTrue(
self.srv.pipeline_manager.publisher.called)
def test_process_notification_no_events(self):
self.CONF.set_override("store_events", False, group="notification")
self.srv.notification_manager = mock.MagicMock()
with mock.patch.object(self.srv,
'_message_to_event') as fake_msg_to_event:
self.srv.process_notification({})
self.assertFalse(fake_msg_to_event.called)
def test_process_notification_with_events(self):
self.CONF.set_override("store_events", True, group="notification")
self.srv.notification_manager = mock.MagicMock()
with mock.patch.object(self.srv,
'_message_to_event') as fake_msg_to_event:
self.srv.process_notification({})
self.assertTrue(fake_msg_to_event.called)
def test_message_to_event_missing_keys(self):
now = timeutils.utcnow()
timeutils.set_time_override(now)
message = {'event_type': "foo",
'message_id': "abc",
'publisher_id': "1"}
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
self.srv._message_to_event(message)
events = mock_dispatcher.record_events.call_args[0]
self.assertEqual(1, len(events))
event = events[0]
self.assertEqual("foo", event.event_type)
self.assertEqual(now, event.generated)
self.assertEqual(1, len(event.traits))
def test_message_to_event_duplicate(self):
self.CONF.set_override("store_events", True, group="notification")
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_events.return_value = [
(models.Event.DUPLICATE, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.srv._message_to_event(message) # Should return silently.
def test_message_to_event_bad_event(self):
self.CONF.set_override("store_events", True, group="notification")
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_events.return_value = [
(models.Event.UNKNOWN_PROBLEM, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.assertRaises(notification.UnableToSaveEventException,
self.srv._message_to_event, message)
def test_extract_when(self):
now = timeutils.utcnow()
modified = now + datetime.timedelta(minutes=1)
timeutils.set_time_override(now)
body = {"timestamp": str(modified)}
when = notification.NotificationService._extract_when(body)
self.assertTimestampEqual(modified, when)
body = {"_context_timestamp": str(modified)}
when = notification.NotificationService._extract_when(body)
self.assertTimestampEqual(modified, when)
then = now + datetime.timedelta(hours=1)
body = {"timestamp": str(modified), "_context_timestamp": str(then)}
when = notification.NotificationService._extract_when(body)
self.assertTimestampEqual(modified, when)
when = notification.NotificationService._extract_when({})
self.assertTimestampEqual(now, when)

View File

@ -370,34 +370,35 @@ Handling Notifications
.. index:: .. index::
double: notifications; architecture double: notifications; architecture
The heart of the system is the collector, which monitors the message The heart of the system are the notification daemon (agent-notification) and
bus for data being provided by the pollsters via the agent as well as the collector, which monitor the message bus for data being provided by the
notification messages from other OpenStack components such as nova, pollsters via the agent as well as notification messages from other
glance, neutron, and swift. OpenStack components such as nova, glance, neutron, and swift.
The collector loads one or more *listener* plugins, using the namespace The notification daemon loads one or more *listener* plugins, using the
``ceilometer.collector``. Each plugin can listen to any topics, but by namespace ``ceilometer.notification``. Each plugin can listen to any topics,
default it will listen to ``notifications.info``. but by default it will listen to ``notifications.info``.
The plugin provides a method to list the event types it wants and a The plugin provides a method to list the event types it wants and a callback
callback for processing incoming messages. The registered name of the for processing incoming messages. The registered name of the callback is
callback is used to enable or disable it using the global used to enable or disable it using the pipeline of the notification daemon.
configuration option of the collector daemon. The incoming messages The incoming messages are filtered based on their event type value before
are filtered based on their event type value before being passed to being passed to the callback so the plugin only receives events it has
the callback so the plugin only receives events it has expressed an expressed an interest in seeing. For example, a callback asking for
interest in seeing. For example, a callback asking for
``compute.instance.create.end`` events under ``compute.instance.create.end`` events under
``ceilometer.collector.compute`` would be invoked for those ``ceilometer.collector.compute`` would be invoked for those notification
notification events on the ``nova`` exchange using the events on the ``nova`` exchange using the ``notifications.info`` topic.
``notifications.info`` topic.
The listener plugin returns an iterable with zero or more Counter The listener plugin returns an iterable with zero or more Sample instances
instances based on the data in the incoming message. The collector based on the data in the incoming message. The collector framework code
framework code converts the Counter instances to metering messages and converts the Sample instances to metering messages and publishes them on the
publishes them on the metering message bus. Although ceilomter metering message bus. Although Ceilometer includes a default storage
includes a default storage solution to work with the API service, by solution to work with the API service, by republishing on the metering
republishing on the metering message bus we can support installations message bus we can support installations that want to handle their own data
that want to handle their own data storage. storage.
The Ceilometer collector daemon then receives this Sample on the bus and
stores them into a database.
Handling Metering Messages Handling Metering Messages
-------------------------- --------------------------

View File

@ -20,11 +20,10 @@
Installing Manually Installing Manually
===================== =====================
Installing the Collector Installing the notification agent
======================== ======================================
.. index:: .. index::
double: installing; collector double: installing; agent-notification
1. If you want to be able to retrieve image samples, you need to instruct 1. If you want to be able to retrieve image samples, you need to instruct
Glance to send notifications to the bus by changing ``notifier_strategy`` Glance to send notifications to the bus by changing ``notifier_strategy``
@ -71,23 +70,18 @@ Installing the Collector
use = egg:ceilometer#swift use = egg:ceilometer#swift
metadata_headers = X-FOO, X-BAR metadata_headers = X-FOO, X-BAR
4. Install MongoDB. 4. Clone the ceilometer git repository to the management server::
Follow the instructions to install the MongoDB_ package for your
operating system, then start the service.
5. Clone the ceilometer git repository to the management server::
$ cd /opt/stack $ cd /opt/stack
$ git clone https://git.openstack.org/openstack/ceilometer.git $ git clone https://git.openstack.org/openstack/ceilometer.git
6. As a user with ``root`` permissions or ``sudo`` privileges, run the 5. As a user with ``root`` permissions or ``sudo`` privileges, run the
ceilometer installer:: ceilometer installer::
$ cd ceilometer $ cd ceilometer
$ sudo python setup.py install $ sudo python setup.py install
7. Copy the sample configuration files from the source tree 6. Copy the sample configuration files from the source tree
to their final location. to their final location.
:: ::
@ -97,7 +91,7 @@ Installing the Collector
$ cp etc/ceilometer/*.yaml /etc/ceilometer $ cp etc/ceilometer/*.yaml /etc/ceilometer
$ cp etc/ceilometer/ceilometer.conf.sample /etc/ceilometer/ceilometer.conf $ cp etc/ceilometer/ceilometer.conf.sample /etc/ceilometer/ceilometer.conf
8. Edit ``/etc/ceilometer/ceilometer.conf`` 7. Edit ``/etc/ceilometer/ceilometer.conf``
1. Configure RPC 1. Configure RPC
@ -126,7 +120,82 @@ Installing the Collector
Refer to :doc:`/configuration` for details about any other options Refer to :doc:`/configuration` for details about any other options
you might want to modify before starting the service. you might want to modify before starting the service.
9. Start the collector. 8. Start the notification daemon.
::
$ ceilometer-agent-notification
.. note::
The default development configuration of the collector logs to
stderr, so you may want to run this step using a screen session
or other tool for maintaining a long-running program in the
background.
Installing the collector
========================
.. index::
double: installing; collector
1. Install MongoDB.
Follow the instructions to install the MongoDB_ package for your
operating system, then start the service.
2. Clone the ceilometer git repository to the management server::
$ cd /opt/stack
$ git clone https://git.openstack.org/openstack/ceilometer.git
3. As a user with ``root`` permissions or ``sudo`` privileges, run the
ceilometer installer::
$ cd ceilometer
$ sudo python setup.py install
4. Copy the sample configuration files from the source tree
to their final location.
::
$ mkdir -p /etc/ceilometer
$ cp etc/ceilometer/*.json /etc/ceilometer
$ cp etc/ceilometer/*.yaml /etc/ceilometer
$ cp etc/ceilometer/ceilometer.conf.sample /etc/ceilometer/ceilometer.conf
5. Edit ``/etc/ceilometer/ceilometer.conf``
1. Configure RPC
Set the RPC-related options correctly so ceilometer's daemons
can communicate with each other and receive notifications from
the other projects.
In particular, look for the ``*_control_exchange`` options and
make sure the names are correct. If you did not change the
``control_exchange`` settings for the other components, the
defaults should be correct.
.. note::
Ceilometer makes extensive use of the messaging bus, but has
not yet been tested with ZeroMQ. We recommend using Rabbit or
qpid for now.
2. Set the ``metering_secret`` value.
Set the ``metering_secret`` value to a large, random, value. Use
the same value in all ceilometer configuration files, on all
nodes, so that messages passing between the nodes can be
validated.
Refer to :doc:`/configuration` for details about any other options
you might want to modify before starting the service.
6. Start the collector.
:: ::

View File

@ -27,6 +27,20 @@
#sample_source=openstack #sample_source=openstack
#
# Options defined in ceilometer.service
#
# Name of this node. This can be an opaque identifier. It is
# not necessarily a hostname, FQDN, or IP address. However,
# the node name must be valid within an AMQP key, and if using
# ZeroMQ, a valid hostname, FQDN, or IP address (string value)
#host=ceilometer
# dispatcher to process data (multi valued)
#dispatcher=database
# #
# Options defined in ceilometer.api.app # Options defined in ceilometer.api.app
# #
@ -602,6 +616,20 @@
#time_to_live=-1 #time_to_live=-1
[notification]
#
# Options defined in ceilometer.notification
#
# Acknowledge message when event persistence fails (boolean
# value)
#ack_on_event_error=true
# Save event details (boolean value)
#store_events=false
[alarm] [alarm]
# #
@ -858,16 +886,6 @@
# port to bind the UDP socket to (integer value) # port to bind the UDP socket to (integer value)
#udp_port=4952 #udp_port=4952
# Acknowledge message when event persistence fails (boolean
# value)
#ack_on_event_error=true
# Save event details (boolean value)
#store_events=false
# dispatcher to process metering data (multi valued)
#dispatcher=database
[matchmaker_ring] [matchmaker_ring]

View File

@ -28,7 +28,7 @@ packages =
ceilometer ceilometer
[entry_points] [entry_points]
ceilometer.collector = ceilometer.notification =
instance = ceilometer.compute.notifications:Instance instance = ceilometer.compute.notifications:Instance
instance_flavor = ceilometer.compute.notifications:InstanceFlavor instance_flavor = ceilometer.compute.notifications:InstanceFlavor
instance_delete = ceilometer.compute.notifications:InstanceDelete instance_delete = ceilometer.compute.notifications:InstanceDelete
@ -119,6 +119,7 @@ console_scripts =
ceilometer-api = ceilometer.api.app:start ceilometer-api = ceilometer.api.app:start
ceilometer-agent-central = ceilometer.central.manager:agent_central ceilometer-agent-central = ceilometer.central.manager:agent_central
ceilometer-agent-compute = ceilometer.compute.manager:agent_compute ceilometer-agent-compute = ceilometer.compute.manager:agent_compute
ceilometer-agent-notification = ceilometer.notification:agent
ceilometer-dbsync = ceilometer.storage:dbsync ceilometer-dbsync = ceilometer.storage:dbsync
ceilometer-expirer = ceilometer.storage:expirer ceilometer-expirer = ceilometer.storage:expirer
ceilometer-collector = ceilometer.collector.service:collector ceilometer-collector = ceilometer.collector.service:collector