Merge "pipeline: switch publisher loading model to driver"

This commit is contained in:
Jenkins 2013-06-14 15:30:10 +00:00 committed by Gerrit Code Review
commit 7988885901
15 changed files with 200 additions and 290 deletions

View File

@ -27,7 +27,6 @@ from oslo.config import cfg
from ceilometer import counter
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import service
from ceilometer import transformer
from ceilometer.openstack.common import timeutils
@ -86,9 +85,6 @@ pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
publisher.PublisherExtensionManager(
'ceilometer.publisher',
),
)
with pipeline_manager.publisher(context.get_admin_context(),

View File

@ -24,7 +24,6 @@ from oslo.config import cfg
from ceilometer.openstack.common import context
from ceilometer.openstack.common import log
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import transformer
LOG = log.getLogger(__name__)
@ -57,9 +56,6 @@ class AgentManager(object):
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
publisher.PublisherExtensionManager(
'ceilometer.publisher',
),
)
self.pollster_manager = extension_manager

View File

@ -21,7 +21,6 @@ from oslo.config import cfg
from pecan import hooks
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import storage
from ceilometer import transformer
@ -61,9 +60,7 @@ class PipelineHook(hooks.PecanHook):
# when the file is imported.
self.__class__.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer'),
publisher.PublisherExtensionManager(
'ceilometer.publisher'))
'ceilometer.transformer'))
def before(self, state):
state.request.pipeline_manager = self.pipeline_manager

View File

@ -31,7 +31,6 @@ from ceilometer.openstack.common.rpc import service as rpc_service
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import storage
from ceilometer import transformer
@ -117,9 +116,6 @@ class CollectorService(rpc_service.Service):
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
publisher.PublisherExtensionManager(
'ceilometer.publisher',
),
)
LOG.debug('loading notification handlers from %s',

View File

@ -17,7 +17,6 @@
# under the License.
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import transformer
from ceilometer.openstack.common import context as req_context
from ceilometer.openstack.common import log as logging
@ -57,9 +56,6 @@ def _load_pipeline_manager():
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
publisher.PublisherExtensionManager(
'ceilometer.publisher',
),
)

View File

@ -60,7 +60,6 @@ from ceilometer import counter
from ceilometer.openstack.common import context
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import service
from ceilometer import transformer
@ -82,9 +81,6 @@ class CeilometerMiddleware(object):
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
publisher.PublisherExtensionManager(
'ceilometer.publisher',
),
)
def __call__(self, env, start_response):

View File

@ -23,6 +23,8 @@ from oslo.config import cfg
import yaml
from ceilometer.openstack.common import log
from ceilometer import publisher
OPTS = [
cfg.StrOpt('pipeline_cfg_file',
@ -90,7 +92,7 @@ class Pipeline(object):
"""
def __init__(self, cfg, publisher_manager, transformer_manager):
def __init__(self, cfg, transformer_manager):
self.cfg = cfg
try:
@ -100,10 +102,8 @@ class Pipeline(object):
except ValueError:
raise PipelineException("Invalid interval value", cfg)
self.counters = cfg['counters']
self.publishers = cfg['publishers']
# It's legal to have no transformer specified
self.transformer_cfg = cfg['transformers'] or []
self.publisher_manager = publisher_manager
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
@ -113,7 +113,15 @@ class Pipeline(object):
self._check_counters()
self._check_publishers(cfg, publisher_manager)
if not cfg.get('publishers'):
raise PipelineException("No publisher specified", cfg)
self.publishers = []
for p in cfg['publishers']:
try:
self.publishers.append(publisher.get_publisher(p))
except Exception:
LOG.exception("Unable to load publisher %s", p)
self.transformers = self._setup_transformers(cfg, transformer_manager)
@ -144,16 +152,6 @@ class Pipeline(object):
"Included counters specified with wildcard",
self.cfg)
def _check_publishers(self, cfg, publisher_manager):
if not self.publishers:
raise PipelineException(
"No publisher specified", cfg)
if not set(self.publishers).issubset(set(publisher_manager.names())):
raise PipelineException(
"Publishers %s invalid" %
set(self.publishers).difference(
set(self.publisher_manager.names())), cfg)
def _setup_transformers(self, cfg, transformer_manager):
transformer_cfg = cfg['transformers'] or []
transformers = []
@ -174,14 +172,6 @@ class Pipeline(object):
return transformers
def _publish_counters_to_one_publisher(self, ext, ctxt, counters, source):
try:
ext.obj.publish_counters(ctxt, counters, source)
except Exception as err:
LOG.warning("Pipeline %s: Continue after error "
"from publisher %s", self, ext.name)
LOG.exception(err)
def _transform_counter(self, start, ctxt, counter, source):
try:
for transformer in self.transformers[start:]:
@ -218,12 +208,13 @@ class Pipeline(object):
transformed_counters.append(counter)
LOG.audit("Pipeline %s: Publishing counters", self)
self.publisher_manager.map(self.publishers,
self._publish_counters_to_one_publisher,
ctxt=ctxt,
counters=transformed_counters,
source=source,
)
for p in self.publishers:
try:
p.publish_counters(ctxt, transformed_counters, source)
except Exception:
LOG.exception("Pipeline %s: Continue after error "
"from publisher %s", self, p)
LOG.audit("Pipeline %s: Published counters", self)
@ -288,8 +279,7 @@ class PipelineManager(object):
"""
def __init__(self, cfg,
transformer_manager,
publisher_manager):
transformer_manager):
"""Setup the pipelines according to config.
The top of the cfg is a list of pipeline definitions.
@ -331,8 +321,7 @@ class PipelineManager(object):
Publisher's name is plugin name in setup.py
"""
self.pipelines = [Pipeline(pipedef, publisher_manager,
transformer_manager)
self.pipelines = [Pipeline(pipedef, transformer_manager)
for pipedef in cfg]
def publisher(self, context, source):
@ -344,7 +333,7 @@ class PipelineManager(object):
return PublishContext(context, source, self.pipelines)
def setup_pipeline(transformer_manager, publisher_manager):
def setup_pipeline(transformer_manager):
"""Setup pipeline manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
if not os.path.exists(cfg_file):
@ -359,5 +348,4 @@ def setup_pipeline(transformer_manager, publisher_manager):
LOG.info("Pipeline config: %s", pipeline_cfg)
return PipelineManager(pipeline_cfg,
transformer_manager,
publisher_manager)
transformer_manager)

View File

@ -19,17 +19,17 @@
# under the License.
import abc
from stevedore import dispatch
from stevedore import driver
class PublisherExtensionManager(dispatch.NameDispatchExtensionManager):
def get_publisher(name, namespace='ceilometer.publisher'):
"""Get publisher driver and load it.
def __init__(self, namespace):
super(PublisherExtensionManager, self).__init__(
namespace=namespace,
check_func=lambda x: True,
invoke_on_load=True,
)
:param name: Name of the publisher driver.
:param namespace: Namespace to use to look for drivers.
"""
loaded_driver = driver.DriverManager(namespace, name)
return loaded_driver.driver()
class PublisherBase(object):

View File

@ -0,0 +1,37 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 eNovance
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Publish a counter in memory, useful for testing
"""
from ceilometer import publisher
class TestPublisher(publisher.PublisherBase):
"""Publisher used in unit testing."""
def __init__(self):
self.counters = []
def publish_counters(self, context, counters, source):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call
:param counter: Counter from pipeline after transformation
:param source: counter source
"""
self.counters.extend(counters)

View File

@ -77,6 +77,7 @@ ceilometer.transformer =
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
ceilometer.publisher =
test = ceilometer.publisher.test:TestPublisher
meter_publisher = ceilometer.publisher.meter:MeterPublisher
meter = ceilometer.publisher.meter:MeterPublisher
udp = ceilometer.publisher.udp:UDPPublisher

View File

@ -28,7 +28,6 @@ from stevedore.tests import manager as extension_tests
from ceilometer import counter
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer.tests import base
from ceilometer import transformer
@ -70,13 +69,6 @@ class TestPollsterException(TestPollster):
class BaseAgentManagerTestCase(base.TestCase):
class PublisherClass():
def __init__(self):
self.counters = []
def publish_counters(self, ctxt, counter, source):
self.counters.extend(counter)
class Pollster(TestPollster):
counters = []
test_data = default_test_data
@ -94,29 +86,12 @@ class BaseAgentManagerTestCase(base.TestCase):
test_data = default_test_data._replace(name='testexceptionanother')
def setup_pipeline(self):
self.publisher = self.PublisherClass()
self.transformer_manager = transformer.TransformerExtensionManager(
'ceilometer.transformer',
)
self.publisher_manager = publisher.PublisherExtensionManager(
'fake',
)
self.publisher_manager.extensions = [
extension.Extension(
'test_pub',
None,
None,
self.publisher,
), ]
self.publisher_manager.by_name = dict(
(e.name, e)
for e
in self.publisher_manager.extensions)
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
def create_extension_manager(self):
return extension_tests.TestExtensionManager(
@ -160,7 +135,7 @@ class BaseAgentManagerTestCase(base.TestCase):
'interval': 60,
'counters': ['test'],
'transformers': [],
'publishers': ["test_pub"],
'publishers': ["test"],
}, ]
self.setup_pipeline()
@ -176,7 +151,8 @@ class BaseAgentManagerTestCase(base.TestCase):
self.assertEqual(len(polling_tasks), 1)
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.values()[0])
self.assertEqual(self.publisher.counters[0], self.Pollster.test_data)
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(pub.counters[0], self.Pollster.test_data)
def test_setup_polling_tasks_multiple_interval(self):
self.pipeline_cfg.append({
@ -184,7 +160,7 @@ class BaseAgentManagerTestCase(base.TestCase):
'interval': 10,
'counters': ['test'],
'transformers': [],
'publishers': ["test_pub"],
'publishers': ["test"],
})
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
@ -199,7 +175,7 @@ class BaseAgentManagerTestCase(base.TestCase):
'interval': 10,
'counters': ['test_invalid'],
'transformers': [],
'publishers': ["test_pub"],
'publishers': ["test"],
})
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks), 1)
@ -211,7 +187,7 @@ class BaseAgentManagerTestCase(base.TestCase):
'interval': 60,
'counters': ['testanother'],
'transformers': [],
'publishers': ["test_pub"],
'publishers': ["test"],
})
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
@ -226,23 +202,23 @@ class BaseAgentManagerTestCase(base.TestCase):
'interval': 10,
'counters': ['testexceptionanother'],
'transformers': [],
'publishers': ["test_pub"],
'publishers': ["test"],
},
{
'name': "test_pipeline_2",
'interval': 10,
'counters': ['testexception'],
'transformers': [],
'publishers': ["test_pub"],
'publishers': ["test"],
},
]
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks.keys()), 1)
polling_tasks.get(10)
self.mgr.interval_task(polling_tasks.get(10))
self.assertEqual(len(self.publisher.counters), 0)
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(pub.counters), 0)

View File

@ -58,7 +58,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
self.mgr.setup_notifier_task()
self.mgr.poll_instance(None, self.instance)
self.assertEqual(len(self.Pollster.counters), 1)
assert self.publisher.counters[0] == self.Pollster.test_data
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(pub.counters[0], self.Pollster.test_data)
def test_setup_polling_tasks(self):
super(TestRunTasks, self).test_setup_polling_tasks()

View File

@ -61,7 +61,7 @@ class TestSwiftMiddleware(base.TestCase):
def flush(self, ctx, source):
pass
def _faux_setup_pipeline(self, transformer_manager, publisher_manager):
def _faux_setup_pipeline(self, transformer_manager):
return self.pipeline_manager
def setUp(self):

View File

@ -20,10 +20,8 @@
from ceilometer import notifier
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import transformer
from ceilometer.tests import base as tests_base
from stevedore import extension
MESSAGE = {
@ -68,44 +66,21 @@ MESSAGE = {
class TestNotifier(tests_base.TestCase):
class PublisherClass():
def __init__(self):
self.counters = []
def publish_counters(self, ctxt, counter, source):
self.counters.extend(counter)
def test_process_notification(self):
pub = self.PublisherClass()
transformer_manager = transformer.TransformerExtensionManager(
'ceilometer.transformer',
)
publisher_manager = publisher.PublisherExtensionManager(
'fake',
)
publisher_manager.extensions = [
extension.Extension(
'test_pub',
None,
None,
pub,
), ]
publisher_manager.by_name = dict(
(e.name, e)
for e
in publisher_manager.extensions)
notifier._pipeline_manager = pipeline.PipelineManager(
[{
'name': "test_pipeline",
'interval': 60,
'counters': ['*'],
'transformers': [],
'publishers': ["test_pub"],
'publishers': ["test"],
}],
transformer_manager,
publisher_manager)
transformer_manager)
pub = notifier._pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(pub.counters), 0)
notifier.notify(None, MESSAGE)
self.assertTrue(len(pub.counters) > 0)

View File

@ -21,6 +21,7 @@ from stevedore import extension
from ceilometer import counter
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
from ceilometer import transformer
from ceilometer.transformer import accumulator
from ceilometer.openstack.common import timeutils
@ -53,12 +54,11 @@ class TestPipeline(base.TestCase):
raise KeyError(name)
class PublisherClass():
def __init__(self):
self.counters = []
def publish_counters(self, ctxt, counters, source):
self.counters.extend(counters)
def get_publisher(self, name, namespace=''):
fake_drivers = {'test': test_publisher.TestPublisher,
'new': test_publisher.TestPublisher,
'except': self.PublisherClassException}
return fake_drivers[name]()
class PublisherClassException():
def publish_counters(self, ctxt, counters, source):
@ -115,40 +115,10 @@ class TestPipeline(base.TestCase):
"get_ext",
self.fake_tem_get_ext)
self.publisher_manager = publisher.PublisherExtensionManager(
'fake',
)
self.stubs.Set(publisher, 'get_publisher', self.get_publisher)
self.transformer_manager = transformer.TransformerExtensionManager()
self.publisher = self.PublisherClass()
self.new_publisher = self.PublisherClass()
self.publisher_exception = self.PublisherClassException()
self.publisher_manager.extensions = [
extension.Extension(
'test',
None,
None,
self.publisher,
),
extension.Extension(
'new',
None,
None,
self.new_publisher,
),
extension.Extension(
'except',
None,
None,
self.publisher_exception,
),
]
self.publisher_manager.by_name = dict(
(e.name, e)
for e
in self.publisher_manager.extensions)
self.pipeline_cfg = [{
'name': "test_pipeline",
'interval': 5,
@ -164,8 +134,7 @@ class TestPipeline(base.TestCase):
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
def test_no_counters(self):
del self.pipeline_cfg[0]['counters']
@ -205,7 +174,6 @@ class TestPipeline(base.TestCase):
def test_check_publishers_invalid_publisher(self):
publisher_cfg = ['test_invalid']
self.pipeline_cfg[0]['publishers'] = publisher_cfg
self._exception_create_pipelinemanager()
def test_invalid_string_interval(self):
self.pipeline_cfg[0]['interval'] = 'string'
@ -221,24 +189,22 @@ class TestPipeline(base.TestCase):
def test_get_interval(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
self.assertTrue(pipe.get_interval() == 5)
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
@ -246,73 +212,68 @@ class TestPipeline(base.TestCase):
counter_cfg = ['a', 'b']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.test_counter = self.test_counter._replace(name='b')
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 2)
self.assertEqual(len(publisher.counters), 2)
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertTrue(getattr(self.publisher.counters[1], "name")
== 'b_update')
self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update')
self.assertEqual(getattr(publisher.counters[1], "name"), 'b_update')
def test_wildcard_counter(self):
counter_cfg = ['*']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update')
def test_wildcard_excluded_counters(self):
counter_cfg = ['*', '!a']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
def test_wildcard_excluded_counters_not_excluded(self):
counter_cfg = ['*', '!b']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.counters[0], "name"),
'a_update')
def test_all_excluded_counters_not_excluded(self):
counter_cfg = ['!b', '!c']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
@ -320,8 +281,7 @@ class TestPipeline(base.TestCase):
counter_cfg = ['!a', '!c']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
self.assertTrue(pipeline_manager.pipelines[0].support_counter('b'))
self.assertFalse(pipeline_manager.pipelines[0].support_counter('c'))
@ -342,8 +302,7 @@ class TestPipeline(base.TestCase):
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -352,12 +311,12 @@ class TestPipeline(base.TestCase):
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertTrue(len(self.new_publisher.counters) == 1)
self.assertTrue(getattr(self.new_publisher.counters[0], "name")
== 'b_new')
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update')
new_publisher = pipeline_manager.pipelines[1].publishers[0]
self.assertEqual(len(new_publisher.counters), 1)
self.assertEqual(getattr(new_publisher.counters[0], "name"), 'b_new')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
@ -382,8 +341,7 @@ class TestPipeline(base.TestCase):
'publishers': ['except'],
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -393,10 +351,9 @@ class TestPipeline(base.TestCase):
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a_update')
self.assertTrue(len(self.new_publisher.counters) == 0)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], "name"), 'a_update')
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
@ -406,22 +363,22 @@ class TestPipeline(base.TestCase):
def test_none_transformer_pipeline(self):
self.pipeline_cfg[0]['transformers'] = None
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a')
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], 'name'), 'a')
def test_empty_transformer_pipeline(self):
self.pipeline_cfg[0]['transformers'] = []
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name') == 'a')
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], 'name'), 'a')
def test_multiple_transformer_same_class(self):
self.pipeline_cfg[0]['transformers'] = [
@ -435,15 +392,15 @@ class TestPipeline(base.TestCase):
},
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_update')
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], 'name'),
'a_update_update')
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
== 'a')
@ -468,8 +425,7 @@ class TestPipeline(base.TestCase):
},
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -478,9 +434,10 @@ class TestPipeline(base.TestCase):
== 'a')
self.assertTrue(getattr(self.TransformerClass.samples[1], 'name')
== 'a_update')
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new')
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], 'name'),
'a_update_new')
def test_multiple_transformer_drop_transformer(self):
self.pipeline_cfg[0]['transformers'] = [
@ -504,12 +461,12 @@ class TestPipeline(base.TestCase):
},
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 0)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
== 'a')
@ -520,45 +477,44 @@ class TestPipeline(base.TestCase):
def test_multiple_publisher(self):
self.pipeline_cfg[0]['publishers'] = ['test', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(len(self.new_publisher.counters) == 1)
self.assertTrue(getattr(self.new_publisher.counters[0], 'name')
== 'a_update')
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update')
publisher = pipeline_manager.pipelines[0].publishers[0]
new_publisher = pipeline_manager.pipelines[0].publishers[1]
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(len(new_publisher.counters), 1)
self.assertEqual(getattr(new_publisher.counters[0], 'name'),
'a_update')
self.assertEqual(getattr(publisher.counters[0], 'name'),
'a_update')
def test_multiple_publisher_isolation(self):
self.pipeline_cfg[0]['publishers'] = ['except', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.new_publisher.counters) == 1)
self.assertTrue(getattr(self.new_publisher.counters[0], 'name')
== 'a_update')
new_publisher = pipeline_manager.pipelines[0].publishers[1]
self.assertEqual(len(new_publisher.counters), 1)
self.assertEqual(getattr(new_publisher.counters[0], 'name'),
'a_update')
def test_multiple_counter_pipeline(self):
self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter,
self.test_counter._replace(name='b')])
self.assertTrue(len(self.publisher.counters) == 2)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update')
self.assertTrue(getattr(self.publisher.counters[1], 'name')
== 'b_update')
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 2)
self.assertEqual(getattr(publisher.counters[0], 'name'), 'a_update')
self.assertEqual(getattr(publisher.counters[1], 'name'), 'b_update')
def test_flush_pipeline_cache(self):
CACHE_SIZE = 10
@ -578,22 +534,22 @@ class TestPipeline(base.TestCase):
}, ]
)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 0)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0)
pipe.flush(None, None)
self.assertEqual(len(self.publisher.counters), 0)
self.assertEqual(len(publisher.counters), 0)
pipe.publish_counter(None, self.test_counter, None)
pipe.flush(None, None)
self.assertEqual(len(self.publisher.counters), 0)
self.assertEqual(len(publisher.counters), 0)
for i in range(CACHE_SIZE - 2):
pipe.publish_counter(None, self.test_counter, None)
pipe.flush(None, None)
self.assertEqual(len(self.publisher.counters), CACHE_SIZE)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
self.assertEqual(len(publisher.counters), CACHE_SIZE)
self.assertTrue(getattr(publisher.counters[0], 'name')
== 'a_update_new')
def test_flush_pipeline_cache_multiple_counter(self):
@ -615,23 +571,22 @@ class TestPipeline(base.TestCase):
)
self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter,
self.test_counter._replace(name='b')])
self.assertTrue(len(self.publisher.counters) == 0)
self.assertEqual(len(self.publisher.counters), 0)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 0)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertEqual(len(self.publisher.counters), CACHE_SIZE)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update_new')
self.assertTrue(getattr(self.publisher.counters[1], 'name')
== 'b_update_new')
self.assertEqual(len(publisher.counters), CACHE_SIZE)
self.assertEqual(getattr(publisher.counters[0], 'name'),
'a_update_new')
self.assertEqual(getattr(publisher.counters[1], 'name'),
'b_update_new')
def test_flush_pipeline_cache_before_publisher(self):
self.pipeline_cfg[0]['transformers'].append({
@ -639,16 +594,16 @@ class TestPipeline(base.TestCase):
'parameters': {}
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
publisher = pipe.publishers[0]
pipe.publish_counter(None, self.test_counter, None)
self.assertTrue(len(self.publisher.counters) == 0)
self.assertEqual(len(publisher.counters), 0)
pipe.flush(None, None)
self.assertTrue(len(self.publisher.counters) == 1)
self.assertTrue(getattr(self.publisher.counters[0], 'name')
== 'a_update')
self.assertEqual(len(publisher.counters), 1)
self.assertEqual(getattr(publisher.counters[0], 'name'),
'a_update')
def test_variable_counter(self):
self.pipeline_cfg = [{
@ -662,17 +617,17 @@ class TestPipeline(base.TestCase):
'publishers': ["test"],
}, ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.transformer_manager)
self.test_counter = self.test_counter._replace(name='a:b')
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
self.assertTrue(len(self.publisher.counters) == 1)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.counters), 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.publisher.counters[0], "name")
== 'a:b_update')
self.assertEqual(getattr(publisher.counters[0], "name"),
'a:b_update')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a:b')