Merge "Remove replace/preserve logic from rate of change transformer"
This commit is contained in:
commit
ff2bc968f4
@ -53,23 +53,18 @@ class ScalingTransformer(transformer.TransformerBase):
|
|||||||
"""Transformer to apply a scaling conversion.
|
"""Transformer to apply a scaling conversion.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, source={}, target={}, replace=False, **kwargs):
|
def __init__(self, source={}, target={}, **kwargs):
|
||||||
"""Initialize transformer with configured parameters.
|
"""Initialize transformer with configured parameters.
|
||||||
|
|
||||||
:param source: dict containing source counter unit
|
:param source: dict containing source counter unit
|
||||||
:param target: dict containing target counter name, type,
|
:param target: dict containing target counter name, type,
|
||||||
unit and scaling factor (a missing value
|
unit and scaling factor (a missing value
|
||||||
connotes no change)
|
connotes no change)
|
||||||
:param replace: true if source counter is to be replaced
|
|
||||||
(as opposed to an additive conversion)
|
|
||||||
"""
|
"""
|
||||||
self.source = source
|
self.source = source
|
||||||
self.target = target
|
self.target = target
|
||||||
self.replace = replace
|
|
||||||
self.preserved = []
|
|
||||||
LOG.debug(_('scaling conversion transformer with source:'
|
LOG.debug(_('scaling conversion transformer with source:'
|
||||||
' %(source)s target: %(target)s replace:'
|
' %(source)s target: %(target)s:') % locals())
|
||||||
' %(replace)s') % locals())
|
|
||||||
super(ScalingTransformer, self).__init__(**kwargs)
|
super(ScalingTransformer, self).__init__(**kwargs)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -98,36 +93,14 @@ class ScalingTransformer(transformer.TransformerBase):
|
|||||||
resource_metadata=counter.resource_metadata
|
resource_metadata=counter.resource_metadata
|
||||||
)
|
)
|
||||||
|
|
||||||
def _keep(self, counter, transformed):
|
|
||||||
"""Either replace counter with the transformed version
|
|
||||||
or preserve for flush() call to emit as an additional
|
|
||||||
sample.
|
|
||||||
"""
|
|
||||||
if self.replace:
|
|
||||||
counter = transformed
|
|
||||||
else:
|
|
||||||
self.preserved.append(transformed)
|
|
||||||
return counter
|
|
||||||
|
|
||||||
def handle_sample(self, context, counter, source):
|
def handle_sample(self, context, counter, source):
|
||||||
"""Handle a sample, converting if necessary."""
|
"""Handle a sample, converting if necessary."""
|
||||||
LOG.debug('handling counter %s', (counter,))
|
LOG.debug('handling counter %s', (counter,))
|
||||||
if (self.source.get('unit', counter.unit) == counter.unit):
|
if (self.source.get('unit', counter.unit) == counter.unit):
|
||||||
transformed = self._convert(counter)
|
counter = self._convert(counter)
|
||||||
LOG.debug(_('converted to: %s') % (transformed,))
|
LOG.debug(_('converted to: %s') % (counter,))
|
||||||
counter = self._keep(counter, transformed)
|
|
||||||
return counter
|
return counter
|
||||||
|
|
||||||
def flush(self, context, source):
|
|
||||||
"""Emit the additional transformed counter in the non-replace
|
|
||||||
case.
|
|
||||||
"""
|
|
||||||
counters = []
|
|
||||||
if self.preserved:
|
|
||||||
counters.extend(self.preserved)
|
|
||||||
self.preserved = []
|
|
||||||
return counters
|
|
||||||
|
|
||||||
|
|
||||||
class RateOfChangeTransformer(ScalingTransformer):
|
class RateOfChangeTransformer(ScalingTransformer):
|
||||||
"""Transformer based on the rate of change of a counter volume,
|
"""Transformer based on the rate of change of a counter volume,
|
||||||
@ -164,10 +137,9 @@ class RateOfChangeTransformer(ScalingTransformer):
|
|||||||
rate_of_change = ((1.0 * volume_delta / time_delta)
|
rate_of_change = ((1.0 * volume_delta / time_delta)
|
||||||
if time_delta else 0.0)
|
if time_delta else 0.0)
|
||||||
|
|
||||||
transformed = self._convert(counter, rate_of_change)
|
counter = self._convert(counter, rate_of_change)
|
||||||
LOG.debug(_('converted to: %s') % (transformed,))
|
LOG.debug(_('converted to: %s') % (counter,))
|
||||||
counter = self._keep(counter, transformed)
|
else:
|
||||||
elif self.replace:
|
|
||||||
LOG.warn(_('dropping counter with no predecessor: %s') % counter)
|
LOG.warn(_('dropping counter with no predecessor: %s') % counter)
|
||||||
counter = None
|
counter = None
|
||||||
return counter
|
return counter
|
||||||
|
@ -20,6 +20,5 @@
|
|||||||
unit: "%"
|
unit: "%"
|
||||||
type: "gauge"
|
type: "gauge"
|
||||||
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
|
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
|
||||||
replace: False
|
|
||||||
publishers:
|
publishers:
|
||||||
- rpc://
|
- rpc://
|
||||||
|
@ -638,7 +638,8 @@ class TestPipeline(base.TestCase):
|
|||||||
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
|
self.assertTrue(getattr(self.TransformerClass.samples[0], "name")
|
||||||
== 'a:b')
|
== 'a:b')
|
||||||
|
|
||||||
def _do_test_global_unit_conversion(self, replace, scale):
|
def test_global_unit_conversion(self):
|
||||||
|
scale = 'volume / ((10**6) * 60)'
|
||||||
self.pipeline_cfg[0]['transformers'] = [
|
self.pipeline_cfg[0]['transformers'] = [
|
||||||
{
|
{
|
||||||
'name': 'unit_conversion',
|
'name': 'unit_conversion',
|
||||||
@ -647,7 +648,6 @@ class TestPipeline(base.TestCase):
|
|||||||
'target': {'name': 'cpu_mins',
|
'target': {'name': 'cpu_mins',
|
||||||
'unit': 'min',
|
'unit': 'min',
|
||||||
'scale': scale},
|
'scale': scale},
|
||||||
'replace': replace
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
@ -674,22 +674,12 @@ class TestPipeline(base.TestCase):
|
|||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 1)
|
self.assertEqual(len(publisher.counters), 1)
|
||||||
pipe.flush(None, None)
|
pipe.flush(None, None)
|
||||||
self.assertEqual(len(publisher.counters), 1 if replace else 2)
|
self.assertEqual(len(publisher.counters), 1)
|
||||||
cpu_mins = publisher.counters[-1]
|
cpu_mins = publisher.counters[-1]
|
||||||
self.assertEquals(getattr(cpu_mins, 'name'), 'cpu_mins')
|
self.assertEquals(getattr(cpu_mins, 'name'), 'cpu_mins')
|
||||||
self.assertEquals(getattr(cpu_mins, 'unit'), 'min')
|
self.assertEquals(getattr(cpu_mins, 'unit'), 'min')
|
||||||
self.assertEquals(getattr(cpu_mins, 'type'), counter.TYPE_CUMULATIVE)
|
self.assertEquals(getattr(cpu_mins, 'type'), counter.TYPE_CUMULATIVE)
|
||||||
self.assertEquals(getattr(cpu_mins, 'volume'), 20)
|
self.assertEquals(getattr(cpu_mins, 'volume'), 20)
|
||||||
if not replace:
|
|
||||||
self.assertEquals(publisher.counters[0], counters[0])
|
|
||||||
|
|
||||||
def test_global_unit_conversion_replacing(self):
|
|
||||||
scale = 'volume / ((10**6) * 60)'
|
|
||||||
self._do_test_global_unit_conversion(True, scale)
|
|
||||||
|
|
||||||
def test_global_unit_conversion_additive(self):
|
|
||||||
scale = 1 / ((10 ** 6) * 60.0)
|
|
||||||
self._do_test_global_unit_conversion(False, scale)
|
|
||||||
|
|
||||||
def test_unit_identified_source_unit_conversion(self):
|
def test_unit_identified_source_unit_conversion(self):
|
||||||
self.pipeline_cfg[0]['transformers'] = [
|
self.pipeline_cfg[0]['transformers'] = [
|
||||||
@ -699,7 +689,6 @@ class TestPipeline(base.TestCase):
|
|||||||
'source': {'unit': '°C'},
|
'source': {'unit': '°C'},
|
||||||
'target': {'unit': '°F',
|
'target': {'unit': '°F',
|
||||||
'scale': '(volume * 1.8) + 32'},
|
'scale': '(volume * 1.8) + 32'},
|
||||||
'replace': True
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
@ -761,7 +750,6 @@ class TestPipeline(base.TestCase):
|
|||||||
'unit': '%',
|
'unit': '%',
|
||||||
'type': counter.TYPE_GAUGE,
|
'type': counter.TYPE_GAUGE,
|
||||||
'scale': s},
|
'scale': s},
|
||||||
'replace': False
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
@ -826,19 +814,16 @@ class TestPipeline(base.TestCase):
|
|||||||
|
|
||||||
pipe.publish_counters(None, counters, None)
|
pipe.publish_counters(None, counters, None)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 4)
|
self.assertEqual(len(publisher.counters), 2)
|
||||||
# original counters are passed thru' unmolested
|
|
||||||
for i in xrange(4):
|
|
||||||
self.assertEquals(publisher.counters[i], counters[i])
|
|
||||||
pipe.flush(None, None)
|
pipe.flush(None, None)
|
||||||
self.assertEqual(len(publisher.counters), 6)
|
self.assertEqual(len(publisher.counters), 2)
|
||||||
cpu_util = publisher.counters[4]
|
cpu_util = publisher.counters[0]
|
||||||
self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util')
|
self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util')
|
||||||
self.assertEquals(getattr(cpu_util, 'resource_id'), 'test_resource')
|
self.assertEquals(getattr(cpu_util, 'resource_id'), 'test_resource')
|
||||||
self.assertEquals(getattr(cpu_util, 'unit'), '%')
|
self.assertEquals(getattr(cpu_util, 'unit'), '%')
|
||||||
self.assertEquals(getattr(cpu_util, 'type'), counter.TYPE_GAUGE)
|
self.assertEquals(getattr(cpu_util, 'type'), counter.TYPE_GAUGE)
|
||||||
self.assertEquals(getattr(cpu_util, 'volume'), expected)
|
self.assertEquals(getattr(cpu_util, 'volume'), expected)
|
||||||
cpu_util = publisher.counters[5]
|
cpu_util = publisher.counters[1]
|
||||||
self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util')
|
self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util')
|
||||||
self.assertEquals(getattr(cpu_util, 'resource_id'), 'test_resource2')
|
self.assertEquals(getattr(cpu_util, 'resource_id'), 'test_resource2')
|
||||||
self.assertEquals(getattr(cpu_util, 'unit'), '%')
|
self.assertEquals(getattr(cpu_util, 'unit'), '%')
|
||||||
@ -877,7 +862,7 @@ class TestPipeline(base.TestCase):
|
|||||||
0.0,
|
0.0,
|
||||||
offset=0)
|
offset=0)
|
||||||
|
|
||||||
def _do_test_rate_of_change_no_predecessor(self, replace):
|
def test_rate_of_change_no_predecessor(self):
|
||||||
s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))"
|
s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))"
|
||||||
self.pipeline_cfg[0]['transformers'] = [
|
self.pipeline_cfg[0]['transformers'] = [
|
||||||
{
|
{
|
||||||
@ -887,8 +872,7 @@ class TestPipeline(base.TestCase):
|
|||||||
'target': {'name': 'cpu_util',
|
'target': {'name': 'cpu_util',
|
||||||
'unit': '%',
|
'unit': '%',
|
||||||
'type': counter.TYPE_GAUGE,
|
'type': counter.TYPE_GAUGE,
|
||||||
'scale': s},
|
'scale': s}
|
||||||
'replace': replace
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
@ -914,14 +898,6 @@ class TestPipeline(base.TestCase):
|
|||||||
|
|
||||||
pipe.publish_counters(None, counters, None)
|
pipe.publish_counters(None, counters, None)
|
||||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
self.assertEqual(len(publisher.counters), 0 if replace else 1)
|
self.assertEqual(len(publisher.counters), 0)
|
||||||
pipe.flush(None, None)
|
pipe.flush(None, None)
|
||||||
self.assertEqual(len(publisher.counters), 0 if replace else 1)
|
self.assertEqual(len(publisher.counters), 0)
|
||||||
if not replace:
|
|
||||||
self.assertEquals(publisher.counters[0], counters[0])
|
|
||||||
|
|
||||||
def _do_test_rate_of_change_no_predecessor_discard(self):
|
|
||||||
self._do_test_rate_of_change_no_predecessor(True)
|
|
||||||
|
|
||||||
def _do_test_rate_of_change_no_predecessor_preserve(self):
|
|
||||||
self._do_test_rate_of_change_no_predecessor(False)
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user