From 8de351099a608e6409b9ddad30134a99fbd97add Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Tue, 6 Aug 2013 13:30:24 +0400 Subject: [PATCH] Got rid of persistent storage Implements blueprint remove-persistent-storage Change-Id: I640abbf3472eba35e8b456a0ba42c45dc88d3d31 --- dashboard/web.py | 27 +++--- etc/default_data.json | 2 +- etc/stackalytics.conf | 3 - etc/test_default_data.json | 4 +- requirements.txt | 1 - stackalytics/processor/config.py | 2 - .../processor/default_data_processor.py | 71 +++++++++++---- stackalytics/processor/main.py | 25 +++--- stackalytics/processor/persistent_storage.py | 86 ------------------- stackalytics/processor/record_processor.py | 79 ++++++----------- stackalytics/processor/runtime_storage.py | 14 +-- tests/unit/test_default_data_processor.py | 17 ---- tests/unit/test_record_processor.py | 66 ++++++++------ 13 files changed, 154 insertions(+), 243 deletions(-) delete mode 100644 stackalytics/processor/persistent_storage.py diff --git a/dashboard/web.py b/dashboard/web.py index 39861bf4d..f7e9b5300 100644 --- a/dashboard/web.py +++ b/dashboard/web.py @@ -29,7 +29,6 @@ import time from dashboard import memory_storage from stackalytics.openstack.common import log as logging from stackalytics.processor import config -from stackalytics.processor import persistent_storage from stackalytics.processor import runtime_storage from stackalytics.processor import utils @@ -77,17 +76,14 @@ def get_vault(): vault = getattr(app, 'stackalytics_vault', None) if not vault: vault = {} - vault['runtime_storage'] = runtime_storage.get_runtime_storage( + runtime_storage_inst = runtime_storage.get_runtime_storage( cfg.CONF.runtime_storage_uri) - vault['persistent_storage'] = ( - persistent_storage.get_persistent_storage( - cfg.CONF.persistent_storage_uri)) + vault['runtime_storage'] = runtime_storage_inst vault['memory_storage'] = memory_storage.get_memory_storage( memory_storage.MEMORY_STORAGE_CACHED, vault['runtime_storage'].get_update(os.getpid())) - persistent_storage_inst = vault['persistent_storage'] - releases = list(persistent_storage_inst.find('releases')) + releases = list(runtime_storage_inst.get_by_key('releases')) vault['start_date'] = releases[0]['end_date'] vault['end_date'] = releases[-1]['end_date'] start_date = releases[0]['end_date'] @@ -96,7 +92,7 @@ def get_vault(): start_date = r['end_date'] vault['releases'] = dict((r['release_name'].lower(), r) for r in releases[1:]) - modules = persistent_storage_inst.find('repos') + modules = runtime_storage_inst.get_by_key('repos') vault['modules'] = dict((r['module'].lower(), r['project_type'].lower()) for r in modules) app.stackalytics_vault = vault @@ -117,11 +113,11 @@ def get_memory_storage(): def init_project_types(vault): - persistent_storage_inst = vault['persistent_storage'] + runtime_storage_inst = vault['runtime_storage'] project_type_options = {} project_type_group_index = {'all': set()} - for repo in persistent_storage_inst.find('repos'): + for repo in runtime_storage_inst.get_by_key('repos'): project_type = repo['project_type'].lower() project_group = None if ('project_group' in repo) and (repo['project_group']): @@ -159,7 +155,8 @@ def get_project_type_options(): def get_release_options(): - releases = list((get_vault()['persistent_storage']).find('releases'))[1:] + runtime_storage_inst = get_vault()['runtime_storage'] + releases = runtime_storage_inst.get_by_key('releases')[1:] releases.reverse() return releases @@ -461,11 +458,13 @@ def module_details(module, records): @templated() @record_filter(ignore='metric') def engineer_details(user_id, records): - persistent_storage = get_vault()['persistent_storage'] - user = list(persistent_storage.find('users', user_id=user_id))[0] + runtime_storage_inst = get_vault()['runtime_storage'] + users_index = runtime_storage_inst.get_by_key('users') + if user_id not in users_index: + raise Exception('User "%s" not in index' % user_id) details = contribution_details(records) - details['user'] = user + details['user'] = users_index[user_id] return details diff --git a/etc/default_data.json b/etc/default_data.json index 811732b32..a239e258b 100644 --- a/etc/default_data.json +++ b/etc/default_data.json @@ -14102,7 +14102,7 @@ { "organization": "stackforge", "project_type": "stackforge", - "project_group": null, + "project_group": null } ], diff --git a/etc/stackalytics.conf b/etc/stackalytics.conf index 4ae588e6c..d459447ad 100644 --- a/etc/stackalytics.conf +++ b/etc/stackalytics.conf @@ -11,9 +11,6 @@ # Runtime storage URI # runtime_storage_uri = memcached://127.0.0.1:11211 -# URI of persistent storage -# persistent_storage_uri = mongodb://localhost - # Hostname where dashboard listens on # listen_host = 127.0.0.1 diff --git a/etc/test_default_data.json b/etc/test_default_data.json index 057b3d58a..993de569c 100644 --- a/etc/test_default_data.json +++ b/etc/test_default_data.json @@ -47,8 +47,8 @@ "project_sources": [ { - "organization": "stackforge", - "project_type": "stackforge", + "organization": "openstack-dev", + "project_type": "openstack", "project_group": null } ], diff --git a/requirements.txt b/requirements.txt index 31e44c542..1de5ecf0e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,5 @@ pbr>=0.5.16,<0.6 psutil PyGithub python-memcached -pymongo sh six diff --git a/stackalytics/processor/config.py b/stackalytics/processor/config.py index 5d699cd3c..1ba498d70 100644 --- a/stackalytics/processor/config.py +++ b/stackalytics/processor/config.py @@ -23,8 +23,6 @@ OPTS = [ help='The folder that holds all project sources to analyze'), cfg.StrOpt('runtime-storage-uri', default='memcached://127.0.0.1:11211', help='Storage URI'), - cfg.StrOpt('persistent-storage-uri', default='mongodb://localhost', - help='URI of persistent storage'), cfg.StrOpt('listen-host', default='127.0.0.1', help='The address dashboard listens on'), cfg.IntOpt('listen-port', default=8080, diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py index 942c35fb3..aa06e2c26 100644 --- a/stackalytics/processor/default_data_processor.py +++ b/stackalytics/processor/default_data_processor.py @@ -31,30 +31,28 @@ def _check_default_data_change(runtime_storage_inst, default_data): h.update(json.dumps(default_data)) digest = h.hexdigest() - p_digest = runtime_storage_inst.get_last_id('default_data_digest') + p_digest = runtime_storage_inst.get_by_key('default_data_digest') if digest == p_digest: LOG.debug('No changes in default data detected, sha1: %s', digest) return False LOG.debug('Default data has changes, sha1: %s', digest) - runtime_storage_inst.set_last_id('default_data_digest', digest) + runtime_storage_inst.set_by_key('default_data_digest', digest) return True -def _retrieve_project_list(default_data): - - if 'project_sources' not in default_data: - return +def _retrieve_project_list(runtime_storage_inst, project_sources): LOG.info('Retrieving project list from GitHub') repo_index = {} - for repo in default_data['repos']: + stored_repos = runtime_storage_inst.get_by_key('repos') + for repo in stored_repos: repo_index[repo['uri']] = repo github = MainClass.Github() - for project_source in default_data['project_sources']: + for project_source in project_sources: organization = project_source['organization'] repos = github.get_organization(organization).get_repos() LOG.debug('Get list of projects for organization %s', organization) @@ -69,30 +67,71 @@ def _retrieve_project_list(default_data): 'module': repo_name, 'project_type': project_source['project_type'], 'project_group': project_source['project_group'], - 'uri': repo_uri + 'uri': repo_uri, + 'releases': [] } - default_data['repos'].append(r) + stored_repos.append(r) LOG.debug('Project is added to default data: %s', r) + runtime_storage_inst.set_by_key('repos', stored_repos) -def process(persistent_storage_inst, runtime_storage_inst, default_data, - sources_root): - _retrieve_project_list(default_data) +def _process_users(users): + users_index = {} + for user in users: + if 'user_id' in user: + users_index[user['user_id']] = user + if 'launchpad_id' in user: + users_index[user['launchpad_id']] = user + for email in user['emails']: + users_index[email] = user + return users_index + + +def _process_companies(companies): + domains_index = {} + for company in companies: + for domain in company['domains']: + domains_index[domain] = company['company_name'] + return domains_index + + +KEYS = { + 'users': _process_users, + 'repos': None, + 'releases': None, + 'companies': _process_companies, +} + + +def _update_default_data(runtime_storage_inst, default_data): + for key, processor in KEYS.iteritems(): + if processor: + value = processor(default_data[key]) + else: + value = default_data[key] + runtime_storage_inst.set_by_key(key, value) + + +def process(runtime_storage_inst, default_data, sources_root): normalizer.normalize_default_data(default_data) if _check_default_data_change(runtime_storage_inst, default_data): - persistent_storage_inst.reset(default_data) + _update_default_data(runtime_storage_inst, default_data) release_index = {} - for repo in persistent_storage_inst.find('repos'): + for repo in runtime_storage_inst.get_by_key('repos'): vcs_inst = vcs.get_vcs(repo, sources_root) release_index.update(vcs_inst.get_release_index()) record_processor_inst = record_processor.RecordProcessor( - persistent_storage_inst) + runtime_storage_inst) updated_records = record_processor_inst.update( runtime_storage_inst.get_all_records(), release_index) runtime_storage_inst.set_records(updated_records) + + if 'project_sources' in default_data: + _retrieve_project_list(runtime_storage_inst, + default_data['project_sources']) diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py index 29252007a..6e0a4880d 100644 --- a/stackalytics/processor/main.py +++ b/stackalytics/processor/main.py @@ -23,7 +23,6 @@ from psutil import _error from stackalytics.openstack.common import log as logging from stackalytics.processor import config from stackalytics.processor import default_data_processor -from stackalytics.processor import persistent_storage from stackalytics.processor import rcs from stackalytics.processor import record_processor from stackalytics.processor import runtime_storage @@ -89,7 +88,7 @@ def process_repo(repo, runtime_storage, record_processor_inst): LOG.debug('Processing repo %s, branch %s', uri, branch) vcs_key = 'vcs:' + str(urllib.quote_plus(uri) + ':' + branch) - last_id = runtime_storage.get_last_id(vcs_key) + last_id = runtime_storage.get_by_key(vcs_key) commit_iterator = vcs_inst.log(branch, last_id) commit_iterator_typed = _record_typer(commit_iterator, 'commit') @@ -98,12 +97,12 @@ def process_repo(repo, runtime_storage, record_processor_inst): runtime_storage.set_records(processed_commit_iterator, _merge_commits) last_id = vcs_inst.get_last_id(branch) - runtime_storage.set_last_id(vcs_key, last_id) + runtime_storage.set_by_key(vcs_key, last_id) LOG.debug('Processing reviews for repo %s, branch %s', uri, branch) rcs_key = 'rcs:' + str(urllib.quote_plus(uri) + ':' + branch) - last_id = runtime_storage.get_last_id(rcs_key) + last_id = runtime_storage.get_by_key(rcs_key) review_iterator = rcs_inst.log(branch, last_id) review_iterator_typed = _record_typer(review_iterator, 'review') @@ -112,16 +111,16 @@ def process_repo(repo, runtime_storage, record_processor_inst): runtime_storage.set_records(processed_review_iterator) last_id = rcs_inst.get_last_id(branch) - runtime_storage.set_last_id(rcs_key, last_id) + runtime_storage.set_by_key(rcs_key, last_id) -def update_repos(runtime_storage, persistent_storage_inst): - repos = persistent_storage_inst.find('repos') +def update_repos(runtime_storage_inst): + repos = runtime_storage_inst.get_by_key('repos') record_processor_inst = record_processor.RecordProcessor( - persistent_storage_inst) + runtime_storage_inst) for repo in repos: - process_repo(repo, runtime_storage, record_processor_inst) + process_repo(repo, runtime_storage_inst, record_processor_inst) def apply_corrections(uri, runtime_storage_inst): @@ -159,21 +158,17 @@ def main(): logging.setup('stackalytics') LOG.info('Logging enabled') - persistent_storage_inst = persistent_storage.get_persistent_storage( - cfg.CONF.persistent_storage_uri) - runtime_storage_inst = runtime_storage.get_runtime_storage( cfg.CONF.runtime_storage_uri) default_data = _read_default_data(cfg.CONF.default_data_uri) - default_data_processor.process(persistent_storage_inst, - runtime_storage_inst, + default_data_processor.process(runtime_storage_inst, default_data, cfg.CONF.sources_root) update_pids(runtime_storage_inst) - update_repos(runtime_storage_inst, persistent_storage_inst) + update_repos(runtime_storage_inst) apply_corrections(cfg.CONF.corrections_uri, runtime_storage_inst) diff --git a/stackalytics/processor/persistent_storage.py b/stackalytics/processor/persistent_storage.py deleted file mode 100644 index 809eb046c..000000000 --- a/stackalytics/processor/persistent_storage.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright (c) 2013 Mirantis Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import re - -import pymongo - -LOG = logging.getLogger(__name__) - -PRIMARY_KEYS = { - 'companies': 'company_name', - 'repos': 'uri', - 'users': 'user_id', - 'releases': 'release_name', - 'project_sources': 'organization', -} - - -class PersistentStorage(object): - def __init__(self, uri): - pass - - def reset(self, default_data): - pass - - def find(self, table, **criteria): - pass - - def insert(self, table, inst): - pass - - def update(self, table, inst): - pass - - -class MongodbStorage(PersistentStorage): - def __init__(self, uri): - super(MongodbStorage, self).__init__(uri) - - self.client = pymongo.MongoClient(uri) - self.mongo = self.client.stackalytics - - for table, primary_key in PRIMARY_KEYS.iteritems(): - self.mongo[table].create_index([(primary_key, pymongo.ASCENDING)]) - - LOG.debug('Mongodb storage is created') - - def reset(self, default_data): - LOG.debug('Clear all tables') - for table in PRIMARY_KEYS.keys(): - self.mongo[table].remove() - if table in default_data: - for item in default_data[table]: - self.insert(table, item) - - def find(self, table, **criteria): - return self.mongo[table].find(criteria) - - def insert(self, table, inst): - self.mongo[table].insert(inst) - - def update(self, table, inst): - primary_key = PRIMARY_KEYS[table] - self.mongo[table].update({primary_key: inst[primary_key]}, inst) - - -def get_persistent_storage(uri): - LOG.debug('Persistent storage is requested for uri %s' % uri) - match = re.search(r'^mongodb:\/\/', uri) - if match: - return MongodbStorage(uri) - else: - raise Exception('Unknown persistent storage uri %s' % uri) diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index db81c9600..d2063d862 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -25,22 +25,14 @@ LOG = logging.getLogger(__name__) class RecordProcessor(object): - def __init__(self, persistent_storage_inst): - self.persistent_storage_inst = persistent_storage_inst + def __init__(self, runtime_storage_inst): + self.runtime_storage_inst = runtime_storage_inst - companies = persistent_storage_inst.find('companies') - self.domains_index = {} - for company in companies: - for domain in company['domains']: - self.domains_index[domain] = company['company_name'] + self.domains_index = runtime_storage_inst.get_by_key('companies') - users = persistent_storage_inst.find('users') - self.users_index = {} - for user in users: - for email in user['emails']: - self.users_index[email] = user + self.users_index = runtime_storage_inst.get_by_key('users') - self.releases = list(persistent_storage_inst.find('releases')) + self.releases = runtime_storage_inst.get_by_key('releases') self.releases_dates = [r['end_date'] for r in self.releases] def _get_release(self, timestamp): @@ -77,40 +69,6 @@ class RecordProcessor(object): }], } normalizer.normalize_user(user) - self.persistent_storage_inst.insert('users', user) - return user - - def _persist_user(self, record): - launchpad_id = record['launchpad_id'] - email = record['author_email'] - user_name = record['author_name'] - - # check if user with user_id exists in persistent storage - user_id = normalizer.get_user_id(launchpad_id, email) - persistent_user_iterator = self.persistent_storage_inst.find( - 'users', user_id=user_id) - - for persistent_user in persistent_user_iterator: - break - else: - persistent_user = None - - if persistent_user: - # user already exist, merge - LOG.debug('User %s (%s) exists, add new email %s', - launchpad_id, user_name, email) - persistent_user_email = persistent_user['emails'][0] - if persistent_user_email not in self.users_index: - raise Exception('Email %s not found in user index' % - persistent_user_email) - user = self.users_index[persistent_user_email] - user['emails'].append(email) - self.persistent_storage_inst.update('users', user) - else: - # add new user - LOG.debug('Add new user %s (%s)', launchpad_id, user_name) - user = self._create_user(launchpad_id, email, user_name) - return user def _get_lp_info(self, email): @@ -127,6 +85,7 @@ class RecordProcessor(object): LOG.warn('Lookup of email %s failed %s', email, error.message) if not lp_profile: + LOG.debug('User with email %s not found', email) return None, None return lp_profile.name, lp_profile.display_name @@ -142,14 +101,26 @@ class RecordProcessor(object): user = self.users_index[email] record['launchpad_id'] = user['launchpad_id'] else: - if ('launchpad_id' not in record) or (not record['launchpad_id']): + if ('launchpad_id' in record) and (record['launchpad_id']): + user = self._create_user(record['launchpad_id'], email, + record['author_name']) + else: launchpad_id, user_name = self._get_lp_info(email) record['launchpad_id'] = launchpad_id - if user_name: - record['author_name'] = user_name - user = self._persist_user(record) + if (launchpad_id) and (launchpad_id in self.users_index): + # merge emails + user = self.users_index[launchpad_id] + user['emails'].append(email) + else: + # create new + if not user_name: + user_name = record['author_name'] + user = self._create_user(launchpad_id, email, user_name) + self.users_index[email] = user + if user['launchpad_id']: + self.users_index[user['launchpad_id']] = user record['user_id'] = user['user_id'] @@ -158,7 +129,7 @@ class RecordProcessor(object): company = self._find_company(user['companies'], record['date']) record['company_name'] = company - if 'user_name' in user: + if ('user_name' in user) and (user['user_name']): record['author_name'] = user['user_name'] def _process_commit(self, record): @@ -249,6 +220,8 @@ class RecordProcessor(object): yield r + self.runtime_storage_inst.set_by_key('users', self.users_index) + def update(self, record_iterator, release_index): for record in record_iterator: need_update = False @@ -273,3 +246,5 @@ class RecordProcessor(object): if need_update: yield record + + self.runtime_storage_inst.set_by_key('users', self.users_index) diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py index 4783f7ad7..bc65e9e93 100644 --- a/stackalytics/processor/runtime_storage.py +++ b/stackalytics/processor/runtime_storage.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - import re import memcache +from stackalytics.openstack.common import log as logging + LOG = logging.getLogger(__name__) BULK_READ_SIZE = 64 @@ -37,10 +37,10 @@ class RuntimeStorage(object): def apply_corrections(self, corrections_iterator): pass - def get_last_id(self, key): + def get_by_key(self, key): pass - def set_last_id(self, key, head_commit_id): + def set_by_key(self, key, head_commit_id): pass def get_update(self, pid): @@ -112,11 +112,11 @@ class MemcachedStorage(RuntimeStorage): self.memcached.set(self._get_record_name(record_id), original) self._commit_update(record_id) - def get_last_id(self, key): + def get_by_key(self, key): return self.memcached.get(key) - def set_last_id(self, key, head_commit_id): - self.memcached.set(key, head_commit_id) + def set_by_key(self, key, value): + self.memcached.set(key, value) def get_update(self, pid): last_update = self.memcached.get('pid:%s' % pid) diff --git a/tests/unit/test_default_data_processor.py b/tests/unit/test_default_data_processor.py index 10bfb34e9..ad1e9baa9 100644 --- a/tests/unit/test_default_data_processor.py +++ b/tests/unit/test_default_data_processor.py @@ -18,8 +18,6 @@ import mock import testtools from stackalytics.processor import normalizer -from stackalytics.processor import persistent_storage -from stackalytics.processor import runtime_storage from tests.unit import test_data @@ -34,23 +32,8 @@ class TestDefaultDataProcessor(testtools.TestCase): normalized_data = copy.deepcopy(test_data.DEFAULT_DATA) normalizer.normalize_default_data(normalized_data) - def find(table, *args, **criteria): - if table in normalized_data: - return normalized_data[table] - else: - raise Exception('Wrong table %s' % table) - - self.p_storage = mock.Mock(persistent_storage.PersistentStorage) - self.p_storage.find = mock.Mock(side_effect=find) - - self.r_storage = mock.Mock(runtime_storage.RuntimeStorage) - - self.chdir_patcher = mock.patch('os.chdir') - self.chdir_patcher.start() - def tearDown(self): super(TestDefaultDataProcessor, self).tearDown() - self.chdir_patcher.stop() def test_normalizer(self): data = copy.deepcopy(test_data.DEFAULT_DATA) diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index 578e8f257..19e6ac490 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -18,8 +18,9 @@ import mock from oslo.config import cfg import testtools -from stackalytics.processor import persistent_storage +from stackalytics.processor import default_data_processor from stackalytics.processor import record_processor +from stackalytics.processor import runtime_storage from stackalytics.processor import utils @@ -67,22 +68,26 @@ class TestRecordProcessor(testtools.TestCase): 'release_name': 'Diablo', 'end_date': utils.date_to_timestamp('2011-Sep-08') }, + { + 'release_name': 'Zoo', + 'end_date': utils.date_to_timestamp('2035-Sep-08') + }, ] - def find(table, **criteria): + def get_by_key(table): if table == 'companies': - return companies + return default_data_processor._process_companies(companies) elif table == 'users': - return self.get_users() + return default_data_processor._process_users(self.get_users()) elif table == 'releases': return releases else: raise Exception('Wrong table %s' % table) - p_storage = mock.Mock(persistent_storage.PersistentStorage) - p_storage.find = mock.Mock(side_effect=find) + p_storage = mock.Mock(runtime_storage.RuntimeStorage) + p_storage.get_by_key = mock.Mock(side_effect=get_by_key) - self.persistent_storage = p_storage + self.runtime_storage = p_storage self.commit_processor = record_processor.RecordProcessor(p_storage) self.launchpad_patch = mock.patch('launchpadlib.launchpad.Launchpad') self.launchpad_patch.start() @@ -92,11 +97,16 @@ class TestRecordProcessor(testtools.TestCase): super(TestRecordProcessor, self).tearDown() self.launchpad_patch.stop() - def _make_commit(self, email='johndoe@gmail.com', date=1999999999): - return { + def _generate_commits(self, email='johndoe@gmail.com', date=1999999999): + yield { + 'record_type': 'commit', + 'commit_id': 'de7e8f297c193fb310f22815334a54b9c76a0be1', 'author_name': 'John Doe', 'author_email': email, 'date': date, + 'lines_added': 25, + 'lines_deleted': 9, + 'release_name': 'havana', } def test_get_company_by_email_mapped(self): @@ -120,15 +130,15 @@ class TestRecordProcessor(testtools.TestCase): self.assertEquals(None, res) def test_update_commit_existing_user(self): - commit = self._make_commit() - self.commit_processor._update_record_and_user(commit) + commit_generator = self._generate_commits() + commit = list(self.commit_processor.process(commit_generator))[0] self.assertEquals('SuperCompany', commit['company_name']) self.assertEquals('john_doe', commit['launchpad_id']) def test_update_commit_existing_user_old_job(self): - commit = self._make_commit(date=1000000000) - self.commit_processor._update_record_and_user(commit) + commit_generator = self._generate_commits(date=1000000000) + commit = list(self.commit_processor.process(commit_generator))[0] self.assertEquals('*independent', commit['company_name']) self.assertEquals('john_doe', commit['launchpad_id']) @@ -139,7 +149,7 @@ class TestRecordProcessor(testtools.TestCase): Should return other company instead of those mentioned in user db """ email = 'johndoe@nec.co.jp' - commit = self._make_commit(email=email) + commit_generator = self._generate_commits(email=email) lp_mock = mock.MagicMock() launchpad.Launchpad.login_anonymously = mock.Mock(return_value=lp_mock) lp_profile = mock.Mock() @@ -149,9 +159,10 @@ class TestRecordProcessor(testtools.TestCase): # tell storage to return existing user self.get_users.return_value = [user] - self.commit_processor._update_record_and_user(commit) + commit = list(self.commit_processor.process(commit_generator))[0] - self.persistent_storage.update.assert_called_once_with('users', user) + self.runtime_storage.set_by_key.assert_called_once_with('users', + mock.ANY) lp_mock.people.getByEmail.assert_called_once_with(email=email) self.assertIn(email, user['emails']) self.assertEquals('NEC', commit['company_name']) @@ -163,7 +174,7 @@ class TestRecordProcessor(testtools.TestCase): the user and return current company """ email = 'johndoe@yahoo.com' - commit = self._make_commit(email=email) + commit_generator = self._generate_commits(email=email) lp_mock = mock.MagicMock() launchpad.Launchpad.login_anonymously = mock.Mock(return_value=lp_mock) lp_profile = mock.Mock() @@ -173,9 +184,10 @@ class TestRecordProcessor(testtools.TestCase): # tell storage to return existing user self.get_users.return_value = [user] - self.commit_processor._update_record_and_user(commit) + commit = list(self.commit_processor.process(commit_generator))[0] - self.persistent_storage.update.assert_called_once_with('users', user) + self.runtime_storage.set_by_key.assert_called_once_with('users', + mock.ANY) lp_mock.people.getByEmail.assert_called_once_with(email=email) self.assertIn(email, user['emails']) self.assertEquals('SuperCompany', commit['company_name']) @@ -187,7 +199,7 @@ class TestRecordProcessor(testtools.TestCase): Should add new user and set company depending on email """ email = 'smith@nec.com' - commit = self._make_commit(email=email) + commit_generator = self._generate_commits(email=email) lp_mock = mock.MagicMock() launchpad.Launchpad.login_anonymously = mock.Mock(return_value=lp_mock) lp_profile = mock.Mock() @@ -196,7 +208,7 @@ class TestRecordProcessor(testtools.TestCase): lp_mock.people.getByEmail = mock.Mock(return_value=lp_profile) self.get_users.return_value = [] - self.commit_processor._update_record_and_user(commit) + commit = list(self.commit_processor.process(commit_generator))[0] lp_mock.people.getByEmail.assert_called_once_with(email=email) self.assertEquals('NEC', commit['company_name']) @@ -208,13 +220,13 @@ class TestRecordProcessor(testtools.TestCase): Should set user name and empty LPid """ email = 'inkognito@avs.com' - commit = self._make_commit(email=email) + commit_generator = self._generate_commits(email=email) lp_mock = mock.MagicMock() launchpad.Launchpad.login_anonymously = mock.Mock(return_value=lp_mock) lp_mock.people.getByEmail = mock.Mock(return_value=None) self.get_users.return_value = [] - self.commit_processor._update_record_and_user(commit) + commit = list(self.commit_processor.process(commit_generator))[0] lp_mock.people.getByEmail.assert_called_once_with(email=email) self.assertEquals('*independent', commit['company_name']) @@ -225,14 +237,14 @@ class TestRecordProcessor(testtools.TestCase): LP raises error during getting user info """ email = 'smith@avs.com' - commit = self._make_commit(email=email) + commit_generator = self._generate_commits(email=email) lp_mock = mock.MagicMock() launchpad.Launchpad.login_anonymously = mock.Mock(return_value=lp_mock) lp_mock.people.getByEmail = mock.Mock(return_value=None, side_effect=Exception) self.get_users.return_value = [] - self.commit_processor._update_record_and_user(commit) + commit = list(self.commit_processor.process(commit_generator))[0] lp_mock.people.getByEmail.assert_called_once_with(email=email) self.assertEquals('*independent', commit['company_name']) @@ -243,13 +255,13 @@ class TestRecordProcessor(testtools.TestCase): User's email is malformed """ email = 'error.root' - commit = self._make_commit(email=email) + commit_generator = self._generate_commits(email=email) lp_mock = mock.MagicMock() launchpad.Launchpad.login_anonymously = mock.Mock(return_value=lp_mock) lp_mock.people.getByEmail = mock.Mock(return_value=None) self.get_users.return_value = [] - self.commit_processor._update_record_and_user(commit) + commit = list(self.commit_processor.process(commit_generator))[0] self.assertEquals(0, lp_mock.people.getByEmail.called) self.assertEquals('*independent', commit['company_name'])