Merge "Selectable aggregate support in mongodb"

Jenkins 2014-03-04 13:35:01 +00:00 committed by Gerrit Code Review
commit 6863f83a3f


@@ -2,9 +2,11 @@
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
# Copyright © 2013 eNovance
# Copyright © 2014 Red Hat, Inc
#
# Authors: Doug Hellmann <doug.hellmann@dreamhost.com>
#          Julien Danjou <julien@danjou.info>
#          Eoghan Glynn <eglynn@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -96,12 +98,56 @@ class Connection(pymongo_base.Connection):
}
""")
STANDARD_AGGREGATES = dict(
emit_initial=dict(
sum='',
count='',
avg='',
min='',
max=''
),
emit_body=dict(
sum='sum: this.counter_volume,',
count='count: NumberInt(1),',
avg='acount: NumberInt(1), asum: this.counter_volume,',
min='min: this.counter_volume,',
max='max: this.counter_volume,'
),
reduce_initial=dict(
sum='',
count='',
avg='',
min='',
max=''
),
reduce_body=dict(
sum='sum: values[0].sum,',
count='count: values[0].count,',
avg='acount: values[0].acount, asum: values[0].asum,',
min='min: values[0].min,',
max='max: values[0].max,'
),
reduce_computation=dict(
sum='res.sum += values[i].sum;',
count='res.count = NumberInt(res.count + values[i].count);',
avg=('res.acount = NumberInt(res.acount + values[i].acount);'
'res.asum += values[i].asum;'),
min='if ( values[i].min < res.min ) {res.min = values[i].min;}',
max='if ( values[i].max > res.max ) {res.max = values[i].max;}'
),
finalize=dict(
sum='',
count='',
avg='value.avg = value.asum / value.acount;',
min='',
max=''
),
)
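A rough sketch of how these fragments are meant to compose (illustrative only, not part of the change): one snippet per requested function is joined and spliced into the map/reduce templates below, so a request for count and max would contribute the following emit fields.

# Illustrative sketch only: composing emit_body fragments for a
# hypothetical request of count and max (mirrors the dict above).
emit_body = dict(
    count='count: NumberInt(1),',
    max='max: this.counter_volume,',
)
requested = ['count', 'max']
aggregate_body_val = ''.join(emit_body[func] for func in requested)
print(aggregate_body_val)
# -> count: NumberInt(1),max: this.counter_volume,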
EMIT_STATS_COMMON = """
%(aggregate_initial_placeholder)s
emit(%(key_val)s, { unit: this.counter_unit,
%(aggregate_body_placeholder)s
groupby : %(groupby_val)s,
duration_start : this.timestamp,
duration_end : this.timestamp,
@@ -129,10 +175,14 @@ class Connection(pymongo_base.Connection):
}
"""
PARAMS_MAP_STATS = {
'key_val': '\'statistics\'',
'groupby_val': 'null',
'period_start_val': 'this.timestamp',
'period_end_val': 'this.timestamp',
'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
'aggregate_body_placeholder': '%(aggregate_body_val)s'
}
MAP_STATS = bson.code.Code("function () {" +
EMIT_STATS_COMMON % PARAMS_MAP_STATS +
@@ -142,7 +192,9 @@ class Connection(pymongo_base.Connection):
'key_val': 'period_start',
'groupby_val': 'null',
'period_start_val': 'new Date(period_start)',
'period_end_val': 'new Date(period_start + period)',
'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
'aggregate_body_placeholder': '%(aggregate_body_val)s'
}
MAP_STATS_PERIOD = bson.code.Code(
@@ -151,10 +203,14 @@ class Connection(pymongo_base.Connection):
EMIT_STATS_COMMON % PARAMS_MAP_STATS_PERIOD +
"}")
PARAMS_MAP_STATS_GROUPBY = {
'key_val': 'groupby_key',
'groupby_val': 'groupby',
'period_start_val': 'this.timestamp',
'period_end_val': 'this.timestamp',
'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
'aggregate_body_placeholder': '%(aggregate_body_val)s'
}
MAP_STATS_GROUPBY = bson.code.Code(
"function () {" +
@@ -166,7 +222,9 @@ class Connection(pymongo_base.Connection):
'key_val': 'groupby_key',
'groupby_val': 'groupby',
'period_start_val': 'new Date(period_start)',
'period_end_val': 'new Date(period_start + period)',
'aggregate_initial_placeholder': '%(aggregate_initial_val)s',
'aggregate_body_placeholder': '%(aggregate_body_val)s'
}
MAP_STATS_PERIOD_GROUPBY = bson.code.Code(
@@ -179,23 +237,16 @@ class Connection(pymongo_base.Connection):
REDUCE_STATS = bson.code.Code("""
function (key, values) {
%(aggregate_initial_val)s
var res = { unit: values[0].unit,
%(aggregate_body_val)s
groupby: values[0].groupby,
period_start: values[0].period_start,
period_end: values[0].period_end,
duration_start: values[0].duration_start,
duration_end: values[0].duration_end };
for ( var i=1; i<values.length; i++ ) {
%(aggregate_computation_val)s
if ( values[i].duration_start < res.duration_start )
res.duration_start = values[i].duration_start;
if ( values[i].duration_end > res.duration_end )
@@ -207,7 +258,7 @@ class Connection(pymongo_base.Connection):
FINALIZE_STATS = bson.code.Code("""
function (key, value) {
%(aggregate_val)s
value.duration = (value.duration_end - value.duration_start) / 1000;
value.period = NumberInt((value.period_end - value.period_start)
/ 1000);
@@ -591,6 +642,23 @@ class Connection(pymongo_base.Connection):
finally:
self.db[out].drop()
def _aggregate_param(self, fragment_key, aggregate):
fragment_map = self.STANDARD_AGGREGATES[fragment_key]
if not aggregate:
return ''.join([f for f in fragment_map.values()])
fragments = ''
for a in aggregate:
if a.func in fragment_map:
fragments += fragment_map[a.func]
else:
raise NotImplementedError(_('Selectable aggregate function %s'
' is not supported') % a.func)
return fragments
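A hedged usage sketch of the helper above (FakeAggregate is an assumed stand-in for the aggregate objects the API layer passes; it is not part of this commit): with no aggregate requested every fragment is concatenated, a recognised function contributes only its own snippet, and an unknown function raises NotImplementedError.

# Sketch only: a stand-alone rendition of the same lookup logic as
# _aggregate_param, exercised outside the driver.
import collections

FakeAggregate = collections.namedtuple('FakeAggregate', ['func', 'param'])

fragment_map = {
    'sum': 'sum: this.counter_volume,',
    'count': 'count: NumberInt(1),',
}

def aggregate_param(fragment_map, aggregate):
    if not aggregate:
        return ''.join(fragment_map.values())
    fragments = ''
    for a in aggregate:
        if a.func not in fragment_map:
            raise NotImplementedError('Selectable aggregate function %s'
                                      ' is not supported' % a.func)
        fragments += fragment_map[a.func]
    return fragments

print(aggregate_param(fragment_map, None))                            # every fragment
print(aggregate_param(fragment_map, [FakeAggregate('count', None)]))  # count only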
def get_meter_statistics(self, sample_filter, period=None, groupby=None,
aggregate=None):
"""Return an iterable of models.Statistics instance containing meter
@@ -604,10 +672,6 @@ class Connection(pymongo_base.Connection):
'resource_id', 'source'])):
raise NotImplementedError("Unable to group by these fields")
q = pymongo_base.make_query_from_filter(sample_filter)
if period:
@@ -618,34 +682,82 @@ class Connection(pymongo_base.Connection):
limit=1, sort=[('timestamp',
pymongo.ASCENDING)])[0]['timestamp']
period_start = int(calendar.timegm(period_start.utctimetuple()))
map_params = {'period': period,
'period_first': period_start,
'groupby_fields': json.dumps(groupby)}
if groupby:
map_fragment = self.MAP_STATS_PERIOD_GROUPBY
else:
map_fragment = self.MAP_STATS_PERIOD
else:
if groupby:
map_params = {'groupby_fields': json.dumps(groupby)}
map_fragment = self.MAP_STATS_GROUPBY
else:
map_params = dict()
map_fragment = self.MAP_STATS
sub = self._aggregate_param
map_params['aggregate_initial_val'] = sub('emit_initial', aggregate)
map_params['aggregate_body_val'] = sub('emit_body', aggregate)
map_stats = map_fragment % map_params
reduce_params = dict(
aggregate_initial_val=sub('reduce_initial', aggregate),
aggregate_body_val=sub('reduce_body', aggregate),
aggregate_computation_val=sub('reduce_computation', aggregate)
)
reduce_stats = self.REDUCE_STATS % reduce_params
finalize_params = dict(aggregate_val=sub('finalize', aggregate))
finalize_stats = self.FINALIZE_STATS % finalize_params
results = self.db.meter.map_reduce(
map_stats,
reduce_stats,
{'inline': 1},
finalize=finalize_stats,
query=q,
)
# FIXME(terriyu) Fix get_meter_statistics() so we don't use sorted()
# to return the results
return sorted(
(self._stats_result_to_model(r['value'], groupby, aggregate)
for r in results['results']),
key=operator.attrgetter('period_start'))
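For orientation, an illustrative trace (simplified and assumed, not lifted from the commit) of how the finalize template is specialised when only avg is requested: the avg fragment from STANDARD_AGGREGATES['finalize'] is substituted for %(aggregate_val)s before the code is handed to map_reduce.

# Illustrative trace only: specialising a simplified finalize template
# with plain %-substitution, the same mechanism used above.
FINALIZE_STATS_TEMPLATE = """
function (key, value) {
    %(aggregate_val)s
    value.duration = (value.duration_end - value.duration_start) / 1000;
    return value;
}
"""
finalize_fragment = 'value.avg = value.asum / value.acount;'
print(FINALIZE_STATS_TEMPLATE % {'aggregate_val': finalize_fragment})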
@staticmethod
def _stats_result_aggregates(result, aggregate):
stats_args = {}
if isinstance(result.get('count'), (int, long)):
stats_args['count'] = int(result['count'])
for attr in ['min', 'max', 'sum', 'avg']:
if attr in result:
stats_args[attr] = result[attr]
if aggregate:
stats_args['aggregate'] = dict(
('%s%s' % (a.func, '/%s' % a.param if a.param else ''),
result[a.func]) for a in aggregate
)
return stats_args
@staticmethod
def _stats_result_to_model(result, groupby, aggregate):
stats_args = Connection._stats_result_aggregates(result, aggregate)
stats_args['unit'] = result['unit']
stats_args['duration'] = result['duration']
stats_args['duration_start'] = result['duration_start']
stats_args['duration_end'] = result['duration_end']
stats_args['period'] = result['period']
stats_args['period_start'] = result['period_start']
stats_args['period_end'] = result['period_end']
stats_args['groupby'] = (dict(
(g, result['groupby'][g]) for g in groupby) if groupby else None)
return models.Statistics(**stats_args)
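An illustrative note on the key naming used for the 'aggregate' dict in _stats_result_aggregates (the cardinality example below is hypothetical; this driver only advertises max/min/sum/avg/count): the key is the function name, with '/param' appended when a parameter is present.

# Illustrative only: the '%s%s' key naming from _stats_result_aggregates.
def aggregate_key(func, param=None):
    return '%s%s' % (func, '/%s' % param if param else '')

print(aggregate_key('avg'))                         # avg
print(aggregate_key('cardinality', 'resource_id'))  # cardinality/resource_id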
def get_alarm_changes(self, alarm_id, on_behalf_of,
user=None, project=None, type=None,
start_timestamp=None, start_timestamp_op=None,
@@ -722,7 +834,14 @@ class Connection(pymongo_base.Connection):
'statistics': {'groupby': True,
'query': {'simple': True,
'metadata': True},
'aggregation': {'standard': True,
'selectable': {
'max': True,
'min': True,
'sum': True,
'avg': True,
'count': True}}
},
'alarms': {'query': {'simple': True,
'complex': True},
'history': {'query': {'simple': True,