Wider selection of aggregates for mongodb

Partially-addresses: BP wider-aggregate-range

Support for two new aggregation functions has been added:

  * stddev: population standard deviation (unparameterized)
  * cardinality: count of distinct values for a sample attribute
    (parameterized by attribute name)

For example, to calculate the standard deviation of CPU utilization:

  GET /v2/meters/cpu_util/statistics?aggregate.func=stddev

  HTTP/1.0 200 OK
  [{"aggregate": {"stddev": 0.6858829535841072},
    "duration_start": "2014-01-30T11:13:23",
    "duration_end": "2014-01-31T16:07:13",
    "duration": 104030.0,
    "period": 0,
    "period_start": "2014-01-30T11:13:23",
    "period_end": "2014-01-31T16:07:13",
    "groupby": null,
    "unit": "%"}]

Or, to calculate the number of distinct instances that existed for
each tenant, contrasted with the total number of instance samples
for that tenant, in each 15-minute period:

  GET /v2/meters/instance/statistics?aggregate.func=cardinality&aggregate.param=resource_id
                                    &aggregate.func=count
                                    &groupby=project_id&period=900

  HTTP/1.0 200 OK
  [{"count": 19,
    "aggregate": {"count": 19.0, "cardinality/resource_id": 3.0},
    "duration": 328.478029,
    "duration_start": "2014-01-31T10:00:41.823919",
    "duration_end": "2014-01-31T10:06:10.301948",
    "period": 900,
    "period_start": "2014-01-31T10:00:00",
    "period_end": "2014-01-31T10:15:00",
    "groupby": {"project_id": "061a5c91811e4044b7dc86c6136c4f99"},
    "unit": "instance"},
   {"count": 22,
    "aggregate": {"count": 22.0, "cardinality/resource_id": 4.0},
    "duration": 808.00384,
    "duration_start": "2014-01-31T10:15:15",
    "duration_end": "2014-01-31T10:28:43.003840",
    "period": 900,
    "period_start": "2014-01-31T10:15:00",
    "period_end": "2014-01-31T10:30:00",
    "groupby": {"project_id": "061a5c91811e4044b7dc86c6136c4f99"},
    "unit": "instance"},
   {"count": 2,
    "aggregate": {"count": 2.0, "cardinality/resource_id": 2.0},
    "duration": 0.0,
    "duration_start": "2014-01-31T10:35:15",
    "duration_end": "2014-01-31T10:35:15",
    "period": 900,
    "period_start": "2014-01-31T10:30:00",
    "period_end": "2014-01-31T10:45:00",
    "groupby": {"project_id": "061a5c91811e4044b7dc86c6136c4f99"},
    "unit": "instance"}]

Test coverage is provided by the scenario test added in
the next patch of this series.

Change-Id: Id3a37622f35d9a7d757c485b4d486b3f01cc6474
This commit is contained in:
Eoghan Glynn 2014-02-28 19:10:58 +00:00
parent 6863f83a3f
commit 7b58c1c9ea
2 changed files with 114 additions and 9 deletions

View File

@ -63,6 +63,11 @@ class StorageBadVersion(Exception):
"""Error raised when the storage backend version is not good enough."""
class StorageBadAggregate(Exception):
    """Error raised when an aggregate is unacceptable to storage backend.

    Carries a numeric ``code`` class attribute alongside the usual
    exception message.
    """

    # NOTE(review): presumably mapped to an HTTP 400 (Bad Request) response
    # by the API layer -- confirm against the code that catches this.
    code = 400
def get_engine(conf):
"""Load the configured engine and return an instance."""
if conf.database_connection:

View File

@ -144,6 +144,90 @@ class Connection(pymongo_base.Connection):
),
)
# JavaScript fragments for selectable aggregates that take no parameter.
# The outer key names the fragment slot, the inner key names the aggregate
# function; each value is JavaScript source text that is concatenated into
# the corresponding part of the map/reduce/finalize code.
#
# stddev is the population standard deviation, carried per partial result as
#   sdsum              -- sum of the sample volumes seen so far
#   sdcount            -- number of samples seen so far
#   weighted_distances -- sum of squared distances from the running mean
UNPARAMETERIZED_AGGREGATES = dict(
    emit_initial=dict(
        stddev=(
            ''
        )
    ),
    emit_body=dict(
        # Every emitted sample is a partial result of exactly one value.
        stddev='sdsum: this.counter_volume,'
               'sdcount: 1,'
               'weighted_distances: 0,'
               'stddev: 0,'
    ),
    reduce_initial=dict(
        stddev=''
    ),
    reduce_body=dict(
        # Seed the running result from the first partial result.
        stddev='sdsum: values[0].sdsum,'
               'sdcount: values[0].sdcount,'
               'weighted_distances: values[0].weighted_distances,'
               'stddev: values[0].stddev,'
    ),
    reduce_computation=dict(
        # MongoDB may pass the output of an earlier reduce call back in as
        # one of `values`, so values[i] can already summarise several
        # samples.  Merge it as a partial result (pairwise variance update)
        # instead of treating it as one raw sample; when values[i].sdcount
        # is 1 and its weighted_distances is 0 this reduces to the usual
        # one-sample-at-a-time update.
        # NOTE(review): assumes this fragment runs once per values[i]
        # (i >= 1) with `res` seeded from reduce_body -- confirm against the
        # enclosing reduce function.
        stddev=(
            'var sdmerged = res.sdcount + values[i].sdcount;'
            'var deviance = (res.sdsum / res.sdcount)'
            ' - (values[i].sdsum / values[i].sdcount);'
            'var weight = res.sdcount * values[i].sdcount / sdmerged;'
            'res.weighted_distances += values[i].weighted_distances'
            ' + (Math.pow(deviance, 2) * weight);'
            'res.sdcount = sdmerged;'
            'res.sdsum += values[i].sdsum;'
        )
    ),
    finalize=dict(
        # Population (not sample) standard deviation: divide by N.
        stddev=(
            'value.stddev = Math.sqrt(value.weighted_distances /'
            '  value.sdcount);'
        )
    ),
)
# JavaScript fragments for selectable aggregates that take a parameter.
# Laid out as slot name -> aggregate function -> fragment; every fragment is
# %-formatted with an ``aggregate_param`` key before use.  The extra
# 'validate' slot maps each function to a predicate over that parameter.
#
# NOTE(review): the cardinality reduce fragments read values[i].aggregate[...]
# as a raw attribute value; if reduce output is ever fed back into reduce
# that entry would already be a count -- worth confirming.
PARAMETERIZED_AGGREGATES = {
    'validate': {
        # Only these sample attributes may be counted for distinct values.
        'cardinality': lambda p: p in ('resource_id', 'user_id',
                                       'project_id', 'source'),
    },
    'emit_initial': {
        'cardinality': (
            'var aggregate = {};'
            'aggregate["cardinality/%(aggregate_param)s"] ='
            ' this["%(aggregate_param)s"];'
        ),
    },
    'emit_body': {
        'cardinality': 'aggregate : aggregate,',
    },
    'reduce_initial': {
        'cardinality': (
            'var distincts = {};'
            'distincts[values[0].aggregate['
            ' "cardinality/%(aggregate_param)s"]] = true;'
            'var aggregate = {};'
            'aggregate["cardinality/%(aggregate_param)s"] = NumberInt(1);'
        ),
    },
    'reduce_body': {
        'cardinality': 'aggregate : aggregate,',
    },
    'reduce_computation': {
        'cardinality': (
            'if (!(values[i].aggregate["cardinality/%(aggregate_param)s"]'
            ' in distincts)) {'
            ' distincts[values[i].aggregate['
            ' "cardinality/%(aggregate_param)s"]] = true;'
            ' res.aggregate["cardinality/%(aggregate_param)s"] ='
            ' NumberInt(Object.keys(distincts).length);}'
        ),
    },
    'finalize': {
        # A key that was never reduced still holds the raw attribute value;
        # replace it with a count of one.
        'cardinality': (
            'if (typeof value.aggregate['
            ' "cardinality/%(aggregate_param)s"] !== "number") {'
            ' value.aggregate["cardinality/%(aggregate_param)s"] ='
            ' NumberInt(1);}'
        ),
    },
}
EMIT_STATS_COMMON = """
%(aggregate_initial_placeholder)s
emit(%(key_val)s, { unit: this.counter_unit,
@ -651,8 +735,20 @@ class Connection(pymongo_base.Connection):
fragments = ''
for a in aggregate:
if a.func in fragment_map:
if a.func in self.STANDARD_AGGREGATES[fragment_key]:
fragment_map = self.STANDARD_AGGREGATES[fragment_key]
fragments += fragment_map[a.func]
elif a.func in self.UNPARAMETERIZED_AGGREGATES[fragment_key]:
fragment_map = self.UNPARAMETERIZED_AGGREGATES[fragment_key]
fragments += fragment_map[a.func]
elif a.func in self.PARAMETERIZED_AGGREGATES[fragment_key]:
fragment_map = self.PARAMETERIZED_AGGREGATES[fragment_key]
v = self.PARAMETERIZED_AGGREGATES['validate'].get(a.func)
if not (v and v(a.param)):
raise storage.StorageBadAggregate('Bad aggregate: %s.%s'
% (a.func, a.param))
params = dict(aggregate_param=a.param)
fragments += (fragment_map[a.func] % params)
else:
raise NotImplementedError(_('Selectable aggregate function %s'
' is not supported') % a.func)
@ -732,16 +828,18 @@ class Connection(pymongo_base.Connection):
@staticmethod
def _stats_result_aggregates(result, aggregate):
# Collect, from one raw `result` mapping, the entries to pass on as
# statistics arguments: the standard aggregates that are present plus,
# when `aggregate` is non-empty, an 'aggregate' sub-dict keyed by
# "func" or "func/param" for each requested aggregate.
#
# NOTE(review): this hunk is rendered without +/- markers or indentation,
# so superseded and replacement lines appear back to back.  The notes
# below mark which is presumably which -- confirm against the real diff.
stats_args = {}
# presumably removed: special-cased integer coercion of 'count'
if isinstance(result.get('count'), (int, long)):
stats_args['count'] = int(result['count'])
# presumably the removed loop header (no 'count') ...
for attr in ['min', 'max', 'sum', 'avg']:
# ... and its replacement, which copies 'count' through unchanged as well
for attr in ['count', 'min', 'max', 'sum', 'avg']:
if attr in result:
stats_args[attr] = result[attr]
if aggregate:
# presumably the removed form: builds the "func[/param]" key but always
# reads the value from result[a.func]
stats_args['aggregate'] = dict(
('%s%s' % (a.func, '/%s' % a.param if a.param else ''),
result[a.func]) for a in aggregate
)
# presumably the added form: look the "func[/param]" key up at the top
# level of result first, then inside result['aggregate'] (yielding None
# there if the key is absent); keys found in neither place are omitted
stats_args['aggregate'] = {}
for a in aggregate:
ak = '%s%s' % (a.func, '/%s' % a.param if a.param else '')
if ak in result:
stats_args['aggregate'][ak] = result[ak]
elif 'aggregate' in result:
stats_args['aggregate'][ak] = result['aggregate'].get(ak)
return stats_args
@staticmethod
@ -840,7 +938,9 @@ class Connection(pymongo_base.Connection):
'min': True,
'sum': True,
'avg': True,
'count': True}}
'count': True,
'stddev': True,
'cardinality': True}}
},
'alarms': {'query': {'simple': True,
'complex': True},