diff --git a/dashboard/memory_storage.py b/dashboard/memory_storage.py
index d23e3cbe0..f6669fb15 100644
--- a/dashboard/memory_storage.py
+++ b/dashboard/memory_storage.py
@@ -54,8 +54,10 @@ class CachedMemoryStorage(MemoryStorage):

     def update(self, records):
         for record in records:
-            if record['record_id'] in self.records:
-                self._remove_record_from_index(record)
+            record_id = record['record_id']
+            if record_id in self.records:
+                # remove existing record from indexes
+                self._remove_record_from_index(self.records[record_id])
             self._save_record(record)

     def _remove_record_from_index(self, record):
diff --git a/dashboard/web.py b/dashboard/web.py
index 7168ad030..9488844b4 100644
--- a/dashboard/web.py
+++ b/dashboard/web.py
@@ -86,7 +86,8 @@ def get_vault():
             memory_storage.MEMORY_STORAGE_CACHED,
             vault['runtime_storage'].get_update(os.getpid()))

-        releases = list(vault['persistent_storage'].get_releases())
+        persistent_storage_inst = vault['persistent_storage']
+        releases = list(persistent_storage_inst.find('releases'))
         vault['start_date'] = releases[0]['end_date']
         vault['end_date'] = releases[-1]['end_date']
         start_date = releases[0]['end_date']
@@ -95,7 +96,7 @@ def get_vault():
             start_date = r['end_date']
         vault['releases'] = dict((r['release_name'].lower(), r)
                                  for r in releases[1:])
-        modules = vault['persistent_storage'].get_repos()
+        modules = persistent_storage_inst.find('repos')
         vault['modules'] = dict((r['module'].lower(),
                                  r['project_type'].lower()) for r in modules)
         app.stackalytics_vault = vault
@@ -120,7 +121,7 @@ def init_project_types(vault):
     project_type_options = {}
     project_type_group_index = {'all': set()}

-    for repo in persistent_storage_inst.get_repos():
+    for repo in persistent_storage_inst.find('repos'):
         project_type = repo['project_type'].lower()
         project_group = None
         if 'project_group' in repo:
@@ -451,7 +452,7 @@ def module_details(module, records):
 @record_filter(ignore='metric')
 def engineer_details(user_id, records):
     persistent_storage = get_vault()['persistent_storage']
-    user = list(persistent_storage.get_users(user_id=user_id))[0]
+    user = list(persistent_storage.find('users', user_id=user_id))[0]

     details = contribution_details(records)
     details['user'] = user
@@ -504,7 +505,7 @@ def get_modules(records, metric_filter):
 def get_engineers(records, metric_filter):
     response = _get_aggregated_stats(records, metric_filter,
                                      get_memory_storage().get_user_ids(),
-                                     'user_id', 'author')
+                                     'user_id', 'author_name')
     return json.dumps(response)

diff --git a/etc/stackalytics.conf b/etc/stackalytics.conf
index be420a85b..e78bdfce8 100644
--- a/etc/stackalytics.conf
+++ b/etc/stackalytics.conf
@@ -14,9 +14,6 @@
 # URI of persistent storage
 # persistent_storage_uri = mongodb://localhost

-# Update persistent storage with default data
-# read-default-data = False
-
 # Hostname where dashboard listens on
 # listen_host = 127.0.0.1

diff --git a/stackalytics/processor/config.py b/stackalytics/processor/config.py
index 7ad3bbf77..80e2525dd 100644
--- a/stackalytics/processor/config.py
+++ b/stackalytics/processor/config.py
@@ -24,14 +24,6 @@ OPTS = [
                help='Storage URI'),
     cfg.StrOpt('persistent-storage-uri', default='mongodb://localhost',
                help='URI of persistent storage'),
-    cfg.BoolOpt('sync-default-data', default=False,
-                help='Update persistent storage with default data. '
-                     'Existing data is not overwritten'),
-    cfg.BoolOpt('force-sync-default-data', default=False,
-                help='Completely overwrite persistent storage with the '
-                     'default data'),
-    cfg.BoolOpt('filter-robots', default=True,
-                help='Filter out commits from robots'),
     cfg.StrOpt('listen-host', default='127.0.0.1',
                help='The address dashboard listens on'),
     cfg.IntOpt('listen-port', default=8080,
diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py
index a94000343..c4b54d86c 100644
--- a/stackalytics/processor/default_data_processor.py
+++ b/stackalytics/processor/default_data_processor.py
@@ -14,64 +14,51 @@
 # limitations under the License.

 from stackalytics.openstack.common import log as logging
-from stackalytics.processor import utils
+from stackalytics.processor import normalizer
+from stackalytics.processor import persistent_storage
+from stackalytics.processor import record_processor
+from stackalytics.processor import vcs

 LOG = logging.getLogger(__name__)


-def normalize_user(user):
-    user['emails'] = [email.lower() for email in user['emails']]
-    if user['launchpad_id']:
-        user['launchpad_id'] = user['launchpad_id'].lower()
+def _update_persistent_storage(persistent_storage_inst, default_data):

-    for c in user['companies']:
-        end_date_numeric = 0
-        if c['end_date']:
-            end_date_numeric = utils.date_to_timestamp(c['end_date'])
-        c['end_date'] = end_date_numeric
+    need_update = False

-    # sort companies by end_date
-    def end_date_comparator(x, y):
-        if x["end_date"] == 0:
-            return 1
-        elif y["end_date"] == 0:
-            return -1
-        else:
-            return cmp(x["end_date"], y["end_date"])
+    for table, primary_key in persistent_storage.PRIMARY_KEYS.iteritems():
+        for item in default_data[table]:
+            param = {primary_key: item[primary_key]}
+            for p_item in persistent_storage_inst.find(table, **param):
+                break
+            else:
+                p_item = None

-    user['companies'].sort(cmp=end_date_comparator)
+            if item != p_item:
+                need_update = True
+                if p_item:
+                    persistent_storage_inst.update(table, item)
+                else:
+                    persistent_storage_inst.insert(table, item)
+
+    return need_update


-def _process_users(users):
-    for user in users:
-        if ('launchpad_id' not in user) or ('emails' not in user):
-            LOG.warn('Skipping invalid user: %s', user)
-            continue
+def process(persistent_storage_inst, runtime_storage_inst, default_data,
+            sources_root):

-        normalize_user(user)
-        user['user_id'] = user['launchpad_id'] or user['emails'][0]
+    normalizer.normalize_default_data(default_data)

+    if _update_persistent_storage(persistent_storage_inst, default_data):

-def _process_releases(releases):
-    for release in releases:
-        release['release_name'] = release['release_name'].lower()
-        release['end_date'] = utils.date_to_timestamp(release['end_date'])
-    releases.sort(key=lambda x: x['end_date'])
+        release_index = {}
+        for repo in persistent_storage_inst.find('repos'):
+            vcs_inst = vcs.get_vcs(repo, sources_root)
+            release_index.update(vcs_inst.get_release_index())

-
-def _process_repos(repos):
-    for repo in repos:
-        if 'releases' not in repo:
-            repo['releases'] = []  # release will be assigned automatically
-
-PROCESSORS = {
-    'users': _process_users,
-    'releases': _process_releases,
-    'repos': _process_repos,
-}
-
-
-def process(persistent_storage, default_data):
-    for key, processor in PROCESSORS.items():
-        processor(default_data[key])
-    persistent_storage.sync(default_data, force=True)
+        persistent_storage_inst.reset(default_data)
+        record_processor_inst = record_processor.RecordProcessor(
+            persistent_storage_inst)
+        updated_records = record_processor_inst.update(
+            runtime_storage_inst.get_all_records(), release_index)
+        runtime_storage_inst.set_records(updated_records)
diff --git a/stackalytics/processor/main.py b/stackalytics/processor/main.py
index 38860a652..1eefaa997 100644
--- a/stackalytics/processor/main.py
+++ b/stackalytics/processor/main.py
@@ -15,7 +15,6 @@

 import json
 import urllib
-import urllib2

 from oslo.config import cfg
 import psutil
@@ -69,7 +68,13 @@ def _merge_commits(original, new):
         return True


-def process_repo(repo, runtime_storage, commit_processor, review_processor):
+def _record_typer(record_iterator, record_type):
+    for record in record_iterator:
+        record['record_type'] = record_type
+        yield record
+
+
+def process_repo(repo, runtime_storage, record_processor_inst):
     uri = repo['uri']
     LOG.debug('Processing repo uri %s' % uri)

@@ -87,7 +92,9 @@ def process_repo(repo, runtime_storage, record_processor_inst):
         last_id = runtime_storage.get_last_id(vcs_key)

         commit_iterator = vcs_inst.log(branch, last_id)
-        processed_commit_iterator = commit_processor.process(commit_iterator)
+        commit_iterator_typed = _record_typer(commit_iterator, 'commit')
+        processed_commit_iterator = record_processor_inst.process(
+            commit_iterator_typed)
         runtime_storage.set_records(processed_commit_iterator, _merge_commits)

         last_id = vcs_inst.get_last_id(branch)
@@ -98,27 +105,28 @@ def process_repo(repo, runtime_storage, record_processor_inst):
         rcs_key = 'rcs:' + str(urllib.quote_plus(uri) + ':' + branch)
         last_id = runtime_storage.get_last_id(rcs_key)

-        reviews_iterator = rcs_inst.log(branch, last_id)
-        processed_review_iterator = review_processor.process(reviews_iterator)
+        review_iterator = rcs_inst.log(branch, last_id)
+        review_iterator_typed = _record_typer(review_iterator, 'review')
+        processed_review_iterator = record_processor_inst.process(
+            review_iterator_typed)
         runtime_storage.set_records(processed_review_iterator)

         last_id = rcs_inst.get_last_id(branch)
         runtime_storage.set_last_id(rcs_key, last_id)


-def update_repos(runtime_storage, persistent_storage):
-    repos = persistent_storage.get_repos()
-    commit_processor = record_processor.get_record_processor(
-        record_processor.COMMIT_PROCESSOR, persistent_storage)
-    review_processor = record_processor.get_record_processor(
-        record_processor.REVIEW_PROCESSOR, persistent_storage)
+def update_repos(runtime_storage, persistent_storage_inst):
+    repos = persistent_storage_inst.find('repos')
+    record_processor_inst = record_processor.RecordProcessor(
+        persistent_storage_inst)

     for repo in repos:
-        process_repo(repo, runtime_storage, commit_processor, review_processor)
+        process_repo(repo, runtime_storage, record_processor_inst)


 def apply_corrections(uri, runtime_storage_inst):
-    corrections_fd = urllib2.urlopen(uri)
+    LOG.info('Applying corrections from uri %s', uri)
+    corrections_fd = urllib.urlopen(uri)
     raw = corrections_fd.read()
     corrections_fd.close()
     corrections = json.loads(raw)['corrections']
@@ -140,11 +148,6 @@ def _read_default_persistent_storage(file_name):
         LOG.error('Error while reading config: %s' % e)


-def load_default_data(persistent_storage_inst, file_name, force):
-    default_data = _read_default_persistent_storage(file_name)
-    default_data_processor.process(persistent_storage_inst, default_data)
-
-
 def main():
     # init conf and logging
     conf = cfg.CONF
@@ -158,16 +161,15 @@ def main():
     persistent_storage_inst = persistent_storage.get_persistent_storage(
         cfg.CONF.persistent_storage_uri)

-    if conf.sync_default_data or conf.force_sync_default_data:
-        LOG.info('Going to synchronize persistent storage with default data '
-                 'from file %s', cfg.CONF.default_data)
-        load_default_data(persistent_storage_inst, cfg.CONF.default_data,
-                          cfg.CONF.force_sync_default_data)
-        return 0
-
     runtime_storage_inst = runtime_storage.get_runtime_storage(
         cfg.CONF.runtime_storage_uri)

+    default_data = _read_default_persistent_storage(cfg.CONF.default_data)
+    default_data_processor.process(persistent_storage_inst,
+                                   runtime_storage_inst,
+                                   default_data,
+                                   cfg.CONF.sources_root)
+
     update_pids(runtime_storage_inst)

     update_repos(runtime_storage_inst, persistent_storage_inst)
diff --git a/stackalytics/processor/normalizer.py b/stackalytics/processor/normalizer.py
new file mode 100644
index 000000000..569430bc4
--- /dev/null
+++ b/stackalytics/processor/normalizer.py
@@ -0,0 +1,77 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from stackalytics.openstack.common import log as logging
+from stackalytics.processor import utils
+
+LOG = logging.getLogger(__name__)
+
+
+def normalize_user(user):
+    user['emails'] = [email.lower() for email in user['emails']]
+    if user['launchpad_id']:
+        user['launchpad_id'] = user['launchpad_id'].lower()
+
+    for c in user['companies']:
+        end_date_numeric = 0
+        if c['end_date']:
+            end_date_numeric = utils.date_to_timestamp(c['end_date'])
+        c['end_date'] = end_date_numeric
+
+    # sort companies by end_date
+    def end_date_comparator(x, y):
+        if x["end_date"] == 0:
+            return 1
+        elif y["end_date"] == 0:
+            return -1
+        else:
+            return cmp(x["end_date"], y["end_date"])
+
+    user['companies'].sort(cmp=end_date_comparator)
+    user['user_id'] = user['launchpad_id'] or user['emails'][0]
+
+
+def _normalize_users(users):
+    for user in users:
+        if ('launchpad_id' not in user) or ('emails' not in user):
+            LOG.warn('Skipping invalid user: %s', user)
+            continue
+
+        normalize_user(user)
+
+
+def _normalize_releases(releases):
+    for release in releases:
+        release['release_name'] = release['release_name'].lower()
+        release['end_date'] = utils.date_to_timestamp(release['end_date'])
+    releases.sort(key=lambda x: x['end_date'])
+
+
+def _normalize_repos(repos):
+    for repo in repos:
+        if 'releases' not in repo:
+            repo['releases'] = []  # release will be assigned automatically
+
+
+NORMALIZERS = {
+    'users': _normalize_users,
+    'releases': _normalize_releases,
+    'repos': _normalize_repos,
+}
+
+
+def normalize_default_data(default_data):
+    for key, normalizer in NORMALIZERS.iteritems():
+        normalizer(default_data[key])
diff --git a/stackalytics/processor/persistent_storage.py b/stackalytics/processor/persistent_storage.py
index a981691f8..cf2bd1a3e 100644
--- a/stackalytics/processor/persistent_storage.py
+++ b/stackalytics/processor/persistent_storage.py
@@ -20,61 +20,28 @@
 import pymongo

 LOG = logging.getLogger(__name__)

+PRIMARY_KEYS = {
+    'companies': 'company_name',
+    'repos': 'uri',
+    'users': 'user_id',
+    'releases': 'release_name',
+}
+

 class PersistentStorage(object):
     def __init__(self, uri):
         pass

-    def sync(self, default_data, force=False):
-        if force:
-            self.clean_all()
-
-        self._build_index(default_data['repos'], 'uri',
-                          self.get_repos, self.insert_repo)
-        self._build_index(default_data['companies'], 'company_name',
-                          self.get_companies, self.insert_company)
-        self._build_index(default_data['users'], 'user_id',
-                          self.get_users, self.insert_user)
-        self._build_index(default_data['releases'], 'release_name',
-                          self.get_releases, self.insert_release)
-
-        LOG.debug('Sync completed')
-
-    def _build_index(self, default_data, primary_key, getter, inserter):
-        # loads all items from persistent storage
-        existing_items = set([item[primary_key] for item in getter()])
-        # inserts items from default storage that are not in persistent storage
-        map(inserter, [item for item in default_data
-                       if item[primary_key] not in existing_items])
-
-    def get_companies(self, **criteria):
+    def reset(self, default_data):
         pass

-    def insert_company(self, company):
+    def find(self, table, **criteria):
         pass

-    def get_repos(self, **criteria):
+    def insert(self, table, inst):
         pass

-    def insert_repo(self, repo):
-        pass
-
-    def get_users(self, **criteria):
-        pass
-
-    def insert_user(self, user):
-        pass
-
-    def update_user(self, user):
-        pass
-
-    def get_releases(self, **criteria):
-        pass
-
-    def insert_release(self, release):
-        pass
-
-    def clean_all(self):
+    def update(self, table, inst):
         pass

@@ -85,49 +52,28 @@ class MongodbStorage(PersistentStorage):
         self.client = pymongo.MongoClient(uri)
         self.mongo = self.client.stackalytics

-        self.mongo.companies.create_index([("company", pymongo.ASCENDING)])
-        self.mongo.repos.create_index([("uri", pymongo.ASCENDING)])
-        self.mongo.users.create_index([("launchpad_id", pymongo.ASCENDING)])
-        self.mongo.releases.create_index([("releases", pymongo.ASCENDING)])
+        for table, primary_key in PRIMARY_KEYS.iteritems():
+            self.mongo[table].create_index([(primary_key, pymongo.ASCENDING)])

         LOG.debug('Mongodb storage is created')

-    def clean_all(self):
+    def reset(self, default_data):
         LOG.debug('Clear all tables')
-        self.mongo.companies.remove()
-        self.mongo.repos.remove()
-        self.mongo.users.remove()
-        self.mongo.releases.remove()
+        for table in PRIMARY_KEYS.keys():
+            self.mongo[table].remove()
+            if table in default_data:
+                for item in default_data[table]:
+                    self.insert(table, item)

-    def get_companies(self, **criteria):
-        return self.mongo.companies.find(criteria)
+    def find(self, table, **criteria):
+        return self.mongo[table].find(criteria)

-    def insert_company(self, company):
-        self.mongo.companies.insert(company)
+    def insert(self, table, inst):
+        self.mongo[table].insert(inst)

-    def get_repos(self, **criteria):
-        return self.mongo.repos.find(criteria)
-
-    def insert_repo(self, repo):
-        self.mongo.repos.insert(repo)
-
-    def get_users(self, **criteria):
-        return self.mongo.users.find(criteria)
-
-    def insert_user(self, user):
-        LOG.debug('Insert new user: %s', user)
-        self.mongo.users.insert(user)
-
-    def update_user(self, user):
-        LOG.debug('Update user: %s', user)
-        launchpad_id = user['launchpad_id']
-        self.mongo.users.update({'launchpad_id': launchpad_id}, user)
-
-    def get_releases(self, **criteria):
-        return self.mongo.releases.find(criteria)
-
-    def insert_release(self, release):
-        self.mongo.releases.insert(release)
+    def update(self, table, inst):
+        primary_key = PRIMARY_KEYS[table]
+        self.mongo[table].update({primary_key: inst[primary_key]}, inst)


 def get_persistent_storage(uri):
diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py
index 63b7302ec..ff9791304 100644
--- a/stackalytics/processor/record_processor.py
+++ b/stackalytics/processor/record_processor.py
@@ -17,42 +17,32 @@
 import bisect
 import re

 from launchpadlib import launchpad
-from oslo.config import cfg

 from stackalytics.openstack.common import log as logging
-from stackalytics.processor import default_data_processor
+from stackalytics.processor import normalizer
 from stackalytics.processor import utils

 LOG = logging.getLogger(__name__)

-COMMIT_PROCESSOR = 1
-REVIEW_PROCESSOR = 2
-

 class RecordProcessor(object):
-    def __init__(self, persistent_storage):
-        self.persistent_storage = persistent_storage
+    def __init__(self, persistent_storage_inst):
+        self.persistent_storage_inst = persistent_storage_inst

-    def process(self, record_iterator):
-        pass
-
-
-class CachedProcessor(RecordProcessor):
-    def __init__(self, persistent_storage):
-        super(CachedProcessor, self).__init__(persistent_storage)
-
-        companies = persistent_storage.get_companies()
+        companies = persistent_storage_inst.find('companies')
         self.domains_index = {}
         for company in companies:
             for domain in company['domains']:
                 self.domains_index[domain] = company['company_name']

-        users = persistent_storage.get_users()
+        users = persistent_storage_inst.find('users')
         self.users_index = {}
         for user in users:
+            if 'launchpad_id' in user:
+                self.users_index[user['launchpad_id']] = user
             for email in user['emails']:
                 self.users_index[email] = user

-        self.releases = list(persistent_storage.get_releases())
+        self.releases = list(persistent_storage_inst.find('releases'))
         self.releases_dates = [r['end_date'] for r in self.releases]

     def _get_release(self, timestamp):
@@ -77,8 +67,8 @@ class CachedProcessor(RecordProcessor):

     def _persist_user(self, launchpad_id, email, user_name):
         # check if user with launchpad_id exists in persistent storage
-        persistent_user_iterator = self.persistent_storage.get_users(
-            launchpad_id=launchpad_id)
+        persistent_user_iterator = self.persistent_storage_inst.find(
+            'users', launchpad_id=launchpad_id)
         for persistent_user in persistent_user_iterator:
             break
         else:
@@ -93,14 +83,13 @@ class CachedProcessor(RecordProcessor):
                 return persistent_user
             user = self.users_index[persistent_user_email]
             user['emails'].append(email)
-            self.persistent_storage.update_user(user)
+            self.persistent_storage_inst.update('users', user)
         else:
             # add new user
             LOG.debug('Add new user into persistent storage')
             company = (self._get_company_by_email(email) or
                        self._get_independent())
             user = {
-                'user_id': launchpad_id,
                 'launchpad_id': launchpad_id,
                 'user_name': user_name,
                 'emails': [email],
@@ -109,8 +98,8 @@ class CachedProcessor(RecordProcessor):
                     'end_date': 0,
                 }],
             }
-            default_data_processor.normalize_user(user)
-            self.persistent_storage.insert_user(user)
+            normalizer.normalize_user(user)
+            self.persistent_storage_inst.insert('users', user)

         return user

@@ -134,7 +123,6 @@ class CachedProcessor(RecordProcessor):
             LOG.debug('Email is not found at Launchpad, mapping to nobody')
             user = {
                 'launchpad_id': None,
-                'user_id': email,
                 'user_name': user_name,
                 'emails': [email],
                 'companies': [{
@@ -142,9 +130,9 @@ class CachedProcessor(RecordProcessor):
                     'end_date': 0
                 }]
             }
-            default_data_processor.normalize_user(user)
+            normalizer.normalize_user(user)
             # add new user
-            self.persistent_storage.insert_user(user)
+            self.persistent_storage_inst.insert('users', user)
         else:
             # get user's launchpad id from his profile
             launchpad_id = lp_profile.name
@@ -160,18 +148,12 @@ class CachedProcessor(RecordProcessor):
     def _get_independent(self):
         return self.domains_index['']

-
-class CommitProcessor(CachedProcessor):
-    def __init__(self, persistent_storage):
-        super(CommitProcessor, self).__init__(persistent_storage)
-        LOG.debug('Commit processor is instantiated')
-
     def _update_commit_with_user_data(self, commit):
         email = commit['author_email'].lower()
         if email in self.users_index:
             user = self.users_index[email]
         else:
-            user = self._unknown_user_email(email, commit['author'])
+            user = self._unknown_user_email(email, commit['author_name'])
         commit['launchpad_id'] = user['launchpad_id']
         commit['user_id'] = user['user_id']
@@ -184,69 +166,47 @@ class CommitProcessor(CachedProcessor):
         if 'user_name' in user:
             commit['author_name'] = user['user_name']

-    def process(self, record_iterator):
-        for record in record_iterator:
-            self._update_commit_with_user_data(record)
+    def _process_commit(self, record):
+        self._update_commit_with_user_data(record)

-            if cfg.CONF.filter_robots and record['company_name'] == '*robots':
-                continue
+        record['primary_key'] = record['commit_id']
+        record['loc'] = record['lines_added'] + record['lines_deleted']

-            record['record_type'] = 'commit'
-            record['primary_key'] = record['commit_id']
-            record['week'] = utils.timestamp_to_week(record['date'])
-            record['loc'] = record['lines_added'] + record['lines_deleted']
+        yield record

-            if not record['release']:
-                record['release'] = self._get_release(record['date'])
+    def _process_user(self, record):
+        email = record['author_email']

-            yield record
-
-
-class ReviewProcessor(CachedProcessor):
-    def __init__(self, persistent_storage):
-        super(ReviewProcessor, self).__init__(persistent_storage)
-
-        self.launchpad_to_company_index = {}
-        users = persistent_storage.get_users()
-        for user in users:
-            self.launchpad_to_company_index[user['launchpad_id']] = user
-
-        LOG.debug('Review processor is instantiated')
-
-    def _process_user(self, email, launchpad_id, user_name, date):
         if email in self.users_index:
             user = self.users_index[email]
         else:
-            user = self._persist_user(launchpad_id, email, user_name)
+            user = self._persist_user(record['launchpad_id'], email,
+                                      record['author_name'])
             self.users_index[email] = user

         company = self._get_company_by_email(email)
         if not company:
-            company = self._find_company(user['companies'], date)
-        return company, user['user_id']
+            company = self._find_company(user['companies'], record['date'])
+
+        record['company_name'] = company
+        record['user_id'] = user['user_id']

     def _spawn_review(self, record):
         # copy everything except pathsets and flatten user data
         review = dict([(k, v) for k, v in record.iteritems()
-                       if k not in ['patchSets', 'owner']])
+                       if k not in ['patchSets', 'owner', 'createdOn']])
         owner = record['owner']
         if 'email' not in owner or 'username' not in owner:
             return  # ignore

-        review['record_type'] = 'review'
         review['primary_key'] = review['id']
         review['launchpad_id'] = owner['username']
-        review['author'] = owner['name']
+        review['author_name'] = owner['name']
         review['author_email'] = owner['email'].lower()
-        review['release'] = self._get_release(review['createdOn'])
-        review['week'] = utils.timestamp_to_week(review['createdOn'])
+        review['date'] = record['createdOn']
+
+        self._process_user(review)

-        company, user_id = self._process_user(review['author_email'],
-                                              review['launchpad_id'],
-                                              review['author'],
-                                              review['createdOn'])
-        review['company_name'] = company
-        review['user_id'] = user_id
         yield review

     def _spawn_marks(self, record):
@@ -259,50 +219,79 @@ class ReviewProcessor(CachedProcessor):
             for approval in patch['approvals']:
                 # copy everything and flatten user data
                 mark = dict([(k, v) for k, v in approval.iteritems()
-                             if k != 'by'])
+                             if k not in ['by', 'grantedOn']])
                 reviewer = approval['by']

                 if 'email' not in reviewer or 'username' not in reviewer:
                     continue  # ignore

                 mark['record_type'] = 'mark'
+                mark['date'] = approval['grantedOn']
                 mark['primary_key'] = (record['id'] +
-                                       str(mark['grantedOn']) +
+                                       str(mark['date']) +
                                        mark['type'])
                 mark['launchpad_id'] = reviewer['username']
-                mark['author'] = reviewer['name']
+                mark['author_name'] = reviewer['name']
                 mark['author_email'] = reviewer['email'].lower()
                 mark['module'] = module
                 mark['review_id'] = review_id
-                mark['release'] = self._get_release(mark['grantedOn'])
-                mark['week'] = utils.timestamp_to_week(mark['grantedOn'])

-                company, user_id = self._process_user(mark['author_email'],
-                                                      mark['launchpad_id'],
-                                                      mark['author'],
-                                                      mark['grantedOn'])
-                mark['company_name'] = company
-                mark['user_id'] = user_id
+                self._process_user(mark)

                 yield mark

-    def process(self, record_iterator):
+    def _process_review(self, record):
        """
        Process a review. Review spawns into records of two types:
         * review - records that a user created review request
         * mark - records that a user set approval mark to given review
        """
+        for gen in [self._spawn_review, self._spawn_marks]:
+            for r in gen(record):
+                yield r
+
+    def _apply_type_based_processing(self, record):
+        if record['record_type'] == 'commit':
+            for r in self._process_commit(record):
+                yield r
+        elif record['record_type'] == 'review':
+            for r in self._process_review(record):
+                yield r
+
+    def process(self, record_iterator):
         for record in record_iterator:
-            for gen in [self._spawn_review, self._spawn_marks]:
-                for r in gen(record):
-                    yield r
+            for r in self._apply_type_based_processing(record):
+                if r['company_name'] == '*robots':
+                    continue

-
-def get_record_processor(processor_type, persistent_storage):
-    LOG.debug('Record processor is requested of type %s' % processor_type)
-    if processor_type == COMMIT_PROCESSOR:
-        return CommitProcessor(persistent_storage)
-    elif processor_type == REVIEW_PROCESSOR:
-        return ReviewProcessor(persistent_storage)
-    else:
-        raise Exception('Unknown commit processor type %s' % processor_type)
+                r['week'] = utils.timestamp_to_week(r['date'])
+                if ('release' not in r) or (not r['release']):
+                    r['release'] = self._get_release(r['date'])
+
+                yield r
+
+    def update(self, record_iterator, release_index):
+        for record in record_iterator:
+            need_update = False
+
+            company_name = record['company_name']
+            user_id = record['user_id']
+
+            self._process_user(record)
+
+            if ((record['company_name'] != company_name) or
+                    (record['user_id'] != user_id)):
+                need_update = True
+
+            if record['primary_key'] in release_index:
+                release = release_index[record['primary_key']]
+            else:
+                release = self._get_release(record['date'])
+
+            if record['release'] != release:
+                need_update = True
+                record['release'] = release
+
+            if need_update:
+                yield record
diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py
index 95ab90703..cebc1b698 100644
--- a/stackalytics/processor/runtime_storage.py
+++ b/stackalytics/processor/runtime_storage.py
@@ -65,7 +65,7 @@ class MemcachedStorage(RuntimeStorage):

     def _build_index(self):
         self.record_index = {}
-        for record in self._get_all_records():
+        for record in self.get_all_records():
             self.record_index[record['primary_key']] = record['record_id']

     def set_records(self, records_iterator, merge_handler=None):
@@ -73,8 +73,12 @@ class MemcachedStorage(RuntimeStorage):
             if record['primary_key'] in self.record_index:
                 # update
                 record_id = self.record_index[record['primary_key']]
-                original = self.memcached.get(self._get_record_name(record_id))
-                if merge_handler:
+                if not merge_handler:
+                    self.memcached.set(self._get_record_name(record_id),
+                                       record)
+                else:
+                    original = self.memcached.get(self._get_record_name(
+                        record_id))
                     if merge_handler(original, record):
                         LOG.debug('Update record %s' % record)
                         self.memcached.set(self._get_record_name(record_id),
@@ -121,7 +125,7 @@ class MemcachedStorage(RuntimeStorage):
             self._set_pids(pid)

         if not last_update:
-            for i in self._get_all_records():
+            for i in self.get_all_records():
                 yield i
         else:
             for update_id_set in self._make_range(last_update, update_count,
@@ -179,7 +183,7 @@ class MemcachedStorage(RuntimeStorage):
         if (stop - start) % step > 0:
             yield range(i, stop)

-    def _get_all_records(self):
+    def get_all_records(self):
         for record_id_set in self._make_range(0, self._get_record_count(),
                                               BULK_READ_SIZE):
             for i in self.memcached.get_multi(
diff --git a/stackalytics/processor/vcs.py b/stackalytics/processor/vcs.py
index 859538d2e..3f735b19e 100644
--- a/stackalytics/processor/vcs.py
+++ b/stackalytics/processor/vcs.py
@@ -39,6 +39,9 @@ class Vcs(object):
     def fetch(self):
         pass

+    def get_release_index(self):
+        pass
+
     def log(self, branch, head_commit_id):
         pass

@@ -49,8 +52,7 @@

 GIT_LOG_PARAMS = [
     ('commit_id', '%H'),
     ('date', '%at'),
-    ('author', '%an'),
-    ('author_email', '%ae'),
+    ('author_name', '%an'), ('author_email', '%ae'),
     ('subject', '%s'),
     ('message', '%b'),
@@ -84,9 +86,6 @@ class Git(Vcs):
             raise Exception('Unexpected uri %s for git' % uri)
         self.release_index = {}

-    def _chdir(self):
-        os.chdir(self.folder)
-
     def fetch(self):
         LOG.debug('Fetching repo uri %s' % self.repo['uri'])

@@ -95,24 +94,30 @@ class Git(Vcs):
             sh.git('clone', '%s' % self.repo['uri'])
             os.chdir(self.folder)
         else:
-            self._chdir()
+            os.chdir(self.folder)
             sh.git('pull', 'origin')

-        for release in self.repo['releases']:
-            release_name = release['release_name'].lower()
-            if 'tag_from' in release:
-                tag_range = release['tag_from'] + '..' + release['tag_to']
-            else:
-                tag_range = release['tag_to']
-            git_log_iterator = sh.git('log', '--pretty=%H', tag_range,
-                                      _tty_out=False)
-            for commit_id in git_log_iterator:
-                self.release_index[commit_id.strip()] = release_name
+        self.get_release_index()
+
+    def get_release_index(self):
+        os.chdir(self.folder)
+        if not self.release_index:
+            for release in self.repo['releases']:
+                release_name = release['release_name'].lower()
+                if 'tag_from' in release:
+                    tag_range = release['tag_from'] + '..' + release['tag_to']
+                else:
+                    tag_range = release['tag_to']
+                git_log_iterator = sh.git('log', '--pretty=%H', tag_range,
+                                          _tty_out=False)
+                for commit_id in git_log_iterator:
+                    self.release_index[commit_id.strip()] = release_name
+        return self.release_index

     def log(self, branch, head_commit_id):
         LOG.debug('Parsing git log for repo uri %s' % self.repo['uri'])
-        self._chdir()
+        os.chdir(self.folder)
         sh.git('checkout', '%s' % branch)
         commit_range = 'HEAD'
         if head_commit_id:
@@ -167,7 +172,7 @@ class Git(Vcs):

     def get_last_id(self, branch):
         LOG.debug('Get head commit for repo uri %s' % self.repo['uri'])
-        self._chdir()
+        os.chdir(self.folder)
         sh.git('checkout', '%s' % branch)
         return str(sh.git('rev-parse', 'HEAD')).strip()

diff --git a/tests/unit/test_data.py b/tests/unit/test_data.py
new file mode 100644
index 000000000..2da190a24
--- /dev/null
+++ b/tests/unit/test_data.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+DEFAULT_DATA = {
+    'users': [
+        {
+            'launchpad_id': 'john_doe',
+            'user_name': 'John Doe',
+            'emails': ['johndoe@gmail.com', 'jdoe@nec.com'],
+            'companies': [
+                {'company_name': '*independent', 'end_date': '2013-May-01'},
+                {'company_name': 'NEC', 'end_date': None},
+            ]
+        },
+        {
+            'launchpad_id': 'ivan_ivanov',
+            'user_name': 'Ivan Ivanov',
+            'emails': ['ivanivan@yandex.ru', 'iivanov@mirantis.com'],
+            'companies': [
+                {'company_name': 'Mirantis', 'end_date': None},
+            ]
+        }
+    ],
+    'companies': [
+        {
+            'company_name': '*independent',
+            'domains': ['']
+        },
+        {
+            'company_name': 'NEC',
+            'domains': ['nec.com', 'nec.co.jp']
+        },
+        {
+            'company_name': 'Mirantis',
+            'domains': ['mirantis.com', 'mirantis.ru']
+        },
+    ],
+    'repos': [
+        {
+            'branches': ['master'],
+            'module': 'stackalytics',
+            'project_type': 'stackforge',
+            'uri': 'git://github.com/stackforge/stackalytics.git'
+        }
+    ],
+    'releases': [
+        {
+            'release_name': 'prehistory',
+            'end_date': '2011-Apr-21'
+        },
+        {
+            'release_name': 'Havana',
+            'end_date': '2013-Oct-17'
+        }
+    ]
+}
+
+USERS = DEFAULT_DATA['users']
+REPOS = DEFAULT_DATA['repos']
+COMPANIES = DEFAULT_DATA['companies']
+RELEASES = DEFAULT_DATA['releases']
diff --git a/tests/unit/test_default_data_processor.py b/tests/unit/test_default_data_processor.py
new file mode 100644
index 000000000..10bfb34e9
--- /dev/null
+++ b/tests/unit/test_default_data_processor.py
@@ -0,0 +1,68 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import mock
+import testtools
+
+from stackalytics.processor import normalizer
+from stackalytics.processor import persistent_storage
+from stackalytics.processor import runtime_storage
+from tests.unit import test_data
+
+
+class TestDefaultDataProcessor(testtools.TestCase):
+    def setUp(self):
+        super(TestDefaultDataProcessor, self).setUp()
+
+        self.get_users = mock.Mock(return_value=[
+            test_data.USERS,
+        ])
+
+        normalized_data = copy.deepcopy(test_data.DEFAULT_DATA)
+        normalizer.normalize_default_data(normalized_data)
+
+        def find(table, *args, **criteria):
+            if table in normalized_data:
+                return normalized_data[table]
+            else:
+                raise Exception('Wrong table %s' % table)
+
+        self.p_storage = mock.Mock(persistent_storage.PersistentStorage)
+        self.p_storage.find = mock.Mock(side_effect=find)
+
+        self.r_storage = mock.Mock(runtime_storage.RuntimeStorage)
+
+        self.chdir_patcher = mock.patch('os.chdir')
+        self.chdir_patcher.start()
+
+    def tearDown(self):
+        super(TestDefaultDataProcessor, self).tearDown()
+        self.chdir_patcher.stop()
+
+    def test_normalizer(self):
+        data = copy.deepcopy(test_data.DEFAULT_DATA)
+
+        normalizer.normalize_default_data(data)
+
+        self.assertIn('releases', data['repos'][0])
+        self.assertEqual([], data['repos'][0]['releases'],
+                         message='Empty list of releases expected')
+        self.assertEqual(0, data['users'][0]['companies'][-1]['end_date'],
+                         message='The last company end date should be 0')
+        self.assertIn('user_id', data['users'][0])
+        self.assertEqual(test_data.USERS[0]['launchpad_id'],
+                         data['users'][0]['user_id'],
+                         message='User id should be set')
diff --git a/tests/unit/test_commit_processor.py b/tests/unit/test_record_processor.py
similarity index 88%
rename from tests/unit/test_commit_processor.py
rename to tests/unit/test_record_processor.py
index 58d4699d4..c9347acd4 100644
--- a/tests/unit/test_commit_processor.py
+++ b/tests/unit/test_record_processor.py
@@ -23,12 +23,11 @@ from stackalytics.processor import record_processor
 from stackalytics.processor import utils


-class TestCommitProcessor(testtools.TestCase):
+class TestRecordProcessor(testtools.TestCase):
     def setUp(self):
-        super(TestCommitProcessor, self).setUp()
+        super(TestRecordProcessor, self).setUp()

-        p_storage = mock.Mock(persistent_storage.PersistentStorage)
-        p_storage.get_companies = mock.Mock(return_value=[
+        companies = [
             {
                 'company_name': 'SuperCompany',
                 'domains': ['super.com', 'super.no']
             },
@@ -41,7 +40,8 @@ class TestCommitProcessor(testtools.TestCase):
             {
                 'company_name': '*independent',
                 'domains': ['']
             },
-        ])
+        ]
+
         self.user = {
             'user_id': 'john_doe',
             'launchpad_id': 'john_doe',
@@ -54,11 +54,11 @@ class TestCommitProcessor(testtools.TestCase):
             'user_name': 'John Doe',
             'emails': ['johndoe@gmail.com', 'johndoe@super.no'],
             'companies': [
                 {'company_name': 'SuperCompany', 'end_date': 1234567890},
                 {'company_name': '*independent', 'end_date': 0},
             ]
         }
-        p_storage.get_users = mock.Mock(return_value=[
+        self.get_users = mock.Mock(return_value=[
             self.user,
         ])

-        p_storage.get_releases = mock.Mock(return_value=[
+        releases = [
             {
                 'release_name': 'prehistory',
                 'end_date': utils.date_to_timestamp('2011-Apr-21')
             },
@@ -67,21 +67,34 @@ class TestCommitProcessor(testtools.TestCase):
             {
                 'release_name': 'Diablo',
                 'end_date': utils.date_to_timestamp('2011-Sep-08')
             },
-        ])
+        ]
+
+        def find(table, **criteria):
+            if table == 'companies':
+                return companies
+            elif table == 'users':
+                return self.get_users()
+            elif table == 'releases':
+                return releases
+            else:
+                raise Exception('Wrong table %s' % table)
+
+        p_storage = mock.Mock(persistent_storage.PersistentStorage)
+        p_storage.find = mock.Mock(side_effect=find)

         self.persistent_storage = p_storage
-        self.commit_processor = record_processor.CommitProcessor(p_storage)
+        self.commit_processor = record_processor.RecordProcessor(p_storage)

         self.launchpad_patch = mock.patch('launchpadlib.launchpad.Launchpad')
         self.launchpad_patch.start()
         cfg.CONF = mock.MagicMock()

     def tearDown(self):
-        super(TestCommitProcessor, self).tearDown()
+        super(TestRecordProcessor, self).tearDown()
         self.launchpad_patch.stop()

     def _make_commit(self, email='johndoe@gmail.com', date=1999999999):
         return {
-            'author': 'John Doe',
+            'author_name': 'John Doe',
             'author_email': email,
             'date': date,
         }
@@ -134,11 +147,11 @@ class TestCommitProcessor(testtools.TestCase):
         lp_mock.people.getByEmail = mock.Mock(return_value=lp_profile)
         user = self.user.copy()
         # tell storage to return existing user
-        self.persistent_storage.get_users.return_value = [user]
+        self.get_users.return_value = [user]

         self.commit_processor._update_commit_with_user_data(commit)

-        self.persistent_storage.update_user.assert_called_once_with(user)
+        self.persistent_storage.update.assert_called_once_with('users', user)
         lp_mock.people.getByEmail.assert_called_once_with(email=email)
         self.assertIn(email, user['emails'])
         self.assertEquals('NEC', commit['company_name'])
@@ -158,11 +171,11 @@ class TestCommitProcessor(testtools.TestCase):
         lp_mock.people.getByEmail = mock.Mock(return_value=lp_profile)
         user = self.user.copy()
         # tell storage to return existing user
-        self.persistent_storage.get_users.return_value = [user]
+        self.get_users.return_value = [user]

         self.commit_processor._update_commit_with_user_data(commit)

-        self.persistent_storage.update_user.assert_called_once_with(user)
+        self.persistent_storage.update.assert_called_once_with('users', user)
         lp_mock.people.getByEmail.assert_called_once_with(email=email)
         self.assertIn(email, user['emails'])
         self.assertEquals('SuperCompany', commit['company_name'])
@@ -181,7 +194,7 @@ class TestCommitProcessor(testtools.TestCase):
         lp_profile.name = 'smith'
         lp_profile.display_name = 'Smith'
         lp_mock.people.getByEmail = mock.Mock(return_value=lp_profile)
-        self.persistent_storage.get_users.return_value = []
+        self.get_users.return_value = []

         self.commit_processor._update_commit_with_user_data(commit)

@@ -199,7 +212,7 @@ class TestCommitProcessor(testtools.TestCase):
         lp_mock = mock.MagicMock()
         launchpad.Launchpad.login_anonymously = mock.Mock(return_value=lp_mock)
         lp_mock.people.getByEmail = mock.Mock(return_value=None)
-        self.persistent_storage.get_users.return_value = []
+        self.get_users.return_value = []

         self.commit_processor._update_commit_with_user_data(commit)

@@ -217,7 +230,7 @@ class TestCommitProcessor(testtools.TestCase):
         launchpad.Launchpad.login_anonymously = mock.Mock(return_value=lp_mock)
         lp_mock.people.getByEmail = mock.Mock(return_value=None,
                                               side_effect=Exception)
-        self.persistent_storage.get_users.return_value = []
+        self.get_users.return_value = []

         self.commit_processor._update_commit_with_user_data(commit)

@@ -234,7 +247,7 @@ class TestCommitProcessor(testtools.TestCase):
         lp_mock = mock.MagicMock()
         launchpad.Launchpad.login_anonymously = mock.Mock(return_value=lp_mock)
         lp_mock.people.getByEmail = mock.Mock(return_value=None)
-        self.persistent_storage.get_users.return_value = []
+        self.get_users.return_value = []

         self.commit_processor._update_commit_with_user_data(commit)

diff --git a/tests/unit/test_vcs.py b/tests/unit/test_vcs.py
index cfc5cfec0..c2d6e15d9 100644
--- a/tests/unit/test_vcs.py
+++ b/tests/unit/test_vcs.py
@@ -14,11 +14,8 @@
 # limitations under the License.

 import mock
-import os
 import testtools

-from oslo.config import cfg
-
 from stackalytics.processor import vcs


@@ -32,16 +29,19 @@ class TestVcsProcessor(testtools.TestCase):
             'releases': []
         }
         self.git = vcs.Git(self.repo, '/tmp')
-        cfg.CONF.sources_root = ''
-        os.chdir = mock.Mock()
+        self.chdir_patcher = mock.patch('os.chdir')
+        self.chdir_patcher.start()
+
+    def tearDown(self):
+        super(TestVcsProcessor, self).tearDown()
+        self.chdir_patcher.stop()

     def test_git_log(self):
         with mock.patch('sh.git') as git_mock:
             git_mock.return_value = '''
 commit_id:b5a416ac344160512f95751ae16e6612aefd4a57
 date:1369119386
-author:Akihiro MOTOKI
-author_email:motoki@da.jp.nec.com
+author_name:Akihiro MOTOKI author_email:motoki@da.jp.nec.com
 subject:Remove class-based import in the code repo
 message:Fixes bug 1167901
@@ -55,8 +55,7 @@ diff_stat:
  21 files changed, 340 insertions(+), 408 deletions(-)
 commit_id:5be031f81f76d68c6e4cbaad2247044aca179843
 date:1370975889
-author:Monty Taylor
-author_email:mordred@inaugust.com
+author_name:Monty Taylor author_email:mordred@inaugust.com
 subject:Remove explicit distribute depend.
 message:Causes issues with the recent re-merge with setuptools. Advice from
@@ -69,8 +68,7 @@ diff_stat:
  1 file changed, 1 deletion(-)
 commit_id:92811c76f3a8308b36f81e61451ec17d227b453b
 date:1369831203
-author:Mark McClain
-author_email:mark.mcclain@dreamhost.com
+author_name:Mark McClain author_email:mark.mcclain@dreamhost.com
 subject:add readme for 2.2.2
 message:Change-Id: Id32a4a72ec1d13992b306c4a38e73605758e26c7
@@ -80,8 +78,7 @@ diff_stat:
  1 file changed, 8 insertions(+)
 commit_id:92811c76f3a8308b36f81e61451ec17d227b453b
 date:1369831203
-author:John Doe
-author_email:john.doe@dreamhost.com
+author_name:John Doe author_email:john.doe@dreamhost.com
 subject:add readme for 2.2.2
 message:Change-Id: Id32a4a72ec1d13992b306c4a38e73605758e26c7
@@ -91,8 +88,7 @@ diff_stat:
  0 files changed
 commit_id:92811c76f3a8308b36f81e61451ec17d227b453b
 date:1369831203
-author:Doug Hoffner
-author_email:mark.mcclain@dreamhost.com
+author_name:Doug Hoffner author_email:mark.mcclain@dreamhost.com
 subject:add readme for 2.2.2
 message:Change-Id: Id32a4a72ec1d13992b306c4a38e73605758e26c7