From bc1f004f8f3e182e198702d286dcb56b31fa8ba8 Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Wed, 23 May 2012 13:07:29 -0400 Subject: [PATCH] make the pollsters in the agent plugins Move user_id and project_id into the Counter since the polling code has to pull those values from the instance object. Convert the libvirt polling code to plugins. Have the agent manager load the plugins and run them as part of its periodic task step. Change-Id: Id59a696beb33c7c9a232460ce52e272f5408e70d --- bin/ceilometer-agent | 2 +- ceilometer/agent/manager.py | 74 ++++++++++-------- ceilometer/collector/dispatcher.py | 2 +- ceilometer/collector/manager.py | 4 +- ceilometer/compute/libvirt.py | 114 ++++++++++++++++++++++++++++ ceilometer/compute/notifications.py | 2 + ceilometer/counter.py | 5 +- ceilometer/meter.py | 6 +- ceilometer/plugin.py | 12 ++- setup.py | 4 + tests/agent/test_manager.py | 49 +++++------- tests/collector/test_dispatcher.py | 12 +-- tests/compute/test_libvirt.py | 59 ++++++++++++++ tests/test_meter.py | 16 +--- 14 files changed, 269 insertions(+), 92 deletions(-) create mode 100644 ceilometer/compute/libvirt.py create mode 100644 tests/compute/test_libvirt.py diff --git a/bin/ceilometer-agent b/bin/ceilometer-agent index 3ea9279a6..600e80917 100755 --- a/bin/ceilometer-agent +++ b/bin/ceilometer-agent @@ -28,7 +28,7 @@ from nova import service from nova import utils if __name__ == '__main__': - utils.default_flagfile() + utils.default_cfgfile() flags.FLAGS(sys.argv) logging.setup() utils.monkey_patch() diff --git a/ceilometer/agent/manager.py b/ceilometer/agent/manager.py index 40eebe8b9..13332cd5f 100644 --- a/ceilometer/agent/manager.py +++ b/ceilometer/agent/manager.py @@ -16,12 +16,11 @@ # License for the specific language governing permissions and limitations # under the License. -from lxml import etree +import pkg_resources from nova import log as logging from nova import manager from nova import flags -import nova.virt.connection FLAGS = flags.FLAGS # FIXME(dhellmann): We need to have the main program set up logging @@ -29,38 +28,47 @@ FLAGS = flags.FLAGS # appear in the output. LOG = logging.getLogger('nova.' + __name__) +COMPUTE_PLUGIN_NAMESPACE = 'ceilometer.poll.compute' + class AgentManager(manager.Manager): - @staticmethod - def _get_disks(conn, instance): - """Get disks of an instance, only used to bypass bug#998089.""" - domain = conn._conn.lookupByName(instance) - tree = etree.fromstring(domain.XMLDesc(0)) - return filter(bool, - [target.get('dev') - for target in tree.findall('devices/disk/target') - ]) - @manager.periodic_task - def _fetch_diskio(self, context): - if FLAGS.connection_type == 'libvirt': - conn = nova.virt.connection.get_connection(read_only=True) - for instance in self.db.instance_get_all_by_host(context, - self.host): - # TODO(jd) This does not work see bug#998089 - # for disk in conn.get_disks(instance.name): - try: - disks = self._get_disks(conn, instance.name) - except Exception as err: - LOG.warning('Ignoring instance %s: %s', instance.name, err) - LOG.exception(err) - continue - for disk in disks: - stats = conn.block_stats(instance.name, disk) - LOG.info("DISKIO USAGE: %s %s: read-requests=%d read-bytes=%d write-requests=%d write-bytes=%d errors=%d" % (instance, disk, stats[0], stats[1], stats[2], stats[3], stats[4])) + def init_host(self): + self._load_plugins() + return - @manager.periodic_task - def _fetch_cputime(self, context): - conn = nova.virt.connection.get_connection(read_only=True) - for instance in self.db.instance_get_all_by_host(context, self.host): - LOG.info("CPUTIME USAGE: %s %d" % (instance, conn.get_info(instance)['cpu_time'])) + def _load_plugins(self): + self.pollsters = [] + for ep in pkg_resources.iter_entry_points(COMPUTE_PLUGIN_NAMESPACE): + LOG.info('attempting to load pollster %s:%s', + COMPUTE_PLUGIN_NAMESPACE, ep.name) + try: + plugin_class = ep.load() + plugin = plugin_class() + # FIXME(dhellmann): Currently assumes all plugins are + # enabled when they are discovered and + # importable. Need to add check against global + # configuration flag and check that asks the plugin if + # it should be enabled. + self.pollsters.append((ep.name, plugin)) + except Exception as err: + LOG.warning('Failed to load pollster %s:%s', + ep.name, err) + LOG.exception(err) + if not self.pollsters: + LOG.warning('Failed to load any pollsters for %s', + COMPUTE_PLUGIN_NAMESPACE) + return + + def periodic_tasks(self, context, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + for name, pollster in self.pollsters: + try: + LOG.info('polling %s', name) + for c in pollster.get_counters(self, context): + LOG.info('COUNTER: %s', c) + # FIXME(dhellmann): Convert to meter data and + # publish. + except Exception as err: + LOG.warning('Continuing after error from %s: %s', name, err) + LOG.exception(err) diff --git a/ceilometer/collector/dispatcher.py b/ceilometer/collector/dispatcher.py index 44c44eadf..57ce14410 100644 --- a/ceilometer/collector/dispatcher.py +++ b/ceilometer/collector/dispatcher.py @@ -74,5 +74,5 @@ class NotificationDispatcher(object): for c in handler.process_notification(body): LOG.info('COUNTER: %s', c) # FIXME(dhellmann): Spawn green thread? - self.publish_func(body, c) + self.publish_func(c) return diff --git a/ceilometer/collector/manager.py b/ceilometer/collector/manager.py index 96cf4b50a..f3ec65b51 100644 --- a/ceilometer/collector/manager.py +++ b/ceilometer/collector/manager.py @@ -50,9 +50,9 @@ class CollectorManager(manager.Manager): callback=self.compute_handler.notify) self.connection.consume_in_thread() - def _publish_counter(self, notice, c): + def _publish_counter(self, counter): """Create a metering message for the counter and publish it.""" - msg = meter.meter_message_from_counter(notice, c) + msg = meter.meter_message_from_counter(counter) LOG.info('PUBLISH: %s', str(msg)) # FIXME(dhellmann): Need to publish the message on the # metering queue. diff --git a/ceilometer/compute/libvirt.py b/ceilometer/compute/libvirt.py new file mode 100644 index 000000000..8544f7700 --- /dev/null +++ b/ceilometer/compute/libvirt.py @@ -0,0 +1,114 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2012 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from lxml import etree + +from nova import log as logging +from nova import flags +import nova.virt.connection + +from .. import counter +from .. import plugin + + +FLAGS = flags.FLAGS +# FIXME(dhellmann): We need to have the main program set up logging +# correctly so messages from modules outside of the nova package +# appear in the output. +LOG = logging.getLogger('nova.' + __name__) + +MIB = 2 ** 20 # mebibytes + + +def make_counter_from_instance(instance, type, volume): + return counter.Counter( + source='?', + type=type, + volume=volume, + user_id=instance.user_id, + project_id=instance.project_id, + resource_id=instance.uuid, + datetime=None, + duration=None, + resource_metadata={ + 'display_name': instance.display_name, + 'instance_type': instance.instance_type.flavorid, + 'host': instance.host, + }, + ) + + +class DiskIOPollster(plugin.PollsterBase): + + LOG = logging.getLogger('nova.' + __name__ + '.diskio') + + def _get_disks(self, conn, instance): + """Get disks of an instance, only used to bypass bug#998089.""" + domain = conn._conn.lookupByName(instance) + tree = etree.fromstring(domain.XMLDesc(0)) + return filter(bool, + [target.get('dev') + for target in tree.findall('devices/disk/target') + ]) + + def get_counters(self, manager, context): + if FLAGS.connection_type == 'libvirt': + conn = nova.virt.connection.get_connection(read_only=True) + for instance in manager.db.instance_get_all_by_host(context, + manager.host): + # TODO(jd) This does not work see bug#998089 + # for disk in conn.get_disks(instance.name): + try: + disks = self._get_disks(conn, instance.name) + except Exception as err: + self.LOG.warning('Ignoring instance %s: %s', instance.name, err) + self.LOG.exception(err) + continue + bytes = 0 + for disk in disks: + stats = conn.block_stats(instance.name, disk) + self.LOG.info("DISKIO USAGE: %s %s: read-requests=%d read-bytes=%d write-requests=%d write-bytes=%d errors=%d", + instance, disk, stats[0], stats[1], stats[2], stats[3], stats[4]) + bytes += stats[1] + stats[3] # combine read and write + yield make_counter_from_instance(instance, + type='disk', + volume=bytes / MIB, + ) + + +class CPUPollster(plugin.PollsterBase): + + LOG = logging.getLogger('nova.' + __name__ + '.cpu') + + def get_counters(self, manager, context): + conn = nova.virt.connection.get_connection(read_only=True) + # FIXME(dhellmann): How do we get a list of instances without + # talking directly to the database? + for instance in manager.db.instance_get_all_by_host(context, manager.host): + self.LOG.info('checking instance %s', instance.uuid) + try: + cpu_info = conn.get_info(instance) + self.LOG.info("CPUTIME USAGE: %s %d", instance, cpu_info['cpu_time']) + yield make_counter_from_instance(instance, + type='cpu', + volume=cpu_info['cpu_time'], + ) + except Exception as err: + self.LOG.error('could not get CPU time for %s: %s', + instance.uuid, err) + self.LOG.exception(err) diff --git a/ceilometer/compute/notifications.py b/ceilometer/compute/notifications.py index 6e3b6d5f5..e43e97e79 100644 --- a/ceilometer/compute/notifications.py +++ b/ceilometer/compute/notifications.py @@ -28,6 +28,8 @@ def c1(body): source='?', type='instance', volume=1, + user_id=body['payload']['user_id'], + project_id=body['payload']['tenant_id'], resource_id=body['payload']['instance_id'], datetime=body['timestamp'], duration=0, diff --git a/ceilometer/counter.py b/ceilometer/counter.py index 3d55d4b7e..17e14711c 100644 --- a/ceilometer/counter.py +++ b/ceilometer/counter.py @@ -29,8 +29,11 @@ Counter = collections.namedtuple('Counter', ' '.join(['source', 'type', 'volume', + 'user_id', + 'project_id', 'resource_id', 'datetime', 'duration', - 'resource_metadata']) + 'resource_metadata', + ]) ) diff --git a/ceilometer/meter.py b/ceilometer/meter.py index 5c5ef2499..3fe9cc198 100644 --- a/ceilometer/meter.py +++ b/ceilometer/meter.py @@ -41,7 +41,7 @@ def compute_signature(message): return digest_maker.hexdigest() -def meter_message_from_counter(notice, counter): +def meter_message_from_counter(counter): """Make a metering message ready to be published or stored. Returns a dictionary containing a metering message @@ -50,8 +50,8 @@ def meter_message_from_counter(notice, counter): msg = {'source': counter.source, 'counter_type': counter.type, 'counter_volume': counter.volume, - 'user_id': notice['payload']['user_id'], - 'project_id': notice['payload']['tenant_id'], + 'user_id': counter.user_id, + 'project_id': counter.project_id, 'resource_id': counter.resource_id, 'counter_datetime': counter.datetime, 'counter_duration': counter.duration, diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py index d292b57c0..e298b0b2b 100644 --- a/ceilometer/plugin.py +++ b/ceilometer/plugin.py @@ -30,9 +30,17 @@ class NotificationBase(object): def get_event_types(self): """Return a sequence of strings defining the event types to be given to this plugin.""" - return [] @abc.abstractmethod def process_notification(self, message): """Return a sequence of Counter instances for the given message.""" - pass + + +class PollsterBase(object): + """Base class for plugins that support the polling API.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def get_counters(self, manager, context): + """Return a sequence of Counter instances from polling the resources.""" diff --git a/setup.py b/setup.py index 5f0848610..22e4613c5 100755 --- a/setup.py +++ b/setup.py @@ -35,5 +35,9 @@ setuptools.setup( 'ceilometer.collector.compute': [ 'instance_create = ceilometer.compute.notifications:InstanceCreate', ], + 'ceilometer.poll.compute': [ + 'libvirt_diskio = ceilometer.compute.libvirt:DiskIOPollster', + 'libvirt_cpu = ceilometer.compute.libvirt:CPUPollster', + ], }, ) diff --git a/tests/agent/test_manager.py b/tests/agent/test_manager.py index bf098b406..c90d4fd8f 100644 --- a/tests/agent/test_manager.py +++ b/tests/agent/test_manager.py @@ -1,9 +1,8 @@ -#!/usr/bin/env python # -*- encoding: utf-8 -*- # -# Copyright © 2012 eNovance +# Copyright © 2012 New Dream Network, LLC (DreamHost) # -# Author: Julien Danjou +# Author: Doug Hellmann # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -16,40 +15,28 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -"""Tests for manager. +"""Tests for ceilometer/agent/manager.py """ -# import unittest -# import mox -# import stubout - -# import nova.tests.fakelibvirt as libvirt -from nova import context -from nova import flags -from nova import test -from nova import db -from nova import test - from ceilometer.agent import manager -class TestAgentManager(test.TestCase): - def setUp(self): - self.context = context.RequestContext('admin', 'admin', is_admin=True) - self.manager = manager.AgentManager() - super(TestAgentManager, self).setUp() +def test_load_plugins(): + mgr = manager.AgentManager() + mgr.init_host() + assert mgr.pollsters, 'Failed to load any plugins' + return - def test_fetch_diskio(self): - self.manager._fetch_diskio(self.context) - def test_fetch_diskio_with_libvirt_non_existent_instance(self): - flags.FLAGS.connection_type = 'libvirt' +def test_run_tasks(): + class Pollster: + counters = [] - instance = db.instance_create(self.context, {}) + def get_counters(self, manager, context): + self.counters.append((manager, context)) + return ['test data'] - self.mox.StubOutWithMock(self.manager.db, 'instance_get_all_by_host') - self.manager.db.instance_get_all_by_host(self.context, self.manager.host).AndReturn([instance]) - - self.mox.ReplayAll() - - self.manager._fetch_diskio(self.context) + mgr = manager.AgentManager() + mgr.pollsters = [('test', Pollster())] + mgr.periodic_tasks('context') + assert Pollster.counters[0] == (mgr, 'context') diff --git a/tests/collector/test_dispatcher.py b/tests/collector/test_dispatcher.py index deb5792d4..d328a81cc 100644 --- a/tests/collector/test_dispatcher.py +++ b/tests/collector/test_dispatcher.py @@ -67,10 +67,10 @@ TEST_NOTICE = { def test_notify(): results = [] - d = StubDispatcher(None, lambda x, y: results.append((x, y))) + d = StubDispatcher(None, lambda x: results.append(x)) d.notify(TEST_NOTICE) assert len(results) == 1 - counter = results[0][1] + counter = results[0] assert counter.type == 'instance' @@ -78,7 +78,7 @@ def test_load_compute_plugins(): results = [] d = dispatcher.NotificationDispatcher( 'ceilometer.collector.compute', - lambda x, y: results.append((x, y)) + lambda x: results.append(x) ) assert d.handlers, 'No handlers were loaded' @@ -87,7 +87,7 @@ def test_load_no_plugins(): results = [] d = dispatcher.NotificationDispatcher( 'ceilometer.collector.none', - lambda x, y: results.append((x, y)) + lambda x: results.append(x) ) assert not d.handlers, 'Handlers were loaded' @@ -96,9 +96,9 @@ def test_notify_through_plugin(): results = [] d = dispatcher.NotificationDispatcher( 'ceilometer.collector.compute', - lambda x, y: results.append((x, y)) + lambda x: results.append(x) ) d.notify(TEST_NOTICE) assert len(results) == 1 - counter = results[0][1] + counter = results[0] assert counter.type == 'instance' diff --git a/tests/compute/test_libvirt.py b/tests/compute/test_libvirt.py new file mode 100644 index 000000000..a93beb7c8 --- /dev/null +++ b/tests/compute/test_libvirt.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +# +# Copyright © 2012 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for manager. +""" + +# import unittest +# import mox +# import stubout + +# import nova.tests.fakelibvirt as libvirt +from nova import context +from nova import flags +from nova import test +from nova import db + +from ceilometer.compute import libvirt +from ceilometer.agent import manager + + +class TestDiskIOPollster(test.TestCase): + + def setUp(self): + self.context = context.RequestContext('admin', 'admin', is_admin=True) + self.manager = manager.AgentManager() + self.pollster = libvirt.DiskIOPollster() + super(TestDiskIOPollster, self).setUp() + + def test_fetch_diskio(self): + list(self.pollster.get_counters(self.manager, self.context)) + + def test_fetch_diskio_with_libvirt_non_existent_instance(self): + flags.FLAGS.connection_type = 'libvirt' + + instance = db.instance_create(self.context, {}) + + self.mox.StubOutWithMock(self.manager.db, 'instance_get_all_by_host') + self.manager.db.instance_get_all_by_host(self.context, + self.manager.host, + ).AndReturn([instance]) + + self.mox.ReplayAll() + + list(self.pollster.get_counters(self.manager, self.context)) diff --git a/tests/test_meter.py b/tests/test_meter.py index 20aee2bf1..6c9de68a2 100644 --- a/tests/test_meter.py +++ b/tests/test_meter.py @@ -51,6 +51,8 @@ def test_compute_signature_signed(): TEST_COUNTER = counter.Counter(source='src', type='typ', volume=1, + user_id='user', + project_id='project', resource_id=2, datetime='today', duration=3, @@ -95,25 +97,15 @@ TEST_NOTICE = { } -def test_meter_message_from_counter_user_id(): - msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER) - assert msg['user_id'] == TEST_NOTICE['payload']['user_id'] - - -def test_meter_message_from_counter_project_id(): - msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER) - assert msg['project_id'] == TEST_NOTICE['payload']['tenant_id'] - - def test_meter_message_from_counter_signed(): - msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER) + msg = meter.meter_message_from_counter(TEST_COUNTER) assert 'message_signature' in msg def test_meter_message_from_counter_field(): def compare(f, c, msg_f, msg): assert msg == c - msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER) + msg = meter.meter_message_from_counter(TEST_COUNTER) name_map = {'type': 'counter_type', 'volume': 'counter_volume', 'datetime': 'counter_datetime',