make the pollsters in the agent plugins

Move user_id and project_id into the Counter
since the polling code has to pull those
values from the instance object.
Convert the libvirt polling code to plugins.
Have the agent manager load the plugins and run
them as part of its periodic task step.

Change-Id: Id59a696beb33c7c9a232460ce52e272f5408e70d
This commit is contained in:
Doug Hellmann 2012-05-23 13:07:29 -04:00
parent 5717e9c5c9
commit bc1f004f8f
14 changed files with 269 additions and 92 deletions

View File

@ -28,7 +28,7 @@ from nova import service
from nova import utils
if __name__ == '__main__':
utils.default_flagfile()
utils.default_cfgfile()
flags.FLAGS(sys.argv)
logging.setup()
utils.monkey_patch()

View File

@ -16,12 +16,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from lxml import etree
import pkg_resources
from nova import log as logging
from nova import manager
from nova import flags
import nova.virt.connection
FLAGS = flags.FLAGS
# FIXME(dhellmann): We need to have the main program set up logging
@ -29,38 +28,47 @@ FLAGS = flags.FLAGS
# appear in the output.
LOG = logging.getLogger('nova.' + __name__)
COMPUTE_PLUGIN_NAMESPACE = 'ceilometer.poll.compute'
class AgentManager(manager.Manager):
@staticmethod
def _get_disks(conn, instance):
"""Get disks of an instance, only used to bypass bug#998089."""
domain = conn._conn.lookupByName(instance)
tree = etree.fromstring(domain.XMLDesc(0))
return filter(bool,
[target.get('dev')
for target in tree.findall('devices/disk/target')
])
@manager.periodic_task
def _fetch_diskio(self, context):
if FLAGS.connection_type == 'libvirt':
conn = nova.virt.connection.get_connection(read_only=True)
for instance in self.db.instance_get_all_by_host(context,
self.host):
# TODO(jd) This does not work see bug#998089
# for disk in conn.get_disks(instance.name):
try:
disks = self._get_disks(conn, instance.name)
except Exception as err:
LOG.warning('Ignoring instance %s: %s', instance.name, err)
LOG.exception(err)
continue
for disk in disks:
stats = conn.block_stats(instance.name, disk)
LOG.info("DISKIO USAGE: %s %s: read-requests=%d read-bytes=%d write-requests=%d write-bytes=%d errors=%d" % (instance, disk, stats[0], stats[1], stats[2], stats[3], stats[4]))
def init_host(self):
self._load_plugins()
return
@manager.periodic_task
def _fetch_cputime(self, context):
conn = nova.virt.connection.get_connection(read_only=True)
for instance in self.db.instance_get_all_by_host(context, self.host):
LOG.info("CPUTIME USAGE: %s %d" % (instance, conn.get_info(instance)['cpu_time']))
def _load_plugins(self):
self.pollsters = []
for ep in pkg_resources.iter_entry_points(COMPUTE_PLUGIN_NAMESPACE):
LOG.info('attempting to load pollster %s:%s',
COMPUTE_PLUGIN_NAMESPACE, ep.name)
try:
plugin_class = ep.load()
plugin = plugin_class()
# FIXME(dhellmann): Currently assumes all plugins are
# enabled when they are discovered and
# importable. Need to add check against global
# configuration flag and check that asks the plugin if
# it should be enabled.
self.pollsters.append((ep.name, plugin))
except Exception as err:
LOG.warning('Failed to load pollster %s:%s',
ep.name, err)
LOG.exception(err)
if not self.pollsters:
LOG.warning('Failed to load any pollsters for %s',
COMPUTE_PLUGIN_NAMESPACE)
return
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
for name, pollster in self.pollsters:
try:
LOG.info('polling %s', name)
for c in pollster.get_counters(self, context):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Convert to meter data and
# publish.
except Exception as err:
LOG.warning('Continuing after error from %s: %s', name, err)
LOG.exception(err)

View File

@ -74,5 +74,5 @@ class NotificationDispatcher(object):
for c in handler.process_notification(body):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread?
self.publish_func(body, c)
self.publish_func(c)
return

View File

@ -50,9 +50,9 @@ class CollectorManager(manager.Manager):
callback=self.compute_handler.notify)
self.connection.consume_in_thread()
def _publish_counter(self, notice, c):
def _publish_counter(self, counter):
"""Create a metering message for the counter and publish it."""
msg = meter.meter_message_from_counter(notice, c)
msg = meter.meter_message_from_counter(counter)
LOG.info('PUBLISH: %s', str(msg))
# FIXME(dhellmann): Need to publish the message on the
# metering queue.

View File

@ -0,0 +1,114 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 eNovance <licensing@enovance.com>
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from lxml import etree
from nova import log as logging
from nova import flags
import nova.virt.connection
from .. import counter
from .. import plugin
FLAGS = flags.FLAGS
# FIXME(dhellmann): We need to have the main program set up logging
# correctly so messages from modules outside of the nova package
# appear in the output.
LOG = logging.getLogger('nova.' + __name__)
MIB = 2 ** 20 # mebibytes
def make_counter_from_instance(instance, type, volume):
return counter.Counter(
source='?',
type=type,
volume=volume,
user_id=instance.user_id,
project_id=instance.project_id,
resource_id=instance.uuid,
datetime=None,
duration=None,
resource_metadata={
'display_name': instance.display_name,
'instance_type': instance.instance_type.flavorid,
'host': instance.host,
},
)
class DiskIOPollster(plugin.PollsterBase):
LOG = logging.getLogger('nova.' + __name__ + '.diskio')
def _get_disks(self, conn, instance):
"""Get disks of an instance, only used to bypass bug#998089."""
domain = conn._conn.lookupByName(instance)
tree = etree.fromstring(domain.XMLDesc(0))
return filter(bool,
[target.get('dev')
for target in tree.findall('devices/disk/target')
])
def get_counters(self, manager, context):
if FLAGS.connection_type == 'libvirt':
conn = nova.virt.connection.get_connection(read_only=True)
for instance in manager.db.instance_get_all_by_host(context,
manager.host):
# TODO(jd) This does not work see bug#998089
# for disk in conn.get_disks(instance.name):
try:
disks = self._get_disks(conn, instance.name)
except Exception as err:
self.LOG.warning('Ignoring instance %s: %s', instance.name, err)
self.LOG.exception(err)
continue
bytes = 0
for disk in disks:
stats = conn.block_stats(instance.name, disk)
self.LOG.info("DISKIO USAGE: %s %s: read-requests=%d read-bytes=%d write-requests=%d write-bytes=%d errors=%d",
instance, disk, stats[0], stats[1], stats[2], stats[3], stats[4])
bytes += stats[1] + stats[3] # combine read and write
yield make_counter_from_instance(instance,
type='disk',
volume=bytes / MIB,
)
class CPUPollster(plugin.PollsterBase):
LOG = logging.getLogger('nova.' + __name__ + '.cpu')
def get_counters(self, manager, context):
conn = nova.virt.connection.get_connection(read_only=True)
# FIXME(dhellmann): How do we get a list of instances without
# talking directly to the database?
for instance in manager.db.instance_get_all_by_host(context, manager.host):
self.LOG.info('checking instance %s', instance.uuid)
try:
cpu_info = conn.get_info(instance)
self.LOG.info("CPUTIME USAGE: %s %d", instance, cpu_info['cpu_time'])
yield make_counter_from_instance(instance,
type='cpu',
volume=cpu_info['cpu_time'],
)
except Exception as err:
self.LOG.error('could not get CPU time for %s: %s',
instance.uuid, err)
self.LOG.exception(err)

View File

@ -28,6 +28,8 @@ def c1(body):
source='?',
type='instance',
volume=1,
user_id=body['payload']['user_id'],
project_id=body['payload']['tenant_id'],
resource_id=body['payload']['instance_id'],
datetime=body['timestamp'],
duration=0,

View File

@ -29,8 +29,11 @@ Counter = collections.namedtuple('Counter',
' '.join(['source',
'type',
'volume',
'user_id',
'project_id',
'resource_id',
'datetime',
'duration',
'resource_metadata'])
'resource_metadata',
])
)

View File

@ -41,7 +41,7 @@ def compute_signature(message):
return digest_maker.hexdigest()
def meter_message_from_counter(notice, counter):
def meter_message_from_counter(counter):
"""Make a metering message ready to be published or stored.
Returns a dictionary containing a metering message
@ -50,8 +50,8 @@ def meter_message_from_counter(notice, counter):
msg = {'source': counter.source,
'counter_type': counter.type,
'counter_volume': counter.volume,
'user_id': notice['payload']['user_id'],
'project_id': notice['payload']['tenant_id'],
'user_id': counter.user_id,
'project_id': counter.project_id,
'resource_id': counter.resource_id,
'counter_datetime': counter.datetime,
'counter_duration': counter.duration,

View File

@ -30,9 +30,17 @@ class NotificationBase(object):
def get_event_types(self):
"""Return a sequence of strings defining the event types to be
given to this plugin."""
return []
@abc.abstractmethod
def process_notification(self, message):
"""Return a sequence of Counter instances for the given message."""
pass
class PollsterBase(object):
"""Base class for plugins that support the polling API."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def get_counters(self, manager, context):
"""Return a sequence of Counter instances from polling the resources."""

View File

@ -35,5 +35,9 @@ setuptools.setup(
'ceilometer.collector.compute': [
'instance_create = ceilometer.compute.notifications:InstanceCreate',
],
'ceilometer.poll.compute': [
'libvirt_diskio = ceilometer.compute.libvirt:DiskIOPollster',
'libvirt_cpu = ceilometer.compute.libvirt:CPUPollster',
],
},
)

View File

@ -1,9 +1,8 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 eNovance <licensing@enovance.com>
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Julien Danjou <julien@danjou.info>
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -16,40 +15,28 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for manager.
"""Tests for ceilometer/agent/manager.py
"""
# import unittest
# import mox
# import stubout
# import nova.tests.fakelibvirt as libvirt
from nova import context
from nova import flags
from nova import test
from nova import db
from nova import test
from ceilometer.agent import manager
class TestAgentManager(test.TestCase):
def setUp(self):
self.context = context.RequestContext('admin', 'admin', is_admin=True)
self.manager = manager.AgentManager()
super(TestAgentManager, self).setUp()
def test_load_plugins():
mgr = manager.AgentManager()
mgr.init_host()
assert mgr.pollsters, 'Failed to load any plugins'
return
def test_fetch_diskio(self):
self.manager._fetch_diskio(self.context)
def test_fetch_diskio_with_libvirt_non_existent_instance(self):
flags.FLAGS.connection_type = 'libvirt'
def test_run_tasks():
class Pollster:
counters = []
instance = db.instance_create(self.context, {})
def get_counters(self, manager, context):
self.counters.append((manager, context))
return ['test data']
self.mox.StubOutWithMock(self.manager.db, 'instance_get_all_by_host')
self.manager.db.instance_get_all_by_host(self.context, self.manager.host).AndReturn([instance])
self.mox.ReplayAll()
self.manager._fetch_diskio(self.context)
mgr = manager.AgentManager()
mgr.pollsters = [('test', Pollster())]
mgr.periodic_tasks('context')
assert Pollster.counters[0] == (mgr, 'context')

View File

@ -67,10 +67,10 @@ TEST_NOTICE = {
def test_notify():
results = []
d = StubDispatcher(None, lambda x, y: results.append((x, y)))
d = StubDispatcher(None, lambda x: results.append(x))
d.notify(TEST_NOTICE)
assert len(results) == 1
counter = results[0][1]
counter = results[0]
assert counter.type == 'instance'
@ -78,7 +78,7 @@ def test_load_compute_plugins():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector.compute',
lambda x, y: results.append((x, y))
lambda x: results.append(x)
)
assert d.handlers, 'No handlers were loaded'
@ -87,7 +87,7 @@ def test_load_no_plugins():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector.none',
lambda x, y: results.append((x, y))
lambda x: results.append(x)
)
assert not d.handlers, 'Handlers were loaded'
@ -96,9 +96,9 @@ def test_notify_through_plugin():
results = []
d = dispatcher.NotificationDispatcher(
'ceilometer.collector.compute',
lambda x, y: results.append((x, y))
lambda x: results.append(x)
)
d.notify(TEST_NOTICE)
assert len(results) == 1
counter = results[0][1]
counter = results[0]
assert counter.type == 'instance'

View File

@ -0,0 +1,59 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 eNovance <licensing@enovance.com>
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for manager.
"""
# import unittest
# import mox
# import stubout
# import nova.tests.fakelibvirt as libvirt
from nova import context
from nova import flags
from nova import test
from nova import db
from ceilometer.compute import libvirt
from ceilometer.agent import manager
class TestDiskIOPollster(test.TestCase):
def setUp(self):
self.context = context.RequestContext('admin', 'admin', is_admin=True)
self.manager = manager.AgentManager()
self.pollster = libvirt.DiskIOPollster()
super(TestDiskIOPollster, self).setUp()
def test_fetch_diskio(self):
list(self.pollster.get_counters(self.manager, self.context))
def test_fetch_diskio_with_libvirt_non_existent_instance(self):
flags.FLAGS.connection_type = 'libvirt'
instance = db.instance_create(self.context, {})
self.mox.StubOutWithMock(self.manager.db, 'instance_get_all_by_host')
self.manager.db.instance_get_all_by_host(self.context,
self.manager.host,
).AndReturn([instance])
self.mox.ReplayAll()
list(self.pollster.get_counters(self.manager, self.context))

View File

@ -51,6 +51,8 @@ def test_compute_signature_signed():
TEST_COUNTER = counter.Counter(source='src',
type='typ',
volume=1,
user_id='user',
project_id='project',
resource_id=2,
datetime='today',
duration=3,
@ -95,25 +97,15 @@ TEST_NOTICE = {
}
def test_meter_message_from_counter_user_id():
msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER)
assert msg['user_id'] == TEST_NOTICE['payload']['user_id']
def test_meter_message_from_counter_project_id():
msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER)
assert msg['project_id'] == TEST_NOTICE['payload']['tenant_id']
def test_meter_message_from_counter_signed():
msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER)
msg = meter.meter_message_from_counter(TEST_COUNTER)
assert 'message_signature' in msg
def test_meter_message_from_counter_field():
def compare(f, c, msg_f, msg):
assert msg == c
msg = meter.meter_message_from_counter(TEST_NOTICE, TEST_COUNTER)
msg = meter.meter_message_from_counter(TEST_COUNTER)
name_map = {'type': 'counter_type',
'volume': 'counter_volume',
'datetime': 'counter_datetime',