Merge "Pluggable resource discovery for agents"

This commit is contained in:
Jenkins 2014-02-28 23:12:55 +00:00 committed by Gerrit Code Review
commit 16535b9b6a
18 changed files with 402 additions and 214 deletions

View File

@ -1,8 +1,10 @@
# -*- encoding: utf-8 -*- # -*- encoding: utf-8 -*-
# #
# Copyright © 2013 Julien Danjou # Copyright © 2013 Julien Danjou
# Copyright © 2014 Red Hat, Inc
# #
# Author: Julien Danjou <julien@danjou.info> # Authors: Julien Danjou <julien@danjou.info>
# Eoghan Glynn <eglynn@redhat.com>
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -16,13 +18,14 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import abc
import collections import collections
import itertools import itertools
import urlparse
import six from stevedore import extension
from ceilometer.openstack.common import context from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline from ceilometer import pipeline
@ -31,7 +34,6 @@ from ceilometer import transformer
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class PollingTask(object): class PollingTask(object):
"""Polling task for polling samples and inject into pipeline. """Polling task for polling samples and inject into pipeline.
A polling task can be invoked periodically or only once. A polling task can be invoked periodically or only once.
@ -53,24 +55,49 @@ class PollingTask(object):
self.resources[pollster.name].update(pipeline.resources) self.resources[pollster.name].update(pipeline.resources)
self.pollsters.update([pollster]) self.pollsters.update([pollster])
@abc.abstractmethod
def poll_and_publish(self): def poll_and_publish(self):
"""Polling sample and publish into pipeline.""" """Polling sample and publish into pipeline."""
agent_resources = self.manager.discover()
with self.publish_context as publisher:
cache = {}
for pollster in self.pollsters:
LOG.info(_("Polling pollster %s"), pollster.name)
source_resources = list(self.resources[pollster.name])
try:
samples = list(pollster.obj.get_samples(
self.manager,
cache,
resources=source_resources or agent_resources,
))
publisher(samples)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
@six.add_metaclass(abc.ABCMeta)
class AgentManager(os_service.Service): class AgentManager(os_service.Service):
def __init__(self, extension_manager): def __init__(self, namespace, default_discovery=[]):
super(AgentManager, self).__init__() super(AgentManager, self).__init__()
self.default_discovery = default_discovery
self.pollster_manager = extension_manager self.pollster_manager = self._extensions('poll', namespace)
self.discovery_manager = self._extensions('discover')
self.context = context.RequestContext('admin', 'admin', is_admin=True) self.context = context.RequestContext('admin', 'admin', is_admin=True)
@abc.abstractmethod @staticmethod
def _extensions(category, agent_ns=None):
namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
else 'ceilometer.%s' % category)
return extension.ExtensionManager(
namespace=namespace,
invoke_on_load=True,
)
def create_polling_task(self): def create_polling_task(self):
"""Create an empty polling task.""" """Create an initially empty polling task."""
return PollingTask(self)
def setup_polling_tasks(self): def setup_polling_tasks(self):
polling_tasks = {} polling_tasks = {}
@ -101,3 +128,29 @@ class AgentManager(os_service.Service):
@staticmethod @staticmethod
def interval_task(task): def interval_task(task):
task.poll_and_publish() task.poll_and_publish()
@staticmethod
def _parse_discoverer(url):
s = urlparse.urlparse(url)
return (s.scheme or s.path), (s.netloc + s.path if s.scheme else None)
def _discoverer(self, name):
for d in self.discovery_manager:
if d.name == name:
return d.obj
return None
def discover(self, discovery=[]):
resources = []
for url in (discovery or self.default_discovery):
name, param = self._parse_discoverer(url)
discoverer = self._discoverer(name)
if discoverer:
try:
discovered = discoverer.discover(param)
resources.extend(discovered)
except Exception as err:
LOG.exception(_('Unable to discover resources: %s') % err)
else:
LOG.warning(_('Unknown discovery extension: %s') % name)
return resources

View File

@ -18,10 +18,8 @@
from keystoneclient.v2_0 import client as ksclient from keystoneclient.v2_0 import client as ksclient
from oslo.config import cfg from oslo.config import cfg
from stevedore import extension
from ceilometer import agent from ceilometer import agent
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common import service as os_service
from ceilometer import service from ceilometer import service
@ -31,42 +29,10 @@ cfg.CONF.import_group('service_credentials', 'ceilometer.service')
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class PollingTask(agent.PollingTask):
def poll_and_publish(self):
"""Tasks to be run at a periodic interval."""
with self.publish_context as publisher:
# TODO(yjiang5) passing samples into get_samples to avoid
# polling all counters one by one
cache = {}
for pollster in self.pollsters:
try:
LOG.info(_("Polling pollster %s"), pollster.name)
resources = list(self.resources[pollster.name])
samples = list(pollster.obj.get_samples(
self.manager,
cache,
resources=resources,
))
publisher(samples)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}))
LOG.exception(err)
class AgentManager(agent.AgentManager): class AgentManager(agent.AgentManager):
def __init__(self): def __init__(self):
super(AgentManager, self).__init__( super(AgentManager, self).__init__('central')
extension.ExtensionManager(
namespace='ceilometer.poll.central',
invoke_on_load=True,
)
)
def create_polling_task(self):
return PollingTask(self)
def interval_task(self, task): def interval_task(self, task):
self.keystone = ksclient.Client( self.keystone = ksclient.Client(

View File

@ -0,0 +1,35 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2014 Red Hat, Inc
#
# Author: Eoghan Glynn <eglynn@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from ceilometer import nova_client
from ceilometer import plugin
class InstanceDiscovery(plugin.DiscoveryBase):
def __init__(self):
super(InstanceDiscovery, self).__init__()
self.nova_cli = nova_client.Client()
def discover(self, param=None):
"""Discover resources to monitor.
"""
instances = self.nova_cli.instance_get_all_by_host(cfg.CONF.host)
return [i for i in instances
if getattr(i, 'OS-EXT-STS:vm_state', None) != 'error']

View File

@ -16,13 +16,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo.config import cfg
from stevedore import extension
from ceilometer import agent from ceilometer import agent
from ceilometer.compute.virt import inspector as virt_inspector from ceilometer.compute.virt import inspector as virt_inspector
from ceilometer import nova_client
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common import service as os_service
from ceilometer import service from ceilometer import service
@ -30,51 +25,11 @@ from ceilometer import service
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class PollingTask(agent.PollingTask):
def poll_and_publish_instances(self, instances):
with self.publish_context as publisher:
for instance in instances:
if getattr(instance, 'OS-EXT-STS:vm_state', None) == 'error':
continue
cache = {}
for pollster in self.pollsters:
try:
LOG.info(_("Polling pollster %s"), pollster.name)
samples = list(pollster.obj.get_samples(
self.manager,
cache,
instance,
))
publisher(samples)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}))
LOG.exception(err)
def poll_and_publish(self):
try:
instances = self.manager.nv.instance_get_all_by_host(cfg.CONF.host)
except Exception as err:
LOG.exception(_('Unable to retrieve instances: %s') % err)
else:
self.poll_and_publish_instances(instances)
class AgentManager(agent.AgentManager): class AgentManager(agent.AgentManager):
def __init__(self): def __init__(self):
super(AgentManager, self).__init__( super(AgentManager, self).__init__('compute', ['local_instances'])
extension.ExtensionManager(
namespace='ceilometer.poll.compute',
invoke_on_load=True,
),
)
self._inspector = virt_inspector.get_hypervisor_inspector() self._inspector = virt_inspector.get_hypervisor_inspector()
self.nv = nova_client.Client()
def create_polling_task(self):
return PollingTask(self)
@property @property
def inspector(self): def inspector(self):

View File

@ -30,10 +30,10 @@ class ComputePollster(plugin.PollsterBase):
""" """
@abc.abstractmethod @abc.abstractmethod
def get_samples(self, manager, cache, instance): def get_samples(self, manager, cache, resources):
"""Return a sequence of Counter instances from polling the resources. """Return a sequence of Counter instances from polling the resources.
:param manager: The service manager invoking the plugin :param manager: The service manager invoking the plugin
:param cache: A dictionary for passing data between plugins :param cache: A dictionary for passing data between plugins
:param instance: The instance to examine :param resources: The resources to examine (expected to be instances)
""" """

View File

@ -30,26 +30,28 @@ LOG = log.getLogger(__name__)
class CPUPollster(plugin.ComputePollster): class CPUPollster(plugin.ComputePollster):
def get_samples(self, manager, cache, instance): def get_samples(self, manager, cache, resources):
LOG.info(_('checking instance %s'), instance.id) for instance in resources:
instance_name = util.instance_name(instance) LOG.info(_('checking instance %s'), instance.id)
try: instance_name = util.instance_name(instance)
cpu_info = manager.inspector.inspect_cpus(instance_name) try:
LOG.info(_("CPUTIME USAGE: %(instance)s %(time)d") % ( cpu_info = manager.inspector.inspect_cpus(instance_name)
{'instance': instance.__dict__, 'time': cpu_info.time})) LOG.info(_("CPUTIME USAGE: %(instance)s %(time)d") % (
cpu_num = {'cpu_number': cpu_info.number} {'instance': instance.__dict__,
yield util.make_sample_from_instance( 'time': cpu_info.time}))
instance, cpu_num = {'cpu_number': cpu_info.number}
name='cpu', yield util.make_sample_from_instance(
type=sample.TYPE_CUMULATIVE, instance,
unit='ns', name='cpu',
volume=cpu_info.time, type=sample.TYPE_CUMULATIVE,
additional_metadata=cpu_num, unit='ns',
) volume=cpu_info.time,
except virt_inspector.InstanceNotFoundException as err: additional_metadata=cpu_num,
# Instance was deleted while getting samples. Ignore it. )
LOG.debug(_('Exception while getting samples %s'), err) except virt_inspector.InstanceNotFoundException as err:
except Exception as err: # Instance was deleted while getting samples. Ignore it.
LOG.error(_('could not get CPU time for %(id)s: %(e)s') % ( LOG.debug(_('Exception while getting samples %s'), err)
{'id': instance.id, 'e': err})) except Exception as err:
LOG.exception(err) LOG.error(_('could not get CPU time for %(id)s: %(e)s') % (
{'id': instance.id, 'e': err}))
LOG.exception(err)

View File

@ -81,23 +81,24 @@ class _Base(plugin.ComputePollster):
def _get_sample(instance, c_data): def _get_sample(instance, c_data):
"""Return one Sample.""" """Return one Sample."""
def get_samples(self, manager, cache, instance): def get_samples(self, manager, cache, resources):
instance_name = util.instance_name(instance) for instance in resources:
try: instance_name = util.instance_name(instance)
c_data = self._populate_cache( try:
manager.inspector, c_data = self._populate_cache(
cache, manager.inspector,
instance, cache,
instance_name, instance,
) instance_name,
yield self._get_sample(instance, c_data) )
except virt_inspector.InstanceNotFoundException as err: yield self._get_sample(instance, c_data)
# Instance was deleted while getting samples. Ignore it. except virt_inspector.InstanceNotFoundException as err:
LOG.debug(_('Exception while getting samples %s'), err) # Instance was deleted while getting samples. Ignore it.
except Exception as err: LOG.debug(_('Exception while getting samples %s'), err)
LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( except Exception as err:
{'name': instance_name, 'error': err})) LOG.warning(_('Ignoring instance %(name)s: %(error)s') % (
LOG.exception(err) {'name': instance_name, 'error': err}))
LOG.exception(err)
class ReadRequestsPollster(_Base): class ReadRequestsPollster(_Base):

View File

@ -26,26 +26,28 @@ from ceilometer import sample
class InstancePollster(plugin.ComputePollster): class InstancePollster(plugin.ComputePollster):
@staticmethod @staticmethod
def get_samples(manager, cache, instance): def get_samples(manager, cache, resources):
yield util.make_sample_from_instance( for instance in resources:
instance, yield util.make_sample_from_instance(
name='instance', instance,
type=sample.TYPE_GAUGE, name='instance',
unit='instance', type=sample.TYPE_GAUGE,
volume=1, unit='instance',
) volume=1,
)
class InstanceFlavorPollster(plugin.ComputePollster): class InstanceFlavorPollster(plugin.ComputePollster):
@staticmethod @staticmethod
def get_samples(manager, cache, instance): def get_samples(manager, cache, resources):
yield util.make_sample_from_instance( for instance in resources:
instance, yield util.make_sample_from_instance(
# Use the "meter name + variable" syntax instance,
name='instance:%s' % # Use the "meter name + variable" syntax
instance.flavor['name'], name='instance:%s' %
type=sample.TYPE_GAUGE, instance.flavor['name'],
unit='instance', type=sample.TYPE_GAUGE,
volume=1, unit='instance',
) volume=1,
)

View File

@ -72,26 +72,27 @@ class _Base(plugin.ComputePollster):
) )
return i_cache[instance_name] return i_cache[instance_name]
def get_samples(self, manager, cache, instance): def get_samples(self, manager, cache, resources):
instance_name = util.instance_name(instance) for instance in resources:
LOG.info(_('checking instance %s'), instance.id) instance_name = util.instance_name(instance)
try: LOG.info(_('checking instance %s'), instance.id)
vnics = self._get_vnics_for_instance( try:
cache, vnics = self._get_vnics_for_instance(
manager.inspector, cache,
instance_name, manager.inspector,
) instance_name,
for vnic, info in vnics: )
LOG.info(self.NET_USAGE_MESSAGE, instance_name, for vnic, info in vnics:
vnic.name, info.rx_bytes, info.tx_bytes) LOG.info(self.NET_USAGE_MESSAGE, instance_name,
yield self._get_sample(instance, vnic, info) vnic.name, info.rx_bytes, info.tx_bytes)
except virt_inspector.InstanceNotFoundException as err: yield self._get_sample(instance, vnic, info)
# Instance was deleted while getting samples. Ignore it. except virt_inspector.InstanceNotFoundException as err:
LOG.debug(_('Exception while getting samples %s'), err) # Instance was deleted while getting samples. Ignore it.
except Exception as err: LOG.debug(_('Exception while getting samples %s'), err)
LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( except Exception as err:
{'name': instance_name, 'error': err})) LOG.warning(_('Ignoring instance %(name)s: %(error)s') % (
LOG.exception(err) {'name': instance_name, 'error': err}))
LOG.exception(err)
class IncomingBytesPollster(_Base): class IncomingBytesPollster(_Base):

View File

@ -104,3 +104,12 @@ class PollsterBase(PluginBase):
how to use it. how to use it.
""" """
@six.add_metaclass(abc.ABCMeta)
class DiscoveryBase(object):
@abc.abstractmethod
def discover(self, param=None):
"""Discover resources to monitor.
:param param: an optional parameter to guide the discovery
"""

View File

@ -3,9 +3,11 @@
# Copyright © 2012 New Dream Network, LLC (DreamHost) # Copyright © 2012 New Dream Network, LLC (DreamHost)
# Copyright © 2013 Intel corp. # Copyright © 2013 Intel corp.
# Copyright © 2013 eNovance # Copyright © 2013 eNovance
# Copyright © 2014 Red Hat, Inc
# #
# Author: Yunhong Jiang <yunhong.jiang@intel.com> # Authors: Yunhong Jiang <yunhong.jiang@intel.com>
# Julien Danjou <julien@danjou.info> # Julien Danjou <julien@danjou.info>
# Eoghan Glynn <eglynn@redhat.com>
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -20,6 +22,7 @@
# under the License. # under the License.
import abc import abc
import copy
import datetime import datetime
import mock import mock
@ -27,14 +30,33 @@ import six
from stevedore import extension from stevedore import extension
from ceilometer.openstack.common.fixture import config from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer import pipeline from ceilometer import pipeline
from ceilometer import plugin from ceilometer import plugin
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
from ceilometer import sample from ceilometer import sample
from ceilometer.tests import base from ceilometer.tests import base
from ceilometer import transformer from ceilometer import transformer
default_test_data = sample.Sample( class TestSample(sample.Sample):
def __init__(self, name, type, unit, volume, user_id, project_id,
resource_id, timestamp, resource_metadata, source=None):
super(TestSample, self).__init__(name, type, unit, volume, user_id,
project_id, resource_id, timestamp,
resource_metadata, source)
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.__dict__ == other.__dict__
return False
def __ne__(self, other):
return not self.__eq__(other)
default_test_data = TestSample(
name='test', name='test',
type=sample.TYPE_CUMULATIVE, type=sample.TYPE_CUMULATIVE,
unit='', unit='',
@ -50,23 +72,33 @@ default_test_data = sample.Sample(
class TestPollster(plugin.PollsterBase): class TestPollster(plugin.PollsterBase):
test_data = default_test_data test_data = default_test_data
def get_samples(self, manager, cache, instance=None, resources=[]): def get_samples(self, manager, cache, resources=[]):
self.samples.append((manager, instance)) self.samples.append((manager, resources))
self.resources.extend(resources) self.resources.extend(resources)
return [self.test_data] c = copy.copy(self.test_data)
c.resource_metadata['resources'] = resources
return [c]
class TestPollsterException(TestPollster): class TestPollsterException(TestPollster):
def get_samples(self, manager, cache, instance=None, resources=[]): def get_samples(self, manager, cache, resources=[]):
# Put an instance parameter here so that it can be used self.samples.append((manager, resources))
# by both central manager and compute manager
# In future, we possibly don't need such hack if we
# combine the get_samples() function again
self.samples.append((manager, instance))
self.resources.extend(resources) self.resources.extend(resources)
raise Exception() raise Exception()
class TestDiscovery(plugin.DiscoveryBase):
def discover(self, param=None):
self.params.append(param)
return self.resources
class TestDiscoveryException(plugin.DiscoveryBase):
def discover(self, param=None):
self.params.append(param)
raise Exception()
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class BaseAgentManagerTestCase(base.BaseTestCase): class BaseAgentManagerTestCase(base.BaseTestCase):
@ -78,7 +110,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
class PollsterAnother(TestPollster): class PollsterAnother(TestPollster):
samples = [] samples = []
resources = [] resources = []
test_data = sample.Sample( test_data = TestSample(
name='testanother', name='testanother',
type=default_test_data.type, type=default_test_data.type,
unit=default_test_data.unit, unit=default_test_data.unit,
@ -92,7 +124,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
class PollsterException(TestPollsterException): class PollsterException(TestPollsterException):
samples = [] samples = []
resources = [] resources = []
test_data = sample.Sample( test_data = TestSample(
name='testexception', name='testexception',
type=default_test_data.type, type=default_test_data.type,
unit=default_test_data.unit, unit=default_test_data.unit,
@ -106,7 +138,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
class PollsterExceptionAnother(TestPollsterException): class PollsterExceptionAnother(TestPollsterException):
samples = [] samples = []
resources = [] resources = []
test_data = sample.Sample( test_data = TestSample(
name='testexceptionanother', name='testexceptionanother',
type=default_test_data.type, type=default_test_data.type,
unit=default_test_data.unit, unit=default_test_data.unit,
@ -117,6 +149,17 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
timestamp=default_test_data.timestamp, timestamp=default_test_data.timestamp,
resource_metadata=default_test_data.resource_metadata) resource_metadata=default_test_data.resource_metadata)
class Discovery(TestDiscovery):
params = []
resources = []
class DiscoveryAnother(TestDiscovery):
params = []
resources = []
class DiscoveryException(TestDiscoveryException):
params = []
def setup_pipeline(self): def setup_pipeline(self):
self.transformer_manager = transformer.TransformerExtensionManager( self.transformer_manager = transformer.TransformerExtensionManager(
'ceilometer.transformer', 'ceilometer.transformer',
@ -125,7 +168,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.pipeline_cfg, self.pipeline_cfg,
self.transformer_manager) self.transformer_manager)
def create_extension_manager(self): def create_pollster_manager(self):
return extension.ExtensionManager.make_test_instance( return extension.ExtensionManager.make_test_instance(
[ [
extension.Extension( extension.Extension(
@ -151,6 +194,27 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
], ],
) )
def create_discovery_manager(self):
return extension.ExtensionManager.make_test_instance(
[
extension.Extension(
'testdiscovery',
None,
None,
self.Discovery(), ),
extension.Extension(
'testdiscoveryanother',
None,
None,
self.DiscoveryAnother(), ),
extension.Extension(
'testdiscoveryexception',
None,
None,
self.DiscoveryException(), ),
],
)
@abc.abstractmethod @abc.abstractmethod
def create_manager(self): def create_manager(self):
"""Return subclass specific manager.""" """Return subclass specific manager."""
@ -159,12 +223,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def setUp(self): def setUp(self):
super(BaseAgentManagerTestCase, self).setUp() super(BaseAgentManagerTestCase, self).setUp()
self.mgr = self.create_manager() self.mgr = self.create_manager()
self.mgr.pollster_manager = self.create_extension_manager() self.mgr.pollster_manager = self.create_pollster_manager()
self.pipeline_cfg = [{ self.pipeline_cfg = [{
'name': "test_pipeline", 'name': "test_pipeline",
'interval': 60, 'interval': 60,
'counters': ['test'], 'counters': ['test'],
'resources': ['test://'], 'resources': ['test://'] if self.source_resources else [],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}, ] }, ]
@ -174,6 +238,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'pipeline_cfg_file', 'pipeline_cfg_file',
self.path_get('etc/ceilometer/pipeline.yaml') self.path_get('etc/ceilometer/pipeline.yaml')
) )
self.useFixture(mockpatch.PatchObject(
publisher, 'get_publisher', side_effect=self.get_publisher))
def get_publisher(self, url, namespace=''):
fake_drivers = {'test://': test_publisher.TestPublisher,
'new://': test_publisher.TestPublisher,
'rpc://': test_publisher.TestPublisher}
return fake_drivers[url](url)
def tearDown(self): def tearDown(self):
self.Pollster.samples = [] self.Pollster.samples = []
@ -184,16 +256,24 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.PollsterAnother.resources = [] self.PollsterAnother.resources = []
self.PollsterException.resources = [] self.PollsterException.resources = []
self.PollsterExceptionAnother.resources = [] self.PollsterExceptionAnother.resources = []
self.Discovery.params = []
self.DiscoveryAnother.params = []
self.DiscoveryException.params = []
self.Discovery.resources = []
self.DiscoveryAnother.resources = []
super(BaseAgentManagerTestCase, self).tearDown() super(BaseAgentManagerTestCase, self).tearDown()
def test_setup_polling_tasks(self): def test_setup_polling_tasks(self):
polling_tasks = self.mgr.setup_polling_tasks() polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks), 1) self.assertEqual(len(polling_tasks), 1)
self.assertTrue(60 in polling_tasks.keys()) self.assertTrue(60 in polling_tasks.keys())
self.assertEqual(len(polling_tasks[60].resources), 1) per_task_resources = polling_tasks[60].resources
self.assertEqual(len(polling_tasks[60].resources['test']), 1) self.assertEqual(len(per_task_resources), 1)
self.assertEqual(per_task_resources['test'],
set(self.pipeline_cfg[0]['resources']))
self.mgr.interval_task(polling_tasks.values()[0]) self.mgr.interval_task(polling_tasks.values()[0])
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
del pub.samples[0].resource_metadata['resources']
self.assertEqual(pub.samples[0], self.Pollster.test_data) self.assertEqual(pub.samples[0], self.Pollster.test_data)
def test_setup_polling_tasks_multiple_interval(self): def test_setup_polling_tasks_multiple_interval(self):
@ -201,7 +281,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline", 'name': "test_pipeline",
'interval': 10, 'interval': 10,
'counters': ['test'], 'counters': ['test'],
'resources': ['test://'], 'resources': ['test://'] if self.source_resources else [],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}) })
@ -230,7 +310,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline", 'name': "test_pipeline",
'interval': 60, 'interval': 60,
'counters': ['testanother'], 'counters': ['testanother'],
'resources': ['testanother://'], 'resources': ['testanother://'] if self.source_resources else [],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}) })
@ -239,9 +319,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(len(polling_tasks), 1) self.assertEqual(len(polling_tasks), 1)
pollsters = polling_tasks.get(60).pollsters pollsters = polling_tasks.get(60).pollsters
self.assertEqual(len(pollsters), 2) self.assertEqual(len(pollsters), 2)
self.assertEqual(len(polling_tasks[60].resources), 2) per_task_resources = polling_tasks[60].resources
self.assertEqual(len(polling_tasks[60].resources['test']), 1) self.assertEqual(len(per_task_resources), 2)
self.assertEqual(len(polling_tasks[60].resources['testanother']), 1) self.assertEqual(per_task_resources['test'],
set(self.pipeline_cfg[0]['resources']))
self.assertEqual(per_task_resources['testanother'],
set(self.pipeline_cfg[1]['resources']))
def test_interval_exception_isolation(self): def test_interval_exception_isolation(self):
self.pipeline_cfg = [ self.pipeline_cfg = [
@ -249,7 +332,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline_1", 'name': "test_pipeline_1",
'interval': 10, 'interval': 10,
'counters': ['testexceptionanother'], 'counters': ['testexceptionanother'],
'resources': ['test://'], 'resources': ['test://'] if self.source_resources else [],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}, },
@ -257,7 +340,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'name': "test_pipeline_2", 'name': "test_pipeline_2",
'interval': 10, 'interval': 10,
'counters': ['testexception'], 'counters': ['testexception'],
'resources': ['test://'], 'resources': ['test://'] if self.source_resources else [],
'transformers': [], 'transformers': [],
'publishers': ["test"], 'publishers': ["test"],
}, },
@ -290,3 +373,71 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'publishers': ["test"], 'publishers': ["test"],
}) })
self.setup_pipeline() self.setup_pipeline()
def _verify_discovery_params(self, expected):
self.assertEqual(self.Discovery.params, expected)
self.assertEqual(self.DiscoveryAnother.params, expected)
self.assertEqual(self.DiscoveryException.params, expected)
def _do_test_per_agent_discovery(self,
discovered_resources,
static_resources):
self.mgr.discovery_manager = self.create_discovery_manager()
if discovered_resources:
self.mgr.default_discovery = [d.name
for d in self.mgr.discovery_manager]
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
self.pipeline_cfg[0]['resources'] = static_resources
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self._verify_discovery_params([None] if discovered_resources else [])
discovery = self.Discovery.resources + self.DiscoveryAnother.resources
# compare resource lists modulo ordering
self.assertEqual(set(self.Pollster.resources),
set(static_resources or discovery))
def test_per_agent_discovery_discovered_only(self):
self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'],
[])
def test_per_agent_discovery_static_only(self):
self._do_test_per_agent_discovery([],
['static_1', 'static_2'])
def test_per_agent_discovery_discovered_overridden_by_static(self):
self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
def test_multiple_pipelines_different_static_resources(self):
# assert that the amalgation of all static resources for a set
# of pipelines with a common interval is passed to individual
# pollsters matching those pipelines
self.pipeline_cfg[0]['resources'] = ['test://']
self.pipeline_cfg.append({
'name': "another_pipeline",
'interval': 60,
'counters': ['test'],
'resources': ['another://'],
'transformers': [],
'publishers': ["new"],
})
self.mgr.discovery_manager = self.create_discovery_manager()
self.Discovery.resources = []
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks), 1)
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
self._verify_discovery_params([])
self.assertEqual(len(self.Pollster.samples), 1)
amalgamated_resources = set(['test://', 'another://'])
self.assertEqual(set(self.Pollster.samples[0][1]),
amalgamated_resources)
for pipeline in self.mgr.pipeline_manager.pipelines:
self.assertEqual(len(pipeline.publishers[0].samples), 1)
published = pipeline.publishers[0].samples[0]
self.assertEqual(set(published.resource_metadata['resources']),
amalgamated_resources)

View File

@ -40,6 +40,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
return manager.AgentManager() return manager.AgentManager()
def setUp(self): def setUp(self):
self.source_resources = True
super(TestRunTasks, self).setUp() super(TestRunTasks, self).setUp()
self.useFixture(mockpatch.Patch( self.useFixture(mockpatch.Patch(
'keystoneclient.v2_0.client.Client', 'keystoneclient.v2_0.client.Client',

View File

@ -53,7 +53,7 @@ class TestCPUPollster(base.TestPollsterBase):
def _verify_cpu_metering(expected_time): def _verify_cpu_metering(expected_time):
cache = {} cache = {}
samples = list(pollster.get_samples(mgr, cache, self.instance)) samples = list(pollster.get_samples(mgr, cache, [self.instance]))
self.assertEqual(len(samples), 1) self.assertEqual(len(samples), 1)
self.assertEqual(set([s.name for s in samples]), self.assertEqual(set([s.name for s in samples]),
set(['cpu'])) set(['cpu']))
@ -75,7 +75,7 @@ class TestCPUPollster(base.TestPollsterBase):
pollster = cpu.CPUPollster() pollster = cpu.CPUPollster()
cache = {} cache = {}
samples = list(pollster.get_samples(mgr, cache, self.instance)) samples = list(pollster.get_samples(mgr, cache, [self.instance]))
self.assertEqual(len(samples), 1) self.assertEqual(len(samples), 1)
self.assertEqual(samples[0].volume, 10 ** 6) self.assertEqual(samples[0].volume, 10 ** 6)
self.assertEqual(len(cache), 0) self.assertEqual(len(cache), 0)

View File

@ -45,7 +45,7 @@ class TestDiskPollsters(base.TestPollsterBase):
mgr = manager.AgentManager() mgr = manager.AgentManager()
cache = {} cache = {}
samples = list(pollster.get_samples(mgr, cache, self.instance)) samples = list(pollster.get_samples(mgr, cache, [self.instance]))
assert samples assert samples
self.assertIn(pollster.CACHE_KEY_DISK, cache) self.assertIn(pollster.CACHE_KEY_DISK, cache)
self.assertIn(self.instance.name, cache[pollster.CACHE_KEY_DISK]) self.assertIn(self.instance.name, cache[pollster.CACHE_KEY_DISK])

View File

@ -34,7 +34,7 @@ class TestInstancePollster(base.TestPollsterBase):
def test_get_samples_instance(self): def test_get_samples_instance(self):
mgr = manager.AgentManager() mgr = manager.AgentManager()
pollster = pollsters_instance.InstancePollster() pollster = pollsters_instance.InstancePollster()
samples = list(pollster.get_samples(mgr, {}, self.instance)) samples = list(pollster.get_samples(mgr, {}, [self.instance]))
self.assertEqual(len(samples), 1) self.assertEqual(len(samples), 1)
self.assertEqual(samples[0].name, 'instance') self.assertEqual(samples[0].name, 'instance')
self.assertEqual(samples[0].resource_metadata['vcpus'], 1) self.assertEqual(samples[0].resource_metadata['vcpus'], 1)
@ -47,6 +47,6 @@ class TestInstancePollster(base.TestPollsterBase):
def test_get_samples_instance_flavor(self): def test_get_samples_instance_flavor(self):
mgr = manager.AgentManager() mgr = manager.AgentManager()
pollster = pollsters_instance.InstanceFlavorPollster() pollster = pollsters_instance.InstanceFlavorPollster()
samples = list(pollster.get_samples(mgr, {}, self.instance)) samples = list(pollster.get_samples(mgr, {}, [self.instance]))
self.assertEqual(len(samples), 1) self.assertEqual(len(samples), 1)
self.assertEqual(samples[0].name, 'instance:m1.small') self.assertEqual(samples[0].name, 'instance:m1.small')

View File

@ -72,7 +72,7 @@ class TestNetPollster(base.TestPollsterBase):
def _check_get_samples(self, factory, expected): def _check_get_samples(self, factory, expected):
mgr = manager.AgentManager() mgr = manager.AgentManager()
pollster = factory() pollster = factory()
samples = list(pollster.get_samples(mgr, {}, self.instance)) samples = list(pollster.get_samples(mgr, {}, [self.instance]))
self.assertEqual(len(samples), 3) # one for each nic self.assertEqual(len(samples), 3) # one for each nic
self.assertEqual(set([s.name for s in samples]), self.assertEqual(set([s.name for s in samples]),
set([samples[0].name])) set([samples[0].name]))
@ -157,7 +157,7 @@ class TestNetPollsterCache(base.TestPollsterBase):
self.instance.name: vnics, self.instance.name: vnics,
}, },
} }
samples = list(pollster.get_samples(mgr, cache, self.instance)) samples = list(pollster.get_samples(mgr, cache, [self.instance]))
self.assertEqual(len(samples), 1) self.assertEqual(len(samples), 1)
def test_incoming_bytes(self): def test_incoming_bytes(self):

View File

@ -19,6 +19,7 @@
""" """
import mock import mock
from ceilometer import agent
from ceilometer.compute import manager from ceilometer.compute import manager
from ceilometer import nova_client from ceilometer import nova_client
from ceilometer.openstack.common.fixture import mockpatch from ceilometer.openstack.common.fixture import mockpatch
@ -51,25 +52,24 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self): def setUp(self):
self.source_resources = False
super(TestRunTasks, self).setUp() super(TestRunTasks, self).setUp()
# Set up a fake instance value to be returned by # Set up a fake instance value to be returned by
# instance_get_all_by_host() so when the manager gets the list # instance_get_all_by_host() so when the manager gets the list
# of instances to poll we can control the results. # of instances to poll we can control the results.
self.instance = self._fake_instance('faux', 'active') self.instances = [self._fake_instance('doing', 'active'),
self._fake_instance('resting', 'paused')]
stillborn_instance = self._fake_instance('stillborn', 'error') stillborn_instance = self._fake_instance('stillborn', 'error')
def instance_get_all_by_host(*args):
return [self.instance, stillborn_instance]
self.useFixture(mockpatch.PatchObject( self.useFixture(mockpatch.PatchObject(
nova_client.Client, nova_client.Client,
'instance_get_all_by_host', 'instance_get_all_by_host',
side_effect=lambda *x: [self.instance, stillborn_instance])) side_effect=lambda *x: self.instances + [stillborn_instance]))
def test_setup_polling_tasks(self): def test_setup_polling_tasks(self):
super(TestRunTasks, self).test_setup_polling_tasks() super(TestRunTasks, self).test_setup_polling_tasks()
self.assertTrue(self.Pollster.samples[0][1] is self.instance) self.assertEqual(self.Pollster.samples[0][1], self.instances)
def test_interval_exception_isolation(self): def test_interval_exception_isolation(self):
super(TestRunTasks, self).test_interval_exception_isolation() super(TestRunTasks, self).test_interval_exception_isolation()
@ -81,5 +81,14 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
with mock.patch.object(nova_client.Client, 'instance_get_all_by_host', with mock.patch.object(nova_client.Client, 'instance_get_all_by_host',
side_effect=lambda *x: self._raise_exception()): side_effect=lambda *x: self._raise_exception()):
mgr = manager.AgentManager() mgr = manager.AgentManager()
polling_task = manager.PollingTask(mgr) polling_task = agent.PollingTask(mgr)
polling_task.poll_and_publish() polling_task.poll_and_publish()
def self_local_instances_default_agent_discovery(self):
self.setup_pipeline()
self.assertEqual(self.mgr.default_discovery, ['local_instances'])
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
self._verify_discovery_params([None])
self.assertEqual(set(self.Pollster.resources),
set(self.instances))

View File

@ -64,6 +64,9 @@ ceilometer.notification =
http.response = ceilometer.middleware:HTTPResponse http.response = ceilometer.middleware:HTTPResponse
stack_crud = ceilometer.orchestration.notifications:StackCRUD stack_crud = ceilometer.orchestration.notifications:StackCRUD
ceilometer.discover =
local_instances = ceilometer.compute.discovery:InstanceDiscovery
ceilometer.poll.compute = ceilometer.poll.compute =
disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster
disk.write.requests = ceilometer.compute.pollsters.disk:WriteRequestsPollster disk.write.requests = ceilometer.compute.pollsters.disk:WriteRequestsPollster