From 5ace23510716d8abeaf511bbcf3caf51bfe1393c Mon Sep 17 00:00:00 2001 From: Eoghan Glynn Date: Thu, 11 Jul 2013 15:28:06 +0100 Subject: [PATCH] Transformer to measure rate of change Addresses BP transformer-unit Provide a new transformer to emit derived counters that represent the rate of change of existing counters, by retaining the previous volume and timestamp. This transformer will supercede the direct emission of derived counters by pollsters (e.g. cpu_util calculated by the compute CPUPollster). The target counters are identified either by name. The scaling can be expressed as either a straight multiplicative factor or as a string expression to be eval'd. Configured as per usual via the pipeline.yaml, for example: counters: - "cpu" transformers: - name: "rate_of_change" parameters: target: name: "cpu_util" unit: "%" type: "gauge" scale: "100.0 * (10**9 / resource_metadata.get('cpu_number', 1))" replace: False Change-Id: I0affa8d13a4fd72db08f818db809023d2f74217a --- ceilometer/transformer/conversions.py | 49 ++++++++- setup.cfg | 1 + tests/test_pipeline.py | 143 ++++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 2 deletions(-) diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 6207b8611..b8e98d2d9 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -20,6 +20,7 @@ import copy from ceilometer import counter as ceilocounter from ceilometer.openstack.common import log +from ceilometer.openstack.common import timeutils from ceilometer import transformer LOG = log.getLogger(__name__) @@ -57,7 +58,7 @@ class ScalingTransformer(transformer.TransformerBase): return ((eval(scale, {}, ns) if isinstance(scale, basestring) else counter.volume * scale) if scale else counter.volume) - def _convert(self, counter): + def _convert(self, counter, growth=1): """Transform the appropriate counter fields. """ scale = self.target.get('scale') @@ -65,7 +66,7 @@ class ScalingTransformer(transformer.TransformerBase): name=self.target.get('name', counter.name), unit=self.target.get('unit', counter.unit), type=self.target.get('type', counter.type), - volume=self._scale(counter, scale), + volume=self._scale(counter, scale) * growth, user_id=counter.user_id, project_id=counter.project_id, resource_id=counter.resource_id, @@ -102,3 +103,47 @@ class ScalingTransformer(transformer.TransformerBase): counters.append(self.preserved) self.preserved = None return counters + + +class RateOfChangeTransformer(ScalingTransformer): + """Transformer based on the rate of change of a counter volume, + for example taking the current and previous volumes of a + cumulative counter and producing a gauge value based on the + proportion of some maximum used. + """ + + def __init__(self, **kwargs): + """Initialize transformer with configured parameters. + """ + self.cache = {} + super(RateOfChangeTransformer, self).__init__(**kwargs) + + def handle_sample(self, context, counter, source): + """Handle a sample, converting if necessary.""" + LOG.debug('handling counter %s', (counter,)) + key = counter.name + counter.resource_id + prev = self.cache.get(key) + timestamp = timeutils.parse_isotime(counter.timestamp) + self.cache[key] = (counter.volume, timestamp) + + if prev: + prev_volume = prev[0] + prev_timestamp = prev[1] + time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) + # we only allow negative deltas for noncumulative counters, whereas + # for cumulative we assume that a reset has occurred in the interim + # so that the current volume gives a lower bound on growth + volume_delta = (counter.volume - prev_volume + if (prev_volume <= counter.volume or + counter.type != ceilocounter.TYPE_CUMULATIVE) + else counter.volume) + rate_of_change = ((1.0 * volume_delta / time_delta) + if time_delta else 0.0) + + transformed = self._convert(counter, rate_of_change) + LOG.debug(_('converted to: %s') % (transformed,)) + counter = self._keep(counter, transformed) + elif self.replace: + LOG.warn(_('dropping counter with no predecessor: %s') % counter) + counter = None + return counter diff --git a/setup.cfg b/setup.cfg index a151250c5..eeb342b20 100644 --- a/setup.cfg +++ b/setup.cfg @@ -76,6 +76,7 @@ ceilometer.compute.virt = ceilometer.transformer = accumulator = ceilometer.transformer.accumulator:TransformerAccumulator unit_conversion = ceilometer.transformer.conversions:ScalingTransformer + rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer ceilometer.publisher = test = ceilometer.publisher.test:TestPublisher diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index f9e59fc23..951dd6732 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -17,6 +17,8 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime + from stevedore import extension from ceilometer import counter @@ -47,6 +49,7 @@ class TestPipeline(base.TestCase): 'drop': self.TransformerClassDrop, 'cache': accumulator.TransformerAccumulator, 'unit_conversion': conversions.ScalingTransformer, + 'rate_of_change': conversions.RateOfChangeTransformer, } if name in class_name_ext: @@ -742,3 +745,143 @@ class TestPipeline(base.TestCase): self.assertEquals(getattr(amb_temp, 'name'), 'ambient_temperature') self.assertEquals(getattr(amb_temp, 'unit'), '°F') self.assertEquals(getattr(amb_temp, 'volume'), 88.8) + self.assertEquals(getattr(core_temp, 'volume'), 96.8) + + def _do_test_rate_of_change_conversion(self, prev, curr, offset, + type, expected): + s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))" + self.pipeline_cfg[0]['transformers'] = [ + { + 'name': 'rate_of_change', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_util', + 'unit': '%', + 'type': counter.TYPE_GAUGE, + 'scale': s}, + 'replace': False + } + }, + ] + self.pipeline_cfg[0]['counters'] = ['cpu'] + now = timeutils.utcnow() + later = now + datetime.timedelta(minutes=offset) + counters = [ + counter.Counter( + name='cpu', + type=type, + volume=prev, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=now.isoformat(), + resource_metadata={'cpu_number': 4} + ), + counter.Counter( + name='cpu', + type=type, + volume=curr, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=later.isoformat(), + resource_metadata={'cpu_number': 4} + ), + ] + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_counters(None, counters, None) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 2) + # original counters are passed thru' unmolested + self.assertEquals(publisher.counters[0], counters[0]) + self.assertEquals(publisher.counters[1], counters[1]) + pipe.flush(None, None) + self.assertEqual(len(publisher.counters), 3) + cpu_util = publisher.counters[-1] + self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util') + self.assertEquals(getattr(cpu_util, 'unit'), '%') + self.assertEquals(getattr(cpu_util, 'type'), counter.TYPE_GAUGE) + self.assertEquals(getattr(cpu_util, 'volume'), expected) + + def test_rate_of_change_conversion(self): + self._do_test_rate_of_change_conversion(120000000000, + 180000000000, + 1, + counter.TYPE_CUMULATIVE, + 25.0) + + def test_rate_of_change_conversion_negative_cumulative_delta(self): + self._do_test_rate_of_change_conversion(180000000000, + 120000000000, + 1, + counter.TYPE_CUMULATIVE, + 50.0) + + def test_rate_of_change_conversion_negative_gauge_delta(self): + self._do_test_rate_of_change_conversion(180000000000, + 120000000000, + 1, + counter.TYPE_GAUGE, + -25.0) + + def test_rate_of_change_conversion_zero_delay(self): + self._do_test_rate_of_change_conversion(120000000000, + 120000000000, + 0, + counter.TYPE_CUMULATIVE, + 0.0) + + def _do_test_rate_of_change_no_predecessor(self, replace): + s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))" + self.pipeline_cfg[0]['transformers'] = [ + { + 'name': 'rate_of_change', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_util', + 'unit': '%', + 'type': counter.TYPE_GAUGE, + 'scale': s}, + 'replace': replace + } + }, + ] + self.pipeline_cfg[0]['counters'] = ['cpu'] + now = timeutils.utcnow() + counters = [ + counter.Counter( + name='cpu', + type=counter.TYPE_CUMULATIVE, + volume=120000000000, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=now.isoformat(), + resource_metadata={'cpu_number': 4} + ), + ] + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_counters(None, counters, None) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 0 if replace else 1) + pipe.flush(None, None) + self.assertEqual(len(publisher.counters), 0 if replace else 1) + if not replace: + self.assertEquals(publisher.counters[0], counters[0]) + + def _do_test_rate_of_change_no_predecessor_discard(self): + self._do_test_rate_of_change_no_predecessor(True) + + def _do_test_rate_of_change_no_predecessor_preserve(self): + self._do_test_rate_of_change_no_predecessor(False)