transformer: add acculumator transformer

This adds a transformer accumulating counters until a threshold, and then
flushing them out.

This implements a solution to blueprint swift-batched-requests

Change-Id: Ic1f36138d8ee1e5705f2285987763fbff9de0184
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-02-11 16:38:06 +01:00
parent 44e262d98d
commit b4e6fa07d1
4 changed files with 66 additions and 27 deletions

View File

View File

@ -0,0 +1,43 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 Julien Danjou
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer import plugin
class TransformerAccumulator(plugin.TransformerBase):
"""Transformer that accumulates counter until a threshold, and then flush
them out in the wild. """
def __init__(self, size=1, **kwargs):
if size >= 1:
self.counters = []
self.size = size
super(TransformerAccumulator, self).__init__(**kwargs)
def handle_sample(self, context, counter, source):
if self.size >= 1:
self.counters.append(counter)
else:
return counter
def flush(self, context, source):
if len(self.counters) >= self.size:
x = self.counters
self.counters = []
return x
return []

View File

@ -131,6 +131,7 @@ setuptools.setup(
libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector
[ceilometer.transformer] [ceilometer.transformer]
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
[ceilometer.publisher] [ceilometer.publisher]
meter_publisher = ceilometer.publisher.meter_publish:MeterPublisher meter_publisher = ceilometer.publisher.meter_publish:MeterPublisher

View File

@ -22,6 +22,7 @@ from stevedore import extension
from ceilometer import counter from ceilometer import counter
from ceilometer import plugin from ceilometer import plugin
from ceilometer.transformer import accumulator
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer import pipeline from ceilometer import pipeline
from ceilometer.tests import base from ceilometer.tests import base
@ -42,7 +43,7 @@ class TestPipeline(base.TestCase):
'update': self.TransformerClass, 'update': self.TransformerClass,
'except': self.TransformerClassException, 'except': self.TransformerClassException,
'drop': self.TransformerClassDrop, 'drop': self.TransformerClassDrop,
'cache': self.TransformerClassCache} 'cache': accumulator.TransformerAccumulator}
if name in class_name_ext: if name in class_name_ext:
return extension.Extension(name, None, return extension.Extension(name, None,
@ -91,21 +92,6 @@ class TestPipeline(base.TestCase):
def handle_sample(self, ctxt, counter, source): def handle_sample(self, ctxt, counter, source):
raise Exception() raise Exception()
class TransformerClassCache(object):
samples = []
caches = []
def __init__(self, drop=True):
self.__class__.caches = []
def handle_sample(self, ctxt, counter, source):
self.__class__.caches.append(counter)
def flush(self, ctxt, source):
x = self.__class__.caches
self.__class__.caches = []
return x
def _create_publisher_manager(self, ext_name='test'): def _create_publisher_manager(self, ext_name='test'):
self.publisher_manager = dispatch.NameDispatchExtensionManager( self.publisher_manager = dispatch.NameDispatchExtensionManager(
'fake', 'fake',
@ -563,10 +549,13 @@ class TestPipeline(base.TestCase):
== 'b_update') == 'b_update')
def test_flush_pipeline_cache(self): def test_flush_pipeline_cache(self):
CACHE_SIZE = 10
self.pipeline_cfg[0]['transformers'].extend([ self.pipeline_cfg[0]['transformers'].extend([
{ {
'name': 'cache', 'name': 'cache',
'parameters': {} 'parameters': {
'size': CACHE_SIZE,
}
}, },
{ {
'name': 'update', 'name': 'update',
@ -581,20 +570,27 @@ class TestPipeline(base.TestCase):
pipe = pipeline_manager.pipelines_for_counter('a')[0] pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.TransformerClassCache.caches) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(len(self.publisher.counters) == 0) self.assertTrue(len(self.publisher.counters) == 0)
pipe.flush(None, None) pipe.flush(None, None)
self.assertTrue(len(self.publisher.counters) == 1) self.assertEqual(len(self.publisher.counters), 0)
self.assertTrue(len(self.TransformerClass.samples) == 2) pipe.publish_counter(None, self.test_counter, None)
pipe.flush(None, None)
self.assertEqual(len(self.publisher.counters), 0)
for i in range(CACHE_SIZE - 2):
pipe.publish_counter(None, self.test_counter, None)
pipe.flush(None, None)
self.assertEqual(len(self.publisher.counters), CACHE_SIZE)
self.assertTrue(getattr(self.publisher.counters[0], 'name') self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new') == 'a_update_new')
def test_flush_pipeline_cache_multiple_counter(self): def test_flush_pipeline_cache_multiple_counter(self):
CACHE_SIZE = 3
self.pipeline_cfg[0]['transformers'].extend([ self.pipeline_cfg[0]['transformers'].extend([
{ {
'name': 'cache', 'name': 'cache',
'parameters': {} 'parameters': {
'size': CACHE_SIZE
}
}, },
{ {
'name': 'update', 'name': 'update',
@ -611,12 +607,12 @@ class TestPipeline(base.TestCase):
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b') self.test_counter = self.test_counter._replace(name='b')
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.TransformerClassCache.caches) == 2)
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(len(self.publisher.counters) == 0) self.assertTrue(len(self.publisher.counters) == 0)
pipe.flush(None, None) pipe.flush(None, None)
self.assertTrue(len(self.publisher.counters) == 2) self.assertEqual(len(self.publisher.counters), 0)
self.assertTrue(len(self.TransformerClass.samples) == 4) pipe.publish_counter(None, self.test_counter, None)
pipe.flush(None, None)
self.assertEqual(len(self.publisher.counters), CACHE_SIZE)
self.assertTrue(getattr(self.publisher.counters[0], 'name') self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new') == 'a_update_new')
self.assertTrue(getattr(self.publisher.counters[1], 'name') self.assertTrue(getattr(self.publisher.counters[1], 'name')
@ -632,7 +628,6 @@ class TestPipeline(base.TestCase):
pipe = pipeline_manager.pipelines_for_counter('a')[0] pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None) pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.TransformerClassCache.caches) == 1)
self.assertTrue(len(self.publisher.counters) == 0) self.assertTrue(len(self.publisher.counters) == 0)
pipe.flush(None, None) pipe.flush(None, None)
self.assertTrue(len(self.publisher.counters) == 1) self.assertTrue(len(self.publisher.counters) == 1)