Merge "Transformer to measure rate of change"
This commit is contained in:
commit
a1a80e0a1f
@ -20,6 +20,7 @@ import copy
|
||||
|
||||
from ceilometer import counter as ceilocounter
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.openstack.common import timeutils
|
||||
from ceilometer import transformer
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -57,7 +58,7 @@ class ScalingTransformer(transformer.TransformerBase):
|
||||
return ((eval(scale, {}, ns) if isinstance(scale, basestring)
|
||||
else counter.volume * scale) if scale else counter.volume)
|
||||
|
||||
def _convert(self, counter):
|
||||
def _convert(self, counter, growth=1):
|
||||
"""Transform the appropriate counter fields.
|
||||
"""
|
||||
scale = self.target.get('scale')
|
||||
@ -65,7 +66,7 @@ class ScalingTransformer(transformer.TransformerBase):
|
||||
name=self.target.get('name', counter.name),
|
||||
unit=self.target.get('unit', counter.unit),
|
||||
type=self.target.get('type', counter.type),
|
||||
volume=self._scale(counter, scale),
|
||||
volume=self._scale(counter, scale) * growth,
|
||||
user_id=counter.user_id,
|
||||
project_id=counter.project_id,
|
||||
resource_id=counter.resource_id,
|
||||
@ -102,3 +103,47 @@ class ScalingTransformer(transformer.TransformerBase):
|
||||
counters.append(self.preserved)
|
||||
self.preserved = None
|
||||
return counters
|
||||
|
||||
|
||||
class RateOfChangeTransformer(ScalingTransformer):
|
||||
"""Transformer based on the rate of change of a counter volume,
|
||||
for example taking the current and previous volumes of a
|
||||
cumulative counter and producing a gauge value based on the
|
||||
proportion of some maximum used.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Initialize transformer with configured parameters.
|
||||
"""
|
||||
self.cache = {}
|
||||
super(RateOfChangeTransformer, self).__init__(**kwargs)
|
||||
|
||||
def handle_sample(self, context, counter, source):
|
||||
"""Handle a sample, converting if necessary."""
|
||||
LOG.debug('handling counter %s', (counter,))
|
||||
key = counter.name + counter.resource_id
|
||||
prev = self.cache.get(key)
|
||||
timestamp = timeutils.parse_isotime(counter.timestamp)
|
||||
self.cache[key] = (counter.volume, timestamp)
|
||||
|
||||
if prev:
|
||||
prev_volume = prev[0]
|
||||
prev_timestamp = prev[1]
|
||||
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
|
||||
# we only allow negative deltas for noncumulative counters, whereas
|
||||
# for cumulative we assume that a reset has occurred in the interim
|
||||
# so that the current volume gives a lower bound on growth
|
||||
volume_delta = (counter.volume - prev_volume
|
||||
if (prev_volume <= counter.volume or
|
||||
counter.type != ceilocounter.TYPE_CUMULATIVE)
|
||||
else counter.volume)
|
||||
rate_of_change = ((1.0 * volume_delta / time_delta)
|
||||
if time_delta else 0.0)
|
||||
|
||||
transformed = self._convert(counter, rate_of_change)
|
||||
LOG.debug(_('converted to: %s') % (transformed,))
|
||||
counter = self._keep(counter, transformed)
|
||||
elif self.replace:
|
||||
LOG.warn(_('dropping counter with no predecessor: %s') % counter)
|
||||
counter = None
|
||||
return counter
|
||||
|
@ -88,6 +88,7 @@ ceilometer.compute.virt =
|
||||
ceilometer.transformer =
|
||||
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
|
||||
unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
|
||||
rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
|
||||
|
||||
ceilometer.publisher =
|
||||
test = ceilometer.publisher.test:TestPublisher
|
||||
|
@ -17,6 +17,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
from stevedore import extension
|
||||
|
||||
from ceilometer import counter
|
||||
@ -47,6 +49,7 @@ class TestPipeline(base.TestCase):
|
||||
'drop': self.TransformerClassDrop,
|
||||
'cache': accumulator.TransformerAccumulator,
|
||||
'unit_conversion': conversions.ScalingTransformer,
|
||||
'rate_of_change': conversions.RateOfChangeTransformer,
|
||||
}
|
||||
|
||||
if name in class_name_ext:
|
||||
@ -742,3 +745,143 @@ class TestPipeline(base.TestCase):
|
||||
self.assertEquals(getattr(amb_temp, 'name'), 'ambient_temperature')
|
||||
self.assertEquals(getattr(amb_temp, 'unit'), '°F')
|
||||
self.assertEquals(getattr(amb_temp, 'volume'), 88.8)
|
||||
self.assertEquals(getattr(core_temp, 'volume'), 96.8)
|
||||
|
||||
def _do_test_rate_of_change_conversion(self, prev, curr, offset,
|
||||
type, expected):
|
||||
s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))"
|
||||
self.pipeline_cfg[0]['transformers'] = [
|
||||
{
|
||||
'name': 'rate_of_change',
|
||||
'parameters': {
|
||||
'source': {},
|
||||
'target': {'name': 'cpu_util',
|
||||
'unit': '%',
|
||||
'type': counter.TYPE_GAUGE,
|
||||
'scale': s},
|
||||
'replace': False
|
||||
}
|
||||
},
|
||||
]
|
||||
self.pipeline_cfg[0]['counters'] = ['cpu']
|
||||
now = timeutils.utcnow()
|
||||
later = now + datetime.timedelta(minutes=offset)
|
||||
counters = [
|
||||
counter.Counter(
|
||||
name='cpu',
|
||||
type=type,
|
||||
volume=prev,
|
||||
unit='ns',
|
||||
user_id='test_user',
|
||||
project_id='test_proj',
|
||||
resource_id='test_resource',
|
||||
timestamp=now.isoformat(),
|
||||
resource_metadata={'cpu_number': 4}
|
||||
),
|
||||
counter.Counter(
|
||||
name='cpu',
|
||||
type=type,
|
||||
volume=curr,
|
||||
unit='ns',
|
||||
user_id='test_user',
|
||||
project_id='test_proj',
|
||||
resource_id='test_resource',
|
||||
timestamp=later.isoformat(),
|
||||
resource_metadata={'cpu_number': 4}
|
||||
),
|
||||
]
|
||||
|
||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||
self.transformer_manager)
|
||||
pipe = pipeline_manager.pipelines[0]
|
||||
|
||||
pipe.publish_counters(None, counters, None)
|
||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||
self.assertEqual(len(publisher.counters), 2)
|
||||
# original counters are passed thru' unmolested
|
||||
self.assertEquals(publisher.counters[0], counters[0])
|
||||
self.assertEquals(publisher.counters[1], counters[1])
|
||||
pipe.flush(None, None)
|
||||
self.assertEqual(len(publisher.counters), 3)
|
||||
cpu_util = publisher.counters[-1]
|
||||
self.assertEquals(getattr(cpu_util, 'name'), 'cpu_util')
|
||||
self.assertEquals(getattr(cpu_util, 'unit'), '%')
|
||||
self.assertEquals(getattr(cpu_util, 'type'), counter.TYPE_GAUGE)
|
||||
self.assertEquals(getattr(cpu_util, 'volume'), expected)
|
||||
|
||||
def test_rate_of_change_conversion(self):
|
||||
self._do_test_rate_of_change_conversion(120000000000,
|
||||
180000000000,
|
||||
1,
|
||||
counter.TYPE_CUMULATIVE,
|
||||
25.0)
|
||||
|
||||
def test_rate_of_change_conversion_negative_cumulative_delta(self):
|
||||
self._do_test_rate_of_change_conversion(180000000000,
|
||||
120000000000,
|
||||
1,
|
||||
counter.TYPE_CUMULATIVE,
|
||||
50.0)
|
||||
|
||||
def test_rate_of_change_conversion_negative_gauge_delta(self):
|
||||
self._do_test_rate_of_change_conversion(180000000000,
|
||||
120000000000,
|
||||
1,
|
||||
counter.TYPE_GAUGE,
|
||||
-25.0)
|
||||
|
||||
def test_rate_of_change_conversion_zero_delay(self):
|
||||
self._do_test_rate_of_change_conversion(120000000000,
|
||||
120000000000,
|
||||
0,
|
||||
counter.TYPE_CUMULATIVE,
|
||||
0.0)
|
||||
|
||||
def _do_test_rate_of_change_no_predecessor(self, replace):
|
||||
s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))"
|
||||
self.pipeline_cfg[0]['transformers'] = [
|
||||
{
|
||||
'name': 'rate_of_change',
|
||||
'parameters': {
|
||||
'source': {},
|
||||
'target': {'name': 'cpu_util',
|
||||
'unit': '%',
|
||||
'type': counter.TYPE_GAUGE,
|
||||
'scale': s},
|
||||
'replace': replace
|
||||
}
|
||||
},
|
||||
]
|
||||
self.pipeline_cfg[0]['counters'] = ['cpu']
|
||||
now = timeutils.utcnow()
|
||||
counters = [
|
||||
counter.Counter(
|
||||
name='cpu',
|
||||
type=counter.TYPE_CUMULATIVE,
|
||||
volume=120000000000,
|
||||
unit='ns',
|
||||
user_id='test_user',
|
||||
project_id='test_proj',
|
||||
resource_id='test_resource',
|
||||
timestamp=now.isoformat(),
|
||||
resource_metadata={'cpu_number': 4}
|
||||
),
|
||||
]
|
||||
|
||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||
self.transformer_manager)
|
||||
pipe = pipeline_manager.pipelines[0]
|
||||
|
||||
pipe.publish_counters(None, counters, None)
|
||||
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||
self.assertEqual(len(publisher.counters), 0 if replace else 1)
|
||||
pipe.flush(None, None)
|
||||
self.assertEqual(len(publisher.counters), 0 if replace else 1)
|
||||
if not replace:
|
||||
self.assertEquals(publisher.counters[0], counters[0])
|
||||
|
||||
def _do_test_rate_of_change_no_predecessor_discard(self):
|
||||
self._do_test_rate_of_change_no_predecessor(True)
|
||||
|
||||
def _do_test_rate_of_change_no_predecessor_preserve(self):
|
||||
self._do_test_rate_of_change_no_predecessor(False)
|
||||
|
Loading…
x
Reference in New Issue
Block a user