From a56fb1681588d1cbb5854e797cf41f5bbb3adf1b Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Fri, 5 Apr 2013 15:42:36 +0200 Subject: [PATCH] pipeline, publisher, transformer: reorganize code This move a bit of code to make it less Ceilometer centric. This is part of blueprint oslo-multi-publisher Change-Id: I2eb174cb3000c9cca7d3771a2ab66a1a948f5cd9 Signed-off-by: Julien Danjou --- bin/ceilometer-send-counter | 17 +++-- ceilometer/agent.py | 16 +++-- ceilometer/collector/service.py | 15 ++-- ceilometer/objectstore/swift_middleware.py | 17 +++-- ceilometer/pipeline.py | 31 ++------ ceilometer/publisher/__init__.py | 31 ++++++++ ceilometer/transformer/__init__.py | 34 +++++++++ tests/agentbase.py | 14 ++-- tests/objectstore/test_swift_middleware.py | 2 +- tests/test_pipeline.py | 84 ++++++++++++++-------- 10 files changed, 173 insertions(+), 88 deletions(-) diff --git a/bin/ceilometer-send-counter b/bin/ceilometer-send-counter index 33b8adbc5..2b8331721 100755 --- a/bin/ceilometer-send-counter +++ b/bin/ceilometer-send-counter @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- encoding: utf-8 -*- # -# Copyright © 2012 Julien Danjou +# Copyright © 2012-2013 Julien Danjou # # Author: Julien Danjou # @@ -31,7 +31,9 @@ gettextutils.install('ceilometer') from ceilometer import counter from ceilometer import pipeline +from ceilometer import publisher from ceilometer import service +from ceilometer import transformer from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import context @@ -84,11 +86,14 @@ root_logger = logging.getLogger('') root_logger.addHandler(console) root_logger.setLevel(logging.DEBUG) -publish_manager = dispatch.NameDispatchExtensionManager( - namespace=pipeline.PUBLISHER_NAMESPACE, - check_func=lambda x: True, - invoke_on_load=True) -pipeline_manager = pipeline.setup_pipeline(publish_manager) +pipeline_manager = pipeline.setup_pipeline( + transformer.TransformerExtensionManager( + 'ceilometer.transformer', + ), + publisher.PublisherExtensionManager( + 'ceilometer.publisher', + ), +) with pipeline_manager.publisher(context.get_admin_context(), cfg.CONF.counter_source) as p: diff --git a/ceilometer/agent.py b/ceilometer/agent.py index 822b00a31..bc215fb83 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -20,11 +20,12 @@ import abc import itertools from oslo.config import cfg -from stevedore import dispatch from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer import pipeline +from ceilometer import publisher +from ceilometer import transformer LOG = log.getLogger(__name__) @@ -52,14 +53,15 @@ class PollingTask(object): class AgentManager(object): def __init__(self, extension_manager): - publisher_manager = dispatch.NameDispatchExtensionManager( - namespace=pipeline.PUBLISHER_NAMESPACE, - check_func=lambda x: True, - invoke_on_load=True, + self.pipeline_manager = pipeline.setup_pipeline( + transformer.TransformerExtensionManager( + 'ceilometer.transformer', + ), + publisher.PublisherExtensionManager( + 'ceilometer.publisher', + ), ) - self.pipeline_manager = pipeline.setup_pipeline(publisher_manager) - self.pollster_manager = extension_manager self.context = context.RequestContext('admin', 'admin', is_admin=True) diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index 2f4ddf143..22259c111 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -17,7 +17,6 @@ # under the License. from oslo.config import cfg -from stevedore import dispatch from ceilometer.collector import meter as meter_api from ceilometer import extension_manager @@ -32,8 +31,10 @@ import ceilometer.openstack.common.notifier.rpc_notifier from ceilometer.openstack.common import timeutils from ceilometer import pipeline +from ceilometer import publisher from ceilometer import service from ceilometer import storage +from ceilometer import transformer OPTS = [ cfg.ListOpt('disabled_notification_listeners', @@ -61,12 +62,14 @@ class CollectorService(service.PeriodicService): def initialize_service_hook(self, service): '''Consumers must be declared before consume_thread start.''' LOG.debug('initialize_service_hooks') - publisher_manager = dispatch.NameDispatchExtensionManager( - namespace=pipeline.PUBLISHER_NAMESPACE, - check_func=lambda x: True, - invoke_on_load=True, + self.pipeline_manager = pipeline.setup_pipeline( + transformer.TransformerExtensionManager( + 'ceilometer.transformer', + ), + publisher.PublisherExtensionManager( + 'ceilometer.publisher', + ), ) - self.pipeline_manager = pipeline.setup_pipeline(publisher_manager) LOG.debug('loading notification handlers from %s', self.COLLECTOR_NAMESPACE) diff --git a/ceilometer/objectstore/swift_middleware.py b/ceilometer/objectstore/swift_middleware.py index 071bb1f6a..9dee2c9f8 100644 --- a/ceilometer/objectstore/swift_middleware.py +++ b/ceilometer/objectstore/swift_middleware.py @@ -38,7 +38,6 @@ metadata_headers = X-TEST from __future__ import absolute_import from oslo.config import cfg -from stevedore import dispatch from swift.common.utils import split_path import webob @@ -61,7 +60,9 @@ from ceilometer import counter from ceilometer.openstack.common import context from ceilometer.openstack.common import timeutils from ceilometer import pipeline +from ceilometer import publisher from ceilometer import service +from ceilometer import transformer class CeilometerMiddleware(object): @@ -78,13 +79,15 @@ class CeilometerMiddleware(object): "").split(",") if h.strip()] service.prepare_service() - publisher_manager = dispatch.NameDispatchExtensionManager( - namespace=pipeline.PUBLISHER_NAMESPACE, - check_func=lambda x: True, - invoke_on_load=True, - ) - self.pipeline_manager = pipeline.setup_pipeline(publisher_manager) + self.pipeline_manager = pipeline.setup_pipeline( + transformer.TransformerExtensionManager( + 'ceilometer.transformer', + ), + publisher.PublisherExtensionManager( + 'ceilometer.publisher', + ), + ) def __call__(self, env, start_response): start_response_args = [None] diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 8609a64c7..bf59c0459 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -20,7 +20,6 @@ import itertools import os from oslo.config import cfg -from stevedore import extension import yaml from ceilometer.openstack.common import log @@ -36,9 +35,6 @@ cfg.CONF.register_opts(OPTS) LOG = log.getLogger(__name__) -PUBLISHER_NAMESPACE = 'ceilometer.publisher' -TRANSFORMER_NAMESPACE = 'ceilometer.transformer' - class PipelineException(Exception): def __init__(self, message, pipeline_cfg): @@ -49,21 +45,6 @@ class PipelineException(Exception): return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg) -class TransformerExtensionManager(extension.ExtensionManager): - - def __init__(self): - super(TransformerExtensionManager, self).__init__( - namespace=TRANSFORMER_NAMESPACE, - invoke_on_load=False, - invoke_args=(), - invoke_kwds={} - ) - self.by_name = dict((e.name, e) for e in self.extensions) - - def get_ext(self, name): - return self.by_name[name] - - class PublishContext(object): def __init__(self, context, source, pipelines=[]): @@ -306,11 +287,9 @@ class PipelineManager(object): """ - def __init__(self, cfg, publisher_manager): - """Create the pipeline manager.""" - self._setup_pipelines(cfg, publisher_manager) - - def _setup_pipelines(self, cfg, publisher_manager): + def __init__(self, cfg, + transformer_manager, + publisher_manager): """Setup the pipelines according to config. The top of the cfg is a list of pipeline definitions. @@ -352,7 +331,6 @@ class PipelineManager(object): Publisher's name is plugin name in setup.py """ - transformer_manager = TransformerExtensionManager() self.pipelines = [Pipeline(pipedef, publisher_manager, transformer_manager) for pipedef in cfg] @@ -366,7 +344,7 @@ class PipelineManager(object): return PublishContext(context, source, self.pipelines) -def setup_pipeline(publisher_manager): +def setup_pipeline(transformer_manager, publisher_manager): """Setup pipeline manager according to yaml config file.""" cfg_file = cfg.CONF.pipeline_cfg_file if not os.path.exists(cfg_file): @@ -381,4 +359,5 @@ def setup_pipeline(publisher_manager): LOG.info("Pipeline config: %s", pipeline_cfg) return PipelineManager(pipeline_cfg, + transformer_manager, publisher_manager) diff --git a/ceilometer/publisher/__init__.py b/ceilometer/publisher/__init__.py index e69de29bb..63f40f589 100644 --- a/ceilometer/publisher/__init__.py +++ b/ceilometer/publisher/__init__.py @@ -0,0 +1,31 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 Intel Corp. +# Copyright © 2013 eNovance +# +# Author: Yunhong Jiang +# Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from stevedore import dispatch + + +class PublisherExtensionManager(dispatch.NameDispatchExtensionManager): + + def __init__(self, namespace): + super(PublisherExtensionManager, self).__init__( + namespace=namespace, + check_func=lambda x: True, + invoke_on_load=True, + ) diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py index e69de29bb..60ba26897 100644 --- a/ceilometer/transformer/__init__.py +++ b/ceilometer/transformer/__init__.py @@ -0,0 +1,34 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 Intel Corp. +# +# Author: Yunhong Jiang +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from stevedore import extension + + +class TransformerExtensionManager(extension.ExtensionManager): + + def __init__(self, namespace): + super(TransformerExtensionManager, self).__init__( + namespace=namespace, + invoke_on_load=False, + invoke_args=(), + invoke_kwds={} + ) + self.by_name = dict((e.name, e) for e in self.extensions) + + def get_ext(self, name): + return self.by_name[name] diff --git a/tests/agentbase.py b/tests/agentbase.py index dab59c5d9..abeef0975 100644 --- a/tests/agentbase.py +++ b/tests/agentbase.py @@ -2,8 +2,10 @@ # # Copyright © 2012 New Dream Network, LLC (DreamHost) # Copyright © 2013 Intel corp. +# Copyright © 2013 eNovance # # Author: Yunhong Jiang +# Julien Danjou # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -22,12 +24,13 @@ import datetime import mock from stevedore import extension -from stevedore import dispatch from stevedore.tests import manager as extension_tests from ceilometer import counter from ceilometer import pipeline +from ceilometer import publisher from ceilometer.tests import base +from ceilometer import transformer default_test_data = counter.Counter( @@ -92,10 +95,11 @@ class BaseAgentManagerTestCase(base.TestCase): def setup_pipeline(self): self.publisher = self.PublisherClass() - self.publisher_manager = dispatch.NameDispatchExtensionManager( + self.transformer_manager = transformer.TransformerExtensionManager( + 'ceilometer.transformer', + ) + self.publisher_manager = publisher.PublisherExtensionManager( 'fake', - check_func=lambda x: True, - invoke_on_load=False, ) self.publisher_manager.extensions = [ extension.Extension( @@ -111,6 +115,7 @@ class BaseAgentManagerTestCase(base.TestCase): self.mgr.pipeline_manager = pipeline.PipelineManager( self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) def create_extension_manager(self): @@ -233,6 +238,7 @@ class BaseAgentManagerTestCase(base.TestCase): ] self.mgr.pipeline_manager = pipeline.PipelineManager( self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) polling_tasks = self.mgr.setup_polling_tasks() diff --git a/tests/objectstore/test_swift_middleware.py b/tests/objectstore/test_swift_middleware.py index 5f5244bbe..ff508b3cb 100644 --- a/tests/objectstore/test_swift_middleware.py +++ b/tests/objectstore/test_swift_middleware.py @@ -61,7 +61,7 @@ class TestSwiftMiddleware(base.TestCase): def flush(self, ctx, source): pass - def _faux_setup_pipeline(self, publisher_manager): + def _faux_setup_pipeline(self, transformer_manager, publisher_manager): return self.pipeline_manager def setUp(self): diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 27e466f0a..06c92553d 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -17,11 +17,12 @@ # License for the specific language governing permissions and limitations # under the License. -from stevedore import dispatch from stevedore import extension from ceilometer import counter from ceilometer import plugin +from ceilometer import publisher +from ceilometer import transformer from ceilometer.transformer import accumulator from ceilometer.openstack.common import timeutils from ceilometer import pipeline @@ -92,13 +93,35 @@ class TestPipeline(base.TestCase): def handle_sample(self, ctxt, counter, source): raise Exception() - def _create_publisher_manager(self, ext_name='test'): - self.publisher_manager = dispatch.NameDispatchExtensionManager( - 'fake', - [], - invoke_on_load=False, + def setUp(self): + super(TestPipeline, self).setUp() + + self.test_counter = counter.Counter( + name='a', + type='test_type', + volume=1, + unit='B', + user_id="test_user", + project_id="test_proj", + resource_id="test_resource", + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={} ) + self.stubs.Set(transformer.TransformerExtensionManager, + "__init__", + self.fake_tem_init) + + self.stubs.Set(transformer.TransformerExtensionManager, + "get_ext", + self.fake_tem_get_ext) + + self.publisher_manager = publisher.PublisherExtensionManager( + 'fake', + ) + + self.transformer_manager = transformer.TransformerExtensionManager() + self.publisher = self.PublisherClass() self.new_publisher = self.PublisherClass() self.publisher_exception = self.PublisherClassException() @@ -127,30 +150,6 @@ class TestPipeline(base.TestCase): for e in self.publisher_manager.extensions) - def setUp(self): - super(TestPipeline, self).setUp() - - self.test_counter = counter.Counter( - name='a', - type='test_type', - volume=1, - unit='B', - user_id="test_user", - project_id="test_proj", - resource_id="test_resource", - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={} - ) - - self.stubs.Set(pipeline.TransformerExtensionManager, - "__init__", - self.fake_tem_init) - - self.stubs.Set(pipeline.TransformerExtensionManager, - "get_ext", - self.fake_tem_get_ext) - - self._create_publisher_manager() self.pipeline_cfg = [{ 'name': "test_pipeline", 'interval': 5, @@ -166,6 +165,7 @@ class TestPipeline(base.TestCase): self.assertRaises(pipeline.PipelineException, pipeline.PipelineManager, self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) def test_no_counters(self): @@ -222,6 +222,7 @@ class TestPipeline(base.TestCase): def test_get_interval(self): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) pipe = pipeline_manager.pipelines[0] @@ -229,6 +230,7 @@ class TestPipeline(base.TestCase): def test_publisher_transformer_invoked(self): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: @@ -245,6 +247,7 @@ class TestPipeline(base.TestCase): counter_cfg = ['a', 'b'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: @@ -267,6 +270,7 @@ class TestPipeline(base.TestCase): counter_cfg = ['*'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -280,6 +284,7 @@ class TestPipeline(base.TestCase): counter_cfg = ['*', '!a'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) @@ -287,6 +292,7 @@ class TestPipeline(base.TestCase): counter_cfg = ['*', '!b'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -299,6 +305,7 @@ class TestPipeline(base.TestCase): counter_cfg = ['!b', '!c'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -314,6 +321,7 @@ class TestPipeline(base.TestCase): counter_cfg = ['!a', '!c'] self.pipeline_cfg[0]['counters'] = counter_cfg pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) self.assertFalse(pipeline_manager.pipelines[0].support_counter('a')) self.assertTrue(pipeline_manager.pipelines[0].support_counter('b')) @@ -335,6 +343,7 @@ class TestPipeline(base.TestCase): }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -359,7 +368,7 @@ class TestPipeline(base.TestCase): self.assertTrue(getattr(self.TransformerClass.samples[1], "name") == 'b') - def test_multple_pipeline_exception(self): + def test_multiple_pipeline_exception(self): self.pipeline_cfg.append({ 'name': "second_pipeline", "interval": 5, @@ -374,6 +383,7 @@ class TestPipeline(base.TestCase): 'publishers': ['except'], }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: @@ -397,6 +407,7 @@ class TestPipeline(base.TestCase): def test_none_transformer_pipeline(self): self.pipeline_cfg[0]['transformers'] = None pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -406,6 +417,7 @@ class TestPipeline(base.TestCase): def test_empty_transformer_pipeline(self): self.pipeline_cfg[0]['transformers'] = [] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -424,6 +436,7 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: @@ -456,6 +469,7 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -491,6 +505,7 @@ class TestPipeline(base.TestCase): }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -506,6 +521,7 @@ class TestPipeline(base.TestCase): def test_multiple_publisher(self): self.pipeline_cfg[0]['publishers'] = ['test', 'new'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: @@ -521,6 +537,7 @@ class TestPipeline(base.TestCase): def test_multiple_publisher_isolation(self): self.pipeline_cfg[0]['publishers'] = ['except', 'new'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter]) @@ -532,6 +549,7 @@ class TestPipeline(base.TestCase): def test_multiple_counter_pipeline(self): self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter, @@ -561,6 +579,7 @@ class TestPipeline(base.TestCase): }, ] ) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) pipe = pipeline_manager.pipelines[0] @@ -597,6 +616,7 @@ class TestPipeline(base.TestCase): ) self.pipeline_cfg[0]['counters'] = ['a', 'b'] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) with pipeline_manager.publisher(None, None) as p: p([self.test_counter, @@ -620,6 +640,7 @@ class TestPipeline(base.TestCase): 'parameters': {} }) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) pipe = pipeline_manager.pipelines[0] @@ -642,6 +663,7 @@ class TestPipeline(base.TestCase): 'publishers': ["test"], }, ] pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, self.publisher_manager) self.test_counter = self.test_counter._replace(name='a:b')