Merge user profiles during default data update

* Update existing user profiles with data read from default data
* Refactor the default data processor
* Remove unused code from the normalizer (validity is enforced by the schema)
* Add more restrictions to the schema

Fixes bug 1260696

Change-Id: Iab1f190ff8a7e2a0c0800712174565437149e119
Ilya Shakhat 2013-12-19 18:47:31 +04:00
parent 0ff9761a64
commit 82600d8805
7 changed files with 94 additions and 90 deletions
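A minimal sketch of the profile merge described in the first bullet, mirroring the new _store_users in the diff below: the profile already held in runtime storage is the base, and fields read from default data overwrite it. The helper name, the sample field values, and the plain dicts standing in for runtime storage are hypothetical.

    # Sketch: how an existing stored profile is merged with a default-data entry.
    def merge_profile(stored_user, default_data_user):
        if stored_user:
            merged = dict(stored_user)        # keep everything already stored
            merged.update(default_data_user)  # default-data fields take precedence
            return merged
        return default_data_user

    stored = {'user_id': 'jdoe', 'emails': ['jdoe@example.com'], 'seq': 42}
    from_default_data = {'user_id': 'jdoe', 'user_name': 'John Doe',
                         'emails': ['jdoe@example.com'], 'companies': []}
    merged = merge_profile(stored, from_default_data)
    # 'seq' from the stored profile survives; 'user_name' and 'companies'
    # are taken from default data.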

View File

@@ -39,8 +39,10 @@
"pattern": "20\\d{2}-\\w{3}-[0-3]\\d"
}
},
"required": ["company_name", "end_date"],
"additionalProperties": false
}
},
"minItems": 1
}
},
"required": ["launchpad_id", "user_name", "emails"],

View File

@@ -34,7 +34,7 @@ def _check_default_data_change(runtime_storage_inst, default_data):
p_digest = runtime_storage_inst.get_by_key('default_data_digest')
if digest == p_digest:
LOG.debug('No changes in default data detected, sha1: %s', digest)
LOG.debug('No changes in default data, sha1: %s', digest)
return False
LOG.debug('Default data has changes, sha1: %s', digest)
@@ -42,21 +42,16 @@ def _check_default_data_change(runtime_storage_inst, default_data):
return True
def _retrieve_project_list(default_data):
def _retrieve_project_list_from_github(project_sources):
LOG.info('Retrieving project list from GitHub')
repo_index = {}
for repo in default_data['repos']:
repo_index[repo['uri']] = repo
github = MainClass.Github(timeout=60)
for project_source in default_data['project_sources']:
repos = []
for project_source in project_sources:
organization = project_source['organization']
LOG.debug('Get list of projects for organization %s', organization)
try:
repos = github.get_organization(organization).get_repos()
github_repos = github.get_organization(organization).get_repos()
except Exception as e:
LOG.exception(e)
LOG.warn('Fail to retrieve list of projects. Keep it unmodified')
@@ -64,42 +59,42 @@ def _retrieve_project_list(default_data):
exclude = set(project_source.get('exclude', []))
for repo in repos:
repo_uri = repo.git_url
repo_name = repo.name
if (repo_uri not in repo_index) and (repo_name not in exclude):
for repo in github_repos:
if repo.name not in exclude:
r = {
'branches': ['master'],
'module': repo_name,
'module': repo.name,
'organization': organization,
'project_type': project_source['project_type'],
'project_group': project_source['project_group'],
'uri': repo_uri,
'uri': repo.git_url,
'releases': []
}
default_data['repos'].append(r)
repos.append(r)
LOG.debug('Project is added to default data: %s', r)
return True
return repos
def _process_users(runtime_storage_inst, users):
users_index = {}
def _update_project_list(default_data):
configured_repos = set([r['uri'] for r in default_data['repos']])
repos = _retrieve_project_list_from_github(default_data['project_sources'])
if repos:
default_data['repos'] += [r for r in repos
if r['uri'] not in configured_repos]
def _store_users(runtime_storage_inst, users):
for user in users:
stored_user = utils.load_user(runtime_storage_inst, user['user_id'])
if stored_user:
stored_user.update(user)
user = stored_user
utils.store_user(runtime_storage_inst, user)
if 'user_id' in user:
users_index[user['user_id']] = user
if 'launchpad_id' in user:
users_index[user['launchpad_id']] = user
if 'gerrit_id' in user:
users_index[user['gerrit_id']] = user
for email in user['emails']:
users_index[email] = user
runtime_storage_inst.set_by_key('users', users_index)
def _process_companies(runtime_storage_inst, companies):
def _store_companies(runtime_storage_inst, companies):
domains_index = {}
for company in companies:
for domain in company['domains']:
@@ -107,50 +102,54 @@ def _process_companies(runtime_storage_inst, companies):
runtime_storage_inst.set_by_key('companies', domains_index)
KEYS = {
'users': _process_users,
'companies': _process_companies,
STORE_FUNCS = {
'users': _store_users,
'companies': _store_companies,
}
def _update_default_data(runtime_storage_inst, default_data):
def _store_default_data(runtime_storage_inst, default_data):
normalizer.normalize_default_data(default_data)
LOG.debug('Update runtime storage with default data')
for key, value in default_data.iteritems():
if key in KEYS:
KEYS[key](runtime_storage_inst, value)
if key in STORE_FUNCS:
STORE_FUNCS[key](runtime_storage_inst, value)
else:
runtime_storage_inst.set_by_key(key, value)
def _update_records(runtime_storage_inst, sources_root):
LOG.debug('Gather release index for all repos')
release_index = {}
for repo in utils.load_repos(runtime_storage_inst):
vcs_inst = vcs.get_vcs(repo, sources_root)
release_index.update(vcs_inst.get_release_index())
record_processor_inst = record_processor.RecordProcessor(
runtime_storage_inst)
# need to iterate over full view of records and generate valid
# users profiles
LOG.debug('Iterate all records to create valid users profiles')
for record in runtime_storage_inst.get_all_records():
record_processor_inst.update_user(record)
# update records according to generated users profiles
LOG.debug('Update all records according to users profiles')
updated_records = record_processor_inst.update(
runtime_storage_inst.get_all_records(), release_index)
runtime_storage_inst.set_records(updated_records)
def process(runtime_storage_inst, default_data, sources_root, force_update):
LOG.debug('Process default data')
normalizer.normalize_default_data(default_data)
dd_changed = _check_default_data_change(runtime_storage_inst, default_data)
if 'project_sources' in default_data:
if not _retrieve_project_list(default_data):
raise Exception('Unable to retrieve project list')
_update_project_list(default_data)
_update_default_data(runtime_storage_inst, default_data)
if (dd_changed or force_update):
LOG.debug('Gather release index for all repos')
release_index = {}
for repo in utils.load_repos(runtime_storage_inst):
vcs_inst = vcs.get_vcs(repo, sources_root)
release_index.update(vcs_inst.get_release_index())
record_processor_inst = record_processor.RecordProcessor(
runtime_storage_inst)
# need to iterate over full view of records and generate valid
# users profiles
LOG.debug('Iterate all records to create valid users profiles')
for record in runtime_storage_inst.get_all_records():
record_processor_inst.update_user(record)
# update records according to generated users profiles
LOG.debug('Update all records according to users profiles')
updated_records = record_processor_inst.update(
runtime_storage_inst.get_all_records(), release_index)
runtime_storage_inst.set_records(updated_records)
if dd_changed or force_update:
_store_default_data(runtime_storage_inst, default_data)
_update_records(runtime_storage_inst, sources_root)

View File

@@ -19,20 +19,9 @@ from stackalytics.processor import utils
LOG = logging.getLogger(__name__)
def get_user_id(launchpad_id, email):
return launchpad_id or email
def normalize_user(user):
user['emails'] = [email.lower() for email in user['emails']]
if user['launchpad_id']:
user['launchpad_id'] = user['launchpad_id'].lower()
def _normalize_user(user):
for c in user['companies']:
end_date_numeric = 0
if c['end_date']:
end_date_numeric = utils.date_to_timestamp(c['end_date'])
c['end_date'] = end_date_numeric
c['end_date'] = utils.date_to_timestamp(c['end_date'])
# sort companies by end_date
def end_date_comparator(x, y):
@@ -44,19 +33,12 @@ def normalize_user(user):
return cmp(x["end_date"], y["end_date"])
user['companies'].sort(cmp=end_date_comparator)
if user['emails']:
user['user_id'] = get_user_id(user['launchpad_id'], user['emails'][0])
else:
user['user_id'] = user['launchpad_id']
user['user_id'] = user['launchpad_id']
def _normalize_users(users):
for user in users:
if ('launchpad_id' not in user) or ('emails' not in user):
LOG.warn('Skipping invalid user: %s', user)
continue
normalize_user(user)
_normalize_user(user)
def _normalize_releases(releases):

View File

@@ -18,7 +18,6 @@ import time
from stackalytics.openstack.common import log as logging
from stackalytics.processor import launchpad_utils
from stackalytics.processor import normalizer
from stackalytics.processor import utils
LOG = logging.getLogger(__name__)
@@ -82,7 +81,7 @@ class RecordProcessor(object):
company = (self._get_company_by_email(email) or
self._get_independent())
user = {
'user_id': normalizer.get_user_id(launchpad_id, email),
'user_id': launchpad_id or email,
'launchpad_id': launchpad_id,
'user_name': user_name or '',
'companies': [{
@@ -94,7 +93,6 @@ class RecordProcessor(object):
user['emails'] = [email]
else:
user['emails'] = []
normalizer.normalize_user(user)
return user
def _get_lp_info(self, email):

View File

@@ -28,6 +28,8 @@ LOG = logging.getLogger(__name__)
def date_to_timestamp(d):
if not d:
return 0
if d == 'now':
return int(time.time())
return int(time.mktime(
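The new 'now' branch in date_to_timestamp can be pictured with the small stand-in below; the date format used in the last branch is an assumption based on the schema's date pattern, not copied from the real helper.

    import time

    # Stand-in for the extended helper: empty/None -> 0, 'now' -> current time,
    # otherwise parse a '2013-Dec-19' style date (format assumed here).
    def date_to_timestamp(d):
        if not d:
            return 0
        if d == 'now':
            return int(time.time())
        return int(time.mktime(time.strptime(d, '%Y-%b-%d')))

    print(date_to_timestamp(None))           # 0
    print(date_to_timestamp('now'))          # current unix time
    print(date_to_timestamp('2013-Dec-19'))  # fixed timestamp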

View File

@@ -139,7 +139,7 @@ class Git(Vcs):
LOG.debug('Get release index for repo uri: %s', self.repo['uri'])
os.chdir(self.folder)
if not self.release_index:
for release in self.repo['releases']:
for release in self.repo.get('releases', []):
release_name = release['release_name'].lower()
if 'branch' in release:

View File

@@ -17,6 +17,7 @@ import copy
import mock
import testtools
from stackalytics.processor import default_data_processor
from stackalytics.processor import normalizer
from tests.unit import test_data
@@ -49,3 +50,23 @@ class TestDefaultDataProcessor(testtools.TestCase):
self.assertEqual(test_data.USERS[0]['launchpad_id'],
data['users'][0]['user_id'],
message='User id should be set')
def test_update_project_list(self):
with mock.patch('stackalytics.processor.default_data_processor.'
'_retrieve_project_list_from_github') as retriever:
retriever.return_value = [
{'module': 'nova', 'uri': 'git://github.com/openstack/nova'},
{'module': 'qa', 'uri': 'git://github.com/openstack/qa'},
]
dd = {
'repos': [
{'module': 'qa', 'uri': 'git://github.com/openstack/qa'},
],
'project_sources': ['any']
}
default_data_processor._update_project_list(dd)
self.assertEqual(2, len(dd['repos']))
self.assertIn('qa', set([r['module'] for r in dd['repos']]))
self.assertIn('nova', set([r['module'] for r in dd['repos']]))