transformer: Add aggregator transformer
This adds a transformer that aggregates counters until a size threshold
and/or a retention time is reached, and then flushes them out.

Change-Id: If4a950e585fe5309ddec58ed2c9a92928ac1acb2
Co-Authored-By: Jordan Pittier <jordan.pittier@cloudwatt.com>
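For illustration, a sketch of a pipeline transformer entry using the new
aggregator; the parameter values here are arbitrary examples (not defaults),
mirroring the transformer_cfg dicts used in the tests below:

# Hypothetical pipeline entry: aggregate up to 15 samples or 60 seconds,
# keeping the resource_metadata of the last sample seen in each group.
transformer_cfg = [
    {
        'name': 'aggregator',
        'parameters': {
            'size': 15,
            'retention_time': 60,
            'resource_metadata': 'last',
        },
    },
]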
@@ -20,6 +20,7 @@
import abc
import datetime

import mock
import six
from stevedore import extension

@@ -52,6 +53,7 @@ class BasePipelineTestCase(test.BaseTestCase):
            'except': self.TransformerClassException,
            'drop': self.TransformerClassDrop,
            'cache': accumulator.TransformerAccumulator,
            'aggregator': conversions.AggregatorTransformer,
            'unit_conversion': conversions.ScalingTransformer,
            'rate_of_change': conversions.RateOfChangeTransformer,
        }
@@ -1113,3 +1115,329 @@ class BasePipelineTestCase(test.BaseTestCase):
        meters = ('disk.read.bytes', 'disk.write.requests')
        units = ('B', 'request')
        self._do_test_rate_of_change_mapping(pipe, meters, units)

    def _do_test_aggregator(self, parameters, expected_length):
        transformer_cfg = [
            {
                'name': 'aggregator',
                'parameters': parameters,
            },
        ]
        self._set_pipeline_cfg('transformers', transformer_cfg)
        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
        counters = [
            sample.Sample(
                name='storage.objects.incoming.bytes',
                type=sample.TYPE_DELTA,
                volume=26,
                unit='B',
                user_id='test_user',
                project_id='test_proj',
                resource_id='test_resource',
                timestamp=timeutils.utcnow().isoformat(),
                resource_metadata={'version': '1.0'}
            ),
            sample.Sample(
                name='storage.objects.incoming.bytes',
                type=sample.TYPE_DELTA,
                volume=16,
                unit='B',
                user_id='test_user',
                project_id='test_proj',
                resource_id='test_resource',
                timestamp=timeutils.utcnow().isoformat(),
                resource_metadata={'version': '2.0'}
            ),
            sample.Sample(
                name='storage.objects.incoming.bytes',
                type=sample.TYPE_DELTA,
                volume=53,
                unit='B',
                user_id='test_user_bis',
                project_id='test_proj_bis',
                resource_id='test_resource',
                timestamp=timeutils.utcnow().isoformat(),
                resource_metadata={'version': '1.0'}
            ),
            sample.Sample(
                name='storage.objects.incoming.bytes',
                type=sample.TYPE_DELTA,
                volume=42,
                unit='B',
                user_id='test_user_bis',
                project_id='test_proj_bis',
                resource_id='test_resource',
                timestamp=timeutils.utcnow().isoformat(),
                resource_metadata={'version': '2.0'}
            ),
            sample.Sample(
                name='storage.objects.incoming.bytes',
                type=sample.TYPE_DELTA,
                volume=15,
                unit='B',
                user_id='test_user',
                project_id='test_proj_bis',
                resource_id='test_resource',
                timestamp=timeutils.utcnow().isoformat(),
                resource_metadata={'version': '2.0'}
            ),
            sample.Sample(
                name='storage.objects.incoming.bytes',
                type=sample.TYPE_DELTA,
                volume=2,
                unit='B',
                user_id='test_user_bis',
                project_id='test_proj',
                resource_id='test_resource',
                timestamp=timeutils.utcnow().isoformat(),
                resource_metadata={'version': '3.0'}
            ),
        ]

        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
                                                    self.transformer_manager)
        pipe = pipeline_manager.pipelines[0]

        pipe.publish_samples(None, counters)
        pipe.flush(None)
        publisher = pipeline_manager.pipelines[0].publishers[0]
        self.assertEqual(expected_length, len(publisher.samples))
        return sorted(publisher.samples, key=lambda s: s.volume)

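When user_id and project_id are not given a merge policy, they become part of
the grouping key, so the six samples above collapse into four aggregates. A
quick sketch of the expected arithmetic (plain Python, for illustration only):

# Expected grouping when user_id/project_id are key attributes:
volumes = {('test_user', 'test_proj'): 26 + 16,        # = 42
           ('test_user', 'test_proj_bis'): 15,
           ('test_user_bis', 'test_proj'): 2,
           ('test_user_bis', 'test_proj_bis'): 53 + 42}  # = 95
assert sorted(volumes.values()) == [2, 15, 42, 95]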
    def test_aggregator_metadata(self):
        for conf, expected_version in [('last', '2.0'), ('first', '1.0')]:
            samples = self._do_test_aggregator({
                'resource_metadata': conf,
                'target': {'name': 'aggregated-bytes'}
            }, expected_length=4)
            s = samples[0]
            self.assertEqual('aggregated-bytes', s.name)
            self.assertEqual(2, s.volume)
            self.assertEqual('test_user_bis', s.user_id)
            self.assertEqual('test_proj', s.project_id)
            self.assertEqual({'version': '3.0'},
                             s.resource_metadata)
            s = samples[1]
            self.assertEqual('aggregated-bytes', s.name)
            self.assertEqual(15, s.volume)
            self.assertEqual('test_user', s.user_id)
            self.assertEqual('test_proj_bis', s.project_id)
            self.assertEqual({'version': '2.0'},
                             s.resource_metadata)
            s = samples[2]
            self.assertEqual('aggregated-bytes', s.name)
            self.assertEqual(42, s.volume)
            self.assertEqual('test_user', s.user_id)
            self.assertEqual('test_proj', s.project_id)
            self.assertEqual({'version': expected_version},
                             s.resource_metadata)
            s = samples[3]
            self.assertEqual('aggregated-bytes', s.name)
            self.assertEqual(95, s.volume)
            self.assertEqual('test_user_bis', s.user_id)
            self.assertEqual('test_proj_bis', s.project_id)
            self.assertEqual({'version': expected_version},
                             s.resource_metadata)

    def test_aggregator_user_last_and_metadata_last(self):
        samples = self._do_test_aggregator({
            'resource_metadata': 'last',
            'user_id': 'last',
            'target': {'name': 'aggregated-bytes'}
        }, expected_length=2)
        s = samples[0]
        self.assertEqual('aggregated-bytes', s.name)
        self.assertEqual(44, s.volume)
        self.assertEqual('test_user_bis', s.user_id)
        self.assertEqual('test_proj', s.project_id)
        self.assertEqual({'version': '3.0'},
                         s.resource_metadata)
        s = samples[1]
        self.assertEqual('aggregated-bytes', s.name)
        self.assertEqual(110, s.volume)
        self.assertEqual('test_user', s.user_id)
        self.assertEqual('test_proj_bis', s.project_id)
        self.assertEqual({'version': '2.0'},
                         s.resource_metadata)

    def test_aggregator_user_first_and_metadata_last(self):
        samples = self._do_test_aggregator({
            'resource_metadata': 'last',
            'user_id': 'first',
            'target': {'name': 'aggregated-bytes'}
        }, expected_length=2)
        s = samples[0]
        self.assertEqual('aggregated-bytes', s.name)
        self.assertEqual(44, s.volume)
        self.assertEqual('test_user', s.user_id)
        self.assertEqual('test_proj', s.project_id)
        self.assertEqual({'version': '3.0'},
                         s.resource_metadata)
        s = samples[1]
        self.assertEqual('aggregated-bytes', s.name)
        self.assertEqual(110, s.volume)
        self.assertEqual('test_user_bis', s.user_id)
        self.assertEqual('test_proj_bis', s.project_id)
        self.assertEqual({'version': '2.0'},
                         s.resource_metadata)

    def test_aggregator_all_first(self):
        samples = self._do_test_aggregator({
            'resource_metadata': 'first',
            'user_id': 'first',
            'project_id': 'first',
            'target': {'name': 'aggregated-bytes'}
        }, expected_length=1)
        s = samples[0]
        self.assertEqual('aggregated-bytes', s.name)
        self.assertEqual(154, s.volume)
        self.assertEqual('test_user', s.user_id)
        self.assertEqual('test_proj', s.project_id)
        self.assertEqual({'version': '1.0'},
                         s.resource_metadata)

    def test_aggregator_all_last(self):
        samples = self._do_test_aggregator({
            'resource_metadata': 'last',
            'user_id': 'last',
            'project_id': 'last',
            'target': {'name': 'aggregated-bytes'}
        }, expected_length=1)
        s = samples[0]
        self.assertEqual('aggregated-bytes', s.name)
        self.assertEqual(154, s.volume)
        self.assertEqual('test_user_bis', s.user_id)
        self.assertEqual('test_proj', s.project_id)
        self.assertEqual({'version': '3.0'},
                         s.resource_metadata)

    def test_aggregator_all_mixed(self):
        samples = self._do_test_aggregator({
            'resource_metadata': 'drop',
            'user_id': 'first',
            'project_id': 'last',
            'target': {'name': 'aggregated-bytes'}
        }, expected_length=1)
        s = samples[0]
        self.assertEqual('aggregated-bytes', s.name)
        self.assertEqual(154, s.volume)
        self.assertEqual('test_user', s.user_id)
        self.assertEqual('test_proj', s.project_id)
        self.assertEqual({}, s.resource_metadata)

    def test_aggregator_metadata_default(self):
        samples = self._do_test_aggregator({
            'user_id': 'last',
            'project_id': 'last',
            'target': {'name': 'aggregated-bytes'}
        }, expected_length=1)
        s = samples[0]
        self.assertEqual('aggregated-bytes', s.name)
        self.assertEqual(154, s.volume)
        self.assertEqual('test_user_bis', s.user_id)
        self.assertEqual('test_proj', s.project_id)
        self.assertEqual({'version': '3.0'},
                         s.resource_metadata)

    @mock.patch('ceilometer.transformer.conversions.LOG')
    def test_aggregator_metadata_invalid(self, mylog):
        samples = self._do_test_aggregator({
            'resource_metadata': 'invalid',
            'user_id': 'last',
            'project_id': 'last',
            'target': {'name': 'aggregated-bytes'}
        }, expected_length=1)
        s = samples[0]
        self.assertTrue(mylog.warn.called)
        self.assertEqual('aggregated-bytes', s.name)
        self.assertEqual(154, s.volume)
        self.assertEqual('test_user_bis', s.user_id)
        self.assertEqual('test_proj', s.project_id)
        self.assertEqual({'version': '3.0'},
                         s.resource_metadata)

    def test_aggregator_sized_flush(self):
        transformer_cfg = [
            {
                'name': 'aggregator',
                'parameters': {'size': 2},
            },
        ]
        self._set_pipeline_cfg('transformers', transformer_cfg)
        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
        counters = [
            sample.Sample(
                name='storage.objects.incoming.bytes',
                type=sample.TYPE_DELTA,
                volume=26,
                unit='B',
                user_id='test_user',
                project_id='test_proj',
                resource_id='test_resource',
                timestamp=timeutils.utcnow().isoformat(),
                resource_metadata={'version': '1.0'}
            ),
            sample.Sample(
                name='storage.objects.incoming.bytes',
                type=sample.TYPE_DELTA,
                volume=16,
                unit='B',
                user_id='test_user_bis',
                project_id='test_proj_bis',
                resource_id='test_resource',
                timestamp=timeutils.utcnow().isoformat(),
                resource_metadata={'version': '2.0'}
            )
        ]

        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
                                                    self.transformer_manager)
        pipe = pipeline_manager.pipelines[0]

        pipe.publish_samples(None, [counters[0]])
        pipe.flush(None)
        publisher = pipe.publishers[0]
        self.assertEqual(0, len(publisher.samples))

        pipe.publish_samples(None, [counters[1]])
        pipe.flush(None)
        publisher = pipe.publishers[0]
        self.assertEqual(2, len(publisher.samples))

    def test_aggregator_timed_flush(self):
        timeutils.set_time_override()
        transformer_cfg = [
            {
                'name': 'aggregator',
                'parameters': {'size': 900, 'retention_time': 60},
            },
        ]
        self._set_pipeline_cfg('transformers', transformer_cfg)
        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
        counters = [
            sample.Sample(
                name='storage.objects.incoming.bytes',
                type=sample.TYPE_DELTA,
                volume=26,
                unit='B',
                user_id='test_user',
                project_id='test_proj',
                resource_id='test_resource',
                timestamp=timeutils.utcnow().isoformat(),
                resource_metadata={'version': '1.0'}
            ),
        ]

        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
                                                    self.transformer_manager)
        pipe = pipeline_manager.pipelines[0]

        pipe.publish_samples(None, counters)
        pipe.flush(None)
        publisher = pipeline_manager.pipelines[0].publishers[0]
        self.assertEqual(0, len(publisher.samples))

        timeutils.advance_time_seconds(120)
        pipe.flush(None)
        publisher = pipeline_manager.pipelines[0].publishers[0]
        self.assertEqual(1, len(publisher.samples))

@@ -163,3 +163,89 @@ class RateOfChangeTransformer(ScalingTransformer):
                     (s,))
            s = None
        return s


class AggregatorTransformer(ScalingTransformer):
    """Transformer that aggregates samples until a size threshold and/or a
    retention_time is reached, and then flushes them out.

    Example:
      To aggregate samples by resource_metadata and keep the
      resource_metadata of the latest received sample:

          AggregatorTransformer(retention_time=60, resource_metadata='last')

      To aggregate samples by user_id and resource_metadata, keep the
      user_id of the first received sample, and drop the resource_metadata:

          AggregatorTransformer(size=15, user_id='first',
                                resource_metadata='drop')
    """

    def __init__(self, size=1, retention_time=None,
                 project_id=None, user_id=None, resource_metadata="last",
                 **kwargs):
        super(AggregatorTransformer, self).__init__(**kwargs)
        self.samples = {}
        self.size = size
        self.retention_time = retention_time
        self.initial_timestamp = None
        self.aggregated_samples = 0

        self.key_attributes = []
        self.merged_attribute_policy = {}

        self._init_attribute('project_id', project_id)
        self._init_attribute('user_id', user_id)
        self._init_attribute('resource_metadata', resource_metadata,
                             is_droppable=True, mandatory=True)

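In short, each attribute either becomes part of the grouping key (when no
policy is given) or is merged under a 'first'/'last'/'drop' policy. A few
illustrative combinations:

# AggregatorTransformer(user_id='last')  -> project_id groups samples;
#                                           the last user_id seen is kept.
# AggregatorTransformer()                -> user_id and project_id both
#                                           group samples.
# resource_metadata always has a policy (default 'last'); it never groups.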
    def _init_attribute(self, name, value, is_droppable=False,
                        mandatory=False):
        drop = ['drop'] if is_droppable else []
        if value or mandatory:
            if value not in ['last', 'first'] + drop:
                LOG.warn('%s is unknown (%s), using last' % (name, value))
                value = 'last'
            self.merged_attribute_policy[name] = value
        else:
            self.key_attributes.append(name)

    def _get_unique_key(self, s):
        non_aggregated_keys = "-".join([getattr(s, field)
                                        for field in self.key_attributes])
        # NOTE(sileht): this assumes a meter always has the same unit/type
        return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)

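For the test samples above, where user_id and project_id are key attributes,
the computed key would look like this (illustrative values only):

# With key_attributes == ['project_id', 'user_id'], a sample named
# 'storage.objects.incoming.bytes' for resource 'test_resource' owned by
# ('test_proj', 'test_user') yields:
#   'storage.objects.incoming.bytes-test_resource-test_proj-test_user'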
    def handle_sample(self, context, sample):
        if not self.initial_timestamp:
            self.initial_timestamp = timeutils.parse_strtime(
                sample.timestamp)

        self.aggregated_samples += 1
        key = self._get_unique_key(sample)
        if key not in self.samples:
            self.samples[key] = self._convert(sample)
            if self.merged_attribute_policy[
                    'resource_metadata'] == 'drop':
                self.samples[key].resource_metadata = {}
        else:
            self.samples[key].volume += self._scale(sample)
            for field in self.merged_attribute_policy:
                if self.merged_attribute_policy[field] == 'last':
                    setattr(self.samples[key], field,
                            getattr(sample, field))

    def flush(self, context):
        expired = (self.retention_time and
                   timeutils.is_older_than(self.initial_timestamp,
                                           self.retention_time))
        full = self.aggregated_samples >= self.size
        if full or expired:
            x = self.samples.values()
            self.samples = {}
            self.aggregated_samples = 0
            self.initial_timestamp = None
            return x
        return []

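As a standalone illustration (not part of the change), driving the transformer
directly might look like the sketch below. The make_sample() helper is
hypothetical; it stands in for building sample.Sample objects the way the
tests above do:

# Minimal usage sketch, assuming make_sample() builds Samples as in the tests.
agg = AggregatorTransformer(size=2, resource_metadata='last',
                            target={'name': 'aggregated-bytes'})
agg.handle_sample(None, make_sample(volume=26, version='1.0'))
agg.handle_sample(None, make_sample(volume=16, version='2.0'))
# The size threshold (2) is reached, so flush() returns the aggregate
# (volume 42, metadata of the last sample) and resets internal state.
aggregated = agg.flush(None)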
@@ -156,6 +156,7 @@ ceilometer.transformer =
    accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
    unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
    rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
    aggregator = ceilometer.transformer.conversions:AggregatorTransformer

ceilometer.publisher =
    test = ceilometer.publisher.test:TestPublisher

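For context, the pipeline code resolves these entry points through stevedore;
a minimal sketch of how the new 'aggregator' name maps to the class, assuming
the package is installed so the entry points are registered:

# Sketch: resolving the 'aggregator' entry point via stevedore.
from stevedore import driver

mgr = driver.DriverManager(namespace='ceilometer.transformer',
                           name='aggregator',
                           invoke_on_load=False)
# mgr.driver is ceilometer.transformer.conversions.AggregatorTransformer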