Name and unit mapping for rate_of_change transformer

Precursor-to: BP rates-derived-from-cumulative

Adds support for regex-based name and unit mapping to the
rate_of_change transformer. Regular expression back-references
are key to simplifying potentially repetitive config.

This avoids a explosion of pipelines to apply conversions to
classes of similar meters. For example to allow the following
sample derivations:

  * disk.read.requests (request) -> disk.read.requests.rate (request/s)
  * disk.write.requests (request) -> disk.write.requests.rate (request/s)
  * disk.read.bytes (B) -> disk.read.bytes.rate (B/s)
  * disk.write.bytes (B) -> disk.write.bytes.rate (B/s)

to be mapped via a single transformer config:

    transformers:
        - name: "rate_of_change"
          parameters:
              source:
                  map_from:
                      name: "disk\\.(read|write)\\.(bytes|requests)"
                      unit: "(B|request)"
              target:
                  map_to:
                      name: "disk.\\1.\\2.rate"
                      unit: "\\1/s"
                  type: "gauge"

Change-Id: I94b6bfbef538a8b067c3c5ff65066505a199bc5a
This commit is contained in:
Eoghan Glynn 2014-01-14 12:29:06 +00:00
parent 38d2a8beb9
commit 943fafb929
2 changed files with 89 additions and 7 deletions

View File

@ -1067,3 +1067,69 @@ class TestPipeline(test.BaseTestCase):
self.transformer_manager)
self.assertEqual(len(pipeline_manager.pipelines[0].resources),
0)
def test_rate_of_change_mapping(self):
map_from = {'name': 'disk\\.(read|write)\\.(bytes|requests)',
'unit': '(B|request)'}
map_to = {'name': 'disk.\\1.\\2.rate',
'unit': '\\1/s'}
self.pipeline_cfg[0]['transformers'] = [
{
'name': 'rate_of_change',
'parameters': {
'source': {
'map_from': map_from
},
'target': {
'map_to': map_to,
'type': sample.TYPE_GAUGE
},
},
},
]
self.pipeline_cfg[0]['counters'] = ['disk.read.bytes',
'disk.write.requests']
now = timeutils.utcnow()
base = 1000
offset = 7
rate = 42
later = now + datetime.timedelta(minutes=offset)
counters = []
for v, ts in [(base, now.isoformat()),
(base + (offset * 60 * rate), later.isoformat())]:
for n, u, r in [('disk.read.bytes', 'B', 'resource1'),
('disk.write.requests', 'request', 'resource2')]:
s = sample.Sample(
name=n,
type=sample.TYPE_CUMULATIVE,
volume=v,
unit=u,
user_id='test_user',
project_id='test_proj',
resource_id=r,
timestamp=ts,
resource_metadata={},
)
counters.append(s)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_samples(None, counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(len(publisher.samples), 2)
pipe.flush(None)
self.assertEqual(len(publisher.samples), 2)
bps = publisher.samples[0]
self.assertEqual(getattr(bps, 'name'), 'disk.read.bytes.rate')
self.assertEqual(getattr(bps, 'resource_id'), 'resource1')
self.assertEqual(getattr(bps, 'unit'), 'B/s')
self.assertEqual(getattr(bps, 'type'), sample.TYPE_GAUGE)
self.assertEqual(getattr(bps, 'volume'), rate)
rps = publisher.samples[1]
self.assertEqual(getattr(rps, 'name'), 'disk.write.requests.rate')
self.assertEqual(getattr(rps, 'resource_id'), 'resource2')
self.assertEqual(getattr(rps, 'unit'), 'request/s')
self.assertEqual(getattr(rps, 'type'), sample.TYPE_GAUGE)
self.assertEqual(getattr(rps, 'volume'), rate)

View File

@ -17,6 +17,7 @@
# under the License.
import collections
import re
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
@ -64,31 +65,45 @@ class ScalingTransformer(transformer.TransformerBase):
"""
self.source = source
self.target = target
self.scale = target.get('scale')
LOG.debug(_('scaling conversion transformer with source:'
' %(source)s target: %(target)s:')
% {'source': source,
'target': target})
super(ScalingTransformer, self).__init__(**kwargs)
@staticmethod
def _scale(s, scale):
def _scale(self, s):
"""Apply the scaling factor (either a straight multiplicative
factor or else a string to be eval'd).
"""
ns = Namespace(s.as_dict())
scale = self.scale
return ((eval(scale, {}, ns) if isinstance(scale, basestring)
else s.volume * scale) if scale else s.volume)
def _map(self, s, attr):
"""Apply the name or unit mapping if configured.
"""
mapped = None
from_ = self.source.get('map_from')
to_ = self.target.get('map_to')
if from_ and to_:
if from_.get(attr) and to_.get(attr):
try:
mapped = re.sub(from_[attr], to_[attr], getattr(s, attr))
except Exception:
pass
return mapped or self.target.get(attr, getattr(s, attr))
def _convert(self, s, growth=1):
"""Transform the appropriate sample fields.
"""
scale = self.target.get('scale')
return sample.Sample(
name=self.target.get('name', s.name),
unit=self.target.get('unit', s.unit),
name=self._map(s, 'name'),
unit=self._map(s, 'unit'),
type=self.target.get('type', s.type),
volume=self._scale(s, scale) * growth,
volume=self._scale(s) * growth,
user_id=s.user_id,
project_id=s.project_id,
resource_id=s.resource_id,
@ -115,8 +130,9 @@ class RateOfChangeTransformer(ScalingTransformer):
def __init__(self, **kwargs):
"""Initialize transformer with configured parameters.
"""
self.cache = {}
super(RateOfChangeTransformer, self).__init__(**kwargs)
self.cache = {}
self.scale = self.scale or '1'
def handle_sample(self, context, s):
"""Handle a sample, converting if necessary."""