aodh/tests/test_pipeline.py
Julien Danjou c885f33c31 Code cleanup, remove useless import
This is a massive removal of all useless import that are all over the code
and unit tests. There's also some simple dead code removal or move.

Change-Id: Ida1208b47f38e552219e3b909f8d7c4f22ba3273
Signed-off-by: Julien Danjou <julien@danjou.info>
2013-02-11 17:29:31 +01:00

666 lines
26 KiB
Python

# -*- encoding: utf-8 -*-
#
# Copyright © 2013 Intel Corp.
#
# Author: Yunhong Jiang <yunhong.jiang@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import dispatch
from stevedore import extension
from ceilometer import counter
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer.tests import base
class TestPipeline(base.TestCase):
def fake_tem_init(self):
"""Fake a transformerManager for pipeline
The faked entry point setting is below:
update: TransformerClass
except: TransformerClassException
drop: TransformerClassDrop
"""
pass
def fake_tem_get_ext(self, name):
class_name_ext = {
'update': self.TransformerClass,
'except': self.TransformerClassException,
'drop': self.TransformerClassDrop,
'cache': self.TransformerClassCache}
if name in class_name_ext:
return extension.Extension(name, None,
class_name_ext[name],
None,
)
raise KeyError(name)
class PublisherClass():
def __init__(self):
self.counters = []
def publish_counter(self, ctxt, counter, source):
self.counters.append(counter)
class PublisherClassException():
def publish_counter(self, ctxt, counter, source):
raise Exception()
class TransformerClass(object):
samples = []
def __init__(self, append_name='_update'):
self.__class__.samples = []
self.append_name = append_name
def flush(self, ctxt, source):
return []
def handle_sample(self, ctxt, counter, source):
self.__class__.samples.append(counter)
newname = getattr(counter, 'name') + self.append_name
return counter._replace(name=newname)
class TransformerClassDrop(object):
samples = []
def __init__(self):
self.__class__.samples = []
def handle_sample(self, ctxt, counter, source):
self.__class__.samples.append(counter)
class TransformerClassException(object):
def handle_sample(self, ctxt, counter, source):
raise Exception()
class TransformerClassCache(object):
samples = []
caches = []
def __init__(self, drop=True):
self.__class__.samples = []
self.__class__.caches = []
def handle_sample(self, ctxt, counter, source):
self.__class__.samples.append(counter)
self.__class__.caches.append(counter)
def flush(self, ctxt, source):
return self.__class__.caches
def _create_publisher_manager(self, ext_name='test'):
self.publisher_manager = dispatch.NameDispatchExtensionManager(
'fake',
[],
invoke_on_load=False,
)
self.publisher = self.PublisherClass()
self.new_publisher = self.PublisherClass()
self.publisher_exception = self.PublisherClassException()
self.publisher_manager.extensions = [
extension.Extension(
'test',
None,
None,
self.publisher,
),
extension.Extension(
'new',
None,
None,
self.new_publisher,
),
extension.Extension(
'except',
None,
None,
self.publisher_exception,
),
]
self.publisher_manager.by_name = dict(
(e.name, e)
for e
in self.publisher_manager.extensions)
def setUp(self):
super(TestPipeline, self).setUp()
self.test_counter = counter.Counter(
name='a',
type='test_type',
volume=1,
unit='B',
user_id="test_user",
project_id="test_proj",
resource_id="test_resource",
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={}
)
self.stubs.Set(pipeline.TransformerExtensionManager,
"__init__",
self.fake_tem_init)
self.stubs.Set(pipeline.TransformerExtensionManager,
"get_ext",
self.fake_tem_get_ext)
self._create_publisher_manager()
self.pipeline_cfg = [{
'name': "test_pipeline",
'interval': 5,
'counters': ['a'],
'transformers': [
{'name': "update",
'parameters': {}}
],
'publishers': ["test"],
}, ]
def _exception_create_pipelinemanager(self):
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.pipeline_cfg,
self.publisher_manager)
def test_no_counters(self):
del self.pipeline_cfg[0]['counters']
self._exception_create_pipelinemanager()
def test_no_transformers(self):
del self.pipeline_cfg[0]['transformers']
self._exception_create_pipelinemanager()
def test_no_name(self):
del self.pipeline_cfg[0]['name']
self._exception_create_pipelinemanager()
def test_no_interval(self):
del self.pipeline_cfg[0]['interval']
self._exception_create_pipelinemanager()
def test_no_publishers(self):
del self.pipeline_cfg[0]['publishers']
self._exception_create_pipelinemanager()
def test_check_counters_include_exclude_same(self):
counter_cfg = ['a', '!a']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._exception_create_pipelinemanager()
def test_check_counters_include_exclude(self):
counter_cfg = ['a', '!b']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._exception_create_pipelinemanager()
def test_check_counters_wildcard_included(self):
counter_cfg = ['a', '*']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._exception_create_pipelinemanager()
def test_check_publishers_invalid_publisher(self):
publisher_cfg = ['test_invalid']
self.pipeline_cfg[0]['publishers'] = publisher_cfg
self._exception_create_pipelinemanager()
def test_invalid_string_interval(self):
self.pipeline_cfg[0]['interval'] = 'string'
self._exception_create_pipelinemanager()
def test_check_transformer_invalid_transformer(self):
transformer_cfg = [
{'name': "test_invalid",
'parameters': {}}
]
self.pipeline_cfg[0]['transformers'] = transformer_cfg
self._exception_create_pipelinemanager()
def test_pipelines_for_counter(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
self.assertTrue(len(pipeline_manager.pipelines_for_counter('a'))
== 1)
self.assertTrue(len(pipeline_manager.pipelines_for_counter('b'))
== 0)
def test_get_interval(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
self.assertTrue(pipe.get_interval() == 5)
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
def test_multiple_included_counters(self):
counter_cfg = ['a', 'b']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
pipe = pipeline_manager.pipelines_for_counter('b')[0]
self.test_counter = self.test_counter._replace(name='b')
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 2)
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertTrue(getattr(self.publisher.counters[1], "name")
== 'b_update')
def test_wildcard_counter(self):
counter_cfg = ['*']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
def test_wildcard_excluded_counters(self):
counter_cfg = ['*', '!a']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')
self.assertTrue(len(pipe) == 0)
def test_wildcard_excluded_counters_not_excluded(self):
counter_cfg = ['*', '!b']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
def test_all_excluded_counters_not_excluded(self):
counter_cfg = ['!b', '!c']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
def test_all_excluded_counters_is_excluded(self):
counter_cfg = ['!a', '!c']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')
self.assertTrue(len(pipe) == 0)
pipe_1 = pipeline_manager.pipelines_for_counter('c')
self.assertTrue(len(pipe_1) == 0)
def test_multiple_pipeline(self):
self.pipeline_cfg.append({
'name': 'second_pipeline',
'interval': 5,
'counters': ['b'],
'transformers': [{
'name': 'update',
'parameters':
{
"append_name": "_new",
}
}],
'publishers': ['new'],
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe_1 = pipeline_manager.pipelines_for_counter('b')[0]
pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b')
pipe_1.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertTrue(len(self.new_publisher.counters) == 1)
self.assertTrue(getattr(self.new_publisher.counters[0], "name")
== 'b_new')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
self.assertTrue(getattr(self.TransformerClass.samples[1], "name")
== 'b')
def test_multple_pipeline_exception(self):
self.pipeline_cfg.append({
'name': "second_pipeline",
"interval": 5,
'counters': ['b'],
'transformers': [{
'name': 'update',
'parameters':
{
"append_name": "_new",
}
}],
'publishers': ['except'],
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe_1 = pipeline_manager.pipelines_for_counter('b')[0]
pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b')
pipe_1.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertTrue(len(self.new_publisher.counters) == 0)
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
self.assertTrue(getattr(self.TransformerClass.samples[1], "name")
== 'b')
def test_none_transformer_pipeline(self):
self.pipeline_cfg[0]['transformers'] = None
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a')
def test_empty_transformer_pipeline(self):
self.pipeline_cfg[0]['transformers'] = []
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a')
def test_multiple_transformer_same_class(self):
self.pipeline_cfg[0]['transformers'] = [
{
'name': 'update',
'parameters': {}
},
{
'name': 'update',
'parameters': {}
},
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_update')
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
== 'a')
self.assertTrue(getattr(self.TransformerClass.samples[1], 'name')
== 'a_update')
def test_multiple_transformer_same_class_different_parameter(self):
self.pipeline_cfg[0]['transformers'] = [
{
'name': 'update',
'parameters':
{
"append_name": "_update",
}
},
{
'name': 'update',
'parameters':
{
"append_name": "_new",
}
},
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
== 'a')
self.assertTrue(getattr(self.TransformerClass.samples[1], 'name')
== 'a_update')
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new')
def test_multiple_transformer_drop_transformer(self):
self.pipeline_cfg[0]['transformers'] = [
{
'name': 'update',
'parameters':
{
"append_name": "_update",
}
},
{
'name': 'drop',
'parameters': {}
},
{
'name': 'update',
'parameters':
{
"append_name": "_new",
}
},
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 0)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
== 'a')
self.assertTrue(len(self.TransformerClassDrop.samples) == 1)
self.assertTrue(getattr(self.TransformerClassDrop.samples[0], 'name')
== 'a_update')
def test_multiple_publisher(self):
self.pipeline_cfg[0]['publishers'] = ['test', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.new_publisher.counters) == 1)
self.assertTrue(getattr(self.new_publisher.counters[0], 'name')
== 'a_update')
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update')
def test_multiple_publisher_isolation(self):
self.pipeline_cfg[0]['publishers'] = ['except', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.new_publisher.counters) == 1)
self.assertTrue(getattr(self.new_publisher.counters[0], 'name')
== 'a_update')
def test_multiple_counter_pipeline(self):
self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe_1 = pipeline_manager.pipelines_for_counter('b')[0]
self.assertTrue(pipe is pipe_1)
pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b')
pipe_1.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 2)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update')
self.assertTrue(getattr(self.publisher.counters[1], 'name')
== 'b_update')
def test_flush_pipeline_cache(self):
self.pipeline_cfg[0]['transformers'].extend([
{
'name': 'cache',
'parameters': {}
},
{
'name': 'update',
'parameters':
{
'append_name': '_new'
}
}, ]
)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.TransformerClassCache.caches) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(len(self.publisher.counters) == 0)
pipe.flush(None, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(len(self.TransformerClassCache.caches) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new')
def test_flush_pipeline_cache_multiple_counter(self):
self.pipeline_cfg[0]['transformers'].extend([
{
'name': 'cache',
'parameters': {}
},
{
'name': 'update',
'parameters':
{
'append_name': '_new'
}
}, ]
)
self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.test_counter = self.test_counter._replace(name='b')
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.TransformerClassCache.caches) == 2)
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(len(self.publisher.counters) == 0)
pipe.flush(None, None)
self.assertTrue(len(self.publisher.counters) == 2)
self.assertTrue(len(self.TransformerClassCache.caches) == 2)
self.assertTrue(len(self.TransformerClass.samples) == 4)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new')
self.assertTrue(getattr(self.publisher.counters[1], 'name')
== 'b_update_new')
def test_flush_pipeline_cache_before_publisher(self):
self.pipeline_cfg[0]['transformers'].append({
'name': 'cache',
'parameters': {}
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a')[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.TransformerClassCache.caches) == 1)
self.assertTrue(len(self.publisher.counters) == 0)
pipe.flush(None, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update')
def test_variable_counter(self):
self.pipeline_cfg = [{
'name': "test_pipeline",
'interval': 5,
'counters': ['a:*'],
'transformers': [
{'name': "update",
'parameters': {}}
],
'publishers': ["test"],
}, ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.publisher_manager)
pipe = pipeline_manager.pipelines_for_counter('a:*')[0]
self.test_counter = self.test_counter._replace(name='a:b')
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a:b_update')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a:b')