From 6d3f179baa259b8364c7683c08ea7d26843c3b50 Mon Sep 17 00:00:00 2001 From: Eoghan Glynn Date: Mon, 3 Feb 2014 16:48:08 +0000 Subject: [PATCH] Selectable aggregate support in mongodb Addresses: BP selectable-aggregates Add parallel support to mongodb for the selection of standard aggregate functions. Test coverage is provided by the scenario test added in previous patch of this series. Change-Id: I797b69958733a9401d3825547c95e41672bc4374 --- ceilometer/storage/impl_mongodb.py | 205 +++++++++++++++++++++++------ 1 file changed, 162 insertions(+), 43 deletions(-) diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 7131b503b..365bc367e 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -2,9 +2,11 @@ # # Copyright © 2012 New Dream Network, LLC (DreamHost) # Copyright © 2013 eNovance +# Copyright © 2014 Red Hat, Inc # -# Author: Doug Hellmann -# Julien Danjou +# Authors: Doug Hellmann +# Julien Danjou +# Eoghan Glynn # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -96,12 +98,56 @@ class Connection(pymongo_base.Connection): } """) + STANDARD_AGGREGATES = dict( + emit_initial=dict( + sum='', + count='', + avg='', + min='', + max='' + ), + emit_body=dict( + sum='sum: this.counter_volume,', + count='count: NumberInt(1),', + avg='acount: NumberInt(1), asum: this.counter_volume,', + min='min: this.counter_volume,', + max='max: this.counter_volume,' + ), + reduce_initial=dict( + sum='', + count='', + avg='', + min='', + max='' + ), + reduce_body=dict( + sum='sum: values[0].sum,', + count='count: values[0].count,', + avg='acount: values[0].acount, asum: values[0].asum,', + min='min: values[0].min,', + max='max: values[0].max,' + ), + reduce_computation=dict( + sum='res.sum += values[i].sum;', + count='res.count = NumberInt(res.count + values[i].count);', + avg=('res.acount = NumberInt(res.acount + values[i].acount);' + 'res.asum += values[i].asum;'), + min='if ( values[i].min < res.min ) {res.min = values[i].min;}', + max='if ( values[i].max > res.max ) {res.max = values[i].max;}' + ), + finalize=dict( + sum='', + count='', + avg='value.avg = value.asum / value.acount;', + min='', + max='' + ), + ) + EMIT_STATS_COMMON = """ + %(aggregate_initial_placeholder)s emit(%(key_val)s, { unit: this.counter_unit, - min : this.counter_volume, - max : this.counter_volume, - sum : this.counter_volume, - count : NumberInt(1), + %(aggregate_body_placeholder)s groupby : %(groupby_val)s, duration_start : this.timestamp, duration_end : this.timestamp, @@ -129,10 +175,14 @@ class Connection(pymongo_base.Connection): } """ - PARAMS_MAP_STATS = {'key_val': '\'statistics\'', - 'groupby_val': 'null', - 'period_start_val': 'this.timestamp', - 'period_end_val': 'this.timestamp'} + PARAMS_MAP_STATS = { + 'key_val': '\'statistics\'', + 'groupby_val': 'null', + 'period_start_val': 'this.timestamp', + 'period_end_val': 'this.timestamp', + 'aggregate_initial_placeholder': '%(aggregate_initial_val)s', + 'aggregate_body_placeholder': '%(aggregate_body_val)s' + } MAP_STATS = bson.code.Code("function () {" + EMIT_STATS_COMMON % PARAMS_MAP_STATS + @@ -142,7 +192,9 @@ class Connection(pymongo_base.Connection): 'key_val': 'period_start', 'groupby_val': 'null', 'period_start_val': 'new Date(period_start)', - 'period_end_val': 'new Date(period_start + period)' + 'period_end_val': 'new Date(period_start + period)', + 'aggregate_initial_placeholder': 
'%(aggregate_initial_val)s',
+        'aggregate_body_placeholder': '%(aggregate_body_val)s'
     }
 
     MAP_STATS_PERIOD = bson.code.Code(
@@ -151,10 +203,14 @@ class Connection(pymongo_base.Connection):
         EMIT_STATS_COMMON % PARAMS_MAP_STATS_PERIOD +
         "}")
 
-    PARAMS_MAP_STATS_GROUPBY = {'key_val': 'groupby_key',
-                                'groupby_val': 'groupby',
-                                'period_start_val': 'this.timestamp',
-                                'period_end_val': 'this.timestamp'}
+    PARAMS_MAP_STATS_GROUPBY = {
+        'key_val': 'groupby_key',
+        'groupby_val': 'groupby',
+        'period_start_val': 'this.timestamp',
+        'period_end_val': 'this.timestamp',
+        'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
+        'aggregate_body_placeholder': '%(aggregate_body_val)s'
+    }
 
     MAP_STATS_GROUPBY = bson.code.Code(
         "function () {" +
@@ -166,7 +222,9 @@ class Connection(pymongo_base.Connection):
         'key_val': 'groupby_key',
         'groupby_val': 'groupby',
         'period_start_val': 'new Date(period_start)',
-        'period_end_val': 'new Date(period_start + period)'
+        'period_end_val': 'new Date(period_start + period)',
+        'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
+        'aggregate_body_placeholder': '%(aggregate_body_val)s'
     }
 
     MAP_STATS_PERIOD_GROUPBY = bson.code.Code(
@@ -179,23 +237,16 @@ class Connection(pymongo_base.Connection):
 
     REDUCE_STATS = bson.code.Code("""
     function (key, values) {
+        %(aggregate_initial_val)s
         var res = { unit: values[0].unit,
-                    min: values[0].min,
-                    max: values[0].max,
-                    count: values[0].count,
-                    sum: values[0].sum,
+                    %(aggregate_body_val)s
                     groupby: values[0].groupby,
                     period_start: values[0].period_start,
                     period_end: values[0].period_end,
                     duration_start: values[0].duration_start,
                     duration_end: values[0].duration_end };
         for ( var i=1; i<values.length; i++ ) {
-            if ( values[i].min < res.min )
-                res.min = values[i].min;
-            if ( values[i].max > res.max )
-                res.max = values[i].max;
-            res.count = NumberInt(res.count + values[i].count);
-            res.sum += values[i].sum;
+            %(aggregate_computation_val)s
             if ( values[i].duration_start < res.duration_start )
                 res.duration_start = values[i].duration_start;
             if ( values[i].duration_end > res.duration_end )
@@ -207,7 +258,7 @@ class Connection(pymongo_base.Connection):
 
     FINALIZE_STATS = bson.code.Code("""
     function (key, value) {
-        value.avg = value.sum / value.count;
+        %(aggregate_val)s
         value.duration = (value.duration_end - value.duration_start) / 1000;
         value.period = NumberInt((value.period_end - value.period_start)
                                  / 1000);
@@ -591,6 +642,23 @@ class Connection(pymongo_base.Connection):
         finally:
             self.db[out].drop()
 
+    def _aggregate_param(self, fragment_key, aggregate):
+        fragment_map = self.STANDARD_AGGREGATES[fragment_key]
+
+        if not aggregate:
+            return ''.join([f for f in fragment_map.values()])
+
+        fragments = ''
+
+        for a in aggregate:
+            if a.func in fragment_map:
+                fragments += fragment_map[a.func]
+            else:
+                raise NotImplementedError(_('Selectable aggregate function %s'
+                                            ' is not supported') % a.func)
+
+        return fragments
+
     def get_meter_statistics(self, sample_filter, period=None, groupby=None,
                              aggregate=None):
         """Return an iterable of models.Statistics instance containing meter
@@ -604,10 +672,6 @@ class Connection(pymongo_base.Connection):
                                                 'resource_id', 'source'])):
             raise NotImplementedError("Unable to group by these fields")
 
-        if aggregate:
-            msg = _('Selectable aggregates not implemented')
-            raise NotImplementedError(msg)
-
         q = pymongo_base.make_query_from_filter(sample_filter)
 
         if period:
@@ -618,34 +682,82 @@ class Connection(pymongo_base.Connection):
                     limit=1, sort=[('timestamp',
                                     pymongo.ASCENDING)])[0]['timestamp']
             period_start = int(calendar.timegm(period_start.utctimetuple()))
-            params_period = {'period': period,
-                             'period_first': 
period_start, - 'groupby_fields': json.dumps(groupby)} + map_params = {'period': period, + 'period_first': period_start, + 'groupby_fields': json.dumps(groupby)} if groupby: - map_stats = self.MAP_STATS_PERIOD_GROUPBY % params_period + map_fragment = self.MAP_STATS_PERIOD_GROUPBY else: - map_stats = self.MAP_STATS_PERIOD % params_period + map_fragment = self.MAP_STATS_PERIOD else: if groupby: - params_groupby = {'groupby_fields': json.dumps(groupby)} - map_stats = self.MAP_STATS_GROUPBY % params_groupby + map_params = {'groupby_fields': json.dumps(groupby)} + map_fragment = self.MAP_STATS_GROUPBY else: - map_stats = self.MAP_STATS + map_params = dict() + map_fragment = self.MAP_STATS + + sub = self._aggregate_param + + map_params['aggregate_initial_val'] = sub('emit_initial', aggregate) + map_params['aggregate_body_val'] = sub('emit_body', aggregate) + + map_stats = map_fragment % map_params + + reduce_params = dict( + aggregate_initial_val=sub('reduce_initial', aggregate), + aggregate_body_val=sub('reduce_body', aggregate), + aggregate_computation_val=sub('reduce_computation', aggregate) + ) + reduce_stats = self.REDUCE_STATS % reduce_params + + finalize_params = dict(aggregate_val=sub('finalize', aggregate)) + finalize_stats = self.FINALIZE_STATS % finalize_params results = self.db.meter.map_reduce( map_stats, - self.REDUCE_STATS, + reduce_stats, {'inline': 1}, - finalize=self.FINALIZE_STATS, + finalize=finalize_stats, query=q, ) # FIXME(terriyu) Fix get_meter_statistics() so we don't use sorted() # to return the results return sorted( - (models.Statistics(**(r['value'])) for r in results['results']), + (self._stats_result_to_model(r['value'], groupby, aggregate) + for r in results['results']), key=operator.attrgetter('period_start')) + @staticmethod + def _stats_result_aggregates(result, aggregate): + stats_args = {} + if isinstance(result.get('count'), (int, long)): + stats_args['count'] = int(result['count']) + for attr in ['min', 'max', 'sum', 'avg']: + if attr in result: + stats_args[attr] = result[attr] + if aggregate: + stats_args['aggregate'] = dict( + ('%s%s' % (a.func, '/%s' % a.param if a.param else ''), + result[a.func]) for a in aggregate + ) + return stats_args + + @staticmethod + def _stats_result_to_model(result, groupby, aggregate): + stats_args = Connection._stats_result_aggregates(result, aggregate) + stats_args['unit'] = result['unit'] + stats_args['duration'] = result['duration'] + stats_args['duration_start'] = result['duration_start'] + stats_args['duration_end'] = result['duration_end'] + stats_args['period'] = result['period'] + stats_args['period_start'] = result['period_start'] + stats_args['period_end'] = result['period_end'] + stats_args['groupby'] = (dict( + (g, result['groupby'][g]) for g in groupby) if groupby else None) + return models.Statistics(**stats_args) + def get_alarm_changes(self, alarm_id, on_behalf_of, user=None, project=None, type=None, start_timestamp=None, start_timestamp_op=None, @@ -722,7 +834,14 @@ class Connection(pymongo_base.Connection): 'statistics': {'groupby': True, 'query': {'simple': True, 'metadata': True}, - 'aggregation': {'standard': True}}, + 'aggregation': {'standard': True, + 'selectable': { + 'max': True, + 'min': True, + 'sum': True, + 'avg': True, + 'count': True}} + }, 'alarms': {'query': {'simple': True, 'complex': True}, 'history': {'query': {'simple': True,
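
For reviewers of the fragment-substitution approach above, the following is a minimal standalone sketch (not part of the patch) of how _aggregate_param() assembles the STANDARD_AGGREGATES fragments before they are spliced into the map/reduce javascript: with no selection every standard fragment is emitted, otherwise only the requested functions are included. The Aggregate namedtuple here is a hypothetical stand-in for the storage-layer aggregate objects; only the func/param attributes used by the patch are assumed.

    # Minimal sketch of the fragment-selection logic added by this patch.
    import collections

    # Hypothetical stand-in for the storage API aggregate objects.
    Aggregate = collections.namedtuple('Aggregate', ['func', 'param'])

    # Subset of STANDARD_AGGREGATES['emit_body'] copied from the patch.
    EMIT_BODY = {
        'sum': 'sum: this.counter_volume,',
        'count': 'count: NumberInt(1),',
        'avg': 'acount: NumberInt(1), asum: this.counter_volume,',
        'min': 'min: this.counter_volume,',
        'max': 'max: this.counter_volume,',
    }

    def aggregate_param(fragment_map, aggregate):
        # Mirrors _aggregate_param(): no selection means every standard
        # aggregate is emitted; otherwise only the requested fragments.
        if not aggregate:
            return ''.join(fragment_map.values())
        return ''.join(fragment_map[a.func] for a in aggregate)

    # All standard aggregates (the default statistics behaviour).
    print(aggregate_param(EMIT_BODY, None))

    # Only the selected aggregates are emitted into the map function:
    print(aggregate_param(EMIT_BODY, [Aggregate('max', None),
                                      Aggregate('avg', None)]))
    # -> max: this.counter_volume,acount: NumberInt(1), asum: this.counter_volume,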