diff --git a/ceilometer/agent.py b/ceilometer/agent.py index 808697714..1e274c834 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -33,6 +33,23 @@ from ceilometer import pipeline LOG = log.getLogger(__name__) +class Resources(object): + def __init__(self, agent_manager): + self.agent_manager = agent_manager + self._resources = [] + self._discovery = [] + + def extend(self, pipeline): + self._resources.extend(pipeline.resources) + self._discovery.extend(pipeline.discovery) + + @property + def resources(self): + source_discovery = (self.agent_manager.discover(self._discovery) + if self._discovery else []) + return self._resources + source_discovery + + class PollingTask(object): """Polling task for polling samples and inject into pipeline. A polling task can be invoked periodically or only once. @@ -41,17 +58,20 @@ class PollingTask(object): def __init__(self, agent_manager): self.manager = agent_manager self.pollsters = set() - # Resource definitions are indexed by the pollster - # Use dict of set here to remove the duplicated resource definitions - # for each pollster. - self.resources = collections.defaultdict(set) + # we extend the amalgamation of all static resources for this + # set of pollsters with a common interval, so as to also + # include any dynamically discovered resources specific to + # the matching pipelines (if either is present, the per-agent + # default discovery is overridden) + resource_factory = lambda: Resources(agent_manager) + self.resources = collections.defaultdict(resource_factory) self.publish_context = pipeline.PublishContext( agent_manager.context) def add(self, pollster, pipelines): self.publish_context.add_pipelines(pipelines) for pipeline in pipelines: - self.resources[pollster.name].update(pipeline.resources) + self.resources[pollster.name].extend(pipeline) self.pollsters.update([pollster]) def poll_and_publish(self): @@ -60,8 +80,9 @@ class PollingTask(object): with self.publish_context as publisher: cache = {} for pollster in self.pollsters: - LOG.info(_("Polling pollster %s"), pollster.name) - source_resources = list(self.resources[pollster.name]) + key = pollster.name + LOG.info(_("Polling pollster %s"), key) + source_resources = list(self.resources[key].resources) try: samples = list(pollster.obj.get_samples( self.manager, diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 823893a8b..000691697 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -108,6 +108,10 @@ class Source(object): if not isinstance(self.resources, list): raise PipelineException("Resources should be a list", cfg) + self.discovery = cfg.get('discovery') or [] + if not isinstance(self.discovery, list): + raise PipelineException("Discovery should be a list", cfg) + self._check_meters() def __str__(self): @@ -351,6 +355,10 @@ class Pipeline(object): def resources(self): return self.source.resources + @property + def discovery(self): + return self.source.discovery + def support_meter(self, meter_name): return self.source.support_meter(meter_name) diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py index 2b2a21214..cdf3b1a80 100644 --- a/ceilometer/tests/agentbase.py +++ b/ceilometer/tests/agentbase.py @@ -269,7 +269,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertTrue(60 in polling_tasks.keys()) per_task_resources = polling_tasks[60].resources self.assertEqual(len(per_task_resources), 1) - self.assertEqual(per_task_resources['test'], + self.assertEqual(set(per_task_resources['test'].resources), set(self.pipeline_cfg[0]['resources'])) self.mgr.interval_task(polling_tasks.values()[0]) pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] @@ -321,9 +321,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertEqual(len(pollsters), 2) per_task_resources = polling_tasks[60].resources self.assertEqual(len(per_task_resources), 2) - self.assertEqual(per_task_resources['test'], + self.assertEqual(set(per_task_resources['test'].resources), set(self.pipeline_cfg[0]['resources'])) - self.assertEqual(per_task_resources['testanother'], + self.assertEqual(set(per_task_resources['testanother'].resources), set(self.pipeline_cfg[1]['resources'])) def test_interval_exception_isolation(self): @@ -411,6 +411,54 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'], ['static_1', 'static_2']) + def test_per_agent_discovery_overridden_by_per_pipeline_discovery(self): + discovered_resources = ['discovered_1', 'discovered_2'] + self.mgr.discovery_manager = self.create_discovery_manager() + self.Discovery.resources = discovered_resources + self.DiscoveryAnother.resources = [d[::-1] + for d in discovered_resources] + self.pipeline_cfg[0]['discovery'] = ['testdiscoveryanother', + 'testdiscoverynonexistent', + 'testdiscoveryexception'] + self.pipeline_cfg[0]['resources'] = [] + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.mgr.interval_task(polling_tasks.get(60)) + self.assertEqual(set(self.Pollster.resources), + set(self.DiscoveryAnother.resources)) + + def _do_test_per_pipeline_discovery(self, + discovered_resources, + static_resources): + self.mgr.discovery_manager = self.create_discovery_manager() + self.Discovery.resources = discovered_resources + self.DiscoveryAnother.resources = [d[::-1] + for d in discovered_resources] + self.pipeline_cfg[0]['discovery'] = ['testdiscovery', + 'testdiscoveryanother', + 'testdiscoverynonexistent', + 'testdiscoveryexception'] + self.pipeline_cfg[0]['resources'] = static_resources + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.mgr.interval_task(polling_tasks.get(60)) + discovery = self.Discovery.resources + self.DiscoveryAnother.resources + # compare resource lists modulo ordering + self.assertEqual(set(self.Pollster.resources), + set(static_resources + discovery)) + + def test_per_pipeline_discovery_discovered_only(self): + self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'], + []) + + def test_per_pipeline_discovery_static_only(self): + self._do_test_per_pipeline_discovery([], + ['static_1', 'static_2']) + + def test_per_pipeline_discovery_discovered_augmented_by_static(self): + self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'], + ['static_1', 'static_2']) + def test_multiple_pipelines_different_static_resources(self): # assert that the amalgation of all static resources for a set # of pipelines with a common interval is passed to individual