diff --git a/ceilometer/agent.py b/ceilometer/agent.py index 308d178fd..738b951c4 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -1,8 +1,10 @@ # -*- encoding: utf-8 -*- # # Copyright © 2013 Julien Danjou +# Copyright © 2014 Red Hat, Inc # -# Author: Julien Danjou +# Authors: Julien Danjou +# Eoghan Glynn # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -16,13 +18,14 @@ # License for the specific language governing permissions and limitations # under the License. -import abc import collections import itertools +import urlparse -import six +from stevedore import extension from ceilometer.openstack.common import context +from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import pipeline @@ -31,7 +34,6 @@ from ceilometer import transformer LOG = log.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) class PollingTask(object): """Polling task for polling samples and inject into pipeline. A polling task can be invoked periodically or only once. @@ -53,24 +55,49 @@ class PollingTask(object): self.resources[pollster.name].update(pipeline.resources) self.pollsters.update([pollster]) - @abc.abstractmethod def poll_and_publish(self): """Polling sample and publish into pipeline.""" + agent_resources = self.manager.discover() + with self.publish_context as publisher: + cache = {} + for pollster in self.pollsters: + LOG.info(_("Polling pollster %s"), pollster.name) + source_resources = list(self.resources[pollster.name]) + try: + samples = list(pollster.obj.get_samples( + self.manager, + cache, + resources=source_resources or agent_resources, + )) + publisher(samples) + except Exception as err: + LOG.warning(_( + 'Continue after error from %(name)s: %(error)s') + % ({'name': pollster.name, 'error': err}), + exc_info=True) -@six.add_metaclass(abc.ABCMeta) class AgentManager(os_service.Service): - def __init__(self, extension_manager): + def __init__(self, namespace, default_discovery=[]): super(AgentManager, self).__init__() - - self.pollster_manager = extension_manager - + self.default_discovery = default_discovery + self.pollster_manager = self._extensions('poll', namespace) + self.discovery_manager = self._extensions('discover') self.context = context.RequestContext('admin', 'admin', is_admin=True) - @abc.abstractmethod + @staticmethod + def _extensions(category, agent_ns=None): + namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns + else 'ceilometer.%s' % category) + return extension.ExtensionManager( + namespace=namespace, + invoke_on_load=True, + ) + def create_polling_task(self): - """Create an empty polling task.""" + """Create an initially empty polling task.""" + return PollingTask(self) def setup_polling_tasks(self): polling_tasks = {} @@ -101,3 +128,29 @@ class AgentManager(os_service.Service): @staticmethod def interval_task(task): task.poll_and_publish() + + @staticmethod + def _parse_discoverer(url): + s = urlparse.urlparse(url) + return (s.scheme or s.path), (s.netloc + s.path if s.scheme else None) + + def _discoverer(self, name): + for d in self.discovery_manager: + if d.name == name: + return d.obj + return None + + def discover(self, discovery=[]): + resources = [] + for url in (discovery or self.default_discovery): + name, param = self._parse_discoverer(url) + discoverer = self._discoverer(name) + if discoverer: + try: + discovered = discoverer.discover(param) + resources.extend(discovered) + except Exception as err: + LOG.exception(_('Unable to discover resources: %s') % err) + else: + LOG.warning(_('Unknown discovery extension: %s') % name) + return resources diff --git a/ceilometer/central/manager.py b/ceilometer/central/manager.py index 842851769..e2b1542ca 100644 --- a/ceilometer/central/manager.py +++ b/ceilometer/central/manager.py @@ -18,10 +18,8 @@ from keystoneclient.v2_0 import client as ksclient from oslo.config import cfg -from stevedore import extension from ceilometer import agent -from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import service @@ -31,42 +29,10 @@ cfg.CONF.import_group('service_credentials', 'ceilometer.service') LOG = log.getLogger(__name__) -class PollingTask(agent.PollingTask): - def poll_and_publish(self): - """Tasks to be run at a periodic interval.""" - with self.publish_context as publisher: - # TODO(yjiang5) passing samples into get_samples to avoid - # polling all counters one by one - cache = {} - for pollster in self.pollsters: - try: - LOG.info(_("Polling pollster %s"), pollster.name) - resources = list(self.resources[pollster.name]) - samples = list(pollster.obj.get_samples( - self.manager, - cache, - resources=resources, - )) - publisher(samples) - except Exception as err: - LOG.warning(_( - 'Continue after error from %(name)s: %(error)s') - % ({'name': pollster.name, 'error': err})) - LOG.exception(err) - - class AgentManager(agent.AgentManager): def __init__(self): - super(AgentManager, self).__init__( - extension.ExtensionManager( - namespace='ceilometer.poll.central', - invoke_on_load=True, - ) - ) - - def create_polling_task(self): - return PollingTask(self) + super(AgentManager, self).__init__('central') def interval_task(self, task): self.keystone = ksclient.Client( diff --git a/ceilometer/compute/discovery.py b/ceilometer/compute/discovery.py new file mode 100644 index 000000000..393868ae0 --- /dev/null +++ b/ceilometer/compute/discovery.py @@ -0,0 +1,35 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2014 Red Hat, Inc +# +# Author: Eoghan Glynn +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.config import cfg + +from ceilometer import nova_client +from ceilometer import plugin + + +class InstanceDiscovery(plugin.DiscoveryBase): + def __init__(self): + super(InstanceDiscovery, self).__init__() + self.nova_cli = nova_client.Client() + + def discover(self, param=None): + """Discover resources to monitor. + """ + instances = self.nova_cli.instance_get_all_by_host(cfg.CONF.host) + return [i for i in instances + if getattr(i, 'OS-EXT-STS:vm_state', None) != 'error'] diff --git a/ceilometer/compute/manager.py b/ceilometer/compute/manager.py index 4cdcbdb29..5d3019386 100644 --- a/ceilometer/compute/manager.py +++ b/ceilometer/compute/manager.py @@ -16,13 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo.config import cfg -from stevedore import extension - from ceilometer import agent from ceilometer.compute.virt import inspector as virt_inspector -from ceilometer import nova_client -from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import service @@ -30,51 +25,11 @@ from ceilometer import service LOG = log.getLogger(__name__) -class PollingTask(agent.PollingTask): - def poll_and_publish_instances(self, instances): - with self.publish_context as publisher: - for instance in instances: - if getattr(instance, 'OS-EXT-STS:vm_state', None) == 'error': - continue - cache = {} - for pollster in self.pollsters: - try: - LOG.info(_("Polling pollster %s"), pollster.name) - samples = list(pollster.obj.get_samples( - self.manager, - cache, - instance, - )) - publisher(samples) - except Exception as err: - LOG.warning(_( - 'Continue after error from %(name)s: %(error)s') - % ({'name': pollster.name, 'error': err})) - LOG.exception(err) - - def poll_and_publish(self): - try: - instances = self.manager.nv.instance_get_all_by_host(cfg.CONF.host) - except Exception as err: - LOG.exception(_('Unable to retrieve instances: %s') % err) - else: - self.poll_and_publish_instances(instances) - - class AgentManager(agent.AgentManager): def __init__(self): - super(AgentManager, self).__init__( - extension.ExtensionManager( - namespace='ceilometer.poll.compute', - invoke_on_load=True, - ), - ) + super(AgentManager, self).__init__('compute', ['local_instances']) self._inspector = virt_inspector.get_hypervisor_inspector() - self.nv = nova_client.Client() - - def create_polling_task(self): - return PollingTask(self) @property def inspector(self): diff --git a/ceilometer/compute/plugin.py b/ceilometer/compute/plugin.py index 048ff2b0a..e1ee81ce0 100644 --- a/ceilometer/compute/plugin.py +++ b/ceilometer/compute/plugin.py @@ -30,10 +30,10 @@ class ComputePollster(plugin.PollsterBase): """ @abc.abstractmethod - def get_samples(self, manager, cache, instance): + def get_samples(self, manager, cache, resources): """Return a sequence of Counter instances from polling the resources. :param manager: The service manager invoking the plugin :param cache: A dictionary for passing data between plugins - :param instance: The instance to examine + :param resources: The resources to examine (expected to be instances) """ diff --git a/ceilometer/compute/pollsters/cpu.py b/ceilometer/compute/pollsters/cpu.py index 25cc4e139..1cb64b43f 100644 --- a/ceilometer/compute/pollsters/cpu.py +++ b/ceilometer/compute/pollsters/cpu.py @@ -30,26 +30,28 @@ LOG = log.getLogger(__name__) class CPUPollster(plugin.ComputePollster): - def get_samples(self, manager, cache, instance): - LOG.info(_('checking instance %s'), instance.id) - instance_name = util.instance_name(instance) - try: - cpu_info = manager.inspector.inspect_cpus(instance_name) - LOG.info(_("CPUTIME USAGE: %(instance)s %(time)d") % ( - {'instance': instance.__dict__, 'time': cpu_info.time})) - cpu_num = {'cpu_number': cpu_info.number} - yield util.make_sample_from_instance( - instance, - name='cpu', - type=sample.TYPE_CUMULATIVE, - unit='ns', - volume=cpu_info.time, - additional_metadata=cpu_num, - ) - except virt_inspector.InstanceNotFoundException as err: - # Instance was deleted while getting samples. Ignore it. - LOG.debug(_('Exception while getting samples %s'), err) - except Exception as err: - LOG.error(_('could not get CPU time for %(id)s: %(e)s') % ( - {'id': instance.id, 'e': err})) - LOG.exception(err) + def get_samples(self, manager, cache, resources): + for instance in resources: + LOG.info(_('checking instance %s'), instance.id) + instance_name = util.instance_name(instance) + try: + cpu_info = manager.inspector.inspect_cpus(instance_name) + LOG.info(_("CPUTIME USAGE: %(instance)s %(time)d") % ( + {'instance': instance.__dict__, + 'time': cpu_info.time})) + cpu_num = {'cpu_number': cpu_info.number} + yield util.make_sample_from_instance( + instance, + name='cpu', + type=sample.TYPE_CUMULATIVE, + unit='ns', + volume=cpu_info.time, + additional_metadata=cpu_num, + ) + except virt_inspector.InstanceNotFoundException as err: + # Instance was deleted while getting samples. Ignore it. + LOG.debug(_('Exception while getting samples %s'), err) + except Exception as err: + LOG.error(_('could not get CPU time for %(id)s: %(e)s') % ( + {'id': instance.id, 'e': err})) + LOG.exception(err) diff --git a/ceilometer/compute/pollsters/disk.py b/ceilometer/compute/pollsters/disk.py index cd1d4cb2a..67e8f3269 100644 --- a/ceilometer/compute/pollsters/disk.py +++ b/ceilometer/compute/pollsters/disk.py @@ -81,23 +81,24 @@ class _Base(plugin.ComputePollster): def _get_sample(instance, c_data): """Return one Sample.""" - def get_samples(self, manager, cache, instance): - instance_name = util.instance_name(instance) - try: - c_data = self._populate_cache( - manager.inspector, - cache, - instance, - instance_name, - ) - yield self._get_sample(instance, c_data) - except virt_inspector.InstanceNotFoundException as err: - # Instance was deleted while getting samples. Ignore it. - LOG.debug(_('Exception while getting samples %s'), err) - except Exception as err: - LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( - {'name': instance_name, 'error': err})) - LOG.exception(err) + def get_samples(self, manager, cache, resources): + for instance in resources: + instance_name = util.instance_name(instance) + try: + c_data = self._populate_cache( + manager.inspector, + cache, + instance, + instance_name, + ) + yield self._get_sample(instance, c_data) + except virt_inspector.InstanceNotFoundException as err: + # Instance was deleted while getting samples. Ignore it. + LOG.debug(_('Exception while getting samples %s'), err) + except Exception as err: + LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( + {'name': instance_name, 'error': err})) + LOG.exception(err) class ReadRequestsPollster(_Base): diff --git a/ceilometer/compute/pollsters/instance.py b/ceilometer/compute/pollsters/instance.py index 2d79b1822..6cd3906ef 100644 --- a/ceilometer/compute/pollsters/instance.py +++ b/ceilometer/compute/pollsters/instance.py @@ -26,26 +26,28 @@ from ceilometer import sample class InstancePollster(plugin.ComputePollster): @staticmethod - def get_samples(manager, cache, instance): - yield util.make_sample_from_instance( - instance, - name='instance', - type=sample.TYPE_GAUGE, - unit='instance', - volume=1, - ) + def get_samples(manager, cache, resources): + for instance in resources: + yield util.make_sample_from_instance( + instance, + name='instance', + type=sample.TYPE_GAUGE, + unit='instance', + volume=1, + ) class InstanceFlavorPollster(plugin.ComputePollster): @staticmethod - def get_samples(manager, cache, instance): - yield util.make_sample_from_instance( - instance, - # Use the "meter name + variable" syntax - name='instance:%s' % - instance.flavor['name'], - type=sample.TYPE_GAUGE, - unit='instance', - volume=1, - ) + def get_samples(manager, cache, resources): + for instance in resources: + yield util.make_sample_from_instance( + instance, + # Use the "meter name + variable" syntax + name='instance:%s' % + instance.flavor['name'], + type=sample.TYPE_GAUGE, + unit='instance', + volume=1, + ) diff --git a/ceilometer/compute/pollsters/net.py b/ceilometer/compute/pollsters/net.py index 052c2b4a1..96cb99b1c 100644 --- a/ceilometer/compute/pollsters/net.py +++ b/ceilometer/compute/pollsters/net.py @@ -72,26 +72,27 @@ class _Base(plugin.ComputePollster): ) return i_cache[instance_name] - def get_samples(self, manager, cache, instance): - instance_name = util.instance_name(instance) - LOG.info(_('checking instance %s'), instance.id) - try: - vnics = self._get_vnics_for_instance( - cache, - manager.inspector, - instance_name, - ) - for vnic, info in vnics: - LOG.info(self.NET_USAGE_MESSAGE, instance_name, - vnic.name, info.rx_bytes, info.tx_bytes) - yield self._get_sample(instance, vnic, info) - except virt_inspector.InstanceNotFoundException as err: - # Instance was deleted while getting samples. Ignore it. - LOG.debug(_('Exception while getting samples %s'), err) - except Exception as err: - LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( - {'name': instance_name, 'error': err})) - LOG.exception(err) + def get_samples(self, manager, cache, resources): + for instance in resources: + instance_name = util.instance_name(instance) + LOG.info(_('checking instance %s'), instance.id) + try: + vnics = self._get_vnics_for_instance( + cache, + manager.inspector, + instance_name, + ) + for vnic, info in vnics: + LOG.info(self.NET_USAGE_MESSAGE, instance_name, + vnic.name, info.rx_bytes, info.tx_bytes) + yield self._get_sample(instance, vnic, info) + except virt_inspector.InstanceNotFoundException as err: + # Instance was deleted while getting samples. Ignore it. + LOG.debug(_('Exception while getting samples %s'), err) + except Exception as err: + LOG.warning(_('Ignoring instance %(name)s: %(error)s') % ( + {'name': instance_name, 'error': err})) + LOG.exception(err) class IncomingBytesPollster(_Base): diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py index b6104bc4c..d553fef9a 100644 --- a/ceilometer/plugin.py +++ b/ceilometer/plugin.py @@ -104,3 +104,12 @@ class PollsterBase(PluginBase): how to use it. """ + + +@six.add_metaclass(abc.ABCMeta) +class DiscoveryBase(object): + @abc.abstractmethod + def discover(self, param=None): + """Discover resources to monitor. + :param param: an optional parameter to guide the discovery + """ diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py index 2aa769a47..2b2a21214 100644 --- a/ceilometer/tests/agentbase.py +++ b/ceilometer/tests/agentbase.py @@ -3,9 +3,11 @@ # Copyright © 2012 New Dream Network, LLC (DreamHost) # Copyright © 2013 Intel corp. # Copyright © 2013 eNovance +# Copyright © 2014 Red Hat, Inc # -# Author: Yunhong Jiang -# Julien Danjou +# Authors: Yunhong Jiang +# Julien Danjou +# Eoghan Glynn # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -20,6 +22,7 @@ # under the License. import abc +import copy import datetime import mock @@ -27,14 +30,33 @@ import six from stevedore import extension from ceilometer.openstack.common.fixture import config +from ceilometer.openstack.common.fixture import mockpatch from ceilometer import pipeline from ceilometer import plugin +from ceilometer import publisher +from ceilometer.publisher import test as test_publisher from ceilometer import sample from ceilometer.tests import base from ceilometer import transformer -default_test_data = sample.Sample( +class TestSample(sample.Sample): + def __init__(self, name, type, unit, volume, user_id, project_id, + resource_id, timestamp, resource_metadata, source=None): + super(TestSample, self).__init__(name, type, unit, volume, user_id, + project_id, resource_id, timestamp, + resource_metadata, source) + + def __eq__(self, other): + if isinstance(other, self.__class__): + return self.__dict__ == other.__dict__ + return False + + def __ne__(self, other): + return not self.__eq__(other) + + +default_test_data = TestSample( name='test', type=sample.TYPE_CUMULATIVE, unit='', @@ -50,23 +72,33 @@ default_test_data = sample.Sample( class TestPollster(plugin.PollsterBase): test_data = default_test_data - def get_samples(self, manager, cache, instance=None, resources=[]): - self.samples.append((manager, instance)) + def get_samples(self, manager, cache, resources=[]): + self.samples.append((manager, resources)) self.resources.extend(resources) - return [self.test_data] + c = copy.copy(self.test_data) + c.resource_metadata['resources'] = resources + return [c] class TestPollsterException(TestPollster): - def get_samples(self, manager, cache, instance=None, resources=[]): - # Put an instance parameter here so that it can be used - # by both central manager and compute manager - # In future, we possibly don't need such hack if we - # combine the get_samples() function again - self.samples.append((manager, instance)) + def get_samples(self, manager, cache, resources=[]): + self.samples.append((manager, resources)) self.resources.extend(resources) raise Exception() +class TestDiscovery(plugin.DiscoveryBase): + def discover(self, param=None): + self.params.append(param) + return self.resources + + +class TestDiscoveryException(plugin.DiscoveryBase): + def discover(self, param=None): + self.params.append(param) + raise Exception() + + @six.add_metaclass(abc.ABCMeta) class BaseAgentManagerTestCase(base.BaseTestCase): @@ -78,7 +110,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): class PollsterAnother(TestPollster): samples = [] resources = [] - test_data = sample.Sample( + test_data = TestSample( name='testanother', type=default_test_data.type, unit=default_test_data.unit, @@ -92,7 +124,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): class PollsterException(TestPollsterException): samples = [] resources = [] - test_data = sample.Sample( + test_data = TestSample( name='testexception', type=default_test_data.type, unit=default_test_data.unit, @@ -106,7 +138,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): class PollsterExceptionAnother(TestPollsterException): samples = [] resources = [] - test_data = sample.Sample( + test_data = TestSample( name='testexceptionanother', type=default_test_data.type, unit=default_test_data.unit, @@ -117,6 +149,17 @@ class BaseAgentManagerTestCase(base.BaseTestCase): timestamp=default_test_data.timestamp, resource_metadata=default_test_data.resource_metadata) + class Discovery(TestDiscovery): + params = [] + resources = [] + + class DiscoveryAnother(TestDiscovery): + params = [] + resources = [] + + class DiscoveryException(TestDiscoveryException): + params = [] + def setup_pipeline(self): self.transformer_manager = transformer.TransformerExtensionManager( 'ceilometer.transformer', @@ -125,7 +168,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.pipeline_cfg, self.transformer_manager) - def create_extension_manager(self): + def create_pollster_manager(self): return extension.ExtensionManager.make_test_instance( [ extension.Extension( @@ -151,6 +194,27 @@ class BaseAgentManagerTestCase(base.BaseTestCase): ], ) + def create_discovery_manager(self): + return extension.ExtensionManager.make_test_instance( + [ + extension.Extension( + 'testdiscovery', + None, + None, + self.Discovery(), ), + extension.Extension( + 'testdiscoveryanother', + None, + None, + self.DiscoveryAnother(), ), + extension.Extension( + 'testdiscoveryexception', + None, + None, + self.DiscoveryException(), ), + ], + ) + @abc.abstractmethod def create_manager(self): """Return subclass specific manager.""" @@ -159,12 +223,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def setUp(self): super(BaseAgentManagerTestCase, self).setUp() self.mgr = self.create_manager() - self.mgr.pollster_manager = self.create_extension_manager() + self.mgr.pollster_manager = self.create_pollster_manager() self.pipeline_cfg = [{ 'name': "test_pipeline", 'interval': 60, 'counters': ['test'], - 'resources': ['test://'], + 'resources': ['test://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }, ] @@ -174,6 +238,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'pipeline_cfg_file', self.path_get('etc/ceilometer/pipeline.yaml') ) + self.useFixture(mockpatch.PatchObject( + publisher, 'get_publisher', side_effect=self.get_publisher)) + + def get_publisher(self, url, namespace=''): + fake_drivers = {'test://': test_publisher.TestPublisher, + 'new://': test_publisher.TestPublisher, + 'rpc://': test_publisher.TestPublisher} + return fake_drivers[url](url) def tearDown(self): self.Pollster.samples = [] @@ -184,16 +256,24 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.PollsterAnother.resources = [] self.PollsterException.resources = [] self.PollsterExceptionAnother.resources = [] + self.Discovery.params = [] + self.DiscoveryAnother.params = [] + self.DiscoveryException.params = [] + self.Discovery.resources = [] + self.DiscoveryAnother.resources = [] super(BaseAgentManagerTestCase, self).tearDown() def test_setup_polling_tasks(self): polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(len(polling_tasks), 1) self.assertTrue(60 in polling_tasks.keys()) - self.assertEqual(len(polling_tasks[60].resources), 1) - self.assertEqual(len(polling_tasks[60].resources['test']), 1) + per_task_resources = polling_tasks[60].resources + self.assertEqual(len(per_task_resources), 1) + self.assertEqual(per_task_resources['test'], + set(self.pipeline_cfg[0]['resources'])) self.mgr.interval_task(polling_tasks.values()[0]) pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] + del pub.samples[0].resource_metadata['resources'] self.assertEqual(pub.samples[0], self.Pollster.test_data) def test_setup_polling_tasks_multiple_interval(self): @@ -201,7 +281,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'name': "test_pipeline", 'interval': 10, 'counters': ['test'], - 'resources': ['test://'], + 'resources': ['test://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }) @@ -230,7 +310,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'name': "test_pipeline", 'interval': 60, 'counters': ['testanother'], - 'resources': ['testanother://'], + 'resources': ['testanother://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }) @@ -239,9 +319,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertEqual(len(polling_tasks), 1) pollsters = polling_tasks.get(60).pollsters self.assertEqual(len(pollsters), 2) - self.assertEqual(len(polling_tasks[60].resources), 2) - self.assertEqual(len(polling_tasks[60].resources['test']), 1) - self.assertEqual(len(polling_tasks[60].resources['testanother']), 1) + per_task_resources = polling_tasks[60].resources + self.assertEqual(len(per_task_resources), 2) + self.assertEqual(per_task_resources['test'], + set(self.pipeline_cfg[0]['resources'])) + self.assertEqual(per_task_resources['testanother'], + set(self.pipeline_cfg[1]['resources'])) def test_interval_exception_isolation(self): self.pipeline_cfg = [ @@ -249,7 +332,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'name': "test_pipeline_1", 'interval': 10, 'counters': ['testexceptionanother'], - 'resources': ['test://'], + 'resources': ['test://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }, @@ -257,7 +340,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'name': "test_pipeline_2", 'interval': 10, 'counters': ['testexception'], - 'resources': ['test://'], + 'resources': ['test://'] if self.source_resources else [], 'transformers': [], 'publishers': ["test"], }, @@ -290,3 +373,71 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'publishers': ["test"], }) self.setup_pipeline() + + def _verify_discovery_params(self, expected): + self.assertEqual(self.Discovery.params, expected) + self.assertEqual(self.DiscoveryAnother.params, expected) + self.assertEqual(self.DiscoveryException.params, expected) + + def _do_test_per_agent_discovery(self, + discovered_resources, + static_resources): + self.mgr.discovery_manager = self.create_discovery_manager() + if discovered_resources: + self.mgr.default_discovery = [d.name + for d in self.mgr.discovery_manager] + self.Discovery.resources = discovered_resources + self.DiscoveryAnother.resources = [d[::-1] + for d in discovered_resources] + self.pipeline_cfg[0]['resources'] = static_resources + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.mgr.interval_task(polling_tasks.get(60)) + self._verify_discovery_params([None] if discovered_resources else []) + discovery = self.Discovery.resources + self.DiscoveryAnother.resources + # compare resource lists modulo ordering + self.assertEqual(set(self.Pollster.resources), + set(static_resources or discovery)) + + def test_per_agent_discovery_discovered_only(self): + self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'], + []) + + def test_per_agent_discovery_static_only(self): + self._do_test_per_agent_discovery([], + ['static_1', 'static_2']) + + def test_per_agent_discovery_discovered_overridden_by_static(self): + self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'], + ['static_1', 'static_2']) + + def test_multiple_pipelines_different_static_resources(self): + # assert that the amalgation of all static resources for a set + # of pipelines with a common interval is passed to individual + # pollsters matching those pipelines + self.pipeline_cfg[0]['resources'] = ['test://'] + self.pipeline_cfg.append({ + 'name': "another_pipeline", + 'interval': 60, + 'counters': ['test'], + 'resources': ['another://'], + 'transformers': [], + 'publishers': ["new"], + }) + self.mgr.discovery_manager = self.create_discovery_manager() + self.Discovery.resources = [] + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.assertEqual(len(polling_tasks), 1) + self.assertTrue(60 in polling_tasks.keys()) + self.mgr.interval_task(polling_tasks.get(60)) + self._verify_discovery_params([]) + self.assertEqual(len(self.Pollster.samples), 1) + amalgamated_resources = set(['test://', 'another://']) + self.assertEqual(set(self.Pollster.samples[0][1]), + amalgamated_resources) + for pipeline in self.mgr.pipeline_manager.pipelines: + self.assertEqual(len(pipeline.publishers[0].samples), 1) + published = pipeline.publishers[0].samples[0] + self.assertEqual(set(published.resource_metadata['resources']), + amalgamated_resources) diff --git a/ceilometer/tests/central/test_manager.py b/ceilometer/tests/central/test_manager.py index 6db3606bd..0a85dc9e8 100644 --- a/ceilometer/tests/central/test_manager.py +++ b/ceilometer/tests/central/test_manager.py @@ -40,6 +40,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): return manager.AgentManager() def setUp(self): + self.source_resources = True super(TestRunTasks, self).setUp() self.useFixture(mockpatch.Patch( 'keystoneclient.v2_0.client.Client', diff --git a/ceilometer/tests/compute/pollsters/test_cpu.py b/ceilometer/tests/compute/pollsters/test_cpu.py index 31d976450..15aa65139 100644 --- a/ceilometer/tests/compute/pollsters/test_cpu.py +++ b/ceilometer/tests/compute/pollsters/test_cpu.py @@ -53,7 +53,7 @@ class TestCPUPollster(base.TestPollsterBase): def _verify_cpu_metering(expected_time): cache = {} - samples = list(pollster.get_samples(mgr, cache, self.instance)) + samples = list(pollster.get_samples(mgr, cache, [self.instance])) self.assertEqual(len(samples), 1) self.assertEqual(set([s.name for s in samples]), set(['cpu'])) @@ -75,7 +75,7 @@ class TestCPUPollster(base.TestPollsterBase): pollster = cpu.CPUPollster() cache = {} - samples = list(pollster.get_samples(mgr, cache, self.instance)) + samples = list(pollster.get_samples(mgr, cache, [self.instance])) self.assertEqual(len(samples), 1) self.assertEqual(samples[0].volume, 10 ** 6) self.assertEqual(len(cache), 0) diff --git a/ceilometer/tests/compute/pollsters/test_diskio.py b/ceilometer/tests/compute/pollsters/test_diskio.py index 7c31ebe40..3dbb60376 100644 --- a/ceilometer/tests/compute/pollsters/test_diskio.py +++ b/ceilometer/tests/compute/pollsters/test_diskio.py @@ -45,7 +45,7 @@ class TestDiskPollsters(base.TestPollsterBase): mgr = manager.AgentManager() cache = {} - samples = list(pollster.get_samples(mgr, cache, self.instance)) + samples = list(pollster.get_samples(mgr, cache, [self.instance])) assert samples self.assertIn(pollster.CACHE_KEY_DISK, cache) self.assertIn(self.instance.name, cache[pollster.CACHE_KEY_DISK]) diff --git a/ceilometer/tests/compute/pollsters/test_instance.py b/ceilometer/tests/compute/pollsters/test_instance.py index 9d56c6df2..5d251a99c 100644 --- a/ceilometer/tests/compute/pollsters/test_instance.py +++ b/ceilometer/tests/compute/pollsters/test_instance.py @@ -34,7 +34,7 @@ class TestInstancePollster(base.TestPollsterBase): def test_get_samples_instance(self): mgr = manager.AgentManager() pollster = pollsters_instance.InstancePollster() - samples = list(pollster.get_samples(mgr, {}, self.instance)) + samples = list(pollster.get_samples(mgr, {}, [self.instance])) self.assertEqual(len(samples), 1) self.assertEqual(samples[0].name, 'instance') self.assertEqual(samples[0].resource_metadata['vcpus'], 1) @@ -47,6 +47,6 @@ class TestInstancePollster(base.TestPollsterBase): def test_get_samples_instance_flavor(self): mgr = manager.AgentManager() pollster = pollsters_instance.InstanceFlavorPollster() - samples = list(pollster.get_samples(mgr, {}, self.instance)) + samples = list(pollster.get_samples(mgr, {}, [self.instance])) self.assertEqual(len(samples), 1) self.assertEqual(samples[0].name, 'instance:m1.small') diff --git a/ceilometer/tests/compute/pollsters/test_net.py b/ceilometer/tests/compute/pollsters/test_net.py index eb817220f..cd9422b7a 100644 --- a/ceilometer/tests/compute/pollsters/test_net.py +++ b/ceilometer/tests/compute/pollsters/test_net.py @@ -72,7 +72,7 @@ class TestNetPollster(base.TestPollsterBase): def _check_get_samples(self, factory, expected): mgr = manager.AgentManager() pollster = factory() - samples = list(pollster.get_samples(mgr, {}, self.instance)) + samples = list(pollster.get_samples(mgr, {}, [self.instance])) self.assertEqual(len(samples), 3) # one for each nic self.assertEqual(set([s.name for s in samples]), set([samples[0].name])) @@ -157,7 +157,7 @@ class TestNetPollsterCache(base.TestPollsterBase): self.instance.name: vnics, }, } - samples = list(pollster.get_samples(mgr, cache, self.instance)) + samples = list(pollster.get_samples(mgr, cache, [self.instance])) self.assertEqual(len(samples), 1) def test_incoming_bytes(self): diff --git a/ceilometer/tests/compute/test_manager.py b/ceilometer/tests/compute/test_manager.py index de3466ac3..575be3d16 100644 --- a/ceilometer/tests/compute/test_manager.py +++ b/ceilometer/tests/compute/test_manager.py @@ -19,6 +19,7 @@ """ import mock +from ceilometer import agent from ceilometer.compute import manager from ceilometer import nova_client from ceilometer.openstack.common.fixture import mockpatch @@ -51,25 +52,24 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): + self.source_resources = False super(TestRunTasks, self).setUp() # Set up a fake instance value to be returned by # instance_get_all_by_host() so when the manager gets the list # of instances to poll we can control the results. - self.instance = self._fake_instance('faux', 'active') + self.instances = [self._fake_instance('doing', 'active'), + self._fake_instance('resting', 'paused')] stillborn_instance = self._fake_instance('stillborn', 'error') - def instance_get_all_by_host(*args): - return [self.instance, stillborn_instance] - self.useFixture(mockpatch.PatchObject( nova_client.Client, 'instance_get_all_by_host', - side_effect=lambda *x: [self.instance, stillborn_instance])) + side_effect=lambda *x: self.instances + [stillborn_instance])) def test_setup_polling_tasks(self): super(TestRunTasks, self).test_setup_polling_tasks() - self.assertTrue(self.Pollster.samples[0][1] is self.instance) + self.assertEqual(self.Pollster.samples[0][1], self.instances) def test_interval_exception_isolation(self): super(TestRunTasks, self).test_interval_exception_isolation() @@ -81,5 +81,14 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): with mock.patch.object(nova_client.Client, 'instance_get_all_by_host', side_effect=lambda *x: self._raise_exception()): mgr = manager.AgentManager() - polling_task = manager.PollingTask(mgr) + polling_task = agent.PollingTask(mgr) polling_task.poll_and_publish() + + def self_local_instances_default_agent_discovery(self): + self.setup_pipeline() + self.assertEqual(self.mgr.default_discovery, ['local_instances']) + polling_tasks = self.mgr.setup_polling_tasks() + self.mgr.interval_task(polling_tasks.get(60)) + self._verify_discovery_params([None]) + self.assertEqual(set(self.Pollster.resources), + set(self.instances)) diff --git a/setup.cfg b/setup.cfg index bdde2b7c5..3da1b2ca9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -64,6 +64,9 @@ ceilometer.notification = http.response = ceilometer.middleware:HTTPResponse stack_crud = ceilometer.orchestration.notifications:StackCRUD +ceilometer.discover = + local_instances = ceilometer.compute.discovery:InstanceDiscovery + ceilometer.poll.compute = disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster disk.write.requests = ceilometer.compute.pollsters.disk:WriteRequestsPollster