Replace mongo aggregation with plain ol' map-reduce
Fixes bug 1262571. Previously, the mongodb storage driver ran an aggregation pipeline over the meter collection in order to construct a list of resources adorned with first & last sample timestamps etc. However, the mongodb aggregation framework performs sorting in-memory, in this case operating over a potentially very large collection. It is also hardcoded to abort any sort in an aggregation pipeline that would consume more than 10% of physical memory, which is observed in this case. Now, we avoid the aggregation framework altogether and instead use an equivalent map-reduce. Change-Id: Ibef4a95acada411af385ff75ccb36c5724068b59
This commit is contained in:
parent
31d4c0ced4
commit
ba6641afac
@ -24,6 +24,7 @@ import calendar
|
||||
import copy
|
||||
import json
|
||||
import operator
|
||||
import uuid
|
||||
import weakref
|
||||
|
||||
import bson.code
|
||||
@ -302,6 +303,39 @@ class Connection(base.Connection):
|
||||
SORT_OPERATION_MAPPING = {'desc': (pymongo.DESCENDING, '$lt'),
|
||||
'asc': (pymongo.ASCENDING, '$gt')}
|
||||
|
||||
MAP_RESOURCES = bson.code.Code("""
|
||||
function () {
|
||||
emit(this.resource_id,
|
||||
{user_id: this.user_id,
|
||||
project_id: this.project_id,
|
||||
source: this.source,
|
||||
first_timestamp: this.timestamp,
|
||||
last_timestamp: this.timestamp,
|
||||
metadata: this.resource_metadata})
|
||||
}""")
|
||||
|
||||
REDUCE_RESOURCES = bson.code.Code("""
|
||||
function (key, values) {
|
||||
var merge = {user_id: values[0].user_id,
|
||||
project_id: values[0].project_id,
|
||||
source: values[0].source,
|
||||
first_timestamp: values[0].first_timestamp,
|
||||
last_timestamp: values[0].last_timestamp,
|
||||
metadata: values[0].metadata}
|
||||
values.forEach(function(value) {
|
||||
if (merge.first_timestamp - value.first_timestamp > 0) {
|
||||
merge.first_timestamp = value.first_timestamp;
|
||||
merge.user_id = value.user_id;
|
||||
merge.project_id = value.project_id;
|
||||
merge.source = value.source;
|
||||
} else if (merge.last_timestamp - value.last_timestamp <= 0) {
|
||||
merge.last_timestamp = value.last_timestamp;
|
||||
merge.metadata = value.metadata;
|
||||
}
|
||||
});
|
||||
return merge;
|
||||
}""")
|
||||
|
||||
def __init__(self, conf):
|
||||
url = conf.database.connection
|
||||
|
||||
@ -311,7 +345,7 @@ class Connection(base.Connection):
|
||||
# requires a new storage connection.
|
||||
self.conn = self.CONNECTION_POOL.connect(url)
|
||||
|
||||
# Require MongoDB 2.2 to use aggregate() and TTL
|
||||
# Require MongoDB 2.2 to use TTL
|
||||
if self.conn.server_info()['versionArray'] < [2, 2]:
|
||||
raise storage.StorageBadVersion("Need at least MongoDB 2.2")
|
||||
|
||||
@ -635,33 +669,29 @@ class Connection(base.Connection):
|
||||
sort_keys = base._handle_sort_key('resource')
|
||||
sort_instructions = self._build_sort_instructions(sort_keys)[0]
|
||||
|
||||
aggregate = self.db.meter.aggregate([
|
||||
{"$match": q},
|
||||
{"$sort": dict(sort_instructions)},
|
||||
{"$group": {
|
||||
"_id": "$resource_id",
|
||||
"user_id": {"$first": "$user_id"},
|
||||
"project_id": {"$first": "$project_id"},
|
||||
"source": {"$first": "$source"},
|
||||
"first_sample_timestamp": {"$min": "$timestamp"},
|
||||
"last_sample_timestamp": {"$max": "$timestamp"},
|
||||
"metadata": {"$first": "$resource_metadata"},
|
||||
"meters_name": {"$push": "$counter_name"},
|
||||
"meters_type": {"$push": "$counter_type"},
|
||||
"meters_unit": {"$push": "$counter_unit"},
|
||||
}},
|
||||
])
|
||||
# use a unique collection name for the results collection,
|
||||
# as result post-sorting (as opposed to reduce pre-sorting)
|
||||
# is not possible on an inline M-R
|
||||
out = 'resource_list_%s' % uuid.uuid4()
|
||||
self.db.meter.map_reduce(self.MAP_RESOURCES,
|
||||
self.REDUCE_RESOURCES,
|
||||
out=out,
|
||||
sort={'resource_id': 1},
|
||||
query=q)
|
||||
|
||||
for result in aggregate['result']:
|
||||
yield models.Resource(
|
||||
resource_id=result['_id'],
|
||||
user_id=result['user_id'],
|
||||
project_id=result['project_id'],
|
||||
first_sample_timestamp=result['first_sample_timestamp'],
|
||||
last_sample_timestamp=result['last_sample_timestamp'],
|
||||
source=result['source'],
|
||||
metadata=result['metadata'],
|
||||
)
|
||||
try:
|
||||
for r in self.db[out].find(sort=sort_instructions):
|
||||
resource = r['value']
|
||||
yield models.Resource(
|
||||
resource_id=r['_id'],
|
||||
user_id=resource['user_id'],
|
||||
project_id=resource['project_id'],
|
||||
first_sample_timestamp=resource['first_timestamp'],
|
||||
last_sample_timestamp=resource['last_timestamp'],
|
||||
source=resource['source'],
|
||||
metadata=resource['metadata'])
|
||||
finally:
|
||||
self.db[out].drop()
|
||||
|
||||
def get_meters(self, user=None, project=None, resource=None, source=None,
|
||||
metaquery={}, pagination=None):
|
||||
|
@ -130,9 +130,9 @@ class TestListResources(FunctionalTest,
|
||||
|
||||
def test_instance_multiple_samples(self):
|
||||
timestamps = [
|
||||
datetime.datetime(2012, 7, 2, 10, 40),
|
||||
datetime.datetime(2012, 7, 2, 10, 41),
|
||||
datetime.datetime(2012, 7, 2, 10, 42),
|
||||
datetime.datetime(2012, 7, 2, 10, 40),
|
||||
]
|
||||
for timestamp in timestamps:
|
||||
datapoint = sample.Sample(
|
||||
@ -145,7 +145,7 @@ class TestListResources(FunctionalTest,
|
||||
'resource-id',
|
||||
timestamp=timestamp,
|
||||
resource_metadata={'display_name': 'test-server',
|
||||
'tag': 'self.sample',
|
||||
'tag': 'self.sample-%s' % timestamp,
|
||||
},
|
||||
source='test',
|
||||
)
|
||||
@ -157,7 +157,7 @@ class TestListResources(FunctionalTest,
|
||||
|
||||
data = self.get_json('/resources')
|
||||
self.assertEqual(1, len(data))
|
||||
self._verify_sample_timestamps(data[0], timestamps[0], timestamps[-1])
|
||||
self._verify_sample_timestamps(data[0], timestamps[-1], timestamps[1])
|
||||
|
||||
def test_instances_one(self):
|
||||
sample1 = sample.Sample(
|
||||
|
Loading…
Reference in New Issue
Block a user