Don't call publisher without sample

Actually when you push samples into a pipeline, and the publishers don't
care of this sample, the publisher is called with a empty array of
samples.

This cause to send unuseful rpc/udp message.

With this change, the publisher is called only when it is needed.

Change-Id: I5811ead70808825e56cf9224b4db888c39034e35
This commit is contained in:
Mehdi Abaakouk 2013-07-31 17:07:21 +02:00
parent b59a03109c
commit f18d9fd7bd
3 changed files with 29 additions and 10 deletions

View File

@ -209,16 +209,15 @@ class Pipeline(object):
if sample:
transformed_samples.append(sample)
LOG.audit("Pipeline %s: Publishing samples", self)
for p in self.publishers:
try:
p.publish_samples(ctxt, transformed_samples)
except Exception:
LOG.exception("Pipeline %s: Continue after error "
"from publisher %s", self, p)
LOG.audit("Pipeline %s: Published samples", self)
if transformed_samples:
LOG.audit("Pipeline %s: Publishing samples", self)
for p in self.publishers:
try:
p.publish_samples(ctxt, transformed_samples)
except Exception:
LOG.exception("Pipeline %s: Continue after error "
"from publisher %s", self, p)
LOG.audit("Pipeline %s: Published samples", self)
def publish_sample(self, ctxt, sample):
self.publish_samples(ctxt, [sample])

View File

@ -26,6 +26,7 @@ class TestPublisher(publisher.PublisherBase):
def __init__(self, parsed_url):
self.counters = []
self.calls = 0
def publish_samples(self, context, counters):
"""Send a metering message for publishing
@ -34,3 +35,4 @@ class TestPublisher(publisher.PublisherBase):
:param counter: Counter from pipeline after transformation
"""
self.counters.extend(counters)
self.calls += 1

View File

@ -256,6 +256,18 @@ class TestPipeline(base.TestCase):
self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update')
self.assertEqual(getattr(publisher.counters[1], "name"), 'b_update')
def test_counter_dont_match(self):
counter_cfg = ['nomatch']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0)
self.assertEqual(publisher.calls, 0)
def test_wildcard_counter(self):
counter_cfg = ['*']
self.pipeline_cfg[0]['counters'] = counter_cfg
@ -350,9 +362,11 @@ class TestPipeline(base.TestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(publisher.calls, 1)
self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update')
new_publisher = pipeline_manager.pipelines[1].publishers[0]
self.assertEqual(len(new_publisher.counters), 1)
self.assertEqual(new_publisher.calls, 1)
self.assertEqual(getattr(new_publisher.counters[0], "name"), 'b_new')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
@ -399,6 +413,7 @@ class TestPipeline(base.TestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(publisher.calls, 1)
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update')
self.assertTrue(len(self.TransformerClass.samples) == 2)
@ -415,6 +430,7 @@ class TestPipeline(base.TestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(publisher.calls, 1)
self.assertEqual(getattr(publisher.counters[0], 'name'), 'a')
def test_empty_transformer_pipeline(self):
@ -425,6 +441,7 @@ class TestPipeline(base.TestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(publisher.calls, 1)
self.assertEqual(getattr(publisher.counters[0], 'name'), 'a')
def test_multiple_transformer_same_class(self):
@ -445,6 +462,7 @@ class TestPipeline(base.TestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(publisher.calls, 1)
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], 'name'),
'a_update_update')