Merge "Adds group by statistics for MongoDB driver"

This commit is contained in:
Jenkins 2013-08-29 02:42:48 +00:00 committed by Gerrit Code Review
commit bd9b26e757
2 changed files with 105 additions and 43 deletions

View File

@ -28,6 +28,7 @@ import weakref
import bson.code
import bson.objectid
import json
import pymongo
from oslo.config import cfg
@ -172,40 +173,86 @@ class Connection(base.Connection):
}
""")
MAP_STATS = bson.code.Code("""
function () {
emit('statistics', { unit: this.counter_unit,
min : this.counter_volume,
max : this.counter_volume,
sum : this.counter_volume,
count : NumberInt(1),
duration_start : this.timestamp,
duration_end : this.timestamp,
period_start : this.timestamp,
period_end : this.timestamp} )
}
""")
EMIT_STATS_COMMON = """
emit(%(key_val)s, { unit: this.counter_unit,
min : this.counter_volume,
max : this.counter_volume,
sum : this.counter_volume,
count : NumberInt(1),
groupby : %(groupby_val)s,
duration_start : this.timestamp,
duration_end : this.timestamp,
period_start : %(period_start_val)s,
period_end : %(period_end_val)s} )
"""
MAP_STATS_PERIOD = bson.code.Code("""
function () {
var period = %d * 1000;
var period_first = %d * 1000;
MAP_STATS_PERIOD_VAR = """
var period = %(period)d * 1000;
var period_first = %(period_first)d * 1000;
var period_start = period_first
+ (Math.floor(new Date(this.timestamp.getTime()
- period_first) / period)
* period);
emit(period_start,
{ unit: this.counter_unit,
min : this.counter_volume,
max : this.counter_volume,
sum : this.counter_volume,
count : NumberInt(1),
duration_start : this.timestamp,
duration_end : this.timestamp,
period_start : new Date(period_start),
period_end : new Date(period_start + period) } )
"""
MAP_STATS_GROUPBY_VAR = """
var groupby_fields = %(groupby_fields)s;
var groupby = {};
var groupby_key = {};
for ( var i=0; i<groupby_fields.length; i++ ) {
groupby[groupby_fields[i]] = this[groupby_fields[i]]
groupby_key[groupby_fields[i]] = this[groupby_fields[i]]
}
"""
PARAMS_MAP_STATS = {'key_val': '\'statistics\'',
'groupby_val': 'null',
'period_start_val': 'this.timestamp',
'period_end_val': 'this.timestamp'}
MAP_STATS = bson.code.Code("function () {" +
EMIT_STATS_COMMON % PARAMS_MAP_STATS +
"}")
PARAMS_MAP_STATS_PERIOD = {
'key_val': 'period_start',
'groupby_val': 'null',
'period_start_val': 'new Date(period_start)',
'period_end_val': 'new Date(period_start + period)'
}
""")
MAP_STATS_PERIOD = bson.code.Code(
"function () {" +
MAP_STATS_PERIOD_VAR +
EMIT_STATS_COMMON % PARAMS_MAP_STATS_PERIOD +
"}")
PARAMS_MAP_STATS_GROUPBY = {'key_val': 'groupby_key',
'groupby_val': 'groupby',
'period_start_val': 'this.timestamp',
'period_end_val': 'this.timestamp'}
MAP_STATS_GROUPBY = bson.code.Code(
"function () {" +
MAP_STATS_GROUPBY_VAR +
EMIT_STATS_COMMON % PARAMS_MAP_STATS_GROUPBY +
"}")
PARAMS_MAP_STATS_PERIOD_GROUPBY = {
'key_val': 'groupby_key',
'groupby_val': 'groupby',
'period_start_val': 'new Date(period_start)',
'period_end_val': 'new Date(period_start + period)'
}
MAP_STATS_PERIOD_GROUPBY = bson.code.Code(
"function () {" +
MAP_STATS_PERIOD_VAR +
MAP_STATS_GROUPBY_VAR +
" groupby_key['period_start'] = period_start\n" +
EMIT_STATS_COMMON % PARAMS_MAP_STATS_PERIOD_GROUPBY +
"}")
REDUCE_STATS = bson.code.Code("""
function (key, values) {
@ -214,6 +261,7 @@ class Connection(base.Connection):
max: values[0].max,
count: values[0].count,
sum: values[0].sum,
groupby: values[0].groupby,
period_start: values[0].period_start,
period_end: values[0].period_end,
duration_start: values[0].duration_start,
@ -690,8 +738,10 @@ class Connection(base.Connection):
The filter must have a meter value set.
"""
if groupby:
raise NotImplementedError("Group by not implemented.")
if (groupby and
set(groupby) - set(['user_id', 'project_id',
'resource_id', 'source'])):
raise NotImplementedError("Unable to group by these fields")
q = make_query_from_filter(sample_filter)
@ -703,9 +753,19 @@ class Connection(base.Connection):
limit=1, sort=[('timestamp',
pymongo.ASCENDING)])[0]['timestamp']
period_start = int(calendar.timegm(period_start.utctimetuple()))
map_stats = self.MAP_STATS_PERIOD % (period, period_start)
params_period = {'period': period,
'period_first': period_start,
'groupby_fields': json.dumps(groupby)}
if groupby:
map_stats = self.MAP_STATS_PERIOD_GROUPBY % params_period
else:
map_stats = self.MAP_STATS_PERIOD % params_period
else:
map_stats = self.MAP_STATS
if groupby:
params_groupby = {'groupby_fields': json.dumps(groupby)}
map_stats = self.MAP_STATS_GROUPBY % params_groupby
else:
map_stats = self.MAP_STATS
results = self.db.meter.map_reduce(
map_stats,
@ -715,13 +775,11 @@ class Connection(base.Connection):
query=q,
)
# TODO(jd) implement groupby and remove this code
for r in results['results']:
r['value']['groupby'] = None
return sorted((models.Statistics(**(r['value']))
for r in results['results']),
key=operator.attrgetter('period_start'))
# FIXME(terriyu) Fix get_meter_statistics() so we don't use sorted()
# to return the results
return sorted(
(models.Statistics(**(r['value'])) for r in results['results']),
key=operator.attrgetter('period_start'))
@staticmethod
def _decode_matching_metadata(matching_metadata):

View File

@ -1059,12 +1059,16 @@ class StatisticsGroupByTest(DBTestBase,
f = storage.SampleFilter(
meter='instance',
)
result = self.conn.get_meter_statistics(
f, groupby=['wtf'])
# NOTE(terriyu): The MongoDB get_meter_statistics() returns a list
# whereas the SQLAlchemy get_meter_statistics() returns a generator.
# You have to apply list() to the SQLAlchemy generator to get it to
# throw an error. The MongoDB get_meter_statistics() will throw an
# error before list() is called. By using lambda, we can cover both
# MongoDB and SQLAlchemy in a single test.
self.assertRaises(
NotImplementedError,
list,
result)
lambda: list(self.conn.get_meter_statistics(f, groupby=['wtf']))
)
def test_group_by_metadata(self):
pass