Use MongoDB aggregate to get resources list

This avoids querying two different collections to get the data, doing
aggregation on one is enough.

Change-Id: Ia21693051daefd1ebd4d0c7d07b76d4712b0542c
This commit is contained in:
Julien Danjou 2013-07-02 15:18:59 +02:00
parent fee311c138
commit df126b7652
5 changed files with 85 additions and 36 deletions

View File

@ -58,6 +58,10 @@ cfg.CONF.import_opt('connection',
group='database')
class StorageBadVersion(Exception):
"""Error raised when the storage backend version is not good enough."""
def get_engine(conf):
"""Load the configured engine and return an instance."""
if conf.database_connection:

View File

@ -252,6 +252,10 @@ class Connection(base.Connection):
# requires a new storage connection.
self.conn = self.CONNECTION_POOL.connect(url)
# Require MongoDB 2.2 to use aggregate() and TTL
if self.conn.server_info()['versionArray'] < [2, 2]:
raise storage.StorageBadVersion("Need at least MongoDB 2.2")
connection_options = pymongo.uri_parser.parse_uri(url)
self.db = getattr(self.conn, connection_options['database'])
@ -428,25 +432,19 @@ class Connection(base.Connection):
return dict(criteria_equ, ** criteria_cmp)
@classmethod
def paginate_query(cls, q, db_collection, limit=None, marker=None,
sort_keys=[], sort_dir='desc'):
"""Returns a query result with sorting / pagination.
def _build_paginate_query(cls, marker, sort_keys=[], sort_dir='desc'):
"""Returns a query with sorting / pagination.
Pagination works by requiring sort_key and sort_dir.
We use the last item in previous page as the 'marker' for pagination.
So we return values that follow the passed marker in the order.
:param q: the query dict passed in.
:param db_collection: Database collection that be query.
:param limit: maximum number of items to return.
:param q: The query dict passed in.
:param marker: the last item of the previous page; we return the next
results after this item.
:param sort_keys: array of attributes by which results be sorted.
:param sort_dir: direction in which results be sorted (asc, desc).
return: The query with sorting/pagination added.
:return: sort parameters, query to use
"""
if db_collection is None:
return
all_sort = []
sort_mapping = {'desc': (pymongo.DESCENDING, '$lt'),
'asc': (pymongo.ASCENDING, '$gt')
@ -467,7 +465,33 @@ class Connection(base.Connection):
marker, _sort_flag))
metaquery = {"$or": sort_criteria_list}
q.update(metaquery)
else:
metaquery = {}
return all_sort, metaquery
@classmethod
def paginate_query(cls, q, db_collection, limit=None, marker=None,
sort_keys=[], sort_dir='desc'):
"""Returns a query result with sorting / pagination.
Pagination works by requiring sort_key and sort_dir.
We use the last item in previous page as the 'marker' for pagination.
So we return values that follow the passed marker in the order.
:param q: the query dict passed in.
:param db_collection: Database collection that be query.
:param limit: maximum number of items to return.
:param marker: the last item of the previous page; we return the next
results after this item.
:param sort_keys: array of attributes by which results be sorted.
:param sort_dir: direction in which results be sorted (asc, desc).
return: The query with sorting/pagination added.
"""
all_sort, query = cls._build_paginate_query(marker,
sort_keys,
sort_dir)
q.update(query)
#NOTE(Fengqian):MongoDB collection.find can not handle limit
#when it equals None, it will raise TypeError, so we treate
@ -524,6 +548,11 @@ class Connection(base.Connection):
:param sort_key: Attribute by which results be sorted.
:param sort_dir: Direction with which results be sorted(asc, desc).
"""
if marker_pairs is not None:
raise NotImplementedError(
"Cannot use marker pairs in resource listing")
q = {}
if user is not None:
q['user_id'] = user
@ -551,31 +580,47 @@ class Connection(base.Connection):
if ts_range:
q['timestamp'] = ts_range
marker = self._get_marker(self.db.resource, marker_pairs=marker_pairs)
sort_keys = base._handle_sort_key('resource', sort_key)
sort_instruction, query = self._build_paginate_query(None,
sort_keys,
sort_dir)
q.update(query)
# FIXME(jd): We should use self.db.meter.group() and not use the
# resource collection, but that's not supported by MIM, so it's not
# easily testable yet. Since it was bugged before anyway, it's still
# better for now.
resource_ids = self.db.meter.find(q).distinct('resource_id')
q = {'_id': {'$in': resource_ids}}
for resource in self.paginate_query(q, self.db.resource, limit=limit,
marker=marker, sort_keys=sort_keys,
sort_dir=sort_dir):
aggregate = self.db.meter.aggregate([
{"$match": q},
{"$sort": dict(sort_instruction)},
{"$group": {
"_id": "$resource_id",
"user_id": {"$first": "$user_id"},
"project_id": {"$first": "$project_id"},
"source": {"$first": "$source"},
"metadata": {"$first": "$resource_metadata"},
"meters_name": {"$push": "$counter_name"},
"meters_type": {"$push": "$counter_type"},
"meters_unit": {"$push": "$counter_unit"},
}},
])
for result in aggregate['result']:
if limit is not None:
if limit == 0:
break
limit -= 1
yield models.Resource(
resource_id=resource['_id'],
project_id=resource['project_id'],
source=resource['source'],
user_id=resource['user_id'],
metadata=resource['metadata'],
resource_id=result['_id'],
user_id=result['user_id'],
project_id=result['project_id'],
source=result['source'],
metadata=result['metadata'],
meter=[
models.ResourceMeter(
counter_name=meter['counter_name'],
counter_type=meter['counter_type'],
counter_unit=meter.get('counter_unit', ''),
counter_name=m_n,
counter_type=m_t,
counter_unit=m_u,
)
for meter in resource['meter']
for m_n, m_u, m_t in zip(result['meters_name'],
result['meters_unit'],
result['meters_type'])
],
)

View File

@ -33,7 +33,10 @@ class TestBase(test_base.TestCase):
super(TestBase, self).setUp()
cfg.CONF.set_override('connection', str(self.database_connection),
group='database')
self.conn = storage.get_connection(cfg.CONF)
try:
self.conn = storage.get_connection(cfg.CONF)
except storage.StorageBadVersion as e:
self.skipTest(str(e))
self.conn.upgrade()
def tearDown(self):

View File

@ -41,6 +41,7 @@ class TestApp(base.TestCase):
cfg.CONF.set_override("auth_version", "v2.0", group=acl.OPT_GROUP_NAME)
cfg.CONF.set_override("pipeline_cfg_file",
self.path_get("etc/ceilometer/pipeline.yaml"))
cfg.CONF.set_override('connection', "log://", group="database")
api_app = app.setup_app()
self.assertEqual(api_app.auth_protocol, 'foottp')
@ -55,6 +56,7 @@ class TestApp(base.TestCase):
f.write("auth_version = v2.0\n")
service.prepare_service(['ceilometer-api',
'--config-file=%s' % tmpfile])
cfg.CONF.set_override('connection', "log://", group="database")
api_app = app.setup_app()
self.assertEqual(api_app.auth_protocol, 'barttp')
os.unlink(tmpfile)

View File

@ -164,11 +164,6 @@ class ResourceTest(base.ResourceTest, MongoDBEngineTestBase):
pass
class ResourceTestPagination(base.ResourceTestPagination,
MongoDBEngineTestBase):
pass
class MeterTest(base.MeterTest, MongoDBEngineTestBase):
pass