Rationalize get_resources for mongodb
Fixes bug 1288372

Previously the performance of the /v2/resources API without constraints
degraded sharply as the size of the meter collection grew, regardless of
the number of known resources. We rationalize the underpinning storage
driver method for mongo so as to rebase the query on the much smaller
resource collection.

This requires a subtle change in the semantics of the first and last
sample timestamps that decorate the resource representation. These values
are now defined respectively as being not later than the oldest sample
timestamp, and not earlier than the newest sample timestamp, known for
each resource.

In order to maintain support for "tight" timestamp constraints on
resource queries, we adopt a hybrid approach. For resource queries
without timestamp constraints, we optimize with a simple query on the
resource collection, but revert to a map-reduce on the meter collection
for timestamp-constrained queries. The logic is that when one makes a
resource query constrained by timestamp, one is essentially asking a
question about samples, and hence is prepared to incur the cost of a
query on the meter collection; whereas for other constraints (user_id,
project_id, source etc.) on resource queries, one is asking a question
purely about the resources themselves.

Change-Id: Ifbf8932b4164f62bfda1cb3b8b9afe181051bc52
parent 2e150c57db
commit 7b4d4d4465
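
The change splits resource retrieval into two query shapes, as the diff
below implements. A minimal sketch of the two shapes, assuming a pymongo
handle to the driver's database (collection and field names as used in
the patch; the connection URL is illustrative):

    import datetime

    import pymongo

    client = pymongo.MongoClient('mongodb://localhost:27017')
    db = client.ceilometer

    # Unconstrained: a single indexed find() against the small resource
    # collection, independent of how many samples have accumulated.
    floating = db.resource.find({'user_id': 'user-id'})

    # Timestamp-constrained: the question is really about samples, so the
    # query (and its cost) moves to the much larger meter collection.
    constrained = db.meter.find({
        'user_id': 'user-id',
        'timestamp': {'$gte': datetime.datetime(2014, 3, 1)},
    })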
@@ -1379,10 +1379,10 @@ class Resource(_Base):
     "The ID of the user who created the resource or updated it last"

     first_sample_timestamp = datetime.datetime
-    "UTC date & time of the first sample associated with the resource"
+    "UTC date & time not later than the first sample known for this resource"

     last_sample_timestamp = datetime.datetime
-    "UTC date & time of the last sample associated with the resource"
+    "UTC date & time not earlier than the last sample known for this resource"

     metadata = {wtypes.text: wtypes.text}
     "Arbitrary metadata associated with the resource"
@@ -24,6 +24,7 @@

 import calendar
 import copy
 import datetime
 import json
 import operator
+import uuid
@@ -385,6 +386,10 @@ class Connection(pymongo_base.Connection):
         return merge;
     }""")

+    _GENESIS = datetime.datetime(year=datetime.MINYEAR, month=1, day=1)
+    _APOCALYPSE = datetime.datetime(year=datetime.MAXYEAR, month=12, day=31,
+                                    hour=23, minute=59, second=59)
+
     def __init__(self, conf):
         url = conf.database.connection

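The _GENESIS and _APOCALYPSE sentinels added above cover resource
documents that predate this patch and so carry no sample timestamps. A
small illustration of the read-side fallback (hypothetical document):

    import datetime

    _GENESIS = datetime.datetime(datetime.MINYEAR, 1, 1)
    _APOCALYPSE = datetime.datetime(datetime.MAXYEAR, 12, 31, 23, 59, 59)

    # A legacy resource document written before the new fields existed.
    legacy = {'_id': 'resource-id', 'user_id': 'user-id'}

    # The widest possible bounds keep the "not later than the oldest
    # sample" / "not earlier than the newest sample" guarantees
    # trivially true.
    first = legacy.get('first_sample_timestamp', _GENESIS)
    last = legacy.get('last_sample_timestamp', _APOCALYPSE)
    assert first <= last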
@@ -394,9 +399,9 @@ class Connection(pymongo_base.Connection):
         # requires a new storage connection.
         self.conn = self.CONNECTION_POOL.connect(url)

-        # Require MongoDB 2.2 to use TTL
-        if self.conn.server_info()['versionArray'] < [2, 2]:
-            raise storage.StorageBadVersion("Need at least MongoDB 2.2")
+        # Require MongoDB 2.4 to use $setOnInsert
+        if self.conn.server_info()['versionArray'] < [2, 4]:
+            raise storage.StorageBadVersion("Need at least MongoDB 2.4")

         connection_options = pymongo.uri_parser.parse_uri(url)
         self.db = getattr(self.conn, connection_options['database'])
@@ -429,6 +434,10 @@ class Connection(pymongo_base.Connection):
             ('timestamp', pymongo.ASCENDING),
             ('source', pymongo.ASCENDING),
         ], name='meter_idx')
+        self.db.resource.ensure_index([('last_sample_timestamp',
+                                        pymongo.DESCENDING)],
+                                      name='last_sample_timestamp_idx',
+                                      sparse=True)
         self.db.meter.ensure_index([('timestamp', pymongo.DESCENDING)],
                                    name='timestamp_idx')

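The new index is sparse because those same legacy documents lack
last_sample_timestamp entirely; sparse=True keeps them out of the index
rather than indexing a null key. A sketch of the sort it serves (same
hypothetical db handle as above):

    # Resources ordered by recency of their newest sample, matching the
    # timestamp sort key used by _get_floating_resources below.
    cursor = db.resource.find().sort('last_sample_timestamp',
                                     pymongo.DESCENDING)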
@@ -483,14 +492,20 @@ class Connection(pymongo_base.Connection):
             upsert=True,
         )

-        # Record the updated resource metadata
-        self.db.resource.update(
+        # Record the updated resource metadata - we use $setOnInsert to
+        # unconditionally insert sample timestamps and resource metadata
+        # (in the update case, this must be conditional on the sample not
+        # being out-of-order)
+        resource = self.db.resource.find_and_modify(
             {'_id': data['resource_id']},
             {'$set': {'project_id': data['project_id'],
                       'user_id': data['user_id'],
-                      'metadata': data['resource_metadata'],
                       'source': data['source'],
                       },
+             '$setOnInsert': {'metadata': data['resource_metadata'],
+                              'first_sample_timestamp': data['timestamp'],
+                              'last_sample_timestamp': data['timestamp'],
+                              },
              '$addToSet': {'meter': {'counter_name': data['counter_name'],
                                      'counter_type': data['counter_type'],
                                      'counter_unit': data['counter_unit'],
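A compressed sketch of the upsert semantics in play here; $setOnInsert
requires MongoDB 2.4, hence the version-check change above (values are
illustrative):

    import datetime

    sample_ts = datetime.datetime(2014, 3, 5, 12, 0)

    # On insert, both $set and $setOnInsert apply, seeding the timestamps.
    # On update, $setOnInsert is ignored, so an existing document's
    # recorded timestamps survive untouched; new=True returns the
    # post-update document so the caller can inspect them.
    resource = db.resource.find_and_modify(
        {'_id': 'resource-id'},
        {'$set': {'user_id': 'user-id'},
         '$setOnInsert': {'first_sample_timestamp': sample_ts,
                          'last_sample_timestamp': sample_ts}},
        upsert=True,
        new=True,
    )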
@@ -498,8 +513,33 @@ class Connection(pymongo_base.Connection):
                            },
              },
             upsert=True,
+            new=True,
         )

+        # only update last sample timestamp if actually later (the usual
+        # in-order case)
+        last_sample_timestamp = resource.get('last_sample_timestamp')
+        if (last_sample_timestamp is None or
+                last_sample_timestamp <= data['timestamp']):
+            self.db.resource.update(
+                {'_id': data['resource_id']},
+                {'$set': {'metadata': data['resource_metadata'],
+                          'last_sample_timestamp': data['timestamp']}}
+            )
+
+        # only update first sample timestamp if actually earlier (the unusual
+        # out-of-order case)
+        # NOTE: a null first sample timestamp is not updated as this indicates
+        # a pre-existing resource document dating from before we started
+        # recording these timestamps in the resource collection
+        first_sample_timestamp = resource.get('first_sample_timestamp')
+        if (first_sample_timestamp is not None and
+                first_sample_timestamp > data['timestamp']):
+            self.db.resource.update(
+                {'_id': data['resource_id']},
+                {'$set': {'first_sample_timestamp': data['timestamp']}}
+            )
+
         # Record the raw data for the meter. Use a copy so we do not
         # modify a data structure owned by our caller (the driver adds
         # a new key '_id').
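A plain-Python restatement of the two guards above, with hypothetical
values showing a sample that falls inside the recorded bounds and so
moves neither of them:

    import datetime

    resource = {'first_sample_timestamp': datetime.datetime(2014, 3, 1),
                'last_sample_timestamp': datetime.datetime(2014, 3, 7)}
    sample_ts = datetime.datetime(2014, 3, 4)  # late-arriving sample

    # Usual in-order case: the newest-sample bound only moves forward
    # (and metadata is refreshed along with it).
    if (resource.get('last_sample_timestamp') is None or
            resource['last_sample_timestamp'] <= sample_ts):
        resource['last_sample_timestamp'] = sample_ts  # not taken here

    # Unusual out-of-order case: the oldest-sample bound only moves
    # backward; a missing value (legacy document) is deliberately left
    # unset.
    if (resource.get('first_sample_timestamp') is not None and
            resource['first_sample_timestamp'] > sample_ts):
        resource['first_sample_timestamp'] = sample_ts  # not taken here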
@@ -651,6 +691,100 @@ class Connection(pymongo_base.Connection):
             limit = 0
         return db_collection.find(q, limit=limit, sort=all_sort)

+    def _get_time_constrained_resources(self, query,
+                                        start_timestamp, start_timestamp_op,
+                                        end_timestamp, end_timestamp_op,
+                                        metaquery, resource):
+        """Return an iterable of models.Resource instances constrained
+           by sample timestamp.
+
+        :param query: project/user/source query
+        :param start_timestamp: modified timestamp start range.
+        :param start_timestamp_op: start time operator, like gt, ge.
+        :param end_timestamp: modified timestamp end range.
+        :param end_timestamp_op: end time operator, like lt, le.
+        :param metaquery: dict with metadata to match on.
+        :param resource: resource filter.
+        """
+        if resource is not None:
+            query['resource_id'] = resource
+
+        # Add resource_ prefix so it matches the field in the db
+        query.update(dict(('resource_' + k, v)
+                          for (k, v) in metaquery.iteritems()))
+
+        # FIXME(dhellmann): This may not perform very well,
+        # but doing any better will require changing the database
+        # schema and that will need more thought than I have time
+        # to put into it today.
+        # Look for resources matching the above criteria and with
+        # samples in the time range we care about, then change the
+        # resource query to return just those resources by id.
+        ts_range = pymongo_base.make_timestamp_range(start_timestamp,
+                                                     end_timestamp,
+                                                     start_timestamp_op,
+                                                     end_timestamp_op)
+        if ts_range:
+            query['timestamp'] = ts_range
+
+        sort_keys = base._handle_sort_key('resource')
+        sort_instructions = self._build_sort_instructions(sort_keys)[0]
+
+        # use a unique collection name for the results collection,
+        # as result post-sorting (as opposed to reduce pre-sorting)
+        # is not possible on an inline M-R
+        out = 'resource_list_%s' % uuid.uuid4()
+        self.db.meter.map_reduce(self.MAP_RESOURCES,
+                                 self.REDUCE_RESOURCES,
+                                 out=out,
+                                 sort={'resource_id': 1},
+                                 query=query)
+
+        try:
+            for r in self.db[out].find(sort=sort_instructions):
+                resource = r['value']
+                yield models.Resource(
+                    resource_id=r['_id'],
+                    user_id=resource['user_id'],
+                    project_id=resource['project_id'],
+                    first_sample_timestamp=resource['first_timestamp'],
+                    last_sample_timestamp=resource['last_timestamp'],
+                    source=resource['source'],
+                    metadata=resource['metadata'])
+        finally:
+            self.db[out].drop()
+
+    def _get_floating_resources(self, query, metaquery, resource):
+        """Return an iterable of models.Resource instances unconstrained
+           by timestamp.
+
+        :param query: project/user/source query
+        :param metaquery: dict with metadata to match on.
+        :param resource: resource filter.
+        """
+        if resource is not None:
+            query['_id'] = resource
+
+        query.update(dict((k, v)
+                          for (k, v) in metaquery.iteritems()))
+
+        keys = base._handle_sort_key('resource')
+        sort_keys = ['last_sample_timestamp' if i == 'timestamp' else i
+                     for i in keys]
+        sort_instructions = self._build_sort_instructions(sort_keys)[0]
+
+        for r in self.db.resource.find(query, sort=sort_instructions):
+            yield models.Resource(
+                resource_id=r['_id'],
+                user_id=r['user_id'],
+                project_id=r['project_id'],
+                first_sample_timestamp=r.get('first_sample_timestamp',
+                                             self._GENESIS),
+                last_sample_timestamp=r.get('last_sample_timestamp',
+                                            self._APOCALYPSE),
+                source=r['source'],
+                metadata=r['metadata'])
+
     def get_resources(self, user=None, project=None, source=None,
                       start_timestamp=None, start_timestamp_op=None,
                       end_timestamp=None, end_timestamp_op=None,
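The map-reduce path leans on MAP_RESOURCES and REDUCE_RESOURCES, which
this diff does not touch. Purely to illustrate the expected shape (not
the driver's actual definitions): a map emitting one record per sample
keyed by resource_id, and a reduce folding records down to first/last
timestamps, yielding the {_id, value} documents consumed by the loop
above.

    import bson.code

    # Illustrative sketch only.
    MAP_RESOURCES = bson.code.Code("""
        function () {
            emit(this.resource_id,
                 {user_id: this.user_id,
                  project_id: this.project_id,
                  source: this.source,
                  first_timestamp: this.timestamp,
                  last_timestamp: this.timestamp,
                  metadata: this.resource_metadata});
        }""")

    REDUCE_RESOURCES = bson.code.Code("""
        function (key, values) {
            var merged = values[0];
            values.forEach(function (value) {
                if (value.first_timestamp < merged.first_timestamp) {
                    merged.first_timestamp = value.first_timestamp;
                }
                if (value.last_timestamp > merged.last_timestamp) {
                    merged.last_timestamp = value.last_timestamp;
                    merged.user_id = value.user_id;
                    merged.project_id = value.project_id;
                    merged.source = value.source;
                    merged.metadata = value.metadata;
                }
            });
            return merged;
        }""")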
@@ -671,60 +805,23 @@ class Connection(pymongo_base.Connection):
         if pagination:
             raise NotImplementedError(_('Pagination not implemented'))

-        q = {}
+        query = {}
         if user is not None:
-            q['user_id'] = user
+            query['user_id'] = user
         if project is not None:
-            q['project_id'] = project
+            query['project_id'] = project
         if source is not None:
-            q['source'] = source
-        if resource is not None:
-            q['resource_id'] = resource
-        # Add resource_ prefix so it matches the field in the db
-        q.update(dict(('resource_' + k, v)
-                      for (k, v) in metaquery.iteritems()))
+            query['source'] = source

-        # FIXME(dhellmann): This may not perform very well,
-        # but doing any better will require changing the database
-        # schema and that will need more thought than I have time
-        # to put into it today.
         if start_timestamp or end_timestamp:
-            # Look for resources matching the above criteria and with
-            # samples in the time range we care about, then change the
-            # resource query to return just those resources by id.
-            ts_range = pymongo_base.make_timestamp_range(start_timestamp,
-                                                         end_timestamp,
-                                                         start_timestamp_op,
-                                                         end_timestamp_op)
-            if ts_range:
-                q['timestamp'] = ts_range
-
-        sort_keys = base._handle_sort_key('resource')
-        sort_instructions = self._build_sort_instructions(sort_keys)[0]
-
-        # use a unique collection name for the results collection,
-        # as result post-sorting (as opposed to reduce pre-sorting)
-        # is not possible on an inline M-R
-        out = 'resource_list_%s' % uuid.uuid4()
-        self.db.meter.map_reduce(self.MAP_RESOURCES,
-                                 self.REDUCE_RESOURCES,
-                                 out=out,
-                                 sort={'resource_id': 1},
-                                 query=q)
-
-        try:
-            for r in self.db[out].find(sort=sort_instructions):
-                resource = r['value']
-                yield models.Resource(
-                    resource_id=r['_id'],
-                    user_id=resource['user_id'],
-                    project_id=resource['project_id'],
-                    first_sample_timestamp=resource['first_timestamp'],
-                    last_sample_timestamp=resource['last_timestamp'],
-                    source=resource['source'],
-                    metadata=resource['metadata'])
-        finally:
-            self.db[out].drop()
+            return self._get_time_constrained_resources(query,
+                                                        start_timestamp,
+                                                        start_timestamp_op,
+                                                        end_timestamp,
+                                                        end_timestamp_op,
+                                                        metaquery, resource)
+        else:
+            return self._get_floating_resources(query, metaquery, resource)

     def _aggregate_param(self, fragment_key, aggregate):
         fragment_map = self.STANDARD_AGGREGATES[fragment_key]
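How the two paths are exercised through the storage API, assuming conn
is an instance of this Connection class:

    import datetime

    # No timestamp bounds: served by _get_floating_resources via a plain
    # find() on the resource collection.
    resources = list(conn.get_resources(user='user-id'))

    # Timestamp bounds: served by _get_time_constrained_resources via a
    # map-reduce over the meter collection.
    recent = list(conn.get_resources(
        user='user-id',
        start_timestamp=datetime.datetime(2014, 3, 1),
        start_timestamp_op='ge'))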
@@ -262,9 +262,21 @@ class ResourceTest(DBTestBase,

     def test_get_resources_by_user(self):
         resources = list(self.conn.get_resources(user='user-id'))
-        self.assertEqual(len(resources), 2)
+        self.assertTrue(len(resources) == 2 or len(resources) == 1)
         ids = set(r.resource_id for r in resources)
-        self.assertEqual(ids, set(['resource-id', 'resource-id-alternate']))
+        # tolerate storage driver only reporting latest owner of resource
+        resources_ever_owned_by = set(['resource-id',
+                                       'resource-id-alternate'])
+        resources_now_owned_by = set(['resource-id'])
+        self.assertTrue(ids == resources_ever_owned_by or
+                        ids == resources_now_owned_by,
+                        'unexpected resources: %s' % ids)
+
+    def test_get_resources_by_alternate_user(self):
+        resources = list(self.conn.get_resources(user='user-id-alternate'))
+        self.assertEqual(1, len(resources))
+        # only a single resource owned by this user ever
+        self.assertEqual('resource-id-alternate', resources[0].resource_id)

     def test_get_resources_by_project(self):
         resources = list(self.conn.get_resources(project='project-id'))