diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py index 728cc1780..8465893ea 100644 --- a/ceilometer/storage/__init__.py +++ b/ceilometer/storage/__init__.py @@ -63,6 +63,11 @@ class StorageBadVersion(Exception): """Error raised when the storage backend version is not good enough.""" +class StorageBadAggregate(Exception): + """Error raised when an aggregate is unacceptable to storage backend.""" + code = 400 + + def get_engine(conf): """Load the configured engine and return an instance.""" if conf.database_connection: diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 365bc367e..0c6958a21 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -144,6 +144,90 @@ class Connection(pymongo_base.Connection): ), ) + UNPARAMETERIZED_AGGREGATES = dict( + emit_initial=dict( + stddev=( + '' + ) + ), + emit_body=dict( + stddev='sdsum: this.counter_volume,' + 'sdcount: 1,' + 'weighted_distances: 0,' + 'stddev: 0,' + ), + reduce_initial=dict( + stddev='' + ), + reduce_body=dict( + stddev='sdsum: values[0].sdsum,' + 'sdcount: values[0].sdcount,' + 'weighted_distances: values[0].weighted_distances,' + 'stddev: values[0].stddev,' + ), + reduce_computation=dict( + stddev=( + 'var deviance = (res.sdsum / res.sdcount) - values[i].sdsum;' + 'var weight = res.sdcount / ++res.sdcount;' + 'res.weighted_distances += (Math.pow(deviance, 2) * weight);' + 'res.sdsum += values[i].sdsum;' + ) + ), + finalize=dict( + stddev=( + 'value.stddev = Math.sqrt(value.weighted_distances /' + ' value.sdcount);' + ) + ), + ) + + PARAMETERIZED_AGGREGATES = dict( + validate=dict( + cardinality=lambda p: p in ['resource_id', 'user_id', 'project_id', + 'source'] + ), + emit_initial=dict( + cardinality=( + 'var aggregate = {};' + 'aggregate["cardinality/%(aggregate_param)s"] =' + ' this["%(aggregate_param)s"];' + ) + ), + emit_body=dict( + cardinality='aggregate : aggregate,' + ), + reduce_initial=dict( + cardinality=( + 'var distincts = {};' + 'distincts[values[0].aggregate[' + ' "cardinality/%(aggregate_param)s"]] = true;' + 'var aggregate = {};' + 'aggregate["cardinality/%(aggregate_param)s"] = NumberInt(1);' + ) + ), + reduce_body=dict( + cardinality='aggregate : aggregate,' + ), + reduce_computation=dict( + cardinality=( + 'if (!(values[i].aggregate["cardinality/%(aggregate_param)s"]' + ' in distincts)) {' + ' distincts[values[i].aggregate[' + ' "cardinality/%(aggregate_param)s"]] = true;' + ' res.aggregate["cardinality/%(aggregate_param)s"] =' + ' NumberInt(Object.keys(distincts).length);}' + ) + ), + finalize=dict( + cardinality=( + 'if (typeof value.aggregate[' + ' "cardinality/%(aggregate_param)s"] !== "number") {' + ' value.aggregate["cardinality/%(aggregate_param)s"] =' + ' NumberInt(1);}' + ) + ), + ) + EMIT_STATS_COMMON = """ %(aggregate_initial_placeholder)s emit(%(key_val)s, { unit: this.counter_unit, @@ -651,8 +735,20 @@ class Connection(pymongo_base.Connection): fragments = '' for a in aggregate: - if a.func in fragment_map: + if a.func in self.STANDARD_AGGREGATES[fragment_key]: + fragment_map = self.STANDARD_AGGREGATES[fragment_key] fragments += fragment_map[a.func] + elif a.func in self.UNPARAMETERIZED_AGGREGATES[fragment_key]: + fragment_map = self.UNPARAMETERIZED_AGGREGATES[fragment_key] + fragments += fragment_map[a.func] + elif a.func in self.PARAMETERIZED_AGGREGATES[fragment_key]: + fragment_map = self.PARAMETERIZED_AGGREGATES[fragment_key] + v = self.PARAMETERIZED_AGGREGATES['validate'].get(a.func) + if not (v and v(a.param)): + raise storage.StorageBadAggregate('Bad aggregate: %s.%s' + % (a.func, a.param)) + params = dict(aggregate_param=a.param) + fragments += (fragment_map[a.func] % params) else: raise NotImplementedError(_('Selectable aggregate function %s' ' is not supported') % a.func) @@ -732,16 +828,18 @@ class Connection(pymongo_base.Connection): @staticmethod def _stats_result_aggregates(result, aggregate): stats_args = {} - if isinstance(result.get('count'), (int, long)): - stats_args['count'] = int(result['count']) - for attr in ['min', 'max', 'sum', 'avg']: + for attr in ['count', 'min', 'max', 'sum', 'avg']: if attr in result: stats_args[attr] = result[attr] + if aggregate: - stats_args['aggregate'] = dict( - ('%s%s' % (a.func, '/%s' % a.param if a.param else ''), - result[a.func]) for a in aggregate - ) + stats_args['aggregate'] = {} + for a in aggregate: + ak = '%s%s' % (a.func, '/%s' % a.param if a.param else '') + if ak in result: + stats_args['aggregate'][ak] = result[ak] + elif 'aggregate' in result: + stats_args['aggregate'][ak] = result['aggregate'].get(ak) return stats_args @staticmethod @@ -840,7 +938,9 @@ class Connection(pymongo_base.Connection): 'min': True, 'sum': True, 'avg': True, - 'count': True}} + 'count': True, + 'stddev': True, + 'cardinality': True}} }, 'alarms': {'query': {'simple': True, 'complex': True},