From f8ccabfcc585c12772e870a8420a36dd397ee666 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 22 Jul 2013 12:04:45 +0000 Subject: [PATCH] Remove replace/preserve logic from rate of change transformer The idea is to not re-emit cpu samples from the cpu_pipeline by default, but instead simplify the pipeline.yaml by allowing these original samples to be emitted by the general-purpose pipeline as before, thus avoiding unintended double-publication. Now only the derived samples would be emitted from the scaling or rate of change transformers. Change-Id: I12623181d289fc0ee3cdb6fcdbacf8e76e53d244 --- ceilometer/transformer/conversions.py | 42 ++++-------------------- etc/ceilometer/pipeline.yaml | 1 - tests/test_pipeline.py | 46 +++++++-------------------- 3 files changed, 18 insertions(+), 71 deletions(-) diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index fd265dbfc..1e5bc5f95 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -53,23 +53,18 @@ class ScalingTransformer(transformer.TransformerBase): """Transformer to apply a scaling conversion. """ - def __init__(self, source={}, target={}, replace=False, **kwargs): + def __init__(self, source={}, target={}, **kwargs): """Initialize transformer with configured parameters. :param source: dict containing source counter unit :param target: dict containing target counter name, type, unit and scaling factor (a missing value connotes no change) - :param replace: true if source counter is to be replaced - (as opposed to an additive conversion) """ self.source = source self.target = target - self.replace = replace - self.preserved = [] LOG.debug(_('scaling conversion transformer with source:' - ' %(source)s target: %(target)s replace:' - ' %(replace)s') % locals()) + ' %(source)s target: %(target)s:') % locals()) super(ScalingTransformer, self).__init__(**kwargs) @staticmethod @@ -98,36 +93,14 @@ class ScalingTransformer(transformer.TransformerBase): resource_metadata=counter.resource_metadata ) - def _keep(self, counter, transformed): - """Either replace counter with the transformed version - or preserve for flush() call to emit as an additional - sample. - """ - if self.replace: - counter = transformed - else: - self.preserved.append(transformed) - return counter - def handle_sample(self, context, counter, source): """Handle a sample, converting if necessary.""" LOG.debug('handling counter %s', (counter,)) if (self.source.get('unit', counter.unit) == counter.unit): - transformed = self._convert(counter) - LOG.debug(_('converted to: %s') % (transformed,)) - counter = self._keep(counter, transformed) + counter = self._convert(counter) + LOG.debug(_('converted to: %s') % (counter,)) return counter - def flush(self, context, source): - """Emit the additional transformed counter in the non-replace - case. - """ - counters = [] - if self.preserved: - counters.extend(self.preserved) - self.preserved = [] - return counters - class RateOfChangeTransformer(ScalingTransformer): """Transformer based on the rate of change of a counter volume, @@ -164,10 +137,9 @@ class RateOfChangeTransformer(ScalingTransformer): rate_of_change = ((1.0 * volume_delta / time_delta) if time_delta else 0.0) - transformed = self._convert(counter, rate_of_change) - LOG.debug(_('converted to: %s') % (transformed,)) - counter = self._keep(counter, transformed) - elif self.replace: + counter = self._convert(counter, rate_of_change) + LOG.debug(_('converted to: %s') % (counter,)) + else: LOG.warn(_('dropping counter with no predecessor: %s') % counter) counter = None return counter diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml index 3c3bc25e5..3e58f7b93 100644 --- a/etc/ceilometer/pipeline.yaml +++ b/etc/ceilometer/pipeline.yaml @@ -20,6 +20,5 @@ unit: "%" type: "gauge" scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" - replace: False publishers: - rpc:// diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 188ce4401..d2892f872 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -638,7 +638,8 @@ class TestPipeline(base.TestCase): self.assertTrue(getattr(self.TransformerClass.samples[0], "name") == 'a:b') - def _do_test_global_unit_conversion(self, replace, scale): + def test_global_unit_conversion(self): + scale = 'volume / ((10**6) * 60)' self.pipeline_cfg[0]['transformers'] = [ { 'name': 'unit_conversion', @@ -647,7 +648,6 @@ class TestPipeline(base.TestCase): 'target': {'name': 'cpu_mins', 'unit': 'min', 'scale': scale}, - 'replace': replace } }, ] @@ -674,22 +674,12 @@ class TestPipeline(base.TestCase): publisher = pipeline_manager.pipelines[0].publishers[0] self.assertEqual(len(publisher.counters), 1) pipe.flush(None, None) - self.assertEqual(len(publisher.counters), 1 if replace else 2) + self.assertEqual(len(publisher.counters), 1) cpu_mins = publisher.counters[-1] self.assertEquals(getattr(cpu_mins, 'name'), 'cpu_mins') self.assertEquals(getattr(cpu_mins, 'unit'), 'min') self.assertEquals(getattr(cpu_mins, 'type'), counter.TYPE_CUMULATIVE) self.assertEquals(getattr(cpu_mins, 'volume'), 20) - if not replace: - self.assertEquals(publisher.counters[0], counters[0]) - - def test_global_unit_conversion_replacing(self): - scale = 'volume / ((10**6) * 60)' - self._do_test_global_unit_conversion(True, scale) - - def test_global_unit_conversion_additive(self): - scale = 1 / ((10 ** 6) * 60.0) - self._do_test_global_unit_conversion(False, scale) def test_unit_identified_source_unit_conversion(self): self.pipeline_cfg[0]['transformers'] = [ @@ -699,7 +689,6 @@ class TestPipeline(base.TestCase): 'source': {'unit': '°C'}, 'target': {'unit': '°F', 'scale': '(volume * 1.8) + 32'}, - 'replace': True } }, ] @@ -761,7 +750,6 @@ class TestPipeline(base.TestCase): 'unit': '%', 'type': counter.TYPE_GAUGE, 'scale': s}, - 'replace': False } }, ] @@ -826,19 +814,16 @@ class TestPipeline(base.TestCase): pipe.publish_counters(None, counters, None) publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(len(publisher.counters), 4) - # original counters are passed thru' unmolested - for i in xrange(4): - self.assertEquals(publisher.counters[i], counters[i]) + self.assertEqual(len(publisher.counters), 2) pipe.flush(None, None) - self.assertEqual(len(publisher.counters), 6) - cpu_util = publisher.counters[4] + self.assertEqual(len(publisher.counters), 2) + cpu_util = publisher.counters[0] self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util') self.assertEquals(getattr(cpu_util, 'resource_id'), 'test_resource') self.assertEquals(getattr(cpu_util, 'unit'), '%') self.assertEquals(getattr(cpu_util, 'type'), counter.TYPE_GAUGE) self.assertEquals(getattr(cpu_util, 'volume'), expected) - cpu_util = publisher.counters[5] + cpu_util = publisher.counters[1] self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util') self.assertEquals(getattr(cpu_util, 'resource_id'), 'test_resource2') self.assertEquals(getattr(cpu_util, 'unit'), '%') @@ -877,7 +862,7 @@ class TestPipeline(base.TestCase): 0.0, offset=0) - def _do_test_rate_of_change_no_predecessor(self, replace): + def test_rate_of_change_no_predecessor(self): s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))" self.pipeline_cfg[0]['transformers'] = [ { @@ -887,8 +872,7 @@ class TestPipeline(base.TestCase): 'target': {'name': 'cpu_util', 'unit': '%', 'type': counter.TYPE_GAUGE, - 'scale': s}, - 'replace': replace + 'scale': s} } }, ] @@ -914,14 +898,6 @@ class TestPipeline(base.TestCase): pipe.publish_counters(None, counters, None) publisher = pipeline_manager.pipelines[0].publishers[0] - self.assertEqual(len(publisher.counters), 0 if replace else 1) + self.assertEqual(len(publisher.counters), 0) pipe.flush(None, None) - self.assertEqual(len(publisher.counters), 0 if replace else 1) - if not replace: - self.assertEquals(publisher.counters[0], counters[0]) - - def _do_test_rate_of_change_no_predecessor_discard(self): - self._do_test_rate_of_change_no_predecessor(True) - - def _do_test_rate_of_change_no_predecessor_preserve(self): - self._do_test_rate_of_change_no_predecessor(False) + self.assertEqual(len(publisher.counters), 0)