pipeline: manager publish multiple counters

This makes the polling agent publish all counters in a row.
This fixes bug #1126990 and bug #1130475.

This moves the publisher() method to the *manager*. No agent/middleware
interacts with only one pipeline, this one an implementation mistake.

Change-Id: I45246849830066e39491f762b457adbdfa8d0e2e
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-02-13 13:46:07 +01:00
parent e9a7decae5
commit 21af2e30b3
6 changed files with 123 additions and 131 deletions

View File

@ -47,12 +47,9 @@ class AgentManager(object):
context,
cfg.CONF.counter_source,
)
with publisher:
LOG.info('Polling %s', ext.name)
for c in ext.obj.get_counters(manager, *args, **kwargs):
LOG.debug('Publishing counter: %s', c)
publisher(c)
with publisher as p:
LOG.debug('Polling and publishing %s', ext.name)
p(ext.obj.get_counters(manager, *args, **kwargs))
except Exception as err:
LOG.warning('Continuing after error from %s: %s',
ext.name, err)

View File

@ -109,16 +109,11 @@ class CollectorService(service.PeriodicService):
def _process_notification_for_ext(self, ext, notification):
handler = ext.obj
if notification['event_type'] in handler.get_event_types():
for c in handler.process_notification(notification):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Spawn green thread?
self.publish_counter(c)
def publish_counter(self, counter):
"""Create a metering message for the counter and publish it."""
ctxt = context.get_admin_context()
self.pipeline_manager.publish_counter(ctxt, counter,
cfg.CONF.counter_source)
with self.pipeline_manager.publisher(ctxt,
cfg.CONF.counter_source) as p:
# FIXME(dhellmann): Spawn green thread?
p(list(handler.process_notification(notification)))
def record_metering_data(self, context, data):
"""This method is triggered when metering data is

View File

@ -66,20 +66,22 @@ class TransformerExtensionManager(extension.ExtensionManager):
class Publisher(object):
def __init__(self, pipeline, context, source):
self.pipeline = pipeline
def __init__(self, pipelines, context, source):
self.pipelines = pipelines
self.context = context
self.source = source
def __enter__(self):
def p(counters):
return self.pipeline.publish_counters(self.context,
for p in self.pipelines:
p.publish_counters(self.context,
counters,
self.source)
return p
def __exit__(self, exc_type, exc_value, traceback):
self.pipeline.flush(self.context, self.source)
for p in self.pipelines:
p.flush(self.context, self.source)
class Pipeline(object):
@ -241,14 +243,6 @@ class Pipeline(object):
LOG.audit("Pipeline %s: Published counters", self)
def publisher(self, context, source):
"""Build a new Publisher for this pipeline.
:param context: The context.
:param source: Counter source.
"""
return Publisher(self, context, source)
def publish_counter(self, ctxt, counter, source):
self.publish_counters(ctxt, [counter], source)
@ -285,9 +279,16 @@ class Pipeline(object):
LOG.audit("Flush pipeline %s", self)
for (i, transformer) in enumerate(self.transformers):
try:
self._publish_counters(i + 1, ctxt,
list(transformer.flush(ctxt, source)),
source)
except Exception as err:
LOG.warning(
"Pipeline %s: Error flushing "
"transformer %s",
self, transformer)
LOG.exception(err)
def get_interval(self):
return self.interval
@ -353,24 +354,13 @@ class PipelineManager(object):
transformer_manager)
for pipedef in cfg]
def pipelines_for_counter(self, counter_name):
"""Get all pipelines that support counter"""
return [p for p in self.pipelines if p.support_counter(counter_name)]
def publish_counter(self, ctxt, counter, source):
"""Publish counter through pipelines
This is helpful to notification mechanism, so that they don't need
to maintain the private mapping cache from counter to pipelines.
For polling based data collector, they may need keep private
mapping cache for different interval support.
def publisher(self, context, source):
"""Build a new Publisher for these manager pipelines.
:param context: The context.
:param source: Counter source.
"""
# TODO(yjiang5) Utilize a cache
for p in self.pipelines:
if p.support_counter(counter.name):
p.publish_counter(ctxt, counter, source)
return Publisher(self.pipelines, context, source)
def setup_pipeline(publisher_manager):

View File

@ -189,6 +189,7 @@ class TestCollectorService(tests_base.TestCase):
# configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start()
self.srv.pipeline_manager.pipelines[0] = MagicMock()
self.srv.notification_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
@ -197,4 +198,5 @@ class TestCollectorService(tests_base.TestCase):
),
])
self.srv.process_notification(TEST_NOTICE)
assert self.srv.pipeline_manager.publish_counter.called
self.assertTrue(
self.srv.pipeline_manager.publisher.called)

View File

@ -43,15 +43,22 @@ class FakeApp(object):
class TestSwiftMiddleware(base.TestCase):
class _faux_pipeline_manager():
class _faux_pipeline_manager(object):
class _faux_pipeline(object):
def __init__(self):
self.counters = []
def publish_counters(self, context, counters, source):
def publish_counters(self, ctxt, counters, source):
self.counters.extend(counters)
def flush(self, ctx, source):
pass
def __init__(self):
self.pipelines = [self._faux_pipeline()]
def publisher(self, context, source):
return pipeline.Publisher(self, context, source)
return pipeline.Publisher(self.pipelines, context, source)
def flush(self, context, source):
pass
@ -78,8 +85,9 @@ class TestSwiftMiddleware(base.TestCase):
environ={'REQUEST_METHOD': 'GET'})
resp = app(req.environ, self.start_response)
self.assertEqual(list(resp), ["This string is 28 bytes long"])
self.assertEqual(len(self.pipeline_manager.counters), 1)
data = self.pipeline_manager.counters[0]
counters = self.pipeline_manager.pipelines[0].counters
self.assertEqual(len(counters), 1)
data = counters[0]
self.assertEqual(data.volume, 28)
self.assertEqual(data.resource_metadata['version'], '1.0')
self.assertEqual(data.resource_metadata['container'], 'container')
@ -92,8 +100,9 @@ class TestSwiftMiddleware(base.TestCase):
'wsgi.input':
StringIO.StringIO('some stuff')})
resp = list(app(req.environ, self.start_response))
self.assertEqual(len(self.pipeline_manager.counters), 1)
data = self.pipeline_manager.counters[0]
counters = self.pipeline_manager.pipelines[0].counters
self.assertEqual(len(counters), 1)
data = counters[0]
self.assertEqual(data.volume, 10)
self.assertEqual(data.resource_metadata['version'], '1.0')
self.assertEqual(data.resource_metadata['container'], 'container')
@ -106,8 +115,9 @@ class TestSwiftMiddleware(base.TestCase):
'wsgi.input':
StringIO.StringIO('some other stuff')})
resp = list(app(req.environ, self.start_response))
self.assertEqual(len(self.pipeline_manager.counters), 1)
data = self.pipeline_manager.counters[0]
counters = self.pipeline_manager.pipelines[0].counters
self.assertEqual(len(counters), 1)
data = counters[0]
self.assertEqual(data.volume, 16)
self.assertEqual(data.resource_metadata['version'], '1.0')
self.assertEqual(data.resource_metadata['container'], 'container')
@ -118,8 +128,9 @@ class TestSwiftMiddleware(base.TestCase):
req = Request.blank('/1.0/account/container',
environ={'REQUEST_METHOD': 'GET'})
resp = list(app(req.environ, self.start_response))
self.assertEqual(len(self.pipeline_manager.counters), 1)
data = self.pipeline_manager.counters[0]
counters = self.pipeline_manager.pipelines[0].counters
self.assertEqual(len(counters), 1)
data = counters[0]
self.assertEqual(data.volume, 28)
self.assertEqual(data.resource_metadata['version'], '1.0')
self.assertEqual(data.resource_metadata['container'], 'container')

View File

@ -220,26 +220,19 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['transformers'] = transformer_cfg
self._exception_create_pipelinemanager()
def test_pipelines_for_counter(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
self.assertTrue(len(pipeline_manager.pipelines_for_counter('a'))
== 1)
self.assertTrue(len(pipeline_manager.pipelines_for_counter('b'))
== 0)
def test_get_interval(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe = pipeline_manager.pipelines[0]
self.assertTrue(pipe.get_interval() == 5)
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
@ -253,12 +246,15 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
pipe = pipeline_manager.pipelines_for_counter('b')[0]
self.test_counter = self.test_counter._replace(name='b')
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 2)
self.assertTrue(len(self.TransformerClass.samples) == 2)
@ -272,8 +268,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
@ -285,17 +281,15 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')
self.assertTrue(len(pipe) == 0)
self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
def test_wildcard_excluded_counters_not_excluded(self):
counter_cfg = ['*', '!b']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
@ -306,8 +300,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
@ -321,10 +315,9 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')
self.assertTrue(len(pipe) == 0)
pipe_1 = pipeline_manager.pipelines_for_counter('c')
self.assertTrue(len(pipe_1) == 0)
self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
self.assertTrue(pipeline_manager.pipelines[0].support_counter('b'))
self.assertFalse(pipeline_manager.pipelines[0].support_counter('c'))
def test_multiple_pipeline(self):
self.pipeline_cfg.append({
@ -343,13 +336,13 @@ class TestPipeline(base.TestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe_1 = pipeline_manager.pipelines_for_counter('b')[0]
pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b')
pipe_1.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
@ -382,12 +375,14 @@ class TestPipeline(base.TestCase):
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe_1 = pipeline_manager.pipelines_for_counter('b')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.test_counter = self.test_counter._replace(name='b')
pipe_1.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
@ -403,8 +398,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['transformers'] = None
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a')
@ -412,8 +407,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['transformers'] = []
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a')
@ -430,8 +425,9 @@ class TestPipeline(base.TestCase):
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
@ -461,8 +457,8 @@ class TestPipeline(base.TestCase):
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
@ -496,8 +492,8 @@ class TestPipeline(base.TestCase):
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 0)
self.assertTrue(len(self.TransformerClass.samples) == 1)
@ -511,8 +507,9 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['publishers'] = ['test', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.new_publisher.counters) == 1)
@ -525,8 +522,8 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['publishers'] = ['except', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.new_publisher.counters) == 1)
self.assertTrue(getattr(self.new_publisher.counters[0], 'name')
@ -536,12 +533,10 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe_1 = pipeline_manager.pipelines_for_counter('b')[0]
self.assertTrue(pipe is pipe_1)
pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b')
pipe_1.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter,
self.test_counter._replace(name='b')])
self.assertTrue(len(self.publisher.counters) == 2)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update')
@ -567,7 +562,7 @@ class TestPipeline(base.TestCase):
)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe = pipeline_manager.pipelines[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 0)
@ -603,15 +598,16 @@ class TestPipeline(base.TestCase):
self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b')
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter,
self.test_counter._replace(name='b')])
self.assertTrue(len(self.publisher.counters) == 0)
pipe.flush(None, None)
self.assertEqual(len(self.publisher.counters), 0)
pipe.publish_counter(None, self.test_counter, None)
pipe.flush(None, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertEqual(len(self.publisher.counters), CACHE_SIZE)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new')
@ -625,7 +621,7 @@ class TestPipeline(base.TestCase):
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe = pipeline_manager.pipelines[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 0)
@ -647,10 +643,11 @@ class TestPipeline(base.TestCase):
}, ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a:*')[0]
self.test_counter = self.test_counter._replace(name='a:b')
pipe.publish_counter(None, self.test_counter, None)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)