Merge "Expand recon middleware support"

This commit is contained in:
Jenkins 2012-05-25 05:12:34 +00:00 committed by Gerrit Code Review
commit 676c338b7c
19 changed files with 1193 additions and 437 deletions

View File

@ -12,9 +12,9 @@ try:
except ImportError:
import json
from hashlib import md5
import datetime
import eventlet
import optparse
import time
import sys
import os
@ -26,12 +26,7 @@ class Scout(object):
def __init__(self, recon_type, verbose=False, suppress_errors=False,
timeout=5):
recon_uri = ["ringmd5", "async", "replication", "load", "diskusage",
"unmounted", "quarantined", "sockstat"]
if recon_type not in recon_uri:
raise Exception("Invalid scout type requested")
else:
self.recon_type = recon_type
self.recon_type = recon_type
self.verbose = verbose
self.suppress_errors = suppress_errors
self.timeout = timeout
@ -87,6 +82,44 @@ class SwiftRecon(object):
self.timeout = 5
self.pool_size = 30
self.pool = eventlet.GreenPool(self.pool_size)
self.check_types = ['account', 'container', 'object']
self.server_type = 'object'
def _gen_stats(self, stats, name=None):
""" compute various stats from a list of values """
cstats = [x for x in stats if x is not None]
if len(cstats) > 0:
ret_dict = {'low': min(cstats), 'high': max(cstats),
'total': sum(cstats), 'reported': len(cstats),
'number_none': len(stats) - len(cstats), 'name': name}
ret_dict['average'] = \
ret_dict['total'] / float(len(cstats))
ret_dict['perc_none'] = \
ret_dict['number_none'] * 100.0 / len(stats)
else:
ret_dict = {'reported': 0}
return ret_dict
def _print_stats(self, stats):
"""
print out formatted stats to console
:param stats: dict of stats generated by _gen_stats
"""
print '[%(name)s] low: %(low)d, high: %(high)d, avg: ' \
'%(average).1f, total: %(total)d, ' \
'Failed: %(perc_none).1f%%, no_result: %(number_none)d, ' \
'reported: %(reported)d' % stats
def _ptime(self, timev=None):
"""
:param timev: a unix timestamp or None
:returns: a pretty string of the current time or provided time
"""
if timev:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timev))
else:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
def get_devices(self, zone_filter, swift_dir, ring_name):
"""
@ -125,10 +158,9 @@ class SwiftRecon(object):
ring_sum = md5sum.hexdigest()
recon = Scout("ringmd5", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking ring md5sum's on %s hosts..." % (now, len(hosts))
print "[%s] Checking ring md5sums" % self._ptime()
if self.verbose:
print "-> On disk md5sum: %s" % ring_sum
print "-> On disk %s md5sum: %s" % (ringfile, ring_sum)
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats[url] = response[ringfile]
@ -152,23 +184,18 @@ class SwiftRecon(object):
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = {}
scan = {}
recon = Scout("async", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking async pendings on %s hosts..." % (now, len(hosts))
print "[%s] Checking async pendings" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats[url] = response['async_pending']
if len(stats) > 0:
low = min(stats.values())
high = max(stats.values())
total = sum(stats.values())
average = total / len(stats)
print "Async stats: low: %d, high: %d, avg: %d, total: %d" % (low,
high, average, total)
scan[url] = response['async_pending']
stats = self._gen_stats(scan.values(), 'async_pending')
if stats['reported'] > 0:
self._print_stats(stats)
else:
print "Error: No hosts available or returned valid information."
print "[async_pending] - No hosts returned valid data."
print "=" * 79
def umount_check(self, hosts):
@ -181,9 +208,8 @@ class SwiftRecon(object):
stats = {}
recon = Scout("unmounted", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Getting unmounted drives from %s hosts..." % \
(now, len(hosts))
(self._ptime(), len(hosts))
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
for i in response:
@ -193,31 +219,231 @@ class SwiftRecon(object):
print "Not mounted: %s on %s" % (stats[host], node)
print "=" * 79
def expirer_check(self, hosts):
"""
Obtain and print expirer statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = {'object_expiration_pass': [], 'expired_last_pass': []}
recon = Scout("expirer/%s" % self.server_type, self.verbose,
self.suppress_errors, self.timeout)
print "[%s] Checking on expirers" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats['object_expiration_pass'].append(
response.get('object_expiration_pass'))
stats['expired_last_pass'].append(
response.get('expired_last_pass'))
for k in stats:
if stats[k]:
computed = self._gen_stats(stats[k], name=k)
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[%s] - No hosts returned valid data." % k
else:
print "[%s] - No hosts returned valid data." % k
print "=" * 79
def replication_check(self, hosts):
"""
Obtain and print replication statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = {'replication_time': [], 'failure': [], 'success': [],
'attempted': []}
recon = Scout("replication/%s" % self.server_type, self.verbose,
self.suppress_errors, self.timeout)
print "[%s] Checking on replication" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats['replication_time'].append(
response.get('replication_time'))
repl_stats = response['replication_stats']
if repl_stats:
for stat_key in ['attempted', 'failure', 'success']:
stats[stat_key].append(repl_stats.get(stat_key))
for k in stats:
if stats[k]:
if k != 'replication_time':
computed = self._gen_stats(stats[k],
name='replication_%s' % k)
else:
computed = self._gen_stats(stats[k], name=k)
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[%s] - No hosts returned valid data." % k
else:
print "[%s] - No hosts returned valid data." % k
print "=" * 79
def object_replication_check(self, hosts):
"""
Obtain and print replication statistics from object servers
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = {}
recon = Scout("replication", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking replication times on %s hosts..." % \
(now, len(hosts))
print "[%s] Checking on replication" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
stats[url] = response['object_replication_time']
if len(stats) > 0:
low = min(stats.values())
high = max(stats.values())
total = sum(stats.values())
average = total / len(stats)
print "[Replication Times] shortest: %s, longest: %s, avg: %s" % \
(low, high, average)
times = [x for x in stats.values() if x is not None]
if len(stats) > 0 and len(times) > 0:
computed = self._gen_stats(times, 'replication_time')
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[replication_time] - No hosts returned valid data."
else:
print "Error: No hosts available or returned valid information."
print "[replication_time] - No hosts returned valid data."
print "=" * 79
def updater_check(self, hosts):
"""
Obtain and print updater statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
stats = []
recon = Scout("updater/%s" % self.server_type, self.verbose,
self.suppress_errors, self.timeout)
print "[%s] Checking updater times" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
if response['%s_updater_sweep' % self.server_type]:
stats.append(response['%s_updater_sweep' %
self.server_type])
if len(stats) > 0:
computed = self._gen_stats(stats, name='updater_last_sweep')
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[updater_last_sweep] - No hosts returned valid data."
else:
print "[updater_last_sweep] - No hosts returned valid data."
print "=" * 79
def auditor_check(self, hosts):
"""
Obtain and print obj auditor statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
scan = {}
adone = '%s_auditor_pass_completed' % self.server_type
afail = '%s_audits_failed' % self.server_type
apass = '%s_audits_passed' % self.server_type
asince = '%s_audits_since' % self.server_type
recon = Scout("auditor/%s" % self.server_type, self.verbose,
self.suppress_errors, self.timeout)
print "[%s] Checking auditor stats" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
scan[url] = response
if len(scan) < 1:
print "Error: No hosts available"
return
stats = {}
stats[adone] = [scan[i][adone] for i in scan
if scan[i][adone] is not None]
stats[afail] = [scan[i][afail] for i in scan
if scan[i][afail] is not None]
stats[apass] = [scan[i][apass] for i in scan
if scan[i][apass] is not None]
stats[asince] = [scan[i][asince] for i in scan
if scan[i][asince] is not None]
for k in stats:
if len(stats[k]) < 1:
print "[%s] - No hosts returned valid data." % k
else:
if k != asince:
computed = self._gen_stats(stats[k], k)
if computed['reported'] > 0:
self._print_stats(computed)
if len(stats[asince]) >= 1:
low = min(stats[asince])
high = max(stats[asince])
total = sum(stats[asince])
average = total / len(stats[asince])
print '[last_pass] oldest: %s, newest: %s, avg: %s' % \
(self._ptime(low), self._ptime(high), self._ptime(average))
print "=" * 79
def object_auditor_check(self, hosts):
"""
Obtain and print obj auditor statistics
:param hosts: set of hosts to check. in the format of:
set([('127.0.0.1', 6020), ('127.0.0.2', 6030)])
"""
# per-host stats for the two object-auditor scan modes:
# ALL (full audit) and ZBF (zero-byte-file audit)
all_scan = {}
zbf_scan = {}
atime = 'audit_time'
bprocessed = 'bytes_processed'
passes = 'passes'
errors = 'errors'
quarantined = 'quarantined'
recon = Scout("auditor/object", self.verbose, self.suppress_errors,
self.timeout)
print "[%s] Checking auditor stats " % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
# only keep hosts that returned a truthy stats dict for each mode
if response['object_auditor_stats_ALL']:
all_scan[url] = response['object_auditor_stats_ALL']
if response['object_auditor_stats_ZBF']:
zbf_scan[url] = response['object_auditor_stats_ZBF']
if len(all_scan) > 0:
# collect each ALL-scan metric across hosts
stats = {}
stats[atime] = [all_scan[i][atime] for i in all_scan]
stats[bprocessed] = [all_scan[i][bprocessed] for i in all_scan]
stats[passes] = [all_scan[i][passes] for i in all_scan]
stats[errors] = [all_scan[i][errors] for i in all_scan]
stats[quarantined] = [all_scan[i][quarantined] for i in all_scan]
for k in stats:
# strip None entries before computing stats
if None in stats[k]:
stats[k] = [x for x in stats[k] if x is not None]
if len(stats[k]) < 1:
print "[Auditor %s] - No hosts returned valid data." % k
else:
computed = self._gen_stats(stats[k],
name='ALL_%s_last_path' % k)
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[ALL_auditor] - No hosts returned valid data."
else:
print "[ALL_auditor] - No hosts returned valid data."
if len(zbf_scan) > 0:
# same aggregation for ZBF scans; note ZBF reports no 'passes'
stats = {}
stats[atime] = [zbf_scan[i][atime] for i in zbf_scan]
stats[bprocessed] = [zbf_scan[i][bprocessed] for i in zbf_scan]
stats[errors] = [zbf_scan[i][errors] for i in zbf_scan]
stats[quarantined] = [zbf_scan[i][quarantined] for i in zbf_scan]
for k in stats:
if None in stats[k]:
stats[k] = [x for x in stats[k] if x is not None]
if len(stats[k]) < 1:
print "[Auditor %s] - No hosts returned valid data." % k
else:
computed = self._gen_stats(stats[k],
name='ZBF_%s_last_path' % k)
if computed['reported'] > 0:
self._print_stats(computed)
else:
print "[ZBF_auditor] - No hosts returned valid data."
else:
print "[ZBF_auditor] - No hosts returned valid data."
print "=" * 79
def load_check(self, hosts):
@ -232,8 +458,7 @@ class SwiftRecon(object):
load15 = {}
recon = Scout("load", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking load avg's on %s hosts..." % (now, len(hosts))
print "[%s] Checking load averages" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
load1[url] = response['1m']
@ -242,14 +467,11 @@ class SwiftRecon(object):
stats = {"1m": load1, "5m": load5, "15m": load15}
for item in stats:
if len(stats[item]) > 0:
low = min(stats[item].values())
high = max(stats[item].values())
total = sum(stats[item].values())
average = total / len(stats[item])
print "[%s load average] lowest: %s, highest: %s, avg: %s" % \
(item, low, high, average)
computed = self._gen_stats(stats[item].values(),
name='%s_load_avg' % item)
self._print_stats(computed)
else:
print "Error: No hosts available or returned valid info."
print "[%s_load_avg] - No hosts returned valid data." % item
print "=" * 79
def quarantine_check(self, hosts):
@ -264,8 +486,7 @@ class SwiftRecon(object):
acctq = {}
recon = Scout("quarantined", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking quarantine on %s hosts..." % (now, len(hosts))
print "[%s] Checking quarantine" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
objq[url] = response['objects']
@ -274,14 +495,11 @@ class SwiftRecon(object):
stats = {"objects": objq, "containers": conq, "accounts": acctq}
for item in stats:
if len(stats[item]) > 0:
low = min(stats[item].values())
high = max(stats[item].values())
total = sum(stats[item].values())
average = total / len(stats[item])
print ("[Quarantined %s] low: %d, high: %d, avg: %d, total: %d"
% (item, low, high, average, total))
computed = self._gen_stats(stats[item].values(),
name='quarantined_%s' % item)
self._print_stats(computed)
else:
print "Error: No hosts available or returned valid info."
print "No hosts returned valid data."
print "=" * 79
def socket_usage(self, hosts):
@ -298,8 +516,7 @@ class SwiftRecon(object):
orphan = {}
recon = Scout("sockstat", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking socket usage on %s hosts..." % (now, len(hosts))
print "[%s] Checking socket usage" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
inuse4[url] = response['tcp_in_use']
@ -312,14 +529,10 @@ class SwiftRecon(object):
"orphan": orphan}
for item in stats:
if len(stats[item]) > 0:
low = min(stats[item].values())
high = max(stats[item].values())
total = sum(stats[item].values())
average = total / len(stats[item])
print "[%s] low: %d, high: %d, avg: %d, total: %d" % \
(item, low, high, average, total)
computed = self._gen_stats(stats[item].values(), item)
self._print_stats(computed)
else:
print "Error: No hosts or info available."
print "No hosts returned valid data."
print "=" * 79
def disk_usage(self, hosts):
@ -334,12 +547,10 @@ class SwiftRecon(object):
lows = []
raw_total_used = []
raw_total_avail = []
averages = []
percents = {}
recon = Scout("diskusage", self.verbose, self.suppress_errors,
self.timeout)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking disk usage on %s hosts..." % (now, len(hosts))
print "[%s] Checking disk usage now" % self._ptime()
for url, response, status in self.pool.imap(recon.scout, hosts):
if status == 200:
hostusage = []
@ -357,11 +568,8 @@ class SwiftRecon(object):
#get per host hi/los for another day
low = min(stats[url])
high = max(stats[url])
total = sum(stats[url])
average = total / len(stats[url])
highs.append(high)
lows.append(low)
averages.append(average)
for percent in stats[url]:
percents[int(percent)] = percents.get(int(percent), 0) + 1
else:
@ -370,7 +578,6 @@ class SwiftRecon(object):
if len(lows) > 0:
low = min(lows)
high = max(highs)
average = sum(averages) / len(averages)
#dist graph shamelessly stolen from https://github.com/gholt/tcod
print "Distribution Graph:"
mul = 69.0 / max(percents.values())
@ -380,12 +587,13 @@ class SwiftRecon(object):
raw_used = sum(raw_total_used)
raw_avail = sum(raw_total_avail)
raw_total = raw_used + raw_avail
avg_used = 100.0 * raw_used / raw_total
print "Disk usage: space used: %s of %s" % (raw_used, raw_total)
print "Disk usage: space free: %s of %s" % (raw_avail, raw_total)
print "Disk usage: lowest: %s%%, highest: %s%%, avg: %s%%" % \
(low, high, average)
(low, high, avg_used)
else:
print "Error: No hosts available or returned valid information."
print "No hosts returned valid data."
print "=" * 79
def main(self):
@ -394,7 +602,13 @@ class SwiftRecon(object):
"""
print "=" * 79
usage = '''
usage: %prog [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [--objmd5]
usage: %prog <server_type> [-v] [--suppress] [-a] [-r] [-u] [-d]
[-l] [--md5] [--auditor] [--updater] [--expirer] [--sockstat]
<server_type>\taccount|container|object
Defaults to object server.
ex: %prog container -l --auditor
'''
args = optparse.OptionParser(usage)
args.add_option('--verbose', '-v', action="store_true",
@ -405,6 +619,12 @@ class SwiftRecon(object):
help="Get async stats")
args.add_option('--replication', '-r', action="store_true",
help="Get replication stats")
args.add_option('--auditor', action="store_true",
help="Get auditor stats")
args.add_option('--updater', action="store_true",
help="Get updater stats")
args.add_option('--expirer', action="store_true",
help="Get expirer stats")
args.add_option('--unmounted', '-u', action="store_true",
help="Check cluster for unmounted devices")
args.add_option('--diskusage', '-d', action="store_true",
@ -413,12 +633,12 @@ class SwiftRecon(object):
help="Get cluster load average stats")
args.add_option('--quarantined', '-q', action="store_true",
help="Get cluster quarantine stats")
args.add_option('--objmd5', action="store_true",
help="Get md5sums of object.ring.gz and compare to local copy")
args.add_option('--md5', action="store_true",
help="Get md5sum of servers ring and compare to local copy")
args.add_option('--sockstat', action="store_true",
help="Get cluster socket usage stats")
args.add_option('--all', action="store_true",
help="Perform all checks. Equal to -arudlq --objmd5 --sockstat")
help="Perform all checks. Equal to -arudlq --md5 --sockstat")
args.add_option('--zone', '-z', type="int",
help="Only query servers in specified zone")
args.add_option('--timeout', '-t', type="int", metavar="SECONDS",
@ -427,44 +647,88 @@ class SwiftRecon(object):
help="Default = /etc/swift")
options, arguments = args.parse_args()
if len(sys.argv) <= 1:
if len(sys.argv) <= 1 or len(arguments) > 1:
args.print_help()
sys.exit(0)
swift_dir = options.swiftdir
obj_ring = os.path.join(swift_dir, 'object.ring.gz')
if arguments:
if arguments[0] in self.check_types:
self.server_type = arguments[0]
else:
print "Invalid Server Type"
args.print_help()
sys.exit(1)
else:
self.server_type = 'object'
swift_dir = options.swiftdir
ring_file = os.path.join(swift_dir, '%s.ring.gz' % self.server_type)
self.verbose = options.verbose
self.suppress_errors = options.suppress
self.timeout = options.timeout
if options.zone:
hosts = self.get_devices(options.zone, swift_dir, 'object')
hosts = self.get_devices(options.zone, swift_dir, self.server_type)
else:
hosts = self.get_devices(None, swift_dir, 'object')
hosts = self.get_devices(None, swift_dir, self.server_type)
print "--> Starting reconnaissance on %s hosts" % len(hosts)
print "=" * 79
if options.all:
self.async_check(hosts)
if self.server_type == 'object':
self.async_check(hosts)
self.object_replication_check(hosts)
self.object_auditor_check(hosts)
self.updater_check(hosts)
self.expirer_check(hosts)
elif self.server_type == 'container':
self.replication_check(hosts)
self.auditor_check(hosts)
self.updater_check(hosts)
elif self.server_type == 'account':
self.replication_check(hosts)
self.auditor_check(hosts)
self.umount_check(hosts)
self.replication_check(hosts)
self.load_check(hosts)
self.disk_usage(hosts)
self.get_ringmd5(hosts, obj_ring)
self.get_ringmd5(hosts, ring_file)
self.quarantine_check(hosts)
self.socket_usage(hosts)
else:
if options.async:
self.async_check(hosts)
if self.server_type == 'object':
self.async_check(hosts)
else:
print "Error: Can't check async's on non object servers."
if options.unmounted:
self.umount_check(hosts)
if options.replication:
self.replication_check(hosts)
if self.server_type == 'object':
self.object_replication_check(hosts)
else:
self.replication_check(hosts)
if options.auditor:
if self.server_type == 'object':
self.object_auditor_check(hosts)
else:
self.auditor_check(hosts)
if options.updater:
if self.server_type == 'account':
print "Error: Can't check updaters on account servers."
else:
self.updater_check(hosts)
if options.expirer:
if self.server_type == 'object':
self.expirer_check(hosts)
else:
print "Error: Can't check expired on non object servers."
if options.loadstats:
self.load_check(hosts)
if options.diskusage:
self.disk_usage(hosts)
if options.objmd5:
self.get_ringmd5(hosts, obj_ring)
if options.md5:
self.get_ringmd5(hosts, ring_file)
if options.quarantined:
self.quarantine_check(hosts)
if options.sockstat:

View File

@ -5,17 +5,11 @@ swift-recon-cron.py
import os
import sys
import optparse
from tempfile import NamedTemporaryFile
try:
import simplejson as json
except ImportError:
import json
from ConfigParser import ConfigParser
from swift.common.utils import get_logger, dump_recon_cache
def async_count(device_dir, logger):
def get_async_count(device_dir, logger):
async_count = 0
for i in os.listdir(device_dir):
asyncdir = os.path.join(device_dir, i, "async_pending")
@ -53,14 +47,13 @@ def main():
print str(e)
sys.exit(1)
try:
asyncs = async_count(device_dir, logger)
asyncs = get_async_count(device_dir, logger)
except Exception:
logger.exception(
_('Exception during recon-cron while accessing devices'))
try:
dump_recon_cache('async_pending', asyncs, cache_file)
except Exception:
logger.exception(_('Exception dumping recon cache'))
dump_recon_cache({'async_pending': asyncs}, cache_file, logger)
try:
os.rmdir(lock_dir)
except Exception:

View File

@ -25,7 +25,7 @@
.SH SYNOPSIS
.LP
.B swift-recon
\ [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [--objmd5]
\ <server_type> [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [--md5] [--auditor] [--updater] [--expirer] [--sockstat]
.SH DESCRIPTION
.PP
@ -40,6 +40,8 @@ more information in the example section below.
.SH OPTIONS
.RS 0
.PD 1
.IP "\fB<server_type>\fR"
account|container|object - Defaults to object server.
.IP "\fB-h, --help\fR"
show this help message and exit
.IP "\fB-v, --verbose\fR"
@ -48,6 +50,12 @@ Print verbose information
Suppress most connection related errors
.IP "\fB-a, --async\fR"
Get async stats
.IP "\fB--auditor\fR"
Get auditor stats
.IP "\fB--updater\fR"
Get updater stats
.IP "\fB--expirer\fR"
Get expirer stats
.IP "\fB-r, --replication\fR"
Get replication stats
.IP "\fB-u, --unmounted\fR"
@ -58,10 +66,10 @@ Get disk usage stats
Get cluster load average stats
.IP "\fB-q, --quarantined\fR"
Get cluster quarantine stats
.IP "\fB--objmd5\fR"
Get md5sums of object.ring.gz and compare to local copy
.IP "\fB--md5\fR"
Get md5sum of servers ring and compare to local copy
.IP "\fB--all\fR"
Perform all checks. Equivalent to -arudlq --objmd5
Perform all checks. Equivalent to -arudlq --md5
.IP "\fB-z ZONE, --zone=ZONE\fR"
Only query servers in specified zone
.IP "\fB--swiftdir=PATH\fR"

View File

@ -248,37 +248,50 @@ allows it to be more easily consumed by third party utilities::
Cluster Telemetry and Monitoring
--------------------------------
Various metrics and telemetry can be obtained from the object servers using
the recon server middleware and the swift-recon cli. To do so update your
object-server.conf to enable the recon middleware by adding a pipeline entry
and setting its one option::
Various metrics and telemetry can be obtained from the account, container, and
object servers using the recon server middleware and the swift-recon cli. To do
so, update your account, container, or object servers' pipelines to include the
recon middleware and add the associated filter config.
object-server.conf sample::
[pipeline:main]
pipeline = recon object-server
[filter:recon]
use = egg:swift#recon
recon_cache_path = /var/cache/swift
container-server.conf sample::
[pipeline:main]
pipeline = recon container-server
[filter:recon]
use = egg:swift#recon
recon_cache_path = /var/cache/swift
account-server.conf sample::
[pipeline:main]
pipeline = recon account-server
[filter:recon]
use = egg:swift#recon
recon_cache_path = /var/cache/swift
The recon_cache_path simply sets the directory where stats for a few items will
be stored. Depending on the method of deployment you may need to create this
directory manually and ensure that swift has read/write.
directory manually and ensure that swift has read/write access.
If you wish to enable reporting of replication times you can enable recon
support in the object-replicator section of the object-server.conf::
[object-replicator]
...
recon_enable = yes
recon_cache_path = /var/cache/swift
Finally if you also wish to track asynchronous pending's you will need to setup
a cronjob to run the swift-recon-cron script periodically::
Finally, if you also wish to track asynchronous pending on your object
servers you will need to setup a cronjob to run the swift-recon-cron script
periodically on your object servers::
*/5 * * * * swift /usr/bin/swift-recon-cron /etc/swift/object-server.conf
Once enabled a GET request for "/recon/<metric>" to the object server will
return a json formatted response::
Once the recon middleware is enabled a GET request for "/recon/<metric>" to
the server will return a json formatted response::
fhines@ubuntu:~$ curl -i http://localhost:6030/recon/async
HTTP/1.1 200 OK
@ -288,30 +301,39 @@ return a json formatted response::
{"async_pending": 0}
The following metrics and telemetry are currently exposed:
The following metrics and telemetry are currently exposed::
================== ====================================================
Request URI Description
------------------ ----------------------------------------------------
/recon/load returns 1,5, and 15 minute load average
/recon/async returns count of async pending
/recon/mem returns /proc/meminfo
/recon/replication returns last logged object replication time
/recon/mounted returns *ALL* currently mounted filesystems
/recon/unmounted returns all unmounted drives if mount_check = True
/recon/diskusage returns disk utilization for storage devices
/recon/ringmd5 returns object/container/account ring md5sums
/recon/quarantined returns # of quarantined objects/accounts/containers
/recon/sockstat returns consumable info from /proc/net/sockstat|6
================== ====================================================
======================== ========================================================================================
Request URI Description
------------------------ ----------------------------------------------------------------------------------------
/recon/load returns 1,5, and 15 minute load average
/recon/mem returns /proc/meminfo
/recon/mounted returns *ALL* currently mounted filesystems
/recon/unmounted returns all unmounted drives if mount_check = True
/recon/diskusage returns disk utilization for storage devices
/recon/ringmd5 returns object/container/account ring md5sums
/recon/quarantined returns # of quarantined objects/accounts/containers
/recon/sockstat returns consumable info from /proc/net/sockstat|6
/recon/devices returns list of devices and devices dir i.e. /srv/node
/recon/async returns count of async pending
/recon/replication returns object replication times (for backward compatibility)
/recon/replication/<type> returns replication info for given type (account, container, object)
/recon/auditor/<type> returns auditor stats on last reported scan for given type (account, container, object)
/recon/updater/<type> returns last updater sweep times for given type (container, object)
========================= =======================================================================================
This information can also be queried via the swift-recon command line utility::
fhines@ubuntu:~$ swift-recon -h
===============================================================================
Usage:
usage: swift-recon [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [--objmd5]
usage: swift-recon <server_type> [-v] [--suppress] [-a] [-r] [-u] [-d]
[-l] [--md5] [--auditor] [--updater] [--expirer] [--sockstat]
<server_type> account|container|object
Defaults to object server.
ex: swift-recon container -l --auditor
Options:
-h, --help show this help message and exit
@ -319,28 +341,32 @@ This information can also be queried via the swift-recon command line utility::
--suppress Suppress most connection related errors
-a, --async Get async stats
-r, --replication Get replication stats
--auditor Get auditor stats
--updater Get updater stats
--expirer Get expirer stats
-u, --unmounted Check cluster for unmounted devices
-d, --diskusage Get disk usage stats
-l, --loadstats Get cluster load average stats
-q, --quarantined Get cluster quarantine stats
--objmd5 Get md5sums of object.ring.gz and compare to local
copy
--md5 Get md5sum of servers ring and compare to local copy
--sockstat Get cluster socket usage stats
--all Perform all checks. Equivalent to -arudlq --objmd5
--socketstat
--all Perform all checks. Equal to -arudlq --md5 --sockstat
-z ZONE, --zone=ZONE Only query servers in specified zone
-t SECONDS, --timeout=SECONDS
Time to wait for a response from a server
--swiftdir=SWIFTDIR Default = /etc/swift
For example, to obtain quarantine stats from all hosts in zone "3"::
For example, to obtain container replication info from all hosts in zone "3"::
fhines@ubuntu:~$ swift-recon -q --zone 3
fhines@ubuntu:~$ swift-recon container -r --zone 3
===============================================================================
[2011-10-18 19:36:00] Checking quarantine dirs on 1 hosts...
[Quarantined objects] low: 4, high: 4, avg: 4, total: 4
[Quarantined accounts] low: 0, high: 0, avg: 0, total: 0
[Quarantined containers] low: 0, high: 0, avg: 0, total: 0
--> Starting reconnaissance on 1 hosts
===============================================================================
[2012-04-02 02:45:48] Checking on replication
[failure] low: 0.000, high: 0.000, avg: 0.000, reported: 1
[success] low: 486.000, high: 486.000, avg: 486.000, reported: 1
[replication_time] low: 20.853, high: 20.853, avg: 20.853, reported: 1
[attempted] low: 243.000, high: 243.000, avg: 243.000, reported: 1
---------------------------
Reporting Metrics to StatsD

View File

@ -22,7 +22,7 @@
# db_preallocation = off
[pipeline:main]
pipeline = account-server
pipeline = recon account-server
[app:account-server]
use = egg:swift#account
@ -33,6 +33,10 @@ use = egg:swift#account
# set log_requests = True
# auto_create_account_prefix = .
[filter:recon]
use = egg:swift#recon
# recon_cache_path = /var/cache/swift
[account-replicator]
# You can override the default log routing for this app here (don't use set!):
# log_name = account-replicator
@ -54,6 +58,7 @@ use = egg:swift#account
# reclaim_age = 604800
# Time in seconds to wait between replication passes
# run_pause = 30
# recon_cache_path = /var/cache/swift
[account-auditor]
# You can override the default log routing for this app here (don't use set!):
@ -62,6 +67,9 @@ use = egg:swift#account
# log_level = INFO
# Will audit, at most, 1 account per device per interval
# interval = 1800
# log_facility = LOG_LOCAL0
# log_level = INFO
# recon_cache_path = /var/cache/swift
[account-reaper]
# You can override the default log routing for this app here (don't use set!):

View File

@ -25,7 +25,7 @@
# db_preallocation = off
[pipeline:main]
pipeline = container-server
pipeline = recon container-server
[app:container-server]
use = egg:swift#container
@ -39,6 +39,10 @@ use = egg:swift#container
# allow_versions = False
# auto_create_account_prefix = .
[filter:recon]
use = egg:swift#recon
#recon_cache_path = /var/cache/swift
[container-replicator]
# You can override the default log routing for this app here (don't use set!):
# log_name = container-replicator
@ -55,7 +59,7 @@ use = egg:swift#container
# reclaim_age = 604800
# Time in seconds to wait between replication passes
# run_pause = 30
# recon_cache_path = /var/cache/swift
[container-updater]
# You can override the default log routing for this app here (don't use set!):
@ -70,6 +74,7 @@ use = egg:swift#container
# slowdown = 0.01
# Seconds to suppress updating an account that has generated an error
# account_suppression_time = 60
# recon_cache_path = /var/cache/swift
[container-auditor]
# You can override the default log routing for this app here (don't use set!):
@ -78,6 +83,7 @@ use = egg:swift#container
# log_level = INFO
# Will audit, at most, 1 container per device per interval
# interval = 1800
# recon_cache_path = /var/cache/swift
[container-sync]
# You can override the default log routing for this app here (don't use set!):

View File

@ -45,8 +45,8 @@ use = egg:swift#object
[filter:recon]
use = egg:swift#recon
recon_cache_path = /var/cache/swift
recon_lock_path = /var/lock
#recon_cache_path = /var/cache/swift
#recon_lock_path = /var/lock
[object-replicator]
# You can override the default log routing for this app here (don't use set!):
@ -68,10 +68,8 @@ recon_lock_path = /var/lock
# lockup_timeout = 1800
# The replicator also performs reclamation
# reclaim_age = 604800
# enable logging of replication stats for recon
# recon_enable = no
# recon_cache_path = /var/cache/swift
# ring_check_interval = 15
# recon_cache_path = /var/cache/swift
[object-updater]
# You can override the default log routing for this app here (don't use set!):
@ -84,6 +82,7 @@ recon_lock_path = /var/lock
# conn_timeout = 0.5
# slowdown will sleep that amount between objects
# slowdown = 0.01
# recon_cache_path = /var/cache/swift
[object-auditor]
# You can override the default log routing for this app here (don't use set!):
@ -94,4 +93,4 @@ recon_lock_path = /var/lock
# bytes_per_second = 10000000
# log_time = 3600
# zero_byte_files_per_second = 50
# recon_cache_path = /var/cache/swift

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from random import random
@ -20,7 +21,7 @@ import swift.common.db
from swift.account import server as account_server
from swift.common.db import AccountBroker
from swift.common.utils import get_logger, audit_location_generator, \
TRUE_VALUES
TRUE_VALUES, dump_recon_cache
from swift.common.daemon import Daemon
from eventlet import Timeout
@ -40,6 +41,9 @@ class AccountAuditor(Daemon):
self.account_failures = 0
swift.common.db.DB_PREALLOCATION = \
conf.get('db_preallocation', 'f').lower() in TRUE_VALUES
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "account.recon")
def _one_audit_pass(self, reported):
all_locs = audit_location_generator(self.devices,
@ -53,6 +57,12 @@ class AccountAuditor(Daemon):
{'time': time.ctime(reported),
'passed': self.account_passes,
'failed': self.account_failures})
self.account_audit(path)
dump_recon_cache({'account_audits_since': reported,
'account_audits_passed': self.account_passes,
'account_audits_failed':
self.account_failures},
self.rcache, self.logger)
reported = time.time()
self.account_passes = 0
self.account_failures = 0
@ -75,6 +85,8 @@ class AccountAuditor(Daemon):
time.sleep(self.interval - elapsed)
self.logger.info(
_('Account audit pass completed: %.02fs'), elapsed)
dump_recon_cache({'account_auditor_pass_completed': elapsed},
self.rcache, self.logger)
def run_once(self, *args, **kwargs):
"""Run the account audit once."""
@ -84,6 +96,8 @@ class AccountAuditor(Daemon):
elapsed = time.time() - begin
self.logger.info(
_('Account audit "once" mode completed: %.02fs'), elapsed)
dump_recon_cache({'account_auditor_pass_completed': elapsed},
self.rcache, self.logger)
def account_audit(self, path):
"""

View File

@ -31,7 +31,8 @@ from webob.exc import HTTPNotFound, HTTPNoContent, HTTPAccepted, \
import swift.common.db
from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, TRUE_VALUES, unlink_older_than
renamer, mkdirs, lock_parent_directory, TRUE_VALUES, unlink_older_than, \
dump_recon_cache
from swift.common import ring
from swift.common.bufferedhttp import BufferedHTTPConnection
from swift.common.exceptions import DriveNotMounted, ConnectionTimeout
@ -124,6 +125,11 @@ class Replicator(Daemon):
swift.common.db.DB_PREALLOCATION = \
conf.get('db_preallocation', 'f').lower() in TRUE_VALUES
self._zero_stats()
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.recon_replicator = '%s.recon' % self.server_type
self.rcache = os.path.join(self.recon_cache_path,
self.recon_replicator)
def _zero_stats(self):
"""Zero out the stats."""
@ -144,6 +150,9 @@ class Replicator(Daemon):
self.logger.info(_('Removed %(remove)d dbs') % self.stats)
self.logger.info(_('%(success)s successes, %(failure)s failures')
% self.stats)
dump_recon_cache({'replication_stats': self.stats,
'replication_time': time.time() - self.stats['start']
}, self.rcache, self.logger)
self.logger.info(' '.join(['%s:%s' % item for item in
self.stats.items() if item[0] in
('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty',

View File

@ -17,7 +17,7 @@ import errno
import os
from webob import Request, Response
from swift.common.utils import split_path, get_logger
from swift.common.utils import split_path, get_logger, TRUE_VALUES
from swift.common.constraints import check_mount
from resource import getpagesize
from hashlib import md5
@ -46,16 +46,41 @@ class ReconMiddleware(object):
self.devices = conf.get('devices', '/srv/node/')
swift_dir = conf.get('swift_dir', '/etc/swift')
self.logger = get_logger(conf, log_route='recon')
self.recon_cache_path = conf.get('recon_cache_path', \
'/var/cache/swift')
self.object_recon_cache = "%s/object.recon" % self.recon_cache_path
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.object_recon_cache = os.path.join(self.recon_cache_path,
'object.recon')
self.container_recon_cache = os.path.join(self.recon_cache_path,
'container.recon')
self.account_recon_cache = os.path.join(self.recon_cache_path,
'account.recon')
self.account_ring_path = os.path.join(swift_dir, 'account.ring.gz')
self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz')
self.rings = [self.account_ring_path, self.container_ring_path, \
self.object_ring_path]
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.rings = [self.account_ring_path, self.container_ring_path,
self.object_ring_path]
self.mount_check = conf.get('mount_check', 'true').lower() \
in TRUE_VALUES
def _from_recon_cache(self, cache_keys, cache_file, openr=open):
"""retrieve values from a recon cache file
:params cache_keys: list of cache items to retrieve
:params cache_file: cache file to retrieve items from.
:params openr: open to use [for unittests]
:return: dict of cache items and their value or none if not found
"""
try:
with openr(cache_file, 'r') as f:
recondata = json.load(f)
return dict((key, recondata.get(key)) for key in cache_keys)
except IOError:
self.logger.exception(_('Error reading recon cache file'))
except ValueError:
self.logger.exception(_('Error parsing recon cache file'))
except Exception:
self.logger.exception(_('Error retrieving recon data'))
return dict((key, None) for key in cache_keys)
def get_mounted(self, openr=open):
"""get ALL mounted fs from /proc/mounts"""
@ -89,36 +114,73 @@ class ReconMiddleware(object):
meminfo[entry[0]] = entry[1].strip()
return meminfo
def get_async_info(self, openr=open):
def get_async_info(self):
"""get # of async pendings"""
asyncinfo = {}
with openr(self.object_recon_cache, 'r') as f:
recondata = json.load(f)
if 'async_pending' in recondata:
asyncinfo['async_pending'] = recondata['async_pending']
else:
self.logger.notice( \
_('NOTICE: Async pendings not in recon data.'))
asyncinfo['async_pending'] = -1
return asyncinfo
return self._from_recon_cache(['async_pending'],
self.object_recon_cache)
def get_replication_info(self, openr=open):
"""grab last object replication time"""
repinfo = {}
with openr(self.object_recon_cache, 'r') as f:
recondata = json.load(f)
if 'object_replication_time' in recondata:
repinfo['object_replication_time'] = \
recondata['object_replication_time']
else:
self.logger.notice( \
_('NOTICE: obj replication time not in recon data'))
repinfo['object_replication_time'] = -1
return repinfo
def get_replication_info(self, recon_type):
"""get replication info"""
if recon_type == 'account':
return self._from_recon_cache(['replication_time',
'replication_stats'],
self.account_recon_cache)
elif recon_type == 'container':
return self._from_recon_cache(['replication_time',
'replication_stats'],
self.container_recon_cache)
elif recon_type == 'object':
return self._from_recon_cache(['object_replication_time'],
self.object_recon_cache)
else:
return None
def get_device_info(self):
"""place holder, grab dev info"""
return self.devices
"""get devices"""
try:
return {self.devices: os.listdir(self.devices)}
except Exception:
self.logger.exception(_('Error listing devices'))
return {self.devices: None}
def get_updater_info(self, recon_type):
"""get updater info"""
if recon_type == 'container':
return self._from_recon_cache(['container_updater_sweep'],
self.container_recon_cache)
elif recon_type == 'object':
return self._from_recon_cache(['object_updater_sweep'],
self.object_recon_cache)
else:
return None
def get_expirer_info(self, recon_type):
"""get expirer info"""
if recon_type == 'object':
return self._from_recon_cache(['object_expiration_pass',
'expired_last_pass'],
self.object_recon_cache)
def get_auditor_info(self, recon_type):
"""get auditor info"""
if recon_type == 'account':
return self._from_recon_cache(['account_audits_passed',
'account_auditor_pass_completed',
'account_audits_since',
'account_audits_failed'],
self.account_recon_cache)
elif recon_type == 'container':
return self._from_recon_cache(['container_audits_passed',
'container_auditor_pass_completed',
'container_audits_since',
'container_audits_failed'],
self.container_recon_cache)
elif recon_type == 'object':
return self._from_recon_cache(['object_auditor_stats_ALL',
'object_auditor_stats_ZBF'],
self.object_recon_cache)
else:
return None
def get_unmounted(self):
"""list unmounted (failed?) devices"""
@ -152,12 +214,18 @@ class ReconMiddleware(object):
sums = {}
for ringfile in self.rings:
md5sum = md5()
with openr(ringfile, 'rb') as f:
block = f.read(4096)
while block:
md5sum.update(block)
block = f.read(4096)
sums[ringfile] = md5sum.hexdigest()
if os.path.exists(ringfile):
try:
with openr(ringfile, 'rb') as f:
block = f.read(4096)
while block:
md5sum.update(block)
block = f.read(4096)
sums[ringfile] = md5sum.hexdigest()
except IOError, err:
sums[ringfile] = None
if err.errno != errno.ENOENT:
self.logger.exception(_('Error reading ringfile'))
return sums
def get_quarantine_count(self):
@ -193,7 +261,7 @@ class ReconMiddleware(object):
int(tcpstats[10]) * getpagesize()
except IOError as e:
if e.errno != errno.ENOENT:
raise
raise
try:
with openr('/proc/net/sockstat6', 'r') as proc_sockstat6:
for entry in proc_sockstat6:
@ -205,54 +273,50 @@ class ReconMiddleware(object):
return sockstat
def GET(self, req):
error = False
root, type = split_path(req.path, 1, 2, False)
try:
if type == "mem":
content = json.dumps(self.get_mem())
elif type == "load":
content = json.dumps(self.get_load(), sort_keys=True)
elif type == "async":
try:
content = json.dumps(self.get_async_info())
except IOError as e:
error = True
content = "async - %s" % e
elif type == "replication":
try:
content = json.dumps(self.get_replication_info())
except IOError as e:
error = True
content = "replication - %s" % e
elif type == "mounted":
content = json.dumps(self.get_mounted())
elif type == "unmounted":
content = json.dumps(self.get_unmounted())
elif type == "diskusage":
content = json.dumps(self.get_diskusage())
elif type == "ringmd5":
content = json.dumps(self.get_ring_md5())
elif type == "quarantined":
content = json.dumps(self.get_quarantine_count())
elif type == "sockstat":
content = json.dumps(self.get_socket_info())
else:
content = "Invalid path: %s" % req.path
return Response(request=req, status="400 Bad Request", \
body=content, content_type="text/plain")
except ValueError as e:
error = True
content = "ValueError: %s" % e
if not error:
return Response(request=req, body=content, \
content_type="application/json")
root, rcheck, rtype = split_path(req.path, 1, 3, True)
all_rtypes = ['account', 'container', 'object']
if rcheck == "mem":
content = self.get_mem()
elif rcheck == "load":
content = self.get_load()
elif rcheck == "async":
content = self.get_async_info()
elif rcheck == 'replication' and rtype in all_rtypes:
content = self.get_replication_info(rtype)
elif rcheck == 'replication' and rtype is None:
#handle old style object replication requests
content = self.get_replication_info('object')
elif rcheck == "devices":
content = self.get_device_info()
elif rcheck == "updater" and rtype in ['container', 'object']:
content = self.get_updater_info(rtype)
elif rcheck == "auditor" and rtype in all_rtypes:
content = self.get_auditor_info(rtype)
elif rcheck == "expirer" and rtype == 'object':
content = self.get_expirer_info(rtype)
elif rcheck == "mounted":
content = self.get_mounted()
elif rcheck == "unmounted":
content = self.get_unmounted()
elif rcheck == "diskusage":
content = self.get_diskusage()
elif rcheck == "ringmd5":
content = self.get_ring_md5()
elif rcheck == "quarantined":
content = self.get_quarantine_count()
elif rcheck == "sockstat":
content = self.get_socket_info()
else:
msg = 'CRITICAL recon - %s' % str(content)
self.logger.critical(msg)
body = "Internal server error."
return Response(request=req, status="500 Server Error", \
body=body, content_type="text/plain")
content = "Invalid path: %s" % req.path
return Response(request=req, status="404 Not Found",
body=content, content_type="text/plain")
if content:
return Response(request=req, body=json.dumps(content),
content_type="application/json")
else:
return Response(request=req, status="500 Server Error",
body="Internal server error.",
content_type="text/plain")
def __call__(self, env, start_response):
req = Request(env)

View File

@ -41,7 +41,7 @@ import glob
from urlparse import urlparse as stdlib_urlparse, ParseResult
import eventlet
from eventlet import GreenPool, sleep
from eventlet import GreenPool, sleep, Timeout
from eventlet.green import socket, threading
import netifaces
@ -1184,35 +1184,39 @@ def human_readable(value):
return '%d%si' % (round(value), suffixes[index])
def dump_recon_cache(cache_key, cache_value, cache_file, lock_timeout=2):
def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
"""Update recon cache values
:param cache_key: key to update
:param cache_value: value you want to set key too
:param cache_dict: Dictionary of cache key/value pairs to write out
:param cache_file: cache file to update
:param logger: the logger to use to log an encountered error
:param lock_timeout: timeout (in seconds)
"""
with lock_file(cache_file, lock_timeout, unlink=False) as cf:
cache_entry = {}
try:
existing_entry = cf.readline()
if existing_entry:
cache_entry = json.loads(existing_entry)
except ValueError:
#file doesn't have a valid entry, we'll recreate it
pass
cache_entry[cache_key] = cache_value
try:
with NamedTemporaryFile(dir=os.path.dirname(cache_file),
delete=False) as tf:
tf.write(json.dumps(cache_entry) + '\n')
os.rename(tf.name, cache_file)
finally:
try:
with lock_file(cache_file, lock_timeout, unlink=False) as cf:
cache_entry = {}
try:
os.unlink(tf.name)
except OSError, err:
if err.errno != errno.ENOENT:
raise
existing_entry = cf.readline()
if existing_entry:
cache_entry = json.loads(existing_entry)
except ValueError:
#file doesn't have a valid entry, we'll recreate it
pass
for cache_key, cache_value in cache_dict.items():
cache_entry[cache_key] = cache_value
try:
with NamedTemporaryFile(dir=os.path.dirname(cache_file),
delete=False) as tf:
tf.write(json.dumps(cache_entry) + '\n')
os.rename(tf.name, cache_file)
finally:
try:
os.unlink(tf.name)
except OSError, err:
if err.errno != errno.ENOENT:
raise
except (Exception, Timeout):
logger.exception(_('Exception dumping recon cache'))
def listdir(path):

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from random import random
@ -22,7 +23,7 @@ import swift.common.db
from swift.container import server as container_server
from swift.common.db import ContainerBroker
from swift.common.utils import get_logger, audit_location_generator, \
TRUE_VALUES
TRUE_VALUES, dump_recon_cache
from swift.common.daemon import Daemon
@ -36,11 +37,13 @@ class ContainerAuditor(Daemon):
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.interval = int(conf.get('interval', 1800))
swift_dir = conf.get('swift_dir', '/etc/swift')
self.container_passes = 0
self.container_failures = 0
swift.common.db.DB_PREALLOCATION = \
conf.get('db_preallocation', 'f').lower() in TRUE_VALUES
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "container.recon")
def _one_audit_pass(self, reported):
all_locs = audit_location_generator(self.devices,
@ -56,6 +59,12 @@ class ContainerAuditor(Daemon):
{'time': time.ctime(reported),
'pass': self.container_passes,
'fail': self.container_failures})
dump_recon_cache({'container_audits_since': reported,
'container_audits_passed':
self.container_passes,
'container_audits_failed':
self.container_failures},
self.rcache, self.logger)
reported = time.time()
self.container_passes = 0
self.container_failures = 0
@ -78,6 +87,8 @@ class ContainerAuditor(Daemon):
time.sleep(self.interval - elapsed)
self.logger.info(
_('Container audit pass completed: %.02fs'), elapsed)
dump_recon_cache({'container_auditor_pass_completed': elapsed},
self.rcache, self.logger)
def run_once(self, *args, **kwargs):
"""Run the container audit once."""
@ -87,6 +98,8 @@ class ContainerAuditor(Daemon):
elapsed = time.time() - begin
self.logger.info(
_('Container audit "once" mode completed: %.02fs'), elapsed)
dump_recon_cache({'container_auditor_pass_completed': elapsed},
self.recon_container)
def container_audit(self, path):
"""

View File

@ -29,7 +29,7 @@ from swift.common.bufferedhttp import http_connect
from swift.common.db import ContainerBroker
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, TRUE_VALUES
from swift.common.utils import get_logger, TRUE_VALUES, dump_recon_cache
from swift.common.daemon import Daemon
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
@ -59,6 +59,9 @@ class ContainerUpdater(Daemon):
self.new_account_suppressions = None
swift.common.db.DB_PREALLOCATION = \
conf.get('db_preallocation', 'f').lower() in TRUE_VALUES
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "container.recon")
def get_account_ring(self):
"""Get the account ring. Load it if it hasn't been yet."""
@ -154,6 +157,8 @@ class ContainerUpdater(Daemon):
elapsed = time.time() - begin
self.logger.info(_('Container update sweep completed: %.02fs'),
elapsed)
dump_recon_cache({'container_updater_sweep': elapsed},
self.rcache, self.logger)
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
@ -175,6 +180,8 @@ class ContainerUpdater(Daemon):
'%(no_change)s with no changes'),
{'elapsed': elapsed, 'success': self.successes,
'fail': self.failures, 'no_change': self.no_changes})
dump_recon_cache({'container_updater_sweep': elapsed},
self.rcache, self.logger)
def container_sweep(self, path):
"""

View File

@ -20,7 +20,7 @@ from eventlet import Timeout
from swift.obj import server as object_server
from swift.common.utils import get_logger, audit_location_generator, \
ratelimit_sleep, TRUE_VALUES
ratelimit_sleep, TRUE_VALUES, dump_recon_cache
from swift.common.exceptions import AuditException, DiskFileError, \
DiskFileNotExist
from swift.common.daemon import Daemon
@ -54,6 +54,9 @@ class AuditorWorker(object):
self.passes = 0
self.quarantines = 0
self.errors = 0
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
def audit_all_objects(self, mode='once'):
self.logger.info(_('Begin object audit "%s" mode (%s)' %
@ -63,7 +66,6 @@ class AuditorWorker(object):
self.total_files_processed = 0
total_quarantines = 0
total_errors = 0
files_running_time = 0
time_auditing = 0
all_locs = audit_location_generator(self.devices,
object_server.DATADIR,
@ -93,6 +95,16 @@ class AuditorWorker(object):
'brate': self.bytes_processed / (now - reported),
'total': (now - begin), 'audit': time_auditing,
'audit_rate': time_auditing / (now - begin)})
dump_recon_cache({'object_auditor_stats_%s' %
self.auditor_type: {
'errors': self.errors,
'passes': self.passes,
'quarantined': self.quarantines,
'bytes_processed':
self.bytes_processed,
'start_time': reported,
'audit_time': time_auditing}
}, self.rcache, self.logger)
reported = now
total_quarantines += self.quarantines
total_errors += self.errors

View File

@ -14,16 +14,15 @@
# limitations under the License.
from random import random
from sys import exc_info
from time import time
from urllib import quote
from os.path import join
from eventlet import sleep, Timeout
from webob import Request
from swift.common.daemon import Daemon
from swift.common.internal_client import InternalClient
from swift.common.utils import get_logger
from swift.common.utils import get_logger, dump_recon_cache
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
HTTP_PRECONDITION_FAILED
@ -55,6 +54,9 @@ class ObjectExpirer(Daemon):
self.report_interval = int(conf.get('report_interval') or 300)
self.report_first_time = self.report_last_time = time()
self.report_objects = 0
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = join(self.recon_cache_path, 'object.recon')
def report(self, final=False):
"""
@ -68,6 +70,9 @@ class ObjectExpirer(Daemon):
elapsed = time() - self.report_first_time
self.logger.info(_('Pass completed in %ds; %d objects expired') %
(elapsed, self.report_objects))
dump_recon_cache({'object_expiration_pass': elapsed,
'expired_last_pass': self.report_objects},
self.rcache, self.logger)
elif time() - self.report_last_time >= self.report_interval:
elapsed = time() - self.report_first_time
self.logger.info(_('Pass so far %ds; %d objects expired') %

View File

@ -32,8 +32,7 @@ from eventlet.support.greenlets import GreenletExit
from swift.common.ring import Ring
from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \
TRUE_VALUES
compute_eta, get_logger, write_pickle, renamer, dump_recon_cache
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@ -247,11 +246,9 @@ class ObjectReplicator(Daemon):
self.rsync_io_timeout = conf.get('rsync_io_timeout', '30')
self.http_timeout = int(conf.get('http_timeout', 60))
self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
self.recon_enable = conf.get(
'recon_enable', 'no').lower() in TRUE_VALUES
self.recon_cache_path = conf.get(
'recon_cache_path', '/var/cache/swift')
self.recon_object = os.path.join(self.recon_cache_path, "object.recon")
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
def _rsync(self, args):
"""
@ -598,12 +595,8 @@ class ObjectReplicator(Daemon):
total = (time.time() - start) / 60
self.logger.info(
_("Object replication complete. (%.02f minutes)"), total)
if self.recon_enable:
try:
dump_recon_cache('object_replication_time', total, \
self.recon_object)
except (Exception, Timeout):
self.logger.exception(_('Exception dumping recon cache'))
dump_recon_cache({'object_replication_time': total},
self.rcache, self.logger)
def run_forever(self, *args, **kwargs):
self.logger.info(_("Starting object replicator in daemon mode."))
@ -616,12 +609,8 @@ class ObjectReplicator(Daemon):
total = (time.time() - start) / 60
self.logger.info(
_("Object replication complete. (%.02f minutes)"), total)
if self.recon_enable:
try:
dump_recon_cache('object_replication_time', total, \
self.recon_object)
except (Exception, Timeout):
self.logger.exception(_('Exception dumping recon cache'))
dump_recon_cache({'object_replication_time': total},
self.rcache, self.logger)
self.logger.debug(_('Replication sleeping for %s seconds.'),
self.run_pause)
sleep(self.run_pause)

View File

@ -25,7 +25,8 @@ from eventlet import patcher, Timeout
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache
from swift.common.daemon import Daemon
from swift.obj.server import ASYNCDIR
from swift.common.http import is_success, HTTP_NOT_FOUND, \
@ -50,6 +51,9 @@ class ObjectUpdater(Daemon):
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.successes = 0
self.failures = 0
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
def get_container_ring(self):
"""Get the container ring. Load it, if it hasn't been yet."""
@ -97,6 +101,8 @@ class ObjectUpdater(Daemon):
elapsed = time.time() - begin
self.logger.info(_('Object update sweep completed: %.02fs'),
elapsed)
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
@ -119,6 +125,8 @@ class ObjectUpdater(Daemon):
'%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
{'elapsed': elapsed, 'success': self.successes,
'fail': self.failures})
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
def object_sweep(self, device):
"""

View File

@ -30,6 +30,16 @@ class FakeApp(object):
def start_response(*args):
pass
class FakeFromCache(object):
def __init__(self, out=None):
self.fakeout = out
self.fakeout_calls = []
def fake_from_recon_cache(self, *args, **kwargs):
self.fakeout_calls.append((args, kwargs))
return self.fakeout
class OpenAndReadTester(object):
def __init__(self, output_iter):
@ -93,12 +103,78 @@ class MockOS(object):
return stat_result(self.lstat_output_tuple)
class FakeRecon(object):
def __init__(self):
self.fake_replication_rtype = None
self.fake_updater_rtype = None
self.fake_auditor_rtype = None
self.fake_expirer_rtype = None
def fake_mem(self):
return {'memtest': "1"}
def fake_load(self):
return {'loadtest': "1"}
def fake_async(self):
return {'asynctest': "1"}
def fake_get_device_info(self):
return {"/srv/1/node": ["sdb1"]}
def fake_replication(self, recon_type):
self.fake_replication_rtype = recon_type
return {'replicationtest': "1"}
def fake_updater(self, recon_type):
self.fake_updater_rtype = recon_type
return {'updatertest': "1"}
def fake_auditor(self, recon_type):
self.fake_auditor_rtype = recon_type
return {'auditortest': "1"}
def fake_expirer(self, recon_type):
self.fake_expirer_rtype = recon_type
return {'expirertest': "1"}
def fake_mounted(self):
return {'mountedtest': "1"}
def fake_unmounted(self):
return {'unmountedtest': "1"}
def fake_diskusage(self):
return {'diskusagetest': "1"}
def fake_ringmd5(self):
return {'ringmd5test': "1"}
def fake_quarantined(self):
return {'quarantinedtest': "1"}
def fake_sockstat(self):
return {'sockstattest': "1"}
def nocontent(self):
return None
def raise_IOError(self, *args, **kwargs):
raise IOError
def raise_ValueError(self, *args, **kwargs):
raise ValueError
def raise_Exception(self, *args, **kwargs):
raise Exception
class TestReconSuccess(TestCase):
def setUp(self):
self.app = recon.ReconMiddleware(FakeApp(), {})
self.mockos = MockOS()
self.fakecache = FakeFromCache()
self.real_listdir = os.listdir
self.real_path_exists = os.path.exists
self.real_lstat = os.lstat
@ -107,6 +183,9 @@ class TestReconSuccess(TestCase):
os.path.exists = self.mockos.fake_path_exists
os.lstat = self.mockos.fake_lstat
os.statvfs = self.mockos.fake_statvfs
self.real_from_cache = self.app._from_recon_cache
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
self.frecon = FakeRecon()
def tearDown(self):
os.listdir = self.real_listdir
@ -114,6 +193,42 @@ class TestReconSuccess(TestCase):
os.lstat = self.real_lstat
os.statvfs = self.real_statvfs
del self.mockos
self.app._from_recon_cache = self.real_from_cache
del self.fakecache
def test_from_recon_cache(self):
oart = OpenAndReadTester(['{"notneeded": 5, "testkey1": "canhazio"}'])
self.app._from_recon_cache = self.real_from_cache
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
'test.cache', openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('test.cache', 'r'), {})])
self.assertEquals(rv, {'notpresentkey': None, 'testkey1': 'canhazio'})
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
def test_from_recon_cache_ioerror(self):
oart = self.frecon.raise_IOError
self.app._from_recon_cache = self.real_from_cache
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
'test.cache', openr=oart)
self.assertEquals(rv, {'notpresentkey': None, 'testkey1': None})
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
def test_from_recon_cache_valueerror(self):
oart = self.frecon.raise_ValueError
self.app._from_recon_cache = self.real_from_cache
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
'test.cache', openr=oart)
self.assertEquals(rv, {'notpresentkey': None, 'testkey1': None})
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
def test_from_recon_cache_exception(self):
oart = self.frecon.raise_Exception
self.app._from_recon_cache = self.real_from_cache
rv = self.app._from_recon_cache(['testkey1', 'notpresentkey'],
'test.cache', openr=oart)
self.assertEquals(rv, {'notpresentkey': None, 'testkey1': None})
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
def test_get_mounted(self):
mounts_content = ['rootfs / rootfs rw 0 0',
@ -255,40 +370,166 @@ class TestReconSuccess(TestCase):
self.assertEquals(rv, meminfo_resp)
def test_get_async_info(self):
obj_recon_content = """{"object_replication_time": 200.0, "async_pending": 5}"""
oart = OpenAndReadTester([obj_recon_content])
rv = self.app.get_async_info(openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('/var/cache/swift/object.recon', 'r'), {})])
from_cache_response = {'async_pending': 5}
self.fakecache.fakeout = from_cache_response
rv = self.app.get_async_info()
self.assertEquals(rv, {'async_pending': 5})
def test_get_async_info_empty_file(self):
obj_recon_content = """{"object_replication_time": 200.0}"""
oart = OpenAndReadTester([obj_recon_content])
rv = self.app.get_async_info(openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('/var/cache/swift/object.recon', 'r'), {})])
self.assertEquals(rv, {'async_pending': -1})
def test_get_replication_info_account(self):
    """Account replication info is read from account.recon via the cache."""
    from_cache_response = {
        "replication_stats": {
            "attempted": 1, "diff": 0,
            "diff_capped": 0, "empty": 0,
            "failure": 0, "hashmatch": 0,
            "no_change": 2, "remote_merge": 0,
            "remove": 0, "rsync": 0,
            "start": 1333044050.855202,
            "success": 2, "ts_repl": 0},
        "replication_time": 0.2615511417388916}
    # Reset recorded calls so state from earlier tests can't leak into
    # the exact-list assertion below (every sibling test does this).
    self.fakecache.fakeout_calls = []
    self.fakecache.fakeout = from_cache_response
    rv = self.app.get_replication_info('account')
    self.assertEquals(self.fakecache.fakeout_calls,
                      [((['replication_time', 'replication_stats'],
                         '/var/cache/swift/account.recon'), {})])
    self.assertEquals(rv, {"replication_stats": {
                               "attempted": 1, "diff": 0,
                               "diff_capped": 0, "empty": 0,
                               "failure": 0, "hashmatch": 0,
                               "no_change": 2, "remote_merge": 0,
                               "remove": 0, "rsync": 0,
                               "start": 1333044050.855202,
                               "success": 2, "ts_repl": 0},
                           "replication_time": 0.2615511417388916})
def test_get_replication_info(self):
# NOTE(review): no assertion on rv here, and this overlaps with the
# typed test_get_replication_info_* tests -- possibly leftover from an
# older version of get_replication_info; confirm intent.
obj_recon_content = """{"object_replication_time": 200.0, "async_pending": 5}"""
oart = OpenAndReadTester([obj_recon_content])
rv = self.app.get_replication_info(openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('/var/cache/swift/object.recon', 'r'), {})])
def test_get_replication_info_container(self):
    """Container replication info is read from container.recon."""
    cached = {"replication_time": 200.0,
              "replication_stats": {
                  "attempted": 179, "diff": 0,
                  "diff_capped": 0, "empty": 0,
                  "failure": 0, "hashmatch": 0,
                  "no_change": 358, "remote_merge": 0,
                  "remove": 0, "rsync": 0,
                  "start": 5.5, "success": 358,
                  "ts_repl": 0}}
    self.fakecache.fakeout_calls = []
    self.fakecache.fakeout = cached
    rv = self.app.get_replication_info('container')
    # both the replication keys must be requested from container.recon
    self.assertEquals(
        self.fakecache.fakeout_calls,
        [((['replication_time', 'replication_stats'],
           '/var/cache/swift/container.recon'), {})])
    # the cached payload is returned unchanged
    self.assertEquals(rv, {"replication_time": 200.0,
                           "replication_stats": {
                               "attempted": 179, "diff": 0,
                               "diff_capped": 0, "empty": 0,
                               "failure": 0, "hashmatch": 0,
                               "no_change": 358, "remote_merge": 0,
                               "remove": 0, "rsync": 0,
                               "start": 5.5, "success": 358,
                               "ts_repl": 0}})
def test_get_replication_object(self):
    """Object replication info only exposes object_replication_time."""
    cached = {"object_replication_time": 200.0}
    self.fakecache.fakeout_calls = []
    self.fakecache.fakeout = cached
    rv = self.app.get_replication_info('object')
    self.assertEquals(
        self.fakecache.fakeout_calls,
        [((['object_replication_time'],
           '/var/cache/swift/object.recon'), {})])
    self.assertEquals(rv, {'object_replication_time': 200.0})
def test_get_replication_info_empty_file(self):
# A recon file missing object_replication_time should report -1.
# NOTE(review): old openr-style test; the cache-based variants above are
# the current form -- confirm this path still exists.
obj_recon_content = """{"async_pending": 5}"""
oart = OpenAndReadTester([obj_recon_content])
rv = self.app.get_replication_info(openr=oart.open)
self.assertEquals(oart.read_calls, [((), {})])
self.assertEquals(oart.open_calls, [(('/var/cache/swift/object.recon', 'r'), {})])
self.assertEquals(rv, {'object_replication_time': -1})
def test_get_updater_info_container(self):
    """Container updater sweep time is read from container.recon."""
    sweep = 18.476239919662476
    self.fakecache.fakeout_calls = []
    self.fakecache.fakeout = {"container_updater_sweep": sweep}
    rv = self.app.get_updater_info('container')
    self.assertEquals(
        self.fakecache.fakeout_calls,
        [((['container_updater_sweep'],
           '/var/cache/swift/container.recon'), {})])
    self.assertEquals(rv, {"container_updater_sweep": sweep})
def test_get_device_info(self):
    """The configured devices root is returned as-is."""
    self.assertEquals(self.app.get_device_info(), '/srv/node/')
def test_get_updater_info_object(self):
    """Object updater sweep time is read from object.recon."""
    sweep = 0.79848217964172363
    self.fakecache.fakeout_calls = []
    self.fakecache.fakeout = {"object_updater_sweep": sweep}
    rv = self.app.get_updater_info('object')
    self.assertEquals(
        self.fakecache.fakeout_calls,
        [((['object_updater_sweep'],
           '/var/cache/swift/object.recon'), {})])
    self.assertEquals(rv, {"object_updater_sweep": sweep})
def test_get_auditor_info_account(self):
    """Account auditor stats are fetched from account.recon."""
    cached = {"account_auditor_pass_completed": 0.24,
              "account_audits_failed": 0,
              "account_audits_passed": 6,
              "account_audits_since": "1333145374.1373529"}
    self.fakecache.fakeout_calls = []
    self.fakecache.fakeout = cached
    rv = self.app.get_auditor_info('account')
    # all four auditor keys must be requested from account.recon
    self.assertEquals(
        self.fakecache.fakeout_calls,
        [((['account_audits_passed',
            'account_auditor_pass_completed',
            'account_audits_since',
            'account_audits_failed'],
           '/var/cache/swift/account.recon'), {})])
    self.assertEquals(rv, {"account_auditor_pass_completed": 0.24,
                           "account_audits_failed": 0,
                           "account_audits_passed": 6,
                           "account_audits_since": "1333145374.1373529"})
def test_get_auditor_info_container(self):
    """Container auditor stats are fetched from container.recon."""
    cached = {"container_auditor_pass_completed": 0.24,
              "container_audits_failed": 0,
              "container_audits_passed": 6,
              "container_audits_since": "1333145374.1373529"}
    self.fakecache.fakeout_calls = []
    self.fakecache.fakeout = cached
    rv = self.app.get_auditor_info('container')
    # all four auditor keys must be requested from container.recon
    self.assertEquals(
        self.fakecache.fakeout_calls,
        [((['container_audits_passed',
            'container_auditor_pass_completed',
            'container_audits_since',
            'container_audits_failed'],
           '/var/cache/swift/container.recon'), {})])
    self.assertEquals(rv, {"container_auditor_pass_completed": 0.24,
                           "container_audits_failed": 0,
                           "container_audits_passed": 6,
                           "container_audits_since": "1333145374.1373529"})
def test_get_auditor_info_object(self):
    """Object auditor stats (ALL and ZBF passes) come from object.recon."""
    cached = {"object_auditor_stats_ALL": {
                  "audit_time": 115.14418768882751,
                  "bytes_processed": 234660,
                  "completed": 115.4512460231781,
                  "errors": 0,
                  "files_processed": 2310,
                  "quarantined": 0},
              "object_auditor_stats_ZBF": {
                  "audit_time": 45.877294063568115,
                  "bytes_processed": 0,
                  "completed": 46.181446075439453,
                  "errors": 0,
                  "files_processed": 2310,
                  "quarantined": 0}}
    self.fakecache.fakeout_calls = []
    self.fakecache.fakeout = cached
    rv = self.app.get_auditor_info('object')
    # both the ALL-files and zero-byte-file stats must be requested
    self.assertEquals(
        self.fakecache.fakeout_calls,
        [((['object_auditor_stats_ALL',
            'object_auditor_stats_ZBF'],
           '/var/cache/swift/object.recon'), {})])
    self.assertEquals(rv, {"object_auditor_stats_ALL": {
                               "audit_time": 115.14418768882751,
                               "bytes_processed": 234660,
                               "completed": 115.4512460231781,
                               "errors": 0,
                               "files_processed": 2310,
                               "quarantined": 0},
                           "object_auditor_stats_ZBF": {
                               "audit_time": 45.877294063568115,
                               "bytes_processed": 0,
                               "completed": 46.181446075439453,
                               "errors": 0,
                               "files_processed": 2310,
                               "quarantined": 0}})
def test_get_unmounted(self):
@ -319,7 +560,8 @@ class TestReconSuccess(TestCase):
self.mockos.statvfs_output=statvfs_content
self.mockos.path_exists_output=True
rv = self.app.get_diskusage()
self.assertEquals(self.mockos.statvfs_calls,[(('/srv/node/canhazdrive1',), {})])
self.assertEquals(self.mockos.statvfs_calls,
[(('/srv/node/canhazdrive1',), {})])
self.assertEquals(rv, du_resp)
def test_get_diskusage_checkmount_fail(self):
@ -329,11 +571,12 @@ class TestReconSuccess(TestCase):
self.mockos.path_exists_output=False
rv = self.app.get_diskusage()
self.assertEquals(self.mockos.listdir_calls,[(('/srv/node/',), {})])
self.assertEquals(self.mockos.path_exists_calls,[(('/srv/node/canhazdrive1',), {})])
self.assertEquals(self.mockos.path_exists_calls,
[(('/srv/node/canhazdrive1',), {})])
self.assertEquals(rv, du_resp)
def test_get_quarantine_count(self):
#posix.lstat_result(st_mode=1, st_ino=2, st_dev=3, st_nlink=4,
#posix.lstat_result(st_mode=1, st_ino=2, st_dev=3, st_nlink=4,
# st_uid=5, st_gid=6, st_size=7, st_atime=8,
# st_mtime=9, st_ctime=10)
lstat_content = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
@ -352,63 +595,28 @@ class TestReconSuccess(TestCase):
sockstat6_content = ['TCP6: inuse 1',
'UDP6: inuse 3',
'UDPLITE6: inuse 0',
'RAW6: inuse 0',
'RAW6: inuse 0',
'FRAG6: inuse 0 memory 0',
'']
oart = OpenAndReadTester(sockstat_content)
rv = self.app.get_socket_info(openr=oart.open)
self.assertEquals(oart.open_calls, [(('/proc/net/sockstat', 'r'), {}),
(('/proc/net/sockstat6', 'r'), {})])
#todo verify parsed result of sockstat6
#self.assertEquals(rv, {'time_wait': 0, 'tcp_in_use': 30, 'orphan': 0, 'tcp_mem_allocated_bytes': 0})
class FakeRecon(object):
    """Stub for the recon middleware's data-gathering methods.

    Each fake_* method returns a small sentinel dict so the middleware
    tests can verify that a URL was routed to the right collector.  The
    typed collectors (replication/auditor/updater/expirer) also record the
    recon type they were called with in a *_rtype attribute, which the
    tests assert and then reset.  The raise_* methods substitute for
    collectors to exercise the middleware's error handling, and accept any
    arguments so they can also stand in for an ``openr`` file opener.
    """

    def __init__(self):
        # recon types recorded by the typed collectors below
        self.fake_replication_rtype = None
        self.fake_auditor_rtype = None
        self.fake_updater_rtype = None
        self.fake_expirer_rtype = None

    def fake_mem(self):
        return {'memtest': "1"}

    def fake_load(self):
        return {'loadtest': "1"}

    def fake_async(self):
        return {'asynctest': "1"}

    def fake_get_device_info(self):
        # shape matches the '/recon/devices' response asserted by the tests
        return {'/srv/1/node': ['sdb1']}

    def fake_replication(self, recon_type=None):
        self.fake_replication_rtype = recon_type
        return {'replicationtest': "1"}

    def fake_auditor(self, recon_type=None):
        self.fake_auditor_rtype = recon_type
        return {'auditortest': "1"}

    def fake_updater(self, recon_type=None):
        self.fake_updater_rtype = recon_type
        return {'updatertest': "1"}

    def fake_expirer(self, recon_type=None):
        self.fake_expirer_rtype = recon_type
        return {'expirertest': "1"}

    def fake_mounted(self):
        return {'mountedtest': "1"}

    def fake_unmounted(self):
        return {'unmountedtest': "1"}

    def fake_diskusage(self):
        return {'diskusagetest': "1"}

    def fake_ringmd5(self):
        return {'ringmd5test': "1"}

    def fake_quarantined(self):
        return {'quarantinedtest': "1"}

    def fake_sockstat(self):
        return {'sockstattest': "1"}

    def nocontent(self):
        # simulate a collector that produced no data
        return None

    # *args/**kwargs so these can be used both as collector stubs and as
    # an openr replacement, which is called as openr(path, mode)
    def raise_IOError(self, *args, **kwargs):
        raise IOError

    def raise_ValueError(self, *args, **kwargs):
        raise ValueError

    def raise_Exception(self, *args, **kwargs):
        raise Exception
class TestHealthCheck(unittest.TestCase):
class TestReconMiddleware(unittest.TestCase):
def setUp(self):
self.frecon = FakeRecon()
self.app = recon.ReconMiddleware(FakeApp(), {})
self.app = recon.ReconMiddleware(FakeApp(), {'object_recon': "true"})
#self.app.object_recon = True
self.app.get_mem = self.frecon.fake_mem
self.app.get_load = self.frecon.fake_load
self.app.get_async_info = self.frecon.fake_async
self.app.get_device_info = self.frecon.fake_get_device_info
self.app.get_replication_info = self.frecon.fake_replication
self.app.get_auditor_info = self.frecon.fake_auditor
self.app.get_updater_info = self.frecon.fake_updater
self.app.get_expirer_info = self.frecon.fake_expirer
self.app.get_mounted = self.frecon.fake_mounted
self.app.get_unmounted = self.frecon.fake_unmounted
self.app.get_diskusage = self.frecon.fake_diskusage
@ -434,75 +642,185 @@ class TestHealthCheck(unittest.TestCase):
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_async_resp)
def test_recon_get_async_ioerror(self):
orig = self.app.get_async_info
self.app.get_async_info = self.frecon.raise_IOError
req = Request.blank('/recon/async', environ={'REQUEST_METHOD': 'GET'})
def test_get_device_info(self):
get_device_resp = ['{"/srv/1/node": ["sdb1"]}']
req = Request.blank('/recon/devices',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.app.get_async_info = orig
self.assertEquals(resp, ['Internal server error.'])
self.assertEquals(resp, get_device_resp)
def test_recon_get_replication(self):
def test_recon_get_replication_notype(self):
get_replication_resp = ['{"replicationtest": "1"}']
req = Request.blank('/recon/replication', environ={'REQUEST_METHOD': 'GET'})
req = Request.blank('/recon/replication',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_replication_resp)
self.assertEquals(self.frecon.fake_replication_rtype, 'object')
self.frecon.fake_replication_rtype = None
def test_recon_get_replication_ioerror(self):
orig = self.app.get_replication_info
self.app.get_replication_info = self.frecon.raise_IOError
req = Request.blank('/recon/replication', environ={'REQUEST_METHOD': 'GET'})
def test_recon_get_replication_all(self):
get_replication_resp = ['{"replicationtest": "1"}']
#test account
req = Request.blank('/recon/replication/account',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.app.get_async_info = orig
self.assertEquals(resp, ['Internal server error.'])
self.assertEquals(resp, get_replication_resp)
self.assertEquals(self.frecon.fake_replication_rtype, 'account')
self.frecon.fake_replication_rtype = None
#test container
req = Request.blank('/recon/replication/container',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_replication_resp)
self.assertEquals(self.frecon.fake_replication_rtype, 'container')
self.frecon.fake_replication_rtype = None
#test object
req = Request.blank('/recon/replication/object',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_replication_resp)
self.assertEquals(self.frecon.fake_replication_rtype, 'object')
self.frecon.fake_replication_rtype = None
def test_recon_get_auditor_invalid(self):
    """An unknown auditor type is rejected with an invalid-path body."""
    expected = ['Invalid path: /recon/auditor/invalid']
    req = Request.blank('/recon/auditor/invalid',
                        environ={'REQUEST_METHOD': 'GET'})
    self.assertEquals(self.app(req.environ, start_response), expected)
def test_recon_get_auditor_notype(self):
    """/recon/auditor without a type is rejected as an invalid path."""
    expected = ['Invalid path: /recon/auditor']
    req = Request.blank('/recon/auditor',
                        environ={'REQUEST_METHOD': 'GET'})
    self.assertEquals(self.app(req.environ, start_response), expected)
def test_recon_get_auditor_all(self):
    """Each server type in the path is passed through to get_auditor_info."""
    expected = ['{"auditortest": "1"}']
    for server_type in ('account', 'container', 'object'):
        req = Request.blank('/recon/auditor/%s' % server_type,
                            environ={'REQUEST_METHOD': 'GET'})
        resp = self.app(req.environ, start_response)
        self.assertEquals(resp, expected)
        self.assertEquals(self.frecon.fake_auditor_rtype, server_type)
        # reset the recorded type before the next iteration
        self.frecon.fake_auditor_rtype = None
def test_recon_get_updater_invalid(self):
    """An unknown updater type is rejected with an invalid-path body."""
    expected = ['Invalid path: /recon/updater/invalid']
    req = Request.blank('/recon/updater/invalid',
                        environ={'REQUEST_METHOD': 'GET'})
    self.assertEquals(self.app(req.environ, start_response), expected)
def test_recon_get_updater_notype(self):
    """/recon/updater without a type is rejected as an invalid path."""
    expected = ['Invalid path: /recon/updater']
    req = Request.blank('/recon/updater',
                        environ={'REQUEST_METHOD': 'GET'})
    self.assertEquals(self.app(req.environ, start_response), expected)
def test_recon_get_updater(self):
    """Container and object updater paths route to get_updater_info."""
    expected = ['{"updatertest": "1"}']
    for server_type in ('container', 'object'):
        req = Request.blank('/recon/updater/%s' % server_type,
                            environ={'REQUEST_METHOD': 'GET'})
        resp = self.app(req.environ, start_response)
        self.assertEquals(resp, expected)
        self.assertEquals(self.frecon.fake_updater_rtype, server_type)
        # reset the recorded type before the next iteration
        self.frecon.fake_updater_rtype = None
def test_recon_get_expirer_invalid(self):
    """An unknown expirer type is rejected with an invalid-path body."""
    expected = ['Invalid path: /recon/expirer/invalid']
    req = Request.blank('/recon/expirer/invalid',
                        environ={'REQUEST_METHOD': 'GET'})
    self.assertEquals(self.app(req.environ, start_response), expected)
def test_recon_get_expirer_notype(self):
    """/recon/expirer without a type is rejected as an invalid path."""
    expected = ['Invalid path: /recon/expirer']
    req = Request.blank('/recon/expirer',
                        environ={'REQUEST_METHOD': 'GET'})
    self.assertEquals(self.app(req.environ, start_response), expected)
def test_recon_get_expirer_object(self):
    """/recon/expirer/object routes to get_expirer_info with type object."""
    get_expirer_resp = ['{"expirertest": "1"}']
    req = Request.blank('/recon/expirer/object',
                        environ={'REQUEST_METHOD': 'GET'})
    resp = self.app(req.environ, start_response)
    self.assertEquals(resp, get_expirer_resp)
    self.assertEquals(self.frecon.fake_expirer_rtype, 'object')
    # Bug fix: was resetting fake_updater_rtype (copy-paste from the
    # updater test); reset the attribute this test actually asserted.
    self.frecon.fake_expirer_rtype = None
def test_recon_get_mounted(self):
# Verify /recon/mounted returns the stubbed mounted payload.
get_mounted_resp = ['{"mountedtest": "1"}']
req = Request.blank('/recon/mounted', environ={'REQUEST_METHOD': 'GET'})
# NOTE(review): duplicate Request.blank assignment below looks like diff
# merge residue -- only one should remain.
req = Request.blank('/recon/mounted',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_mounted_resp)
def test_recon_get_unmounted(self):
# Verify /recon/unmounted returns the stubbed unmounted payload.
get_unmounted_resp = ['{"unmountedtest": "1"}']
req = Request.blank('/recon/unmounted', environ={'REQUEST_METHOD': 'GET'})
# NOTE(review): duplicate Request.blank assignment below looks like diff
# merge residue -- only one should remain.
req = Request.blank('/recon/unmounted',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_unmounted_resp)
def test_recon_get_diskusage(self):
# Verify /recon/diskusage returns the stubbed diskusage payload.
get_diskusage_resp = ['{"diskusagetest": "1"}']
req = Request.blank('/recon/diskusage', environ={'REQUEST_METHOD': 'GET'})
# NOTE(review): duplicate Request.blank assignment below looks like diff
# merge residue -- only one should remain.
req = Request.blank('/recon/diskusage',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_diskusage_resp)
def test_recon_get_ringmd5(self):
# Verify /recon/ringmd5 returns the stubbed ringmd5 payload.
get_ringmd5_resp = ['{"ringmd5test": "1"}']
req = Request.blank('/recon/ringmd5', environ={'REQUEST_METHOD': 'GET'})
# NOTE(review): duplicate Request.blank assignment below looks like diff
# merge residue -- only one should remain.
req = Request.blank('/recon/ringmd5',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_ringmd5_resp)
def test_recon_get_quarantined(self):
# Verify /recon/quarantined returns the stubbed quarantined payload.
get_quarantined_resp = ['{"quarantinedtest": "1"}']
req = Request.blank('/recon/quarantined', environ={'REQUEST_METHOD': 'GET'})
# NOTE(review): duplicate Request.blank assignment below looks like diff
# merge residue -- only one should remain.
req = Request.blank('/recon/quarantined',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_quarantined_resp)
def test_recon_get_sockstat(self):
# Verify /recon/sockstat returns the stubbed sockstat payload.
get_sockstat_resp = ['{"sockstattest": "1"}']
req = Request.blank('/recon/sockstat', environ={'REQUEST_METHOD': 'GET'})
# NOTE(review): duplicate Request.blank assignment below looks like diff
# merge residue -- only one should remain.
req = Request.blank('/recon/sockstat',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, get_sockstat_resp)
def test_recon_invalid_path(self):
# An unrecognized recon path yields an 'Invalid path' body.
req = Request.blank('/recon/invalid', environ={'REQUEST_METHOD': 'GET'})
# NOTE(review): duplicate Request.blank assignment below looks like diff
# merge residue -- only one should remain.
req = Request.blank('/recon/invalid',
environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.assertEquals(resp, ['Invalid path: /recon/invalid'])
def test_recon_failed_json_dumps(self):
orig = self.app.get_replication_info
self.app.get_replication_info = self.frecon.raise_ValueError
req = Request.blank('/recon/replication', environ={'REQUEST_METHOD': 'GET'})
def test_no_content(self):
self.app.get_load = self.frecon.nocontent
req = Request.blank('/recon/load', environ={'REQUEST_METHOD': 'GET'})
resp = self.app(req.environ, start_response)
self.app.get_async_info = orig
self.assertEquals(resp, ['Internal server error.'])
def test_recon_pass(self):

View File

@ -96,7 +96,6 @@ class TestObjectExpirer(TestCase):
x.logger = FakeLogger()
x.swift = InternalClient()
x.run_once()
self.assertEquals(x.logger.log_dict['exception'], [])
self.assertEquals(
x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
@ -121,7 +120,9 @@ class TestObjectExpirer(TestCase):
x.logger = FakeLogger()
x.swift = InternalClient([{'name': str(int(time() + 86400))}])
x.run_once()
self.assertEquals(x.logger.log_dict['exception'], [])
for exccall in x.logger.log_dict['exception']:
self.assertTrue(
'This should not have been called' not in exccall[0][0])
self.assertEquals(
x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
@ -163,7 +164,9 @@ class TestObjectExpirer(TestCase):
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % int(time() + 86400)}])
x.run_once()
self.assertEquals(x.logger.log_dict['exception'], [])
for exccall in x.logger.log_dict['exception']:
self.assertTrue(
'This should not have been called' not in exccall[0][0])
self.assertEquals(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
@ -177,10 +180,13 @@ class TestObjectExpirer(TestCase):
[{'name': '%d-actual-obj' % ts}])
x.delete_actual_object = should_not_be_called
x.run_once()
self.assertEquals(x.logger.log_dict['exception'],
[(('Exception while deleting object %d %d-actual-obj '
'This should not have been called' % (ts, ts),), {},
'This should not have been called')])
excswhiledeleting = []
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting,
['Exception while deleting object %d %d-actual-obj '
'This should not have been called' % (ts, ts)])
def test_failed_delete_keeps_entry(self):
class InternalClient(object):
@ -217,10 +223,13 @@ class TestObjectExpirer(TestCase):
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % ts}])
x.run_once()
self.assertEquals(x.logger.log_dict['exception'],
[(('Exception while deleting object %d %d-actual-obj '
'failed to delete actual object' % (ts, ts),), {},
'failed to delete actual object')])
excswhiledeleting = []
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting,
['Exception while deleting object %d %d-actual-obj '
'failed to delete actual object' % (ts, ts)])
self.assertEquals(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
@ -234,10 +243,13 @@ class TestObjectExpirer(TestCase):
x.swift = InternalClient([{'name': str(int(time() - 86400))}],
[{'name': '%d-actual-obj' % ts}])
x.run_once()
self.assertEquals(x.logger.log_dict['exception'],
[(('Exception while deleting object %d %d-actual-obj This should '
'not have been called' % (ts, ts),), {},
'This should not have been called')])
excswhiledeleting = []
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting,
['Exception while deleting object %d %d-actual-obj This should '
'not have been called' % (ts, ts)])
def test_success_gets_counted(self):
class InternalClient(object):
@ -268,7 +280,6 @@ class TestObjectExpirer(TestCase):
[{'name': '%d-actual-obj' % int(time() - 86400)}])
x.run_once()
self.assertEquals(x.report_objects, 1)
self.assertEquals(x.logger.log_dict['exception'], [])
self.assertEquals(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),
@ -317,25 +328,23 @@ class TestObjectExpirer(TestCase):
x.swift = InternalClient(containers, objects)
x.delete_actual_object = fail_delete_actual_object
x.run_once()
self.assertEquals(x.logger.log_dict['exception'], [
(('Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts, ots),), {},
'failed to delete actual object'),
(('Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts, ots),), {},
'failed to delete actual object'),
(('Exception while deleting container %d failed to delete '
'container' % (cts,),), {},
'failed to delete container'),
(('Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts + 1, ots),), {},
'failed to delete actual object'),
(('Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts + 1, ots),), {},
'failed to delete actual object'),
(('Exception while deleting container %d failed to delete '
'container' % (cts + 1,),), {},
'failed to delete container')])
excswhiledeleting = []
for exccall in x.logger.log_dict['exception']:
if exccall[0][0].startswith('Exception while deleting '):
excswhiledeleting.append(exccall[0][0])
self.assertEquals(excswhiledeleting, [
'Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts, ots),
'Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts, ots),
'Exception while deleting container %d failed to delete '
'container' % (cts,),
'Exception while deleting object %d %d-actual-obj failed to '
'delete actual object' % (cts + 1, ots),
'Exception while deleting object %d %d-next-obj failed to '
'delete actual object' % (cts + 1, ots),
'Exception while deleting container %d failed to delete '
'container' % (cts + 1,)])
self.assertEquals(x.logger.log_dict['info'],
[(('Pass beginning; 1 possible containers; '
'2 possible objects',), {}),