Add plugin support to the notification portion of the collector daemon.

Implement a Counter class for use by notification plugins.
Define a base class for Notification plugins.
Define a dispatcher class for notification events to be passed to the plugins.
Add a notification plugin for instance creation and "instance" counters.
Add a reusable function for turning a Counter into a metering event dictionary.

Change-Id: Iaa626b98e1a661ed31cc8b8e95263c111df20888
This commit is contained in:
Doug Hellmann 2012-05-22 15:10:09 -04:00
parent cc5b02dc84
commit 73c9150afe
17 changed files with 508 additions and 82 deletions

2
.gitignore vendored
View File

@ -2,3 +2,5 @@
*.dat *.dat
TAGS TAGS
*.egg-info *.egg-info
build
.coverage

View File

@ -28,12 +28,12 @@ from nova import service
from nova import utils from nova import utils
if __name__ == '__main__': if __name__ == '__main__':
utils.default_flagfile() utils.default_cfgfile()
flags.FLAGS(sys.argv) flags.FLAGS(sys.argv)
logging.setup() logging.setup()
utils.monkey_patch() utils.monkey_patch()
server = service.Service.create(binary='ceilometer-nova-instance', server = service.Service.create(binary='ceilometer-nova-instance',
topic='ceilometer', topic='ceilometer',
manager='ceilometer.nova.manager.InstanceManager') manager='ceilometer.collector.manager.CollectorManager')
service.serve(server) service.serve(server)
service.wait() service.wait()

View File

View File

@ -0,0 +1,78 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Given an incoming message, process it through the registered converters
and publish the results.
"""
import pkg_resources
from nova import log as logging
# FIXME(dhellmann): We need to have the main program set up logging
# correctly so messages from modules outside of the nova package
# appear in the output.
LOG = logging.getLogger('nova.' + __name__)
class NotificationDispatcher(object):
"""Manages invoking plugins to convert notification messages to counters.
"""
def __init__(self, plugin_namespace, publish_func):
self.plugin_namespace = plugin_namespace
self.publish_func = publish_func
self.handlers = {}
self._load_plugins()
def _load_plugins(self):
# Listen for notifications from nova
for ep in pkg_resources.iter_entry_points(self.plugin_namespace):
LOG.info('attempting to load notification handler for %s:%s',
self.plugin_namespace, ep.name)
try:
plugin_class = ep.load()
plugin = plugin_class()
# FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and
# importable. Need to add check against global
# configuration flag and check that asks the plugin if
# it should be enabled.
for event_type in plugin.get_event_types():
LOG.info('subscribing %s handler to %s events',
ep.name, event_type)
self.handlers.setdefault(event_type, []).append(plugin)
except Exception as err:
LOG.warning('Failed to load notification handler %s: %s',
ep.name, err)
LOG.exception(err)
if not self.handlers:
LOG.warning('Failed to load any notification handlers for %s',
self.plugin_namespace)
def notify(self, body):
"""Dispatch the notification to the appropriate handler
and publish the counters returned.
"""
event_type = body.get('event_type')
LOG.info('NOTIFICATION: %s', event_type)
for handler in self.handlers.get(event_type, []):
for c in handler.process_notification(body):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread?
self.publish_func(body, c)
return

View File

@ -0,0 +1,58 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 eNovance <licensing@enovance.com>
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import flags
from nova import log as logging
from nova import manager
from ceilometer import rpc
from ceilometer import meter
from ceilometer.collector import dispatcher
# FIXME(dhellmann): There must be another way to do this.
# Import rabbit_notifier to register notification_topics flag
import nova.notifier.rabbit_notifier
FLAGS = flags.FLAGS
# FIXME(dhellmann): We need to have the main program set up logging
# correctly so messages from modules outside of the nova package
# appear in the output.
LOG = logging.getLogger('nova.' + __name__)
COMPUTE_COLLECTOR_NAMESPACE = 'ceilometer.collector.compute'
class CollectorManager(manager.Manager):
def init_host(self):
self.connection = rpc.Connection(flags.FLAGS)
self.compute_handler = dispatcher.NotificationDispatcher(
COMPUTE_COLLECTOR_NAMESPACE,
self._publish_counter,
)
self.connection.declare_topic_consumer(
topic='%s.info' % flags.FLAGS.notification_topics[0],
callback=self.compute_handler.notify)
self.connection.consume_in_thread()
def _publish_counter(self, notice, c):
"""Create a metering message for the counter and publish it."""
msg = meter.meter_message_from_counter(notice, c)
LOG.info('PUBLISH: %s', str(msg))
# FIXME(dhellmann): Need to publish the message on the
# metering queue.

View File

@ -18,26 +18,38 @@
"""Converters for producing compute counter messages from notification events. """Converters for producing compute counter messages from notification events.
""" """
from .. import signature from .. import counter
from .. import plugin
def c1(body): def c1(body):
"""Generate c1(instance) counters for a notice.""" """Generate c1(instance) counters for a notice."""
c = {'source': '?', return counter.Counter(
'counter_type': 'instance', source='?',
'counter_volume': 1, type='instance',
'user_id': body['payload']['user_id'], volume=1,
'project_id': body['payload']['tenant_id'], resource_id=body['payload']['instance_id'],
'resource_id': body['payload']['instance_id'], datetime=body['timestamp'],
'counter_datetime': body['timestamp'], duration=0,
'counter_duration': 0, # FIXME(dhellmann): Add region and other
# FIXME(dhellmann): Add region and other details to metadata # details to metadata
'resource_metadata': {'display_name': resource_metadata={
'display_name':
body['payload']['display_name'], body['payload']['display_name'],
'instance_type': 'instance_type':
body['payload']['instance_type_id'], body['payload']['instance_type_id'],
'host': body['publisher_id'], 'host': body['publisher_id'],
}, },
} )
c['message_signature'] = signature.compute_signature(c)
return [c]
class InstanceCreate(plugin.NotificationBase):
"""Convert compute.instance.create.end notifications into Counters
"""
def get_event_types(self):
return ['compute.instance.create.end']
def process_notification(self, message):
return [c1(message),
]

36
ceilometer/counter.py Normal file
View File

@ -0,0 +1,36 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Counter class for holding data about a metering event.
A Counter doesn't really do anything, but we need a way to
ensure that all of the appropriate fields have been filled
in by the plugins that create them.
"""
import collections
Counter = collections.namedtuple('Counter',
' '.join(['source',
'type',
'volume',
'resource_id',
'datetime',
'duration',
'resource_metadata'])
)

View File

@ -20,6 +20,7 @@
import hmac import hmac
import hashlib import hashlib
import uuid
# FIXME(dhellmann): Need to move this secret out of the code. Where? # FIXME(dhellmann): Need to move this secret out of the code. Where?
@ -38,3 +39,24 @@ def compute_signature(message):
digest_maker.update(name) digest_maker.update(name)
digest_maker.update(unicode(value).encode('utf-8')) digest_maker.update(unicode(value).encode('utf-8'))
return digest_maker.hexdigest() return digest_maker.hexdigest()
def meter_message_from_counter(notice, counter):
"""Make a metering message ready to be published or stored.
Returns a dictionary containing a metering message
for a notification message and a Counter instance.
"""
msg = {'source': counter.source,
'counter_type': counter.type,
'counter_volume': counter.volume,
'user_id': notice['payload']['user_id'],
'project_id': notice['payload']['tenant_id'],
'resource_id': counter.resource_id,
'counter_datetime': counter.datetime,
'counter_duration': counter.duration,
'resource_metadata': counter.resource_metadata,
'message_id': str(uuid.uuid1()),
}
msg['message_signature'] = compute_signature(msg)
return msg

View File

@ -23,11 +23,6 @@ from nova import manager
from nova import flags from nova import flags
import nova.virt.connection import nova.virt.connection
# Import rabbit_notifier to register notification_topics flag
import nova.notifier.rabbit_notifier
from ceilometer import rpc
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
# FIXME(dhellmann): We need to have the main program set up logging # FIXME(dhellmann): We need to have the main program set up logging
# correctly so messages from modules outside of the nova package # correctly so messages from modules outside of the nova package
@ -35,19 +30,6 @@ FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.' + __name__) LOG = logging.getLogger('nova.' + __name__)
class InstanceManager(manager.Manager):
def init_host(self):
self.connection = rpc.Connection(flags.FLAGS)
self.connection.declare_topic_consumer(
topic='%s.info' % flags.FLAGS.notification_topics[0],
callback=self._on_notification)
self.connection.consume_in_thread()
def _on_notification(self, body):
event_type = body.get('event_type')
LOG.info('NOTIFICATION: %s', event_type)
class ComputeManager(manager.Manager): class ComputeManager(manager.Manager):
def _get_disks(self, conn, instance): def _get_disks(self, conn, instance):
"""Get disks of an instance, only used to bypass bug#998089.""" """Get disks of an instance, only used to bypass bug#998089."""

View File

@ -15,26 +15,24 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
"""Tests for converters for producing compute counter messages from """Base class for plugins.
notification events.
""" """
from ceilometer import signature import abc
def test_change_key(): class NotificationBase(object):
sig1 = signature.compute_signature({'a': 'A', 'b': 'B'}) """Base class for plugins that support the notification API."""
sig2 = signature.compute_signature({'A': 'A', 'b': 'B'})
assert sig1 != sig2
__metaclass__ = abc.ABCMeta
def test_change_value(): @abc.abstractmethod
sig1 = signature.compute_signature({'a': 'A', 'b': 'B'}) def get_event_types(self):
sig2 = signature.compute_signature({'a': 'a', 'b': 'B'}) """Return a sequence of strings defining the event types to be
assert sig1 != sig2 given to this plugin."""
return []
@abc.abstractmethod
def test_same(): def process_notification(self, message):
sig1 = signature.compute_signature({'a': 'A', 'b': 'B'}) """Return a sequence of Counter instances for the given message."""
sig2 = signature.compute_signature({'a': 'A', 'b': 'B'}) pass
assert sig1 == sig2

4
run_tests.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/sh
# Simple test runner, should be replaced with tox
nosetests -P -d -v --with-coverage --cover-package=ceilometer --cover-inclusive tests

View File

@ -19,7 +19,8 @@
import setuptools import setuptools
setuptools.setup(name='ceilometer', setuptools.setup(
name='ceilometer',
version='0', version='0',
description='cloud computing metering', description='cloud computing metering',
author='OpenStack', author='OpenStack',
@ -29,4 +30,10 @@ setuptools.setup(name='ceilometer',
include_package_data=True, include_package_data=True,
test_suite='nose.collector', test_suite='nose.collector',
scripts=['bin/ceilometer-nova-compute'], scripts=['bin/ceilometer-nova-compute'],
py_modules=[]) py_modules=[],
entry_points={
'ceilometer.collector.compute': [
'instance_create = ceilometer.compute.notifications:InstanceCreate',
],
},
)

View File

View File

@ -0,0 +1,104 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/nova/dispatcher.py
"""
from ceilometer.compute import notifications
from ceilometer.collector import dispatcher
class StubDispatcher(dispatcher.NotificationDispatcher):
def _load_plugins(self):
self.handlers['compute.instance.create.end'] = [notifications.InstanceCreate()]
TEST_NOTICE = {
u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
u'_context_is_admin': True,
u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
u'_context_quota_class': None,
u'_context_read_deleted': u'no',
u'_context_remote_address': u'10.0.2.15',
u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
u'_context_roles': [u'admin'],
u'_context_timestamp': u'2012-05-08T20:23:41.425105',
u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'event_type': u'compute.instance.create.end',
u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
u'payload': {u'created_at': u'2012-05-08 20:23:41',
u'deleted_at': u'',
u'disk_gb': 0,
u'display_name': u'testme',
u'fixed_ips': [{u'address': u'10.0.0.2',
u'floating_ips': [],
u'meta': {},
u'type': u'fixed',
u'version': 4}],
u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
u'instance_type': u'm1.tiny',
u'instance_type_id': 2,
u'launched_at': u'2012-05-08 20:23:47.985999',
u'memory_mb': 512,
u'state': u'active',
u'state_description': u'',
u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2'},
u'priority': u'INFO',
u'publisher_id': u'compute.vagrant-precise',
u'timestamp': u'2012-05-08 20:23:48.028195',
}
def test_notify():
results = []
d = StubDispatcher(None, lambda x, y: results.append((x, y)))
d.notify(TEST_NOTICE)
assert len(results) == 1
counter = results[0][1]
assert counter.type == 'instance'
def test_load_compute_plugins():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector.compute',
lambda x, y: results.append((x, y))
)
assert d.handlers, 'No handlers were loaded'
def test_load_no_plugins():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector.none',
lambda x, y: results.append((x, y))
)
assert not d.handlers, 'Handlers were loaded'
def test_notify_through_plugin():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector.compute',
lambda x, y: results.append((x, y))
)
d.notify(TEST_NOTICE)
assert len(results) == 1
counter = results[0][1]
assert counter.type == 'instance'

View File

@ -65,29 +65,27 @@ def compare(name, actual, expected):
def test_c1(): def test_c1():
info = notifications.c1(INSTANCE_CREATE_END)[0] info = notifications.c1(INSTANCE_CREATE_END)
for name, actual, expected in [ for name, actual, expected in [
('counter_type', info['counter_type'], 'instance'), ('counter_type', info.type, 'instance'),
('counter_volume', info['counter_volume'], 1), ('counter_volume', info.volume, 1),
('counter_datetime', info['counter_datetime'], ('counter_datetime', info.datetime,
INSTANCE_CREATE_END['timestamp']), INSTANCE_CREATE_END['timestamp']),
('user_id', info['user_id'], ('resource_id', info.resource_id,
INSTANCE_CREATE_END['payload']['user_id']),
('project_id', info['project_id'],
INSTANCE_CREATE_END['payload']['tenant_id']),
('resource_id', info['resource_id'],
INSTANCE_CREATE_END['payload']['instance_id']), INSTANCE_CREATE_END['payload']['instance_id']),
('display_name', info['resource_metadata']['display_name'], ('display_name', info.resource_metadata['display_name'],
INSTANCE_CREATE_END['payload']['display_name']), INSTANCE_CREATE_END['payload']['display_name']),
('instance_type', info['resource_metadata']['instance_type'], ('instance_type', info.resource_metadata['instance_type'],
INSTANCE_CREATE_END['payload']['instance_type_id']), INSTANCE_CREATE_END['payload']['instance_type_id']),
('host', info['resource_metadata']['host'], ('host', info.resource_metadata['host'],
INSTANCE_CREATE_END['publisher_id']), INSTANCE_CREATE_END['publisher_id']),
]: ]:
yield compare, name, actual, expected yield compare, name, actual, expected
def test_c1_signed(): def test_instance_create():
info = notifications.c1(INSTANCE_CREATE_END)[0] ic = notifications.InstanceCreate()
assert 'message_signature' in info counters = ic.process_notification(INSTANCE_CREATE_END)
assert len(counters) == 1
assert counters[0].type == 'instance'

124
tests/test_meter.py Normal file
View File

@ -0,0 +1,124 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer.meter
"""
from ceilometer import counter
from ceilometer import meter
def test_compute_signature_change_key():
sig1 = meter.compute_signature({'a': 'A', 'b': 'B'})
sig2 = meter.compute_signature({'A': 'A', 'b': 'B'})
assert sig1 != sig2
def test_compute_signature_change_value():
sig1 = meter.compute_signature({'a': 'A', 'b': 'B'})
sig2 = meter.compute_signature({'a': 'a', 'b': 'B'})
assert sig1 != sig2
def test_compute_signature_same():
sig1 = meter.compute_signature({'a': 'A', 'b': 'B'})
sig2 = meter.compute_signature({'a': 'A', 'b': 'B'})
assert sig1 == sig2
def test_compute_signature_signed():
data = {'a': 'A', 'b': 'B'}
sig1 = meter.compute_signature(data)
data['message_signature'] = sig1
sig2 = meter.compute_signature(data)
assert sig1 == sig2
TEST_COUNTER = counter.Counter(source='src',
type='typ',
volume=1,
resource_id=2,
datetime='today',
duration=3,
resource_metadata={'key': 'value'},
)
TEST_NOTICE = {
u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
u'_context_is_admin': True,
u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
u'_context_quota_class': None,
u'_context_read_deleted': u'no',
u'_context_remote_address': u'10.0.2.15',
u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
u'_context_roles': [u'admin'],
u'_context_timestamp': u'2012-05-08T20:23:41.425105',
u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
u'event_type': u'compute.instance.create.end',
u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
u'payload': {u'created_at': u'2012-05-08 20:23:41',
u'deleted_at': u'',
u'disk_gb': 0,
u'display_name': u'testme',
u'fixed_ips': [{u'address': u'10.0.0.2',
u'floating_ips': [],
u'meta': {},
u'type': u'fixed',
u'version': 4}],
u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
u'instance_type': u'm1.tiny',
u'instance_type_id': 2,
u'launched_at': u'2012-05-08 20:23:47.985999',
u'memory_mb': 512,
u'state': u'active',
u'state_description': u'',
u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
u'user_id': u'1e3ce043029547f1a61c1996d1a531a2'},
u'priority': u'INFO',
u'publisher_id': u'compute.vagrant-precise',
u'timestamp': u'2012-05-08 20:23:48.028195',
}
def test_meter_message_from_counter_user_id():
msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER)
assert msg['user_id'] == TEST_NOTICE['payload']['user_id']
def test_meter_message_from_counter_project_id():
msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER)
assert msg['project_id'] == TEST_NOTICE['payload']['tenant_id']
def test_meter_message_from_counter_signed():
msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER)
assert 'message_signature' in msg
def test_meter_message_from_counter_field():
def compare(f, c, msg_f, msg):
assert msg == c
msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER)
name_map = {'type': 'counter_type',
'volume': 'counter_volume',
'datetime': 'counter_datetime',
'duration': 'counter_duration',
}
for f in TEST_COUNTER._fields:
msg_f = name_map.get(f, f)
yield compare, f, getattr(TEST_COUNTER, f), msg_f, msg[msg_f]

View File

@ -31,7 +31,8 @@ def test_send_messages():
def test_record_messages(): def test_record_messages():
conn = mox.MockObject(impl_kombu.Connection) conn = mox.MockObject(impl_kombu.Connection)
conn.declare_topic_consumer('notifications.info', mox.IsA(types.FunctionType)) conn.declare_topic_consumer('notifications.info',
mox.IsA(types.FunctionType))
conn.consume() conn.consume()
mox.Replay(conn) mox.Replay(conn)
notificationclient.record_messages(conn, StringIO()) notificationclient.record_messages(conn, StringIO())