Merge "Rationalize get_resources for mongodb"
This commit is contained in:
commit
2e7da5cce3
@ -1379,10 +1379,10 @@ class Resource(_Base):
|
||||
"The ID of the user who created the resource or updated it last"
|
||||
|
||||
first_sample_timestamp = datetime.datetime
|
||||
"UTC date & time of the first sample associated with the resource"
|
||||
"UTC date & time not later than the first sample known for this resource"
|
||||
|
||||
last_sample_timestamp = datetime.datetime
|
||||
"UTC date & time of the last sample associated with the resource"
|
||||
"UTC date & time not earlier than the last sample known for this resource"
|
||||
|
||||
metadata = {wtypes.text: wtypes.text}
|
||||
"Arbitrary metadata associated with the resource"
|
||||
|
@ -24,6 +24,7 @@
|
||||
|
||||
import calendar
|
||||
import copy
|
||||
import datetime
|
||||
import json
|
||||
import operator
|
||||
import uuid
|
||||
@ -385,6 +386,10 @@ class Connection(pymongo_base.Connection):
|
||||
return merge;
|
||||
}""")
|
||||
|
||||
_GENESIS = datetime.datetime(year=datetime.MINYEAR, month=1, day=1)
|
||||
_APOCALYPSE = datetime.datetime(year=datetime.MAXYEAR, month=12, day=31,
|
||||
hour=23, minute=59, second=59)
|
||||
|
||||
def __init__(self, conf):
|
||||
url = conf.database.connection
|
||||
|
||||
@ -394,9 +399,9 @@ class Connection(pymongo_base.Connection):
|
||||
# requires a new storage connection.
|
||||
self.conn = self.CONNECTION_POOL.connect(url)
|
||||
|
||||
# Require MongoDB 2.2 to use TTL
|
||||
if self.conn.server_info()['versionArray'] < [2, 2]:
|
||||
raise storage.StorageBadVersion("Need at least MongoDB 2.2")
|
||||
# Require MongoDB 2.4 to use $setOnInsert
|
||||
if self.conn.server_info()['versionArray'] < [2, 4]:
|
||||
raise storage.StorageBadVersion("Need at least MongoDB 2.4")
|
||||
|
||||
connection_options = pymongo.uri_parser.parse_uri(url)
|
||||
self.db = getattr(self.conn, connection_options['database'])
|
||||
@ -429,6 +434,10 @@ class Connection(pymongo_base.Connection):
|
||||
('timestamp', pymongo.ASCENDING),
|
||||
('source', pymongo.ASCENDING),
|
||||
], name='meter_idx')
|
||||
self.db.resource.ensure_index([('last_sample_timestamp',
|
||||
pymongo.DESCENDING)],
|
||||
name='last_sample_timestamp_idx',
|
||||
sparse=True)
|
||||
self.db.meter.ensure_index([('timestamp', pymongo.DESCENDING)],
|
||||
name='timestamp_idx')
|
||||
|
||||
@ -483,14 +492,20 @@ class Connection(pymongo_base.Connection):
|
||||
upsert=True,
|
||||
)
|
||||
|
||||
# Record the updated resource metadata
|
||||
self.db.resource.update(
|
||||
# Record the updated resource metadata - we use $setOnInsert to
|
||||
# unconditionally insert sample timestamps and resource metadata
|
||||
# (in the update case, this must be conditional on the sample not
|
||||
# being out-of-order)
|
||||
resource = self.db.resource.find_and_modify(
|
||||
{'_id': data['resource_id']},
|
||||
{'$set': {'project_id': data['project_id'],
|
||||
'user_id': data['user_id'],
|
||||
'metadata': data['resource_metadata'],
|
||||
'source': data['source'],
|
||||
},
|
||||
'$setOnInsert': {'metadata': data['resource_metadata'],
|
||||
'first_sample_timestamp': data['timestamp'],
|
||||
'last_sample_timestamp': data['timestamp'],
|
||||
},
|
||||
'$addToSet': {'meter': {'counter_name': data['counter_name'],
|
||||
'counter_type': data['counter_type'],
|
||||
'counter_unit': data['counter_unit'],
|
||||
@ -498,8 +513,33 @@ class Connection(pymongo_base.Connection):
|
||||
},
|
||||
},
|
||||
upsert=True,
|
||||
new=True,
|
||||
)
|
||||
|
||||
# only update last sample timestamp if actually later (the usual
|
||||
# in-order case)
|
||||
last_sample_timestamp = resource.get('last_sample_timestamp')
|
||||
if (last_sample_timestamp is None or
|
||||
last_sample_timestamp <= data['timestamp']):
|
||||
self.db.resource.update(
|
||||
{'_id': data['resource_id']},
|
||||
{'$set': {'metadata': data['resource_metadata'],
|
||||
'last_sample_timestamp': data['timestamp']}}
|
||||
)
|
||||
|
||||
# only update first sample timestamp if actually earlier (the unusual
|
||||
# out-of-order case)
|
||||
# NOTE: a null first sample timestamp is not updated as this indicates
|
||||
# a pre-existing resource document dating from before we started
|
||||
# recording these timestamps in the resource collection
|
||||
first_sample_timestamp = resource.get('first_sample_timestamp')
|
||||
if (first_sample_timestamp is not None and
|
||||
first_sample_timestamp > data['timestamp']):
|
||||
self.db.resource.update(
|
||||
{'_id': data['resource_id']},
|
||||
{'$set': {'first_sample_timestamp': data['timestamp']}}
|
||||
)
|
||||
|
||||
# Record the raw data for the meter. Use a copy so we do not
|
||||
# modify a data structure owned by our caller (the driver adds
|
||||
# a new key '_id').
|
||||
@ -651,6 +691,100 @@ class Connection(pymongo_base.Connection):
|
||||
limit = 0
|
||||
return db_collection.find(q, limit=limit, sort=all_sort)
|
||||
|
||||
def _get_time_constrained_resources(self, query,
|
||||
start_timestamp, start_timestamp_op,
|
||||
end_timestamp, end_timestamp_op,
|
||||
metaquery, resource):
|
||||
"""Return an iterable of models.Resource instances constrained
|
||||
by sample timestamp.
|
||||
|
||||
:param query: project/user/source query
|
||||
:param start_timestamp: modified timestamp start range.
|
||||
:param start_timestamp_op: start time operator, like gt, ge.
|
||||
:param end_timestamp: modified timestamp end range.
|
||||
:param end_timestamp_op: end time operator, like lt, le.
|
||||
:param metaquery: dict with metadata to match on.
|
||||
:param resource: resource filter.
|
||||
"""
|
||||
if resource is not None:
|
||||
query['resource_id'] = resource
|
||||
|
||||
# Add resource_ prefix so it matches the field in the db
|
||||
query.update(dict(('resource_' + k, v)
|
||||
for (k, v) in metaquery.iteritems()))
|
||||
|
||||
# FIXME(dhellmann): This may not perform very well,
|
||||
# but doing any better will require changing the database
|
||||
# schema and that will need more thought than I have time
|
||||
# to put into it today.
|
||||
# Look for resources matching the above criteria and with
|
||||
# samples in the time range we care about, then change the
|
||||
# resource query to return just those resources by id.
|
||||
ts_range = pymongo_base.make_timestamp_range(start_timestamp,
|
||||
end_timestamp,
|
||||
start_timestamp_op,
|
||||
end_timestamp_op)
|
||||
if ts_range:
|
||||
query['timestamp'] = ts_range
|
||||
|
||||
sort_keys = base._handle_sort_key('resource')
|
||||
sort_instructions = self._build_sort_instructions(sort_keys)[0]
|
||||
|
||||
# use a unique collection name for the results collection,
|
||||
# as result post-sorting (as oppposed to reduce pre-sorting)
|
||||
# is not possible on an inline M-R
|
||||
out = 'resource_list_%s' % uuid.uuid4()
|
||||
self.db.meter.map_reduce(self.MAP_RESOURCES,
|
||||
self.REDUCE_RESOURCES,
|
||||
out=out,
|
||||
sort={'resource_id': 1},
|
||||
query=query)
|
||||
|
||||
try:
|
||||
for r in self.db[out].find(sort=sort_instructions):
|
||||
resource = r['value']
|
||||
yield models.Resource(
|
||||
resource_id=r['_id'],
|
||||
user_id=resource['user_id'],
|
||||
project_id=resource['project_id'],
|
||||
first_sample_timestamp=resource['first_timestamp'],
|
||||
last_sample_timestamp=resource['last_timestamp'],
|
||||
source=resource['source'],
|
||||
metadata=resource['metadata'])
|
||||
finally:
|
||||
self.db[out].drop()
|
||||
|
||||
def _get_floating_resources(self, query, metaquery, resource):
|
||||
"""Return an iterable of models.Resource instances unconstrained
|
||||
by timestamp.
|
||||
|
||||
:param query: project/user/source query
|
||||
:param metaquery: dict with metadata to match on.
|
||||
:param resource: resource filter.
|
||||
"""
|
||||
if resource is not None:
|
||||
query['_id'] = resource
|
||||
|
||||
query.update(dict((k, v)
|
||||
for (k, v) in metaquery.iteritems()))
|
||||
|
||||
keys = base._handle_sort_key('resource')
|
||||
sort_keys = ['last_sample_timestamp' if i == 'timestamp' else i
|
||||
for i in keys]
|
||||
sort_instructions = self._build_sort_instructions(sort_keys)[0]
|
||||
|
||||
for r in self.db.resource.find(query, sort=sort_instructions):
|
||||
yield models.Resource(
|
||||
resource_id=r['_id'],
|
||||
user_id=r['user_id'],
|
||||
project_id=r['project_id'],
|
||||
first_sample_timestamp=r.get('first_sample_timestamp',
|
||||
self._GENESIS),
|
||||
last_sample_timestamp=r.get('last_sample_timestamp',
|
||||
self._APOCALYPSE),
|
||||
source=r['source'],
|
||||
metadata=r['metadata'])
|
||||
|
||||
def get_resources(self, user=None, project=None, source=None,
|
||||
start_timestamp=None, start_timestamp_op=None,
|
||||
end_timestamp=None, end_timestamp_op=None,
|
||||
@ -671,60 +805,23 @@ class Connection(pymongo_base.Connection):
|
||||
if pagination:
|
||||
raise NotImplementedError(_('Pagination not implemented'))
|
||||
|
||||
q = {}
|
||||
query = {}
|
||||
if user is not None:
|
||||
q['user_id'] = user
|
||||
query['user_id'] = user
|
||||
if project is not None:
|
||||
q['project_id'] = project
|
||||
query['project_id'] = project
|
||||
if source is not None:
|
||||
q['source'] = source
|
||||
if resource is not None:
|
||||
q['resource_id'] = resource
|
||||
# Add resource_ prefix so it matches the field in the db
|
||||
q.update(dict(('resource_' + k, v)
|
||||
for (k, v) in metaquery.iteritems()))
|
||||
query['source'] = source
|
||||
|
||||
# FIXME(dhellmann): This may not perform very well,
|
||||
# but doing any better will require changing the database
|
||||
# schema and that will need more thought than I have time
|
||||
# to put into it today.
|
||||
if start_timestamp or end_timestamp:
|
||||
# Look for resources matching the above criteria and with
|
||||
# samples in the time range we care about, then change the
|
||||
# resource query to return just those resources by id.
|
||||
ts_range = pymongo_base.make_timestamp_range(start_timestamp,
|
||||
end_timestamp,
|
||||
start_timestamp_op,
|
||||
end_timestamp_op)
|
||||
if ts_range:
|
||||
q['timestamp'] = ts_range
|
||||
|
||||
sort_keys = base._handle_sort_key('resource')
|
||||
sort_instructions = self._build_sort_instructions(sort_keys)[0]
|
||||
|
||||
# use a unique collection name for the results collection,
|
||||
# as result post-sorting (as oppposed to reduce pre-sorting)
|
||||
# is not possible on an inline M-R
|
||||
out = 'resource_list_%s' % uuid.uuid4()
|
||||
self.db.meter.map_reduce(self.MAP_RESOURCES,
|
||||
self.REDUCE_RESOURCES,
|
||||
out=out,
|
||||
sort={'resource_id': 1},
|
||||
query=q)
|
||||
|
||||
try:
|
||||
for r in self.db[out].find(sort=sort_instructions):
|
||||
resource = r['value']
|
||||
yield models.Resource(
|
||||
resource_id=r['_id'],
|
||||
user_id=resource['user_id'],
|
||||
project_id=resource['project_id'],
|
||||
first_sample_timestamp=resource['first_timestamp'],
|
||||
last_sample_timestamp=resource['last_timestamp'],
|
||||
source=resource['source'],
|
||||
metadata=resource['metadata'])
|
||||
finally:
|
||||
self.db[out].drop()
|
||||
return self._get_time_constrained_resources(query,
|
||||
start_timestamp,
|
||||
start_timestamp_op,
|
||||
end_timestamp,
|
||||
end_timestamp_op,
|
||||
metaquery, resource)
|
||||
else:
|
||||
return self._get_floating_resources(query, metaquery, resource)
|
||||
|
||||
def _aggregate_param(self, fragment_key, aggregate):
|
||||
fragment_map = self.STANDARD_AGGREGATES[fragment_key]
|
||||
|
@ -262,9 +262,21 @@ class ResourceTest(DBTestBase,
|
||||
|
||||
def test_get_resources_by_user(self):
|
||||
resources = list(self.conn.get_resources(user='user-id'))
|
||||
self.assertEqual(len(resources), 2)
|
||||
self.assertTrue(len(resources) == 2 or len(resources) == 1)
|
||||
ids = set(r.resource_id for r in resources)
|
||||
self.assertEqual(ids, set(['resource-id', 'resource-id-alternate']))
|
||||
# tolerate storage driver only reporting latest owner of resource
|
||||
resources_ever_owned_by = set(['resource-id',
|
||||
'resource-id-alternate'])
|
||||
resources_now_owned_by = set(['resource-id'])
|
||||
self.assertTrue(ids == resources_ever_owned_by or
|
||||
ids == resources_now_owned_by,
|
||||
'unexpected resources: %s' % ids)
|
||||
|
||||
def test_get_resources_by_alternate_user(self):
|
||||
resources = list(self.conn.get_resources(user='user-id-alternate'))
|
||||
self.assertEqual(1, len(resources))
|
||||
# only a single resource owned by this user ever
|
||||
self.assertEqual('resource-id-alternate', resources[0].resource_id)
|
||||
|
||||
def test_get_resources_by_project(self):
|
||||
resources = list(self.conn.get_resources(project='project-id'))
|
||||
|
Loading…
x
Reference in New Issue
Block a user