Merge "Wider selection of aggregates for mongodb"
This commit is contained in:
commit
cea7057db4
@ -63,6 +63,11 @@ class StorageBadVersion(Exception):
|
||||
"""Error raised when the storage backend version is not good enough."""
|
||||
|
||||
|
||||
class StorageBadAggregate(Exception):
|
||||
"""Error raised when an aggregate is unacceptable to storage backend."""
|
||||
code = 400
|
||||
|
||||
|
||||
def get_engine(conf):
|
||||
"""Load the configured engine and return an instance."""
|
||||
if conf.database_connection:
|
||||
|
@ -144,6 +144,90 @@ class Connection(pymongo_base.Connection):
|
||||
),
|
||||
)
|
||||
|
||||
UNPARAMETERIZED_AGGREGATES = dict(
|
||||
emit_initial=dict(
|
||||
stddev=(
|
||||
''
|
||||
)
|
||||
),
|
||||
emit_body=dict(
|
||||
stddev='sdsum: this.counter_volume,'
|
||||
'sdcount: 1,'
|
||||
'weighted_distances: 0,'
|
||||
'stddev: 0,'
|
||||
),
|
||||
reduce_initial=dict(
|
||||
stddev=''
|
||||
),
|
||||
reduce_body=dict(
|
||||
stddev='sdsum: values[0].sdsum,'
|
||||
'sdcount: values[0].sdcount,'
|
||||
'weighted_distances: values[0].weighted_distances,'
|
||||
'stddev: values[0].stddev,'
|
||||
),
|
||||
reduce_computation=dict(
|
||||
stddev=(
|
||||
'var deviance = (res.sdsum / res.sdcount) - values[i].sdsum;'
|
||||
'var weight = res.sdcount / ++res.sdcount;'
|
||||
'res.weighted_distances += (Math.pow(deviance, 2) * weight);'
|
||||
'res.sdsum += values[i].sdsum;'
|
||||
)
|
||||
),
|
||||
finalize=dict(
|
||||
stddev=(
|
||||
'value.stddev = Math.sqrt(value.weighted_distances /'
|
||||
' value.sdcount);'
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
PARAMETERIZED_AGGREGATES = dict(
|
||||
validate=dict(
|
||||
cardinality=lambda p: p in ['resource_id', 'user_id', 'project_id',
|
||||
'source']
|
||||
),
|
||||
emit_initial=dict(
|
||||
cardinality=(
|
||||
'var aggregate = {};'
|
||||
'aggregate["cardinality/%(aggregate_param)s"] ='
|
||||
' this["%(aggregate_param)s"];'
|
||||
)
|
||||
),
|
||||
emit_body=dict(
|
||||
cardinality='aggregate : aggregate,'
|
||||
),
|
||||
reduce_initial=dict(
|
||||
cardinality=(
|
||||
'var distincts = {};'
|
||||
'distincts[values[0].aggregate['
|
||||
' "cardinality/%(aggregate_param)s"]] = true;'
|
||||
'var aggregate = {};'
|
||||
'aggregate["cardinality/%(aggregate_param)s"] = NumberInt(1);'
|
||||
)
|
||||
),
|
||||
reduce_body=dict(
|
||||
cardinality='aggregate : aggregate,'
|
||||
),
|
||||
reduce_computation=dict(
|
||||
cardinality=(
|
||||
'if (!(values[i].aggregate["cardinality/%(aggregate_param)s"]'
|
||||
' in distincts)) {'
|
||||
' distincts[values[i].aggregate['
|
||||
' "cardinality/%(aggregate_param)s"]] = true;'
|
||||
' res.aggregate["cardinality/%(aggregate_param)s"] ='
|
||||
' NumberInt(Object.keys(distincts).length);}'
|
||||
)
|
||||
),
|
||||
finalize=dict(
|
||||
cardinality=(
|
||||
'if (typeof value.aggregate['
|
||||
' "cardinality/%(aggregate_param)s"] !== "number") {'
|
||||
' value.aggregate["cardinality/%(aggregate_param)s"] ='
|
||||
' NumberInt(1);}'
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
EMIT_STATS_COMMON = """
|
||||
%(aggregate_initial_placeholder)s
|
||||
emit(%(key_val)s, { unit: this.counter_unit,
|
||||
@ -651,8 +735,20 @@ class Connection(pymongo_base.Connection):
|
||||
fragments = ''
|
||||
|
||||
for a in aggregate:
|
||||
if a.func in fragment_map:
|
||||
if a.func in self.STANDARD_AGGREGATES[fragment_key]:
|
||||
fragment_map = self.STANDARD_AGGREGATES[fragment_key]
|
||||
fragments += fragment_map[a.func]
|
||||
elif a.func in self.UNPARAMETERIZED_AGGREGATES[fragment_key]:
|
||||
fragment_map = self.UNPARAMETERIZED_AGGREGATES[fragment_key]
|
||||
fragments += fragment_map[a.func]
|
||||
elif a.func in self.PARAMETERIZED_AGGREGATES[fragment_key]:
|
||||
fragment_map = self.PARAMETERIZED_AGGREGATES[fragment_key]
|
||||
v = self.PARAMETERIZED_AGGREGATES['validate'].get(a.func)
|
||||
if not (v and v(a.param)):
|
||||
raise storage.StorageBadAggregate('Bad aggregate: %s.%s'
|
||||
% (a.func, a.param))
|
||||
params = dict(aggregate_param=a.param)
|
||||
fragments += (fragment_map[a.func] % params)
|
||||
else:
|
||||
raise NotImplementedError(_('Selectable aggregate function %s'
|
||||
' is not supported') % a.func)
|
||||
@ -732,16 +828,18 @@ class Connection(pymongo_base.Connection):
|
||||
@staticmethod
|
||||
def _stats_result_aggregates(result, aggregate):
|
||||
stats_args = {}
|
||||
if isinstance(result.get('count'), (int, long)):
|
||||
stats_args['count'] = int(result['count'])
|
||||
for attr in ['min', 'max', 'sum', 'avg']:
|
||||
for attr in ['count', 'min', 'max', 'sum', 'avg']:
|
||||
if attr in result:
|
||||
stats_args[attr] = result[attr]
|
||||
|
||||
if aggregate:
|
||||
stats_args['aggregate'] = dict(
|
||||
('%s%s' % (a.func, '/%s' % a.param if a.param else ''),
|
||||
result[a.func]) for a in aggregate
|
||||
)
|
||||
stats_args['aggregate'] = {}
|
||||
for a in aggregate:
|
||||
ak = '%s%s' % (a.func, '/%s' % a.param if a.param else '')
|
||||
if ak in result:
|
||||
stats_args['aggregate'][ak] = result[ak]
|
||||
elif 'aggregate' in result:
|
||||
stats_args['aggregate'][ak] = result['aggregate'].get(ak)
|
||||
return stats_args
|
||||
|
||||
@staticmethod
|
||||
@ -840,7 +938,9 @@ class Connection(pymongo_base.Connection):
|
||||
'min': True,
|
||||
'sum': True,
|
||||
'avg': True,
|
||||
'count': True}}
|
||||
'count': True,
|
||||
'stddev': True,
|
||||
'cardinality': True}}
|
||||
},
|
||||
'alarms': {'query': {'simple': True,
|
||||
'complex': True},
|
||||
|
Loading…
x
Reference in New Issue
Block a user