From b54e97e30025aa52d37b7663fda57a1387db106d Mon Sep 17 00:00:00 2001 From: Eoghan Glynn Date: Thu, 6 Feb 2014 17:49:28 +0000 Subject: [PATCH] Pluggable resource discovery for agents Precursor-to: BP decoupled-source-sink-discoverable-resources Add the concept of per-AgentManager resource discovery extensions loaded via stevedore entry points. The extensions are loaded from a single namespace: ceilometer.discover shared by all agents. An agent may have a list of per-agent default discovery extensions associated with it on creation. This is used by the compute agent, which currently uses a single discovery extension for all matching pipelines. The per-pipeline statically configured resources remain in place to potentially override the per-agent discovered resources - these will be augmented by per-pipeline discovery in a subsequent patch. Recast the compute agent's interrogation of the nova servers API in the form of a discovery extension. Unify the pollster abstraction used by the central and compute agents (which had previously diverged along with the introduction of per-pipeline resources). Unify the polling task logic used by the central and compute agents (so that the compute agent now iterates over each pollster for all local instances, as opposed to interating over all pollsters for each local instance). Change-Id: I625df47231a1dbf7ef721c297701799b224ce67e --- ceilometer/agent.py | 77 ++++++- ceilometer/central/manager.py | 36 +-- ceilometer/compute/discovery.py | 35 +++ ceilometer/compute/manager.py | 47 +--- ceilometer/compute/plugin.py | 4 +- ceilometer/compute/pollsters/cpu.py | 48 ++-- ceilometer/compute/pollsters/disk.py | 35 +-- ceilometer/compute/pollsters/instance.py | 38 ++-- ceilometer/compute/pollsters/net.py | 41 ++-- ceilometer/plugin.py | 9 + ceilometer/tests/agentbase.py | 205 +++++++++++++++--- ceilometer/tests/central/test_manager.py | 1 + .../tests/compute/pollsters/test_cpu.py | 4 +- .../tests/compute/pollsters/test_diskio.py | 2 +- .../tests/compute/pollsters/test_instance.py | 4 +- .../tests/compute/pollsters/test_net.py | 4 +- ceilometer/tests/compute/test_manager.py | 23 +- setup.cfg | 3 + 18 files changed, 402 insertions(+), 214 deletions(-) create mode 100644 ceilometer/compute/discovery.py diff --git a/ceilometer/agent.py b/ceilometer/agent.py index 308d178fd..738b951c4 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -1,8 +1,10 @@ # -*- encoding: utf-8 -*- # # Copyright © 2013 Julien Danjou +# Copyright © 2014 Red Hat, Inc # -# Author: Julien Danjou +# Authors: Julien Danjou +# Eoghan Glynn # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -16,13 +18,14 @@ # License for the specific language governing permissions and limitations # under the License. -import abc import collections import itertools +import urlparse -import six +from stevedore import extension from ceilometer.openstack.common import context +from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import pipeline @@ -31,7 +34,6 @@ from ceilometer import transformer LOG = log.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) class PollingTask(object): """Polling task for polling samples and inject into pipeline. A polling task can be invoked periodically or only once. @@ -53,24 +55,49 @@ class PollingTask(object): self.resources[pollster.name].update(pipeline.resources) self.pollsters.update([pollster]) - @abc.abstractmethod def poll_and_publish(self): """Polling sample and publish into pipeline.""" + agent_resources = self.manager.discover() + with self.publish_context as publisher: + cache = {} + for pollster in self.pollsters: + LOG.info(_("Polling pollster %s"), pollster.name) + source_resources = list(self.resources[pollster.name]) + try: + samples = list(pollster.obj.get_samples( + self.manager, + cache, + resources=source_resources or agent_resources, + )) + publisher(samples) + except Exception as err: + LOG.warning(_( + 'Continue after error from %(name)s: %(error)s') + % ({'name': pollster.name, 'error': err}), + exc_info=True) -@six.add_metaclass(abc.ABCMeta) class AgentManager(os_service.Service): - def __init__(self, extension_manager): + def __init__(self, namespace, default_discovery=[]): super(AgentManager, self).__init__() - - self.pollster_manager = extension_manager - + self.default_discovery = default_discovery + self.pollster_manager = self._extensions('poll', namespace) + self.discovery_manager = self._extensions('discover') self.context = context.RequestContext('admin', 'admin', is_admin=True) - @abc.abstractmethod + @staticmethod + def _extensions(category, agent_ns=None): + namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns + else 'ceilometer.%s' % category) + return extension.ExtensionManager( + namespace=namespace, + invoke_on_load=True, + ) + def create_polling_task(self): - """Create an empty polling task.""" + """Create an initially empty polling task.""" + return PollingTask(self) def setup_polling_tasks(self): polling_tasks = {} @@ -101,3 +128,29 @@ class AgentManager(os_service.Service): @staticmethod def interval_task(task): task.poll_and_publish() + + @staticmethod + def _parse_discoverer(url): + s = urlparse.urlparse(url) + return (s.scheme or s.path), (s.netloc + s.path if s.scheme else None) + + def _discoverer(self, name): + for d in self.discovery_manager: + if d.name == name: + return d.obj + return None + + def discover(self, discovery=[]): + resources = [] + for url in (discovery or self.default_discovery): + name, param = self._parse_discoverer(url) + discoverer = self._discoverer(name) + if discoverer: + try: + discovered = discoverer.discover(param) + resources.extend(discovered) + except Exception as err: + LOG.exception(_('Unable to discover resources: %s') % err) + else: + LOG.warning(_('Unknown discovery extension: %s') % name) + return resources diff --git a/ceilometer/central/manager.py b/ceilometer/central/manager.py index 842851769..e2b1542ca 100644 --- a/ceilometer/central/manager.py +++ b/ceilometer/central/manager.py @@ -18,10 +18,8 @@ from keystoneclient.v2_0 import client as ksclient from oslo.config import cfg -from stevedore import extension from ceilometer import agent -from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import service @@ -31,42 +29,10 @@ cfg.CONF.import_group('service_credentials', 'ceilometer.service') LOG = log.getLogger(__name__) -class PollingTask(agent.PollingTask): - def poll_and_publish(self): - """Tasks to be run at a periodic interval.""" - with self.publish_context as publisher: - # TODO(yjiang5) passing samples into get_samples to avoid - # polling all counters one by one - cache = {} - for pollster in self.pollsters: - try: - LOG.info(_("Polling pollster %s"), pollster.name) - resources = list(self.resources[pollster.name]) - samples = list(pollster.obj.get_samples( - self.manager, - cache, - resources=resources, - )) - publisher(samples) - except Exception as err: - LOG.warning(_( - 'Continue after error from %(name)s: %(error)s') - % ({'name': pollster.name, 'error': err})) - LOG.exception(err) - - class AgentManager(agent.AgentManager): def __init__(self): - super(AgentManager, self).__init__( - extension.ExtensionManager( - namespace='ceilometer.poll.central', - invoke_on_load=True, - ) - ) - - def create_polling_task(self): - return PollingTask(self) + super(AgentManager, self).__init__('central') def interval_task(self, task): self.keystone = ksclient.Client( diff --git a/ceilometer/compute/discovery.py b/ceilometer/compute/discovery.py new file mode 100644 index 000000000..393868ae0 --- /dev/null +++ b/ceilometer/compute/discovery.py @@ -0,0 +1,35 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2014 Red Hat, Inc +# +# Author: Eoghan Glynn +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.config import cfg + +from ceilometer import nova_client +from ceilometer import plugin + + +class InstanceDiscovery(plugin.DiscoveryBase): + def __init__(self): + super(InstanceDiscovery, self).__init__() + self.nova_cli = nova_client.Client() + + def discover(self, param=None): + """Discover resources to monitor. + """ + instances = self.nova_cli.instance_get_all_by_host(cfg.CONF.host) + return [i for i in instances + if getattr(i, 'OS-EXT-STS:vm_state', None) != 'error'] diff --git a/ceilometer/compute/manager.py b/ceilometer/compute/manager.py index 4cdcbdb29..5d3019386 100644 --- a/ceilometer/compute/manager.py +++ b/ceilometer/compute/manager.py @@ -16,13 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo.config import cfg -from stevedore import extension - from ceilometer import agent from ceilometer.compute.virt import inspector as virt_inspector -from ceilometer import nova_client -from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import service @@ -30,51 +25,11 @@ from ceilometer import service LOG = log.getLogger(__name__) -class PollingTask(agent.PollingTask): - def poll_and_publish_instances(self, instances): - with self.publish_context as publisher: - for instance in instances: - if getattr(instance, 'OS-EXT-STS:vm_state', None) == 'error': - continue - cache = {} - for pollster in self.pollsters: - try: - LOG.info(_("Polling pollster %s"), pollster.name) - samples = list(pollster.obj.get_samples( - self.manager, - cache, - instance, - )) - publisher(samples) - except Exception as err: - LOG.warning(_( - 'Continue after error from %(name)s: %(error)s') - % ({'name': pollster.name, 'error': err})) - LOG.exception(err) - - def poll_and_publish(self): - try: - instances = self.manager.nv.instance_get_all_by_host(cfg.CONF.host) - except Exception as err: - LOG.exception(_('Unable to retrieve instances: %s') % err) - else: - self.poll_and_publish_instances(instances) - - class AgentManager(agent.AgentManager): def __init__(self): - super(AgentManager, self).__init__( - extension.ExtensionManager( - namespace='ceilometer.poll.compute', - invoke_on_load=True, - ), - ) + super(AgentManager, self).__init__('compute', ['local_instances']) self._inspector = virt_inspector.get_hypervisor_inspector() - self.nv = nova_client.Client() - - def create_polling_task(self): - return PollingTask(self) @property def inspector(self): diff --git a/ceilometer/compute/plugin.py b/ceilometer/compute/plugin.py index 048ff2b0a..e1ee81ce0 100644 --- a/ceilometer/compute/plugin.py +++ b/ceilometer/compute/plugin.py @@ -30,10 +30,10 @@ class ComputePollster(plugin.PollsterBase): """ @abc.abstractmethod - def get_samples(self, manager, cache, instance): + def get_samples(self, manager, cache, resources): """Return a sequence of Counter instances from polling the resources. :param manager: The service manager invoking the plugin :param cache: A dictionary for passing data between plugins - :param instance: The instance to examine + :param resources: The resources to examine (expected to be instances) """ diff --git a/ceilometer/compute/pollsters/cpu.py b/ceilometer/compute/pollsters/cpu.py index 25cc4e139..1cb64b43f 100644 --- a/ceilometer/compute/pollsters/cpu.py +++ b/ceilometer/compute/pollsters/cpu.py @@ -30,26 +30,28 @@ LOG = log.getLogger(__name__) class CPUPollster(plugin.ComputePollster): - def get_samples(self, manager, cache, instance): - LOG.info(_('checking instance %s'), instance.id) - instance_name = util.instance_name(instance) - try: - cpu_info = manager.inspector.inspect_cpus(instance_name) - LOG.info(_("CPUTIME USAGE: %(instance)s %(time)d") % ( - {'instance': instance.__dict__, 'time': cpu_info.time})) - cpu_num = {'cpu_number': cpu_info.number} - yield util.make_sample_from_instance( - instance, - name='cpu', - type=sample.TYPE_CUMULATIVE, - unit='ns', - volume=cpu_info.time, - additional_metadata=cpu_num, - ) - except virt_inspector.InstanceNotFoundException as err: - # Instance was deleted while getting samples. Ignore it. - LOG.debug(_('Exception while getting samples %s'), err) - except Exception as err: - LOG.error(_('could not get CPU time for %(id)s: %(e)s') % ( - {'id': instance.id, 'e': err})) - LOG.exception(err) + def get_samples(self, manager, cache, resources): + for instance in resources: + LOG.info(_('checking instance %s'), instance.id) + instance_name = util.instance_name(instance) + try: + cpu_info = manager.inspector.inspect_cpus(instance_name) + LOG.info(_("CPUTIME USAGE: %(instance)s %(time)d") % ( + {'instance': instance.__dict__, + 'time': cpu_info.time})) + cpu_num = {'cpu_number': cpu_info.number} + yield util.make_sample_from_instance( + instance, + name='cpu', + type=sample.TYPE_CUMULATIVE, + unit='ns', + volume=cpu_info.time, + additional_metadata=cpu_num, + ) + except virt_inspector.InstanceNotFoundException as err: + # Instance was deleted while getting samples. Ignore it. + LOG.debug(_('Exception while getting samples %s'), err) + except Exception as err: + LOG.error(_('could not get CPU time for %(id)s: %(e)s') % ( + {'id': instance.id, 'e': err})) + LOG.exception(err) diff --git a/ceilometer/compute/pollsters/disk.py b/ceilometer/compute/pollsters/disk.py index cd1d4cb2a..67e8f3269 100644 --- a/ceilometer/compute/pollsters/disk.py +++ b/ceilometer/compute/pollsters/disk.py @@ -81,23 +81,24 @@ class _Base(plugin.ComputePollster): def _get_sample(instance, c_data): """Return one Sample.""" - def get_samples(self, manager, cache, instance): - instance_name = util.instance_name(instance) - try: - c_data = self._populate_cache( - manager.inspector, - cache, - instance, - instance_name, - ) - yield self._get_sample(instance, c_data) - except virt_inspector.InstanceNotFoundException as err: - # Instance was deleted while getting samples. Ignore it. - LOG.debug(_('Exception while getting samples %s'), err) - except Exception as err: - LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( - {'name': instance_name, 'error': err})) - LOG.exception(err) + def get_samples(self, manager, cache, resources): + for instance in resources: + instance_name = util.instance_name(instance) + try: + c_data = self._populate_cache( + manager.inspector, + cache, + instance, + instance_name, + ) + yield self._get_sample(instance, c_data) + except virt_inspector.InstanceNotFoundException as err: + # Instance was deleted while getting samples. Ignore it. + LOG.debug(_('Exception while getting samples %s'), err) + except Exception as err: + LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( + {'name': instance_name, 'error': err})) + LOG.exception(err) class ReadRequestsPollster(_Base): diff --git a/ceilometer/compute/pollsters/instance.py b/ceilometer/compute/pollsters/instance.py index 2d79b1822..6cd3906ef 100644 --- a/ceilometer/compute/pollsters/instance.py +++ b/ceilometer/compute/pollsters/instance.py @@ -26,26 +26,28 @@ from ceilometer import sample class InstancePollster(plugin.ComputePollster): @staticmethod - def get_samples(manager, cache, instance): - yield util.make_sample_from_instance( - instance, - name='instance', - type=sample.TYPE_GAUGE, - unit='instance', - volume=1, - ) + def get_samples(manager, cache, resources): + for instance in resources: + yield util.make_sample_from_instance( + instance, + name='instance', + type=sample.TYPE_GAUGE, + unit='instance', + volume=1, + ) class InstanceFlavorPollster(plugin.ComputePollster): @staticmethod - def get_samples(manager, cache, instance): - yield util.make_sample_from_instance( - instance, - # Use the "meter name + variable" syntax - name='instance:%s' % - instance.flavor['name'], - type=sample.TYPE_GAUGE, - unit='instance', - volume=1, - ) + def get_samples(manager, cache, resources): + for instance in resources: + yield util.make_sample_from_instance( + instance, + # Use the "meter name + variable" syntax + name='instance:%s' % + instance.flavor['name'], + type=sample.TYPE_GAUGE, + unit='instance', + volume=1, + ) diff --git a/ceilometer/compute/pollsters/net.py b/ceilometer/compute/pollsters/net.py index 052c2b4a1..96cb99b1c 100644 --- a/ceilometer/compute/pollsters/net.py +++ b/ceilometer/compute/pollsters/net.py @@ -72,26 +72,27 @@ class _Base(plugin.ComputePollster): ) return i_cache[instance_name] - def get_samples(self, manager, cache, instance): - instance_name = util.instance_name(instance) - LOG.info(_('checking instance %s'), instance.id) - try: - vnics = self._get_vnics_for_instance( - cache, - manager.inspector, - instance_name, - ) - for vnic, info in vnics: - LOG.info(self.NET_USAGE_MESSAGE, instance_name, - vnic.name, info.rx_bytes, info.tx_bytes) - yield self._get_sample(instance, vnic, info) - except virt_inspector.InstanceNotFoundException as err: - # Instance was deleted while getting samples. Ignore it. - LOG.debug(_('Exception while getting samples %s'), err) - except Exception as err: - LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( - {'name': instance_name, 'error': err})) - LOG.exception(err) + def get_samples(self, manager, cache, resources): + for instance in resources: + instance_name = util.instance_name(instance) + LOG.info(_('checking instance %s'), instance.id) + try: + vnics = self._get_vnics_for_instance( + cache, + manager.inspector, + instance_name, + ) + for vnic, info in vnics: + LOG.info(self.NET_USAGE_MESSAGE, instance_name, + vnic.name, info.rx_bytes, info.tx_bytes) + yield self._get_sample(instance, vnic, info) + except virt_inspector.InstanceNotFoundException as err: + # Instance was deleted while getting samples. Ignore it. + LOG.debug(_('Exception while getting samples %s'), err) + except Exception as err: + LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( + {'name': instance_name, 'error': err})) + LOG.exception(err) class IncomingBytesPollster(_Base): diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py index b6104bc4c..d553fef9a 100644 --- a/ceilometer/plugin.py +++ b/ceilometer/plugin.py @@ -104,3 +104,12 @@ class PollsterBase(PluginBase): how to use it. """ + + +@six.add_metaclass(abc.ABCMeta) +class DiscoveryBase(object): + @abc.abstractmethod + def discover(self, param=None): + """Discover resources to monitor. + :param param: an optional parameter to guide the discovery + """ diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py index 2aa769a47..2b2a21214 100644 --- a/ceilometer/tests/agentbase.py +++ b/ceilometer/tests/agentbase.py @@ -3,9 +3,11 @@ # Copyright © 2012 New Dream Network, LLC (DreamHost) # Copyright © 2013 Intel corp. # Copyright © 2013 eNovance +# Copyright © 2014 Red Hat, Inc # -# Author: Yunhong Jiang -# Julien Danjou +# Authors: Yunhong Jiang +# Julien Danjou +# Eoghan Glynn # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -20,6 +22,7 @@ # under the License. import abc +import copy import datetime import mock @@ -27,14 +30,33 @@ import six from stevedore import extension from ceilometer.openstack.common.fixture import config +from ceilometer.openstack.common.fixture import mockpatch from ceilometer import pipeline from ceilometer import plugin +from ceilometer import publisher +from ceilometer.publisher import test as test_publisher from ceilometer import sample from ceilometer.tests import base from ceilometer import transformer -default_test_data = sample.Sample( +class TestSample(sample.Sample): + def __init__(self, name, type, unit, volume, user_id, project_id, + resource_id, timestamp, resource_metadata, source=None): + super(TestSample, self).__init__(name, type, unit, volume, user_id, + project_id, resource_id, timestamp, + resource_metadata, source) + + def __eq__(self, other): + if isinstance(other, self.__class__): + return self.__dict__ == other.__dict__ + return False + + def __ne__(self, other): + return not self.__eq__(other) + + +default_test_data = TestSample( name='test', type=sample.TYPE_CUMULATIVE, unit='', @@ -50,23 +72,33 @@ default_test_data = sample.Sample( class TestPollster(plugin.PollsterBase): test_data = default_test_data - def get_samples(self, manager, cache, instance=None, resources=[]): - self.samples.append((manager, instance)) + def get_samples(self, manager, cache, resources=[]): + self.samples.append((manager, resources)) self.resources.extend(resources) - return [self.test_data] + c = copy.copy(self.test_data) + c.resource_metadata['resources'] = resources + return [c] class TestPollsterException(TestPollster): - def get_samples(self, manager, cache, instance=None, resources=[]): - # Put an instance parameter here so that it can be used - # by both central manager and compute manager - # In future, we possibly don't need such hack if we - # combine the get_samples() function again - self.samples.append((manager, instance)) + def get_samples(self, manager, cache, resources=[]): + self.samples.append((manager, resources)) self.resources.extend(resources) raise Exception() +class TestDiscovery(plugin.DiscoveryBase): + def discover(self, param=None): + self.params.append(param) + return self.resources + + +class TestDiscoveryException(plugin.DiscoveryBase): + def discover(self, param=None): + self.params.append(param) + raise Exception() + + @six.add_metaclass(abc.ABCMeta) class BaseAgentManagerTestCase(base.BaseTestCase): @@ -78,7 +110,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): class PollsterAnother(TestPollster): samples = [] resources = [] - test_data = sample.Sample( + test_data = TestSample( name='testanother', type=default_test_data.type, unit=default_test_data.unit, @@ -92,7 +124,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): class PollsterException(TestPollsterException): samples = [] resources = [] - test_data = sample.Sample( + test_data = TestSample( name='testexception', type=default_test_data.type, unit=default_test_data.unit, @@ -106,7 +138,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): class PollsterExceptionAnother(TestPollsterException): samples = [] resources = [] - test_data = sample.Sample( + test_data = TestSample( name='testexceptionanother', type=default_test_data.type, unit=default_test_data.unit, @@ -117,6 +149,17 @@ class BaseAgentManagerTestCase(base.BaseTestCase): timestamp=default_test_data.timestamp, resource_metadata=default_test_data.resource_metadata) + class Discovery(TestDiscovery): + params = [] + resources = [] + + class DiscoveryAnother(TestDiscovery): + params = [] + resources = [] + + class DiscoveryException(TestDiscoveryException): + params = [] + def setup_pipeline(self): self.transformer_manager = transformer.TransformerExtensionManager( 'ceilometer.transformer', @@ -125,7 +168,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.pipeline_cfg, self.transformer_manager) - def create_extension_manager(self): + def create_pollster_manager(self): return extension.ExtensionManager.make_test_instance( [ extension.Extension( @@ -151,6 +194,27 @@ class BaseAgentManagerTestCase(base.BaseTestCase): ], ) + def create_discovery_manager(self): + return extension.ExtensionManager.make_test_instance( + [ + extension.Extension( + 'testdiscovery', + None, + None, + self.Discovery(), ), + extension.Extension( + 'testdiscoveryanother', + None, + None, + self.DiscoveryAnother(), ), + extension.Extension( + 'testdiscoveryexception', + None, + None, + self.DiscoveryException(), ), + ], + ) + @abc.abstractmethod def create_manager(self): """Return subclass specific manager.""" @@ -159,12 +223,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def setUp(self): super(BaseAgentManagerTestCase, self).setUp() self.mgr = self.create_manager() - self.mgr.pollster_manager = self.create_extension_manager() + self.mgr.pollster_manager = self.create_pollster_manager() self.pipeline_cfg = [{ 'name': "test_pipeline", 'interval': 60, 'counters': ['test'], - 'resources': ['test://'], + 'resources': ['test://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }, ] @@ -174,6 +238,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'pipeline_cfg_file', self.path_get('etc/ceilometer/pipeline.yaml') ) + self.useFixture(mockpatch.PatchObject( + publisher, 'get_publisher', side_effect=self.get_publisher)) + + def get_publisher(self, url, namespace=''): + fake_drivers = {'test://': test_publisher.TestPublisher, + 'new://': test_publisher.TestPublisher, + 'rpc://': test_publisher.TestPublisher} + return fake_drivers[url](url) def tearDown(self): self.Pollster.samples = [] @@ -184,16 +256,24 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.PollsterAnother.resources = [] self.PollsterException.resources = [] self.PollsterExceptionAnother.resources = [] + self.Discovery.params = [] + self.DiscoveryAnother.params = [] + self.DiscoveryException.params = [] + self.Discovery.resources = [] + self.DiscoveryAnother.resources = [] super(BaseAgentManagerTestCase, self).tearDown() def test_setup_polling_tasks(self): polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(len(polling_tasks), 1) self.assertTrue(60 in polling_tasks.keys()) - self.assertEqual(len(polling_tasks[60].resources), 1) - self.assertEqual(len(polling_tasks[60].resources['test']), 1) + per_task_resources = polling_tasks[60].resources + self.assertEqual(len(per_task_resources), 1) + self.assertEqual(per_task_resources['test'], + set(self.pipeline_cfg[0]['resources'])) self.mgr.interval_task(polling_tasks.values()[0]) pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] + del pub.samples[0].resource_metadata['resources'] self.assertEqual(pub.samples[0], self.Pollster.test_data) def test_setup_polling_tasks_multiple_interval(self): @@ -201,7 +281,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'name': "test_pipeline", 'interval': 10, 'counters': ['test'], - 'resources': ['test://'], + 'resources': ['test://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }) @@ -230,7 +310,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'name': "test_pipeline", 'interval': 60, 'counters': ['testanother'], - 'resources': ['testanother://'], + 'resources': ['testanother://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }) @@ -239,9 +319,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertEqual(len(polling_tasks), 1) pollsters = polling_tasks.get(60).pollsters self.assertEqual(len(pollsters), 2) - self.assertEqual(len(polling_tasks[60].resources), 2) - self.assertEqual(len(polling_tasks[60].resources['test']), 1) - self.assertEqual(len(polling_tasks[60].resources['testanother']), 1) + per_task_resources = polling_tasks[60].resources + self.assertEqual(len(per_task_resources), 2) + self.assertEqual(per_task_resources['test'], + set(self.pipeline_cfg[0]['resources'])) + self.assertEqual(per_task_resources['testanother'], + set(self.pipeline_cfg[1]['resources'])) def test_interval_exception_isolation(self): self.pipeline_cfg = [ @@ -249,7 +332,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'name': "test_pipeline_1", 'interval': 10, 'counters': ['testexceptionanother'], - 'resources': ['test://'], + 'resources': ['test://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }, @@ -257,7 +340,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'name': "test_pipeline_2", 'interval': 10, 'counters': ['testexception'], - 'resources': ['test://'], + 'resources': ['test://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }, @@ -290,3 +373,71 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'publishers': ["test"], }) self.setup_pipeline() + + def _verify_discovery_params(self, expected): + self.assertEqual(self.Discovery.params, expected) + self.assertEqual(self.DiscoveryAnother.params, expected) + self.assertEqual(self.DiscoveryException.params, expected) + + def _do_test_per_agent_discovery(self, + discovered_resources, + static_resources): + self.mgr.discovery_manager = self.create_discovery_manager() + if discovered_resources: + self.mgr.default_discovery = [d.name + for d in self.mgr.discovery_manager] + self.Discovery.resources = discovered_resources + self.DiscoveryAnother.resources = [d[::-1] + for d in discovered_resources] + self.pipeline_cfg[0]['resources'] = static_resources + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.mgr.interval_task(polling_tasks.get(60)) + self._verify_discovery_params([None] if discovered_resources else []) + discovery = self.Discovery.resources + self.DiscoveryAnother.resources + # compare resource lists modulo ordering + self.assertEqual(set(self.Pollster.resources), + set(static_resources or discovery)) + + def test_per_agent_discovery_discovered_only(self): + self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'], + []) + + def test_per_agent_discovery_static_only(self): + self._do_test_per_agent_discovery([], + ['static_1', 'static_2']) + + def test_per_agent_discovery_discovered_overridden_by_static(self): + self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'], + ['static_1', 'static_2']) + + def test_multiple_pipelines_different_static_resources(self): + # assert that the amalgation of all static resources for a set + # of pipelines with a common interval is passed to individual + # pollsters matching those pipelines + self.pipeline_cfg[0]['resources'] = ['test://'] + self.pipeline_cfg.append({ + 'name': "another_pipeline", + 'interval': 60, + 'counters': ['test'], + 'resources': ['another://'], + 'transformers': [], + 'publishers': ["new"], + }) + self.mgr.discovery_manager = self.create_discovery_manager() + self.Discovery.resources = [] + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.assertEqual(len(polling_tasks), 1) + self.assertTrue(60 in polling_tasks.keys()) + self.mgr.interval_task(polling_tasks.get(60)) + self._verify_discovery_params([]) + self.assertEqual(len(self.Pollster.samples), 1) + amalgamated_resources = set(['test://', 'another://']) + self.assertEqual(set(self.Pollster.samples[0][1]), + amalgamated_resources) + for pipeline in self.mgr.pipeline_manager.pipelines: + self.assertEqual(len(pipeline.publishers[0].samples), 1) + published = pipeline.publishers[0].samples[0] + self.assertEqual(set(published.resource_metadata['resources']), + amalgamated_resources) diff --git a/ceilometer/tests/central/test_manager.py b/ceilometer/tests/central/test_manager.py index 6db3606bd..0a85dc9e8 100644 --- a/ceilometer/tests/central/test_manager.py +++ b/ceilometer/tests/central/test_manager.py @@ -40,6 +40,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): return manager.AgentManager() def setUp(self): + self.source_resources = True super(TestRunTasks, self).setUp() self.useFixture(mockpatch.Patch( 'keystoneclient.v2_0.client.Client', diff --git a/ceilometer/tests/compute/pollsters/test_cpu.py b/ceilometer/tests/compute/pollsters/test_cpu.py index 31d976450..15aa65139 100644 --- a/ceilometer/tests/compute/pollsters/test_cpu.py +++ b/ceilometer/tests/compute/pollsters/test_cpu.py @@ -53,7 +53,7 @@ class TestCPUPollster(base.TestPollsterBase): def _verify_cpu_metering(expected_time): cache = {} - samples = list(pollster.get_samples(mgr, cache, self.instance)) + samples = list(pollster.get_samples(mgr, cache, [self.instance])) self.assertEqual(len(samples), 1) self.assertEqual(set([s.name for s in samples]), set(['cpu'])) @@ -75,7 +75,7 @@ class TestCPUPollster(base.TestPollsterBase): pollster = cpu.CPUPollster() cache = {} - samples = list(pollster.get_samples(mgr, cache, self.instance)) + samples = list(pollster.get_samples(mgr, cache, [self.instance])) self.assertEqual(len(samples), 1) self.assertEqual(samples[0].volume, 10 ** 6) self.assertEqual(len(cache), 0) diff --git a/ceilometer/tests/compute/pollsters/test_diskio.py b/ceilometer/tests/compute/pollsters/test_diskio.py index 7c31ebe40..3dbb60376 100644 --- a/ceilometer/tests/compute/pollsters/test_diskio.py +++ b/ceilometer/tests/compute/pollsters/test_diskio.py @@ -45,7 +45,7 @@ class TestDiskPollsters(base.TestPollsterBase): mgr = manager.AgentManager() cache = {} - samples = list(pollster.get_samples(mgr, cache, self.instance)) + samples = list(pollster.get_samples(mgr, cache, [self.instance])) assert samples self.assertIn(pollster.CACHE_KEY_DISK, cache) self.assertIn(self.instance.name, cache[pollster.CACHE_KEY_DISK]) diff --git a/ceilometer/tests/compute/pollsters/test_instance.py b/ceilometer/tests/compute/pollsters/test_instance.py index 9d56c6df2..5d251a99c 100644 --- a/ceilometer/tests/compute/pollsters/test_instance.py +++ b/ceilometer/tests/compute/pollsters/test_instance.py @@ -34,7 +34,7 @@ class TestInstancePollster(base.TestPollsterBase): def test_get_samples_instance(self): mgr = manager.AgentManager() pollster = pollsters_instance.InstancePollster() - samples = list(pollster.get_samples(mgr, {}, self.instance)) + samples = list(pollster.get_samples(mgr, {}, [self.instance])) self.assertEqual(len(samples), 1) self.assertEqual(samples[0].name, 'instance') self.assertEqual(samples[0].resource_metadata['vcpus'], 1) @@ -47,6 +47,6 @@ class TestInstancePollster(base.TestPollsterBase): def test_get_samples_instance_flavor(self): mgr = manager.AgentManager() pollster = pollsters_instance.InstanceFlavorPollster() - samples = list(pollster.get_samples(mgr, {}, self.instance)) + samples = list(pollster.get_samples(mgr, {}, [self.instance])) self.assertEqual(len(samples), 1) self.assertEqual(samples[0].name, 'instance:m1.small') diff --git a/ceilometer/tests/compute/pollsters/test_net.py b/ceilometer/tests/compute/pollsters/test_net.py index eb817220f..cd9422b7a 100644 --- a/ceilometer/tests/compute/pollsters/test_net.py +++ b/ceilometer/tests/compute/pollsters/test_net.py @@ -72,7 +72,7 @@ class TestNetPollster(base.TestPollsterBase): def _check_get_samples(self, factory, expected): mgr = manager.AgentManager() pollster = factory() - samples = list(pollster.get_samples(mgr, {}, self.instance)) + samples = list(pollster.get_samples(mgr, {}, [self.instance])) self.assertEqual(len(samples), 3) # one for each nic self.assertEqual(set([s.name for s in samples]), set([samples[0].name])) @@ -157,7 +157,7 @@ class TestNetPollsterCache(base.TestPollsterBase): self.instance.name: vnics, }, } - samples = list(pollster.get_samples(mgr, cache, self.instance)) + samples = list(pollster.get_samples(mgr, cache, [self.instance])) self.assertEqual(len(samples), 1) def test_incoming_bytes(self): diff --git a/ceilometer/tests/compute/test_manager.py b/ceilometer/tests/compute/test_manager.py index de3466ac3..575be3d16 100644 --- a/ceilometer/tests/compute/test_manager.py +++ b/ceilometer/tests/compute/test_manager.py @@ -19,6 +19,7 @@ """ import mock +from ceilometer import agent from ceilometer.compute import manager from ceilometer import nova_client from ceilometer.openstack.common.fixture import mockpatch @@ -51,25 +52,24 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): + self.source_resources = False super(TestRunTasks, self).setUp() # Set up a fake instance value to be returned by # instance_get_all_by_host() so when the manager gets the list # of instances to poll we can control the results. - self.instance = self._fake_instance('faux', 'active') + self.instances = [self._fake_instance('doing', 'active'), + self._fake_instance('resting', 'paused')] stillborn_instance = self._fake_instance('stillborn', 'error') - def instance_get_all_by_host(*args): - return [self.instance, stillborn_instance] - self.useFixture(mockpatch.PatchObject( nova_client.Client, 'instance_get_all_by_host', - side_effect=lambda *x: [self.instance, stillborn_instance])) + side_effect=lambda *x: self.instances + [stillborn_instance])) def test_setup_polling_tasks(self): super(TestRunTasks, self).test_setup_polling_tasks() - self.assertTrue(self.Pollster.samples[0][1] is self.instance) + self.assertEqual(self.Pollster.samples[0][1], self.instances) def test_interval_exception_isolation(self): super(TestRunTasks, self).test_interval_exception_isolation() @@ -81,5 +81,14 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): with mock.patch.object(nova_client.Client, 'instance_get_all_by_host', side_effect=lambda *x: self._raise_exception()): mgr = manager.AgentManager() - polling_task = manager.PollingTask(mgr) + polling_task = agent.PollingTask(mgr) polling_task.poll_and_publish() + + def self_local_instances_default_agent_discovery(self): + self.setup_pipeline() + self.assertEqual(self.mgr.default_discovery, ['local_instances']) + polling_tasks = self.mgr.setup_polling_tasks() + self.mgr.interval_task(polling_tasks.get(60)) + self._verify_discovery_params([None]) + self.assertEqual(set(self.Pollster.resources), + set(self.instances)) diff --git a/setup.cfg b/setup.cfg index 8e0cacfe0..aa71b6e36 100644 --- a/setup.cfg +++ b/setup.cfg @@ -64,6 +64,9 @@ ceilometer.notification = http.response = ceilometer.middleware:HTTPResponse stack_crud = ceilometer.orchestration.notifications:StackCRUD +ceilometer.discover = + local_instances = ceilometer.compute.discovery:InstanceDiscovery + ceilometer.poll.compute = disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster disk.write.requests = ceilometer.compute.pollsters.disk:WriteRequestsPollster