diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 6207b8611..b8e98d2d9 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -20,6 +20,7 @@ import copy from ceilometer import counter as ceilocounter from ceilometer.openstack.common import log +from ceilometer.openstack.common import timeutils from ceilometer import transformer LOG = log.getLogger(__name__) @@ -57,7 +58,7 @@ class ScalingTransformer(transformer.TransformerBase): return ((eval(scale, {}, ns) if isinstance(scale, basestring) else counter.volume * scale) if scale else counter.volume) - def _convert(self, counter): + def _convert(self, counter, growth=1): """Transform the appropriate counter fields. """ scale = self.target.get('scale') @@ -65,7 +66,7 @@ class ScalingTransformer(transformer.TransformerBase): name=self.target.get('name', counter.name), unit=self.target.get('unit', counter.unit), type=self.target.get('type', counter.type), - volume=self._scale(counter, scale), + volume=self._scale(counter, scale) * growth, user_id=counter.user_id, project_id=counter.project_id, resource_id=counter.resource_id, @@ -102,3 +103,47 @@ class ScalingTransformer(transformer.TransformerBase): counters.append(self.preserved) self.preserved = None return counters + + +class RateOfChangeTransformer(ScalingTransformer): + """Transformer based on the rate of change of a counter volume, + for example taking the current and previous volumes of a + cumulative counter and producing a gauge value based on the + proportion of some maximum used. + """ + + def __init__(self, **kwargs): + """Initialize transformer with configured parameters. + """ + self.cache = {} + super(RateOfChangeTransformer, self).__init__(**kwargs) + + def handle_sample(self, context, counter, source): + """Handle a sample, converting if necessary.""" + LOG.debug('handling counter %s', (counter,)) + key = counter.name + counter.resource_id + prev = self.cache.get(key) + timestamp = timeutils.parse_isotime(counter.timestamp) + self.cache[key] = (counter.volume, timestamp) + + if prev: + prev_volume = prev[0] + prev_timestamp = prev[1] + time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) + # we only allow negative deltas for noncumulative counters, whereas + # for cumulative we assume that a reset has occurred in the interim + # so that the current volume gives a lower bound on growth + volume_delta = (counter.volume - prev_volume + if (prev_volume <= counter.volume or + counter.type != ceilocounter.TYPE_CUMULATIVE) + else counter.volume) + rate_of_change = ((1.0 * volume_delta / time_delta) + if time_delta else 0.0) + + transformed = self._convert(counter, rate_of_change) + LOG.debug(_('converted to: %s') % (transformed,)) + counter = self._keep(counter, transformed) + elif self.replace: + LOG.warn(_('dropping counter with no predecessor: %s') % counter) + counter = None + return counter diff --git a/setup.cfg b/setup.cfg index ab0f589da..07f66784d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -88,6 +88,7 @@ ceilometer.compute.virt = ceilometer.transformer = accumulator = ceilometer.transformer.accumulator:TransformerAccumulator unit_conversion = ceilometer.transformer.conversions:ScalingTransformer + rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer ceilometer.publisher = test = ceilometer.publisher.test:TestPublisher diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index f9e59fc23..951dd6732 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -17,6 +17,8 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime + from stevedore import extension from ceilometer import counter @@ -47,6 +49,7 @@ class TestPipeline(base.TestCase): 'drop': self.TransformerClassDrop, 'cache': accumulator.TransformerAccumulator, 'unit_conversion': conversions.ScalingTransformer, + 'rate_of_change': conversions.RateOfChangeTransformer, } if name in class_name_ext: @@ -742,3 +745,143 @@ class TestPipeline(base.TestCase): self.assertEquals(getattr(amb_temp, 'name'), 'ambient_temperature') self.assertEquals(getattr(amb_temp, 'unit'), '°F') self.assertEquals(getattr(amb_temp, 'volume'), 88.8) + self.assertEquals(getattr(core_temp, 'volume'), 96.8) + + def _do_test_rate_of_change_conversion(self, prev, curr, offset, + type, expected): + s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))" + self.pipeline_cfg[0]['transformers'] = [ + { + 'name': 'rate_of_change', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_util', + 'unit': '%', + 'type': counter.TYPE_GAUGE, + 'scale': s}, + 'replace': False + } + }, + ] + self.pipeline_cfg[0]['counters'] = ['cpu'] + now = timeutils.utcnow() + later = now + datetime.timedelta(minutes=offset) + counters = [ + counter.Counter( + name='cpu', + type=type, + volume=prev, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=now.isoformat(), + resource_metadata={'cpu_number': 4} + ), + counter.Counter( + name='cpu', + type=type, + volume=curr, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=later.isoformat(), + resource_metadata={'cpu_number': 4} + ), + ] + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_counters(None, counters, None) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 2) + # original counters are passed thru' unmolested + self.assertEquals(publisher.counters[0], counters[0]) + self.assertEquals(publisher.counters[1], counters[1]) + pipe.flush(None, None) + self.assertEqual(len(publisher.counters), 3) + cpu_util = publisher.counters[-1] + self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util') + self.assertEquals(getattr(cpu_util, 'unit'), '%') + self.assertEquals(getattr(cpu_util, 'type'), counter.TYPE_GAUGE) + self.assertEquals(getattr(cpu_util, 'volume'), expected) + + def test_rate_of_change_conversion(self): + self._do_test_rate_of_change_conversion(120000000000, + 180000000000, + 1, + counter.TYPE_CUMULATIVE, + 25.0) + + def test_rate_of_change_conversion_negative_cumulative_delta(self): + self._do_test_rate_of_change_conversion(180000000000, + 120000000000, + 1, + counter.TYPE_CUMULATIVE, + 50.0) + + def test_rate_of_change_conversion_negative_gauge_delta(self): + self._do_test_rate_of_change_conversion(180000000000, + 120000000000, + 1, + counter.TYPE_GAUGE, + -25.0) + + def test_rate_of_change_conversion_zero_delay(self): + self._do_test_rate_of_change_conversion(120000000000, + 120000000000, + 0, + counter.TYPE_CUMULATIVE, + 0.0) + + def _do_test_rate_of_change_no_predecessor(self, replace): + s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))" + self.pipeline_cfg[0]['transformers'] = [ + { + 'name': 'rate_of_change', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_util', + 'unit': '%', + 'type': counter.TYPE_GAUGE, + 'scale': s}, + 'replace': replace + } + }, + ] + self.pipeline_cfg[0]['counters'] = ['cpu'] + now = timeutils.utcnow() + counters = [ + counter.Counter( + name='cpu', + type=counter.TYPE_CUMULATIVE, + volume=120000000000, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=now.isoformat(), + resource_metadata={'cpu_number': 4} + ), + ] + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_counters(None, counters, None) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(len(publisher.counters), 0 if replace else 1) + pipe.flush(None, None) + self.assertEqual(len(publisher.counters), 0 if replace else 1) + if not replace: + self.assertEquals(publisher.counters[0], counters[0]) + + def _do_test_rate_of_change_no_predecessor_discard(self): + self._do_test_rate_of_change_no_predecessor(True) + + def _do_test_rate_of_change_no_predecessor_preserve(self): + self._do_test_rate_of_change_no_predecessor(False)