Multi meter arithmetic transformer

New pipeline transformer that enables us to perform arithmetic calculations
over one more meters and/or their metadata, for example:

    memory_util = 100 * memory.usage / memory .

The calculation is limited to meters with the same interval.

Example configuration:

    - name: "arithmetic"
      parameters:
        target:
          name: "memory_util"
          unit: "%"
          type: "gauge"
          expr: "100 * $(memory.usage) / $(memory)"

To demonstrate the use of metadata, here is the implementation of
a silly metric that shows average CPU time per core::

    - name: "arithmetic"
      parameters:
        target:
          name: "avg_cpu_per_core"
          unit: "ns"
          type: "cumulative"
          expr: "$(cpu) / ($(cpu).resource_metadata.cpu_number or 1)"

Expression evaluation gracefully handles NaNs and exceptions. In such
a case it does not create a new sample but only logs a warning.

DocImpact: add documentation about using this new transformer
Implements: blueprint arithmetic-transformer
Change-Id: I1b637e5b1d1bb15ed3c3d7ec758d2a684eaccf21
This commit is contained in:
Nejc Saje 2014-07-09 14:13:49 +02:00
parent 5bb4e2b22c
commit 971f9c85c1
4 changed files with 395 additions and 26 deletions

View File

@ -33,6 +33,7 @@ from ceilometer.publisher import test as test_publisher
from ceilometer import sample
from ceilometer import transformer
from ceilometer.transformer import accumulator
from ceilometer.transformer import arithmetic
from ceilometer.transformer import conversions
@ -57,6 +58,7 @@ class BasePipelineTestCase(test.BaseTestCase):
'aggregator': conversions.AggregatorTransformer,
'unit_conversion': conversions.ScalingTransformer,
'rate_of_change': conversions.RateOfChangeTransformer,
'arithmetic': arithmetic.ArithmeticTransformer,
}
if name in class_name_ext:
@ -1503,3 +1505,215 @@ class BasePipelineTestCase(test.BaseTestCase):
self.assertEqual(42, getattr(publisher.samples[0], 'volume'))
self.assertEqual("test_resource", getattr(publisher.samples[0],
'resource_id'))
def _do_test_arithmetic_expr_parse(self, expr, expected):
actual = arithmetic.ArithmeticTransformer.parse_expr(expr)
self.assertEqual(expected, actual)
def test_arithmetic_expr_parse(self):
expr = '$(cpu) + $(cpu.util)'
expected = ('cpu.volume + _cpu_util_ESC.volume',
{
'cpu': 'cpu',
'cpu.util': '_cpu_util_ESC'
})
self._do_test_arithmetic_expr_parse(expr, expected)
def test_arithmetic_expr_parse_parameter(self):
expr = '$(cpu) + $(cpu.util).resource_metadata'
expected = ('cpu.volume + _cpu_util_ESC.resource_metadata',
{
'cpu': 'cpu',
'cpu.util': '_cpu_util_ESC'
})
self._do_test_arithmetic_expr_parse(expr, expected)
def test_arithmetic_expr_parse_reserved_keyword(self):
expr = '$(class) + $(cpu.util)'
expected = ('_class_ESC.volume + _cpu_util_ESC.volume',
{
'class': '_class_ESC',
'cpu.util': '_cpu_util_ESC'
})
self._do_test_arithmetic_expr_parse(expr, expected)
def test_arithmetic_expr_parse_already_escaped(self):
expr = '$(class) + $(_class_ESC)'
expected = ('_class_ESC.volume + __class_ESC_ESC.volume',
{
'class': '_class_ESC',
'_class_ESC': '__class_ESC_ESC'
})
self._do_test_arithmetic_expr_parse(expr, expected)
def _do_test_arithmetic(self, expression, scenario, expected):
transformer_cfg = [
{
'name': 'arithmetic',
'parameters': {
'target': {'name': 'new_meter',
'unit': '%',
'type': sample.TYPE_GAUGE,
'expr': expression},
}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters',
list(set(s['name'] for s in scenario)))
counters = []
test_resources = ['test_resource1', 'test_resource2']
for resource_id in test_resources:
for s in scenario:
counters.append(sample.Sample(
name=s['name'],
type=sample.TYPE_CUMULATIVE,
volume=s['volume'],
unit='ns',
user_id='test_user',
project_id='test_proj',
resource_id=resource_id,
timestamp=timeutils.utcnow().isoformat(),
resource_metadata=s.get('metadata')
))
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
expected_len = len(test_resources) * len(expected)
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
self.assertEqual(expected_len, len(publisher.samples))
# bucket samples by resource first
samples_by_resource = dict((r, []) for r in test_resources)
for s in publisher.samples:
samples_by_resource[s.resource_id].append(s)
for resource_id in samples_by_resource:
self.assertEqual(len(expected),
len(samples_by_resource[resource_id]))
for i, s in enumerate(samples_by_resource[resource_id]):
self.assertEqual('new_meter', getattr(s, 'name'))
self.assertEqual(resource_id, getattr(s, 'resource_id'))
self.assertEqual('%', getattr(s, 'unit'))
self.assertEqual(sample.TYPE_GAUGE, getattr(s, 'type'))
self.assertEqual(expected[i], getattr(s, 'volume'))
def test_arithmetic_transformer(self):
expression = '100.0 * $(memory.usage) / $(memory)'
scenario = [
dict(name='memory', volume=1024.0),
dict(name='memory.usage', volume=512.0),
]
expected = [50.0]
self._do_test_arithmetic(expression, scenario, expected)
def test_arithmetic_transformer_expr_empty(self):
expression = ''
scenario = [
dict(name='memory', volume=1024.0),
dict(name='memory.usage', volume=512.0),
]
expected = []
self._do_test_arithmetic(expression, scenario, expected)
def test_arithmetic_transformer_expr_misconfigured(self):
expression = '512.0 * 3'
scenario = [
dict(name='memory', volume=1024.0),
dict(name='memory.usage', volume=512.0),
]
expected = []
self._do_test_arithmetic(expression, scenario, expected)
def test_arithmetic_transformer_nan(self):
expression = 'float(\'nan\') * $(memory.usage) / $(memory)'
scenario = [
dict(name='memory', volume=1024.0),
dict(name='memory.usage', volume=512.0),
]
expected = []
self._do_test_arithmetic(expression, scenario, expected)
def test_arithmetic_transformer_exception(self):
expression = '$(memory) / 0'
scenario = [
dict(name='memory', volume=1024.0),
dict(name='memory.usage', volume=512.0),
]
expected = []
self._do_test_arithmetic(expression, scenario, expected)
def test_arithmetic_transformer_multiple_samples(self):
expression = '100.0 * $(memory.usage) / $(memory)'
scenario = [
dict(name='memory', volume=2048.0),
dict(name='memory.usage', volume=512.0),
dict(name='memory', volume=1024.0),
]
expected = [50.0]
self._do_test_arithmetic(expression, scenario, expected)
def test_arithmetic_transformer_missing(self):
expression = '100.0 * $(memory.usage) / $(memory)'
scenario = [dict(name='memory.usage', volume=512.0)]
expected = []
self._do_test_arithmetic(expression, scenario, expected)
def test_arithmetic_transformer_more_than_needed(self):
expression = '100.0 * $(memory.usage) / $(memory)'
scenario = [
dict(name='memory', volume=1024.0),
dict(name='memory.usage', volume=512.0),
dict(name='cpu_util', volume=90.0),
]
expected = [50.0]
self._do_test_arithmetic(expression, scenario, expected)
def test_arithmetic_transformer_cache_cleared(self):
transformer_cfg = [
{
'name': 'arithmetic',
'parameters': {
'target': {'name': 'new_meter',
'expr': '$(memory.usage) + 2'}
}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._set_pipeline_cfg('counters', ['memory.usage'])
counter = sample.Sample(
name='memory.usage',
type=sample.TYPE_GAUGE,
volume=1024.0,
unit='MB',
user_id='test_user',
project_id='test_proj',
resource_id='test_resource',
timestamp=timeutils.utcnow().isoformat(),
resource_metadata=None
)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, [counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1026.0, publisher.samples[0].volume)
pipe.flush(None)
self.assertEqual(1, len(publisher.samples))
counter.volume = 2048.0
pipe.publish_samples(None, [counter])
pipe.flush(None)
self.assertEqual(2, len(publisher.samples))
self.assertEqual(2050.0, publisher.samples[1].volume)

View File

@ -16,6 +16,7 @@
# under the License.
import abc
import collections
import six
from stevedore import extension
@ -66,3 +67,27 @@ class TransformerBase(object):
:param context: Passed from the data collector.
"""
return []
class Namespace(object):
"""Encapsulates the namespace.
Encapsulation is done by wrapping the evaluation of the configured rule.
This allows nested dicts to be accessed in the attribute style,
and missing attributes to yield false when used in a boolean expression.
"""
def __init__(self, seed):
self.__dict__ = collections.defaultdict(lambda: Namespace({}))
self.__dict__.update(seed)
for k, v in self.__dict__.iteritems():
if isinstance(v, dict):
self.__dict__[k] = Namespace(v)
def __getattr__(self, attr):
return self.__dict__[attr]
def __getitem__(self, key):
return self.__dict__[key]
def __nonzero__(self):
return len(self.__dict__) > 0

View File

@ -0,0 +1,155 @@
#
# Copyright 2014 Red Hat, Inc
#
# Author: Nejc Saje <nsaje@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import keyword
import math
import re
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer import sample
from ceilometer import transformer
LOG = log.getLogger(__name__)
class ArithmeticTransformer(transformer.TransformerBase):
"""Multi meter arithmetic transformer.
Transformer that performs arithmetic operations
over one or more meters and/or their metadata.
"""
meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
def __init__(self, target=None, **kwargs):
super(ArithmeticTransformer, self).__init__(**kwargs)
target = target or {}
self.target = target
self.expr = target.get('expr', '')
self.expr_escaped, self.escaped_names = self.parse_expr(self.expr)
self.required_meters = self.escaped_names.values()
self.misconfigured = len(self.required_meters) == 0
if not self.misconfigured:
self.reference_meter = self.required_meters[0]
# convert to set for more efficient contains operation
self.required_meters = set(self.required_meters)
self.cache = collections.defaultdict(dict)
self.latest_timestamp = None
else:
LOG.warn(_('Arithmetic transformer must use at least one'
' meter in expression \'%s\''), self.expr)
def _update_cache(self, _sample):
"""Update the cache with the latest sample."""
escaped_name = self.escaped_names.get(_sample.name, '')
if escaped_name not in self.required_meters:
return
self.cache[_sample.resource_id][escaped_name] = _sample
def _check_requirements(self, resource_id):
"""Check if all the required meters are available in the cache."""
return len(self.cache[resource_id]) == len(self.required_meters)
def _calculate(self, resource_id):
"""Evaluate the expression and return a new sample if successful."""
ns_dict = dict((m, s.as_dict()) for m, s
in self.cache[resource_id].iteritems())
ns = transformer.Namespace(ns_dict)
try:
new_volume = eval(self.expr_escaped, {}, ns)
if math.isnan(new_volume):
raise ArithmeticError(_('Expression evaluated to '
'a NaN value!'))
reference_sample = self.cache[resource_id][self.reference_meter]
return sample.Sample(
name=self.target.get('name', reference_sample.name),
unit=self.target.get('unit', reference_sample.unit),
type=self.target.get('type', reference_sample.type),
volume=float(new_volume),
user_id=reference_sample.user_id,
project_id=reference_sample.project_id,
resource_id=reference_sample.resource_id,
timestamp=self.latest_timestamp,
resource_metadata=reference_sample.resource_metadata
)
except Exception as e:
LOG.warn(_('Unable to evaluate expression %(expr)s: %(exc)s'),
{'expr': self.expr, 'exc': str(e)})
def handle_sample(self, context, _sample):
self._update_cache(_sample)
self.latest_timestamp = _sample.timestamp
def flush(self, context):
new_samples = []
if not self.misconfigured:
for resource_id in self.cache:
if self._check_requirements(resource_id):
new_samples.append(self._calculate(resource_id))
else:
LOG.warn(_('Unable to perform calculation, not all of '
'{%s} are present'),
', '.join(self.required_meters))
self.cache.clear()
return new_samples
@classmethod
def parse_expr(cls, expr):
"""Transforms meter names in the expression into valid identifiers.
:param expr: unescaped expression
:return: A tuple of the escaped expression and a dict representing
the translation of meter names into Python identifiers
"""
class Replacer():
"""Replaces matched meter names with escaped names.
If the meter name is not followed by parameter access in the
expression, it defaults to accessing the 'volume' parameter.
"""
def __init__(self, original_expr):
self.original_expr = original_expr
self.escaped_map = {}
def __call__(self, match):
meter_name = match.group(1)
escaped_name = self.escape(meter_name)
self.escaped_map[meter_name] = escaped_name
if (match.end(0) == len(self.original_expr) or
self.original_expr[match.end(0)] != '.'):
escaped_name += '.volume'
return escaped_name
@staticmethod
def escape(name):
has_dot = '.' in name
if has_dot:
name = name.replace('.', '_')
if has_dot or name.endswith('ESC') or name in keyword.kwlist:
name = "_" + name + '_ESC'
return name
replacer = Replacer(expr)
expr = re.sub(cls.meter_name_re, replacer, expr)
return expr, replacer.escaped_map

View File

@ -15,7 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import re
from ceilometer.openstack.common.gettextutils import _
@ -27,30 +26,6 @@ from ceilometer import transformer
LOG = log.getLogger(__name__)
class Namespace(object):
"""Encapsulates the namespace.
Encapsulation is going by wrapping the evaluation of the configured scale
factor. This allows nested dicts to be accessed in the attribute style,
and missing attributes to yield false when used in a boolean expression.
"""
def __init__(self, seed):
self.__dict__ = collections.defaultdict(lambda: Namespace({}))
self.__dict__.update(seed)
for k, v in self.__dict__.iteritems():
if isinstance(v, dict):
self.__dict__[k] = Namespace(v)
def __getattr__(self, attr):
return self.__dict__[attr]
def __getitem__(self, key):
return self.__dict__[key]
def __nonzero__(self):
return len(self.__dict__) > 0
class ScalingTransformer(transformer.TransformerBase):
"""Transformer to apply a scaling conversion."""
@ -78,7 +53,7 @@ class ScalingTransformer(transformer.TransformerBase):
Either a straight multiplicative factor or else a string to be eval'd.
"""
ns = Namespace(s.as_dict())
ns = transformer.Namespace(s.as_dict())
scale = self.scale
return ((eval(scale, {}, ns) if isinstance(scale, basestring)