pipeline, publisher, transformer: reorganize code

This move a bit of code to make it less Ceilometer centric.
This is part of blueprint oslo-multi-publisher

Change-Id: I2eb174cb3000c9cca7d3771a2ab66a1a948f5cd9
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2013-04-05 15:42:36 +02:00
parent 9eddee1fbe
commit a56fb16815
10 changed files with 173 additions and 88 deletions

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 Julien Danjou
# Copyright © 2012-2013 Julien Danjou
#
# Author: Julien Danjou <julien@danjou.info>
#
@ -31,7 +31,9 @@ gettextutils.install('ceilometer')
from ceilometer import counter
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import service
from ceilometer import transformer
from ceilometer.openstack.common import timeutils
from ceilometer.openstack.common import context
@ -84,11 +86,14 @@ root_logger = logging.getLogger('')
root_logger.addHandler(console)
root_logger.setLevel(logging.DEBUG)
publish_manager = dispatch.NameDispatchExtensionManager(
namespace=pipeline.PUBLISHER_NAMESPACE,
check_func=lambda x: True,
invoke_on_load=True)
pipeline_manager = pipeline.setup_pipeline(publish_manager)
pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
publisher.PublisherExtensionManager(
'ceilometer.publisher',
),
)
with pipeline_manager.publisher(context.get_admin_context(),
cfg.CONF.counter_source) as p:

View File

@ -20,11 +20,12 @@ import abc
import itertools
from oslo.config import cfg
from stevedore import dispatch
from ceilometer.openstack.common import context
from ceilometer.openstack.common import log
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import transformer
LOG = log.getLogger(__name__)
@ -52,14 +53,15 @@ class PollingTask(object):
class AgentManager(object):
def __init__(self, extension_manager):
publisher_manager = dispatch.NameDispatchExtensionManager(
namespace=pipeline.PUBLISHER_NAMESPACE,
check_func=lambda x: True,
invoke_on_load=True,
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
publisher.PublisherExtensionManager(
'ceilometer.publisher',
),
)
self.pipeline_manager = pipeline.setup_pipeline(publisher_manager)
self.pollster_manager = extension_manager
self.context = context.RequestContext('admin', 'admin', is_admin=True)

View File

@ -17,7 +17,6 @@
# under the License.
from oslo.config import cfg
from stevedore import dispatch
from ceilometer.collector import meter as meter_api
from ceilometer import extension_manager
@ -32,8 +31,10 @@ import ceilometer.openstack.common.notifier.rpc_notifier
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import service
from ceilometer import storage
from ceilometer import transformer
OPTS = [
cfg.ListOpt('disabled_notification_listeners',
@ -61,12 +62,14 @@ class CollectorService(service.PeriodicService):
def initialize_service_hook(self, service):
'''Consumers must be declared before consume_thread start.'''
LOG.debug('initialize_service_hooks')
publisher_manager = dispatch.NameDispatchExtensionManager(
namespace=pipeline.PUBLISHER_NAMESPACE,
check_func=lambda x: True,
invoke_on_load=True,
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
publisher.PublisherExtensionManager(
'ceilometer.publisher',
),
)
self.pipeline_manager = pipeline.setup_pipeline(publisher_manager)
LOG.debug('loading notification handlers from %s',
self.COLLECTOR_NAMESPACE)

View File

@ -38,7 +38,6 @@ metadata_headers = X-TEST
from __future__ import absolute_import
from oslo.config import cfg
from stevedore import dispatch
from swift.common.utils import split_path
import webob
@ -61,7 +60,9 @@ from ceilometer import counter
from ceilometer.openstack.common import context
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import service
from ceilometer import transformer
class CeilometerMiddleware(object):
@ -78,13 +79,15 @@ class CeilometerMiddleware(object):
"").split(",") if h.strip()]
service.prepare_service()
publisher_manager = dispatch.NameDispatchExtensionManager(
namespace=pipeline.PUBLISHER_NAMESPACE,
check_func=lambda x: True,
invoke_on_load=True,
)
self.pipeline_manager = pipeline.setup_pipeline(publisher_manager)
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
publisher.PublisherExtensionManager(
'ceilometer.publisher',
),
)
def __call__(self, env, start_response):
start_response_args = [None]

View File

@ -20,7 +20,6 @@ import itertools
import os
from oslo.config import cfg
from stevedore import extension
import yaml
from ceilometer.openstack.common import log
@ -36,9 +35,6 @@ cfg.CONF.register_opts(OPTS)
LOG = log.getLogger(__name__)
PUBLISHER_NAMESPACE = 'ceilometer.publisher'
TRANSFORMER_NAMESPACE = 'ceilometer.transformer'
class PipelineException(Exception):
def __init__(self, message, pipeline_cfg):
@ -49,21 +45,6 @@ class PipelineException(Exception):
return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
class TransformerExtensionManager(extension.ExtensionManager):
def __init__(self):
super(TransformerExtensionManager, self).__init__(
namespace=TRANSFORMER_NAMESPACE,
invoke_on_load=False,
invoke_args=(),
invoke_kwds={}
)
self.by_name = dict((e.name, e) for e in self.extensions)
def get_ext(self, name):
return self.by_name[name]
class PublishContext(object):
def __init__(self, context, source, pipelines=[]):
@ -306,11 +287,9 @@ class PipelineManager(object):
"""
def __init__(self, cfg, publisher_manager):
"""Create the pipeline manager."""
self._setup_pipelines(cfg, publisher_manager)
def _setup_pipelines(self, cfg, publisher_manager):
def __init__(self, cfg,
transformer_manager,
publisher_manager):
"""Setup the pipelines according to config.
The top of the cfg is a list of pipeline definitions.
@ -352,7 +331,6 @@ class PipelineManager(object):
Publisher's name is plugin name in setup.py
"""
transformer_manager = TransformerExtensionManager()
self.pipelines = [Pipeline(pipedef, publisher_manager,
transformer_manager)
for pipedef in cfg]
@ -366,7 +344,7 @@ class PipelineManager(object):
return PublishContext(context, source, self.pipelines)
def setup_pipeline(publisher_manager):
def setup_pipeline(transformer_manager, publisher_manager):
"""Setup pipeline manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
if not os.path.exists(cfg_file):
@ -381,4 +359,5 @@ def setup_pipeline(publisher_manager):
LOG.info("Pipeline config: %s", pipeline_cfg)
return PipelineManager(pipeline_cfg,
transformer_manager,
publisher_manager)

View File

@ -0,0 +1,31 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 Intel Corp.
# Copyright © 2013 eNovance
#
# Author: Yunhong Jiang <yunhong.jiang@intel.com>
# Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import dispatch
class PublisherExtensionManager(dispatch.NameDispatchExtensionManager):
def __init__(self, namespace):
super(PublisherExtensionManager, self).__init__(
namespace=namespace,
check_func=lambda x: True,
invoke_on_load=True,
)

View File

@ -0,0 +1,34 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 Intel Corp.
#
# Author: Yunhong Jiang <yunhong.jiang@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import extension
class TransformerExtensionManager(extension.ExtensionManager):
def __init__(self, namespace):
super(TransformerExtensionManager, self).__init__(
namespace=namespace,
invoke_on_load=False,
invoke_args=(),
invoke_kwds={}
)
self.by_name = dict((e.name, e) for e in self.extensions)
def get_ext(self, name):
return self.by_name[name]

View File

@ -2,8 +2,10 @@
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
# Copyright © 2013 Intel corp.
# Copyright © 2013 eNovance
#
# Author: Yunhong Jiang <yunhong.jiang@intel.com>
# Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -22,12 +24,13 @@ import datetime
import mock
from stevedore import extension
from stevedore import dispatch
from stevedore.tests import manager as extension_tests
from ceilometer import counter
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer.tests import base
from ceilometer import transformer
default_test_data = counter.Counter(
@ -92,10 +95,11 @@ class BaseAgentManagerTestCase(base.TestCase):
def setup_pipeline(self):
self.publisher = self.PublisherClass()
self.publisher_manager = dispatch.NameDispatchExtensionManager(
self.transformer_manager = transformer.TransformerExtensionManager(
'ceilometer.transformer',
)
self.publisher_manager = publisher.PublisherExtensionManager(
'fake',
check_func=lambda x: True,
invoke_on_load=False,
)
self.publisher_manager.extensions = [
extension.Extension(
@ -111,6 +115,7 @@ class BaseAgentManagerTestCase(base.TestCase):
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
def create_extension_manager(self):
@ -233,6 +238,7 @@ class BaseAgentManagerTestCase(base.TestCase):
]
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
polling_tasks = self.mgr.setup_polling_tasks()

View File

@ -61,7 +61,7 @@ class TestSwiftMiddleware(base.TestCase):
def flush(self, ctx, source):
pass
def _faux_setup_pipeline(self, publisher_manager):
def _faux_setup_pipeline(self, transformer_manager, publisher_manager):
return self.pipeline_manager
def setUp(self):

View File

@ -17,11 +17,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import dispatch
from stevedore import extension
from ceilometer import counter
from ceilometer import plugin
from ceilometer import publisher
from ceilometer import transformer
from ceilometer.transformer import accumulator
from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
@ -92,13 +93,35 @@ class TestPipeline(base.TestCase):
def handle_sample(self, ctxt, counter, source):
raise Exception()
def _create_publisher_manager(self, ext_name='test'):
self.publisher_manager = dispatch.NameDispatchExtensionManager(
'fake',
[],
invoke_on_load=False,
def setUp(self):
super(TestPipeline, self).setUp()
self.test_counter = counter.Counter(
name='a',
type='test_type',
volume=1,
unit='B',
user_id="test_user",
project_id="test_proj",
resource_id="test_resource",
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={}
)
self.stubs.Set(transformer.TransformerExtensionManager,
"__init__",
self.fake_tem_init)
self.stubs.Set(transformer.TransformerExtensionManager,
"get_ext",
self.fake_tem_get_ext)
self.publisher_manager = publisher.PublisherExtensionManager(
'fake',
)
self.transformer_manager = transformer.TransformerExtensionManager()
self.publisher = self.PublisherClass()
self.new_publisher = self.PublisherClass()
self.publisher_exception = self.PublisherClassException()
@ -127,30 +150,6 @@ class TestPipeline(base.TestCase):
for e
in self.publisher_manager.extensions)
def setUp(self):
super(TestPipeline, self).setUp()
self.test_counter = counter.Counter(
name='a',
type='test_type',
volume=1,
unit='B',
user_id="test_user",
project_id="test_proj",
resource_id="test_resource",
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={}
)
self.stubs.Set(pipeline.TransformerExtensionManager,
"__init__",
self.fake_tem_init)
self.stubs.Set(pipeline.TransformerExtensionManager,
"get_ext",
self.fake_tem_get_ext)
self._create_publisher_manager()
self.pipeline_cfg = [{
'name': "test_pipeline",
'interval': 5,
@ -166,6 +165,7 @@ class TestPipeline(base.TestCase):
self.assertRaises(pipeline.PipelineException,
pipeline.PipelineManager,
self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
def test_no_counters(self):
@ -222,6 +222,7 @@ class TestPipeline(base.TestCase):
def test_get_interval(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
pipe = pipeline_manager.pipelines[0]
@ -229,6 +230,7 @@ class TestPipeline(base.TestCase):
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
@ -245,6 +247,7 @@ class TestPipeline(base.TestCase):
counter_cfg = ['a', 'b']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
@ -267,6 +270,7 @@ class TestPipeline(base.TestCase):
counter_cfg = ['*']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -280,6 +284,7 @@ class TestPipeline(base.TestCase):
counter_cfg = ['*', '!a']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
@ -287,6 +292,7 @@ class TestPipeline(base.TestCase):
counter_cfg = ['*', '!b']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -299,6 +305,7 @@ class TestPipeline(base.TestCase):
counter_cfg = ['!b', '!c']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -314,6 +321,7 @@ class TestPipeline(base.TestCase):
counter_cfg = ['!a', '!c']
self.pipeline_cfg[0]['counters'] = counter_cfg
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_counter('a'))
self.assertTrue(pipeline_manager.pipelines[0].support_counter('b'))
@ -335,6 +343,7 @@ class TestPipeline(base.TestCase):
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -359,7 +368,7 @@ class TestPipeline(base.TestCase):
self.assertTrue(getattr(self.TransformerClass.samples[1], "name")
== 'b')
def test_multple_pipeline_exception(self):
def test_multiple_pipeline_exception(self):
self.pipeline_cfg.append({
'name': "second_pipeline",
"interval": 5,
@ -374,6 +383,7 @@ class TestPipeline(base.TestCase):
'publishers': ['except'],
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
@ -397,6 +407,7 @@ class TestPipeline(base.TestCase):
def test_none_transformer_pipeline(self):
self.pipeline_cfg[0]['transformers'] = None
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -406,6 +417,7 @@ class TestPipeline(base.TestCase):
def test_empty_transformer_pipeline(self):
self.pipeline_cfg[0]['transformers'] = []
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -424,6 +436,7 @@ class TestPipeline(base.TestCase):
},
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
@ -456,6 +469,7 @@ class TestPipeline(base.TestCase):
},
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -491,6 +505,7 @@ class TestPipeline(base.TestCase):
},
]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -506,6 +521,7 @@ class TestPipeline(base.TestCase):
def test_multiple_publisher(self):
self.pipeline_cfg[0]['publishers'] = ['test', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
@ -521,6 +537,7 @@ class TestPipeline(base.TestCase):
def test_multiple_publisher_isolation(self):
self.pipeline_cfg[0]['publishers'] = ['except', 'new']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter])
@ -532,6 +549,7 @@ class TestPipeline(base.TestCase):
def test_multiple_counter_pipeline(self):
self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter,
@ -561,6 +579,7 @@ class TestPipeline(base.TestCase):
}, ]
)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
pipe = pipeline_manager.pipelines[0]
@ -597,6 +616,7 @@ class TestPipeline(base.TestCase):
)
self.pipeline_cfg[0]['counters'] = ['a', 'b']
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
with pipeline_manager.publisher(None, None) as p:
p([self.test_counter,
@ -620,6 +640,7 @@ class TestPipeline(base.TestCase):
'parameters': {}
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
pipe = pipeline_manager.pipelines[0]
@ -642,6 +663,7 @@ class TestPipeline(base.TestCase):
'publishers': ["test"],
}, ]
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.publisher_manager)
self.test_counter = self.test_counter._replace(name='a:b')