diff --git a/bin/ceilometer-agent b/bin/ceilometer-agent index 3ea9279a6..600e80917 100755 --- a/bin/ceilometer-agent +++ b/bin/ceilometer-agent @@ -28,7 +28,7 @@ from nova import service from nova import utils if __name__ == '__main__': - utils.default_flagfile() + utils.default_cfgfile() flags.FLAGS(sys.argv) logging.setup() utils.monkey_patch() diff --git a/ceilometer/agent/manager.py b/ceilometer/agent/manager.py index 40eebe8b9..13332cd5f 100644 --- a/ceilometer/agent/manager.py +++ b/ceilometer/agent/manager.py @@ -16,12 +16,11 @@ # License for the specific language governing permissions and limitations # under the License. -from lxml import etree +import pkg_resources from nova import log as logging from nova import manager from nova import flags -import nova.virt.connection FLAGS = flags.FLAGS # FIXME(dhellmann): We need to have the main program set up logging @@ -29,38 +28,47 @@ FLAGS = flags.FLAGS # appear in the output. LOG = logging.getLogger('nova.' + __name__) +COMPUTE_PLUGIN_NAMESPACE = 'ceilometer.poll.compute' + class AgentManager(manager.Manager): - @staticmethod - def _get_disks(conn, instance): - """Get disks of an instance, only used to bypass bug#998089.""" - domain = conn._conn.lookupByName(instance) - tree = etree.fromstring(domain.XMLDesc(0)) - return filter(bool, - [target.get('dev') - for target in tree.findall('devices/disk/target') - ]) - @manager.periodic_task - def _fetch_diskio(self, context): - if FLAGS.connection_type == 'libvirt': - conn = nova.virt.connection.get_connection(read_only=True) - for instance in self.db.instance_get_all_by_host(context, - self.host): - # TODO(jd) This does not work see bug#998089 - # for disk in conn.get_disks(instance.name): - try: - disks = self._get_disks(conn, instance.name) - except Exception as err: - LOG.warning('Ignoring instance %s: %s', instance.name, err) - LOG.exception(err) - continue - for disk in disks: - stats = conn.block_stats(instance.name, disk) - LOG.info("DISKIO USAGE: %s %s: read-requests=%d read-bytes=%d write-requests=%d write-bytes=%d errors=%d" % (instance, disk, stats[0], stats[1], stats[2], stats[3], stats[4])) + def init_host(self): + self._load_plugins() + return - @manager.periodic_task - def _fetch_cputime(self, context): - conn = nova.virt.connection.get_connection(read_only=True) - for instance in self.db.instance_get_all_by_host(context, self.host): - LOG.info("CPUTIME USAGE: %s %d" % (instance, conn.get_info(instance)['cpu_time'])) + def _load_plugins(self): + self.pollsters = [] + for ep in pkg_resources.iter_entry_points(COMPUTE_PLUGIN_NAMESPACE): + LOG.info('attempting to load pollster %s:%s', + COMPUTE_PLUGIN_NAMESPACE, ep.name) + try: + plugin_class = ep.load() + plugin = plugin_class() + # FIXME(dhellmann): Currently assumes all plugins are + # enabled when they are discovered and + # importable. Need to add check against global + # configuration flag and check that asks the plugin if + # it should be enabled. + self.pollsters.append((ep.name, plugin)) + except Exception as err: + LOG.warning('Failed to load pollster %s:%s', + ep.name, err) + LOG.exception(err) + if not self.pollsters: + LOG.warning('Failed to load any pollsters for %s', + COMPUTE_PLUGIN_NAMESPACE) + return + + def periodic_tasks(self, context, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + for name, pollster in self.pollsters: + try: + LOG.info('polling %s', name) + for c in pollster.get_counters(self, context): + LOG.info('COUNTER: %s', c) + # FIXME(dhellmann): Convert to meter data and + # publish. + except Exception as err: + LOG.warning('Continuing after error from %s: %s', name, err) + LOG.exception(err) diff --git a/ceilometer/collector/dispatcher.py b/ceilometer/collector/dispatcher.py index 44c44eadf..57ce14410 100644 --- a/ceilometer/collector/dispatcher.py +++ b/ceilometer/collector/dispatcher.py @@ -74,5 +74,5 @@ class NotificationDispatcher(object): for c in handler.process_notification(body): LOG.info('COUNTER: %s', c) # FIXME(dhellmann): Spawn green thread? - self.publish_func(body, c) + self.publish_func(c) return diff --git a/ceilometer/collector/manager.py b/ceilometer/collector/manager.py index 96cf4b50a..f3ec65b51 100644 --- a/ceilometer/collector/manager.py +++ b/ceilometer/collector/manager.py @@ -50,9 +50,9 @@ class CollectorManager(manager.Manager): callback=self.compute_handler.notify) self.connection.consume_in_thread() - def _publish_counter(self, notice, c): + def _publish_counter(self, counter): """Create a metering message for the counter and publish it.""" - msg = meter.meter_message_from_counter(notice, c) + msg = meter.meter_message_from_counter(counter) LOG.info('PUBLISH: %s', str(msg)) # FIXME(dhellmann): Need to publish the message on the # metering queue. diff --git a/ceilometer/compute/libvirt.py b/ceilometer/compute/libvirt.py new file mode 100644 index 000000000..8544f7700 --- /dev/null +++ b/ceilometer/compute/libvirt.py @@ -0,0 +1,114 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2012 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from lxml import etree + +from nova import log as logging +from nova import flags +import nova.virt.connection + +from .. import counter +from .. import plugin + + +FLAGS = flags.FLAGS +# FIXME(dhellmann): We need to have the main program set up logging +# correctly so messages from modules outside of the nova package +# appear in the output. +LOG = logging.getLogger('nova.' + __name__) + +MIB = 2 ** 20 # mebibytes + + +def make_counter_from_instance(instance, type, volume): + return counter.Counter( + source='?', + type=type, + volume=volume, + user_id=instance.user_id, + project_id=instance.project_id, + resource_id=instance.uuid, + datetime=None, + duration=None, + resource_metadata={ + 'display_name': instance.display_name, + 'instance_type': instance.instance_type.flavorid, + 'host': instance.host, + }, + ) + + +class DiskIOPollster(plugin.PollsterBase): + + LOG = logging.getLogger('nova.' + __name__ + '.diskio') + + def _get_disks(self, conn, instance): + """Get disks of an instance, only used to bypass bug#998089.""" + domain = conn._conn.lookupByName(instance) + tree = etree.fromstring(domain.XMLDesc(0)) + return filter(bool, + [target.get('dev') + for target in tree.findall('devices/disk/target') + ]) + + def get_counters(self, manager, context): + if FLAGS.connection_type == 'libvirt': + conn = nova.virt.connection.get_connection(read_only=True) + for instance in manager.db.instance_get_all_by_host(context, + manager.host): + # TODO(jd) This does not work see bug#998089 + # for disk in conn.get_disks(instance.name): + try: + disks = self._get_disks(conn, instance.name) + except Exception as err: + self.LOG.warning('Ignoring instance %s: %s', instance.name, err) + self.LOG.exception(err) + continue + bytes = 0 + for disk in disks: + stats = conn.block_stats(instance.name, disk) + self.LOG.info("DISKIO USAGE: %s %s: read-requests=%d read-bytes=%d write-requests=%d write-bytes=%d errors=%d", + instance, disk, stats[0], stats[1], stats[2], stats[3], stats[4]) + bytes += stats[1] + stats[3] # combine read and write + yield make_counter_from_instance(instance, + type='disk', + volume=bytes / MIB, + ) + + +class CPUPollster(plugin.PollsterBase): + + LOG = logging.getLogger('nova.' + __name__ + '.cpu') + + def get_counters(self, manager, context): + conn = nova.virt.connection.get_connection(read_only=True) + # FIXME(dhellmann): How do we get a list of instances without + # talking directly to the database? + for instance in manager.db.instance_get_all_by_host(context, manager.host): + self.LOG.info('checking instance %s', instance.uuid) + try: + cpu_info = conn.get_info(instance) + self.LOG.info("CPUTIME USAGE: %s %d", instance, cpu_info['cpu_time']) + yield make_counter_from_instance(instance, + type='cpu', + volume=cpu_info['cpu_time'], + ) + except Exception as err: + self.LOG.error('could not get CPU time for %s: %s', + instance.uuid, err) + self.LOG.exception(err) diff --git a/ceilometer/compute/notifications.py b/ceilometer/compute/notifications.py index 6e3b6d5f5..e43e97e79 100644 --- a/ceilometer/compute/notifications.py +++ b/ceilometer/compute/notifications.py @@ -28,6 +28,8 @@ def c1(body): source='?', type='instance', volume=1, + user_id=body['payload']['user_id'], + project_id=body['payload']['tenant_id'], resource_id=body['payload']['instance_id'], datetime=body['timestamp'], duration=0, diff --git a/ceilometer/counter.py b/ceilometer/counter.py index 3d55d4b7e..17e14711c 100644 --- a/ceilometer/counter.py +++ b/ceilometer/counter.py @@ -29,8 +29,11 @@ Counter = collections.namedtuple('Counter', ' '.join(['source', 'type', 'volume', + 'user_id', + 'project_id', 'resource_id', 'datetime', 'duration', - 'resource_metadata']) + 'resource_metadata', + ]) ) diff --git a/ceilometer/meter.py b/ceilometer/meter.py index 5c5ef2499..3fe9cc198 100644 --- a/ceilometer/meter.py +++ b/ceilometer/meter.py @@ -41,7 +41,7 @@ def compute_signature(message): return digest_maker.hexdigest() -def meter_message_from_counter(notice, counter): +def meter_message_from_counter(counter): """Make a metering message ready to be published or stored. Returns a dictionary containing a metering message @@ -50,8 +50,8 @@ def meter_message_from_counter(notice, counter): msg = {'source': counter.source, 'counter_type': counter.type, 'counter_volume': counter.volume, - 'user_id': notice['payload']['user_id'], - 'project_id': notice['payload']['tenant_id'], + 'user_id': counter.user_id, + 'project_id': counter.project_id, 'resource_id': counter.resource_id, 'counter_datetime': counter.datetime, 'counter_duration': counter.duration, diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py index d292b57c0..e298b0b2b 100644 --- a/ceilometer/plugin.py +++ b/ceilometer/plugin.py @@ -30,9 +30,17 @@ class NotificationBase(object): def get_event_types(self): """Return a sequence of strings defining the event types to be given to this plugin.""" - return [] @abc.abstractmethod def process_notification(self, message): """Return a sequence of Counter instances for the given message.""" - pass + + +class PollsterBase(object): + """Base class for plugins that support the polling API.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def get_counters(self, manager, context): + """Return a sequence of Counter instances from polling the resources.""" diff --git a/setup.py b/setup.py index 5f0848610..22e4613c5 100755 --- a/setup.py +++ b/setup.py @@ -35,5 +35,9 @@ setuptools.setup( 'ceilometer.collector.compute': [ 'instance_create = ceilometer.compute.notifications:InstanceCreate', ], + 'ceilometer.poll.compute': [ + 'libvirt_diskio = ceilometer.compute.libvirt:DiskIOPollster', + 'libvirt_cpu = ceilometer.compute.libvirt:CPUPollster', + ], }, ) diff --git a/tests/agent/test_manager.py b/tests/agent/test_manager.py index bf098b406..c90d4fd8f 100644 --- a/tests/agent/test_manager.py +++ b/tests/agent/test_manager.py @@ -1,9 +1,8 @@ -#!/usr/bin/env python # -*- encoding: utf-8 -*- # -# Copyright © 2012 eNovance +# Copyright © 2012 New Dream Network, LLC (DreamHost) # -# Author: Julien Danjou +# Author: Doug Hellmann # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -16,40 +15,28 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -"""Tests for manager. +"""Tests for ceilometer/agent/manager.py """ -# import unittest -# import mox -# import stubout - -# import nova.tests.fakelibvirt as libvirt -from nova import context -from nova import flags -from nova import test -from nova import db -from nova import test - from ceilometer.agent import manager -class TestAgentManager(test.TestCase): - def setUp(self): - self.context = context.RequestContext('admin', 'admin', is_admin=True) - self.manager = manager.AgentManager() - super(TestAgentManager, self).setUp() +def test_load_plugins(): + mgr = manager.AgentManager() + mgr.init_host() + assert mgr.pollsters, 'Failed to load any plugins' + return - def test_fetch_diskio(self): - self.manager._fetch_diskio(self.context) - def test_fetch_diskio_with_libvirt_non_existent_instance(self): - flags.FLAGS.connection_type = 'libvirt' +def test_run_tasks(): + class Pollster: + counters = [] - instance = db.instance_create(self.context, {}) + def get_counters(self, manager, context): + self.counters.append((manager, context)) + return ['test data'] - self.mox.StubOutWithMock(self.manager.db, 'instance_get_all_by_host') - self.manager.db.instance_get_all_by_host(self.context, self.manager.host).AndReturn([instance]) - - self.mox.ReplayAll() - - self.manager._fetch_diskio(self.context) + mgr = manager.AgentManager() + mgr.pollsters = [('test', Pollster())] + mgr.periodic_tasks('context') + assert Pollster.counters[0] == (mgr, 'context') diff --git a/tests/collector/test_dispatcher.py b/tests/collector/test_dispatcher.py index deb5792d4..d328a81cc 100644 --- a/tests/collector/test_dispatcher.py +++ b/tests/collector/test_dispatcher.py @@ -67,10 +67,10 @@ TEST_NOTICE = { def test_notify(): results = [] - d = StubDispatcher(None, lambda x, y: results.append((x, y))) + d = StubDispatcher(None, lambda x: results.append(x)) d.notify(TEST_NOTICE) assert len(results) == 1 - counter = results[0][1] + counter = results[0] assert counter.type == 'instance' @@ -78,7 +78,7 @@ def test_load_compute_plugins(): results = [] d = dispatcher.NotificationDispatcher( 'ceilometer.collector.compute', - lambda x, y: results.append((x, y)) + lambda x: results.append(x) ) assert d.handlers, 'No handlers were loaded' @@ -87,7 +87,7 @@ def test_load_no_plugins(): results = [] d = dispatcher.NotificationDispatcher( 'ceilometer.collector.none', - lambda x, y: results.append((x, y)) + lambda x: results.append(x) ) assert not d.handlers, 'Handlers were loaded' @@ -96,9 +96,9 @@ def test_notify_through_plugin(): results = [] d = dispatcher.NotificationDispatcher( 'ceilometer.collector.compute', - lambda x, y: results.append((x, y)) + lambda x: results.append(x) ) d.notify(TEST_NOTICE) assert len(results) == 1 - counter = results[0][1] + counter = results[0] assert counter.type == 'instance' diff --git a/tests/compute/test_libvirt.py b/tests/compute/test_libvirt.py new file mode 100644 index 000000000..a93beb7c8 --- /dev/null +++ b/tests/compute/test_libvirt.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +# +# Copyright © 2012 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for manager. +""" + +# import unittest +# import mox +# import stubout + +# import nova.tests.fakelibvirt as libvirt +from nova import context +from nova import flags +from nova import test +from nova import db + +from ceilometer.compute import libvirt +from ceilometer.agent import manager + + +class TestDiskIOPollster(test.TestCase): + + def setUp(self): + self.context = context.RequestContext('admin', 'admin', is_admin=True) + self.manager = manager.AgentManager() + self.pollster = libvirt.DiskIOPollster() + super(TestDiskIOPollster, self).setUp() + + def test_fetch_diskio(self): + list(self.pollster.get_counters(self.manager, self.context)) + + def test_fetch_diskio_with_libvirt_non_existent_instance(self): + flags.FLAGS.connection_type = 'libvirt' + + instance = db.instance_create(self.context, {}) + + self.mox.StubOutWithMock(self.manager.db, 'instance_get_all_by_host') + self.manager.db.instance_get_all_by_host(self.context, + self.manager.host, + ).AndReturn([instance]) + + self.mox.ReplayAll() + + list(self.pollster.get_counters(self.manager, self.context)) diff --git a/tests/test_meter.py b/tests/test_meter.py index 20aee2bf1..6c9de68a2 100644 --- a/tests/test_meter.py +++ b/tests/test_meter.py @@ -51,6 +51,8 @@ def test_compute_signature_signed(): TEST_COUNTER = counter.Counter(source='src', type='typ', volume=1, + user_id='user', + project_id='project', resource_id=2, datetime='today', duration=3, @@ -95,25 +97,15 @@ TEST_NOTICE = { } -def test_meter_message_from_counter_user_id(): - msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER) - assert msg['user_id'] == TEST_NOTICE['payload']['user_id'] - - -def test_meter_message_from_counter_project_id(): - msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER) - assert msg['project_id'] == TEST_NOTICE['payload']['tenant_id'] - - def test_meter_message_from_counter_signed(): - msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER) + msg = meter.meter_message_from_counter(TEST_COUNTER) assert 'message_signature' in msg def test_meter_message_from_counter_field(): def compare(f, c, msg_f, msg): assert msg == c - msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER) + msg = meter.meter_message_from_counter(TEST_COUNTER) name_map = {'type': 'counter_type', 'volume': 'counter_volume', 'datetime': 'counter_datetime',