Merge "Allow specification of object devices for audit"

This commit is contained in:
Jenkins 2014-03-21 06:48:49 +00:00 committed by Gerrit Code Review
commit f73c6c5012
9 changed files with 340 additions and 73 deletions

View File

@ -23,5 +23,7 @@ if __name__ == '__main__':
parser = OptionParser("%prog CONFIG [options]") parser = OptionParser("%prog CONFIG [options]")
parser.add_option('-z', '--zero_byte_fps', parser.add_option('-z', '--zero_byte_fps',
help='Audit only zero byte files at specified files/sec') help='Audit only zero byte files at specified files/sec')
parser.add_option('-d', '--devices',
help='Audit only given devices. Comma-separated list')
conf_file, options = parse_options(parser=parser, once=True) conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ObjectAuditor, conf_file, **options) run_daemon(ObjectAuditor, conf_file, **options)

View File

@ -1067,6 +1067,13 @@ run this command as follows:
`swift-object-auditor /path/to/object-server/config/file.conf once -z 1000` `swift-object-auditor /path/to/object-server/config/file.conf once -z 1000`
"-z" means to only check for zero-byte files at 1000 files per second. "-z" means to only check for zero-byte files at 1000 files per second.
At times it is useful to be able to run the object auditor on a specific
device or set of devices. You can run the object-auditor as follows:
swift-object-auditor /path/to/object-server/config/file.conf once --devices=sda,sdb
This will run the object auditor on only the sda and sdb devices. This param
accepts a comma separated list of values.
----------------- -----------------
Object Replicator Object Replicator
----------------- -----------------

View File

@ -488,6 +488,24 @@ class SwiftRecon(object):
(self._ptime(low), self._ptime(high), self._ptime(average)) (self._ptime(low), self._ptime(high), self._ptime(average))
print "=" * 79 print "=" * 79
def nested_get_value(self, key, recon_entry):
    """
    Yield every value stored under *key* anywhere in a recon cache entry.

    Object auditor recon entries gain an extra nesting level when the
    auditor has run in 'once' mode against a subset of devices; the
    checksum auditor section then looks like::

        {'object_auditor_stats_ALL': {'disk1disk2diskN': {...}}}

    (the ZBF auditor section has the same shape), so a given stat key
    may appear at more than one depth.  This walks *recon_entry*
    depth-first and yields each value found for *key* at any level.
    """
    def _find(entry):
        for entry_key, entry_val in entry.items():
            # Descend into sub-dicts before reporting the key at this
            # level, so nested matches come out first.
            if isinstance(entry_val, dict):
                for match in _find(entry_val):
                    yield match
            if entry_key == key:
                yield entry_val
    return _find(recon_entry)
def object_auditor_check(self, hosts): def object_auditor_check(self, hosts):
""" """
Obtain and print obj auditor statistics Obtain and print obj auditor statistics
@ -513,11 +531,16 @@ class SwiftRecon(object):
zbf_scan[url] = response['object_auditor_stats_ZBF'] zbf_scan[url] = response['object_auditor_stats_ZBF']
if len(all_scan) > 0: if len(all_scan) > 0:
stats = {} stats = {}
stats[atime] = [all_scan[i][atime] for i in all_scan] stats[atime] = [(self.nested_get_value(atime, all_scan[i]))
stats[bprocessed] = [all_scan[i][bprocessed] for i in all_scan] for i in all_scan]
stats[passes] = [all_scan[i][passes] for i in all_scan] stats[bprocessed] = [(self.nested_get_value(bprocessed,
stats[errors] = [all_scan[i][errors] for i in all_scan] all_scan[i])) for i in all_scan]
stats[quarantined] = [all_scan[i][quarantined] for i in all_scan] stats[passes] = [(self.nested_get_value(passes, all_scan[i]))
for i in all_scan]
stats[errors] = [(self.nested_get_value(errors, all_scan[i]))
for i in all_scan]
stats[quarantined] = [(self.nested_get_value(quarantined,
all_scan[i])) for i in all_scan]
for k in stats: for k in stats:
if None in stats[k]: if None in stats[k]:
stats[k] = [x for x in stats[k] if x is not None] stats[k] = [x for x in stats[k] if x is not None]
@ -534,10 +557,14 @@ class SwiftRecon(object):
print "[ALL_auditor] - No hosts returned valid data." print "[ALL_auditor] - No hosts returned valid data."
if len(zbf_scan) > 0: if len(zbf_scan) > 0:
stats = {} stats = {}
stats[atime] = [zbf_scan[i][atime] for i in zbf_scan] stats[atime] = [(self.nested_get_value(atime, zbf_scan[i]))
stats[bprocessed] = [zbf_scan[i][bprocessed] for i in zbf_scan] for i in zbf_scan]
stats[errors] = [zbf_scan[i][errors] for i in zbf_scan] stats[bprocessed] = [(self.nested_get_value(bprocessed,
stats[quarantined] = [zbf_scan[i][quarantined] for i in zbf_scan] zbf_scan[i])) for i in zbf_scan]
stats[errors] = [(self.nested_get_value(errors, zbf_scan[i]))
for i in zbf_scan]
stats[quarantined] = [(self.nested_get_value(quarantined,
zbf_scan[i])) for i in zbf_scan]
for k in stats: for k in stats:
if None in stats[k]: if None in stats[k]:
stats[k] = [x for x in stats[k] if x is not None] stats[k] = [x for x in stats[k] if x is not None]

View File

@ -2079,6 +2079,28 @@ def human_readable(value):
return '%d%si' % (round(value), suffixes[index]) return '%d%si' % (round(value), suffixes[index])
def put_recon_cache_entry(cache_entry, key, item):
    """
    Function that will check if item is a dict, and if so put it under
    cache_entry[key].  We use nested recon cache entries when the object
    auditor runs in 'once' mode with a specified subset of devices.

    Non-dict items simply overwrite ``cache_entry[key]``.  For dict
    items, sub-keys whose value is ``{}`` are treated as deletions; an
    entirely empty dict item removes an existing dict entry outright.
    """
    if not isinstance(item, dict):
        # Scalar (or non-dict) values replace whatever was there before.
        cache_entry[key] = item
        return
    if not isinstance(cache_entry.get(key), dict):
        # No usable dict at this key yet (missing or non-dict): start
        # fresh.  NOTE(review): an empty *item* under a missing key still
        # leaves an empty dict behind here — preserved original behavior.
        cache_entry[key] = {}
    elif not item:
        # Existing dict entry plus an empty update means "clear it".
        cache_entry.pop(key, None)
        return
    for sub_key, sub_val in item.items():
        if sub_val == {}:
            # An empty-dict value marks the sub-key for removal.
            cache_entry[key].pop(sub_key, None)
        else:
            cache_entry[key][sub_key] = sub_val
def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2): def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
"""Update recon cache values """Update recon cache values
@ -2098,7 +2120,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
#file doesn't have a valid entry, we'll recreate it #file doesn't have a valid entry, we'll recreate it
pass pass
for cache_key, cache_value in cache_dict.items(): for cache_key, cache_value in cache_dict.items():
cache_entry[cache_key] = cache_value put_recon_cache_entry(cache_entry, cache_key, cache_value)
try: try:
with NamedTemporaryFile(dir=os.path.dirname(cache_file), with NamedTemporaryFile(dir=os.path.dirname(cache_file),
delete=False) as tf: delete=False) as tf:

View File

@ -14,14 +14,16 @@
# limitations under the License. # limitations under the License.
import os import os
import sys
import time import time
import signal
from swift import gettext_ as _ from swift import gettext_ as _
from contextlib import closing from contextlib import closing
from eventlet import Timeout from eventlet import Timeout
from swift.obj import diskfile from swift.obj import diskfile
from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \ from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \
list_from_csv, json list_from_csv, json, listdir
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
from swift.common.daemon import Daemon from swift.common.daemon import Daemon
@ -31,10 +33,10 @@ SLEEP_BETWEEN_AUDITS = 30
class AuditorWorker(object): class AuditorWorker(object):
"""Walk through file system to audit objects""" """Walk through file system to audit objects"""
def __init__(self, conf, logger, zero_byte_only_at_fps=0): def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0):
self.conf = conf self.conf = conf
self.logger = logger self.logger = logger
self.devices = conf.get('devices', '/srv/node') self.devices = devices
self.diskfile_mgr = diskfile.DiskFileManager(conf, self.logger) self.diskfile_mgr = diskfile.DiskFileManager(conf, self.logger)
self.max_files_per_second = float(conf.get('files_per_second', 20)) self.max_files_per_second = float(conf.get('files_per_second', 20))
self.max_bytes_per_second = float(conf.get('bytes_per_second', self.max_bytes_per_second = float(conf.get('bytes_per_second',
@ -53,24 +55,34 @@ class AuditorWorker(object):
self.passes = 0 self.passes = 0
self.quarantines = 0 self.quarantines = 0
self.errors = 0 self.errors = 0
self.recon_cache_path = conf.get('recon_cache_path', self.rcache = rcache
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
self.stats_sizes = sorted( self.stats_sizes = sorted(
[int(s) for s in list_from_csv(conf.get('object_size_stats'))]) [int(s) for s in list_from_csv(conf.get('object_size_stats'))])
self.stats_buckets = dict( self.stats_buckets = dict(
[(s, 0) for s in self.stats_sizes + ['OVER']]) [(s, 0) for s in self.stats_sizes + ['OVER']])
def audit_all_objects(self, mode='once'): def create_recon_nested_dict(self, top_level_key, device_list, item):
self.logger.info(_('Begin object audit "%s" mode (%s)') % if device_list:
(mode, self.auditor_type)) device_key = ''.join(sorted(device_list))
return {top_level_key: {device_key: item}}
else:
return {top_level_key: item}
def audit_all_objects(self, mode='once', device_dirs=None):
description = ''
if device_dirs:
device_dir_str = ','.join(sorted(device_dirs))
description = _(' - %s') % device_dir_str
self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
(mode, self.auditor_type, description))
begin = reported = time.time() begin = reported = time.time()
self.total_bytes_processed = 0 self.total_bytes_processed = 0
self.total_files_processed = 0 self.total_files_processed = 0
total_quarantines = 0 total_quarantines = 0
total_errors = 0 total_errors = 0
time_auditing = 0 time_auditing = 0
all_locs = self.diskfile_mgr.object_audit_location_generator() all_locs = self.diskfile_mgr.object_audit_location_generator(
device_dirs=device_dirs)
for location in all_locs: for location in all_locs:
loop_time = time.time() loop_time = time.time()
self.failsafe_object_audit(location) self.failsafe_object_audit(location)
@ -87,7 +99,7 @@ class AuditorWorker(object):
'files/sec: %(frate).2f , bytes/sec: %(brate).2f, ' 'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
'Total time: %(total).2f, Auditing time: %(audit).2f, ' 'Total time: %(total).2f, Auditing time: %(audit).2f, '
'Rate: %(audit_rate).2f') % { 'Rate: %(audit_rate).2f') % {
'type': self.auditor_type, 'type': '%s%s' % (self.auditor_type, description),
'start_time': time.ctime(reported), 'start_time': time.ctime(reported),
'passes': self.passes, 'quars': self.quarantines, 'passes': self.passes, 'quars': self.quarantines,
'errors': self.errors, 'errors': self.errors,
@ -95,15 +107,14 @@ class AuditorWorker(object):
'brate': self.bytes_processed / (now - reported), 'brate': self.bytes_processed / (now - reported),
'total': (now - begin), 'audit': time_auditing, 'total': (now - begin), 'audit': time_auditing,
'audit_rate': time_auditing / (now - begin)}) 'audit_rate': time_auditing / (now - begin)})
dump_recon_cache({'object_auditor_stats_%s' % cache_entry = self.create_recon_nested_dict(
self.auditor_type: { 'object_auditor_stats_%s' % (self.auditor_type),
'errors': self.errors, device_dirs,
'passes': self.passes, {'errors': self.errors, 'passes': self.passes,
'quarantined': self.quarantines, 'quarantined': self.quarantines,
'bytes_processed': self.bytes_processed, 'bytes_processed': self.bytes_processed,
'start_time': reported, 'start_time': reported, 'audit_time': time_auditing})
'audit_time': time_auditing}}, dump_recon_cache(cache_entry, self.rcache, self.logger)
self.rcache, self.logger)
reported = now reported = now
total_quarantines += self.quarantines total_quarantines += self.quarantines
total_errors += self.errors total_errors += self.errors
@ -120,12 +131,19 @@ class AuditorWorker(object):
'Total errors: %(errors)d, Total files/sec: %(frate).2f, ' 'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, ' 'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
'Rate: %(audit_rate).2f') % { 'Rate: %(audit_rate).2f') % {
'type': self.auditor_type, 'mode': mode, 'elapsed': elapsed, 'type': '%s%s' % (self.auditor_type, description),
'mode': mode, 'elapsed': elapsed,
'quars': total_quarantines + self.quarantines, 'quars': total_quarantines + self.quarantines,
'errors': total_errors + self.errors, 'errors': total_errors + self.errors,
'frate': self.total_files_processed / elapsed, 'frate': self.total_files_processed / elapsed,
'brate': self.total_bytes_processed / elapsed, 'brate': self.total_bytes_processed / elapsed,
'audit': time_auditing, 'audit_rate': time_auditing / elapsed}) 'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
# Clear recon cache entry if device_dirs is set
if device_dirs:
cache_entry = self.create_recon_nested_dict(
'object_auditor_stats_%s' % (self.auditor_type),
device_dirs, {})
dump_recon_cache(cache_entry, self.rcache, self.logger)
if self.stats_sizes: if self.stats_sizes:
self.logger.info( self.logger.info(
_('Object audit stats: %s') % json.dumps(self.stats_buckets)) _('Object audit stats: %s') % json.dumps(self.stats_buckets))
@ -204,35 +222,100 @@ class ObjectAuditor(Daemon):
def __init__(self, conf, **options): def __init__(self, conf, **options):
self.conf = conf self.conf = conf
self.logger = get_logger(conf, log_route='object-auditor') self.logger = get_logger(conf, log_route='object-auditor')
self.devices = conf.get('devices', '/srv/node')
self.conf_zero_byte_fps = int( self.conf_zero_byte_fps = int(
conf.get('zero_byte_files_per_second', 50)) conf.get('zero_byte_files_per_second', 50))
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
def _sleep(self): def _sleep(self):
time.sleep(SLEEP_BETWEEN_AUDITS) time.sleep(SLEEP_BETWEEN_AUDITS)
def clear_recon_cache(self, auditor_type):
    """
    Clear recon cache entries

    :param auditor_type: 'ALL' or 'ZBF'; selects which
                         'object_auditor_stats_<type>' section to reset.
    """
    # Writing an empty dict for the section lets dump_recon_cache drop
    # stale per-device sub-entries left over from a previous run.
    dump_recon_cache({'object_auditor_stats_%s' % auditor_type: {}},
                     self.rcache, self.logger)

def run_audit(self, **kwargs):
    """
    Run the object audit

    Recognized keyword arguments:
        mode          -- audit mode string handed to the worker
        zero_byte_fps -- if non-zero, audit only zero-byte files at
                         this files/sec rate
        device_dirs   -- optional list of device directories to limit
                         the audit to (None means all devices)
    """
    mode = kwargs.get('mode')
    zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
    device_dirs = kwargs.get('device_dirs')
    worker = AuditorWorker(self.conf, self.logger, self.rcache,
                           self.devices,
                           zero_byte_only_at_fps=zero_byte_only_at_fps)
    worker.audit_all_objects(mode=mode, device_dirs=device_dirs)

def fork_child(self, zero_byte_fps=False, **kwargs):
    """
    Child execution

    Forks: the parent returns the child's pid; the child runs one audit
    pass (zero-byte-only when requested) and exits without returning.
    """
    pid = os.fork()
    if pid:
        # Parent: report the child's pid so audit_loop can track it.
        return pid
    else:
        # Child: restore default SIGTERM handling (don't inherit the
        # daemon's handler), run the audit, then exit this process.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        if zero_byte_fps:
            kwargs['zero_byte_fps'] = self.conf_zero_byte_fps
        self.run_audit(**kwargs)
        sys.exit()

def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
    """
    Audit loop

    :param parent: when true, run the audit in this process (no
                   forking); otherwise fork a regular audit worker plus,
                   if configured, a zero-byte-file (ZBF) worker.
    :param zbo_fps: zero-byte files/sec rate used in the parent case
    :param override_devices: optional list of device dirs to audit
    """
    # Reset both recon sections up front so stats from an earlier run
    # (possibly over a different device subset) do not linger.
    self.clear_recon_cache('ALL')
    self.clear_recon_cache('ZBF')
    kwargs['device_dirs'] = override_devices
    if parent:
        kwargs['zero_byte_fps'] = zbo_fps
        self.run_audit(**kwargs)
    else:
        pids = []
        if self.conf_zero_byte_fps:
            zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
            pids.append(zbf_pid)
        pids.append(self.fork_child(**kwargs))
        while pids:
            pid = os.wait()[0]
            # ZBF scanner must be restarted as soon as it finishes
            # (len(pids) > 1 means the regular worker is still running).
            if self.conf_zero_byte_fps and pid == zbf_pid and \
                    len(pids) > 1:
                kwargs['device_dirs'] = override_devices
                zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
                pids.append(zbf_pid)
            pids.remove(pid)
def run_forever(self, *args, **kwargs): def run_forever(self, *args, **kwargs):
"""Run the object audit until stopped.""" """Run the object audit until stopped."""
# zero byte only command line option # zero byte only command line option
zbo_fps = kwargs.get('zero_byte_fps', 0) zbo_fps = kwargs.get('zero_byte_fps', 0)
parent = False
if zbo_fps: if zbo_fps:
# only start parent # only start parent
parent = True parent = True
else:
parent = os.fork() # child gets parent = 0
kwargs = {'mode': 'forever'} kwargs = {'mode': 'forever'}
if parent:
kwargs['zero_byte_fps'] = zbo_fps or self.conf_zero_byte_fps
while True: while True:
try: try:
self.run_once(**kwargs) self.audit_loop(parent, zbo_fps, **kwargs)
except (Exception, Timeout): except (Exception, Timeout):
self.logger.exception(_('ERROR auditing')) self.logger.exception(_('ERROR auditing'))
self._sleep() self._sleep()
def run_once(self, *args, **kwargs): def run_once(self, *args, **kwargs):
"""Run the object audit once.""" """Run the object audit once"""
mode = kwargs.get('mode', 'once') # zero byte only command line option
zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0) zbo_fps = kwargs.get('zero_byte_fps', 0)
worker = AuditorWorker(self.conf, self.logger, override_devices = list_from_csv(kwargs.get('devices'))
zero_byte_only_at_fps=zero_byte_only_at_fps) # Remove bogus entries and duplicates from override_devices
worker.audit_all_objects(mode=mode) override_devices = list(
set(listdir(self.devices)).intersection(set(override_devices)))
parent = False
if zbo_fps:
# only start parent
parent = True
kwargs = {'mode': 'once'}
try:
self.audit_loop(parent, zbo_fps, override_devices=override_devices,
**kwargs)
except (Exception, Timeout):
self.logger.exception(_('ERROR auditing'))

View File

@ -351,22 +351,32 @@ class AuditLocation(object):
return str(self.path) return str(self.path)
def object_audit_location_generator(devices, mount_check=True, logger=None): def object_audit_location_generator(devices, mount_check=True, logger=None,
device_dirs=None):
""" """
Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
objects stored under that directory. The AuditLocation only knows the path objects stored under that directory if device_dirs isn't set. If
to the hash directory, not to the .data file therein (if any). This is to device_dirs is set, only yield AuditLocation for the objects under the
avoid a double listdir(hash_dir); the DiskFile object will always do one, entries in device_dirs. The AuditLocation only knows the path to the hash
so we don't. directory, not to the .data file therein (if any). This is to avoid a
double listdir(hash_dir); the DiskFile object will always do one, so
we don't.
:param devices: parent directory of the devices to be audited :param devices: parent directory of the devices to be audited
:param mount_check: flag to check if a mount check should be performed :param mount_check: flag to check if a mount check should be performed
on devices on devices
:param logger: a logger object :param logger: a logger object
:device_dirs: a list of directories under devices to traverse
""" """
if not device_dirs:
device_dirs = listdir(devices) device_dirs = listdir(devices)
else:
# remove bogus devices and duplicates from device_dirs
device_dirs = list(
set(listdir(devices)).intersection(set(device_dirs)))
# randomize devices in case of process restart before sweep completed # randomize devices in case of process restart before sweep completed
shuffle(device_dirs) shuffle(device_dirs)
for device in device_dirs: for device in device_dirs:
if mount_check and not \ if mount_check and not \
ismount(os.path.join(devices, device)): ismount(os.path.join(devices, device)):
@ -502,9 +512,9 @@ class DiskFileManager(object):
return DiskFile(self, dev_path, self.threadpools[device], return DiskFile(self, dev_path, self.threadpools[device],
partition, account, container, obj, **kwargs) partition, account, container, obj, **kwargs)
def object_audit_location_generator(self): def object_audit_location_generator(self, device_dirs=None):
return object_audit_location_generator(self.devices, self.mount_check, return object_audit_location_generator(self.devices, self.mount_check,
self.logger) self.logger, device_dirs)
def get_diskfile_from_audit_location(self, audit_location): def get_diskfile_from_audit_location(self, audit_location):
dev_path = self.get_dev_path(audit_location.device, mount_check=False) dev_path = self.get_dev_path(audit_location.device, mount_check=False)

View File

@ -562,6 +562,45 @@ class TestReconSuccess(TestCase):
"files_processed": 2310, "files_processed": 2310,
"quarantined": 0}}) "quarantined": 0}})
def test_get_auditor_info_object_once(self):
    """Auditor recon info from a device-limited 'once' run passes
    through with its extra per-device nesting level intact."""
    # Cache as written by an object auditor 'once' run limited to
    # specific devices: stats nest one level deeper under a key built
    # from the concatenated device names ('disk1disk2').
    from_cache_response = {
        "object_auditor_stats_ALL": {'disk1disk2': {
            "audit_time": 115.14418768882751,
            "bytes_processed": 234660,
            "completed": 115.4512460231781,
            "errors": 0,
            "files_processed": 2310,
            "quarantined": 0}},
        "object_auditor_stats_ZBF": {'disk1disk2': {
            "audit_time": 45.877294063568115,
            "bytes_processed": 0,
            "completed": 46.181446075439453,
            "errors": 0,
            "files_processed": 2310,
            "quarantined": 0}}}
    self.fakecache.fakeout_calls = []
    self.fakecache.fakeout = from_cache_response
    rv = self.app.get_auditor_info('object')
    # Both auditor sections should be requested from the recon cache.
    self.assertEquals(self.fakecache.fakeout_calls,
                      [((['object_auditor_stats_ALL',
                          'object_auditor_stats_ZBF'],
                         '/var/cache/swift/object.recon'), {})])
    # The nested per-device structure is returned unmodified.
    self.assertEquals(rv, {
        "object_auditor_stats_ALL": {'disk1disk2': {
            "audit_time": 115.14418768882751,
            "bytes_processed": 234660,
            "completed": 115.4512460231781,
            "errors": 0,
            "files_processed": 2310,
            "quarantined": 0}},
        "object_auditor_stats_ZBF": {'disk1disk2': {
            "audit_time": 45.877294063568115,
            "bytes_processed": 0,
            "completed": 46.181446075439453,
            "errors": 0,
            "files_processed": 2310,
            "quarantined": 0}}})
def test_get_unmounted(self): def test_get_unmounted(self):
unmounted_resp = [{'device': 'fakeone', 'mounted': False}, unmounted_resp = [{'device': 'fakeone', 'mounted': False},
{'device': 'faketwo', 'mounted': False}] {'device': 'faketwo', 'mounted': False}]

View File

@ -28,6 +28,7 @@ import random
import re import re
import socket import socket
import sys import sys
import json
from textwrap import dedent from textwrap import dedent
@ -486,6 +487,29 @@ class TestUtils(unittest.TestCase):
utils.sys.stdout = orig_stdout utils.sys.stdout = orig_stdout
utils.sys.stderr = orig_stderr utils.sys.stderr = orig_stderr
def test_dump_recon_cache(self):
    """dump_recon_cache writes flat entries verbatim and merges nested
    dict entries into any existing cache contents."""
    testdir_base = mkdtemp()
    testcache_file = os.path.join(testdir_base, 'cache.recon')
    logger = utils.get_logger(None, 'server', log_route='server')
    try:
        # A simple one-level entry round-trips unchanged.
        submit_dict = {'key1': {'value1': 1, 'value2': 2}}
        utils.dump_recon_cache(submit_dict, testcache_file, logger)
        fd = open(testcache_file)
        file_dict = json.loads(fd.readline())
        fd.close()
        self.assertEquals(submit_dict, file_dict)
        # Use a nested entry: the nested dict is merged under 'key1'
        # alongside the values written above, not replacing them.
        submit_dict = {'key1': {'key2': {'value1': 1, 'value2': 2}}}
        result_dict = {'key1': {'key2': {'value1': 1, 'value2': 2},
                                'value1': 1, 'value2': 2}}
        utils.dump_recon_cache(submit_dict, testcache_file, logger)
        fd = open(testcache_file)
        file_dict = json.loads(fd.readline())
        fd.close()
        self.assertEquals(result_dict, file_dict)
    finally:
        rmtree(testdir_base)
def test_get_logger(self): def test_get_logger(self):
sio = StringIO() sio = StringIO()
logger = logging.getLogger('server') logger = logging.getLogger('server')

View File

@ -18,6 +18,7 @@ import unittest
import mock import mock
import os import os
import time import time
import string
from shutil import rmtree from shutil import rmtree
from hashlib import md5 from hashlib import md5
from tempfile import mkdtemp from tempfile import mkdtemp
@ -34,6 +35,7 @@ class TestAuditor(unittest.TestCase):
def setUp(self): def setUp(self):
self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor') self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
self.devices = os.path.join(self.testdir, 'node') self.devices = os.path.join(self.testdir, 'node')
self.rcache = os.path.join(self.testdir, 'object.recon')
self.logger = FakeLogger() self.logger = FakeLogger()
rmtree(self.testdir, ignore_errors=1) rmtree(self.testdir, ignore_errors=1)
mkdirs(os.path.join(self.devices, 'sda')) mkdirs(os.path.join(self.devices, 'sda'))
@ -60,7 +62,8 @@ class TestAuditor(unittest.TestCase):
unit.xattr_data = {} unit.xattr_data = {}
def test_object_audit_extra_data(self): def test_object_audit_extra_data(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger) auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
with self.disk_file.create() as writer: with self.disk_file.create() as writer:
@ -86,7 +89,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1) self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_audit_diff_data(self): def test_object_audit_diff_data(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger) auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
data = '0' * 1024 data = '0' * 1024
etag = md5() etag = md5()
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
@ -129,7 +133,8 @@ class TestAuditor(unittest.TestCase):
fp.write('0' * 1024) fp.write('0' * 1024)
fp.close() fp.close()
invalidate_hash(os.path.dirname(self.disk_file._datadir)) invalidate_hash(os.path.dirname(self.disk_file._datadir))
auditor_worker = auditor.AuditorWorker(self.conf, self.logger) auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
pre_quarantines = auditor_worker.quarantines pre_quarantines = auditor_worker.quarantines
auditor_worker.object_audit( auditor_worker.object_audit(
AuditLocation(self.disk_file._datadir, 'sda', '0')) AuditLocation(self.disk_file._datadir, 'sda', '0'))
@ -141,7 +146,8 @@ class TestAuditor(unittest.TestCase):
mkdirs(self.disk_file._datadir) mkdirs(self.disk_file._datadir)
with open(path, 'w') as f: with open(path, 'w') as f:
write_metadata(f, {'name': '/a/c/o'}) write_metadata(f, {'name': '/a/c/o'})
auditor_worker = auditor.AuditorWorker(self.conf, self.logger) auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
def blowup(*args): def blowup(*args):
raise NameError('tpyo') raise NameError('tpyo')
@ -156,7 +162,8 @@ class TestAuditor(unittest.TestCase):
mkdirs(self.disk_file._datadir) mkdirs(self.disk_file._datadir)
with open(path, 'w') as f: with open(path, 'w') as f:
write_metadata(f, {'name': '/a/c/o'}) write_metadata(f, {'name': '/a/c/o'})
auditor_worker = auditor.AuditorWorker(self.conf, self.logger) auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
def blowup(*args): def blowup(*args):
raise NameError('tpyo') raise NameError('tpyo')
@ -166,7 +173,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.errors, 1) self.assertEquals(auditor_worker.errors, 1)
def test_generic_exception_handling(self): def test_generic_exception_handling(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger) auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
pre_errors = auditor_worker.errors pre_errors = auditor_worker.errors
data = '0' * 1024 data = '0' * 1024
@ -186,7 +194,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.errors, pre_errors + 1) self.assertEquals(auditor_worker.errors, pre_errors + 1)
def test_object_run_once_pass(self): def test_object_run_once_pass(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger) auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
auditor_worker.log_time = 0 auditor_worker.log_time = 0
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = auditor_worker.quarantines pre_quarantines = auditor_worker.quarantines
@ -208,7 +217,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.stats_buckets[10240], 0) self.assertEquals(auditor_worker.stats_buckets[10240], 0)
def test_object_run_once_no_sda(self): def test_object_run_once_no_sda(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger) auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = auditor_worker.quarantines pre_quarantines = auditor_worker.quarantines
data = '0' * 1024 data = '0' * 1024
@ -228,7 +238,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1) self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_run_once_multi_devices(self): def test_object_run_once_multi_devices(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger) auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
timestamp = str(normalize_timestamp(time.time())) timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = auditor_worker.quarantines pre_quarantines = auditor_worker.quarantines
data = '0' * 10 data = '0' * 10
@ -284,9 +295,12 @@ class TestAuditor(unittest.TestCase):
quarantine_path = os.path.join(self.devices, quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects') 'sda', 'quarantined', 'objects')
self.auditor.run_once(zero_byte_fps=50) kwargs = {'mode': 'once'}
kwargs['zero_byte_fps'] = 50
self.auditor.run_audit(**kwargs)
self.assertFalse(os.path.isdir(quarantine_path)) self.assertFalse(os.path.isdir(quarantine_path))
self.auditor.run_once() del(kwargs['zero_byte_fps'])
self.auditor.run_audit(**kwargs)
self.assertTrue(os.path.isdir(quarantine_path)) self.assertTrue(os.path.isdir(quarantine_path))
def setup_bad_zero_byte(self, with_ts=False): def setup_bad_zero_byte(self, with_ts=False):
@ -322,14 +336,17 @@ class TestAuditor(unittest.TestCase):
def test_object_run_fast_track_all(self): def test_object_run_fast_track_all(self):
self.setup_bad_zero_byte() self.setup_bad_zero_byte()
self.auditor.run_once() kwargs = {'mode': 'once'}
self.auditor.run_audit(**kwargs)
quarantine_path = os.path.join(self.devices, quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects') 'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path)) self.assertTrue(os.path.isdir(quarantine_path))
def test_object_run_fast_track_zero(self): def test_object_run_fast_track_zero(self):
self.setup_bad_zero_byte() self.setup_bad_zero_byte()
self.auditor.run_once(zero_byte_fps=50) kwargs = {'mode': 'once'}
kwargs['zero_byte_fps'] = 50
self.auditor.run_audit(**kwargs)
quarantine_path = os.path.join(self.devices, quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects') 'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path)) self.assertTrue(os.path.isdir(quarantine_path))
@ -347,7 +364,9 @@ class TestAuditor(unittest.TestCase):
was_df = auditor.diskfile.DiskFile was_df = auditor.diskfile.DiskFile
try: try:
auditor.diskfile.DiskFile = FakeFile auditor.diskfile.DiskFile = FakeFile
self.auditor.run_once(zero_byte_fps=50) kwargs = {'mode': 'once'}
kwargs['zero_byte_fps'] = 50
self.auditor.run_audit(**kwargs)
quarantine_path = os.path.join(self.devices, quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects') 'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path)) self.assertTrue(os.path.isdir(quarantine_path))
@ -358,7 +377,8 @@ class TestAuditor(unittest.TestCase):
def test_with_tombstone(self): def test_with_tombstone(self):
ts_file_path = self.setup_bad_zero_byte(with_ts=True) ts_file_path = self.setup_bad_zero_byte(with_ts=True)
self.assertTrue(ts_file_path.endswith('ts')) self.assertTrue(ts_file_path.endswith('ts'))
self.auditor.run_once() kwargs = {'mode': 'once'}
self.auditor.run_audit(**kwargs)
self.assertTrue(os.path.exists(ts_file_path)) self.assertTrue(os.path.exists(ts_file_path))
def test_sleeper(self): def test_sleeper(self):
@ -370,7 +390,7 @@ class TestAuditor(unittest.TestCase):
self.assert_(delta_t > 0.08) self.assert_(delta_t > 0.08)
self.assert_(delta_t < 0.12) self.assert_(delta_t < 0.12)
def test_run_forever(self): def test_run_audit(self):
class StopForever(Exception): class StopForever(Exception):
pass pass
@ -378,45 +398,78 @@ class TestAuditor(unittest.TestCase):
class ObjectAuditorMock(object): class ObjectAuditorMock(object):
check_args = () check_args = ()
check_kwargs = {} check_kwargs = {}
check_device_dir = None
fork_called = 0 fork_called = 0
fork_res = 0 master = 0
wait_called = 0
def mock_run(self, *args, **kwargs): def mock_run(self, *args, **kwargs):
self.check_args = args self.check_args = args
self.check_kwargs = kwargs self.check_kwargs = kwargs
if 'zero_byte_fps' in kwargs:
self.check_device_dir = kwargs.get('device_dirs')
def mock_sleep(self): def mock_sleep(self):
raise StopForever('stop') raise StopForever('stop')
def mock_fork(self): def mock_fork(self):
self.fork_called += 1 self.fork_called += 1
return self.fork_res if self.master:
return self.fork_called
else:
return 0
def mock_wait(self):
self.wait_called += 1
return (self.wait_called, 0)
for i in string.ascii_letters[2:26]:
mkdirs(os.path.join(self.devices, 'sd%s' % i))
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices, my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
mount_check='false', mount_check='false',
zero_byte_files_per_second=89)) zero_byte_files_per_second=89))
mocker = ObjectAuditorMock() mocker = ObjectAuditorMock()
my_auditor.run_once = mocker.mock_run my_auditor.run_audit = mocker.mock_run
my_auditor._sleep = mocker.mock_sleep my_auditor._sleep = mocker.mock_sleep
was_fork = os.fork was_fork = os.fork
was_wait = os.wait
try: try:
os.fork = mocker.mock_fork os.fork = mocker.mock_fork
os.wait = mocker.mock_wait
self.assertRaises(StopForever, self.assertRaises(StopForever,
my_auditor.run_forever, zero_byte_fps=50) my_auditor.run_forever, zero_byte_fps=50)
self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 50) self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 50)
self.assertEquals(mocker.fork_called, 0) self.assertEquals(mocker.fork_called, 0)
self.assertRaises(StopForever, my_auditor.run_forever) self.assertRaises(SystemExit, my_auditor.run_forever)
self.assertEquals(mocker.fork_called, 1) self.assertEquals(mocker.fork_called, 1)
self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89)
self.assertEquals(mocker.check_device_dir, None)
self.assertEquals(mocker.check_args, ()) self.assertEquals(mocker.check_args, ())
mocker.fork_res = 1 device_list = ['sd%s' % i for i in string.ascii_letters[2:10]]
self.assertRaises(StopForever, my_auditor.run_forever) device_string = ','.join(device_list)
self.assertEquals(mocker.fork_called, 2) device_string_bogus = device_string + ',bogus'
mocker.fork_called = 0
self.assertRaises(SystemExit, my_auditor.run_once,
devices=device_string_bogus)
self.assertEquals(mocker.fork_called, 1)
self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89) self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89)
self.assertEquals(sorted(mocker.check_device_dir), device_list)
mocker.master = 1
mocker.fork_called = 0
self.assertRaises(StopForever, my_auditor.run_forever)
# Fork is called 3 times since the zbf process is forked twice
self.assertEquals(mocker.fork_called, 3)
self.assertEquals(mocker.wait_called, 3)
finally: finally:
os.fork = was_fork os.fork = was_fork
os.wait = was_wait
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()