Optimize performance of compact records

* Make compact records pure namedtuples (remove all dict behaviour)
* Add profiler decorator for dashboard requests performance analysis
* Introduce a new parameter 'collect_profiler_stats' specifying the name of
  the file where profiler stats are stored
* Fix py33 test failures

Change-Id: Ic5c900047a493541510974e9bc9c161f5606739e
This commit is contained in:
Ilya Shakhat 2014-06-30 19:56:23 +04:00 committed by Ilya Shakhat
parent 8ab2a069b4
commit a7eb7d024b
16 changed files with 134 additions and 112 deletions

View File

@ -54,4 +54,7 @@
# default_project_type = openstack
# The interval specifies how frequently the dashboard should check for updates, in seconds
# dashboard-update-interval = 3600
# dashboard_update_interval = 3600
# Name of the file to store Python profiler data. This option works for the dashboard only
# collect_profiler_stats =

View File

@ -13,10 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import cProfile
import functools
import json
import operator
import flask
from oslo.config import cfg
import six
from werkzeug import exceptions
@ -210,23 +213,23 @@ def record_filter(ignore=None):
def incremental_filter(result, record, param_id, context):
result[record[param_id]]['metric'] += 1
result[getattr(record, param_id)]['metric'] += 1
def loc_filter(result, record, param_id, context):
result[record[param_id]]['metric'] += record['loc']
result[getattr(record, param_id)]['metric'] += record.loc
def mark_filter(result, record, param_id, context):
result_by_param = result[record[param_id]]
if record['type'] == 'Workflow' and record['value'] == 1:
result_by_param = result[getattr(record, param_id)]
if record.type == 'Workflow' and record.value == 1:
value = 'A'
else:
value = record['value']
value = record.value
result_by_param['metric'] += 1
result_by_param[value] = result_by_param.get(value, 0) + 1
if record.get('disagreement'):
if record.disagreement:
result_by_param['disagreements'] = (
result_by_param.get('disagreements', 0) + 1)
@ -265,22 +268,21 @@ def mark_finalize(record):
def person_day_filter(result, record, param_id, context):
if record['record_type'] == 'commit' or record['record_type'] == 'member':
record_type = record.record_type
if record_type == 'commit' or record_type == 'member':
# 1. commit is attributed with the date of the merge which is not an
# effort of the author (author's effort is represented in patches)
# 2. registration on openstack.org is not an effort
return
day = utils.timestamp_to_day(record['date'])
day = utils.timestamp_to_day(record.date)
# fact that record-days are grouped by days in some order is used
if context.get('last_processed_day') != day:
context['last_processed_day'] = day
context['counted_user_ids'] = set()
user = vault.get_user_from_runtime_storage(record['user_id'])
user_id = user['seq']
value = record[param_id]
user_id = record.user_id
value = getattr(record, param_id)
if user_id not in context['counted_user_ids']:
context['counted_user_ids'].add(user_id)
result[value]['metric'] += 1
@ -288,9 +290,14 @@ def person_day_filter(result, record, param_id, context):
def generate_records_for_person_day(record_ids):
memory_storage_inst = vault.get_memory_storage()
for values in memory_storage_inst.day_index.values():
for record in memory_storage_inst.get_records(record_ids & values):
yield record
id_dates = []
for record in memory_storage_inst.get_records(record_ids):
id_dates.append((record.date, record.record_id))
id_dates.sort(key=operator.itemgetter(0))
for record in memory_storage_inst.get_records(
record_id for date, record_id in id_dates):
yield record
def aggregate_filter():
@ -438,9 +445,33 @@ def jsonify(root='data'):
return decorator
def profiler_decorator(func):
    """Profile the wrapped callable with cProfile when enabled via config.

    If the 'collect_profiler_stats' option holds a file name, the call runs
    under cProfile and the collected stats are dumped to that file after
    every invocation; otherwise the function is invoked with no profiling
    overhead.

    :param func: callable to wrap
    :returns: wrapped callable with identical signature and return value
    """
    @functools.wraps(func)
    def profiler_decorated_function(*args, **kwargs):
        profiler = None
        profile_filename = cfg.CONF.collect_profiler_stats
        if profile_filename:
            LOG.debug('Profiler is enabled')
            profiler = cProfile.Profile()
            profiler.enable()
        try:
            return func(*args, **kwargs)
        finally:
            # Dump stats even when the wrapped function raises, so the
            # collected profile is not silently lost on errors.
            if profiler:
                profiler.disable()
                profiler.dump_stats(profile_filename)
                LOG.debug('Profiler stats is written to file %s',
                          profile_filename)
    return profiler_decorated_function
def response():
def decorator(func):
@functools.wraps(func)
@profiler_decorator
def response_decorated_function(*args, **kwargs):
callback = flask.app.request.args.get('callback', False)
data = func(*args, **kwargs)

View File

@ -113,12 +113,16 @@ def extend_user(user):
def get_activity(records, start_record, page_size, query_message=None):
if query_message:
# note that all records are now dicts!
key_func = operator.itemgetter('date')
records = [vault.extend_record(r) for r in records]
records = [r for r in records
if (r.get('message') and
r.get('message').find(query_message) > 0)]
records_sorted = sorted(records, key=operator.itemgetter('date'),
reverse=True)
else:
key_func = operator.attrgetter('date')
records_sorted = sorted(records, key=key_func, reverse=True)
result = []
for record in records_sorted[start_record:]:

View File

@ -53,35 +53,35 @@ class CachedMemoryStorage(MemoryStorage):
}
def _save_record(self, record):
if record.get('company_name') == '*robots':
if record.company_name == '*robots':
return
self.records[record['record_id']] = record
self.records[record.record_id] = record
for key, index in six.iteritems(self.indexes):
self._add_to_index(index, record, key)
for bp_id in (record.get('blueprint_id') or []):
for bp_id in (record.blueprint_id or []):
if bp_id in self.blueprint_id_index:
self.blueprint_id_index[bp_id].add(record['record_id'])
self.blueprint_id_index[bp_id].add(record.record_id)
else:
self.blueprint_id_index[bp_id] = set([record['record_id']])
self.blueprint_id_index[bp_id] = set([record.record_id])
record_day = utils.timestamp_to_day(record['date'])
record_day = utils.timestamp_to_day(record.date)
if record_day in self.day_index:
self.day_index[record_day].add(record['record_id'])
self.day_index[record_day].add(record.record_id)
else:
self.day_index[record_day] = set([record['record_id']])
self.day_index[record_day] = set([record.record_id])
mr = (record['module'], record['release'])
mr = (record.module, record.release)
if mr in self.module_release_index:
self.module_release_index[mr].add(record['record_id'])
self.module_release_index[mr].add(record.record_id)
else:
self.module_release_index[mr] = set([record['record_id']])
self.module_release_index[mr] = set([record.record_id])
def update(self, records):
have_updates = False
for record in records:
have_updates = True
record_id = record['record_id']
record_id = record.record_id
if record_id in self.records:
# remove existing record from indexes
self._remove_record_from_index(self.records[record_id])
@ -95,19 +95,19 @@ class CachedMemoryStorage(MemoryStorage):
def _remove_record_from_index(self, record):
for key, index in six.iteritems(self.indexes):
index[record[key]].remove(record['record_id'])
index[getattr(record, key)].remove(record.record_id)
record_day = utils.timestamp_to_day(record['date'])
self.day_index[record_day].remove(record['record_id'])
record_day = utils.timestamp_to_day(record.date)
self.day_index[record_day].remove(record.record_id)
self.module_release_index[
(record['module'], record['release'])].remove(record['record_id'])
(record.module, record.release)].remove(record.record_id)
def _add_to_index(self, record_index, record, key):
record_key = record[key]
record_key = getattr(record, key)
if record_key in record_index:
record_index[record_key].add(record['record_id'])
record_index[record_key].add(record.record_id)
else:
record_index[record_key] = set([record['record_id']])
record_index[record_key] = set([record.record_id])
def _get_record_ids_from_index(self, items, index):
record_ids = set()

View File

@ -37,7 +37,7 @@ METRIC_LABELS = {
'bpc': 'Completed Blueprints',
'filed-bugs': 'Filed Bugs',
'resolved-bugs': 'Resolved Bugs',
# 'person-day': "Person-day effort"
'person-day': "Person-day effort",
'ci': 'CI votes',
}

View File

@ -103,9 +103,9 @@ def open_reviews(module):
total_open = 0
for review in memory_storage_inst.get_records(review_ids):
if review['status'] == 'NEW':
if review.status == 'NEW':
total_open += 1
if review['value'] in [1, 2]:
if review.value in [1, 2]:
waiting_on_reviewer.append(vault.extend_record(review))
return {
@ -163,7 +163,7 @@ def _get_punch_card_data(records):
for wday in six.moves.range(0, 7):
punch_card_raw.append([0] * 24)
for record in records:
tt = datetime.datetime.fromtimestamp(record['date']).timetuple()
tt = datetime.datetime.fromtimestamp(record.date).timetuple()
punch_card_raw[tt.tm_wday][tt.tm_hour] += 1
punch_card_data = [] # format for jqplot bubble renderer

View File

@ -15,10 +15,8 @@
import collections
import os
import UserDict
import flask
import itertools
from oslo.config import cfg
import six
@ -37,27 +35,8 @@ RECORD_FIELDS_FOR_AGGREGATE = ['record_id', 'primary_key', 'record_type',
'disagreement', 'value', 'status',
'blueprint_id']
_CompactRecordTuple = collections.namedtuple('CompactRecord',
RECORD_FIELDS_FOR_AGGREGATE)
class CompactRecord(_CompactRecordTuple, UserDict.DictMixin):
__slots__ = ()
def __getitem__(self, key):
if isinstance(key, str):
return getattr(self, key)
else:
return super(CompactRecord, self).__getitem__(key)
def keys(self):
return RECORD_FIELDS_FOR_AGGREGATE
def has_key(self, key):
return key in RECORD_FIELDS_FOR_AGGREGATE
def iteritems(self):
return itertools.izip(RECORD_FIELDS_FOR_AGGREGATE, self)
CompactRecord = collections.namedtuple('CompactRecord',
RECORD_FIELDS_FOR_AGGREGATE)
def compact_records(records):
@ -70,7 +49,7 @@ def compact_records(records):
def extend_record(record):
runtime_storage_inst = get_vault()['runtime_storage']
return runtime_storage_inst.get_by_key(
runtime_storage_inst._get_record_name(record['record_id']))
runtime_storage_inst._get_record_name(record.record_id))
def get_vault():

View File

@ -72,19 +72,21 @@ def _get_aggregated_stats(records, metric_filter, keys, param_id,
param_title=None, finalize_handler=None):
param_title = param_title or param_id
result = dict((c, {'metric': 0, 'id': c}) for c in keys)
context = {}
context = {'vault': vault.get_vault()}
if metric_filter:
for record in records:
metric_filter(result, record, param_id, context)
result[record[param_id]]['name'] = record[param_title]
result[getattr(record, param_id)]['name'] = (
getattr(record, param_title))
else:
for record in records:
record_param_id = record[param_id]
record_param_id = getattr(record, param_id)
result[record_param_id]['metric'] += 1
result[record_param_id]['name'] = record[param_title]
result[record_param_id]['name'] = getattr(record, param_title)
response = [r for r in result.values() if r['metric']]
response = [item for item in map(finalize_handler, response) if item]
if finalize_handler:
response = [item for item in map(finalize_handler, response) if item]
response.sort(key=lambda x: x['metric'], reverse=True)
utils.add_index(response, item_filter=lambda x: x['id'] != '*independent')
return response
@ -102,8 +104,8 @@ def get_new_companies(records, **kwargs):
result = {}
for record in records:
company_name = record['company_name']
date = record['date']
company_name = record.company_name
date = record.date
if company_name not in result or result[company_name] > date:
result[company_name] = date
@ -204,21 +206,21 @@ def get_engineers_extended(records, **kwargs):
return record
def record_processing(result, record, param_id):
result_row = result[record[param_id]]
record_type = record['record_type']
result_row = result[getattr(record, param_id)]
record_type = record.record_type
result_row[record_type] = result_row.get(record_type, 0) + 1
if record_type == 'mark':
decorators.mark_filter(result, record, param_id, {})
result = {}
for record in records:
user_id = record['user_id']
user_id = record.user_id
if user_id not in result:
result[user_id] = {'id': user_id, 'mark': 0, 'review': 0,
'commit': 0, 'email': 0, 'patch': 0,
'metric': 0}
record_processing(result, record, 'user_id')
result[user_id]['name'] = record['author_name']
result[user_id]['name'] = record.author_name
response = result.values()
response = [item for item in map(postprocessing, response) if item]
@ -237,9 +239,9 @@ def get_engineers_extended(records, **kwargs):
def get_distinct_engineers(records, **kwargs):
result = {}
for record in records:
result[record['user_id']] = {
'author_name': record['author_name'],
'author_email': record['author_email'],
result[record.user_id] = {
'author_name': record.author_name,
'author_email': record.author_email,
}
return result
@ -526,7 +528,7 @@ def timeline(records, **kwargs):
week_stat_commits_hl = dict((c, 0) for c in weeks)
if ('commits' in metric) or ('loc' in metric):
handler = lambda record: record['loc']
handler = lambda record: record.loc
else:
handler = lambda record: 0
@ -536,13 +538,13 @@ def timeline(records, **kwargs):
release_stat = collections.defaultdict(set)
all_stat = collections.defaultdict(set)
for record in records:
if ((record['record_type'] in ['commit', 'member']) or
(record['week'] not in weeks)):
if ((record.record_type in ['commit', 'member']) or
(record.week not in weeks)):
continue
day = utils.timestamp_to_day(record['date'])
user = vault.get_user_from_runtime_storage(record['user_id'])
if record['release'] == release_name:
day = utils.timestamp_to_day(record.date)
user = vault.get_user_from_runtime_storage(record.user_id)
if record.release == release_name:
release_stat[day] |= set([user['seq']])
all_stat[day] |= set([user['seq']])
for day, users in six.iteritems(release_stat):
@ -553,15 +555,15 @@ def timeline(records, **kwargs):
week_stat_commits[week] += len(users)
else:
for record in records:
week = record['week']
week = record.week
if week in weeks:
week_stat_loc[week] += handler(record)
week_stat_commits[week] += 1
if 'members' in metric:
if record['date'] >= start_date:
if record.date >= start_date:
week_stat_commits_hl[week] += 1
else:
if record['release'] == release_name:
if record.release == release_name:
week_stat_commits_hl[week] += 1
if 'all' == release_name and 'members' not in metric:

View File

@ -62,4 +62,7 @@ OPTS = [
cfg.IntOpt('dashboard-update-interval', default=3600,
help='The interval specifies how frequently dashboard should '
'check for updates in seconds'),
cfg.StrOpt('collect-profiler-stats',
help='Name of file to store python profiler data. This option '
'works for dashboard only'),
]

View File

@ -15,6 +15,7 @@
import contextlib
import itertools
import json
import uuid
import mock
@ -172,3 +173,7 @@ def algebraic_product(**kwargs):
for position, key in six.iteritems(position_to_key):
result[key] = chain[position]
yield result
def load_json(api_response):
    """Deserialize the UTF-8 JSON body of a test-client response.

    :param api_response: response object exposing its raw body as ``data``
    :returns: the decoded Python object (typically a dict)
    """
    body = api_response.data.decode('utf8')
    return json.loads(body)

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from tests.api import test_api
@ -64,14 +62,14 @@ class TestAPICompanies(test_api.TestAPI):
response = self.app.get('/api/1.0/companies?metric=commits&'
'module=glance')
companies = json.loads(response.data)['data']
companies = test_api.load_json(response)['data']
self.assertEqual([{'id': 'ibm', 'text': 'IBM'},
{'id': 'nec', 'text': 'NEC'},
{'id': 'ntt', 'text': 'NTT'}], companies)
response = self.app.get('/api/1.0/companies?metric=marks&'
'module=glance')
companies = json.loads(response.data)['data']
companies = test_api.load_json(response)['data']
self.assertEqual([{'id': 'ibm', 'text': 'IBM'},
{'id': 'nec', 'text': 'NEC'}], companies)
@ -90,7 +88,7 @@ class TestAPICompanies(test_api.TestAPI):
company_name=['NEC', 'IBM', 'NTT'])):
response = self.app.get('/api/1.0/companies/nec?module=glance')
company = json.loads(response.data)['company']
company = test_api.load_json(response)['company']
self.assertEqual({'id': 'nec', 'text': 'NEC'}, company)
response = self.app.get('/api/1.0/companies/google?module=glance')

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from tests.api import test_api
@ -50,7 +48,7 @@ class TestAPIModules(test_api.TestAPI):
response = self.app.get('/api/1.0/modules?'
'project_type=all&metric=commits')
modules = json.loads(response.data)['data']
modules = test_api.load_json(response)['data']
self.assertEqual(
[{'id': 'glance', 'text': 'glance', 'tag': 'module'},
{'id': 'nova', 'text': 'nova', 'tag': 'module'},
@ -63,7 +61,7 @@ class TestAPIModules(test_api.TestAPI):
response = self.app.get('/api/1.0/modules?module=nova-group&'
'project_type=integrated&metric=commits')
modules = json.loads(response.data)['data']
modules = test_api.load_json(response)['data']
self.assertEqual(
[{'id': 'glance', 'text': 'glance', 'tag': 'module'},
{'id': 'nova', 'text': 'nova', 'tag': 'module'},
@ -89,12 +87,12 @@ class TestAPIModules(test_api.TestAPI):
test_api.make_records(record_type=['commit'])):
response = self.app.get('/api/1.0/modules/nova')
module = json.loads(response.data)['module']
module = test_api.load_json(response)['module']
self.assertEqual(
{'id': 'nova', 'text': 'nova', 'tag': 'module'}, module)
response = self.app.get('/api/1.0/modules/nova-group')
module = json.loads(response.data)['module']
module = test_api.load_json(response)['module']
self.assertEqual(
{'tag': 'group', 'id': 'nova-group', 'text': 'nova-group'},
module)

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from tests.api import test_api
@ -28,7 +26,7 @@ class TestAPIReleases(test_api.TestAPI):
{'release_name': 'icehouse', 'end_date': 1397692800}]},
test_api.make_records(record_type=['commit'])):
response = self.app.get('/api/1.0/releases')
releases = json.loads(response.data)['data']
releases = test_api.load_json(response)['data']
self.assertEqual(3, len(releases))
self.assertIn({'id': 'all', 'text': 'All'}, releases)
self.assertIn({'id': 'icehouse', 'text': 'Icehouse'}, releases)

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from tests.api import test_api
@ -50,7 +48,7 @@ class TestAPIStats(test_api.TestAPI):
module=['glance'])):
response = self.app.get('/api/1.0/stats/modules?metric=loc&'
'project_type=all')
stats = json.loads(response.data)['stats']
stats = test_api.load_json(response)['stats']
self.assertEqual(2, len(stats))
self.assertEqual(600, stats[0]['metric'])
self.assertEqual('glance', stats[0]['id'])
@ -106,7 +104,7 @@ class TestAPIStats(test_api.TestAPI):
user_id=['john_doe', 'bill'])):
response = self.app.get('/api/1.0/stats/engineers?metric=loc&'
'project_type=all')
stats = json.loads(response.data)['stats']
stats = test_api.load_json(response)['stats']
self.assertEqual(1, len(stats))
self.assertEqual(660, stats[0]['metric'])
@ -163,7 +161,7 @@ class TestAPIStats(test_api.TestAPI):
user_id=['smith'])):
response = self.app.get('/api/1.0/stats/engineers_extended?'
'project_type=all')
stats = json.loads(response.data)['stats']
stats = test_api.load_json(response)['stats']
self.assertEqual(2, len(stats))
self.assertEqual(2, stats[0]['mark'])
self.assertEqual('john_doe', stats[0]['id'])

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from tests.api import test_api
@ -41,7 +39,7 @@ class TestAPIUsers(test_api.TestAPI):
user_id=['john_doe', 'bill_smith'])):
response = self.app.get('/api/1.0/users?'
'module=nova&metric=commits')
users = json.loads(response.data)['data']
users = test_api.load_json(response)['data']
self.assertEqual(2, len(users))
self.assertIn({'id': 'john_doe', 'text': 'John Doe'}, users)
self.assertIn({'id': 'bill_smith', 'text': 'Bill Smith'}, users)
@ -55,7 +53,7 @@ class TestAPIUsers(test_api.TestAPI):
test_api.make_records(record_type=['commit'], module=['nova'],
user_name=['John Doe', 'Bill Smith'])):
response = self.app.get('/api/1.0/users/john_doe')
user = json.loads(response.data)['user']
user = test_api.load_json(response)['user']
self.assertEqual('john_doe', user['user_id'])
def test_user_not_found(self):

View File

@ -27,7 +27,12 @@ commands = python -m testtools.run \
tests.unit.test_mls \
tests.unit.test_record_processor \
tests.unit.test_utils \
tests.unit.test_vcs
tests.unit.test_vcs \
tests.api.test_companies \
tests.api.test_modules \
tests.api.test_releases \
tests.api.test_stats \
tests.api.test_users
[testenv:pep8]
commands = flake8