pipeline: flush after publishing call

Until now, the pipeline was never flushed out, so no flush() method from any
transformer was called. Let's fix that by creating a context publisher that
flushes once we finished publishing.

Change-Id: I2c0ab3c7c4aee77a1d7a1a6fccb19504c05f77f1
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-02-11 15:44:33 +01:00
parent 01f46b7a01
commit 44e262d98d
7 changed files with 64 additions and 33 deletions

View File

@ -42,12 +42,15 @@ class AgentManager(object):
"""Used to invoke the plugins loaded by the ExtensionManager.
"""
try:
LOG.info('Polling %s', ext.name)
for c in ext.obj.get_counters(manager, *args, **kwargs):
LOG.debug('Publishing counter: %s', c)
manager.pipeline_manager.publish_counter(
context, c,
cfg.CONF.counter_source)
publisher = manager.pipeline_manager.publisher(
context,
cfg.CONF.counter_source,
)
with publisher:
LOG.info('Polling %s', ext.name)
for c in ext.obj.get_counters(manager, *args, **kwargs):
LOG.debug('Publishing counter: %s', c)
publisher(c)
except Exception as err:
LOG.warning('Continuing after error from %s: %s',

View File

@ -95,10 +95,12 @@ class CeilometerMiddleware(object):
version, account, container, obj = split_path(req.path, 1, 4, True)
now = timeutils.utcnow().isoformat()
if bytes_received:
self.pipeline_manager.publish_counters(
with self.pipeline_manager.publisher(
context.get_admin_context(),
[counter.Counter(
cfg.CONF.counter_source
) as publisher:
if bytes_received:
publisher([counter.Counter(
name='storage.objects.incoming.bytes',
type='delta',
unit='B',
@ -112,13 +114,10 @@ class CeilometerMiddleware(object):
"version": version,
"container": container,
"object": obj,
}, )],
cfg.CONF.counter_source)
})])
if bytes_sent:
self.pipeline_manager.publish_counters(
context.get_admin_context(),
[counter.Counter(
if bytes_sent:
publisher([counter.Counter(
name='storage.objects.outgoing.bytes',
type='delta',
unit='B',
@ -132,8 +131,7 @@ class CeilometerMiddleware(object):
"version": version,
"container": container,
"object": obj,
})],
cfg.CONF.counter_source)
})])
def filter_factory(global_conf, **local_conf):

View File

@ -64,6 +64,24 @@ class TransformerExtensionManager(extension.ExtensionManager):
return self.by_name[name]
class Publisher(object):
def __init__(self, pipeline, context, source):
self.pipeline = pipeline
self.context = context
self.source = source
def __enter__(self):
def p(counters):
return self.pipeline.publish_counters(self.context,
counters,
self.source)
return p
def __exit__(self, exc_type, exc_value, traceback):
self.pipeline.flush(self.context, self.source)
class Pipeline(object):
"""Sample handling pipeline
@ -223,6 +241,14 @@ class Pipeline(object):
LOG.audit("Pipeline %s: Published counters", self)
def publisher(self, context, source):
"""Build a new Publisher for this pipeline.
:param context: The context.
:param source: Counter source.
"""
return Publisher(self, context, source)
def publish_counter(self, ctxt, counter, source):
self.publish_counters(ctxt, [counter], source)

View File

@ -86,7 +86,6 @@ class TestRunTasks(base.TestCase):
self.Pollster.test_data)
def test_notifications(self):
self.assertTrue(self.mgr.pipeline_manager.publish_counter.called)
args, _ = self.mgr.pipeline_manager.publish_counter.call_args
self.assertEqual(args[1], self.Pollster.test_data)
self.assertEqual(args[2], cfg.CONF.counter_source)
self.assertTrue(self.mgr.pipeline_manager.publisher.called)
args, _ = self.mgr.pipeline_manager.publisher.call_args
self.assertEqual(args[1], cfg.CONF.counter_source)

View File

@ -94,7 +94,6 @@ class TestRunTasks(base.TestCase):
self.assertTrue(self.Pollster.counters[0][1] is self.instance)
def test_notifications(self):
self.assertTrue(self.mgr.pipeline_manager.publish_counter.called)
args, _ = self.mgr.pipeline_manager.publish_counter.call_args
self.assertEqual(args[1], self.Pollster.test_data)
self.assertEqual(args[2], cfg.CONF.counter_source)
self.assertTrue(self.mgr.pipeline_manager.publisher.called)
args, _ = self.mgr.pipeline_manager.publisher.call_args
self.assertEqual(args[1], cfg.CONF.counter_source)

View File

@ -48,6 +48,12 @@ class TestSwiftMiddleware(base.TestCase):
def publish_counters(self, context, counters, source):
self.counters.extend(counters)
def publisher(self, context, source):
return pipeline.Publisher(self, context, source)
def flush(self, context, source):
pass
def _faux_setup_pipeline(self, publisher_manager):
return self.pipeline_manager

View File

@ -2,7 +2,8 @@
#
# Copyright © 2013 Intel Corp.
#
# Author: Yunhong Jiang <yunhong.jiang@intel.com>
# Authors: Yunhong Jiang <yunhong.jiang@intel.com>
# Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -20,6 +21,7 @@ from stevedore import dispatch
from stevedore import extension
from ceilometer import counter
from ceilometer import plugin
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer.tests import base
@ -61,7 +63,7 @@ class TestPipeline(base.TestCase):
def publish_counters(self, ctxt, counters, source):
raise Exception()
class TransformerClass(object):
class TransformerClass(plugin.TransformerBase):
samples = []
def __init__(self, append_name='_update'):
@ -76,7 +78,7 @@ class TestPipeline(base.TestCase):
newname = getattr(counter, 'name') + self.append_name
return counter._replace(name=newname)
class TransformerClassDrop(object):
class TransformerClassDrop(plugin.TransformerBase):
samples = []
def __init__(self):
@ -94,15 +96,15 @@ class TestPipeline(base.TestCase):
caches = []
def __init__(self, drop=True):
self.__class__.samples = []
self.__class__.caches = []
def handle_sample(self, ctxt, counter, source):
self.__class__.samples.append(counter)
self.__class__.caches.append(counter)
def flush(self, ctxt, source):
return self.__class__.caches
x = self.__class__.caches
self.__class__.caches = []
return x
def _create_publisher_manager(self, ext_name='test'):
self.publisher_manager = dispatch.NameDispatchExtensionManager(
@ -585,7 +587,6 @@ class TestPipeline(base.TestCase):
pipe.flush(None, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(len(self.TransformerClassCache.caches) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new')
@ -615,7 +616,6 @@ class TestPipeline(base.TestCase):
self.assertTrue(len(self.publisher.counters) == 0)
pipe.flush(None, None)
self.assertTrue(len(self.publisher.counters) == 2)
self.assertTrue(len(self.TransformerClassCache.caches) == 2)
self.assertTrue(len(self.TransformerClass.samples) == 4)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new')