Decouple source and sink configuration for pipelines

Addresses: BP decoupled-source-sink-discoverable-resources

Add support for a new decoupled model of pipeline config
which does not conflate the elements specific to sources
and sinks.

Instead of the basic unit of config being a consolidated
pipeline, the sources and sinks may now be specified as
separate lists of dictionaries, before being linked together
once parsed.

This allows source-specific configuration, such as resource
discovery, to be kept focused only on the fine-grained source
while avoiding the necessity for wide duplication of sink-
related config.

The shape of the new config format is best illustrated with
an example:

  ---
  sources:
      - name: meter_source
        interval: 600
        meters:
            - "*"
        sinks:
            - meter_sink
      - name: host_cpu_source
        interval: 120
        meters:
             - "cpu.util.*min"
        resources:
             - "snmp://ip1"
             - "snmp://ip2"
             - "snmp://ip3"
        sinks:
            - meter_sink
            - lossy_sink
      - name: instance_cpu_source
        interval: 60
        meters:
            - "cpu"
        sinks:
            - cpu_sink
  sinks:
      - name: meter_sink
        transformers:
        publishers:
            - rpc://
      - name: lossy_sink
        transformers:
        publishers:
            - udp://addr
      - name: cpu_sink
        transformers:
            - name: "rate_of_change"
              parameters:
                  target:
                      name: "cpu_util"
                      unit: "%"
                      type: "gauge"
                      scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
        publishers:
            - rpc://

The old pipeline.yaml format continues to be supported as
a deprecated feature to avoid breaking existing deployments.
The semantics of the common individual configuration elements
are identical in the deprecated and decoupled versions.

Change-Id: Ide86c0feba88ae736f2a913b5faa95e640c4ceaf
This commit is contained in:
Eoghan Glynn 2014-02-24 19:31:36 +00:00
parent cae5b179e5
commit 9114e135b6
12 changed files with 946 additions and 416 deletions

View File

@ -29,7 +29,6 @@ from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline
from ceilometer import transformer
LOG = log.getLogger(__name__)
@ -105,20 +104,16 @@ class AgentManager(os_service.Service):
self.pipeline_manager.pipelines,
self.pollster_manager.extensions):
if pipeline.support_meter(pollster.name):
polling_task = polling_tasks.get(pipeline.interval)
polling_task = polling_tasks.get(pipeline.get_interval())
if not polling_task:
polling_task = self.create_polling_task()
polling_tasks[pipeline.interval] = polling_task
polling_tasks[pipeline.get_interval()] = polling_task
polling_task.add(pollster, [pipeline])
return polling_tasks
def start(self):
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
)
self.pipeline_manager = pipeline.setup_pipeline()
for interval, task in self.setup_polling_tasks().iteritems():
self.tg.add_timer(interval,

View File

@ -23,7 +23,6 @@ from oslo.config import cfg
from pecan import hooks
from ceilometer import pipeline
from ceilometer import transformer
class ConfigHook(hooks.PecanHook):
@ -55,9 +54,7 @@ class PipelineHook(hooks.PecanHook):
if self.__class__.pipeline_manager is None:
# this is done here as the cfg options are not available
# when the file is imported.
self.__class__.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer'))
self.__class__.pipeline_manager = pipeline.setup_pipeline()
def before(self, state):
state.request.pipeline_manager = self.pipeline_manager

View File

@ -28,7 +28,6 @@ from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline
from ceilometer import service
from ceilometer.storage import models
from ceilometer import transformer
LOG = log.getLogger(__name__)
@ -67,11 +66,7 @@ class NotificationService(service.DispatchedService, rpc_service.Service):
def initialize_service_hook(self, service):
'''Consumers must be declared before consume_thread start.'''
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
)
self.pipeline_manager = pipeline.setup_pipeline()
LOG.debug(_('Loading event definitions'))
self.event_converter = event_converter.setup_events(

View File

@ -63,7 +63,6 @@ from ceilometer.openstack.common import timeutils
from ceilometer import pipeline
from ceilometer import sample
from ceilometer import service
from ceilometer import transformer
class CeilometerMiddleware(object):
@ -80,11 +79,7 @@ class CeilometerMiddleware(object):
service.prepare_service([])
self.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer',
),
)
self.pipeline_manager = pipeline.setup_pipeline()
self.reseller_prefix = conf.get('reseller_prefix', 'AUTH_')
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'

View File

@ -1,8 +1,10 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 Intel Corp.
# Copyright © 2014 Red Hat, Inc
#
# Author: Yunhong Jiang <yunhong.jiang@intel.com>
# Authors: Yunhong Jiang <yunhong.jiang@intel.com>
# Eoghan Glynn <eglynn@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -27,6 +29,7 @@ import yaml
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
from ceilometer import publisher
from ceilometer import transformer as xformer
OPTS = [
@ -71,29 +74,18 @@ class PublishContext(object):
p.flush(self.context)
class Pipeline(object):
"""Sample handling pipeline
class Source(object):
"""Represents a source of samples, in effect a set of pollsters
and/or notification handlers emitting samples for a set of matching
meters.
Pipeline describes a chain of handlers. The chain starts with
transformer and ends with one or more publishers.
The first transformer in the chain gets sample from data collector, i.e.
pollster or notification handler, takes some action like dropping,
aggregation, changing field etc, then passes the updated sample
to next step.
The subsequent transformers, if any, handle the data similarly.
In the end of the chain, publishers publish the data. The exact publishing
method depends on publisher type, for example, pushing into data storage
through message bus, sending to external CW software through CW API call.
If no transformer is included in the chain, the publishers get samples
from data collector and publish them directly.
Each source encapsulates meter name matching, polling interval
determination, optional resource enumeration or discovery, and
mapping to one or more sinks for publication.
"""
def __init__(self, cfg, transformer_manager):
def __init__(self, cfg):
self.cfg = cfg
try:
@ -104,8 +96,7 @@ class Pipeline(object):
raise PipelineException("Invalid interval value", cfg)
# Support 'counters' for backward compatibility
self.meters = cfg.get('meters', cfg.get('counters'))
# It's legal to have no transformer specified
self.transformer_cfg = cfg['transformers'] or []
self.sinks = cfg.get('sinks')
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
@ -113,27 +104,12 @@ class Pipeline(object):
if self.interval <= 0:
raise PipelineException("Interval value should > 0", cfg)
self._check_meters()
if not cfg.get('publishers'):
raise PipelineException("No publisher specified", cfg)
self.publishers = []
for p in cfg['publishers']:
if '://' not in p:
# Support old format without URL
p = p + "://"
try:
self.publishers.append(publisher.get_publisher(p))
except Exception:
LOG.exception(_("Unable to load publisher %s"), p)
self.transformers = self._setup_transformers(cfg, transformer_manager)
self.resources = cfg.get('resources') or []
if not isinstance(self.resources, list):
raise PipelineException("Resources should be a list", cfg)
self._check_meters()
def __str__(self):
return self.name
@ -161,6 +137,106 @@ class Pipeline(object):
"Included meters specified with wildcard",
self.cfg)
# (yjiang5) To support meters like instance:m1.tiny,
# which include variable part at the end starting with ':'.
# Hope we will not add such meters in future.
@staticmethod
def _variable_meter_name(name):
m = name.partition(':')
if m[1] == ':':
return m[1].join((m[0], '*'))
else:
return name
def support_meter(self, meter_name):
meter_name = self._variable_meter_name(meter_name)
# Special case: if we only have negation, we suppose the default is
# allow
default = all(meter.startswith('!') for meter in self.meters)
# Support wildcard like storage.* and !disk.*
# Start with negation, we consider that the order is deny, allow
if any(fnmatch.fnmatch(meter_name, meter[1:])
for meter in self.meters
if meter[0] == '!'):
return False
if any(fnmatch.fnmatch(meter_name, meter)
for meter in self.meters
if meter[0] != '!'):
return True
return default
def check_sinks(self, sinks):
if not self.sinks:
raise PipelineException(
"No sink defined in source %s" % self,
self.cfg)
for sink in self.sinks:
if sink not in sinks:
raise PipelineException(
"Dangling sink %s from source %s" % (sink, self),
self.cfg)
class Sink(object):
"""Represents a sink for the transformation and publication of
samples emitted from a related source.
Each sink config is concerned *only* with the transformation rules
and publication conduits for samples.
In effect, a sink describes a chain of handlers. The chain starts
with zero or more transformers and ends with one or more publishers.
The first transformer in the chain is passed samples from the
corresponding source, takes some action such as deriving rate of
change, performing unit conversion, or aggregating, before passing
the modified sample to next step.
The subsequent transformers, if any, handle the data similarly.
At the end of the chain, publishers publish the data. The exact
publishing method depends on publisher type, for example, pushing
into data storage via the message bus providing guaranteed delivery,
or for loss-tolerant samples UDP may be used.
If no transformers are included in the chain, the publishers are
passed samples directly from the sink which are published unchanged.
"""
def __init__(self, cfg, transformer_manager):
self.cfg = cfg
try:
self.name = cfg['name']
# It's legal to have no transformer specified
self.transformer_cfg = cfg['transformers'] or []
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
if not cfg.get('publishers'):
raise PipelineException("No publisher specified", cfg)
self.publishers = []
for p in cfg['publishers']:
if '://' not in p:
# Support old format without URL
p = p + "://"
try:
self.publishers.append(publisher.get_publisher(p))
except Exception:
LOG.exception(_("Unable to load publisher %s"), p)
self.transformers = self._setup_transformers(cfg, transformer_manager)
def __str__(self):
return self.name
def _setup_transformers(self, cfg, transformer_manager):
transformer_cfg = cfg['transformers'] or []
transformers = []
@ -234,49 +310,11 @@ class Pipeline(object):
'pub': p}))
LOG.audit(_("Pipeline %s: Published samples") % self)
def publish_sample(self, ctxt, sample):
self.publish_samples(ctxt, [sample])
def publish_samples(self, ctxt, samples):
for meter_name, samples in itertools.groupby(
sorted(samples, key=operator.attrgetter('name')),
operator.attrgetter('name')):
if self.support_meter(meter_name):
self._publish_samples(0, ctxt, samples)
# (yjiang5) To support meters like instance:m1.tiny,
# which include variable part at the end starting with ':'.
# Hope we will not add such meters in future.
def _variable_meter_name(self, name):
m = name.partition(':')
if m[1] == ':':
return m[1].join((m[0], '*'))
else:
return name
def support_meter(self, meter_name):
meter_name = self._variable_meter_name(meter_name)
# Special case: if we only have negation, we suppose the default it
# allow
if all(meter.startswith('!') for meter in self.meters):
default = True
else:
default = False
# Support wildcard like storage.* and !disk.*
# Start with negation, we consider that the order is deny, allow
if any(fnmatch.fnmatch(meter_name, meter[1:])
for meter in self.meters
if meter[0] == '!'):
return False
if any(fnmatch.fnmatch(meter_name, meter)
for meter in self.meters
if meter[0] != '!'):
return True
return default
self._publish_samples(0, ctxt, samples)
def flush(self, ctxt):
"""Flush data after all samples have been injected to pipeline."""
@ -292,8 +330,43 @@ class Pipeline(object):
'trans': transformer}))
LOG.exception(err)
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source.
"""
def __init__(self, source, sink):
self.source = source
self.sink = sink
self.name = str(self)
def __str__(self):
return (self.source.name if self.source.name == self.sink.name
else '%s:%s' % (self.source.name, self.sink.name))
def get_interval(self):
return self.interval
return self.source.interval
@property
def resources(self):
return self.source.resources
def support_meter(self, meter_name):
return self.source.support_meter(meter_name)
@property
def publishers(self):
return self.sink.publishers
def publish_sample(self, ctxt, sample):
self.publish_samples(ctxt, [sample])
def publish_samples(self, ctxt, samples):
supported = [s for s in samples if self.source.support_meter(s.name)]
self.sink.publish_samples(ctxt, supported)
def flush(self, ctxt):
self.sink.flush(ctxt)
class PipelineManager(object):
@ -305,31 +378,82 @@ class PipelineManager(object):
"""
def __init__(self, cfg,
transformer_manager):
def __init__(self, cfg, transformer_manager):
"""Setup the pipelines according to config.
The top of the cfg is a list of pipeline definitions.
The configuration is supported in one of two forms:
Pipeline definition is an dictionary specifying the target samples,
the transformers involved, and the target publishers:
{
"name": pipeline_name
"interval": interval_time
"meters" : ["meter_1", "meter_2"],
"resources": ["resource_uri1", "resource_uri2"],
"transformers":[
1. Deprecated: the source and sink configuration are conflated
as a list of consolidated pipelines.
The pipelines are defined as a list of dictionaries each
specifying the target samples, the transformers involved,
and the target publishers, for example:
[{"name": pipeline_1,
"interval": interval_time,
"meters" : ["meter_1", "meter_2"],
"resources": ["resource_uri1", "resource_uri2"],
"transformers": [
{"name": "Transformer_1",
"parameters": {"p1": "value"}},
{"name": "Transformer_2",
{"name": "Transformer_2",
"parameters": {"p1": "value"}},
]
"publishers": ["publisher_1", "publisher_2"]
}
],
"publishers": ["publisher_1", "publisher_2"]
},
{"name": pipeline_2,
"interval": interval_time,
"meters" : ["meter_3"],
"publishers": ["publisher_3"]
},
]
Interval is how many seconds should the samples be injected to
the pipeline.
2. Decoupled: the source and sink configuration are separately
specified before being linked together. This allows source-
specific configuration, such as resource discovery, to be
kept focused only on the fine-grained source while avoiding
the necessity for wide duplication of sink-related config.
The configuration is provided in the form of separate lists
of dictionaries defining sources and sinks, for example:
{"sources": [{"name": source_1,
"interval": interval_time,
"meters" : ["meter_1", "meter_2"],
"resources": ["resource_uri1", "resource_uri2"],
"sinks" : ["sink_1", "sink_2"]
},
{"name": source_2,
"interval": interval_time,
"meters" : ["meter_3"],
"sinks" : ["sink_2"]
},
],
"sinks": [{"name": sink_1,
"transformers": [
{"name": "Transformer_1",
"parameters": {"p1": "value"}},
{"name": "Transformer_2",
"parameters": {"p1": "value"}},
],
"publishers": ["publisher_1", "publisher_2"]
},
{"name": sink_2,
"publishers": ["publisher_3"]
},
]
}
The semantics of the common individual configuration elements
are identical in the deprecated and decoupled version.
The interval determines the cadence of sample injection into
the pipeline where samples are produced under the direct control
of an agent, i.e. via a polling cycle as opposed to incoming
notifications.
Valid meter format is '*', '!meter_name', or 'meter_name'.
'*' is wildcard symbol means any meters; '!meter_name' means
@ -352,8 +476,26 @@ class PipelineManager(object):
Publisher's name is plugin name in setup.cfg
"""
self.pipelines = [Pipeline(pipedef, transformer_manager)
for pipedef in cfg]
self.pipelines = []
if 'sources' in cfg or 'sinks' in cfg:
if not ('sources' in cfg and 'sinks' in cfg):
raise PipelineException("Both sources & sinks are required",
cfg)
LOG.info(_('detected decoupled pipeline config format'))
sources = [Source(s) for s in cfg.get('sources', [])]
sinks = dict((s['name'], Sink(s, transformer_manager))
for s in cfg.get('sinks', []))
for source in sources:
source.check_sinks(sinks)
for target in source.sinks:
self.pipelines.append(Pipeline(source,
sinks[target]))
else:
LOG.warning(_('detected deprecated pipeline config format'))
for pipedef in cfg:
source = Source(pipedef)
sink = Sink(pipedef, transformer_manager)
self.pipelines.append(Pipeline(source, sink))
def publisher(self, context):
"""Build a new Publisher for these manager pipelines.
@ -363,7 +505,7 @@ class PipelineManager(object):
return PublishContext(context, self.pipelines)
def setup_pipeline(transformer_manager):
def setup_pipeline(transformer_manager=None):
"""Setup pipeline manager according to yaml config file."""
cfg_file = cfg.CONF.pipeline_cfg_file
if not os.path.exists(cfg_file):
@ -378,4 +520,7 @@ def setup_pipeline(transformer_manager):
LOG.info(_("Pipeline config: %s"), pipeline_cfg)
return PipelineManager(pipeline_cfg,
transformer_manager)
transformer_manager or
xformer.TransformerExtensionManager(
'ceilometer.transformer',
))

View File

@ -60,7 +60,7 @@ class TestSwiftMiddleware(test.BaseTestCase):
def __init__(self):
self.pipelines = [self._faux_pipeline(self)]
def _fake_setup_pipeline(self, transformer_manager):
def _fake_setup_pipeline(self, transformer_manager=None):
return self.pipeline_manager
def setUp(self):

View File

@ -17,9 +17,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import datetime
import yaml
import six
from stevedore import extension
from ceilometer.openstack.common.fixture import mockpatch
@ -34,32 +35,8 @@ from ceilometer.transformer import accumulator
from ceilometer.transformer import conversions
class TestTransformerAccumulator(test.BaseTestCase):
def test_handle_sample(self):
test_sample = sample.Sample(
name='a',
type=sample.TYPE_GAUGE,
volume=1,
unit='B',
user_id="test_user",
project_id="test_proj",
resource_id="test_resource",
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={}
)
# Test when size is set to less than 1.
tf = accumulator.TransformerAccumulator(size=0)
self.assertEqual(tf.handle_sample(None, test_sample), test_sample)
self.assertFalse(hasattr(tf, 'samples'))
# Test when size is set to greater or equal than 1.
tf = accumulator.TransformerAccumulator(size=2)
tf.handle_sample(None, test_sample)
self.assertEqual(len(tf.samples), 1)
class TestPipeline(test.BaseTestCase):
@six.add_metaclass(abc.ABCMeta)
class BasePipelineTestCase(test.BaseTestCase):
def fake_tem_init(self):
"""Fake a transformerManager for pipeline
The faked entry point setting is below:
@ -136,7 +113,7 @@ class TestPipeline(test.BaseTestCase):
raise Exception()
def setUp(self):
super(TestPipeline, self).setUp()
super(BasePipelineTestCase, self).setUp()
self.test_counter = sample.Sample(
name='a',
@ -163,16 +140,31 @@ class TestPipeline(test.BaseTestCase):
self.transformer_manager = transformer.TransformerExtensionManager()
self.pipeline_cfg = [{
'name': "test_pipeline",
'interval': 5,
'counters': ['a'],
'transformers': [
{'name': "update",
'parameters': {}}
],
'publishers': ["test://"],
}, ]
self._setup_pipeline_cfg()
@abc.abstractmethod
def _setup_pipeline_cfg(self):
"""Setup the appropriate form of pipeline config."""
@abc.abstractmethod
def _augment_pipeline_cfg(self):
"""Augment the pipeline config with an additional element."""
@abc.abstractmethod
def _break_pipeline_cfg(self):
"""Break the pipeline config with a malformed element."""
@abc.abstractmethod
def _set_pipeline_cfg(self, field, value):
"""Set a field to a value in the pipeline config."""
@abc.abstractmethod
def _extend_pipeline_cfg(self, field, value):
"""Extend an existing field in the pipeline config with a value."""
@abc.abstractmethod
def _unset_pipeline_cfg(self, field):
"""Clear an existing field in the pipeline config."""
def _exception_create_pipelinemanager(self):
self.assertRaises(pipeline.PipelineException,
@ -181,51 +173,51 @@ class TestPipeline(test.BaseTestCase):
self.transformer_manager)
def test_no_counters(self):
del self.pipeline_cfg[0]['counters']
self._unset_pipeline_cfg('counters')
self._exception_create_pipelinemanager()
def test_no_transformers(self):
del self.pipeline_cfg[0]['transformers']
self._unset_pipeline_cfg('transformers')
self._exception_create_pipelinemanager()
def test_no_name(self):
del self.pipeline_cfg[0]['name']
self._unset_pipeline_cfg('name')
self._exception_create_pipelinemanager()
def test_no_interval(self):
del self.pipeline_cfg[0]['interval']
self._unset_pipeline_cfg('interval')
self._exception_create_pipelinemanager()
def test_no_publishers(self):
del self.pipeline_cfg[0]['publishers']
self._unset_pipeline_cfg('publishers')
self._exception_create_pipelinemanager()
def test_invalid_resources(self):
invalid_resource = {'invalid': 1}
self.pipeline_cfg[0]['resources'] = invalid_resource
self._set_pipeline_cfg('resources', invalid_resource)
self._exception_create_pipelinemanager()
def test_check_counters_include_exclude_same(self):
counter_cfg = ['a', '!a']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
self._exception_create_pipelinemanager()
def test_check_counters_include_exclude(self):
counter_cfg = ['a', '!b']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
self._exception_create_pipelinemanager()
def test_check_counters_wildcard_included(self):
counter_cfg = ['a', '*']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
self._exception_create_pipelinemanager()
def test_check_publishers_invalid_publisher(self):
publisher_cfg = ['test_invalid']
self.pipeline_cfg[0]['publishers'] = publisher_cfg
self._set_pipeline_cfg('publishers', publisher_cfg)
def test_invalid_string_interval(self):
self.pipeline_cfg[0]['interval'] = 'string'
self._set_pipeline_cfg('interval', 'string')
self._exception_create_pipelinemanager()
def test_check_transformer_invalid_transformer(self):
@ -233,7 +225,7 @@ class TestPipeline(test.BaseTestCase):
{'name': "test_invalid",
'parameters': {}}
]
self.pipeline_cfg[0]['transformers'] = transformer_cfg
self._set_pipeline_cfg('transformers', transformer_cfg)
self._exception_create_pipelinemanager()
def test_get_interval(self):
@ -241,7 +233,7 @@ class TestPipeline(test.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
self.assertTrue(pipe.get_interval() == 5)
self.assertEqual(pipe.get_interval(), 5)
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
@ -252,14 +244,14 @@ class TestPipeline(test.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a')
def test_multiple_included_counters(self):
counter_cfg = ['a', 'b']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
@ -285,13 +277,13 @@ class TestPipeline(test.BaseTestCase):
p([self.test_counter])
self.assertEqual(len(publisher.samples), 2)
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertEqual(getattr(publisher.samples[1], "name"), 'b_update')
def test_counter_dont_match(self):
counter_cfg = ['nomatch']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -303,7 +295,7 @@ class TestPipeline(test.BaseTestCase):
def test_wildcard_counter(self):
counter_cfg = ['*']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -311,19 +303,19 @@ class TestPipeline(test.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
def test_wildcard_excluded_counters(self):
counter_cfg = ['*', '!a']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
def test_wildcard_excluded_counters_not_excluded(self):
counter_cfg = ['*', '!b']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -336,7 +328,7 @@ class TestPipeline(test.BaseTestCase):
def test_all_excluded_counters_not_excluded(self):
counter_cfg = ['!b', '!c']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -344,14 +336,14 @@ class TestPipeline(test.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a')
def test_all_excluded_counters_is_excluded(self):
counter_cfg = ['!a', '!c']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].support_meter('a'))
@ -360,7 +352,7 @@ class TestPipeline(test.BaseTestCase):
def test_wildcard_and_excluded_wildcard_counters(self):
counter_cfg = ['*', '!disk.*']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
@ -369,7 +361,7 @@ class TestPipeline(test.BaseTestCase):
def test_included_counter_and_wildcard_counters(self):
counter_cfg = ['cpu', 'disk.*']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertTrue(pipeline_manager.pipelines[0].
@ -380,7 +372,7 @@ class TestPipeline(test.BaseTestCase):
def test_excluded_counter_and_excluded_wildcard_counters(self):
counter_cfg = ['!cpu', '!disk.*']
self.pipeline_cfg[0]['counters'] = counter_cfg
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertFalse(pipeline_manager.pipelines[0].
@ -390,19 +382,7 @@ class TestPipeline(test.BaseTestCase):
support_meter('instance'))
def test_multiple_pipeline(self):
self.pipeline_cfg.append({
'name': 'second_pipeline',
'interval': 5,
'counters': ['b'],
'transformers': [{
'name': 'update',
'parameters':
{
"append_name": "_new",
}
}],
'publishers': ['new'],
})
self._augment_pipeline_cfg()
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
@ -432,29 +412,14 @@ class TestPipeline(test.BaseTestCase):
self.assertEqual(len(new_publisher.samples), 1)
self.assertEqual(new_publisher.calls, 1)
self.assertEqual(getattr(new_publisher.samples[0], "name"), 'b_new')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
self.assertTrue(getattr(self.TransformerClass.samples[1], "name")
== 'b')
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a')
self.assertEqual(getattr(self.TransformerClass.samples[1], "name"),
'b')
def test_multiple_pipeline_exception(self):
self.pipeline_cfg.append({
'name': "second_pipeline",
"interval": 5,
'counters': ['b'],
'transformers': [{
'name': 'update',
'parameters':
{
"append_name": "_new",
}
}],
'publishers': ['except'],
})
self._break_pipeline_cfg()
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
@ -480,14 +445,14 @@ class TestPipeline(test.BaseTestCase):
self.assertEqual(publisher.calls, 1)
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a')
self.assertTrue(getattr(self.TransformerClass.samples[1], "name")
== 'b')
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a')
self.assertEqual(getattr(self.TransformerClass.samples[1], "name"),
'b')
def test_none_transformer_pipeline(self):
self.pipeline_cfg[0]['transformers'] = None
self._set_pipeline_cfg('transformers', None)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -498,7 +463,7 @@ class TestPipeline(test.BaseTestCase):
self.assertEqual(getattr(publisher.samples[0], 'name'), 'a')
def test_empty_transformer_pipeline(self):
self.pipeline_cfg[0]['transformers'] = []
self._set_pipeline_cfg('transformers', [])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -509,7 +474,7 @@ class TestPipeline(test.BaseTestCase):
self.assertEqual(getattr(publisher.samples[0], 'name'), 'a')
def test_multiple_transformer_same_class(self):
self.pipeline_cfg[0]['transformers'] = [
transformer_cfg = [
{
'name': 'update',
'parameters': {}
@ -519,6 +484,7 @@ class TestPipeline(test.BaseTestCase):
'parameters': {}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
@ -530,14 +496,14 @@ class TestPipeline(test.BaseTestCase):
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(getattr(publisher.samples[0], 'name'),
'a_update_update')
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
== 'a')
self.assertTrue(getattr(self.TransformerClass.samples[1], 'name')
== 'a_update')
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(self.TransformerClass.samples[0], 'name'),
'a')
self.assertEqual(getattr(self.TransformerClass.samples[1], 'name'),
'a_update')
def test_multiple_transformer_same_class_different_parameter(self):
self.pipeline_cfg[0]['transformers'] = [
transformer_cfg = [
{
'name': 'update',
'parameters':
@ -553,23 +519,24 @@ class TestPipeline(test.BaseTestCase):
}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
self.assertTrue(len(self.TransformerClass.samples) == 2)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
== 'a')
self.assertTrue(getattr(self.TransformerClass.samples[1], 'name')
== 'a_update')
self.assertEqual(len(self.TransformerClass.samples), 2)
self.assertEqual(getattr(self.TransformerClass.samples[0], 'name'),
'a')
self.assertEqual(getattr(self.TransformerClass.samples[1], 'name'),
'a_update')
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertEqual(getattr(publisher.samples[0], 'name'),
'a_update_new')
def test_multiple_transformer_drop_transformer(self):
self.pipeline_cfg[0]['transformers'] = [
transformer_cfg = [
{
'name': 'update',
'parameters':
@ -589,6 +556,7 @@ class TestPipeline(test.BaseTestCase):
}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -596,15 +564,15 @@ class TestPipeline(test.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 0)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertTrue(getattr(self.TransformerClass.samples[0], 'name')
== 'a')
self.assertTrue(len(self.TransformerClassDrop.samples) == 1)
self.assertTrue(getattr(self.TransformerClassDrop.samples[0], 'name')
== 'a_update')
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(self.TransformerClass.samples[0], 'name'),
'a')
self.assertEqual(len(self.TransformerClassDrop.samples), 1)
self.assertEqual(getattr(self.TransformerClassDrop.samples[0], 'name'),
'a_update')
def test_multiple_publisher(self):
self.pipeline_cfg[0]['publishers'] = ['test://', 'new://']
self._set_pipeline_cfg('publishers', ['test://', 'new://'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
@ -621,7 +589,7 @@ class TestPipeline(test.BaseTestCase):
'a_update')
def test_multiple_publisher_isolation(self):
self.pipeline_cfg[0]['publishers'] = ['except://', 'new://']
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -633,7 +601,7 @@ class TestPipeline(test.BaseTestCase):
'a_update')
def test_multiple_counter_pipeline(self):
self.pipeline_cfg[0]['counters'] = ['a', 'b']
self._set_pipeline_cfg('counters', ['a', 'b'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -657,7 +625,7 @@ class TestPipeline(test.BaseTestCase):
def test_flush_pipeline_cache(self):
CACHE_SIZE = 10
self.pipeline_cfg[0]['transformers'].extend([
extra_transformer_cfg = [
{
'name': 'cache',
'parameters': {
@ -670,8 +638,9 @@ class TestPipeline(test.BaseTestCase):
{
'append_name': '_new'
}
}, ]
)
},
]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -688,12 +657,12 @@ class TestPipeline(test.BaseTestCase):
pipe.publish_sample(None, self.test_counter)
pipe.flush(None)
self.assertEqual(len(publisher.samples), CACHE_SIZE)
self.assertTrue(getattr(publisher.samples[0], 'name')
== 'a_update_new')
self.assertEqual(getattr(publisher.samples[0], 'name'),
'a_update_new')
def test_flush_pipeline_cache_multiple_counter(self):
CACHE_SIZE = 3
self.pipeline_cfg[0]['transformers'].extend([
extra_transformer_cfg = [
{
'name': 'cache',
'parameters': {
@ -706,9 +675,10 @@ class TestPipeline(test.BaseTestCase):
{
'append_name': '_new'
}
}, ]
)
self.pipeline_cfg[0]['counters'] = ['a', 'b']
},
]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
self._set_pipeline_cfg('counters', ['a', 'b'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
@ -738,10 +708,11 @@ class TestPipeline(test.BaseTestCase):
'b_update_new')
def test_flush_pipeline_cache_before_publisher(self):
self.pipeline_cfg[0]['transformers'].append({
extra_transformer_cfg = [{
'name': 'cache',
'parameters': {}
})
}]
self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
@ -785,15 +756,15 @@ class TestPipeline(test.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 1)
self.assertTrue(len(self.TransformerClass.samples) == 1)
self.assertEqual(len(self.TransformerClass.samples), 1)
self.assertEqual(getattr(publisher.samples[0], "name"),
'a:b_update')
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
== 'a:b')
self.assertEqual(getattr(self.TransformerClass.samples[0], "name"),
'a:b')
def test_global_unit_conversion(self):
scale = 'volume / ((10**6) * 60)'
self.pipeline_cfg[0]['transformers'] = [
transformer_cfg = [
{
'name': 'unit_conversion',
'parameters': {
@ -804,7 +775,8 @@ class TestPipeline(test.BaseTestCase):
}
},
]
self.pipeline_cfg[0]['counters'] = ['cpu']
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['cpu'])
counters = [
sample.Sample(
name='cpu',
@ -835,7 +807,7 @@ class TestPipeline(test.BaseTestCase):
self.assertEqual(getattr(cpu_mins, 'volume'), 20)
def test_unit_identified_source_unit_conversion(self):
self.pipeline_cfg[0]['transformers'] = [
transformer_cfg = [
{
'name': 'unit_conversion',
'parameters': {
@ -845,8 +817,9 @@ class TestPipeline(test.BaseTestCase):
}
},
]
self.pipeline_cfg[0]['counters'] = ['core_temperature',
'ambient_temperature']
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['core_temperature',
'ambient_temperature'])
counters = [
sample.Sample(
name='core_temperature',
@ -894,7 +867,7 @@ class TestPipeline(test.BaseTestCase):
s = "(resource_metadata.user_metadata.autoscaling_weight or 1.0)" \
"* (resource_metadata.non.existent or 1.0)" \
"* (100.0 / (10**9 * (resource_metadata.cpu_number or 1)))"
self.pipeline_cfg[0]['transformers'] = [
transformer_cfg = [
{
'name': 'rate_of_change',
'parameters': {
@ -906,7 +879,8 @@ class TestPipeline(test.BaseTestCase):
}
},
]
self.pipeline_cfg[0]['counters'] = ['cpu']
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['cpu'])
now = timeutils.utcnow()
later = now + datetime.timedelta(minutes=offset)
um = {'autoscaling_weight': weight} if weight else {}
@ -1017,7 +991,7 @@ class TestPipeline(test.BaseTestCase):
def test_rate_of_change_no_predecessor(self):
s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))"
self.pipeline_cfg[0]['transformers'] = [
transformer_cfg = [
{
'name': 'rate_of_change',
'parameters': {
@ -1029,7 +1003,8 @@ class TestPipeline(test.BaseTestCase):
}
},
]
self.pipeline_cfg[0]['counters'] = ['cpu']
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['cpu'])
now = timeutils.utcnow()
counters = [
sample.Sample(
@ -1057,7 +1032,7 @@ class TestPipeline(test.BaseTestCase):
def test_resources(self):
resources = ['test1://', 'test2://']
self.pipeline_cfg[0]['resources'] = resources
self._set_pipeline_cfg('resources', resources)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
self.assertEqual(pipeline_manager.pipelines[0].resources,
@ -1116,7 +1091,7 @@ class TestPipeline(test.BaseTestCase):
'unit': '(B|request)'}
map_to = {'name': 'disk.\\1.\\2.rate',
'unit': '\\1/s'}
self.pipeline_cfg[0]['transformers'] = [
transformer_cfg = [
{
'name': 'rate_of_change',
'parameters': {
@ -1130,51 +1105,12 @@ class TestPipeline(test.BaseTestCase):
},
},
]
self.pipeline_cfg[0]['counters'] = ['disk.read.bytes',
'disk.write.requests']
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['disk.read.bytes',
'disk.write.requests'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
meters = ('disk.read.bytes', 'disk.write.requests')
units = ('B', 'request')
self._do_test_rate_of_change_mapping(pipe, meters, units)
def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index,
meters, units):
with open('etc/ceilometer/pipeline.yaml') as fap:
data = fap.read()
pipeline_cfg = yaml.safe_load(data)
for p in pipeline_cfg:
p['publishers'] = ['test://']
pipeline_manager = pipeline.PipelineManager(pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[index]
self._do_test_rate_of_change_mapping(pipe, meters, units)
def test_rate_of_change_boilerplate_disk_read_cfg(self):
meters = ('disk.read.bytes', 'disk.read.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
meters,
units)
def test_rate_of_change_boilerplate_disk_write_cfg(self):
meters = ('disk.write.bytes', 'disk.write.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
meters,
units)
def test_rate_of_change_boilerplate_network_incoming_cfg(self):
meters = ('network.incoming.bytes', 'network.incoming.packets')
units = ('B', 'packet')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)
def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
meters = ('network.outgoing.bytes', 'network.outgoing.packets')
units = ('B', 'packet')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)

View File

@ -0,0 +1,249 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2014 Red Hat, Inc
#
# Author: Eoghan Glynn <eglynn@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import yaml
from ceilometer import pipeline
from ceilometer import sample
from ceilometer.tests import pipeline_base
class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def _setup_pipeline_cfg(self):
source = {'name': 'test_source',
'interval': 5,
'counters': ['a'],
'resources': [],
'sinks': ['test_sink']}
sink = {'name': 'test_sink',
'transformers': [{'name': 'update', 'parameters': {}}],
'publishers': ['test://']}
self.pipeline_cfg = {'sources': [source], 'sinks': [sink]}
def _augment_pipeline_cfg(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'interval': 5,
'counters': ['b'],
'resources': [],
'sinks': ['second_sink']
})
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
'transformers': [{
'name': 'update',
'parameters':
{
'append_name': '_new',
}
}],
'publishers': ['new'],
})
def _break_pipeline_cfg(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'interval': 5,
'counters': ['b'],
'resources': [],
'sinks': ['second_sink']
})
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
'transformers': [{
'name': 'update',
'parameters':
{
'append_name': '_new',
}
}],
'publishers': ['except'],
})
def _set_pipeline_cfg(self, field, value):
if field in self.pipeline_cfg['sources'][0]:
self.pipeline_cfg['sources'][0][field] = value
else:
self.pipeline_cfg['sinks'][0][field] = value
def _extend_pipeline_cfg(self, field, value):
if field in self.pipeline_cfg['sources'][0]:
self.pipeline_cfg['sources'][0][field].extend(value)
else:
self.pipeline_cfg['sinks'][0][field].extend(value)
def _unset_pipeline_cfg(self, field):
if field in self.pipeline_cfg['sources'][0]:
del self.pipeline_cfg['sources'][0][field]
else:
del self.pipeline_cfg['sinks'][0][field]
def test_source_no_sink(self):
del self.pipeline_cfg['sinks']
self._exception_create_pipelinemanager()
def test_source_dangling_sink(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'interval': 5,
'counters': ['b'],
'resources': [],
'sinks': ['second_sink']
})
self._exception_create_pipelinemanager()
def test_sink_no_source(self):
del self.pipeline_cfg['sources']
self._exception_create_pipelinemanager()
def test_source_with_multiple_sinks(self):
counter_cfg = ['a', 'b']
self._set_pipeline_cfg('counters', counter_cfg)
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
'transformers': [{
'name': 'update',
'parameters':
{
'append_name': '_new',
}
}],
'publishers': ['new'],
})
self.pipeline_cfg['sources'][0]['sinks'].append('second_sink')
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
self.test_counter = sample.Sample(
name='b',
type=self.test_counter.type,
volume=self.test_counter.volume,
unit=self.test_counter.unit,
user_id=self.test_counter.user_id,
project_id=self.test_counter.project_id,
resource_id=self.test_counter.resource_id,
timestamp=self.test_counter.timestamp,
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
self.assertEqual(len(pipeline_manager.pipelines), 2)
self.assertEqual(str(pipeline_manager.pipelines[0]),
'test_source:test_sink')
self.assertEqual(str(pipeline_manager.pipelines[1]),
'test_source:second_sink')
test_publisher = pipeline_manager.pipelines[0].publishers[0]
new_publisher = pipeline_manager.pipelines[1].publishers[0]
for publisher, sfx in [(test_publisher, '_update'),
(new_publisher, '_new')]:
self.assertEqual(len(publisher.samples), 2)
self.assertEqual(publisher.calls, 2)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a' + sfx)
self.assertEqual(getattr(publisher.samples[1], "name"), 'b' + sfx)
def test_multiple_sources_with_single_sink(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
'interval': 5,
'counters': ['b'],
'resources': [],
'sinks': ['test_sink']
})
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
self.test_counter = sample.Sample(
name='b',
type=self.test_counter.type,
volume=self.test_counter.volume,
unit=self.test_counter.unit,
user_id=self.test_counter.user_id,
project_id=self.test_counter.project_id,
resource_id=self.test_counter.resource_id,
timestamp=self.test_counter.timestamp,
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
p([self.test_counter])
self.assertEqual(len(pipeline_manager.pipelines), 2)
self.assertEqual(str(pipeline_manager.pipelines[0]),
'test_source:test_sink')
self.assertEqual(str(pipeline_manager.pipelines[1]),
'second_source:test_sink')
test_publisher = pipeline_manager.pipelines[0].publishers[0]
another_publisher = pipeline_manager.pipelines[1].publishers[0]
for publisher in [test_publisher, another_publisher]:
self.assertEqual(len(publisher.samples), 2)
self.assertEqual(publisher.calls, 2)
self.assertEqual(getattr(publisher.samples[0], "name"), 'a_update')
self.assertEqual(getattr(publisher.samples[1], "name"), 'b_update')
transformed_samples = self.TransformerClass.samples
self.assertEqual(len(transformed_samples), 2)
self.assertEqual([getattr(s, 'name') for s in transformed_samples],
['a', 'b'])
def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index,
meters, units):
with open('etc/ceilometer/pipeline.yaml') as fap:
data = fap.read()
pipeline_cfg = yaml.safe_load(data)
for s in pipeline_cfg['sinks']:
s['publishers'] = ['test://']
pipeline_manager = pipeline.PipelineManager(pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[index]
self._do_test_rate_of_change_mapping(pipe, meters, units)
def test_rate_of_change_boilerplate_disk_read_cfg(self):
meters = ('disk.read.bytes', 'disk.read.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
meters,
units)
def test_rate_of_change_boilerplate_disk_write_cfg(self):
meters = ('disk.write.bytes', 'disk.write.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
meters,
units)
def test_rate_of_change_boilerplate_network_incoming_cfg(self):
meters = ('network.incoming.bytes', 'network.incoming.packets')
units = ('B', 'packet')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)
def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
meters = ('network.outgoing.bytes', 'network.outgoing.packets')
units = ('B', 'packet')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)

View File

@ -0,0 +1,115 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2014 Red Hat, Inc
#
# Author: Eoghan Glynn <eglynn@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import yaml
from ceilometer import pipeline
from ceilometer.tests import pipeline_base
class TestDeprecatedPipeline(pipeline_base.BasePipelineTestCase):
def _setup_pipeline_cfg(self):
self.pipeline_cfg = [{
'name': 'test_pipeline',
'interval': 5,
'counters': ['a'],
'transformers': [
{'name': 'update',
'parameters': {}}
],
'publishers': ['test://'],
}, ]
def _augment_pipeline_cfg(self):
self.pipeline_cfg.append({
'name': 'second_pipeline',
'interval': 5,
'counters': ['b'],
'transformers': [{
'name': 'update',
'parameters':
{
'append_name': '_new',
}
}],
'publishers': ['new'],
})
def _break_pipeline_cfg(self):
self.pipeline_cfg.append({
'name': 'second_pipeline',
'interval': 5,
'counters': ['b'],
'transformers': [{
'name': 'update',
'parameters':
{
'append_name': '_new',
}
}],
'publishers': ['except'],
})
def _set_pipeline_cfg(self, field, value):
self.pipeline_cfg[0][field] = value
def _extend_pipeline_cfg(self, field, value):
self.pipeline_cfg[0][field].extend(value)
def _unset_pipeline_cfg(self, field):
del self.pipeline_cfg[0][field]
def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index,
meters, units):
with open('etc/ceilometer/deprecated_pipeline.yaml') as fap:
data = fap.read()
pipeline_cfg = yaml.safe_load(data)
for p in pipeline_cfg:
p['publishers'] = ['test://']
pipeline_manager = pipeline.PipelineManager(pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[index]
self._do_test_rate_of_change_mapping(pipe, meters, units)
def test_rate_of_change_boilerplate_disk_read_cfg(self):
meters = ('disk.read.bytes', 'disk.read.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
meters,
units)
def test_rate_of_change_boilerplate_disk_write_cfg(self):
meters = ('disk.write.bytes', 'disk.write.requests')
units = ('B', 'request')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2,
meters,
units)
def test_rate_of_change_boilerplate_network_incoming_cfg(self):
meters = ('network.incoming.bytes', 'network.incoming.packets')
units = ('B', 'packet')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)
def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
meters = ('network.outgoing.bytes', 'network.outgoing.packets')
units = ('B', 'packet')
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
meters,
units)

View File

@ -273,16 +273,27 @@ A sample configuration file can be found in `ceilometer.conf.sample`_.
Pipelines
=========
Pipelines describe chains of handlers, which can be transformers and/or
publishers.
Pipelines describe a coupling between sources of samples and the
corresponding sinks for transformation and publication of these
data.
The chain can start with a transformer, which is responsible for converting
the data, coming from the pollsters or notification handlers (for further
information see the :ref:`polling` section), to the required format, which
can mean dropping some parts of the sample, doing aggregation, changing
field or deriving samples for secondary meters, like in case of *cpu_util*,
see the example below, in the configuration details. The pipeline can contain
multiple transformers or none at all.
A source is a producer of samples, in effect a set of pollsters and/or
notification handlers emitting samples for a set of matching meters.
Each source configuration encapsulates meter name matching, polling
interval determination, optional resource enumeration or discovery,
and mapping to one or more sinks for publication.
A sink on the other hand is a consumer of samples, providing logic for
the transformation and publication of samples emitted from related sources.
Each sink configuration is concerned `only` with the transformation rules
and publication conduits for samples.
In effect, a sink describes a chain of handlers. The chain starts with
zero or more transformers and ends with one or more publishers. The first
transformer in the chain is passed samples from the corresponding source,
takes some action such as deriving rate of change, performing unit conversion,
or aggregating, before passing the modified sample to next step.
The chains end with one or more publishers. This component makes it possible
to persist the data into storage through the message bus or to send it to one
@ -300,21 +311,30 @@ ceilometer.conf. Multiple chains can be defined in one configuration file.
The chain definition looks like the following::
---
-
name: 'name of the pipeline'
interval: 'how often should the samples be injected into the pipeline'
meters:
- 'meter filter'
transformers: 'definition of transformers'
publishers:
- 'list of publishers'
sources:
- name: 'source name'
interval: 'how often should the samples be injected into the pipeline'
meters:
- 'meter filter'
resources:
- 'list of resource URLs'
sinks
- 'sink name'
sinks:
- name: 'sink name'
transformers: 'definition of transformers'
publishers:
- 'list of publishers'
The *interval* should be defined in seconds.
The *interval* parameter in the sources section should be defined in seconds. It
determines the cadence of sample injection into the pipeline, where samples are
produced under the direct control of an agent, i.e. via a polling cycle as opposed
to incoming notifications.
There are several ways to define the list of meters for a pipeline. The list
of valid meters can be found in the :ref:`measurements` section. There is
There are several ways to define the list of meters for a pipeline source. The
list of valid meters can be found in the :ref:`measurements` section. There is
a possibility to define all the meters, or just included or excluded meters,
with which a pipeline should operate:
with which a source should operate:
* To include all meters, use the '*' wildcard symbol.
* To define the list of meters, use either of the following:
@ -337,8 +357,13 @@ The above definition methods can be used in the following combinations:
pipeline. Wildcard and included meters cannot co-exist in the same
pipeline definition section.
The *transformers* section provides the possibility to add a list of
transformer definitions. The names of the transformers should be the same
The optional *resources* section of a pipeline source allows a static
list of resource URLs to be to be configured. An amalgamated list of all
statically configured resources for a set of pipeline sources with a
common interval is passed to individual pollsters matching those pipelines.
The *transformers* section of a pipeline sink provides the possibility to add a
list of transformer definitions. The names of the transformers should be the same
as the names of the related extensions in setup.cfg.
The definition of transformers can contain the following fields::
@ -392,4 +417,4 @@ setup.cfg.
The default configuration can be found in `pipeline.yaml`_.
.. _pipeline.yaml: https://git.openstack.org/cgit/openstack/ceilometer/tree/etc/ceilometer/pipeline.yaml
.. _pipeline.yaml: https://git.openstack.org/cgit/openstack/ceilometer/tree/etc/ceilometer/pipeline.yaml

View File

@ -0,0 +1,69 @@
---
-
name: meter_pipeline
interval: 600
meters:
- "*"
resources:
transformers:
publishers:
- rpc://
-
name: cpu_pipeline
interval: 600
meters:
- "cpu"
transformers:
- name: "rate_of_change"
parameters:
target:
name: "cpu_util"
unit: "%"
type: "gauge"
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
publishers:
- rpc://
-
name: disk_pipeline
interval: 600
meters:
- "disk.read.bytes"
- "disk.read.requests"
- "disk.write.bytes"
- "disk.write.requests"
transformers:
- name: "rate_of_change"
parameters:
source:
map_from:
name: "disk\\.(read|write)\\.(bytes|requests)"
unit: "(B|request)"
target:
map_to:
name: "disk.\\1.\\2.rate"
unit: "\\1/s"
type: "gauge"
publishers:
- rpc://
-
name: network_pipeline
interval: 600
meters:
- "network.incoming.bytes"
- "network.incoming.packets"
- "network.outgoing.bytes"
- "network.outgoing.packets"
transformers:
- name: "rate_of_change"
parameters:
source:
map_from:
name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
unit: "(B|packet)"
target:
map_to:
name: "network.\\1.\\2.rate"
unit: "\\1/s"
type: "gauge"
publishers:
- rpc://

View File

@ -1,69 +1,78 @@
---
-
name: meter_pipeline
interval: 600
meters:
- "*"
resources:
transformers:
publishers:
- rpc://
-
name: cpu_pipeline
interval: 600
meters:
- "cpu"
transformers:
- name: "rate_of_change"
parameters:
target:
name: "cpu_util"
unit: "%"
type: "gauge"
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
publishers:
- rpc://
-
name: disk_pipeline
interval: 600
meters:
- "disk.read.bytes"
- "disk.read.requests"
- "disk.write.bytes"
- "disk.write.requests"
transformers:
- name: "rate_of_change"
parameters:
source:
map_from:
name: "disk\\.(read|write)\\.(bytes|requests)"
unit: "(B|request)"
target:
map_to:
name: "disk.\\1.\\2.rate"
unit: "\\1/s"
type: "gauge"
publishers:
- rpc://
-
name: network_pipeline
interval: 600
meters:
- "network.incoming.bytes"
- "network.incoming.packets"
- "network.outgoing.bytes"
- "network.outgoing.packets"
transformers:
- name: "rate_of_change"
parameters:
source:
map_from:
name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
unit: "(B|packet)"
target:
map_to:
name: "network.\\1.\\2.rate"
unit: "\\1/s"
type: "gauge"
publishers:
- rpc://
sources:
- name: meter_source
interval: 600
meters:
- "*"
sinks:
- meter_sink
- name: cpu_source
interval: 600
meters:
- "cpu"
sinks:
- cpu_sink
- name: disk_source
interval: 600
meters:
- "disk.read.bytes"
- "disk.read.requests"
- "disk.write.bytes"
- "disk.write.requests"
sinks:
- disk_sink
- name: network_source
interval: 600
meters:
- "network.incoming.bytes"
- "network.incoming.packets"
- "network.outgoing.bytes"
- "network.outgoing.packets"
sinks:
- network_sink
sinks:
- name: meter_sink
transformers:
publishers:
- rpc://
- name: cpu_sink
transformers:
- name: "rate_of_change"
parameters:
target:
name: "cpu_util"
unit: "%"
type: "gauge"
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
publishers:
- rpc://
- name: disk_sink
transformers:
- name: "rate_of_change"
parameters:
source:
map_from:
name: "disk\\.(read|write)\\.(bytes|requests)"
unit: "(B|request)"
target:
map_to:
name: "disk.\\1.\\2.rate"
unit: "\\1/s"
type: "gauge"
publishers:
- rpc://
- name: network_sink
transformers:
- name: "rate_of_change"
parameters:
source:
map_from:
name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
unit: "(B|packet)"
target:
map_to:
name: "network.\\1.\\2.rate"
unit: "\\1/s"
type: "gauge"
publishers:
- rpc://