From 9114e135b6b2a9a2c190cca5f9ccf4f185c6471a Mon Sep 17 00:00:00 2001 From: Eoghan Glynn Date: Mon, 24 Feb 2014 19:31:36 +0000 Subject: [PATCH] Decouple source and sink configuration for pipelines Addresses: BP decoupled-source-sink-discoverable-resources Add support for a new decoupled model of pipeline config which does not conflate the elements specific to sources and sinks. Instead of the basic unit of config being a consolidated pipeline, the sources and sinks may now be specified as separate lists of dictionaries, before being linked together once parsed. This allows source-specific configuration, such as resource discovery, to be kept focused only on the fine-grained source while avoiding the necessity for wide duplication of sink- related config. The shape of the new config format is best illustrated with an example: --- sources: - name: meter_source interval: 600 meters: - "*" sinks: - meter_sink - name: host_cpu_source interval: 120 meters: - "cpu.util.*min" resources: - "snmp://ip1" - "snmp://ip2" - "snmp://ip3" sinks: - meter_sink - lossy_sink - name: instance_cpu_source interval: 60 meters: - "cpu" sinks: - cpu_sink sinks: - name: meter_sink transformers: publishers: - rpc:// - name: lossy_sink transformers: publishers: - udp://addr - name: cpu_sink transformers: - name: "rate_of_change" parameters: target: name: "cpu_util" unit: "%" type: "gauge" scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" publishers: - rpc:// The old pipeline.yaml format continues to be supported as a deprecated feature to avoid breaking existing deployments. The semantics of the common individual configuration elements are identical in the deprecated and decoupled versions. Change-Id: Ide86c0feba88ae736f2a913b5faa95e640c4ceaf --- ceilometer/agent.py | 11 +- ceilometer/api/hooks.py | 5 +- ceilometer/notification.py | 7 +- ceilometer/objectstore/swift_middleware.py | 7 +- ceilometer/pipeline.py | 345 +++++++++++++----- .../objectstore/test_swift_middleware.py | 2 +- .../{test_pipeline.py => pipeline_base.py} | 334 +++++++---------- ceilometer/tests/test_decoupled_pipeline.py | 249 +++++++++++++ ceilometer/tests/test_deprecated_pipeline.py | 115 ++++++ doc/source/configuration.rst | 73 ++-- etc/ceilometer/deprecated_pipeline.yaml | 69 ++++ etc/ceilometer/pipeline.yaml | 145 ++++---- 12 files changed, 946 insertions(+), 416 deletions(-) rename ceilometer/tests/{test_pipeline.py => pipeline_base.py} (81%) create mode 100644 ceilometer/tests/test_decoupled_pipeline.py create mode 100644 ceilometer/tests/test_deprecated_pipeline.py create mode 100644 etc/ceilometer/deprecated_pipeline.yaml diff --git a/ceilometer/agent.py b/ceilometer/agent.py index 738b951c4..808697714 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -29,7 +29,6 @@ from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import pipeline -from ceilometer import transformer LOG = log.getLogger(__name__) @@ -105,20 +104,16 @@ class AgentManager(os_service.Service): self.pipeline_manager.pipelines, self.pollster_manager.extensions): if pipeline.support_meter(pollster.name): - polling_task = polling_tasks.get(pipeline.interval) + polling_task = polling_tasks.get(pipeline.get_interval()) if not polling_task: polling_task = self.create_polling_task() - polling_tasks[pipeline.interval] = polling_task + polling_tasks[pipeline.get_interval()] = polling_task polling_task.add(pollster, [pipeline]) return polling_tasks def start(self): - self.pipeline_manager = pipeline.setup_pipeline( - transformer.TransformerExtensionManager( - 'ceilometer.transformer', - ), - ) + self.pipeline_manager = pipeline.setup_pipeline() for interval, task in self.setup_polling_tasks().iteritems(): self.tg.add_timer(interval, diff --git a/ceilometer/api/hooks.py b/ceilometer/api/hooks.py index 9ef6a1d56..757482df4 100644 --- a/ceilometer/api/hooks.py +++ b/ceilometer/api/hooks.py @@ -23,7 +23,6 @@ from oslo.config import cfg from pecan import hooks from ceilometer import pipeline -from ceilometer import transformer class ConfigHook(hooks.PecanHook): @@ -55,9 +54,7 @@ class PipelineHook(hooks.PecanHook): if self.__class__.pipeline_manager is None: # this is done here as the cfg options are not available # when the file is imported. - self.__class__.pipeline_manager = pipeline.setup_pipeline( - transformer.TransformerExtensionManager( - 'ceilometer.transformer')) + self.__class__.pipeline_manager = pipeline.setup_pipeline() def before(self, state): state.request.pipeline_manager = self.pipeline_manager diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 122e2fb13..cf031f350 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -28,7 +28,6 @@ from ceilometer.openstack.common import service as os_service from ceilometer import pipeline from ceilometer import service from ceilometer.storage import models -from ceilometer import transformer LOG = log.getLogger(__name__) @@ -67,11 +66,7 @@ class NotificationService(service.DispatchedService, rpc_service.Service): def initialize_service_hook(self, service): '''Consumers must be declared before consume_thread start.''' - self.pipeline_manager = pipeline.setup_pipeline( - transformer.TransformerExtensionManager( - 'ceilometer.transformer', - ), - ) + self.pipeline_manager = pipeline.setup_pipeline() LOG.debug(_('Loading event definitions')) self.event_converter = event_converter.setup_events( diff --git a/ceilometer/objectstore/swift_middleware.py b/ceilometer/objectstore/swift_middleware.py index 6bc77e95c..10431840c 100644 --- a/ceilometer/objectstore/swift_middleware.py +++ b/ceilometer/objectstore/swift_middleware.py @@ -63,7 +63,6 @@ from ceilometer.openstack.common import timeutils from ceilometer import pipeline from ceilometer import sample from ceilometer import service -from ceilometer import transformer class CeilometerMiddleware(object): @@ -80,11 +79,7 @@ class CeilometerMiddleware(object): service.prepare_service([]) - self.pipeline_manager = pipeline.setup_pipeline( - transformer.TransformerExtensionManager( - 'ceilometer.transformer', - ), - ) + self.pipeline_manager = pipeline.setup_pipeline() self.reseller_prefix = conf.get('reseller_prefix', 'AUTH_') if self.reseller_prefix and self.reseller_prefix[-1] != '_': self.reseller_prefix += '_' diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index eb8a29a22..823893a8b 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -1,8 +1,10 @@ # -*- encoding: utf-8 -*- # # Copyright © 2013 Intel Corp. +# Copyright © 2014 Red Hat, Inc # -# Author: Yunhong Jiang +# Authors: Yunhong Jiang +# Eoghan Glynn # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -27,6 +29,7 @@ import yaml from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log from ceilometer import publisher +from ceilometer import transformer as xformer OPTS = [ @@ -71,29 +74,18 @@ class PublishContext(object): p.flush(self.context) -class Pipeline(object): - """Sample handling pipeline +class Source(object): + """Represents a source of samples, in effect a set of pollsters + and/or notification handlers emitting samples for a set of matching + meters. - Pipeline describes a chain of handlers. The chain starts with - transformer and ends with one or more publishers. - - The first transformer in the chain gets sample from data collector, i.e. - pollster or notification handler, takes some action like dropping, - aggregation, changing field etc, then passes the updated sample - to next step. - - The subsequent transformers, if any, handle the data similarly. - - In the end of the chain, publishers publish the data. The exact publishing - method depends on publisher type, for example, pushing into data storage - through message bus, sending to external CW software through CW API call. - - If no transformer is included in the chain, the publishers get samples - from data collector and publish them directly. + Each source encapsulates meter name matching, polling interval + determination, optional resource enumeration or discovery, and + mapping to one or more sinks for publication. """ - def __init__(self, cfg, transformer_manager): + def __init__(self, cfg): self.cfg = cfg try: @@ -104,8 +96,7 @@ class Pipeline(object): raise PipelineException("Invalid interval value", cfg) # Support 'counters' for backward compatibility self.meters = cfg.get('meters', cfg.get('counters')) - # It's legal to have no transformer specified - self.transformer_cfg = cfg['transformers'] or [] + self.sinks = cfg.get('sinks') except KeyError as err: raise PipelineException( "Required field %s not specified" % err.args[0], cfg) @@ -113,27 +104,12 @@ class Pipeline(object): if self.interval <= 0: raise PipelineException("Interval value should > 0", cfg) - self._check_meters() - - if not cfg.get('publishers'): - raise PipelineException("No publisher specified", cfg) - - self.publishers = [] - for p in cfg['publishers']: - if '://' not in p: - # Support old format without URL - p = p + "://" - try: - self.publishers.append(publisher.get_publisher(p)) - except Exception: - LOG.exception(_("Unable to load publisher %s"), p) - - self.transformers = self._setup_transformers(cfg, transformer_manager) - self.resources = cfg.get('resources') or [] if not isinstance(self.resources, list): raise PipelineException("Resources should be a list", cfg) + self._check_meters() + def __str__(self): return self.name @@ -161,6 +137,106 @@ class Pipeline(object): "Included meters specified with wildcard", self.cfg) + # (yjiang5) To support meters like instance:m1.tiny, + # which include variable part at the end starting with ':'. + # Hope we will not add such meters in future. + @staticmethod + def _variable_meter_name(name): + m = name.partition(':') + if m[1] == ':': + return m[1].join((m[0], '*')) + else: + return name + + def support_meter(self, meter_name): + meter_name = self._variable_meter_name(meter_name) + + # Special case: if we only have negation, we suppose the default is + # allow + default = all(meter.startswith('!') for meter in self.meters) + + # Support wildcard like storage.* and !disk.* + # Start with negation, we consider that the order is deny, allow + if any(fnmatch.fnmatch(meter_name, meter[1:]) + for meter in self.meters + if meter[0] == '!'): + return False + + if any(fnmatch.fnmatch(meter_name, meter) + for meter in self.meters + if meter[0] != '!'): + return True + + return default + + def check_sinks(self, sinks): + if not self.sinks: + raise PipelineException( + "No sink defined in source %s" % self, + self.cfg) + for sink in self.sinks: + if sink not in sinks: + raise PipelineException( + "Dangling sink %s from source %s" % (sink, self), + self.cfg) + + +class Sink(object): + """Represents a sink for the transformation and publication of + samples emitted from a related source. + + Each sink config is concerned *only* with the transformation rules + and publication conduits for samples. + + In effect, a sink describes a chain of handlers. The chain starts + with zero or more transformers and ends with one or more publishers. + + The first transformer in the chain is passed samples from the + corresponding source, takes some action such as deriving rate of + change, performing unit conversion, or aggregating, before passing + the modified sample to next step. + + The subsequent transformers, if any, handle the data similarly. + + At the end of the chain, publishers publish the data. The exact + publishing method depends on publisher type, for example, pushing + into data storage via the message bus providing guaranteed delivery, + or for loss-tolerant samples UDP may be used. + + If no transformers are included in the chain, the publishers are + passed samples directly from the sink which are published unchanged. + + """ + + def __init__(self, cfg, transformer_manager): + self.cfg = cfg + + try: + self.name = cfg['name'] + # It's legal to have no transformer specified + self.transformer_cfg = cfg['transformers'] or [] + except KeyError as err: + raise PipelineException( + "Required field %s not specified" % err.args[0], cfg) + + if not cfg.get('publishers'): + raise PipelineException("No publisher specified", cfg) + + self.publishers = [] + for p in cfg['publishers']: + if '://' not in p: + # Support old format without URL + p = p + "://" + try: + self.publishers.append(publisher.get_publisher(p)) + except Exception: + LOG.exception(_("Unable to load publisher %s"), p) + + self.transformers = self._setup_transformers(cfg, transformer_manager) + + def __str__(self): + return self.name + def _setup_transformers(self, cfg, transformer_manager): transformer_cfg = cfg['transformers'] or [] transformers = [] @@ -234,49 +310,11 @@ class Pipeline(object): 'pub': p})) LOG.audit(_("Pipeline %s: Published samples") % self) - def publish_sample(self, ctxt, sample): - self.publish_samples(ctxt, [sample]) - def publish_samples(self, ctxt, samples): for meter_name, samples in itertools.groupby( sorted(samples, key=operator.attrgetter('name')), operator.attrgetter('name')): - if self.support_meter(meter_name): - self._publish_samples(0, ctxt, samples) - - # (yjiang5) To support meters like instance:m1.tiny, - # which include variable part at the end starting with ':'. - # Hope we will not add such meters in future. - def _variable_meter_name(self, name): - m = name.partition(':') - if m[1] == ':': - return m[1].join((m[0], '*')) - else: - return name - - def support_meter(self, meter_name): - meter_name = self._variable_meter_name(meter_name) - - # Special case: if we only have negation, we suppose the default it - # allow - if all(meter.startswith('!') for meter in self.meters): - default = True - else: - default = False - - # Support wildcard like storage.* and !disk.* - # Start with negation, we consider that the order is deny, allow - if any(fnmatch.fnmatch(meter_name, meter[1:]) - for meter in self.meters - if meter[0] == '!'): - return False - - if any(fnmatch.fnmatch(meter_name, meter) - for meter in self.meters - if meter[0] != '!'): - return True - - return default + self._publish_samples(0, ctxt, samples) def flush(self, ctxt): """Flush data after all samples have been injected to pipeline.""" @@ -292,8 +330,43 @@ class Pipeline(object): 'trans': transformer})) LOG.exception(err) + +class Pipeline(object): + """Represents a coupling between a sink and a corresponding source. + """ + + def __init__(self, source, sink): + self.source = source + self.sink = sink + self.name = str(self) + + def __str__(self): + return (self.source.name if self.source.name == self.sink.name + else '%s:%s' % (self.source.name, self.sink.name)) + def get_interval(self): - return self.interval + return self.source.interval + + @property + def resources(self): + return self.source.resources + + def support_meter(self, meter_name): + return self.source.support_meter(meter_name) + + @property + def publishers(self): + return self.sink.publishers + + def publish_sample(self, ctxt, sample): + self.publish_samples(ctxt, [sample]) + + def publish_samples(self, ctxt, samples): + supported = [s for s in samples if self.source.support_meter(s.name)] + self.sink.publish_samples(ctxt, supported) + + def flush(self, ctxt): + self.sink.flush(ctxt) class PipelineManager(object): @@ -305,31 +378,82 @@ class PipelineManager(object): """ - def __init__(self, cfg, - transformer_manager): + def __init__(self, cfg, transformer_manager): """Setup the pipelines according to config. - The top of the cfg is a list of pipeline definitions. + The configuration is supported in one of two forms: - Pipeline definition is an dictionary specifying the target samples, - the transformers involved, and the target publishers: - { - "name": pipeline_name - "interval": interval_time - "meters" : ["meter_1", "meter_2"], - "resources": ["resource_uri1", "resource_uri2"], - "transformers":[ + 1. Deprecated: the source and sink configuration are conflated + as a list of consolidated pipelines. + + The pipelines are defined as a list of dictionaries each + specifying the target samples, the transformers involved, + and the target publishers, for example: + + [{"name": pipeline_1, + "interval": interval_time, + "meters" : ["meter_1", "meter_2"], + "resources": ["resource_uri1", "resource_uri2"], + "transformers": [ {"name": "Transformer_1", "parameters": {"p1": "value"}}, - {"name": "Transformer_2", + {"name": "Transformer_2", "parameters": {"p1": "value"}}, - ] - "publishers": ["publisher_1", "publisher_2"] - } + ], + "publishers": ["publisher_1", "publisher_2"] + }, + {"name": pipeline_2, + "interval": interval_time, + "meters" : ["meter_3"], + "publishers": ["publisher_3"] + }, + ] - Interval is how many seconds should the samples be injected to - the pipeline. + 2. Decoupled: the source and sink configuration are separately + specified before being linked together. This allows source- + specific configuration, such as resource discovery, to be + kept focused only on the fine-grained source while avoiding + the necessity for wide duplication of sink-related config. + + The configuration is provided in the form of separate lists + of dictionaries defining sources and sinks, for example: + + {"sources": [{"name": source_1, + "interval": interval_time, + "meters" : ["meter_1", "meter_2"], + "resources": ["resource_uri1", "resource_uri2"], + "sinks" : ["sink_1", "sink_2"] + }, + {"name": source_2, + "interval": interval_time, + "meters" : ["meter_3"], + "sinks" : ["sink_2"] + }, + ], + "sinks": [{"name": sink_1, + "transformers": [ + {"name": "Transformer_1", + "parameters": {"p1": "value"}}, + + {"name": "Transformer_2", + "parameters": {"p1": "value"}}, + ], + "publishers": ["publisher_1", "publisher_2"] + }, + {"name": sink_2, + "publishers": ["publisher_3"] + }, + ] + } + + The semantics of the common individual configuration elements + are identical in the deprecated and decoupled version. + + The interval determines the cadence of sample injection into + the pipeline where samples are produced under the direct control + of an agent, i.e. via a polling cycle as opposed to incoming + notifications. Valid meter format is '*', '!meter_name', or 'meter_name'. '*' is wildcard symbol means any meters; '!meter_name' means @@ -352,8 +476,26 @@ class PipelineManager(object): Publisher's name is plugin name in setup.cfg """ - self.pipelines = [Pipeline(pipedef, transformer_manager) - for pipedef in cfg] + self.pipelines = [] + if 'sources' in cfg or 'sinks' in cfg: + if not ('sources' in cfg and 'sinks' in cfg): + raise PipelineException("Both sources & sinks are required", + cfg) + LOG.info(_('detected decoupled pipeline config format')) + sources = [Source(s) for s in cfg.get('sources', [])] + sinks = dict((s['name'], Sink(s, transformer_manager)) + for s in cfg.get('sinks', [])) + for source in sources: + source.check_sinks(sinks) + for target in source.sinks: + self.pipelines.append(Pipeline(source, + sinks[target])) + else: + LOG.warning(_('detected deprecated pipeline config format')) + for pipedef in cfg: + source = Source(pipedef) + sink = Sink(pipedef, transformer_manager) + self.pipelines.append(Pipeline(source, sink)) def publisher(self, context): """Build a new Publisher for these manager pipelines. @@ -363,7 +505,7 @@ class PipelineManager(object): return PublishContext(context, self.pipelines) -def setup_pipeline(transformer_manager): +def setup_pipeline(transformer_manager=None): """Setup pipeline manager according to yaml config file.""" cfg_file = cfg.CONF.pipeline_cfg_file if not os.path.exists(cfg_file): @@ -378,4 +520,7 @@ def setup_pipeline(transformer_manager): LOG.info(_("Pipeline config: %s"), pipeline_cfg) return PipelineManager(pipeline_cfg, - transformer_manager) + transformer_manager or + xformer.TransformerExtensionManager( + 'ceilometer.transformer', + )) diff --git a/ceilometer/tests/objectstore/test_swift_middleware.py b/ceilometer/tests/objectstore/test_swift_middleware.py index 4cd5bf7be..e1585c96a 100644 --- a/ceilometer/tests/objectstore/test_swift_middleware.py +++ b/ceilometer/tests/objectstore/test_swift_middleware.py @@ -60,7 +60,7 @@ class TestSwiftMiddleware(test.BaseTestCase): def __init__(self): self.pipelines = [self._faux_pipeline(self)] - def _fake_setup_pipeline(self, transformer_manager): + def _fake_setup_pipeline(self, transformer_manager=None): return self.pipeline_manager def setUp(self): diff --git a/ceilometer/tests/test_pipeline.py b/ceilometer/tests/pipeline_base.py similarity index 81% rename from ceilometer/tests/test_pipeline.py rename to ceilometer/tests/pipeline_base.py index 8981e8adb..13d3e3b27 100644 --- a/ceilometer/tests/test_pipeline.py +++ b/ceilometer/tests/pipeline_base.py @@ -17,9 +17,10 @@ # License for the specific language governing permissions and limitations # under the License. +import abc import datetime -import yaml +import six from stevedore import extension from ceilometer.openstack.common.fixture import mockpatch @@ -34,32 +35,8 @@ from ceilometer.transformer import accumulator from ceilometer.transformer import conversions -class TestTransformerAccumulator(test.BaseTestCase): - - def test_handle_sample(self): - test_sample = sample.Sample( - name='a', - type=sample.TYPE_GAUGE, - volume=1, - unit='B', - user_id="test_user", - project_id="test_proj", - resource_id="test_resource", - timestamp=timeutils.utcnow().isoformat(), - resource_metadata={} - ) - - # Test when size is set to less than 1. - tf = accumulator.TransformerAccumulator(size=0) - self.assertEqual(tf.handle_sample(None, test_sample), test_sample) - self.assertFalse(hasattr(tf, 'samples')) - # Test when size is set to greater or equal than 1. - tf = accumulator.TransformerAccumulator(size=2) - tf.handle_sample(None, test_sample) - self.assertEqual(len(tf.samples), 1) - - -class TestPipeline(test.BaseTestCase): +@six.add_metaclass(abc.ABCMeta) +class BasePipelineTestCase(test.BaseTestCase): def fake_tem_init(self): """Fake a transformerManager for pipeline The faked entry point setting is below: @@ -136,7 +113,7 @@ class TestPipeline(test.BaseTestCase): raise Exception() def setUp(self): - super(TestPipeline, self).setUp() + super(BasePipelineTestCase, self).setUp() self.test_counter = sample.Sample( name='a', @@ -163,16 +140,31 @@ class TestPipeline(test.BaseTestCase): self.transformer_manager = transformer.TransformerExtensionManager() - self.pipeline_cfg = [{ - 'name': "test_pipeline", - 'interval': 5, - 'counters': ['a'], - 'transformers': [ - {'name': "update", - 'parameters': {}} - ], - 'publishers': ["test://"], - }, ] + self._setup_pipeline_cfg() + + @abc.abstractmethod + def _setup_pipeline_cfg(self): + """Setup the appropriate form of pipeline config.""" + + @abc.abstractmethod + def _augment_pipeline_cfg(self): + """Augment the pipeline config with an additional element.""" + + @abc.abstractmethod + def _break_pipeline_cfg(self): + """Break the pipeline config with a malformed element.""" + + @abc.abstractmethod + def _set_pipeline_cfg(self, field, value): + """Set a field to a value in the pipeline config.""" + + @abc.abstractmethod + def _extend_pipeline_cfg(self, field, value): + """Extend an existing field in the pipeline config with a value.""" + + @abc.abstractmethod + def _unset_pipeline_cfg(self, field): + """Clear an existing field in the pipeline config.""" def _exception_create_pipelinemanager(self): self.assertRaises(pipeline.PipelineException, @@ -181,51 +173,51 @@ class TestPipeline(test.BaseTestCase): self.transformer_manager) def test_no_counters(self): - del self.pipeline_cfg[0]['counters'] + self._unset_pipeline_cfg('counters') self._exception_create_pipelinemanager() def test_no_transformers(self): - del self.pipeline_cfg[0]['transformers'] + self._unset_pipeline_cfg('transformers') self._exception_create_pipelinemanager() def test_no_name(self): - del self.pipeline_cfg[0]['name'] + self._unset_pipeline_cfg('name') self._exception_create_pipelinemanager() def test_no_interval(self): - del self.pipeline_cfg[0]['interval'] + self._unset_pipeline_cfg('interval') self._exception_create_pipelinemanager() def test_no_publishers(self): - del self.pipeline_cfg[0]['publishers'] + self._unset_pipeline_cfg('publishers') self._exception_create_pipelinemanager() def test_invalid_resources(self): invalid_resource = {'invalid': 1} - self.pipeline_cfg[0]['resources'] = invalid_resource + self._set_pipeline_cfg('resources', invalid_resource) self._exception_create_pipelinemanager() def test_check_counters_include_exclude_same(self): counter_cfg = ['a', '!a'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) self._exception_create_pipelinemanager() def test_check_counters_include_exclude(self): counter_cfg = ['a', '!b'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) self._exception_create_pipelinemanager() def test_check_counters_wildcard_included(self): counter_cfg = ['a', '*'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) self._exception_create_pipelinemanager() def test_check_publishers_invalid_publisher(self): publisher_cfg = ['test_invalid'] - self.pipeline_cfg[0]['publishers'] = publisher_cfg + self._set_pipeline_cfg('publishers', publisher_cfg) def test_invalid_string_interval(self): - self.pipeline_cfg[0]['interval'] = 'string' + self._set_pipeline_cfg('interval', 'string') self._exception_create_pipelinemanager() def test_check_transformer_invalid_transformer(self): @@ -233,7 +225,7 @@ class TestPipeline(test.BaseTestCase): {'name': "test_invalid", 'parameters': {}} ] - self.pipeline_cfg[0]['transformers'] = transformer_cfg + self._set_pipeline_cfg('transformers', transformer_cfg) self._exception_create_pipelinemanager() def test_get_interval(self): @@ -241,7 +233,7 @@ class TestPipeline(test.BaseTestCase): self.transformer_manager) pipe = pipeline_manager.pipelines[0] - self.assertTrue(pipe.get_interval() == 5) + self.assertEqual(pipe.get_interval(), 5) def test_publisher_transformer_invoked(self): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, @@ -252,14 +244,14 @@ class TestPipeline(test.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.samples), 1) - self.assertTrue(len(self.TransformerClass.samples) == 1) + self.assertEqual(len(self.TransformerClass.samples), 1) self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update') - self.assertTrue(getattr(self.TransformerClass.samples[0], "name") - == 'a') + self.assertEqual(getattr(self.TransformerClass.samples[0], "name"), + 'a') def test_multiple_included_counters(self): counter_cfg = ['a', 'b'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) @@ -285,13 +277,13 @@ class TestPipeline(test.BaseTestCase): p([self.test_counter]) self.assertEqual(len(publisher.samples), 2) - self.assertTrue(len(self.TransformerClass.samples) == 2) + self.assertEqual(len(self.TransformerClass.samples), 2) self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update') self.assertEqual(getattr(publisher.samples[1], "name"), 'b_update') def test_counter_dont_match(self): counter_cfg = ['nomatch'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -303,7 +295,7 @@ class TestPipeline(test.BaseTestCase): def test_wildcard_counter(self): counter_cfg = ['*'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -311,19 +303,19 @@ class TestPipeline(test.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.samples), 1) - self.assertTrue(len(self.TransformerClass.samples) == 1) + self.assertEqual(len(self.TransformerClass.samples), 1) self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update') def test_wildcard_excluded_counters(self): counter_cfg = ['*', '!a'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0].support_meter('a')) def test_wildcard_excluded_counters_not_excluded(self): counter_cfg = ['*', '!b'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -336,7 +328,7 @@ class TestPipeline(test.BaseTestCase): def test_all_excluded_counters_not_excluded(self): counter_cfg = ['!b', '!c'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -344,14 +336,14 @@ class TestPipeline(test.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.samples), 1) - self.assertTrue(len(self.TransformerClass.samples) == 1) + self.assertEqual(len(self.TransformerClass.samples), 1) self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update') - self.assertTrue(getattr(self.TransformerClass.samples[0], "name") - == 'a') + self.assertEqual(getattr(self.TransformerClass.samples[0], "name"), + 'a') def test_all_excluded_counters_is_excluded(self): counter_cfg = ['!a', '!c'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0].support_meter('a')) @@ -360,7 +352,7 @@ class TestPipeline(test.BaseTestCase): def test_wildcard_and_excluded_wildcard_counters(self): counter_cfg = ['*', '!disk.*'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0]. @@ -369,7 +361,7 @@ class TestPipeline(test.BaseTestCase): def test_included_counter_and_wildcard_counters(self): counter_cfg = ['cpu', 'disk.*'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) self.assertTrue(pipeline_manager.pipelines[0]. @@ -380,7 +372,7 @@ class TestPipeline(test.BaseTestCase): def test_excluded_counter_and_excluded_wildcard_counters(self): counter_cfg = ['!cpu', '!disk.*'] - self.pipeline_cfg[0]['counters'] = counter_cfg + self._set_pipeline_cfg('counters', counter_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) self.assertFalse(pipeline_manager.pipelines[0]. @@ -390,19 +382,7 @@ class TestPipeline(test.BaseTestCase): support_meter('instance')) def test_multiple_pipeline(self): - self.pipeline_cfg.append({ - 'name': 'second_pipeline', - 'interval': 5, - 'counters': ['b'], - 'transformers': [{ - 'name': 'update', - 'parameters': - { - "append_name": "_new", - } - }], - 'publishers': ['new'], - }) + self._augment_pipeline_cfg() pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) @@ -432,29 +412,14 @@ class TestPipeline(test.BaseTestCase): self.assertEqual(len(new_publisher.samples), 1) self.assertEqual(new_publisher.calls, 1) self.assertEqual(getattr(new_publisher.samples[0], "name"), 'b_new') - self.assertTrue(getattr(self.TransformerClass.samples[0], "name") - == 'a') - - self.assertTrue(len(self.TransformerClass.samples) == 2) - self.assertTrue(getattr(self.TransformerClass.samples[0], "name") - == 'a') - self.assertTrue(getattr(self.TransformerClass.samples[1], "name") - == 'b') + self.assertEqual(len(self.TransformerClass.samples), 2) + self.assertEqual(getattr(self.TransformerClass.samples[0], "name"), + 'a') + self.assertEqual(getattr(self.TransformerClass.samples[1], "name"), + 'b') def test_multiple_pipeline_exception(self): - self.pipeline_cfg.append({ - 'name': "second_pipeline", - "interval": 5, - 'counters': ['b'], - 'transformers': [{ - 'name': 'update', - 'parameters': - { - "append_name": "_new", - } - }], - 'publishers': ['except'], - }) + self._break_pipeline_cfg() pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) @@ -480,14 +445,14 @@ class TestPipeline(test.BaseTestCase): self.assertEqual(publisher.calls, 1) self.assertEqual(len(publisher.samples), 1) self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update') - self.assertTrue(len(self.TransformerClass.samples) == 2) - self.assertTrue(getattr(self.TransformerClass.samples[0], "name") - == 'a') - self.assertTrue(getattr(self.TransformerClass.samples[1], "name") - == 'b') + self.assertEqual(len(self.TransformerClass.samples), 2) + self.assertEqual(getattr(self.TransformerClass.samples[0], "name"), + 'a') + self.assertEqual(getattr(self.TransformerClass.samples[1], "name"), + 'b') def test_none_transformer_pipeline(self): - self.pipeline_cfg[0]['transformers'] = None + self._set_pipeline_cfg('transformers', None) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -498,7 +463,7 @@ class TestPipeline(test.BaseTestCase): self.assertEqual(getattr(publisher.samples[0], 'name'), 'a') def test_empty_transformer_pipeline(self): - self.pipeline_cfg[0]['transformers'] = [] + self._set_pipeline_cfg('transformers', []) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -509,7 +474,7 @@ class TestPipeline(test.BaseTestCase): self.assertEqual(getattr(publisher.samples[0], 'name'), 'a') def test_multiple_transformer_same_class(self): - self.pipeline_cfg[0]['transformers'] = [ + transformer_cfg = [ { 'name': 'update', 'parameters': {} @@ -519,6 +484,7 @@ class TestPipeline(test.BaseTestCase): 'parameters': {} }, ] + self._set_pipeline_cfg('transformers', transformer_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) @@ -530,14 +496,14 @@ class TestPipeline(test.BaseTestCase): self.assertEqual(len(publisher.samples), 1) self.assertEqual(getattr(publisher.samples[0], 'name'), 'a_update_update') - self.assertTrue(len(self.TransformerClass.samples) == 2) - self.assertTrue(getattr(self.TransformerClass.samples[0], 'name') - == 'a') - self.assertTrue(getattr(self.TransformerClass.samples[1], 'name') - == 'a_update') + self.assertEqual(len(self.TransformerClass.samples), 2) + self.assertEqual(getattr(self.TransformerClass.samples[0], 'name'), + 'a') + self.assertEqual(getattr(self.TransformerClass.samples[1], 'name'), + 'a_update') def test_multiple_transformer_same_class_different_parameter(self): - self.pipeline_cfg[0]['transformers'] = [ + transformer_cfg = [ { 'name': 'update', 'parameters': @@ -553,23 +519,24 @@ class TestPipeline(test.BaseTestCase): } }, ] + self._set_pipeline_cfg('transformers', transformer_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: p([self.test_counter]) - self.assertTrue(len(self.TransformerClass.samples) == 2) - self.assertTrue(getattr(self.TransformerClass.samples[0], 'name') - == 'a') - self.assertTrue(getattr(self.TransformerClass.samples[1], 'name') - == 'a_update') + self.assertEqual(len(self.TransformerClass.samples), 2) + self.assertEqual(getattr(self.TransformerClass.samples[0], 'name'), + 'a') + self.assertEqual(getattr(self.TransformerClass.samples[1], 'name'), + 'a_update') publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.samples), 1) self.assertEqual(getattr(publisher.samples[0], 'name'), 'a_update_new') def test_multiple_transformer_drop_transformer(self): - self.pipeline_cfg[0]['transformers'] = [ + transformer_cfg = [ { 'name': 'update', 'parameters': @@ -589,6 +556,7 @@ class TestPipeline(test.BaseTestCase): } }, ] + self._set_pipeline_cfg('transformers', transformer_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -596,15 +564,15 @@ class TestPipeline(test.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.samples), 0) - self.assertTrue(len(self.TransformerClass.samples) == 1) - self.assertTrue(getattr(self.TransformerClass.samples[0], 'name') - == 'a') - self.assertTrue(len(self.TransformerClassDrop.samples) == 1) - self.assertTrue(getattr(self.TransformerClassDrop.samples[0], 'name') - == 'a_update') + self.assertEqual(len(self.TransformerClass.samples), 1) + self.assertEqual(getattr(self.TransformerClass.samples[0], 'name'), + 'a') + self.assertEqual(len(self.TransformerClassDrop.samples), 1) + self.assertEqual(getattr(self.TransformerClassDrop.samples[0], 'name'), + 'a_update') def test_multiple_publisher(self): - self.pipeline_cfg[0]['publishers'] = ['test://', 'new://'] + self._set_pipeline_cfg('publishers', ['test://', 'new://']) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) @@ -621,7 +589,7 @@ class TestPipeline(test.BaseTestCase): 'a_update') def test_multiple_publisher_isolation(self): - self.pipeline_cfg[0]['publishers'] = ['except://', 'new://'] + self._set_pipeline_cfg('publishers', ['except://', 'new://']) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -633,7 +601,7 @@ class TestPipeline(test.BaseTestCase): 'a_update') def test_multiple_counter_pipeline(self): - self.pipeline_cfg[0]['counters'] = ['a', 'b'] + self._set_pipeline_cfg('counters', ['a', 'b']) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -657,7 +625,7 @@ class TestPipeline(test.BaseTestCase): def test_flush_pipeline_cache(self): CACHE_SIZE = 10 - self.pipeline_cfg[0]['transformers'].extend([ + extra_transformer_cfg = [ { 'name': 'cache', 'parameters': { @@ -670,8 +638,9 @@ class TestPipeline(test.BaseTestCase): { 'append_name': '_new' } - }, ] - ) + }, + ] + self._extend_pipeline_cfg('transformers', extra_transformer_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) pipe = pipeline_manager.pipelines[0] @@ -688,12 +657,12 @@ class TestPipeline(test.BaseTestCase): pipe.publish_sample(None, self.test_counter) pipe.flush(None) self.assertEqual(len(publisher.samples), CACHE_SIZE) - self.assertTrue(getattr(publisher.samples[0], 'name') - == 'a_update_new') + self.assertEqual(getattr(publisher.samples[0], 'name'), + 'a_update_new') def test_flush_pipeline_cache_multiple_counter(self): CACHE_SIZE = 3 - self.pipeline_cfg[0]['transformers'].extend([ + extra_transformer_cfg = [ { 'name': 'cache', 'parameters': { @@ -706,9 +675,10 @@ class TestPipeline(test.BaseTestCase): { 'append_name': '_new' } - }, ] - ) - self.pipeline_cfg[0]['counters'] = ['a', 'b'] + }, + ] + self._extend_pipeline_cfg('transformers', extra_transformer_cfg) + self._set_pipeline_cfg('counters', ['a', 'b']) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) with pipeline_manager.publisher(None) as p: @@ -738,10 +708,11 @@ class TestPipeline(test.BaseTestCase): 'b_update_new') def test_flush_pipeline_cache_before_publisher(self): - self.pipeline_cfg[0]['transformers'].append({ + extra_transformer_cfg = [{ 'name': 'cache', 'parameters': {} - }) + }] + self._extend_pipeline_cfg('transformers', extra_transformer_cfg) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) pipe = pipeline_manager.pipelines[0] @@ -785,15 +756,15 @@ class TestPipeline(test.BaseTestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.samples), 1) - self.assertTrue(len(self.TransformerClass.samples) == 1) + self.assertEqual(len(self.TransformerClass.samples), 1) self.assertEqual(getattr(publisher.samples[0], "name"), 'a:b_update') - self.assertTrue(getattr(self.TransformerClass.samples[0], "name") - == 'a:b') + self.assertEqual(getattr(self.TransformerClass.samples[0], "name"), + 'a:b') def test_global_unit_conversion(self): scale = 'volume / ((10**6) * 60)' - self.pipeline_cfg[0]['transformers'] = [ + transformer_cfg = [ { 'name': 'unit_conversion', 'parameters': { @@ -804,7 +775,8 @@ class TestPipeline(test.BaseTestCase): } }, ] - self.pipeline_cfg[0]['counters'] = ['cpu'] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['cpu']) counters = [ sample.Sample( name='cpu', @@ -835,7 +807,7 @@ class TestPipeline(test.BaseTestCase): self.assertEqual(getattr(cpu_mins, 'volume'), 20) def test_unit_identified_source_unit_conversion(self): - self.pipeline_cfg[0]['transformers'] = [ + transformer_cfg = [ { 'name': 'unit_conversion', 'parameters': { @@ -845,8 +817,9 @@ class TestPipeline(test.BaseTestCase): } }, ] - self.pipeline_cfg[0]['counters'] = ['core_temperature', - 'ambient_temperature'] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['core_temperature', + 'ambient_temperature']) counters = [ sample.Sample( name='core_temperature', @@ -894,7 +867,7 @@ class TestPipeline(test.BaseTestCase): s = "(resource_metadata.user_metadata.autoscaling_weight or 1.0)" \ "* (resource_metadata.non.existent or 1.0)" \ "* (100.0 / (10**9 * (resource_metadata.cpu_number or 1)))" - self.pipeline_cfg[0]['transformers'] = [ + transformer_cfg = [ { 'name': 'rate_of_change', 'parameters': { @@ -906,7 +879,8 @@ class TestPipeline(test.BaseTestCase): } }, ] - self.pipeline_cfg[0]['counters'] = ['cpu'] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['cpu']) now = timeutils.utcnow() later = now + datetime.timedelta(minutes=offset) um = {'autoscaling_weight': weight} if weight else {} @@ -1017,7 +991,7 @@ class TestPipeline(test.BaseTestCase): def test_rate_of_change_no_predecessor(self): s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))" - self.pipeline_cfg[0]['transformers'] = [ + transformer_cfg = [ { 'name': 'rate_of_change', 'parameters': { @@ -1029,7 +1003,8 @@ class TestPipeline(test.BaseTestCase): } }, ] - self.pipeline_cfg[0]['counters'] = ['cpu'] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['cpu']) now = timeutils.utcnow() counters = [ sample.Sample( @@ -1057,7 +1032,7 @@ class TestPipeline(test.BaseTestCase): def test_resources(self): resources = ['test1://', 'test2://'] - self.pipeline_cfg[0]['resources'] = resources + self._set_pipeline_cfg('resources', resources) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) self.assertEqual(pipeline_manager.pipelines[0].resources, @@ -1116,7 +1091,7 @@ class TestPipeline(test.BaseTestCase): 'unit': '(B|request)'} map_to = {'name': 'disk.\\1.\\2.rate', 'unit': '\\1/s'} - self.pipeline_cfg[0]['transformers'] = [ + transformer_cfg = [ { 'name': 'rate_of_change', 'parameters': { @@ -1130,51 +1105,12 @@ class TestPipeline(test.BaseTestCase): }, }, ] - self.pipeline_cfg[0]['counters'] = ['disk.read.bytes', - 'disk.write.requests'] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['disk.read.bytes', + 'disk.write.requests']) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) pipe = pipeline_manager.pipelines[0] meters = ('disk.read.bytes', 'disk.write.requests') units = ('B', 'request') self._do_test_rate_of_change_mapping(pipe, meters, units) - - def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index, - meters, units): - with open('etc/ceilometer/pipeline.yaml') as fap: - data = fap.read() - pipeline_cfg = yaml.safe_load(data) - for p in pipeline_cfg: - p['publishers'] = ['test://'] - pipeline_manager = pipeline.PipelineManager(pipeline_cfg, - self.transformer_manager) - pipe = pipeline_manager.pipelines[index] - self._do_test_rate_of_change_mapping(pipe, meters, units) - - def test_rate_of_change_boilerplate_disk_read_cfg(self): - meters = ('disk.read.bytes', 'disk.read.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, - meters, - units) - - def test_rate_of_change_boilerplate_disk_write_cfg(self): - meters = ('disk.write.bytes', 'disk.write.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, - meters, - units) - - def test_rate_of_change_boilerplate_network_incoming_cfg(self): - meters = ('network.incoming.bytes', 'network.incoming.packets') - units = ('B', 'packet') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, - meters, - units) - - def test_rate_of_change_boilerplate_network_outgoing_cfg(self): - meters = ('network.outgoing.bytes', 'network.outgoing.packets') - units = ('B', 'packet') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, - meters, - units) diff --git a/ceilometer/tests/test_decoupled_pipeline.py b/ceilometer/tests/test_decoupled_pipeline.py new file mode 100644 index 000000000..293adb32f --- /dev/null +++ b/ceilometer/tests/test_decoupled_pipeline.py @@ -0,0 +1,249 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2014 Red Hat, Inc +# +# Author: Eoghan Glynn +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import yaml + +from ceilometer import pipeline +from ceilometer import sample +from ceilometer.tests import pipeline_base + + +class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase): + def _setup_pipeline_cfg(self): + source = {'name': 'test_source', + 'interval': 5, + 'counters': ['a'], + 'resources': [], + 'sinks': ['test_sink']} + sink = {'name': 'test_sink', + 'transformers': [{'name': 'update', 'parameters': {}}], + 'publishers': ['test://']} + self.pipeline_cfg = {'sources': [source], 'sinks': [sink]} + + def _augment_pipeline_cfg(self): + self.pipeline_cfg['sources'].append({ + 'name': 'second_source', + 'interval': 5, + 'counters': ['b'], + 'resources': [], + 'sinks': ['second_sink'] + }) + self.pipeline_cfg['sinks'].append({ + 'name': 'second_sink', + 'transformers': [{ + 'name': 'update', + 'parameters': + { + 'append_name': '_new', + } + }], + 'publishers': ['new'], + }) + + def _break_pipeline_cfg(self): + self.pipeline_cfg['sources'].append({ + 'name': 'second_source', + 'interval': 5, + 'counters': ['b'], + 'resources': [], + 'sinks': ['second_sink'] + }) + self.pipeline_cfg['sinks'].append({ + 'name': 'second_sink', + 'transformers': [{ + 'name': 'update', + 'parameters': + { + 'append_name': '_new', + } + }], + 'publishers': ['except'], + }) + + def _set_pipeline_cfg(self, field, value): + if field in self.pipeline_cfg['sources'][0]: + self.pipeline_cfg['sources'][0][field] = value + else: + self.pipeline_cfg['sinks'][0][field] = value + + def _extend_pipeline_cfg(self, field, value): + if field in self.pipeline_cfg['sources'][0]: + self.pipeline_cfg['sources'][0][field].extend(value) + else: + self.pipeline_cfg['sinks'][0][field].extend(value) + + def _unset_pipeline_cfg(self, field): + if field in self.pipeline_cfg['sources'][0]: + del self.pipeline_cfg['sources'][0][field] + else: + del self.pipeline_cfg['sinks'][0][field] + + def test_source_no_sink(self): + del self.pipeline_cfg['sinks'] + self._exception_create_pipelinemanager() + + def test_source_dangling_sink(self): + self.pipeline_cfg['sources'].append({ + 'name': 'second_source', + 'interval': 5, + 'counters': ['b'], + 'resources': [], + 'sinks': ['second_sink'] + }) + self._exception_create_pipelinemanager() + + def test_sink_no_source(self): + del self.pipeline_cfg['sources'] + self._exception_create_pipelinemanager() + + def test_source_with_multiple_sinks(self): + counter_cfg = ['a', 'b'] + self._set_pipeline_cfg('counters', counter_cfg) + self.pipeline_cfg['sinks'].append({ + 'name': 'second_sink', + 'transformers': [{ + 'name': 'update', + 'parameters': + { + 'append_name': '_new', + } + }], + 'publishers': ['new'], + }) + self.pipeline_cfg['sources'][0]['sinks'].append('second_sink') + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + with pipeline_manager.publisher(None) as p: + p([self.test_counter]) + + self.test_counter = sample.Sample( + name='b', + type=self.test_counter.type, + volume=self.test_counter.volume, + unit=self.test_counter.unit, + user_id=self.test_counter.user_id, + project_id=self.test_counter.project_id, + resource_id=self.test_counter.resource_id, + timestamp=self.test_counter.timestamp, + resource_metadata=self.test_counter.resource_metadata, + ) + + with pipeline_manager.publisher(None) as p: + p([self.test_counter]) + + self.assertEqual(len(pipeline_manager.pipelines), 2) + self.assertEqual(str(pipeline_manager.pipelines[0]), + 'test_source:test_sink') + self.assertEqual(str(pipeline_manager.pipelines[1]), + 'test_source:second_sink') + test_publisher = pipeline_manager.pipelines[0].publishers[0] + new_publisher = pipeline_manager.pipelines[1].publishers[0] + for publisher, sfx in [(test_publisher, '_update'), + (new_publisher, '_new')]: + self.assertEqual(len(publisher.samples), 2) + self.assertEqual(publisher.calls, 2) + self.assertEqual(getattr(publisher.samples[0], "name"), 'a' + sfx) + self.assertEqual(getattr(publisher.samples[1], "name"), 'b' + sfx) + + def test_multiple_sources_with_single_sink(self): + self.pipeline_cfg['sources'].append({ + 'name': 'second_source', + 'interval': 5, + 'counters': ['b'], + 'resources': [], + 'sinks': ['test_sink'] + }) + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + with pipeline_manager.publisher(None) as p: + p([self.test_counter]) + + self.test_counter = sample.Sample( + name='b', + type=self.test_counter.type, + volume=self.test_counter.volume, + unit=self.test_counter.unit, + user_id=self.test_counter.user_id, + project_id=self.test_counter.project_id, + resource_id=self.test_counter.resource_id, + timestamp=self.test_counter.timestamp, + resource_metadata=self.test_counter.resource_metadata, + ) + + with pipeline_manager.publisher(None) as p: + p([self.test_counter]) + + self.assertEqual(len(pipeline_manager.pipelines), 2) + self.assertEqual(str(pipeline_manager.pipelines[0]), + 'test_source:test_sink') + self.assertEqual(str(pipeline_manager.pipelines[1]), + 'second_source:test_sink') + test_publisher = pipeline_manager.pipelines[0].publishers[0] + another_publisher = pipeline_manager.pipelines[1].publishers[0] + for publisher in [test_publisher, another_publisher]: + self.assertEqual(len(publisher.samples), 2) + self.assertEqual(publisher.calls, 2) + self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update') + self.assertEqual(getattr(publisher.samples[1], "name"), 'b_update') + + transformed_samples = self.TransformerClass.samples + self.assertEqual(len(transformed_samples), 2) + self.assertEqual([getattr(s, 'name') for s in transformed_samples], + ['a', 'b']) + + def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index, + meters, units): + with open('etc/ceilometer/pipeline.yaml') as fap: + data = fap.read() + pipeline_cfg = yaml.safe_load(data) + for s in pipeline_cfg['sinks']: + s['publishers'] = ['test://'] + pipeline_manager = pipeline.PipelineManager(pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[index] + self._do_test_rate_of_change_mapping(pipe, meters, units) + + def test_rate_of_change_boilerplate_disk_read_cfg(self): + meters = ('disk.read.bytes', 'disk.read.requests') + units = ('B', 'request') + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, + meters, + units) + + def test_rate_of_change_boilerplate_disk_write_cfg(self): + meters = ('disk.write.bytes', 'disk.write.requests') + units = ('B', 'request') + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, + meters, + units) + + def test_rate_of_change_boilerplate_network_incoming_cfg(self): + meters = ('network.incoming.bytes', 'network.incoming.packets') + units = ('B', 'packet') + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, + meters, + units) + + def test_rate_of_change_boilerplate_network_outgoing_cfg(self): + meters = ('network.outgoing.bytes', 'network.outgoing.packets') + units = ('B', 'packet') + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, + meters, + units) diff --git a/ceilometer/tests/test_deprecated_pipeline.py b/ceilometer/tests/test_deprecated_pipeline.py new file mode 100644 index 000000000..606dcadef --- /dev/null +++ b/ceilometer/tests/test_deprecated_pipeline.py @@ -0,0 +1,115 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2014 Red Hat, Inc +# +# Author: Eoghan Glynn +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import yaml + +from ceilometer import pipeline +from ceilometer.tests import pipeline_base + + +class TestDeprecatedPipeline(pipeline_base.BasePipelineTestCase): + def _setup_pipeline_cfg(self): + self.pipeline_cfg = [{ + 'name': 'test_pipeline', + 'interval': 5, + 'counters': ['a'], + 'transformers': [ + {'name': 'update', + 'parameters': {}} + ], + 'publishers': ['test://'], + }, ] + + def _augment_pipeline_cfg(self): + self.pipeline_cfg.append({ + 'name': 'second_pipeline', + 'interval': 5, + 'counters': ['b'], + 'transformers': [{ + 'name': 'update', + 'parameters': + { + 'append_name': '_new', + } + }], + 'publishers': ['new'], + }) + + def _break_pipeline_cfg(self): + self.pipeline_cfg.append({ + 'name': 'second_pipeline', + 'interval': 5, + 'counters': ['b'], + 'transformers': [{ + 'name': 'update', + 'parameters': + { + 'append_name': '_new', + } + }], + 'publishers': ['except'], + }) + + def _set_pipeline_cfg(self, field, value): + self.pipeline_cfg[0][field] = value + + def _extend_pipeline_cfg(self, field, value): + self.pipeline_cfg[0][field].extend(value) + + def _unset_pipeline_cfg(self, field): + del self.pipeline_cfg[0][field] + + def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index, + meters, units): + with open('etc/ceilometer/deprecated_pipeline.yaml') as fap: + data = fap.read() + pipeline_cfg = yaml.safe_load(data) + for p in pipeline_cfg: + p['publishers'] = ['test://'] + pipeline_manager = pipeline.PipelineManager(pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[index] + self._do_test_rate_of_change_mapping(pipe, meters, units) + + def test_rate_of_change_boilerplate_disk_read_cfg(self): + meters = ('disk.read.bytes', 'disk.read.requests') + units = ('B', 'request') + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, + meters, + units) + + def test_rate_of_change_boilerplate_disk_write_cfg(self): + meters = ('disk.write.bytes', 'disk.write.requests') + units = ('B', 'request') + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, + meters, + units) + + def test_rate_of_change_boilerplate_network_incoming_cfg(self): + meters = ('network.incoming.bytes', 'network.incoming.packets') + units = ('B', 'packet') + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, + meters, + units) + + def test_rate_of_change_boilerplate_network_outgoing_cfg(self): + meters = ('network.outgoing.bytes', 'network.outgoing.packets') + units = ('B', 'packet') + self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, + meters, + units) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 5fc88eb75..ebbd42da2 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -273,16 +273,27 @@ A sample configuration file can be found in `ceilometer.conf.sample`_. Pipelines ========= -Pipelines describe chains of handlers, which can be transformers and/or -publishers. +Pipelines describe a coupling between sources of samples and the +corresponding sinks for transformation and publication of these +data. -The chain can start with a transformer, which is responsible for converting -the data, coming from the pollsters or notification handlers (for further -information see the :ref:`polling` section), to the required format, which -can mean dropping some parts of the sample, doing aggregation, changing -field or deriving samples for secondary meters, like in case of *cpu_util*, -see the example below, in the configuration details. The pipeline can contain -multiple transformers or none at all. +A source is a producer of samples, in effect a set of pollsters and/or +notification handlers emitting samples for a set of matching meters. + +Each source configuration encapsulates meter name matching, polling +interval determination, optional resource enumeration or discovery, +and mapping to one or more sinks for publication. + +A sink on the other hand is a consumer of samples, providing logic for +the transformation and publication of samples emitted from related sources. +Each sink configuration is concerned `only` with the transformation rules +and publication conduits for samples. + +In effect, a sink describes a chain of handlers. The chain starts with +zero or more transformers and ends with one or more publishers. The first +transformer in the chain is passed samples from the corresponding source, +takes some action such as deriving rate of change, performing unit conversion, +or aggregating, before passing the modified sample to next step. The chains end with one or more publishers. This component makes it possible to persist the data into storage through the message bus or to send it to one @@ -300,21 +311,30 @@ ceilometer.conf. Multiple chains can be defined in one configuration file. The chain definition looks like the following:: --- - - - name: 'name of the pipeline' - interval: 'how often should the samples be injected into the pipeline' - meters: - - 'meter filter' - transformers: 'definition of transformers' - publishers: - - 'list of publishers' + sources: + - name: 'source name' + interval: 'how often should the samples be injected into the pipeline' + meters: + - 'meter filter' + resources: + - 'list of resource URLs' + sinks + - 'sink name' + sinks: + - name: 'sink name' + transformers: 'definition of transformers' + publishers: + - 'list of publishers' -The *interval* should be defined in seconds. +The *interval* parameter in the sources section should be defined in seconds. It +determines the cadence of sample injection into the pipeline, where samples are +produced under the direct control of an agent, i.e. via a polling cycle as opposed +to incoming notifications. -There are several ways to define the list of meters for a pipeline. The list -of valid meters can be found in the :ref:`measurements` section. There is +There are several ways to define the list of meters for a pipeline source. The +list of valid meters can be found in the :ref:`measurements` section. There is a possibility to define all the meters, or just included or excluded meters, -with which a pipeline should operate: +with which a source should operate: * To include all meters, use the '*' wildcard symbol. * To define the list of meters, use either of the following: @@ -337,8 +357,13 @@ The above definition methods can be used in the following combinations: pipeline. Wildcard and included meters cannot co-exist in the same pipeline definition section. -The *transformers* section provides the possibility to add a list of -transformer definitions. The names of the transformers should be the same +The optional *resources* section of a pipeline source allows a static +list of resource URLs to be to be configured. An amalgamated list of all +statically configured resources for a set of pipeline sources with a +common interval is passed to individual pollsters matching those pipelines. + +The *transformers* section of a pipeline sink provides the possibility to add a +list of transformer definitions. The names of the transformers should be the same as the names of the related extensions in setup.cfg. The definition of transformers can contain the following fields:: @@ -392,4 +417,4 @@ setup.cfg. The default configuration can be found in `pipeline.yaml`_. -.. _pipeline.yaml: https://git.openstack.org/cgit/openstack/ceilometer/tree/etc/ceilometer/pipeline.yaml \ No newline at end of file +.. _pipeline.yaml: https://git.openstack.org/cgit/openstack/ceilometer/tree/etc/ceilometer/pipeline.yaml diff --git a/etc/ceilometer/deprecated_pipeline.yaml b/etc/ceilometer/deprecated_pipeline.yaml new file mode 100644 index 000000000..b34f5ab52 --- /dev/null +++ b/etc/ceilometer/deprecated_pipeline.yaml @@ -0,0 +1,69 @@ +--- +- + name: meter_pipeline + interval: 600 + meters: + - "*" + resources: + transformers: + publishers: + - rpc:// +- + name: cpu_pipeline + interval: 600 + meters: + - "cpu" + transformers: + - name: "rate_of_change" + parameters: + target: + name: "cpu_util" + unit: "%" + type: "gauge" + scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" + publishers: + - rpc:// +- + name: disk_pipeline + interval: 600 + meters: + - "disk.read.bytes" + - "disk.read.requests" + - "disk.write.bytes" + - "disk.write.requests" + transformers: + - name: "rate_of_change" + parameters: + source: + map_from: + name: "disk\\.(read|write)\\.(bytes|requests)" + unit: "(B|request)" + target: + map_to: + name: "disk.\\1.\\2.rate" + unit: "\\1/s" + type: "gauge" + publishers: + - rpc:// +- + name: network_pipeline + interval: 600 + meters: + - "network.incoming.bytes" + - "network.incoming.packets" + - "network.outgoing.bytes" + - "network.outgoing.packets" + transformers: + - name: "rate_of_change" + parameters: + source: + map_from: + name: "network\\.(incoming|outgoing)\\.(bytes|packets)" + unit: "(B|packet)" + target: + map_to: + name: "network.\\1.\\2.rate" + unit: "\\1/s" + type: "gauge" + publishers: + - rpc:// diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml index b34f5ab52..dcd9feab6 100644 --- a/etc/ceilometer/pipeline.yaml +++ b/etc/ceilometer/pipeline.yaml @@ -1,69 +1,78 @@ --- -- - name: meter_pipeline - interval: 600 - meters: - - "*" - resources: - transformers: - publishers: - - rpc:// -- - name: cpu_pipeline - interval: 600 - meters: - - "cpu" - transformers: - - name: "rate_of_change" - parameters: - target: - name: "cpu_util" - unit: "%" - type: "gauge" - scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" - publishers: - - rpc:// -- - name: disk_pipeline - interval: 600 - meters: - - "disk.read.bytes" - - "disk.read.requests" - - "disk.write.bytes" - - "disk.write.requests" - transformers: - - name: "rate_of_change" - parameters: - source: - map_from: - name: "disk\\.(read|write)\\.(bytes|requests)" - unit: "(B|request)" - target: - map_to: - name: "disk.\\1.\\2.rate" - unit: "\\1/s" - type: "gauge" - publishers: - - rpc:// -- - name: network_pipeline - interval: 600 - meters: - - "network.incoming.bytes" - - "network.incoming.packets" - - "network.outgoing.bytes" - - "network.outgoing.packets" - transformers: - - name: "rate_of_change" - parameters: - source: - map_from: - name: "network\\.(incoming|outgoing)\\.(bytes|packets)" - unit: "(B|packet)" - target: - map_to: - name: "network.\\1.\\2.rate" - unit: "\\1/s" - type: "gauge" - publishers: - - rpc:// +sources: + - name: meter_source + interval: 600 + meters: + - "*" + sinks: + - meter_sink + - name: cpu_source + interval: 600 + meters: + - "cpu" + sinks: + - cpu_sink + - name: disk_source + interval: 600 + meters: + - "disk.read.bytes" + - "disk.read.requests" + - "disk.write.bytes" + - "disk.write.requests" + sinks: + - disk_sink + - name: network_source + interval: 600 + meters: + - "network.incoming.bytes" + - "network.incoming.packets" + - "network.outgoing.bytes" + - "network.outgoing.packets" + sinks: + - network_sink +sinks: + - name: meter_sink + transformers: + publishers: + - rpc:// + - name: cpu_sink + transformers: + - name: "rate_of_change" + parameters: + target: + name: "cpu_util" + unit: "%" + type: "gauge" + scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" + publishers: + - rpc:// + - name: disk_sink + transformers: + - name: "rate_of_change" + parameters: + source: + map_from: + name: "disk\\.(read|write)\\.(bytes|requests)" + unit: "(B|request)" + target: + map_to: + name: "disk.\\1.\\2.rate" + unit: "\\1/s" + type: "gauge" + publishers: + - rpc:// + - name: network_sink + transformers: + - name: "rate_of_change" + parameters: + source: + map_from: + name: "network\\.(incoming|outgoing)\\.(bytes|packets)" + unit: "(B|packet)" + target: + map_to: + name: "network.\\1.\\2.rate" + unit: "\\1/s" + type: "gauge" + publishers: + - rpc://