Merge "Encompassing one source pollsters with common context"

This commit is contained in:
Jenkins 2014-12-12 10:19:49 +00:00 committed by Gerrit Code Review
commit 727eae15b1
2 changed files with 59 additions and 34 deletions

View File

@ -64,8 +64,8 @@ class Resources(object):
return static_resources + source_discovery
@staticmethod
def key(source, pollster):
return '%s-%s' % (source.name, pollster.name)
def key(source_name, pollster):
return '%s-%s' % (source_name, pollster.name)
class PollingTask(object):
@ -79,7 +79,7 @@ class PollingTask(object):
# elements of the Cartesian product of sources X pollsters
# with a common interval
self.pollster_matches = set()
self.pollster_matches = collections.defaultdict(set)
# per-sink publisher contexts associated with each source
self.publishers = {}
@ -95,41 +95,47 @@ class PollingTask(object):
self.manager.context)
self.publishers[pipeline.source.name] = publish_context
self.publishers[pipeline.source.name].add_pipelines([pipeline])
self.pollster_matches.update([(pipeline.source, pollster)])
key = Resources.key(pipeline.source, pollster)
self.pollster_matches[pipeline.source.name].add(pollster)
key = Resources.key(pipeline.source.name, pollster)
self.resources[key].setup(pipeline)
def poll_and_publish(self):
"""Polling sample and publish into pipeline."""
cache = {}
discovery_cache = {}
for source, pollster in self.pollster_matches:
LOG.info(_("Polling pollster %(poll)s in the context of %(src)s"),
dict(poll=pollster.name, src=source))
pollster_resources = None
if pollster.obj.default_discovery:
pollster_resources = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
key = Resources.key(source, pollster)
source_resources = list(self.resources[key].get(discovery_cache))
polling_resources = (source_resources or pollster_resources)
if not polling_resources:
LOG.info(_("Skip polling pollster %s, no resources found"),
pollster.name)
continue
with self.publishers[source.name] as publisher:
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
))
publisher(samples)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
for source_name in self.pollster_matches:
with self.publishers[source_name] as publisher:
for pollster in self.pollster_matches[source_name]:
LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
pollster_resources = None
if pollster.obj.default_discovery:
pollster_resources = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
key = Resources.key(source_name, pollster)
source_resources = list(
self.resources[key].get(discovery_cache))
polling_resources = (source_resources or
pollster_resources)
if not polling_resources:
LOG.info(_(
"Skip polling pollster %s, no resources found"),
pollster.name)
continue
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
))
publisher(samples)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
class AgentManager(os_service.Service):

View File

@ -361,7 +361,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
pollsters = polling_tasks.get(60).pollster_matches
pollsters = polling_tasks.get(60).pollster_matches['test_pipeline']
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
@ -679,10 +679,29 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.pipeline_cfg[0]['resources'] = []
self.setup_pipeline()
polling_task = self.mgr.setup_polling_tasks().values()[0]
pollster = list(polling_task.pollster_matches)[0][1]
pollster = list(polling_task.pollster_matches['test_pipeline'])[0]
LOG = mock.MagicMock()
with mock.patch('ceilometer.agent.LOG', LOG):
polling_task.poll_and_publish()
if not self.mgr.discover():
LOG.info.assert_called_with('Skip polling pollster %s, no '
'resources found', pollster.name)
def test_arithmetic_transformer(self):
self.pipeline_cfg[0]['counters'] = ['test', 'testanother']
self.pipeline_cfg[0]['transformers'] = [
{'name': 'arithmetic',
'parameters': {
'target': {'name': 'test_sum',
'unit': default_test_data.unit,
'type': default_test_data.type,
'expr': '$(test) * 10 + $(testanother)'
}
}}
]
self.setup_pipeline()
self.mgr.setup_polling_tasks()[60].poll_and_publish()
samples = self.mgr.pipeline_manager.pipelines[0].publishers[0].samples
self.assertEqual(1, len(samples))
self.assertEqual('test_sum', samples[0].name)
self.assertEqual(11, samples[0].volume)