Correctly use user_id instead of launchpad_id

Previously launchpad_id was set for all records, but we already
have primary key 'user_id' that should be used instead.
When merging two profiles it is possible that there are 2 known
launchpad ids or gerrit ids for the same user. Now processor writes
info-level message for such profiles

Change-Id: I27580578b10272a2e1b7e998025db523b4075a18
This commit is contained in:
Ilya Shakhat 2014-10-13 23:27:58 +04:00 committed by Ilya Shakhat
parent b9cbe4e3d1
commit 27e8be9ccc
7 changed files with 147 additions and 52 deletions

View File

@ -114,26 +114,31 @@ def _update_with_driverlog_data(default_data, driverlog_data_uri):
LOG.info('Reading DriverLog data from uri: %s', driverlog_data_uri)
driverlog_data = utils.read_json_from_uri(driverlog_data_uri)
cis = {}
module_ci_ids = {}
ci_ids = set()
for driver in driverlog_data['drivers']:
if 'ci' in driver:
module = driver['project_id'].split('/')[1]
if module not in cis:
cis[module] = {}
cis[module][driver['ci']['id']] = driver
if module not in module_ci_ids:
module_ci_ids[module] = {}
ci_id = driver['ci']['id']
module_ci_ids[module][ci_id] = driver
default_data['users'].append({
'gerrit_id': driver['ci']['id'],
'user_name': driver['ci']['id'],
'static': True,
'companies': [
{'company_name': driver['vendor'], 'end_date': None}],
})
if ci_id not in ci_ids:
ci_ids.add(ci_id)
default_data['users'].append({
'user_id': user_processor.make_user_id(gerrit_id=ci_id),
'gerrit_id': ci_id,
'user_name': ci_id,
'static': True,
'companies': [
{'company_name': driver['vendor'], 'end_date': None}],
})
for repo in default_data['repos']:
if repo['module'] in cis:
repo['ci'] = cis[repo['module']]
if repo['module'] in module_ci_ids:
repo['ci'] = module_ci_ids[repo['module']]
def _store_users(runtime_storage_inst, users):

View File

@ -110,7 +110,7 @@ def export_data(memcached_inst, fd):
key_prefix = key + ':'
for record_id_set in utils.make_range(0, count, BULK_READ_SIZE):
for record_id_set in utils.make_range(0, count + 1, BULK_READ_SIZE):
# memcache limits the size of returned data to specific yet unknown
# chunk size, the code should verify that all requested records are
# returned an be able to fall back to one-by-one retrieval
@ -126,13 +126,17 @@ def export_data(memcached_inst, fd):
for k, v in six.iteritems(chunk):
pickle.dump((key_prefix + str(k), v), fd)
for user_seq in range(memcached_inst.get('user:count') or 0):
for user_seq in range((memcached_inst.get('user:count') or 0) + 1):
user = memcached_inst.get('user:%s' % user_seq)
if user:
if user.get('user_id'):
pickle.dump(('user:%s' % user['user_id'], user), fd)
if user.get('launchpad_id'):
pickle.dump(('user:%s' % user['launchpad_id'], user), fd)
if user.get('gerrit_id'):
pickle.dump(('user:gerrit:%s' % user['gerrit_id'], user), fd)
if user.get('member_id'):
pickle.dump(('user:member:%s' % user['member_id'], user), fd)
for email in user.get('emails') or []:
pickle.dump(('user:%s' % email, user), fd)

View File

@ -39,7 +39,8 @@ def _normalize_user(user):
user['companies'].sort(key=utils.cmp_to_key(end_date_comparator))
user['user_id'] = user_processor.make_user_id(
launchpad_id=user.get('launchpad_id'),
email=user.get('email'))
emails=user.get('emails'),
gerrit_id=user.get('gerrit_id'))
def _normalize_users(users):

View File

@ -98,22 +98,22 @@ class RecordProcessor(object):
def _create_user(self, launchpad_id, email, gerrit_id, user_name):
company = (self._get_company_by_email(email) or
self._get_independent())
emails = []
if email:
emails = [email]
user = {
'user_id': user_processor.make_user_id(
email=email, launchpad_id=launchpad_id, gerrit_id=gerrit_id),
emails=emails, launchpad_id=launchpad_id, gerrit_id=gerrit_id),
'launchpad_id': launchpad_id,
'user_name': user_name or '',
'companies': [{
'company_name': company,
'end_date': 0,
}],
'emails': emails,
}
if gerrit_id:
user['gerrit_id'] = gerrit_id
if email:
user['emails'] = [email]
else:
user['emails'] = []
return user
def _get_lp_info(self, email):
@ -163,6 +163,20 @@ class RecordProcessor(object):
return None
def _merge_user_profiles(self, user_profiles):
LOG.debug('Merge profiles: %s', user_profiles)
# check of there are more than 1 launchpad_id nor gerrit_id
lp_ids = set(u.get('launchpad_id') for u in user_profiles
if u.get('launchpad_id'))
if len(lp_ids) > 1:
LOG.info('Ambiguous launchpad ids: %s on profiles: %s',
(lp_ids, user_profiles))
g_ids = set(u.get('gerrit_id') for u in user_profiles
if u.get('gerrit_id'))
if len(g_ids) > 1:
LOG.info('Ambiguous gerrit ids: %s on profiles: %s',
(g_ids, user_profiles))
merged_user = {} # merged user profile
# collect ordinary fields
@ -219,7 +233,8 @@ class RecordProcessor(object):
user_name = record.get('author_name')
launchpad_id = record.get('launchpad_id')
if email and (not user_e) and (not launchpad_id):
if (email and (not user_e) and (not launchpad_id) and
(not user_e.get('launchpad_id'))):
# query LP
launchpad_id, lp_user_name = self._get_lp_info(email)
if lp_user_name:
@ -229,7 +244,8 @@ class RecordProcessor(object):
if gerrit_id:
user_g = user_processor.load_user(
self.runtime_storage_inst, gerrit_id=gerrit_id) or {}
if (not user_g) and (not launchpad_id):
if ((not user_g) and (not launchpad_id) and
(not user_e.get('launchpad_id'))):
# query LP
guessed_lp_id = gerrit_id
lp_user_name = self._get_lp_user_name(guessed_lp_id)
@ -241,13 +257,13 @@ class RecordProcessor(object):
user_l = user_processor.load_user(
self.runtime_storage_inst, launchpad_id=launchpad_id) or {}
user = self._create_user(launchpad_id, email, gerrit_id, user_name)
if ((user_e.get('seq') == user_l.get('seq') == user_g.get('seq')) and
user_e.get('seq')):
# sequence numbers are set and the same, merge is not needed
user = user_e
else:
user = self._create_user(launchpad_id, email, gerrit_id, user_name)
if user_e or user_l or user_g:
user = self._merge_user_profiles(
[user_e, user_l, user_g, user])
@ -268,8 +284,6 @@ class RecordProcessor(object):
user = self.update_user(record)
record['user_id'] = user['user_id']
if user.get('launchpad_id'):
record['launchpad_id'] = user['launchpad_id']
if user.get('user_name'):
record['author_name'] = user['user_name']
@ -535,7 +549,7 @@ class RecordProcessor(object):
ci_vote['primary_key'] = ('%s:%s' % (reviewer['username'],
ci_vote['date']))
ci_vote['user_id'] = reviewer['username']
ci_vote['launchpad_id'] = reviewer['username']
ci_vote['gerrit_id'] = reviewer['username']
ci_vote['author_name'] = reviewer.get('name') or reviewer['username']
ci_vote['author_email'] = (
reviewer.get('email') or reviewer['username']).lower()

View File

@ -13,11 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from stackalytics.openstack.common import log as logging
def make_user_id(email=None, launchpad_id=None, gerrit_id=None,
LOG = logging.getLogger(__name__)
def make_user_id(emails=None, launchpad_id=None, gerrit_id=None,
member_id=None):
if launchpad_id or email:
return launchpad_id or email
if launchpad_id or emails:
return launchpad_id or emails[0]
if gerrit_id:
return 'gerrit:%s' % gerrit_id
if member_id:
@ -25,9 +30,23 @@ def make_user_id(email=None, launchpad_id=None, gerrit_id=None,
def store_user(runtime_storage_inst, user):
write_flag = False
if not user.get('seq'):
user['seq'] = runtime_storage_inst.inc_user_count()
runtime_storage_inst.set_by_key('user:%s' % user['seq'], user)
LOG.debug('New user: %s', user)
write_flag = True
else:
stored_user = runtime_storage_inst.get_by_key(
'user:%d' % user.get('seq'))
if stored_user != user:
LOG.debug('User updated: %s', user)
write_flag = True
if not write_flag:
return
runtime_storage_inst.set_by_key('user:%d' % user['seq'], user)
if user.get('user_id'):
runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user)
if user.get('launchpad_id'):
@ -53,4 +72,5 @@ def load_user(runtime_storage_inst, seq=None, user_id=None, email=None,
def delete_user(runtime_storage_inst, user):
LOG.debug('Delete user: %s', user)
runtime_storage_inst.delete_by_key('user:%s' % user['seq'])

View File

@ -139,7 +139,7 @@ class TestRecordProcessor(testtools.TestCase):
author_name='John Doe')))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@gmail.com',
'author_name': 'John Doe',
'company_name': 'NEC',
@ -170,7 +170,7 @@ class TestRecordProcessor(testtools.TestCase):
date=1000000000)))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@gmail.com',
'author_name': 'John Doe',
'company_name': '*independent',
@ -199,7 +199,7 @@ class TestRecordProcessor(testtools.TestCase):
author_name='John Doe')))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@ibm.com',
'author_name': 'John Doe',
'company_name': 'IBM',
@ -232,7 +232,7 @@ class TestRecordProcessor(testtools.TestCase):
author_name='John Doe')))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@ibm.com',
'author_name': 'John Doe',
'company_name': 'NEC',
@ -268,7 +268,7 @@ class TestRecordProcessor(testtools.TestCase):
date=1000000000)))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@nec.com',
'author_name': 'John Doe',
'company_name': 'IBM',
@ -296,7 +296,7 @@ class TestRecordProcessor(testtools.TestCase):
author_name='John Doe')))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@gmail.com',
'author_name': 'John Doe',
'company_name': 'NEC',
@ -326,7 +326,7 @@ class TestRecordProcessor(testtools.TestCase):
author_name='John Doe')))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@ibm.com',
'author_name': 'John Doe',
'company_name': 'IBM',
@ -352,7 +352,7 @@ class TestRecordProcessor(testtools.TestCase):
author_name='John Doe')))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@ibm.com',
'author_name': 'John Doe',
'company_name': 'IBM',
@ -566,14 +566,14 @@ class TestRecordProcessor(testtools.TestCase):
self.assertRecordsMatch(
{'record_type': 'bpd',
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_name': 'John Doe',
'company_name': '*independent'},
processed_records[0])
self.assertRecordsMatch(
{'record_type': 'review',
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_name': 'John Doe',
'author_email': 'john_doe@gmail.com',
'company_name': '*independent'},
@ -625,7 +625,7 @@ class TestRecordProcessor(testtools.TestCase):
self.assertRecordsMatch(
{'record_type': 'commit',
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_name': 'John Doe',
'author_email': 'john_doe@gmail.com',
'company_name': '*independent'},
@ -667,7 +667,7 @@ class TestRecordProcessor(testtools.TestCase):
self.assertRecordsMatch(
{'record_type': 'review',
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_name': 'John Doe',
'author_email': 'john_doe@gmail.com',
'company_name': '*independent'},
@ -675,7 +675,7 @@ class TestRecordProcessor(testtools.TestCase):
self.assertRecordsMatch(
{'record_type': 'bpd',
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_name': 'John Doe',
'company_name': '*independent'},
processed_records[1])
@ -1021,12 +1021,12 @@ class TestRecordProcessor(testtools.TestCase):
self.assertEqual(3, len(processed_commits))
self.assertRecordsMatch({
'launchpad_id': 'tupac',
'user_id': 'tupac',
'author_email': 'tupac.shakur@openstack.com',
'author_name': 'Tupac Shakur',
}, processed_commits[0])
self.assertRecordsMatch({
'launchpad_id': 'jimi',
'user_id': 'jimi',
'author_email': 'jimi.hendrix@openstack.com',
'author_name': 'Jimi Hendrix',
}, processed_commits[2])
@ -1320,7 +1320,7 @@ class TestRecordProcessor(testtools.TestCase):
))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@gmail.com',
'author_name': 'John Doe',
'company_name': 'NEC',
@ -1353,7 +1353,7 @@ class TestRecordProcessor(testtools.TestCase):
))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@gmail.com',
'author_name': 'John Doe',
'company_name': 'NEC',
@ -1387,7 +1387,7 @@ class TestRecordProcessor(testtools.TestCase):
))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@gmail.com',
'author_name': 'John Doe',
'company_name': 'NEC',
@ -1421,7 +1421,7 @@ class TestRecordProcessor(testtools.TestCase):
))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@gmail.com',
'author_name': 'John Doe',
'company_name': 'NEC',
@ -1454,7 +1454,7 @@ class TestRecordProcessor(testtools.TestCase):
))[0]
expected_commit = {
'launchpad_id': 'john_doe',
'user_id': 'john_doe',
'author_email': 'johndoe@gmail.com',
'author_name': 'John Doe',
'company_name': 'NEC',

View File

@ -0,0 +1,51 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import memcache
def check(expected, actual):
if expected != actual:
print ('Expected: %s\nActual: %s' % (expected, actual))
def main():
m = memcache.Client(['localhost:11211'])
count = m.get('user:count') + 1
users = [m.get('user:%d' % seq) for seq in range(count)]
users = [u for u in users if u]
for u in users:
user_id = u.get('user_id')
lp = u.get('launchpad_id')
g = u.get('gerrit_id')
emails = u.get('emails')
if user_id:
check(u, m.get('user:%s' % user_id.encode('utf8')))
if lp:
check(u, m.get('user:%s' % lp.encode('utf8')))
if g:
check(u, m.get('user:gerrit:%s' % g.encode('utf8')))
if emails:
for e in emails:
check(u, m.get('user:%s' % e.encode('utf8')))
if __name__ == '__main__':
main()