From df126b7652d0ebe50cb52e4d42f4c2340bfb33a6 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Tue, 2 Jul 2013 15:18:59 +0200 Subject: [PATCH] Use MongoDB aggregate to get resources list This avoids querying two different collections to get the data, doing aggregation on one is enough. Change-Id: Ia21693051daefd1ebd4d0c7d07b76d4712b0542c --- ceilometer/storage/__init__.py | 4 ++ ceilometer/storage/impl_mongodb.py | 105 ++++++++++++++++++++--------- ceilometer/tests/db.py | 5 +- tests/api/v2/test_app.py | 2 + tests/storage/test_impl_mongodb.py | 5 -- 5 files changed, 85 insertions(+), 36 deletions(-) diff --git a/ceilometer/storage/__init__.py b/ceilometer/storage/__init__.py index 9322f6aa3..dc88902f0 100644 --- a/ceilometer/storage/__init__.py +++ b/ceilometer/storage/__init__.py @@ -58,6 +58,10 @@ cfg.CONF.import_opt('connection', group='database') +class StorageBadVersion(Exception): + """Error raised when the storage backend version is not good enough.""" + + def get_engine(conf): """Load the configured engine and return an instance.""" if conf.database_connection: diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 7f4e085f2..c9b2c033c 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -252,6 +252,10 @@ class Connection(base.Connection): # requires a new storage connection. self.conn = self.CONNECTION_POOL.connect(url) + # Require MongoDB 2.2 to use aggregate() and TTL + if self.conn.server_info()['versionArray'] < [2, 2]: + raise storage.StorageBadVersion("Need at least MongoDB 2.2") + connection_options = pymongo.uri_parser.parse_uri(url) self.db = getattr(self.conn, connection_options['database']) @@ -428,25 +432,19 @@ class Connection(base.Connection): return dict(criteria_equ, ** criteria_cmp) @classmethod - def paginate_query(cls, q, db_collection, limit=None, marker=None, - sort_keys=[], sort_dir='desc'): - """Returns a query result with sorting / pagination. + def _build_paginate_query(cls, marker, sort_keys=[], sort_dir='desc'): + """Returns a query with sorting / pagination. Pagination works by requiring sort_key and sort_dir. We use the last item in previous page as the 'marker' for pagination. So we return values that follow the passed marker in the order. - :param q: the query dict passed in. - :param db_collection: Database collection that be query. - :param limit: maximum number of items to return. + :param q: The query dict passed in. :param marker: the last item of the previous page; we return the next results after this item. :param sort_keys: array of attributes by which results be sorted. :param sort_dir: direction in which results be sorted (asc, desc). - return: The query with sorting/pagination added. + :return: sort parameters, query to use """ - - if db_collection is None: - return all_sort = [] sort_mapping = {'desc': (pymongo.DESCENDING, '$lt'), 'asc': (pymongo.ASCENDING, '$gt') @@ -467,7 +465,33 @@ class Connection(base.Connection): marker, _sort_flag)) metaquery = {"$or": sort_criteria_list} - q.update(metaquery) + else: + metaquery = {} + + return all_sort, metaquery + + @classmethod + def paginate_query(cls, q, db_collection, limit=None, marker=None, + sort_keys=[], sort_dir='desc'): + """Returns a query result with sorting / pagination. + + Pagination works by requiring sort_key and sort_dir. + We use the last item in previous page as the 'marker' for pagination. + So we return values that follow the passed marker in the order. + :param q: the query dict passed in. + :param db_collection: Database collection that be query. + :param limit: maximum number of items to return. + :param marker: the last item of the previous page; we return the next + results after this item. + :param sort_keys: array of attributes by which results be sorted. + :param sort_dir: direction in which results be sorted (asc, desc). + return: The query with sorting/pagination added. + """ + + all_sort, query = cls._build_paginate_query(marker, + sort_keys, + sort_dir) + q.update(query) #NOTE(Fengqian):MongoDB collection.find can not handle limit #when it equals None, it will raise TypeError, so we treate @@ -524,6 +548,11 @@ class Connection(base.Connection): :param sort_key: Attribute by which results be sorted. :param sort_dir: Direction with which results be sorted(asc, desc). """ + + if marker_pairs is not None: + raise NotImplementedError( + "Cannot use marker pairs in resource listing") + q = {} if user is not None: q['user_id'] = user @@ -551,31 +580,47 @@ class Connection(base.Connection): if ts_range: q['timestamp'] = ts_range - marker = self._get_marker(self.db.resource, marker_pairs=marker_pairs) sort_keys = base._handle_sort_key('resource', sort_key) + sort_instruction, query = self._build_paginate_query(None, + sort_keys, + sort_dir) + q.update(query) - # FIXME(jd): We should use self.db.meter.group() and not use the - # resource collection, but that's not supported by MIM, so it's not - # easily testable yet. Since it was bugged before anyway, it's still - # better for now. - resource_ids = self.db.meter.find(q).distinct('resource_id') - q = {'_id': {'$in': resource_ids}} - for resource in self.paginate_query(q, self.db.resource, limit=limit, - marker=marker, sort_keys=sort_keys, - sort_dir=sort_dir): + aggregate = self.db.meter.aggregate([ + {"$match": q}, + {"$sort": dict(sort_instruction)}, + {"$group": { + "_id": "$resource_id", + "user_id": {"$first": "$user_id"}, + "project_id": {"$first": "$project_id"}, + "source": {"$first": "$source"}, + "metadata": {"$first": "$resource_metadata"}, + "meters_name": {"$push": "$counter_name"}, + "meters_type": {"$push": "$counter_type"}, + "meters_unit": {"$push": "$counter_unit"}, + }}, + ]) + + for result in aggregate['result']: + if limit is not None: + if limit == 0: + break + limit -= 1 yield models.Resource( - resource_id=resource['_id'], - project_id=resource['project_id'], - source=resource['source'], - user_id=resource['user_id'], - metadata=resource['metadata'], + resource_id=result['_id'], + user_id=result['user_id'], + project_id=result['project_id'], + source=result['source'], + metadata=result['metadata'], meter=[ models.ResourceMeter( - counter_name=meter['counter_name'], - counter_type=meter['counter_type'], - counter_unit=meter.get('counter_unit', ''), + counter_name=m_n, + counter_type=m_t, + counter_unit=m_u, ) - for meter in resource['meter'] + for m_n, m_u, m_t in zip(result['meters_name'], + result['meters_unit'], + result['meters_type']) ], ) diff --git a/ceilometer/tests/db.py b/ceilometer/tests/db.py index c999c25f9..df117e008 100644 --- a/ceilometer/tests/db.py +++ b/ceilometer/tests/db.py @@ -33,7 +33,10 @@ class TestBase(test_base.TestCase): super(TestBase, self).setUp() cfg.CONF.set_override('connection', str(self.database_connection), group='database') - self.conn = storage.get_connection(cfg.CONF) + try: + self.conn = storage.get_connection(cfg.CONF) + except storage.StorageBadVersion as e: + self.skipTest(str(e)) self.conn.upgrade() def tearDown(self): diff --git a/tests/api/v2/test_app.py b/tests/api/v2/test_app.py index 738022a82..2384b0955 100644 --- a/tests/api/v2/test_app.py +++ b/tests/api/v2/test_app.py @@ -41,6 +41,7 @@ class TestApp(base.TestCase): cfg.CONF.set_override("auth_version", "v2.0", group=acl.OPT_GROUP_NAME) cfg.CONF.set_override("pipeline_cfg_file", self.path_get("etc/ceilometer/pipeline.yaml")) + cfg.CONF.set_override('connection', "log://", group="database") api_app = app.setup_app() self.assertEqual(api_app.auth_protocol, 'foottp') @@ -55,6 +56,7 @@ class TestApp(base.TestCase): f.write("auth_version = v2.0\n") service.prepare_service(['ceilometer-api', '--config-file=%s' % tmpfile]) + cfg.CONF.set_override('connection', "log://", group="database") api_app = app.setup_app() self.assertEqual(api_app.auth_protocol, 'barttp') os.unlink(tmpfile) diff --git a/tests/storage/test_impl_mongodb.py b/tests/storage/test_impl_mongodb.py index 54385c02a..d338bc4eb 100644 --- a/tests/storage/test_impl_mongodb.py +++ b/tests/storage/test_impl_mongodb.py @@ -164,11 +164,6 @@ class ResourceTest(base.ResourceTest, MongoDBEngineTestBase): pass -class ResourceTestPagination(base.ResourceTestPagination, - MongoDBEngineTestBase): - pass - - class MeterTest(base.MeterTest, MongoDBEngineTestBase): pass