diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 38f65a8e6..ec41879b7 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -24,6 +24,7 @@ import calendar import copy import json import operator +import uuid import weakref import bson.code @@ -302,6 +303,39 @@ class Connection(base.Connection): SORT_OPERATION_MAPPING = {'desc': (pymongo.DESCENDING, '$lt'), 'asc': (pymongo.ASCENDING, '$gt')} + MAP_RESOURCES = bson.code.Code(""" + function () { + emit(this.resource_id, + {user_id: this.user_id, + project_id: this.project_id, + source: this.source, + first_timestamp: this.timestamp, + last_timestamp: this.timestamp, + metadata: this.resource_metadata}) + }""") + + REDUCE_RESOURCES = bson.code.Code(""" + function (key, values) { + var merge = {user_id: values[0].user_id, + project_id: values[0].project_id, + source: values[0].source, + first_timestamp: values[0].first_timestamp, + last_timestamp: values[0].last_timestamp, + metadata: values[0].metadata} + values.forEach(function(value) { + if (merge.first_timestamp - value.first_timestamp > 0) { + merge.first_timestamp = value.first_timestamp; + merge.user_id = value.user_id; + merge.project_id = value.project_id; + merge.source = value.source; + } else if (merge.last_timestamp - value.last_timestamp <= 0) { + merge.last_timestamp = value.last_timestamp; + merge.metadata = value.metadata; + } + }); + return merge; + }""") + def __init__(self, conf): url = conf.database.connection @@ -311,7 +345,7 @@ class Connection(base.Connection): # requires a new storage connection. 
self.conn = self.CONNECTION_POOL.connect(url) - # Require MongoDB 2.2 to use aggregate() and TTL + # Require MongoDB 2.2 to use TTL if self.conn.server_info()['versionArray'] < [2, 2]: raise storage.StorageBadVersion("Need at least MongoDB 2.2") @@ -635,33 +669,29 @@ class Connection(base.Connection): sort_keys = base._handle_sort_key('resource') sort_instructions = self._build_sort_instructions(sort_keys)[0] - aggregate = self.db.meter.aggregate([ - {"$match": q}, - {"$sort": dict(sort_instructions)}, - {"$group": { - "_id": "$resource_id", - "user_id": {"$first": "$user_id"}, - "project_id": {"$first": "$project_id"}, - "source": {"$first": "$source"}, - "first_sample_timestamp": {"$min": "$timestamp"}, - "last_sample_timestamp": {"$max": "$timestamp"}, - "metadata": {"$first": "$resource_metadata"}, - "meters_name": {"$push": "$counter_name"}, - "meters_type": {"$push": "$counter_type"}, - "meters_unit": {"$push": "$counter_unit"}, - }}, - ]) + # use a unique collection name for the results collection, + # as result post-sorting (as opposed to reduce pre-sorting) + # is not possible on an inline M-R + out = 'resource_list_%s' % uuid.uuid4() + self.db.meter.map_reduce(self.MAP_RESOURCES, + self.REDUCE_RESOURCES, + out=out, + sort={'resource_id': 1}, + query=q) - for result in aggregate['result']: - yield models.Resource( - resource_id=result['_id'], - user_id=result['user_id'], - project_id=result['project_id'], - first_sample_timestamp=result['first_sample_timestamp'], - last_sample_timestamp=result['last_sample_timestamp'], - source=result['source'], - metadata=result['metadata'], - ) + try: + for r in self.db[out].find(sort=sort_instructions): + resource = r['value'] + yield models.Resource( + resource_id=r['_id'], + user_id=resource['user_id'], + project_id=resource['project_id'], + first_sample_timestamp=resource['first_timestamp'], + last_sample_timestamp=resource['last_timestamp'], + source=resource['source'], + metadata=resource['metadata']) + finally: + 
self.db[out].drop() def get_meters(self, user=None, project=None, resource=None, source=None, metaquery={}, pagination=None): diff --git a/ceilometer/tests/api/v2/test_list_resources_scenarios.py b/ceilometer/tests/api/v2/test_list_resources_scenarios.py index a22f20995..dc089a181 100644 --- a/ceilometer/tests/api/v2/test_list_resources_scenarios.py +++ b/ceilometer/tests/api/v2/test_list_resources_scenarios.py @@ -130,9 +130,9 @@ class TestListResources(FunctionalTest, def test_instance_multiple_samples(self): timestamps = [ - datetime.datetime(2012, 7, 2, 10, 40), datetime.datetime(2012, 7, 2, 10, 41), datetime.datetime(2012, 7, 2, 10, 42), + datetime.datetime(2012, 7, 2, 10, 40), ] for timestamp in timestamps: datapoint = sample.Sample( @@ -145,7 +145,7 @@ class TestListResources(FunctionalTest, 'resource-id', timestamp=timestamp, resource_metadata={'display_name': 'test-server', - 'tag': 'self.sample', + 'tag': 'self.sample-%s' % timestamp, }, source='test', ) @@ -157,7 +157,7 @@ class TestListResources(FunctionalTest, data = self.get_json('/resources') self.assertEqual(1, len(data)) - self._verify_sample_timestamps(data[0], timestamps[0], timestamps[-1]) + self._verify_sample_timestamps(data[0], timestamps[-1], timestamps[1]) def test_instances_one(self): sample1 = sample.Sample(