Clark Boylan 7ee556ca44 Have audit-users.py write out serialized data
This allows us to "query" the datastructure for different perspectives
without needing to rerun the costly queries each time we update
audit-users.py. The script is predominantly collecting data now, then we
can use the python repl or other scripts to give us better insights.

We also do a small refactoring to simplify the collection of data.

Change-Id: Ie777ae706050b38ce294a1acf9b1b843fcf5ab41
2021-03-15 13:11:18 -07:00

300 lines
12 KiB
Python

# Script to query Gerrit users by email address to debug accounts with email
# address conflicts. The idea here is we'll identify which users are active
# and need proper manipulation to correct and which are inactive and can
# be retired.
#
# The input list of emails can be generated by a gerrit config consistency
# check against external ids.
#
# This script should also identify when accounts are inactive according to
# Gerrit and not just by our "have they pushed or reviewed code in the last
# year" metric. Accounts that are already inactive can be safely retired too.
# This script builds and operates on a datastructure that looks like this.
# john.doe@example.com:
#   1234:
#     active: True
#     recently_used: True
#     recent_change: '2021-01-23 17:31:25.000000000'
#     recent_review: None
#   5678:
#     active: False
#     recently_used: False
#     recent_change: None
#     recent_review: '2019-03-05 12:15:34.000000000'
#   active:
#     - 1234
#   inactive:
#     - 5678
#   recently_used:
#     - 1234
#   nonrecently_used:
#     - 5678
import datetime
import json
import getpass
import requests
import yaml
# strptime pattern for Gerrit timestamps once they have been trimmed from
# nanosecond to microsecond precision (see recently_used()).
TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%f'
# Gerrit reports timestamps in UTC, so the reference "now" must be UTC as
# well. It is kept naive (tzinfo stripped) because the timestamps parsed
# with TIME_FORMAT are naive too and the two are subtracted from each other.
TODAY = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
# Width of the "recently used" window: one year.
DELTAT = datetime.timedelta(days=365)
# Start of the "recently used" window.
SINCET = TODAY - DELTAT
def query_gerrit(loc, query, auth=None):
    """GET a review.opendev.org REST endpoint and return the decoded JSON.

    loc is the endpoint path (for example 'changes'), query is passed to
    requests as the URL parameters, and auth is passed to requests as the
    credentials. When auth is truthy the 'a/' prefixed form of the endpoint
    is requested instead.

    json.JSONDecodeError propagates when the response body, minus its first
    five characters, is not valid JSON; some callers depend on that.
    """
    # Need to do this authenticated and as admin. Start with first pass just
    # normal user, then switch to admin and rerun.
    endpoint = 'a/' + loc if auth else loc
    url = 'https://review.opendev.org/%s/' % endpoint
    response = requests.get(url, params=query, auth=auth)
    # Strip off the gerrit json prefix before decoding.
    return json.loads(response.text[5:])
def get_account_detail(account_id, auth=None):
    """Return the result of querying 'accounts/<account_id>/detail'.

    Proper data should be generated authenticated and as admin; running
    without auth is only meant for quicker debugging cycles.
    """
    endpoint = 'accounts/%s/detail' % account_id
    return query_gerrit(endpoint, {}, auth)
def get_account_sshkeys(account_id, auth=None):
    """Return the result of querying 'accounts/<account_id>/sshkeys'.

    Proper data should be generated authenticated and as admin; running
    without auth is only meant for quicker debugging cycles. In that
    unauthenticated mode a response that cannot be decoded as JSON yields
    an empty list; with auth the json.JSONDecodeError is re-raised.
    """
    endpoint = 'accounts/%s/sshkeys' % account_id
    try:
        return query_gerrit(endpoint, {}, auth)
    except json.JSONDecodeError:
        if not auth:
            # This handles the lack of auth error.
            return []
        raise
def get_account_externalids(account_id, auth=None):
    """Return the result of querying 'accounts/<account_id>/external.ids'.

    Proper data should be generated authenticated and as admin; running
    without auth is only meant for quicker debugging cycles. In that
    unauthenticated mode a response that cannot be decoded as JSON yields
    an empty list; with auth the json.JSONDecodeError is re-raised.
    """
    endpoint = 'accounts/%s/external.ids' % account_id
    try:
        return query_gerrit(endpoint, {}, auth)
    except json.JSONDecodeError:
        if not auth:
            # This handles the lack of auth error.
            return []
        raise
def recently_used(timestamp):
    """Return True when timestamp is less than DELTAT older than TODAY.

    An account counts as recently used if it has reviewed or pushed code
    within the last year.
    """
    # Gerrit apparently gives us nanoseconds which we can't parse, so the
    # last three digits are dropped before parsing.
    trimmed = timestamp[:-3]
    last_activity = datetime.datetime.strptime(trimmed, TIME_FORMAT)
    age = TODAY - last_activity
    return age < DELTAT
def read_email_list():
    """Read 'email_list.txt' and return a dict keyed by email address.

    Each line of the file holds one email address; surrounding whitespace
    is stripped. Every address maps to a fresh empty dict.

    Blank and whitespace-only lines are skipped so that they do not create
    an empty-string entry.
    """
    users = {}
    with open('email_list.txt') as f:
        for line in f:
            email = line.strip()
            if email:
                users[email] = {}
    return users
def check_recent_changes(account_id, account_info, auth):
    """Record the newest owned and reviewed change for an account.

    Sets account_info['recent_change'] and account_info['recent_review'] to
    the 'updated' value of the first result of the matching query, or to
    None when the query returns nothing. account_info['recently_used'] is
    set to True when either timestamp passes recently_used(); it is never
    reset to False here.
    """
    # Gerrit appears to do a reverse sort giving you the newest results
    # first. Since we only care about the most recent activity we set
    # n = 1 here.
    # A variant of these queries that adds
    # 'after:%s' % SINCET.strftime('%Y-%m-%d') is currently disabled.
    lookups = (
        ('recent_change', 'owner'),
        ('recent_review', 'reviewedby'),
    )
    for field, operator in lookups:
        query = {'q': '%s:%s' % (operator, account_id), 'n': 1}
        results = query_gerrit('changes', query, auth)
        if not results:
            account_info[field] = None
            continue
        account_info[field] = results[0]['updated']
        if recently_used(account_info[field]):
            account_info['recently_used'] = True
def gather_user_info(account_id, user, auth):
    """Fill in user[account_id] and file the account under a usage list.

    user is the per-email dict; user[account_id] must already exist with
    its 'openids', 'invalid_openids' and 'recently_used' entries. Account
    detail, ssh keys, external ids and recent activity are looked up, then
    account_id is appended to user['recently_used'] or
    user['nonrecently_used'].
    """
    detail = get_account_detail(account_id, auth)
    record = user[account_id]
    for key in ('registered_on', 'username'):
        if key in detail:
            record[key] = detail[key]

    if get_account_sshkeys(account_id, auth):
        record['sshkeys'] = True

    for eid in get_account_externalids(account_id, auth):
        identity = eid['identity']
        # We only care about login.ubuntu urls now
        if 'login.ubuntu' not in identity:
            continue
        response = requests.head(identity)
        # An openid that answers HEAD with 200 is treated as valid;
        # anything else is recorded as invalid.
        if response.status_code == 200:
            bucket = 'openids'
        else:
            bucket = 'invalid_openids'
        record[bucket].append(identity)

    check_recent_changes(account_id, record, auth)
    if record['recently_used']:
        user['recently_used'].append(account_id)
    else:
        user['nonrecently_used'].append(account_id)
def get_user_activity(users, auth=None):
    """Populate every email entry of users with its Gerrit account data.

    For each email the 'active', 'inactive', 'recently_used' and
    'nonrecently_used' lists are (re)initialised, the active and inactive
    accounts for that email are queried, and each account found gets its
    own sub-dict filled in by gather_user_info(). Emails with fewer than
    two accounts in total are reported and left with empty lists.
    """
    for email, record in users.items():
        for list_name in ('active', 'inactive',
                          'recently_used', 'nonrecently_used'):
            record[list_name] = []

        active_accounts = query_gerrit(
            'accounts', {'q': 'email:%s is:active' % email}, auth)
        inactive_accounts = query_gerrit(
            'accounts', {'q': 'email:%s is:inactive' % email}, auth)

        if len(active_accounts + inactive_accounts) < 2:
            # Using an admin account to query this info seems to address
            # this problem, but we'll leave this here as a double check.
            print("Email %s only has one account" % email)
            continue

        # Active accounts are processed first, then inactive ones.
        groups = (
            ('active', True, active_accounts),
            ('inactive', False, inactive_accounts),
        )
        for list_name, is_active, accounts in groups:
            for account in accounts:
                account_id = str(account['_account_id'])
                record[account_id] = {
                    'recently_used': False,
                    'active': is_active,
                    'username': None,
                    'sshkeys': None,
                    'openids': [],
                    'invalid_openids': [],
                }
                record[list_name].append(account_id)
                gather_user_info(account_id, record, auth)
if __name__ == '__main__':
    # Script entry point: collect the data, serialize it to
    # audit-results.yaml, then print a series of summary reports.

    # Credentials are optional; leaving either prompt empty runs the
    # queries unauthenticated.
    query_user = input('Username: ')
    query_pass = getpass.getpass('Password: ')
    auth = (query_user, query_pass) if query_user and query_pass else None

    users = read_email_list()
    get_user_activity(users, auth=auth)

    # Write out the full datastructure.
    with open('audit-results.yaml', 'w') as f:
        yaml.dump(users, default_flow_style=False, explicit_start=True,
                  indent=4, stream=f)

    def _row(email, *groups):
        # One report line: the email, a space, then each group of account
        # ids comma-joined with '|' between groups.
        return email + ' ' + '|'.join(','.join(group) for group in groups)

    def _split_accounts(record, account_ids, fields):
        # Partition account_ids, preserving order, into those whose entry
        # has a truthy value for any of fields and those with none.
        have, lack = [], []
        for account_id in account_ids:
            account = record[account_id]
            if any(account[field] for field in fields):
                have.append(account_id)
            else:
                lack.append(account_id)
        return have, lack

    # TODO there are probably better ways to present this data.
    print()
    print('Users with inactive accounts. We may just be able to retire these.'
          '\nThen remove their external ids.')
    print('Email active accounts|inactive accounts')
    for email, record in users.items():
        if record['inactive']:
            print(_row(email, record['active'], record['inactive']))

    print()
    print('Users without username, ssh keys, valid openid, and no changes or reviews')
    print('Email accounts with creds or activity|accounts without creds or activity')
    for email, record in users.items():
        every_account = record['recently_used'] + record['nonrecently_used']
        have, lack = _split_accounts(
            record, every_account,
            ('username', 'sshkeys', 'recent_change', 'recent_review',
             'openids'))
        if lack:
            print(_row(email, have, lack))

    print()
    print('Users without username, sshkeys and zero changes pushed or reviews')
    print('Email accounts with usage|accounts without usage')
    for email, record in users.items():
        every_account = record['recently_used'] + record['nonrecently_used']
        have, lack = _split_accounts(
            record, every_account,
            ('username', 'sshkeys', 'recent_change', 'recent_review'))
        if lack:
            print(_row(email, have, lack))

    print()
    print('Non recently used Users without username or ssh keys')
    print('Email accounts with creds|accounts without creds')
    for email, record in users.items():
        # Only emails whose accounts are all non recently used.
        if record['recently_used'] or not record['nonrecently_used']:
            continue
        have, lack = _split_accounts(
            record, record['nonrecently_used'], ('username', 'sshkeys'))
        if have != record['nonrecently_used']:
            print(_row(email, have, lack))

    print()
    print('Non recently used Users')
    print('Email non recent accounts')
    for email, record in users.items():
        if record['recently_used'] or not record['nonrecently_used']:
            continue
        print(_row(email, record['nonrecently_used']))

    print()
    print('Recently used Users')
    print('Email recent accounts|nonrecent accounts')
    for email, record in users.items():
        if record['recently_used']:
            print(_row(email, record['recently_used'],
                       record['nonrecently_used']))

    print()
    print('Emails that need further investigation')
    for email, record in users.items():
        # Neither list populated: the email was skipped during collection.
        if not record['recently_used'] and not record['nonrecently_used']:
            print(email)