diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py
index 826a1169f..846594ca7 100644
--- a/ceilometer/tests/pipeline_base.py
+++ b/ceilometer/tests/pipeline_base.py
@@ -1219,6 +1219,42 @@ class BasePipelineTestCase(test.BaseTestCase):
         self.assertEqual(expected_length, len(publisher.samples))
         return sorted(publisher.samples, key=lambda s: s.volume)
 
+    def test_aggregator_meter_type(self):
+        volumes = [1.0, 2.0, 3.0]
+        transformer_cfg = [
+            {
+                'name': 'aggregator',
+                'parameters': {'size': len(volumes) * len(sample.TYPES)}
+            },
+        ]
+        self._set_pipeline_cfg('transformers', transformer_cfg)
+        self._set_pipeline_cfg('counters',
+                               ['testgauge', 'testcumulative', 'testdelta'])
+        counters = []
+        for sample_type in sample.TYPES:
+            for volume in volumes:
+                counters.append(sample.Sample(
+                    name='test' + sample_type,
+                    type=sample_type,
+                    volume=volume,
+                    unit='B',
+                    user_id='test_user',
+                    project_id='test_proj',
+                    resource_id='test_resource',
+                    timestamp=timeutils.utcnow().isoformat(),
+                    resource_metadata={'version': '1.0'}
+                ))
+
+        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
+                                                    self.transformer_manager)
+        pipe = pipeline_manager.pipelines[0]
+
+        pipe.publish_samples(None, counters)
+        pipe.flush(None)
+        publisher = pipeline_manager.pipelines[0].publishers[0]
+        actual = sorted(s.volume for s in publisher.samples)
+        self.assertEqual([2.0, 3.0, 6.0], actual)
+
     def test_aggregator_input_validation(self):
         aggregator = conversions.AggregatorTransformer("1", "15", None,
                                                        None, None)
diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
index 5e1592b7a..956a5586f 100644
--- a/ceilometer/transformer/conversions.py
+++ b/ceilometer/transformer/conversions.py
@@ -15,6 +15,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import collections
 import re
 
 import six
 
@@ -165,6 +166,7 @@ class AggregatorTransformer(ScalingTransformer):
                  **kwargs):
         super(AggregatorTransformer, self).__init__(**kwargs)
         self.samples = {}
+        self.counts = collections.defaultdict(int)
         self.size = int(size) if size else None
         self.retention_time = float(retention_time) if retention_time else None
         self.initial_timestamp = None
@@ -200,23 +202,27 @@ class AggregatorTransformer(ScalingTransformer):
         # NOTE(sileht): it assumes, a meter always have the same unit/type
         return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
 
-    def handle_sample(self, context, sample):
+    def handle_sample(self, context, sample_):
         if not self.initial_timestamp:
-            self.initial_timestamp = timeutils.parse_isotime(sample.timestamp)
+            self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
 
         self.aggregated_samples += 1
-        key = self._get_unique_key(sample)
+        key = self._get_unique_key(sample_)
+        self.counts[key] += 1
         if key not in self.samples:
-            self.samples[key] = self._convert(sample)
+            self.samples[key] = self._convert(sample_)
             if self.merged_attribute_policy[
                     'resource_metadata'] == 'drop':
                 self.samples[key].resource_metadata = {}
         else:
-            self.samples[key].volume += self._scale(sample)
+            if sample_.type == sample.TYPE_CUMULATIVE:
+                self.samples[key].volume = self._scale(sample_)
+            else:
+                self.samples[key].volume += self._scale(sample_)
             for field in self.merged_attribute_policy:
                 if self.merged_attribute_policy[field] == 'last':
                     setattr(self.samples[key], field,
-                            getattr(sample, field))
+                            getattr(sample_, field))
 
     def flush(self, context):
         if not self.initial_timestamp:
@@ -228,7 +234,13 @@ class AggregatorTransformer(ScalingTransformer):
         full = self.aggregated_samples >= self.size
         if full or expired:
-            x = self.samples.values()
-            self.samples = {}
+            x = list(self.samples.values())
+            # gauge aggregates need to be averages
+            for s in x:
+                if s.type == sample.TYPE_GAUGE:
+                    key = self._get_unique_key(s)
+                    s.volume /= self.counts[key]
+            self.samples.clear()
+            self.counts.clear()
             self.aggregated_samples = 0
             self.initial_timestamp = None
             return x
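
Note for reviewers: the patch gives each meter type its own aggregation policy. Delta volumes are summed; cumulative meters are absolute readings, so only the latest value is kept; gauge volumes are summed in handle_sample() and divided by the per-key sample count at flush time, yielding an average. That is why test_aggregator_meter_type expects [2.0, 3.0, 6.0] for input volumes [1.0, 2.0, 3.0]: gauge (1+2+3)/3 = 2.0, cumulative latest = 3.0, delta 1+2+3 = 6.0. The flushed samples are materialized with list() before the buffers are cleared, since on Python 3 dict.values() is a live view that clear() would otherwise empty.

Below is a minimal standalone sketch of that per-type policy with no ceilometer dependencies; the names here (aggregate, the TYPE_* constants as bare strings) are illustrative only, not the module's API:

```python
import collections

TYPE_GAUGE = 'gauge'
TYPE_DELTA = 'delta'
TYPE_CUMULATIVE = 'cumulative'


def aggregate(samples):
    """Collapse (type, volume) pairs into one volume per type."""
    volumes = {}
    counts = collections.defaultdict(int)
    for sample_type, volume in samples:
        counts[sample_type] += 1
        if sample_type not in volumes:
            volumes[sample_type] = volume
        elif sample_type == TYPE_CUMULATIVE:
            # cumulative meters are absolute readings: keep the latest
            volumes[sample_type] = volume
        else:
            # delta and gauge volumes accumulate
            volumes[sample_type] += volume
    # at "flush" time, gauge aggregates become averages
    for sample_type in volumes:
        if sample_type == TYPE_GAUGE:
            volumes[sample_type] /= counts[sample_type]
    return volumes


if __name__ == '__main__':
    samples = [(t, v)
               for t in (TYPE_GAUGE, TYPE_CUMULATIVE, TYPE_DELTA)
               for v in (1.0, 2.0, 3.0)]
    # {'gauge': 2.0, 'cumulative': 3.0, 'delta': 6.0} -- the same
    # volumes test_aggregator_meter_type expects as [2.0, 3.0, 6.0]
    print(aggregate(samples))
```

Deferring the gauge division to flush means a partially filled batch is still averaged over the number of samples actually seen, which is why the transformer carries the per-key counts mapping alongside samples rather than dividing incrementally.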