From ce85dbc43ed31df13652968e84ae836de2e415c5 Mon Sep 17 00:00:00 2001 From: "Fengqian.Gao" Date: Wed, 3 Jul 2013 15:31:00 +0800 Subject: [PATCH] Add pagination support for MongoDB Paginate db query result of MongoDB. The query methods support pagination include get_resources, get_meters, get_alarms. It is a part of blueprint paginate-db-search. Implements blueprint paginate-db-search Change-Id: Id9162782a6c96b4bb0ed04d828791b8b1b1379d4 --- ceilometer/storage/base.py | 34 ++++++ ceilometer/storage/impl_mongodb.py | 131 +++++++++++++++++++++-- tests/storage/base.py | 165 ++++++++++++++++++++++++++++- tests/storage/test_base.py | 10 ++ tests/storage/test_impl_mongodb.py | 79 ++++++++++++++ 5 files changed, 408 insertions(+), 11 deletions(-) diff --git a/ceilometer/storage/base.py b/ceilometer/storage/base.py index 67f7aecf2..6880593e3 100644 --- a/ceilometer/storage/base.py +++ b/ceilometer/storage/base.py @@ -45,6 +45,40 @@ def iter_period(start, end, period): period_start = next_start +def _handle_sort_key(model_name, sort_key=None): + """Generate sort keys according to the passed in sort key from user. + + :param model_name: Database model name be query.(alarm, meter, etc.) + :param sort_key: sort key passed from user. + return: sort keys list + """ + sort_keys_extra = {'alarm': ['name', 'user_id', 'project_id'], + 'meter': ['user_id', 'project_id'], + 'resource': ['user_id', 'project_id'], + } + + sort_keys = sort_keys_extra[model_name] + if not sort_key: + return sort_keys + # NOTE(Fengqian): We need to put the sort key from user + #in the first place of sort keys list. + try: + sort_keys.remove(sort_key) + except ValueError: + pass + finally: + sort_keys.insert(0, sort_key) + return sort_keys + + +class MultipleResultsFound(Exception): + pass + + +class NoResultFound(Exception): + pass + + class StorageEngine(object): """Base class for storage engines.""" diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 1111ef2d8..7f4e085f2 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -395,6 +395,87 @@ class Connection(base.Connection): self.db.project.remove({'_id': {'$nin': results['projects']}}) self.db.resource.remove({'_id': {'$nin': results['resources']}}) + @staticmethod + def _get_marker(db_collection, marker_pairs): + """Return the mark document according to the attribute-value pairs. + + :param db_collection: Database collection that be query. + :param maker_pairs: Attribute-value pairs filter. + """ + if db_collection is None: + return + if not marker_pairs: + return + ret = db_collection.find(marker_pairs, limit=2) + + if ret.count() == 0: + raise base.NoResultFound + elif ret.count() > 1: + raise base.MultipleResultsFound + else: + _ret = ret.__getitem__(0) + return _ret + + @classmethod + def _recurse_sort_keys(cls, sort_keys, marker, flag): + _first = sort_keys[0] + value = marker[_first] + if len(sort_keys) == 1: + return {_first: {flag: value}} + else: + criteria_equ = {_first: {'eq': value}} + criteria_cmp = cls._recurse_sort_keys(sort_keys[1:], marker, flag) + return dict(criteria_equ, ** criteria_cmp) + + @classmethod + def paginate_query(cls, q, db_collection, limit=None, marker=None, + sort_keys=[], sort_dir='desc'): + """Returns a query result with sorting / pagination. + + Pagination works by requiring sort_key and sort_dir. + We use the last item in previous page as the 'marker' for pagination. + So we return values that follow the passed marker in the order. + :param q: the query dict passed in. + :param db_collection: Database collection that be query. + :param limit: maximum number of items to return. + :param marker: the last item of the previous page; we return the next + results after this item. + :param sort_keys: array of attributes by which results be sorted. + :param sort_dir: direction in which results be sorted (asc, desc). + return: The query with sorting/pagination added. + """ + + if db_collection is None: + return + all_sort = [] + sort_mapping = {'desc': (pymongo.DESCENDING, '$lt'), + 'asc': (pymongo.ASCENDING, '$gt') + } + _sort_dir, _sort_flag = sort_mapping.get(sort_dir, + sort_mapping['desc']) + + for _sort_key in sort_keys: + _all_sort = (_sort_key, _sort_dir) + all_sort.append(_all_sort) + + if marker is not None: + sort_criteria_list = [] + + for i in range(0, len(sort_keys)): + sort_criteria_list.append(cls._recurse_sort_keys( + sort_keys[:(len(sort_keys) - i)], + marker, _sort_flag)) + + metaquery = {"$or": sort_criteria_list} + q.update(metaquery) + + #NOTE(Fengqian):MongoDB collection.find can not handle limit + #when it equals None, it will raise TypeError, so we treate + #None as 0 for the value of limit. + if limit is None: + limit = 0 + return db_collection.find(q, limit=limit, sort=all_sort) + def get_users(self, source=None): """Return an iterable of user id strings. @@ -424,7 +505,8 @@ class Connection(base.Connection): def get_resources(self, user=None, project=None, source=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None, - metaquery={}, resource=None): + metaquery={}, resource=None, limit=None, + marker_pairs=None, sort_key=None, sort_dir=None): """Return an iterable of models.Resource instances :param user: Optional ID for user that owns the resource. @@ -436,6 +518,11 @@ class Connection(base.Connection): :param end_timestamp_op: Optional end time operator, like lt, le. :param metaquery: Optional dict with metadata to match on. :param resource: Optional resource filter. + :param limit: Number of documents should be returned. + :param marker_pairs: Attribute-value pairs to identify the last item of + the previous page. + :param sort_key: Attribute by which results be sorted. + :param sort_dir: Direction with which results be sorted(asc, desc). """ q = {} if user is not None: @@ -464,13 +551,18 @@ class Connection(base.Connection): if ts_range: q['timestamp'] = ts_range + marker = self._get_marker(self.db.resource, marker_pairs=marker_pairs) + sort_keys = base._handle_sort_key('resource', sort_key) + # FIXME(jd): We should use self.db.meter.group() and not use the # resource collection, but that's not supported by MIM, so it's not # easily testable yet. Since it was bugged before anyway, it's still # better for now. resource_ids = self.db.meter.find(q).distinct('resource_id') q = {'_id': {'$in': resource_ids}} - for resource in self.db.resource.find(q): + for resource in self.paginate_query(q, self.db.resource, limit=limit, + marker=marker, sort_keys=sort_keys, + sort_dir=sort_dir): yield models.Resource( resource_id=resource['_id'], project_id=resource['project_id'], @@ -488,7 +580,8 @@ class Connection(base.Connection): ) def get_meters(self, user=None, project=None, resource=None, source=None, - metaquery={}): + metaquery={}, limit=None, marker_pairs=None, sort_key=None, + sort_dir=None): """Return an iterable of models.Meter instances :param user: Optional ID for user that owns the resource. @@ -496,6 +589,11 @@ class Connection(base.Connection): :param resource: Optional resource filter. :param source: Optional source filter. :param metaquery: Optional dict with metadata to match on. + :param limit: Number of documents should be returned. + :param marker_pairs: Attribute-value pairs to identify the last item of + the previous page. + :param sort_key: Attribute by which results be sorted. + :param sort_dir: Direction with which results be sorted(asc, desc). """ q = {} if user is not None: @@ -508,7 +606,12 @@ class Connection(base.Connection): q['source'] = source q.update(metaquery) - for r in self.db.resource.find(q): + marker = self._get_marker(self.db.resource, marker_pairs=marker_pairs) + sort_keys = base._handle_sort_key('meter', sort_key) + + for r in self.paginate_query(q, self.db.resource, limit=limit, + marker=marker, + sort_keys=sort_keys, sort_dir=sort_dir): for r_meter in r['meter']: yield models.Meter( name=r_meter['counter_name'], @@ -601,8 +704,19 @@ class Connection(base.Connection): return matching_metadata def get_alarms(self, name=None, user=None, - project=None, enabled=True, alarm_id=None): + project=None, enabled=True, alarm_id=None, limit=None, + marker_pairs=None, sort_key=None, sort_dir=None): """Yields a lists of alarms that match filters + :param name: The Alarm name. + :param user: Optional ID for user that owns the resource. + :param project: Optional ID for project that owns the resource. + :param enabled: Optional boolean to list disable alarm. + :param alarm_id: Optional alarm_id to return one alarm. + :param limit: Number of rows should be returned. + :param marker_pairs: Attribute-value pairs to identify the last item of + the previous page. + :param sort_key: Attribute by which results be sorted. + :param sort_dir: Direction with which results be sorted(asc, desc). """ q = {} if user is not None: @@ -616,7 +730,12 @@ class Connection(base.Connection): if alarm_id is not None: q['alarm_id'] = alarm_id - for alarm in self.db.alarm.find(q): + marker = self._get_marker(self.db.alarm, marker_pairs=marker_pairs) + sort_keys = base._handle_sort_key('alarm', sort_key) + + for alarm in self.paginate_query(q, self.db.alarm, limit=limit, + marker=marker, sort_keys=sort_keys, + sort_dir=sort_dir): a = {} a.update(alarm) del a['_id'] diff --git a/tests/storage/base.py b/tests/storage/base.py index 56718a731..d6d714cd4 100644 --- a/tests/storage/base.py +++ b/tests/storage/base.py @@ -312,6 +312,49 @@ class ResourceTest(DBTestBase): self.assertEqual(len(resources), 9) +class ResourceTestPagination(DBTestBase): + + def test_get_resource_all_limit(self): + results = list(self.conn.get_resources(limit=8)) + self.assertEqual(len(results), 8) + + results = list(self.conn.get_resources(limit=5)) + self.assertEqual(len(results), 5) + + def test_get_resources_all_marker(self): + marker_pairs = {'user_id': 'user-id-4', + 'project_id': 'project-id-4'} + results = list(self.conn.get_resources(marker_pairs=marker_pairs, + sort_key='user_id', + sort_dir='asc')) + self.assertEqual(len(results), 5) + + def test_get_resources_paginate(self): + marker_pairs = {'user_id': 'user-id-4'} + results = self.conn.get_meters(limit=3, marker_pairs=marker_pairs, + sort_key='user_id', + sort_dir='asc') + self.assertEquals(['user-id-5', 'user-id-6', 'user-id-7'], + [i.user_id for i in results]) + + marker_pairs = {'user_id': 'user-id-4'} + results = list(self.conn.get_resources(limit=2, + marker_pairs=marker_pairs, + sort_key='user_id', + sort_dir='desc')) + self.assertEquals(['user-id-3', 'user-id-2'], + [i.user_id for i in results]) + + marker_pairs = {'project_id': 'project-id-5'} + results = list(self.conn.get_resources(limit=3, + marker_pairs=marker_pairs, + sort_key='user_id', + sort_dir='asc')) + self.assertEquals(['resource-id-6', 'resource-id-7', + 'resource-id-8'], + [i.resource_id for i in results]) + + class MeterTest(DBTestBase): def test_get_meters(self): @@ -345,6 +388,50 @@ class MeterTest(DBTestBase): self.assertEqual(len(results), 9) +class MeterTestPagination(DBTestBase): + + def tet_get_meters_all_limit(self): + results = list(self.conn.get_meters(limit=8)) + self.assertEqual(len(results), 8) + + results = list(self.conn.get_meters(limit=5)) + self.assertEqual(len(results), 5) + + def test_get_meters_all_marker(self): + marker_pairs = {'user_id': 'user-id-alternate'} + results = list(self.conn.get_meters(marker_pairs=marker_pairs, + sort_key='user_id', + sort_dir='desc')) + self.assertEqual(len(results), 8) + + def test_get_meters_paginate(self): + marker_pairs = {'user_id': 'user-id-alternate'} + results = self.conn.get_meters(limit=3, marker_pairs=marker_pairs, + sort_key='user_id', sort_dir='desc') + self.assertEquals(['user-id-8', 'user-id-7', 'user-id-6'], + [i.user_id for i in results]) + + marker_pairs = {'user_id': 'user-id-4'} + results = self.conn.get_meters(limit=3, marker_pairs=marker_pairs, + sort_key='user_id', + sort_dir='asc') + self.assertEquals(['user-id-5', 'user-id-6', 'user-id-7'], + [i.user_id for i in results]) + + marker_pairs = {'user_id': 'user-id-4'} + results = list(self.conn.get_resources(limit=2, + marker_pairs=marker_pairs, + sort_key='user_id', + sort_dir='desc')) + self.assertEquals(['user-id-3', 'user-id-2'], + [i.user_id for i in results]) + + marker_pairs = {'user_id': 'user-id'} + results = self.conn.get_meters(limit=3, marker_pairs=marker_pairs, + sort_key='user_id', sort_dir='desc') + self.assertEquals([], [i.user_id for i in results]) + + class RawSampleTest(DBTestBase): def test_get_samples_limit_zero(self): @@ -830,11 +917,7 @@ class CounterDataTypeTest(DBTestBase): self.assertEqual(results[0].counter_volume, 1938495037.53697) -class AlarmTest(DBTestBase): - - def test_empty(self): - alarms = list(self.conn.get_alarms()) - self.assertEquals([], alarms) +class AlarmTestBase(DBTestBase): def add_some_alarms(self): alarms = [models.Alarm('red-alert', @@ -860,6 +943,13 @@ class AlarmTest(DBTestBase): for a in alarms: self.conn.update_alarm(a) + +class AlarmTest(AlarmTestBase): + + def test_empty(self): + alarms = list(self.conn.get_alarms()) + self.assertEquals([], alarms) + def test_add(self): self.add_some_alarms() alarms = list(self.conn.get_alarms()) @@ -918,6 +1008,71 @@ class AlarmTest(DBTestBase): self.assertNotEquals(victim.name, s.name) +class AlarmTestPagination(AlarmTestBase): + + def test_get_alarm_all_limit(self): + self.add_some_alarms() + alarms = list(self.conn.get_alarms(limit=2)) + self.assertEqual(len(alarms), 2) + + alarms = list(self.conn.get_alarms(limit=1)) + self.assertEqual(len(alarms), 1) + + def test_get_alarm_all_marker(self): + self.add_some_alarms() + + marker_pairs = {'name': 'orange-alert'} + alarms = list(self.conn.get_alarms(marker_pairs=marker_pairs, + sort_key='name', + sort_dir='desc')) + self.assertEqual(len(alarms), 0) + + marker_pairs = {'name': 'red-alert'} + alarms = list(self.conn.get_alarms(marker_pairs=marker_pairs, + sort_key='name', + sort_dir='desc')) + self.assertEqual(len(alarms), 1) + + marker_pairs = {'name': 'yellow-alert'} + alarms = list(self.conn.get_alarms(marker_pairs=marker_pairs, + sort_key='name', + sort_dir='desc')) + self.assertEqual(len(alarms), 2) + + def test_get_alarm_sort_marker(self): + self.add_some_alarms() + + marker_pairs = {'name': 'orange-alert'} + alarms = list(self.conn.get_alarms(sort_key='counter_name', + sort_dir='desc', + marker_pairs=marker_pairs)) + self.assertEqual(len(alarms), 1) + + marker_pairs = {'name': 'yellow-alert'} + alarms = list(self.conn.get_alarms(sort_key='comparison_operator', + sort_dir='desc', + marker_pairs=marker_pairs)) + self.assertEqual(len(alarms), 2) + + def test_get_alarm_paginate(self): + + self.add_some_alarms() + + marker_pairs = {'name': 'yellow-alert'} + page = list(self.conn.get_alarms(limit=4, + marker_pairs=marker_pairs, + sort_key='name', sort_dir='desc')) + self.assertEquals(['red-alert', 'orange-alert'], + [i.name for i in page]) + + marker_pairs = {'name': 'orange-alert'} + page1 = list(self.conn.get_alarms(limit=2, + sort_key='comparison_operator', + sort_dir='desc', + marker_pairs=marker_pairs)) + self.assertEquals(['red-alert'], [i.name for i in page1]) + + class EventTestBase(test_db.TestBase): """Separate test base class because we don't want to inherit all the Meter stuff. diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index f1eb5e7ed..4a2e535fa 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -49,3 +49,13 @@ class BaseTest(test_base.TestCase): self.assertEqual(times[21], (datetime.datetime(2013, 1, 2, 13, 19, 15), datetime.datetime(2013, 1, 2, 13, 20, 10))) + + def test_handle_sort_key(self): + sort_keys_alarm = base._handle_sort_key('alarm') + self.assertEquals(sort_keys_alarm, ['name', 'user_id', 'project_id']) + + sort_keys_meter = base._handle_sort_key('meter', 'foo') + self.assertEquals(sort_keys_meter, ['foo', 'user_id', 'project_id']) + + sort_keys_resource = base._handle_sort_key('resource', 'project_id') + self.assertEquals(sort_keys_resource, ['project_id', 'user_id']) diff --git a/tests/storage/test_impl_mongodb.py b/tests/storage/test_impl_mongodb.py index eb39c43e9..2d66012d7 100644 --- a/tests/storage/test_impl_mongodb.py +++ b/tests/storage/test_impl_mongodb.py @@ -37,6 +37,8 @@ from ceilometer import sample from ceilometer.storage import impl_mongodb from ceilometer.storage import models from ceilometer.tests import db as tests_db +from ceilometer.storage.base import NoResultFound +from ceilometer.storage.base import MultipleResultsFound class MongoDBEngineTestBase(base.DBTestBase): @@ -56,6 +58,44 @@ class MongoDBConnection(MongoDBEngineTestBase): conn = impl_mongodb.Connection(cfg.CONF) self.assertTrue(conn.conn) + def test_recurse_sort_keys(self): + sort_keys = ['k1', 'k2', 'k3'] + marker = {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'} + flag = '$lt' + ret = impl_mongodb.Connection._recurse_sort_keys(sort_keys=sort_keys, + marker=marker, + flag=flag) + expect = {'k3': {'$lt': 'v3'}, 'k2': {'eq': 'v2'}, 'k1': {'eq': 'v1'}} + self.assertEquals(ret, expect) + + +class MongoDBTestMarkerBase(MongoDBEngineTestBase): + #NOTE(Fengqian): All these three test case are the same for resource + #and meter collection. As to alarm, we will set up in AlarmTestPagination. + def test_get_marker(self): + marker_pairs = {'user_id': 'user-id-4'} + ret = impl_mongodb.Connection._get_marker(self.conn.db.resource, + marker_pairs) + self.assertEqual(ret['project_id'], 'project-id-4') + + def test_get_marker_None(self): + try: + marker_pairs = {'user_id': 'user-id-foo'} + ret = impl_mongodb.Connection._get_marker(self.conn.db.resource, + marker_pairs) + self.assertEqual(ret['project_id'], 'project-id-foo') + except NoResultFound: + self.assertTrue(True) + + def test_get_marker_multiple(self): + try: + marker_pairs = {'project_id': 'project-id'} + ret = impl_mongodb.Connection._get_marker(self.conn.db.resource, + marker_pairs) + self.assertEqual(ret['project_id'], 'project-id-foo') + except MultipleResultsFound: + self.assertTrue(True) + class IndexTest(MongoDBEngineTestBase): def test_meter_ttl_index_absent(self): @@ -124,10 +164,19 @@ class ResourceTest(base.ResourceTest, MongoDBEngineTestBase): pass +class ResourceTestPagination(base.ResourceTestPagination, + MongoDBEngineTestBase): + pass + + class MeterTest(base.MeterTest, MongoDBEngineTestBase): pass +class MeterTestPagination(base.MeterTestPagination, MongoDBEngineTestBase): + pass + + class RawSampleTest(base.RawSampleTest, MongoDBEngineTestBase): def test_clear_metering_data(self): # NOTE(sileht): ensure this tests is played for any version of mongo @@ -248,5 +297,35 @@ class CompatibilityTest(MongoDBEngineTestBase): self.assertEqual(len(meters), 1) +class AlarmTestPagination(base.AlarmTestPagination, MongoDBEngineTestBase): + + def test_alarm_get_marker(self): + self.add_some_alarms() + marker_pairs = {'name': 'red-alert'} + ret = impl_mongodb.Connection._get_marker(self.conn.db.alarm, + marker_pairs=marker_pairs) + self.assertEqual(ret['counter_name'], 'test.one') + + def test_alarm_get_marker_None(self): + self.add_some_alarms() + try: + marker_pairs = {'name': 'user-id-foo'} + ret = impl_mongodb.Connection._get_marker(self.conn.db.alarm, + marker_pairs) + self.assertEqual(ret['counter_name'], 'counter_name-foo') + except NoResultFound: + self.assertTrue(True) + + def test_alarm_get_marker_multiple(self): + self.add_some_alarms() + try: + marker_pairs = {'user_id': 'me'} + ret = impl_mongodb.Connection._get_marker(self.conn.db.alarm, + marker_pairs) + self.assertEqual(ret['counter_name'], 'counter-name-foo') + except MultipleResultsFound: + self.assertTrue(True) + + class CounterDataTypeTest(base.CounterDataTypeTest, MongoDBEngineTestBase): pass