diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index b7a3f4814..5cd4619c7 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -33,6 +33,7 @@ from ceilometer.publisher import test as test_publisher from ceilometer import sample from ceilometer import transformer from ceilometer.transformer import accumulator +from ceilometer.transformer import arithmetic from ceilometer.transformer import conversions @@ -57,6 +58,7 @@ class BasePipelineTestCase(test.BaseTestCase): 'aggregator': conversions.AggregatorTransformer, 'unit_conversion': conversions.ScalingTransformer, 'rate_of_change': conversions.RateOfChangeTransformer, + 'arithmetic': arithmetic.ArithmeticTransformer, } if name in class_name_ext: @@ -1503,3 +1505,215 @@ class BasePipelineTestCase(test.BaseTestCase): self.assertEqual(42, getattr(publisher.samples[0], 'volume')) self.assertEqual("test_resource", getattr(publisher.samples[0], 'resource_id')) + + def _do_test_arithmetic_expr_parse(self, expr, expected): + actual = arithmetic.ArithmeticTransformer.parse_expr(expr) + self.assertEqual(expected, actual) + + def test_arithmetic_expr_parse(self): + expr = '$(cpu) + $(cpu.util)' + expected = ('cpu.volume + _cpu_util_ESC.volume', + { + 'cpu': 'cpu', + 'cpu.util': '_cpu_util_ESC' + }) + self._do_test_arithmetic_expr_parse(expr, expected) + + def test_arithmetic_expr_parse_parameter(self): + expr = '$(cpu) + $(cpu.util).resource_metadata' + expected = ('cpu.volume + _cpu_util_ESC.resource_metadata', + { + 'cpu': 'cpu', + 'cpu.util': '_cpu_util_ESC' + }) + self._do_test_arithmetic_expr_parse(expr, expected) + + def test_arithmetic_expr_parse_reserved_keyword(self): + expr = '$(class) + $(cpu.util)' + expected = ('_class_ESC.volume + _cpu_util_ESC.volume', + { + 'class': '_class_ESC', + 'cpu.util': '_cpu_util_ESC' + }) + self._do_test_arithmetic_expr_parse(expr, expected) + + def test_arithmetic_expr_parse_already_escaped(self): + expr = '$(class) + $(_class_ESC)' + expected = ('_class_ESC.volume + __class_ESC_ESC.volume', + { + 'class': '_class_ESC', + '_class_ESC': '__class_ESC_ESC' + }) + self._do_test_arithmetic_expr_parse(expr, expected) + + def _do_test_arithmetic(self, expression, scenario, expected): + transformer_cfg = [ + { + 'name': 'arithmetic', + 'parameters': { + 'target': {'name': 'new_meter', + 'unit': '%', + 'type': sample.TYPE_GAUGE, + 'expr': expression}, + } + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', + list(set(s['name'] for s in scenario))) + counters = [] + test_resources = ['test_resource1', 'test_resource2'] + for resource_id in test_resources: + for s in scenario: + counters.append(sample.Sample( + name=s['name'], + type=sample.TYPE_CUMULATIVE, + volume=s['volume'], + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id=resource_id, + timestamp=timeutils.utcnow().isoformat(), + resource_metadata=s.get('metadata') + )) + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_samples(None, counters) + publisher = pipeline_manager.pipelines[0].publishers[0] + expected_len = len(test_resources) * len(expected) + self.assertEqual(0, len(publisher.samples)) + pipe.flush(None) + self.assertEqual(expected_len, len(publisher.samples)) + + # bucket samples by resource first + samples_by_resource = dict((r, []) for r in test_resources) + for s in publisher.samples: + samples_by_resource[s.resource_id].append(s) + + for resource_id in samples_by_resource: + self.assertEqual(len(expected), + len(samples_by_resource[resource_id])) + for i, s in enumerate(samples_by_resource[resource_id]): + self.assertEqual('new_meter', getattr(s, 'name')) + self.assertEqual(resource_id, getattr(s, 'resource_id')) + self.assertEqual('%', getattr(s, 'unit')) + self.assertEqual(sample.TYPE_GAUGE, getattr(s, 'type')) + self.assertEqual(expected[i], getattr(s, 'volume')) + + def test_arithmetic_transformer(self): + expression = '100.0 * $(memory.usage) / $(memory)' + scenario = [ + dict(name='memory', volume=1024.0), + dict(name='memory.usage', volume=512.0), + ] + expected = [50.0] + self._do_test_arithmetic(expression, scenario, expected) + + def test_arithmetic_transformer_expr_empty(self): + expression = '' + scenario = [ + dict(name='memory', volume=1024.0), + dict(name='memory.usage', volume=512.0), + ] + expected = [] + self._do_test_arithmetic(expression, scenario, expected) + + def test_arithmetic_transformer_expr_misconfigured(self): + expression = '512.0 * 3' + scenario = [ + dict(name='memory', volume=1024.0), + dict(name='memory.usage', volume=512.0), + ] + expected = [] + self._do_test_arithmetic(expression, scenario, expected) + + def test_arithmetic_transformer_nan(self): + expression = 'float(\'nan\') * $(memory.usage) / $(memory)' + scenario = [ + dict(name='memory', volume=1024.0), + dict(name='memory.usage', volume=512.0), + ] + expected = [] + self._do_test_arithmetic(expression, scenario, expected) + + def test_arithmetic_transformer_exception(self): + expression = '$(memory) / 0' + scenario = [ + dict(name='memory', volume=1024.0), + dict(name='memory.usage', volume=512.0), + ] + expected = [] + self._do_test_arithmetic(expression, scenario, expected) + + def test_arithmetic_transformer_multiple_samples(self): + expression = '100.0 * $(memory.usage) / $(memory)' + scenario = [ + dict(name='memory', volume=2048.0), + dict(name='memory.usage', volume=512.0), + dict(name='memory', volume=1024.0), + ] + expected = [50.0] + self._do_test_arithmetic(expression, scenario, expected) + + def test_arithmetic_transformer_missing(self): + expression = '100.0 * $(memory.usage) / $(memory)' + scenario = [dict(name='memory.usage', volume=512.0)] + expected = [] + self._do_test_arithmetic(expression, scenario, expected) + + def test_arithmetic_transformer_more_than_needed(self): + expression = '100.0 * $(memory.usage) / $(memory)' + scenario = [ + dict(name='memory', volume=1024.0), + dict(name='memory.usage', volume=512.0), + dict(name='cpu_util', volume=90.0), + ] + expected = [50.0] + self._do_test_arithmetic(expression, scenario, expected) + + def test_arithmetic_transformer_cache_cleared(self): + transformer_cfg = [ + { + 'name': 'arithmetic', + 'parameters': { + 'target': {'name': 'new_meter', + 'expr': '$(memory.usage) + 2'} + } + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['memory.usage']) + counter = sample.Sample( + name='memory.usage', + type=sample.TYPE_GAUGE, + volume=1024.0, + unit='MB', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata=None + ) + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_samples(None, [counter]) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(0, len(publisher.samples)) + pipe.flush(None) + self.assertEqual(1, len(publisher.samples)) + self.assertEqual(1026.0, publisher.samples[0].volume) + + pipe.flush(None) + self.assertEqual(1, len(publisher.samples)) + + counter.volume = 2048.0 + pipe.publish_samples(None, [counter]) + pipe.flush(None) + self.assertEqual(2, len(publisher.samples)) + self.assertEqual(2050.0, publisher.samples[1].volume) diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py index e747a322a..0f20b6f77 100644 --- a/ceilometer/transformer/__init__.py +++ b/ceilometer/transformer/__init__.py @@ -16,6 +16,7 @@ # under the License. import abc +import collections import six from stevedore import extension @@ -66,3 +67,27 @@ class TransformerBase(object): :param context: Passed from the data collector. """ return [] + + +class Namespace(object): + """Encapsulates the namespace. + + Encapsulation is done by wrapping the evaluation of the configured rule. + This allows nested dicts to be accessed in the attribute style, + and missing attributes to yield false when used in a boolean expression. + """ + def __init__(self, seed): + self.__dict__ = collections.defaultdict(lambda: Namespace({})) + self.__dict__.update(seed) + for k, v in self.__dict__.iteritems(): + if isinstance(v, dict): + self.__dict__[k] = Namespace(v) + + def __getattr__(self, attr): + return self.__dict__[attr] + + def __getitem__(self, key): + return self.__dict__[key] + + def __nonzero__(self): + return len(self.__dict__) > 0 diff --git a/ceilometer/transformer/arithmetic.py b/ceilometer/transformer/arithmetic.py new file mode 100644 index 000000000..2eeac49f9 --- /dev/null +++ b/ceilometer/transformer/arithmetic.py @@ -0,0 +1,155 @@ +# +# Copyright 2014 Red Hat, Inc +# +# Author: Nejc Saje +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import keyword +import math +import re + +from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import log +from ceilometer import sample +from ceilometer import transformer + +LOG = log.getLogger(__name__) + + +class ArithmeticTransformer(transformer.TransformerBase): + """Multi meter arithmetic transformer. + + Transformer that performs arithmetic operations + over one or more meters and/or their metadata. + """ + + meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)') + + def __init__(self, target=None, **kwargs): + super(ArithmeticTransformer, self).__init__(**kwargs) + target = target or {} + self.target = target + self.expr = target.get('expr', '') + self.expr_escaped, self.escaped_names = self.parse_expr(self.expr) + self.required_meters = self.escaped_names.values() + self.misconfigured = len(self.required_meters) == 0 + if not self.misconfigured: + self.reference_meter = self.required_meters[0] + # convert to set for more efficient contains operation + self.required_meters = set(self.required_meters) + self.cache = collections.defaultdict(dict) + self.latest_timestamp = None + else: + LOG.warn(_('Arithmetic transformer must use at least one' + ' meter in expression \'%s\''), self.expr) + + def _update_cache(self, _sample): + """Update the cache with the latest sample.""" + escaped_name = self.escaped_names.get(_sample.name, '') + if escaped_name not in self.required_meters: + return + self.cache[_sample.resource_id][escaped_name] = _sample + + def _check_requirements(self, resource_id): + """Check if all the required meters are available in the cache.""" + return len(self.cache[resource_id]) == len(self.required_meters) + + def _calculate(self, resource_id): + """Evaluate the expression and return a new sample if successful.""" + ns_dict = dict((m, s.as_dict()) for m, s + in self.cache[resource_id].iteritems()) + ns = transformer.Namespace(ns_dict) + try: + new_volume = eval(self.expr_escaped, {}, ns) + if math.isnan(new_volume): + raise ArithmeticError(_('Expression evaluated to ' + 'a NaN value!')) + + reference_sample = self.cache[resource_id][self.reference_meter] + return sample.Sample( + name=self.target.get('name', reference_sample.name), + unit=self.target.get('unit', reference_sample.unit), + type=self.target.get('type', reference_sample.type), + volume=float(new_volume), + user_id=reference_sample.user_id, + project_id=reference_sample.project_id, + resource_id=reference_sample.resource_id, + timestamp=self.latest_timestamp, + resource_metadata=reference_sample.resource_metadata + ) + except Exception as e: + LOG.warn(_('Unable to evaluate expression %(expr)s: %(exc)s'), + {'expr': self.expr, 'exc': str(e)}) + + def handle_sample(self, context, _sample): + self._update_cache(_sample) + self.latest_timestamp = _sample.timestamp + + def flush(self, context): + new_samples = [] + if not self.misconfigured: + for resource_id in self.cache: + if self._check_requirements(resource_id): + new_samples.append(self._calculate(resource_id)) + else: + LOG.warn(_('Unable to perform calculation, not all of ' + '{%s} are present'), + ', '.join(self.required_meters)) + self.cache.clear() + return new_samples + + @classmethod + def parse_expr(cls, expr): + """Transforms meter names in the expression into valid identifiers. + + :param expr: unescaped expression + :return: A tuple of the escaped expression and a dict representing + the translation of meter names into Python identifiers + """ + + class Replacer(): + """Replaces matched meter names with escaped names. + + If the meter name is not followed by parameter access in the + expression, it defaults to accessing the 'volume' parameter. + """ + + def __init__(self, original_expr): + self.original_expr = original_expr + self.escaped_map = {} + + def __call__(self, match): + meter_name = match.group(1) + escaped_name = self.escape(meter_name) + self.escaped_map[meter_name] = escaped_name + + if (match.end(0) == len(self.original_expr) or + self.original_expr[match.end(0)] != '.'): + escaped_name += '.volume' + return escaped_name + + @staticmethod + def escape(name): + has_dot = '.' in name + if has_dot: + name = name.replace('.', '_') + + if has_dot or name.endswith('ESC') or name in keyword.kwlist: + name = "_" + name + '_ESC' + return name + + replacer = Replacer(expr) + expr = re.sub(cls.meter_name_re, replacer, expr) + return expr, replacer.escaped_map diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 8954608e3..7e1cbb6f1 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -15,7 +15,6 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import re from ceilometer.openstack.common.gettextutils import _ @@ -27,30 +26,6 @@ from ceilometer import transformer LOG = log.getLogger(__name__) -class Namespace(object): - """Encapsulates the namespace. - - Encapsulation is going by wrapping the evaluation of the configured scale - factor. This allows nested dicts to be accessed in the attribute style, - and missing attributes to yield false when used in a boolean expression. - """ - def __init__(self, seed): - self.__dict__ = collections.defaultdict(lambda: Namespace({})) - self.__dict__.update(seed) - for k, v in self.__dict__.iteritems(): - if isinstance(v, dict): - self.__dict__[k] = Namespace(v) - - def __getattr__(self, attr): - return self.__dict__[attr] - - def __getitem__(self, key): - return self.__dict__[key] - - def __nonzero__(self): - return len(self.__dict__) > 0 - - class ScalingTransformer(transformer.TransformerBase): """Transformer to apply a scaling conversion.""" @@ -78,7 +53,7 @@ class ScalingTransformer(transformer.TransformerBase): Either a straight multiplicative factor or else a string to be eval'd. """ - ns = Namespace(s.as_dict()) + ns = transformer.Namespace(s.as_dict()) scale = self.scale return ((eval(scale, {}, ns) if isinstance(scale, basestring)