Encompassing one source pollsters with common context

After per-source separation of static resources & discovery samples are
publishing with context of separate pollster, so if it is used arithmetic
transformer which handle different meters, it will fail.

Now samples are publishing from common source context, after all
source meters have been collected.

Change-Id: I0d186247b4b87fd94317318c180bdfb9c6da70fc
Closes-bug: #1394228
This commit is contained in:
Igor Degtiarov 2014-12-09 15:51:32 +02:00
parent 9af8f521e3
commit 21935cb0c1
2 changed files with 59 additions and 34 deletions

View File

@ -64,8 +64,8 @@ class Resources(object):
return static_resources + source_discovery
@staticmethod
def key(source, pollster):
return '%s-%s' % (source.name, pollster.name)
def key(source_name, pollster):
return '%s-%s' % (source_name, pollster.name)
class PollingTask(object):
@ -79,7 +79,7 @@ class PollingTask(object):
# elements of the Cartesian product of sources X pollsters
# with a common interval
self.pollster_matches = set()
self.pollster_matches = collections.defaultdict(set)
# per-sink publisher contexts associated with each source
self.publishers = {}
@ -95,41 +95,47 @@ class PollingTask(object):
self.manager.context)
self.publishers[pipeline.source.name] = publish_context
self.publishers[pipeline.source.name].add_pipelines([pipeline])
self.pollster_matches.update([(pipeline.source, pollster)])
key = Resources.key(pipeline.source, pollster)
self.pollster_matches[pipeline.source.name].add(pollster)
key = Resources.key(pipeline.source.name, pollster)
self.resources[key].setup(pipeline)
def poll_and_publish(self):
"""Polling sample and publish into pipeline."""
cache = {}
discovery_cache = {}
for source, pollster in self.pollster_matches:
LOG.info(_("Polling pollster %(poll)s in the context of %(src)s"),
dict(poll=pollster.name, src=source))
pollster_resources = None
if pollster.obj.default_discovery:
pollster_resources = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
key = Resources.key(source, pollster)
source_resources = list(self.resources[key].get(discovery_cache))
polling_resources = (source_resources or pollster_resources)
if not polling_resources:
LOG.info(_("Skip polling pollster %s, no resources found"),
pollster.name)
continue
with self.publishers[source.name] as publisher:
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
))
publisher(samples)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
for source_name in self.pollster_matches:
with self.publishers[source_name] as publisher:
for pollster in self.pollster_matches[source_name]:
LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
pollster_resources = None
if pollster.obj.default_discovery:
pollster_resources = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
key = Resources.key(source_name, pollster)
source_resources = list(
self.resources[key].get(discovery_cache))
polling_resources = (source_resources or
pollster_resources)
if not polling_resources:
LOG.info(_(
"Skip polling pollster %s, no resources found"),
pollster.name)
continue
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
))
publisher(samples)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
class AgentManager(os_service.Service):

View File

@ -361,7 +361,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
pollsters = polling_tasks.get(60).pollster_matches
pollsters = polling_tasks.get(60).pollster_matches['test_pipeline']
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
@ -679,10 +679,29 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.pipeline_cfg[0]['resources'] = []
self.setup_pipeline()
polling_task = self.mgr.setup_polling_tasks().values()[0]
pollster = list(polling_task.pollster_matches)[0][1]
pollster = list(polling_task.pollster_matches['test_pipeline'])[0]
LOG = mock.MagicMock()
with mock.patch('ceilometer.agent.LOG', LOG):
polling_task.poll_and_publish()
if not self.mgr.discover():
LOG.info.assert_called_with('Skip polling pollster %s, no '
'resources found', pollster.name)
def test_arithmetic_transformer(self):
self.pipeline_cfg[0]['counters'] = ['test', 'testanother']
self.pipeline_cfg[0]['transformers'] = [
{'name': 'arithmetic',
'parameters': {
'target': {'name': 'test_sum',
'unit': default_test_data.unit,
'type': default_test_data.type,
'expr': '$(test) * 10 + $(testanother)'
}
}}
]
self.setup_pipeline()
self.mgr.setup_polling_tasks()[60].poll_and_publish()
samples = self.mgr.pipeline_manager.pipelines[0].publishers[0].samples
self.assertEqual(1, len(samples))
self.assertEqual('test_sum', samples[0].name)
self.assertEqual(11, samples[0].volume)