Merge "Add pagination support for MongoDB"

This commit is contained in:
Jenkins 2013-08-07 09:57:32 +00:00 committed by Gerrit Code Review
commit 61e8a89597
5 changed files with 408 additions and 11 deletions

View File

@ -45,6 +45,40 @@ def iter_period(start, end, period):
period_start = next_start period_start = next_start
def _handle_sort_key(model_name, sort_key=None):
"""Generate sort keys according to the passed in sort key from user.
:param model_name: Database model name be query.(alarm, meter, etc.)
:param sort_key: sort key passed from user.
return: sort keys list
"""
sort_keys_extra = {'alarm': ['name', 'user_id', 'project_id'],
'meter': ['user_id', 'project_id'],
'resource': ['user_id', 'project_id'],
}
sort_keys = sort_keys_extra[model_name]
if not sort_key:
return sort_keys
# NOTE(Fengqian): We need to put the sort key from user
#in the first place of sort keys list.
try:
sort_keys.remove(sort_key)
except ValueError:
pass
finally:
sort_keys.insert(0, sort_key)
return sort_keys
class MultipleResultsFound(Exception):
pass
class NoResultFound(Exception):
pass
class StorageEngine(object): class StorageEngine(object):
"""Base class for storage engines.""" """Base class for storage engines."""

View File

@ -395,6 +395,87 @@ class Connection(base.Connection):
self.db.project.remove({'_id': {'$nin': results['projects']}}) self.db.project.remove({'_id': {'$nin': results['projects']}})
self.db.resource.remove({'_id': {'$nin': results['resources']}}) self.db.resource.remove({'_id': {'$nin': results['resources']}})
@staticmethod
def _get_marker(db_collection, marker_pairs):
"""Return the mark document according to the attribute-value pairs.
:param db_collection: Database collection that be query.
:param maker_pairs: Attribute-value pairs filter.
"""
if db_collection is None:
return
if not marker_pairs:
return
ret = db_collection.find(marker_pairs, limit=2)
if ret.count() == 0:
raise base.NoResultFound
elif ret.count() > 1:
raise base.MultipleResultsFound
else:
_ret = ret.__getitem__(0)
return _ret
@classmethod
def _recurse_sort_keys(cls, sort_keys, marker, flag):
_first = sort_keys[0]
value = marker[_first]
if len(sort_keys) == 1:
return {_first: {flag: value}}
else:
criteria_equ = {_first: {'eq': value}}
criteria_cmp = cls._recurse_sort_keys(sort_keys[1:], marker, flag)
return dict(criteria_equ, ** criteria_cmp)
@classmethod
def paginate_query(cls, q, db_collection, limit=None, marker=None,
sort_keys=[], sort_dir='desc'):
"""Returns a query result with sorting / pagination.
Pagination works by requiring sort_key and sort_dir.
We use the last item in previous page as the 'marker' for pagination.
So we return values that follow the passed marker in the order.
:param q: the query dict passed in.
:param db_collection: Database collection that be query.
:param limit: maximum number of items to return.
:param marker: the last item of the previous page; we return the next
results after this item.
:param sort_keys: array of attributes by which results be sorted.
:param sort_dir: direction in which results be sorted (asc, desc).
return: The query with sorting/pagination added.
"""
if db_collection is None:
return
all_sort = []
sort_mapping = {'desc': (pymongo.DESCENDING, '$lt'),
'asc': (pymongo.ASCENDING, '$gt')
}
_sort_dir, _sort_flag = sort_mapping.get(sort_dir,
sort_mapping['desc'])
for _sort_key in sort_keys:
_all_sort = (_sort_key, _sort_dir)
all_sort.append(_all_sort)
if marker is not None:
sort_criteria_list = []
for i in range(0, len(sort_keys)):
sort_criteria_list.append(cls._recurse_sort_keys(
sort_keys[:(len(sort_keys) - i)],
marker, _sort_flag))
metaquery = {"$or": sort_criteria_list}
q.update(metaquery)
#NOTE(Fengqian):MongoDB collection.find can not handle limit
#when it equals None, it will raise TypeError, so we treate
#None as 0 for the value of limit.
if limit is None:
limit = 0
return db_collection.find(q, limit=limit, sort=all_sort)
def get_users(self, source=None): def get_users(self, source=None):
"""Return an iterable of user id strings. """Return an iterable of user id strings.
@ -424,7 +505,8 @@ class Connection(base.Connection):
def get_resources(self, user=None, project=None, source=None, def get_resources(self, user=None, project=None, source=None,
start_timestamp=None, start_timestamp_op=None, start_timestamp=None, start_timestamp_op=None,
end_timestamp=None, end_timestamp_op=None, end_timestamp=None, end_timestamp_op=None,
metaquery={}, resource=None): metaquery={}, resource=None, limit=None,
marker_pairs=None, sort_key=None, sort_dir=None):
"""Return an iterable of models.Resource instances """Return an iterable of models.Resource instances
:param user: Optional ID for user that owns the resource. :param user: Optional ID for user that owns the resource.
@ -436,6 +518,11 @@ class Connection(base.Connection):
:param end_timestamp_op: Optional end time operator, like lt, le. :param end_timestamp_op: Optional end time operator, like lt, le.
:param metaquery: Optional dict with metadata to match on. :param metaquery: Optional dict with metadata to match on.
:param resource: Optional resource filter. :param resource: Optional resource filter.
:param limit: Number of documents should be returned.
:param marker_pairs: Attribute-value pairs to identify the last item of
the previous page.
:param sort_key: Attribute by which results be sorted.
:param sort_dir: Direction with which results be sorted(asc, desc).
""" """
q = {} q = {}
if user is not None: if user is not None:
@ -464,13 +551,18 @@ class Connection(base.Connection):
if ts_range: if ts_range:
q['timestamp'] = ts_range q['timestamp'] = ts_range
marker = self._get_marker(self.db.resource, marker_pairs=marker_pairs)
sort_keys = base._handle_sort_key('resource', sort_key)
# FIXME(jd): We should use self.db.meter.group() and not use the # FIXME(jd): We should use self.db.meter.group() and not use the
# resource collection, but that's not supported by MIM, so it's not # resource collection, but that's not supported by MIM, so it's not
# easily testable yet. Since it was bugged before anyway, it's still # easily testable yet. Since it was bugged before anyway, it's still
# better for now. # better for now.
resource_ids = self.db.meter.find(q).distinct('resource_id') resource_ids = self.db.meter.find(q).distinct('resource_id')
q = {'_id': {'$in': resource_ids}} q = {'_id': {'$in': resource_ids}}
for resource in self.db.resource.find(q): for resource in self.paginate_query(q, self.db.resource, limit=limit,
marker=marker, sort_keys=sort_keys,
sort_dir=sort_dir):
yield models.Resource( yield models.Resource(
resource_id=resource['_id'], resource_id=resource['_id'],
project_id=resource['project_id'], project_id=resource['project_id'],
@ -488,7 +580,8 @@ class Connection(base.Connection):
) )
def get_meters(self, user=None, project=None, resource=None, source=None, def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}): metaquery={}, limit=None, marker_pairs=None, sort_key=None,
sort_dir=None):
"""Return an iterable of models.Meter instances """Return an iterable of models.Meter instances
:param user: Optional ID for user that owns the resource. :param user: Optional ID for user that owns the resource.
@ -496,6 +589,11 @@ class Connection(base.Connection):
:param resource: Optional resource filter. :param resource: Optional resource filter.
:param source: Optional source filter. :param source: Optional source filter.
:param metaquery: Optional dict with metadata to match on. :param metaquery: Optional dict with metadata to match on.
:param limit: Number of documents should be returned.
:param marker_pairs: Attribute-value pairs to identify the last item of
the previous page.
:param sort_key: Attribute by which results be sorted.
:param sort_dir: Direction with which results be sorted(asc, desc).
""" """
q = {} q = {}
if user is not None: if user is not None:
@ -508,7 +606,12 @@ class Connection(base.Connection):
q['source'] = source q['source'] = source
q.update(metaquery) q.update(metaquery)
for r in self.db.resource.find(q): marker = self._get_marker(self.db.resource, marker_pairs=marker_pairs)
sort_keys = base._handle_sort_key('meter', sort_key)
for r in self.paginate_query(q, self.db.resource, limit=limit,
marker=marker,
sort_keys=sort_keys, sort_dir=sort_dir):
for r_meter in r['meter']: for r_meter in r['meter']:
yield models.Meter( yield models.Meter(
name=r_meter['counter_name'], name=r_meter['counter_name'],
@ -601,8 +704,19 @@ class Connection(base.Connection):
return matching_metadata return matching_metadata
def get_alarms(self, name=None, user=None, def get_alarms(self, name=None, user=None,
project=None, enabled=True, alarm_id=None): project=None, enabled=True, alarm_id=None, limit=None,
marker_pairs=None, sort_key=None, sort_dir=None):
"""Yields a lists of alarms that match filters """Yields a lists of alarms that match filters
:param name: The Alarm name.
:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
:param enabled: Optional boolean to list disable alarm.
:param alarm_id: Optional alarm_id to return one alarm.
:param limit: Number of rows should be returned.
:param marker_pairs: Attribute-value pairs to identify the last item of
the previous page.
:param sort_key: Attribute by which results be sorted.
:param sort_dir: Direction with which results be sorted(asc, desc).
""" """
q = {} q = {}
if user is not None: if user is not None:
@ -616,7 +730,12 @@ class Connection(base.Connection):
if alarm_id is not None: if alarm_id is not None:
q['alarm_id'] = alarm_id q['alarm_id'] = alarm_id
for alarm in self.db.alarm.find(q): marker = self._get_marker(self.db.alarm, marker_pairs=marker_pairs)
sort_keys = base._handle_sort_key('alarm', sort_key)
for alarm in self.paginate_query(q, self.db.alarm, limit=limit,
marker=marker, sort_keys=sort_keys,
sort_dir=sort_dir):
a = {} a = {}
a.update(alarm) a.update(alarm)
del a['_id'] del a['_id']

View File

@ -312,6 +312,49 @@ class ResourceTest(DBTestBase):
self.assertEqual(len(resources), 9) self.assertEqual(len(resources), 9)
class ResourceTestPagination(DBTestBase):
def test_get_resource_all_limit(self):
results = list(self.conn.get_resources(limit=8))
self.assertEqual(len(results), 8)
results = list(self.conn.get_resources(limit=5))
self.assertEqual(len(results), 5)
def test_get_resources_all_marker(self):
marker_pairs = {'user_id': 'user-id-4',
'project_id': 'project-id-4'}
results = list(self.conn.get_resources(marker_pairs=marker_pairs,
sort_key='user_id',
sort_dir='asc'))
self.assertEqual(len(results), 5)
def test_get_resources_paginate(self):
marker_pairs = {'user_id': 'user-id-4'}
results = self.conn.get_meters(limit=3, marker_pairs=marker_pairs,
sort_key='user_id',
sort_dir='asc')
self.assertEquals(['user-id-5', 'user-id-6', 'user-id-7'],
[i.user_id for i in results])
marker_pairs = {'user_id': 'user-id-4'}
results = list(self.conn.get_resources(limit=2,
marker_pairs=marker_pairs,
sort_key='user_id',
sort_dir='desc'))
self.assertEquals(['user-id-3', 'user-id-2'],
[i.user_id for i in results])
marker_pairs = {'project_id': 'project-id-5'}
results = list(self.conn.get_resources(limit=3,
marker_pairs=marker_pairs,
sort_key='user_id',
sort_dir='asc'))
self.assertEquals(['resource-id-6', 'resource-id-7',
'resource-id-8'],
[i.resource_id for i in results])
class MeterTest(DBTestBase): class MeterTest(DBTestBase):
def test_get_meters(self): def test_get_meters(self):
@ -345,6 +388,50 @@ class MeterTest(DBTestBase):
self.assertEqual(len(results), 9) self.assertEqual(len(results), 9)
class MeterTestPagination(DBTestBase):
def tet_get_meters_all_limit(self):
results = list(self.conn.get_meters(limit=8))
self.assertEqual(len(results), 8)
results = list(self.conn.get_meters(limit=5))
self.assertEqual(len(results), 5)
def test_get_meters_all_marker(self):
marker_pairs = {'user_id': 'user-id-alternate'}
results = list(self.conn.get_meters(marker_pairs=marker_pairs,
sort_key='user_id',
sort_dir='desc'))
self.assertEqual(len(results), 8)
def test_get_meters_paginate(self):
marker_pairs = {'user_id': 'user-id-alternate'}
results = self.conn.get_meters(limit=3, marker_pairs=marker_pairs,
sort_key='user_id', sort_dir='desc')
self.assertEquals(['user-id-8', 'user-id-7', 'user-id-6'],
[i.user_id for i in results])
marker_pairs = {'user_id': 'user-id-4'}
results = self.conn.get_meters(limit=3, marker_pairs=marker_pairs,
sort_key='user_id',
sort_dir='asc')
self.assertEquals(['user-id-5', 'user-id-6', 'user-id-7'],
[i.user_id for i in results])
marker_pairs = {'user_id': 'user-id-4'}
results = list(self.conn.get_resources(limit=2,
marker_pairs=marker_pairs,
sort_key='user_id',
sort_dir='desc'))
self.assertEquals(['user-id-3', 'user-id-2'],
[i.user_id for i in results])
marker_pairs = {'user_id': 'user-id'}
results = self.conn.get_meters(limit=3, marker_pairs=marker_pairs,
sort_key='user_id', sort_dir='desc')
self.assertEquals([], [i.user_id for i in results])
class RawSampleTest(DBTestBase): class RawSampleTest(DBTestBase):
def test_get_samples_limit_zero(self): def test_get_samples_limit_zero(self):
@ -830,11 +917,7 @@ class CounterDataTypeTest(DBTestBase):
self.assertEqual(results[0].counter_volume, 1938495037.53697) self.assertEqual(results[0].counter_volume, 1938495037.53697)
class AlarmTest(DBTestBase): class AlarmTestBase(DBTestBase):
def test_empty(self):
alarms = list(self.conn.get_alarms())
self.assertEquals([], alarms)
def add_some_alarms(self): def add_some_alarms(self):
alarms = [models.Alarm('red-alert', alarms = [models.Alarm('red-alert',
@ -860,6 +943,13 @@ class AlarmTest(DBTestBase):
for a in alarms: for a in alarms:
self.conn.update_alarm(a) self.conn.update_alarm(a)
class AlarmTest(AlarmTestBase):
def test_empty(self):
alarms = list(self.conn.get_alarms())
self.assertEquals([], alarms)
def test_add(self): def test_add(self):
self.add_some_alarms() self.add_some_alarms()
alarms = list(self.conn.get_alarms()) alarms = list(self.conn.get_alarms())
@ -918,6 +1008,71 @@ class AlarmTest(DBTestBase):
self.assertNotEquals(victim.name, s.name) self.assertNotEquals(victim.name, s.name)
class AlarmTestPagination(AlarmTestBase):
def test_get_alarm_all_limit(self):
self.add_some_alarms()
alarms = list(self.conn.get_alarms(limit=2))
self.assertEqual(len(alarms), 2)
alarms = list(self.conn.get_alarms(limit=1))
self.assertEqual(len(alarms), 1)
def test_get_alarm_all_marker(self):
self.add_some_alarms()
marker_pairs = {'name': 'orange-alert'}
alarms = list(self.conn.get_alarms(marker_pairs=marker_pairs,
sort_key='name',
sort_dir='desc'))
self.assertEqual(len(alarms), 0)
marker_pairs = {'name': 'red-alert'}
alarms = list(self.conn.get_alarms(marker_pairs=marker_pairs,
sort_key='name',
sort_dir='desc'))
self.assertEqual(len(alarms), 1)
marker_pairs = {'name': 'yellow-alert'}
alarms = list(self.conn.get_alarms(marker_pairs=marker_pairs,
sort_key='name',
sort_dir='desc'))
self.assertEqual(len(alarms), 2)
def test_get_alarm_sort_marker(self):
self.add_some_alarms()
marker_pairs = {'name': 'orange-alert'}
alarms = list(self.conn.get_alarms(sort_key='counter_name',
sort_dir='desc',
marker_pairs=marker_pairs))
self.assertEqual(len(alarms), 1)
marker_pairs = {'name': 'yellow-alert'}
alarms = list(self.conn.get_alarms(sort_key='comparison_operator',
sort_dir='desc',
marker_pairs=marker_pairs))
self.assertEqual(len(alarms), 2)
def test_get_alarm_paginate(self):
self.add_some_alarms()
marker_pairs = {'name': 'yellow-alert'}
page = list(self.conn.get_alarms(limit=4,
marker_pairs=marker_pairs,
sort_key='name', sort_dir='desc'))
self.assertEquals(['red-alert', 'orange-alert'],
[i.name for i in page])
marker_pairs = {'name': 'orange-alert'}
page1 = list(self.conn.get_alarms(limit=2,
sort_key='comparison_operator',
sort_dir='desc',
marker_pairs=marker_pairs))
self.assertEquals(['red-alert'], [i.name for i in page1])
class EventTestBase(test_db.TestBase): class EventTestBase(test_db.TestBase):
"""Separate test base class because we don't want to """Separate test base class because we don't want to
inherit all the Meter stuff. inherit all the Meter stuff.

View File

@ -49,3 +49,13 @@ class BaseTest(test_base.TestCase):
self.assertEqual(times[21], self.assertEqual(times[21],
(datetime.datetime(2013, 1, 2, 13, 19, 15), (datetime.datetime(2013, 1, 2, 13, 19, 15),
datetime.datetime(2013, 1, 2, 13, 20, 10))) datetime.datetime(2013, 1, 2, 13, 20, 10)))
def test_handle_sort_key(self):
sort_keys_alarm = base._handle_sort_key('alarm')
self.assertEquals(sort_keys_alarm, ['name', 'user_id', 'project_id'])
sort_keys_meter = base._handle_sort_key('meter', 'foo')
self.assertEquals(sort_keys_meter, ['foo', 'user_id', 'project_id'])
sort_keys_resource = base._handle_sort_key('resource', 'project_id')
self.assertEquals(sort_keys_resource, ['project_id', 'user_id'])

View File

@ -37,6 +37,8 @@ from ceilometer import sample
from ceilometer.storage import impl_mongodb from ceilometer.storage import impl_mongodb
from ceilometer.storage import models from ceilometer.storage import models
from ceilometer.tests import db as tests_db from ceilometer.tests import db as tests_db
from ceilometer.storage.base import NoResultFound
from ceilometer.storage.base import MultipleResultsFound
class MongoDBEngineTestBase(base.DBTestBase): class MongoDBEngineTestBase(base.DBTestBase):
@ -56,6 +58,44 @@ class MongoDBConnection(MongoDBEngineTestBase):
conn = impl_mongodb.Connection(cfg.CONF) conn = impl_mongodb.Connection(cfg.CONF)
self.assertTrue(conn.conn) self.assertTrue(conn.conn)
def test_recurse_sort_keys(self):
sort_keys = ['k1', 'k2', 'k3']
marker = {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'}
flag = '$lt'
ret = impl_mongodb.Connection._recurse_sort_keys(sort_keys=sort_keys,
marker=marker,
flag=flag)
expect = {'k3': {'$lt': 'v3'}, 'k2': {'eq': 'v2'}, 'k1': {'eq': 'v1'}}
self.assertEquals(ret, expect)
class MongoDBTestMarkerBase(MongoDBEngineTestBase):
#NOTE(Fengqian): All these three test case are the same for resource
#and meter collection. As to alarm, we will set up in AlarmTestPagination.
def test_get_marker(self):
marker_pairs = {'user_id': 'user-id-4'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.resource,
marker_pairs)
self.assertEqual(ret['project_id'], 'project-id-4')
def test_get_marker_None(self):
try:
marker_pairs = {'user_id': 'user-id-foo'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.resource,
marker_pairs)
self.assertEqual(ret['project_id'], 'project-id-foo')
except NoResultFound:
self.assertTrue(True)
def test_get_marker_multiple(self):
try:
marker_pairs = {'project_id': 'project-id'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.resource,
marker_pairs)
self.assertEqual(ret['project_id'], 'project-id-foo')
except MultipleResultsFound:
self.assertTrue(True)
class IndexTest(MongoDBEngineTestBase): class IndexTest(MongoDBEngineTestBase):
def test_meter_ttl_index_absent(self): def test_meter_ttl_index_absent(self):
@ -124,10 +164,19 @@ class ResourceTest(base.ResourceTest, MongoDBEngineTestBase):
pass pass
class ResourceTestPagination(base.ResourceTestPagination,
MongoDBEngineTestBase):
pass
class MeterTest(base.MeterTest, MongoDBEngineTestBase): class MeterTest(base.MeterTest, MongoDBEngineTestBase):
pass pass
class MeterTestPagination(base.MeterTestPagination, MongoDBEngineTestBase):
pass
class RawSampleTest(base.RawSampleTest, MongoDBEngineTestBase): class RawSampleTest(base.RawSampleTest, MongoDBEngineTestBase):
def test_clear_metering_data(self): def test_clear_metering_data(self):
# NOTE(sileht): ensure this tests is played for any version of mongo # NOTE(sileht): ensure this tests is played for any version of mongo
@ -248,5 +297,35 @@ class CompatibilityTest(MongoDBEngineTestBase):
self.assertEqual(len(meters), 1) self.assertEqual(len(meters), 1)
class AlarmTestPagination(base.AlarmTestPagination, MongoDBEngineTestBase):
def test_alarm_get_marker(self):
self.add_some_alarms()
marker_pairs = {'name': 'red-alert'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.alarm,
marker_pairs=marker_pairs)
self.assertEqual(ret['counter_name'], 'test.one')
def test_alarm_get_marker_None(self):
self.add_some_alarms()
try:
marker_pairs = {'name': 'user-id-foo'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.alarm,
marker_pairs)
self.assertEqual(ret['counter_name'], 'counter_name-foo')
except NoResultFound:
self.assertTrue(True)
def test_alarm_get_marker_multiple(self):
self.add_some_alarms()
try:
marker_pairs = {'user_id': 'me'}
ret = impl_mongodb.Connection._get_marker(self.conn.db.alarm,
marker_pairs)
self.assertEqual(ret['counter_name'], 'counter-name-foo')
except MultipleResultsFound:
self.assertTrue(True)
class CounterDataTypeTest(base.CounterDataTypeTest, MongoDBEngineTestBase): class CounterDataTypeTest(base.CounterDataTypeTest, MongoDBEngineTestBase):
pass pass