From 82600d880554eab73efe13de73d882faaee940db Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Thu, 19 Dec 2013 18:47:31 +0400 Subject: [PATCH] Merge user profiles during default data update * Existing user profiles are updated with data read from default-data * Refactoring of default data processor * Removed unused code from normalizer (validity is enforced by schema) * Add more restrictions into the schema Fixes bug 1260696 Change-Id: Iab1f190ff8a7e2a0c0800712174565437149e119 --- etc/default_data.schema.json | 4 +- .../processor/default_data_processor.py | 125 +++++++++--------- stackalytics/processor/normalizer.py | 26 +--- stackalytics/processor/record_processor.py | 4 +- stackalytics/processor/utils.py | 2 + stackalytics/processor/vcs.py | 2 +- tests/unit/test_default_data_processor.py | 21 +++ 7 files changed, 94 insertions(+), 90 deletions(-) diff --git a/etc/default_data.schema.json b/etc/default_data.schema.json index ad1e17065..b032d25a6 100644 --- a/etc/default_data.schema.json +++ b/etc/default_data.schema.json @@ -39,8 +39,10 @@ "pattern": "20\\d{2}-\\w{3}-[0-3]\\d" } }, + "required": ["company_name", "end_date"], "additionalProperties": false - } + }, + "minItems": 1 } }, "required": ["launchpad_id", "user_name", "emails"], diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py index 0dd430cdf..a8a71ed8b 100644 --- a/stackalytics/processor/default_data_processor.py +++ b/stackalytics/processor/default_data_processor.py @@ -34,7 +34,7 @@ def _check_default_data_change(runtime_storage_inst, default_data): p_digest = runtime_storage_inst.get_by_key('default_data_digest') if digest == p_digest: - LOG.debug('No changes in default data detected, sha1: %s', digest) + LOG.debug('No changes in default data, sha1: %s', digest) return False LOG.debug('Default data has changes, sha1: %s', digest) @@ -42,21 +42,16 @@ def _check_default_data_change(runtime_storage_inst, default_data): return True -def _retrieve_project_list(default_data): - +def _retrieve_project_list_from_github(project_sources): LOG.info('Retrieving project list from GitHub') - - repo_index = {} - for repo in default_data['repos']: - repo_index[repo['uri']] = repo - github = MainClass.Github(timeout=60) - for project_source in default_data['project_sources']: + repos = [] + for project_source in project_sources: organization = project_source['organization'] LOG.debug('Get list of projects for organization %s', organization) try: - repos = github.get_organization(organization).get_repos() + github_repos = github.get_organization(organization).get_repos() except Exception as e: LOG.exception(e) LOG.warn('Fail to retrieve list of projects. Keep it unmodified') @@ -64,42 +59,42 @@ def _retrieve_project_list(default_data): exclude = set(project_source.get('exclude', [])) - for repo in repos: - repo_uri = repo.git_url - repo_name = repo.name - - if (repo_uri not in repo_index) and (repo_name not in exclude): + for repo in github_repos: + if repo.name not in exclude: r = { 'branches': ['master'], - 'module': repo_name, + 'module': repo.name, 'organization': organization, 'project_type': project_source['project_type'], 'project_group': project_source['project_group'], - 'uri': repo_uri, + 'uri': repo.git_url, 'releases': [] } - default_data['repos'].append(r) + repos.append(r) LOG.debug('Project is added to default data: %s', r) - - return True + return repos -def _process_users(runtime_storage_inst, users): - users_index = {} +def _update_project_list(default_data): + + configured_repos = set([r['uri'] for r in default_data['repos']]) + + repos = _retrieve_project_list_from_github(default_data['project_sources']) + if repos: + default_data['repos'] += [r for r in repos + if r['uri'] not in configured_repos] + + +def _store_users(runtime_storage_inst, users): for user in users: + stored_user = utils.load_user(runtime_storage_inst, user['user_id']) + if stored_user: + stored_user.update(user) + user = stored_user utils.store_user(runtime_storage_inst, user) - if 'user_id' in user: - users_index[user['user_id']] = user - if 'launchpad_id' in user: - users_index[user['launchpad_id']] = user - if 'gerrit_id' in user: - users_index[user['gerrit_id']] = user - for email in user['emails']: - users_index[email] = user - runtime_storage_inst.set_by_key('users', users_index) -def _process_companies(runtime_storage_inst, companies): +def _store_companies(runtime_storage_inst, companies): domains_index = {} for company in companies: for domain in company['domains']: @@ -107,50 +102,54 @@ def _process_companies(runtime_storage_inst, companies): runtime_storage_inst.set_by_key('companies', domains_index) -KEYS = { - 'users': _process_users, - 'companies': _process_companies, +STORE_FUNCS = { + 'users': _store_users, + 'companies': _store_companies, } -def _update_default_data(runtime_storage_inst, default_data): +def _store_default_data(runtime_storage_inst, default_data): + normalizer.normalize_default_data(default_data) + LOG.debug('Update runtime storage with default data') for key, value in default_data.iteritems(): - if key in KEYS: - KEYS[key](runtime_storage_inst, value) + if key in STORE_FUNCS: + STORE_FUNCS[key](runtime_storage_inst, value) else: runtime_storage_inst.set_by_key(key, value) +def _update_records(runtime_storage_inst, sources_root): + LOG.debug('Gather release index for all repos') + release_index = {} + for repo in utils.load_repos(runtime_storage_inst): + vcs_inst = vcs.get_vcs(repo, sources_root) + release_index.update(vcs_inst.get_release_index()) + + record_processor_inst = record_processor.RecordProcessor( + runtime_storage_inst) + + # need to iterate over full view of records and generate valid + # users profiles + LOG.debug('Iterate all records to create valid users profiles') + for record in runtime_storage_inst.get_all_records(): + record_processor_inst.update_user(record) + + # update records according to generated users profiles + LOG.debug('Update all records according to users profiles') + updated_records = record_processor_inst.update( + runtime_storage_inst.get_all_records(), release_index) + runtime_storage_inst.set_records(updated_records) + + def process(runtime_storage_inst, default_data, sources_root, force_update): LOG.debug('Process default data') - normalizer.normalize_default_data(default_data) - dd_changed = _check_default_data_change(runtime_storage_inst, default_data) if 'project_sources' in default_data: - if not _retrieve_project_list(default_data): - raise Exception('Unable to retrieve project list') + _update_project_list(default_data) - _update_default_data(runtime_storage_inst, default_data) - - if (dd_changed or force_update): - LOG.debug('Gather release index for all repos') - release_index = {} - for repo in utils.load_repos(runtime_storage_inst): - vcs_inst = vcs.get_vcs(repo, sources_root) - release_index.update(vcs_inst.get_release_index()) - - record_processor_inst = record_processor.RecordProcessor( - runtime_storage_inst) - # need to iterate over full view of records and generate valid - # users profiles - LOG.debug('Iterate all records to create valid users profiles') - for record in runtime_storage_inst.get_all_records(): - record_processor_inst.update_user(record) - # update records according to generated users profiles - LOG.debug('Update all records according to users profiles') - updated_records = record_processor_inst.update( - runtime_storage_inst.get_all_records(), release_index) - runtime_storage_inst.set_records(updated_records) + if dd_changed or force_update: + _store_default_data(runtime_storage_inst, default_data) + _update_records(runtime_storage_inst, sources_root) diff --git a/stackalytics/processor/normalizer.py b/stackalytics/processor/normalizer.py index 4f033fcbd..3ac02f0fd 100644 --- a/stackalytics/processor/normalizer.py +++ b/stackalytics/processor/normalizer.py @@ -19,20 +19,9 @@ from stackalytics.processor import utils LOG = logging.getLogger(__name__) -def get_user_id(launchpad_id, email): - return launchpad_id or email - - -def normalize_user(user): - user['emails'] = [email.lower() for email in user['emails']] - if user['launchpad_id']: - user['launchpad_id'] = user['launchpad_id'].lower() - +def _normalize_user(user): for c in user['companies']: - end_date_numeric = 0 - if c['end_date']: - end_date_numeric = utils.date_to_timestamp(c['end_date']) - c['end_date'] = end_date_numeric + c['end_date'] = utils.date_to_timestamp(c['end_date']) # sort companies by end_date def end_date_comparator(x, y): @@ -44,19 +33,12 @@ def normalize_user(user): return cmp(x["end_date"], y["end_date"]) user['companies'].sort(cmp=end_date_comparator) - if user['emails']: - user['user_id'] = get_user_id(user['launchpad_id'], user['emails'][0]) - else: - user['user_id'] = user['launchpad_id'] + user['user_id'] = user['launchpad_id'] def _normalize_users(users): for user in users: - if ('launchpad_id' not in user) or ('emails' not in user): - LOG.warn('Skipping invalid user: %s', user) - continue - - normalize_user(user) + _normalize_user(user) def _normalize_releases(releases): diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index fd72a41e3..513dcea9f 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -18,7 +18,6 @@ import time from stackalytics.openstack.common import log as logging from stackalytics.processor import launchpad_utils -from stackalytics.processor import normalizer from stackalytics.processor import utils LOG = logging.getLogger(__name__) @@ -82,7 +81,7 @@ class RecordProcessor(object): company = (self._get_company_by_email(email) or self._get_independent()) user = { - 'user_id': normalizer.get_user_id(launchpad_id, email), + 'user_id': launchpad_id or email, 'launchpad_id': launchpad_id, 'user_name': user_name or '', 'companies': [{ @@ -94,7 +93,6 @@ class RecordProcessor(object): user['emails'] = [email] else: user['emails'] = [] - normalizer.normalize_user(user) return user def _get_lp_info(self, email): diff --git a/stackalytics/processor/utils.py b/stackalytics/processor/utils.py index 996694311..f57a60a8a 100644 --- a/stackalytics/processor/utils.py +++ b/stackalytics/processor/utils.py @@ -28,6 +28,8 @@ LOG = logging.getLogger(__name__) def date_to_timestamp(d): + if not d: + return 0 if d == 'now': return int(time.time()) return int(time.mktime( diff --git a/stackalytics/processor/vcs.py b/stackalytics/processor/vcs.py index 810d0c25a..48314567e 100644 --- a/stackalytics/processor/vcs.py +++ b/stackalytics/processor/vcs.py @@ -139,7 +139,7 @@ class Git(Vcs): LOG.debug('Get release index for repo uri: %s', self.repo['uri']) os.chdir(self.folder) if not self.release_index: - for release in self.repo['releases']: + for release in self.repo.get('releases', []): release_name = release['release_name'].lower() if 'branch' in release: diff --git a/tests/unit/test_default_data_processor.py b/tests/unit/test_default_data_processor.py index ad1e9baa9..14edbbfd5 100644 --- a/tests/unit/test_default_data_processor.py +++ b/tests/unit/test_default_data_processor.py @@ -17,6 +17,7 @@ import copy import mock import testtools +from stackalytics.processor import default_data_processor from stackalytics.processor import normalizer from tests.unit import test_data @@ -49,3 +50,23 @@ class TestDefaultDataProcessor(testtools.TestCase): self.assertEqual(test_data.USERS[0]['launchpad_id'], data['users'][0]['user_id'], message='User id should be set') + + def test_update_project_list(self): + with mock.patch('stackalytics.processor.default_data_processor.' + '_retrieve_project_list_from_github') as retriever: + retriever.return_value = [ + {'module': 'nova', 'uri': 'git://github.com/openstack/nova'}, + {'module': 'qa', 'uri': 'git://github.com/openstack/qa'}, + ] + dd = { + 'repos': [ + {'module': 'qa', 'uri': 'git://github.com/openstack/qa'}, + ], + 'project_sources': ['any'] + } + + default_data_processor._update_project_list(dd) + + self.assertEqual(2, len(dd['repos'])) + self.assertIn('qa', set([r['module'] for r in dd['repos']])) + self.assertIn('nova', set([r['module'] for r in dd['repos']]))