From 943fafb92960083445037ec4879ddebb8da1f4aa Mon Sep 17 00:00:00 2001 From: Eoghan Glynn Date: Tue, 14 Jan 2014 12:29:06 +0000 Subject: [PATCH] Name and unit mapping for rate_of_change transformer Precursor-to: BP rates-derived-from-cumulative Adds support for regex-based name and unit mapping to the rate_of_change transformer. Regular expression back-references are key to simplifying potentially repetitive config. This avoids a explosion of pipelines to apply conversions to classes of similar meters. For example to allow the following sample derivations: * disk.read.requests (request) -> disk.read.requests.rate (request/s) * disk.write.requests (request) -> disk.write.requests.rate (request/s) * disk.read.bytes (B) -> disk.read.bytes.rate (B/s) * disk.write.bytes (B) -> disk.write.bytes.rate (B/s) to be mapped via a single transformer config: transformers: - name: "rate_of_change" parameters: source: map_from: name: "disk\\.(read|write)\\.(bytes|requests)" unit: "(B|request)" target: map_to: name: "disk.\\1.\\2.rate" unit: "\\1/s" type: "gauge" Change-Id: I94b6bfbef538a8b067c3c5ff65066505a199bc5a --- ceilometer/tests/test_pipeline.py | 66 +++++++++++++++++++++++++++ ceilometer/transformer/conversions.py | 30 +++++++++--- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/ceilometer/tests/test_pipeline.py b/ceilometer/tests/test_pipeline.py index f706a4a8c..6e1f13c5a 100644 --- a/ceilometer/tests/test_pipeline.py +++ b/ceilometer/tests/test_pipeline.py @@ -1067,3 +1067,69 @@ class TestPipeline(test.BaseTestCase): self.transformer_manager) self.assertEqual(len(pipeline_manager.pipelines[0].resources), 0) + + def test_rate_of_change_mapping(self): + map_from = {'name': 'disk\\.(read|write)\\.(bytes|requests)', + 'unit': '(B|request)'} + map_to = {'name': 'disk.\\1.\\2.rate', + 'unit': '\\1/s'} + self.pipeline_cfg[0]['transformers'] = [ + { + 'name': 'rate_of_change', + 'parameters': { + 'source': { + 'map_from': map_from + }, + 'target': { + 'map_to': map_to, + 'type': sample.TYPE_GAUGE + }, + }, + }, + ] + self.pipeline_cfg[0]['counters'] = ['disk.read.bytes', + 'disk.write.requests'] + now = timeutils.utcnow() + base = 1000 + offset = 7 + rate = 42 + later = now + datetime.timedelta(minutes=offset) + counters = [] + for v, ts in [(base, now.isoformat()), + (base + (offset * 60 * rate), later.isoformat())]: + for n, u, r in [('disk.read.bytes', 'B', 'resource1'), + ('disk.write.requests', 'request', 'resource2')]: + s = sample.Sample( + name=n, + type=sample.TYPE_CUMULATIVE, + volume=v, + unit=u, + user_id='test_user', + project_id='test_proj', + resource_id=r, + timestamp=ts, + resource_metadata={}, + ) + counters.append(s) + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_samples(None, counters) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.samples), 2) + pipe.flush(None) + self.assertEqual(len(publisher.samples), 2) + bps = publisher.samples[0] + self.assertEqual(getattr(bps, 'name'), 'disk.read.bytes.rate') + self.assertEqual(getattr(bps, 'resource_id'), 'resource1') + self.assertEqual(getattr(bps, 'unit'), 'B/s') + self.assertEqual(getattr(bps, 'type'), sample.TYPE_GAUGE) + self.assertEqual(getattr(bps, 'volume'), rate) + rps = publisher.samples[1] + self.assertEqual(getattr(rps, 'name'), 'disk.write.requests.rate') + self.assertEqual(getattr(rps, 'resource_id'), 'resource2') + self.assertEqual(getattr(rps, 'unit'), 'request/s') + self.assertEqual(getattr(rps, 'type'), sample.TYPE_GAUGE) + self.assertEqual(getattr(rps, 'volume'), rate) diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 5ced6d157..3a7d079f8 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -17,6 +17,7 @@ # under the License. import collections +import re from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common import log @@ -64,31 +65,45 @@ class ScalingTransformer(transformer.TransformerBase): """ self.source = source self.target = target + self.scale = target.get('scale') LOG.debug(_('scaling conversion transformer with source:' ' %(source)s target: %(target)s:') % {'source': source, 'target': target}) super(ScalingTransformer, self).__init__(**kwargs) - @staticmethod - def _scale(s, scale): + def _scale(self, s): """Apply the scaling factor (either a straight multiplicative factor or else a string to be eval'd). """ ns = Namespace(s.as_dict()) + scale = self.scale return ((eval(scale, {}, ns) if isinstance(scale, basestring) else s.volume * scale) if scale else s.volume) + def _map(self, s, attr): + """Apply the name or unit mapping if configured. + """ + mapped = None + from_ = self.source.get('map_from') + to_ = self.target.get('map_to') + if from_ and to_: + if from_.get(attr) and to_.get(attr): + try: + mapped = re.sub(from_[attr], to_[attr], getattr(s, attr)) + except Exception: + pass + return mapped or self.target.get(attr, getattr(s, attr)) + def _convert(self, s, growth=1): """Transform the appropriate sample fields. """ - scale = self.target.get('scale') return sample.Sample( - name=self.target.get('name', s.name), - unit=self.target.get('unit', s.unit), + name=self._map(s, 'name'), + unit=self._map(s, 'unit'), type=self.target.get('type', s.type), - volume=self._scale(s, scale) * growth, + volume=self._scale(s) * growth, user_id=s.user_id, project_id=s.project_id, resource_id=s.resource_id, @@ -115,8 +130,9 @@ class RateOfChangeTransformer(ScalingTransformer): def __init__(self, **kwargs): """Initialize transformer with configured parameters. """ - self.cache = {} super(RateOfChangeTransformer, self).__init__(**kwargs) + self.cache = {} + self.scale = self.scale or '1' def handle_sample(self, context, s): """Handle a sample, converting if necessary."""