From 793489b80d212ef53748c8dc1ac0acb8bdfd628e Mon Sep 17 00:00:00 2001 From: Eamonn O'Toole Date: Mon, 24 Feb 2014 11:24:56 +0000 Subject: [PATCH] Allow specification of object devices for audit In object audit "once" mode we are allowing the user to specify a sub-set of devices to audit using the "--devices" command-line option. The sub-set is specified as a comma-separated list. This patch is taken from a larger patch to enable parallel processing in the object auditor. We've had to modify recon so that it will work properly with this change to "once" mode. We've modified dump_recon_cache() so that it will store nested dictionaries, in other words it will store a recon cache entry such as {'key1': {'key2': {...}}}. When the object auditor is run in "once" mode with "--devices" set the object_auditor_stats_ALL and ZBF entries look like: {'object_auditor_stats_ALL': {'disk1disk2..diskn': {...}}}. When swift-recon is run, it hunts through the nested dicts to find the appropriate entries. The object auditor recon cache entries are set to {} at the beginning of each audit cycle, and individual disk entries are cleared from cache at the end of each disk's audit cycle. 
DocImpact Change-Id: Icc53dac0a8136f1b2f61d5e08baf7b4fd87c8123 --- bin/swift-object-auditor | 2 + doc/source/admin_guide.rst | 7 ++ swift/cli/recon.py | 45 +++++-- swift/common/utils.py | 24 +++- swift/obj/auditor.py | 147 +++++++++++++++++----- swift/obj/diskfile.py | 26 ++-- test/unit/common/middleware/test_recon.py | 39 ++++++ test/unit/common/test_utils.py | 24 ++++ test/unit/obj/test_auditor.py | 99 +++++++++++---- 9 files changed, 340 insertions(+), 73 deletions(-) diff --git a/bin/swift-object-auditor b/bin/swift-object-auditor index 356fa9af52..55d0054b77 100755 --- a/bin/swift-object-auditor +++ b/bin/swift-object-auditor @@ -23,5 +23,7 @@ if __name__ == '__main__': parser = OptionParser("%prog CONFIG [options]") parser.add_option('-z', '--zero_byte_fps', help='Audit only zero byte files at specified files/sec') + parser.add_option('-d', '--devices', + help='Audit only given devices. Comma-separated list') conf_file, options = parse_options(parser=parser, once=True) run_daemon(ObjectAuditor, conf_file, **options) diff --git a/doc/source/admin_guide.rst b/doc/source/admin_guide.rst index e5a641f003..5ea49f2180 100644 --- a/doc/source/admin_guide.rst +++ b/doc/source/admin_guide.rst @@ -1067,6 +1067,13 @@ run this command as follows: `swift-object-auditor /path/to/object-server/config/file.conf once -z 1000` "-z" means to only check for zero-byte files at 1000 files per second. +At times it is useful to be able to run the object auditor on a specific +device or set of devices. You can run the object-auditor as follows: +swift-object-auditor /path/to/object-server/config/file.conf once --devices=sda,sdb + +This will run the object auditor on only the sda and sdb devices. This param +accepts a comma separated list of values. 
+ ----------------- Object Replicator ----------------- diff --git a/swift/cli/recon.py b/swift/cli/recon.py index ba14489084..c91baf8e79 100755 --- a/swift/cli/recon.py +++ b/swift/cli/recon.py @@ -488,6 +488,24 @@ class SwiftRecon(object): (self._ptime(low), self._ptime(high), self._ptime(average)) print "=" * 79 + def nested_get_value(self, key, recon_entry): + """ + Generator that yields all values for given key in a recon cache entry. + This is for use with object auditor recon cache entries. If the + object auditor has run in 'once' mode with a subset of devices + specified the checksum auditor section will have an entry of the form: + {'object_auditor_stats_ALL': {'disk1disk2diskN': {...}}} + The same is true of the ZBF auditor cache entry section. We use this + generator to find all instances of a particular key in these multi- + level dictionaries. + """ + for k, v in recon_entry.items(): + if isinstance(v, dict): + for value in self.nested_get_value(key, v): + yield value + if k == key: + yield v + def object_auditor_check(self, hosts): """ Obtain and print obj auditor statistics @@ -513,11 +531,16 @@ class SwiftRecon(object): zbf_scan[url] = response['object_auditor_stats_ZBF'] if len(all_scan) > 0: stats = {} - stats[atime] = [all_scan[i][atime] for i in all_scan] - stats[bprocessed] = [all_scan[i][bprocessed] for i in all_scan] - stats[passes] = [all_scan[i][passes] for i in all_scan] - stats[errors] = [all_scan[i][errors] for i in all_scan] - stats[quarantined] = [all_scan[i][quarantined] for i in all_scan] + stats[atime] = [(self.nested_get_value(atime, all_scan[i])) + for i in all_scan] + stats[bprocessed] = [(self.nested_get_value(bprocessed, + all_scan[i])) for i in all_scan] + stats[passes] = [(self.nested_get_value(passes, all_scan[i])) + for i in all_scan] + stats[errors] = [(self.nested_get_value(errors, all_scan[i])) + for i in all_scan] + stats[quarantined] = [(self.nested_get_value(quarantined, + all_scan[i])) for i in all_scan] for k in
stats: if None in stats[k]: stats[k] = [x for x in stats[k] if x is not None] @@ -534,10 +557,14 @@ class SwiftRecon(object): print "[ALL_auditor] - No hosts returned valid data." if len(zbf_scan) > 0: stats = {} - stats[atime] = [zbf_scan[i][atime] for i in zbf_scan] - stats[bprocessed] = [zbf_scan[i][bprocessed] for i in zbf_scan] - stats[errors] = [zbf_scan[i][errors] for i in zbf_scan] - stats[quarantined] = [zbf_scan[i][quarantined] for i in zbf_scan] + stats[atime] = [(self.nested_get_value(atime, zbf_scan[i])) + for i in zbf_scan] + stats[bprocessed] = [(self.nested_get_value(bprocessed, + zbf_scan[i])) for i in zbf_scan] + stats[errors] = [(self.nested_get_value(errors, zbf_scan[i])) + for i in zbf_scan] + stats[quarantined] = [(self.nested_get_value(quarantined, + zbf_scan[i])) for i in zbf_scan] for k in stats: if None in stats[k]: stats[k] = [x for x in stats[k] if x is not None] diff --git a/swift/common/utils.py b/swift/common/utils.py index abebc6515c..be76ddb6c1 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2079,6 +2079,28 @@ def human_readable(value): return '%d%si' % (round(value), suffixes[index]) +def put_recon_cache_entry(cache_entry, key, item): + """ + Function that will check if item is a dict, and if so put it under + cache_entry[key]. We use nested recon cache entries when the object + auditor runs in 'once' mode with a specified subset of devices. 
+ """ + if isinstance(item, dict): + if key not in cache_entry or key in cache_entry and not \ + isinstance(cache_entry[key], dict): + cache_entry[key] = {} + elif key in cache_entry and item == {}: + cache_entry.pop(key, None) + return + for k, v in item.items(): + if v == {}: + cache_entry[key].pop(k, None) + else: + cache_entry[key][k] = v + else: + cache_entry[key] = item + + def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2): """Update recon cache values @@ -2098,7 +2120,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2): #file doesn't have a valid entry, we'll recreate it pass for cache_key, cache_value in cache_dict.items(): - cache_entry[cache_key] = cache_value + put_recon_cache_entry(cache_entry, cache_key, cache_value) try: with NamedTemporaryFile(dir=os.path.dirname(cache_file), delete=False) as tf: diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index d22fd5f42a..c5b708acce 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -14,14 +14,16 @@ # limitations under the License. 
import os +import sys import time +import signal from swift import gettext_ as _ from contextlib import closing from eventlet import Timeout from swift.obj import diskfile from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \ - list_from_csv, json + list_from_csv, json, listdir from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist from swift.common.daemon import Daemon @@ -31,10 +33,10 @@ SLEEP_BETWEEN_AUDITS = 30 class AuditorWorker(object): """Walk through file system to audit objects""" - def __init__(self, conf, logger, zero_byte_only_at_fps=0): + def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0): self.conf = conf self.logger = logger - self.devices = conf.get('devices', '/srv/node') + self.devices = devices self.diskfile_mgr = diskfile.DiskFileManager(conf, self.logger) self.max_files_per_second = float(conf.get('files_per_second', 20)) self.max_bytes_per_second = float(conf.get('bytes_per_second', @@ -53,24 +55,34 @@ class AuditorWorker(object): self.passes = 0 self.quarantines = 0 self.errors = 0 - self.recon_cache_path = conf.get('recon_cache_path', - '/var/cache/swift') - self.rcache = os.path.join(self.recon_cache_path, "object.recon") + self.rcache = rcache self.stats_sizes = sorted( [int(s) for s in list_from_csv(conf.get('object_size_stats'))]) self.stats_buckets = dict( [(s, 0) for s in self.stats_sizes + ['OVER']]) - def audit_all_objects(self, mode='once'): - self.logger.info(_('Begin object audit "%s" mode (%s)') % - (mode, self.auditor_type)) + def create_recon_nested_dict(self, top_level_key, device_list, item): + if device_list: + device_key = ''.join(sorted(device_list)) + return {top_level_key: {device_key: item}} + else: + return {top_level_key: item} + + def audit_all_objects(self, mode='once', device_dirs=None): + description = '' + if device_dirs: + device_dir_str = ','.join(sorted(device_dirs)) + description = _(' - %s') % device_dir_str + self.logger.info(_('Begin 
object audit "%s" mode (%s%s)') % + (mode, self.auditor_type, description)) begin = reported = time.time() self.total_bytes_processed = 0 self.total_files_processed = 0 total_quarantines = 0 total_errors = 0 time_auditing = 0 - all_locs = self.diskfile_mgr.object_audit_location_generator() + all_locs = self.diskfile_mgr.object_audit_location_generator( + device_dirs=device_dirs) for location in all_locs: loop_time = time.time() self.failsafe_object_audit(location) @@ -87,7 +99,7 @@ class AuditorWorker(object): 'files/sec: %(frate).2f , bytes/sec: %(brate).2f, ' 'Total time: %(total).2f, Auditing time: %(audit).2f, ' 'Rate: %(audit_rate).2f') % { - 'type': self.auditor_type, + 'type': '%s%s' % (self.auditor_type, description), 'start_time': time.ctime(reported), 'passes': self.passes, 'quars': self.quarantines, 'errors': self.errors, @@ -95,15 +107,14 @@ class AuditorWorker(object): 'brate': self.bytes_processed / (now - reported), 'total': (now - begin), 'audit': time_auditing, 'audit_rate': time_auditing / (now - begin)}) - dump_recon_cache({'object_auditor_stats_%s' % - self.auditor_type: { - 'errors': self.errors, - 'passes': self.passes, - 'quarantined': self.quarantines, - 'bytes_processed': self.bytes_processed, - 'start_time': reported, - 'audit_time': time_auditing}}, - self.rcache, self.logger) + cache_entry = self.create_recon_nested_dict( + 'object_auditor_stats_%s' % (self.auditor_type), + device_dirs, + {'errors': self.errors, 'passes': self.passes, + 'quarantined': self.quarantines, + 'bytes_processed': self.bytes_processed, + 'start_time': reported, 'audit_time': time_auditing}) + dump_recon_cache(cache_entry, self.rcache, self.logger) reported = now total_quarantines += self.quarantines total_errors += self.errors @@ -120,12 +131,19 @@ class AuditorWorker(object): 'Total errors: %(errors)d, Total files/sec: %(frate).2f, ' 'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, ' 'Rate: %(audit_rate).2f') % { - 'type': self.auditor_type, 'mode': 
mode, 'elapsed': elapsed, + 'type': '%s%s' % (self.auditor_type, description), + 'mode': mode, 'elapsed': elapsed, 'quars': total_quarantines + self.quarantines, 'errors': total_errors + self.errors, 'frate': self.total_files_processed / elapsed, 'brate': self.total_bytes_processed / elapsed, 'audit': time_auditing, 'audit_rate': time_auditing / elapsed}) + # Clear recon cache entry if device_dirs is set + if device_dirs: + cache_entry = self.create_recon_nested_dict( + 'object_auditor_stats_%s' % (self.auditor_type), + device_dirs, {}) + dump_recon_cache(cache_entry, self.rcache, self.logger) if self.stats_sizes: self.logger.info( _('Object audit stats: %s') % json.dumps(self.stats_buckets)) @@ -204,35 +222,100 @@ class ObjectAuditor(Daemon): def __init__(self, conf, **options): self.conf = conf self.logger = get_logger(conf, log_route='object-auditor') + self.devices = conf.get('devices', '/srv/node') self.conf_zero_byte_fps = int( conf.get('zero_byte_files_per_second', 50)) + self.recon_cache_path = conf.get('recon_cache_path', + '/var/cache/swift') + self.rcache = os.path.join(self.recon_cache_path, "object.recon") def _sleep(self): time.sleep(SLEEP_BETWEEN_AUDITS) + def clear_recon_cache(self, auditor_type): + """Clear recon cache entries""" + dump_recon_cache({'object_auditor_stats_%s' % auditor_type: {}}, + self.rcache, self.logger) + + def run_audit(self, **kwargs): + """Run the object audit""" + mode = kwargs.get('mode') + zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0) + device_dirs = kwargs.get('device_dirs') + worker = AuditorWorker(self.conf, self.logger, self.rcache, + self.devices, + zero_byte_only_at_fps=zero_byte_only_at_fps) + worker.audit_all_objects(mode=mode, device_dirs=device_dirs) + + def fork_child(self, zero_byte_fps=False, **kwargs): + """Child execution""" + pid = os.fork() + if pid: + return pid + else: + signal.signal(signal.SIGTERM, signal.SIG_DFL) + if zero_byte_fps: + kwargs['zero_byte_fps'] = self.conf_zero_byte_fps + 
self.run_audit(**kwargs) + sys.exit() + + def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs): + """Audit loop""" + self.clear_recon_cache('ALL') + self.clear_recon_cache('ZBF') + kwargs['device_dirs'] = override_devices + if parent: + kwargs['zero_byte_fps'] = zbo_fps + self.run_audit(**kwargs) + else: + pids = [] + if self.conf_zero_byte_fps: + zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs) + pids.append(zbf_pid) + pids.append(self.fork_child(**kwargs)) + while pids: + pid = os.wait()[0] + # ZBF scanner must be restarted as soon as it finishes + if self.conf_zero_byte_fps and pid == zbf_pid and \ + len(pids) > 1: + kwargs['device_dirs'] = override_devices + zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs) + pids.append(zbf_pid) + pids.remove(pid) + def run_forever(self, *args, **kwargs): """Run the object audit until stopped.""" # zero byte only command line option zbo_fps = kwargs.get('zero_byte_fps', 0) + parent = False if zbo_fps: # only start parent parent = True - else: - parent = os.fork() # child gets parent = 0 kwargs = {'mode': 'forever'} - if parent: - kwargs['zero_byte_fps'] = zbo_fps or self.conf_zero_byte_fps + while True: try: - self.run_once(**kwargs) + self.audit_loop(parent, zbo_fps, **kwargs) except (Exception, Timeout): self.logger.exception(_('ERROR auditing')) self._sleep() def run_once(self, *args, **kwargs): - """Run the object audit once.""" - mode = kwargs.get('mode', 'once') - zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0) - worker = AuditorWorker(self.conf, self.logger, - zero_byte_only_at_fps=zero_byte_only_at_fps) - worker.audit_all_objects(mode=mode) + """Run the object audit once""" + # zero byte only command line option + zbo_fps = kwargs.get('zero_byte_fps', 0) + override_devices = list_from_csv(kwargs.get('devices')) + # Remove bogus entries and duplicates from override_devices + override_devices = list( + set(listdir(self.devices)).intersection(set(override_devices))) + parent = False + 
if zbo_fps: + # only start parent + parent = True + kwargs = {'mode': 'once'} + + try: + self.audit_loop(parent, zbo_fps, override_devices=override_devices, + **kwargs) + except (Exception, Timeout): + self.logger.exception(_('ERROR auditing')) diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 7c0a9483a1..1e74815570 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -351,22 +351,32 @@ class AuditLocation(object): return str(self.path) -def object_audit_location_generator(devices, mount_check=True, logger=None): +def object_audit_location_generator(devices, mount_check=True, logger=None, + device_dirs=None): """ Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all - objects stored under that directory. The AuditLocation only knows the path - to the hash directory, not to the .data file therein (if any). This is to - avoid a double listdir(hash_dir); the DiskFile object will always do one, - so we don't. + objects stored under that directory if device_dirs isn't set. If + device_dirs is set, only yield AuditLocation for the objects under the + entries in device_dirs. The AuditLocation only knows the path to the hash + directory, not to the .data file therein (if any). This is to avoid a + double listdir(hash_dir); the DiskFile object will always do one, so + we don't. 
:param devices: parent directory of the devices to be audited :param mount_check: flag to check if a mount check should be performed on devices :param logger: a logger object + :param device_dirs: a list of directories under devices to traverse """ - device_dirs = listdir(devices) + if not device_dirs: + device_dirs = listdir(devices) + else: + # remove bogus devices and duplicates from device_dirs + device_dirs = list( + set(listdir(devices)).intersection(set(device_dirs))) # randomize devices in case of process restart before sweep completed shuffle(device_dirs) + for device in device_dirs: if mount_check and not \
 ismount(os.path.join(devices, device)): @@ -502,9 +512,9 @@ class DiskFileManager(object): return DiskFile(self, dev_path, self.threadpools[device], partition, account, container, obj, **kwargs) - def object_audit_location_generator(self): + def object_audit_location_generator(self, device_dirs=None): return object_audit_location_generator(self.devices, self.mount_check, - self.logger) + self.logger, device_dirs) def get_diskfile_from_audit_location(self, audit_location): dev_path = self.get_dev_path(audit_location.device, mount_check=False) diff --git a/test/unit/common/middleware/test_recon.py b/test/unit/common/middleware/test_recon.py index 5cdd2edf40..b94bd825e1 100644 --- a/test/unit/common/middleware/test_recon.py +++ b/test/unit/common/middleware/test_recon.py @@ -562,6 +562,45 @@ class TestReconSuccess(TestCase): "files_processed": 2310, "quarantined": 0}}) + def test_get_auditor_info_object_once(self): + from_cache_response = { + "object_auditor_stats_ALL": {'disk1disk2': { + "audit_time": 115.14418768882751, + "bytes_processed": 234660, + "completed": 115.4512460231781, + "errors": 0, + "files_processed": 2310, + "quarantined": 0}}, + "object_auditor_stats_ZBF": {'disk1disk2': { + "audit_time": 45.877294063568115, + "bytes_processed": 0, + "completed": 46.181446075439453, + "errors": 0, + "files_processed": 2310, + "quarantined": 0}}}
self.fakecache.fakeout_calls = [] + self.fakecache.fakeout = from_cache_response + rv = self.app.get_auditor_info('object') + self.assertEquals(self.fakecache.fakeout_calls, + [((['object_auditor_stats_ALL', + 'object_auditor_stats_ZBF'], + '/var/cache/swift/object.recon'), {})]) + self.assertEquals(rv, { + "object_auditor_stats_ALL": {'disk1disk2': { + "audit_time": 115.14418768882751, + "bytes_processed": 234660, + "completed": 115.4512460231781, + "errors": 0, + "files_processed": 2310, + "quarantined": 0}}, + "object_auditor_stats_ZBF": {'disk1disk2': { + "audit_time": 45.877294063568115, + "bytes_processed": 0, + "completed": 46.181446075439453, + "errors": 0, + "files_processed": 2310, + "quarantined": 0}}}) + def test_get_unmounted(self): unmounted_resp = [{'device': 'fakeone', 'mounted': False}, {'device': 'faketwo', 'mounted': False}] diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 103ee87739..4455a5f727 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -28,6 +28,7 @@ import random import re import socket import sys +import json from textwrap import dedent @@ -486,6 +487,29 @@ class TestUtils(unittest.TestCase): utils.sys.stdout = orig_stdout utils.sys.stderr = orig_stderr + def test_dump_recon_cache(self): + testdir_base = mkdtemp() + testcache_file = os.path.join(testdir_base, 'cache.recon') + logger = utils.get_logger(None, 'server', log_route='server') + try: + submit_dict = {'key1': {'value1': 1, 'value2': 2}} + utils.dump_recon_cache(submit_dict, testcache_file, logger) + fd = open(testcache_file) + file_dict = json.loads(fd.readline()) + fd.close() + self.assertEquals(submit_dict, file_dict) + # Use a nested entry + submit_dict = {'key1': {'key2': {'value1': 1, 'value2': 2}}} + result_dict = {'key1': {'key2': {'value1': 1, 'value2': 2}, + 'value1': 1, 'value2': 2}} + utils.dump_recon_cache(submit_dict, testcache_file, logger) + fd = open(testcache_file) + file_dict = 
json.loads(fd.readline()) + fd.close() + self.assertEquals(result_dict, file_dict) + finally: + rmtree(testdir_base) + def test_get_logger(self): sio = StringIO() logger = logging.getLogger('server') diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index c6788c163d..40e3f3c111 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -18,6 +18,7 @@ import unittest import mock import os import time +import string from shutil import rmtree from hashlib import md5 from tempfile import mkdtemp @@ -34,6 +35,7 @@ class TestAuditor(unittest.TestCase): def setUp(self): self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor') self.devices = os.path.join(self.testdir, 'node') + self.rcache = os.path.join(self.testdir, 'object.recon') self.logger = FakeLogger() rmtree(self.testdir, ignore_errors=1) mkdirs(os.path.join(self.devices, 'sda')) @@ -60,7 +62,8 @@ class TestAuditor(unittest.TestCase): unit.xattr_data = {} def test_object_audit_extra_data(self): - auditor_worker = auditor.AuditorWorker(self.conf, self.logger) + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) data = '0' * 1024 etag = md5() with self.disk_file.create() as writer: @@ -86,7 +89,8 @@ class TestAuditor(unittest.TestCase): self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1) def test_object_audit_diff_data(self): - auditor_worker = auditor.AuditorWorker(self.conf, self.logger) + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) data = '0' * 1024 etag = md5() timestamp = str(normalize_timestamp(time.time())) @@ -129,7 +133,8 @@ class TestAuditor(unittest.TestCase): fp.write('0' * 1024) fp.close() invalidate_hash(os.path.dirname(self.disk_file._datadir)) - auditor_worker = auditor.AuditorWorker(self.conf, self.logger) + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) pre_quarantines = 
auditor_worker.quarantines auditor_worker.object_audit( AuditLocation(self.disk_file._datadir, 'sda', '0')) @@ -141,7 +146,8 @@ class TestAuditor(unittest.TestCase): mkdirs(self.disk_file._datadir) with open(path, 'w') as f: write_metadata(f, {'name': '/a/c/o'}) - auditor_worker = auditor.AuditorWorker(self.conf, self.logger) + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) def blowup(*args): raise NameError('tpyo') @@ -156,7 +162,8 @@ class TestAuditor(unittest.TestCase): mkdirs(self.disk_file._datadir) with open(path, 'w') as f: write_metadata(f, {'name': '/a/c/o'}) - auditor_worker = auditor.AuditorWorker(self.conf, self.logger) + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) def blowup(*args): raise NameError('tpyo') @@ -166,7 +173,8 @@ class TestAuditor(unittest.TestCase): self.assertEquals(auditor_worker.errors, 1) def test_generic_exception_handling(self): - auditor_worker = auditor.AuditorWorker(self.conf, self.logger) + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) timestamp = str(normalize_timestamp(time.time())) pre_errors = auditor_worker.errors data = '0' * 1024 @@ -186,7 +194,8 @@ class TestAuditor(unittest.TestCase): self.assertEquals(auditor_worker.errors, pre_errors + 1) def test_object_run_once_pass(self): - auditor_worker = auditor.AuditorWorker(self.conf, self.logger) + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) auditor_worker.log_time = 0 timestamp = str(normalize_timestamp(time.time())) pre_quarantines = auditor_worker.quarantines @@ -208,7 +217,8 @@ class TestAuditor(unittest.TestCase): self.assertEquals(auditor_worker.stats_buckets[10240], 0) def test_object_run_once_no_sda(self): - auditor_worker = auditor.AuditorWorker(self.conf, self.logger) + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) timestamp = 
str(normalize_timestamp(time.time())) pre_quarantines = auditor_worker.quarantines data = '0' * 1024 @@ -228,7 +238,8 @@ class TestAuditor(unittest.TestCase): self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1) def test_object_run_once_multi_devices(self): - auditor_worker = auditor.AuditorWorker(self.conf, self.logger) + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) timestamp = str(normalize_timestamp(time.time())) pre_quarantines = auditor_worker.quarantines data = '0' * 10 @@ -284,9 +295,12 @@ class TestAuditor(unittest.TestCase): quarantine_path = os.path.join(self.devices, 'sda', 'quarantined', 'objects') - self.auditor.run_once(zero_byte_fps=50) + kwargs = {'mode': 'once'} + kwargs['zero_byte_fps'] = 50 + self.auditor.run_audit(**kwargs) self.assertFalse(os.path.isdir(quarantine_path)) - self.auditor.run_once() + del(kwargs['zero_byte_fps']) + self.auditor.run_audit(**kwargs) self.assertTrue(os.path.isdir(quarantine_path)) def setup_bad_zero_byte(self, with_ts=False): @@ -322,14 +336,17 @@ class TestAuditor(unittest.TestCase): def test_object_run_fast_track_all(self): self.setup_bad_zero_byte() - self.auditor.run_once() + kwargs = {'mode': 'once'} + self.auditor.run_audit(**kwargs) quarantine_path = os.path.join(self.devices, 'sda', 'quarantined', 'objects') self.assertTrue(os.path.isdir(quarantine_path)) def test_object_run_fast_track_zero(self): self.setup_bad_zero_byte() - self.auditor.run_once(zero_byte_fps=50) + kwargs = {'mode': 'once'} + kwargs['zero_byte_fps'] = 50 + self.auditor.run_audit(**kwargs) quarantine_path = os.path.join(self.devices, 'sda', 'quarantined', 'objects') self.assertTrue(os.path.isdir(quarantine_path)) @@ -347,7 +364,9 @@ class TestAuditor(unittest.TestCase): was_df = auditor.diskfile.DiskFile try: auditor.diskfile.DiskFile = FakeFile - self.auditor.run_once(zero_byte_fps=50) + kwargs = {'mode': 'once'} + kwargs['zero_byte_fps'] = 50 + self.auditor.run_audit(**kwargs) 
quarantine_path = os.path.join(self.devices, 'sda', 'quarantined', 'objects') self.assertTrue(os.path.isdir(quarantine_path)) @@ -358,7 +377,8 @@ class TestAuditor(unittest.TestCase): def test_with_tombstone(self): ts_file_path = self.setup_bad_zero_byte(with_ts=True) self.assertTrue(ts_file_path.endswith('ts')) - self.auditor.run_once() + kwargs = {'mode': 'once'} + self.auditor.run_audit(**kwargs) self.assertTrue(os.path.exists(ts_file_path)) def test_sleeper(self): @@ -370,7 +390,7 @@ class TestAuditor(unittest.TestCase): self.assert_(delta_t > 0.08) self.assert_(delta_t < 0.12) - def test_run_forever(self): + def test_run_audit(self): class StopForever(Exception): pass @@ -378,45 +398,78 @@ class TestAuditor(unittest.TestCase): class ObjectAuditorMock(object): check_args = () check_kwargs = {} + check_device_dir = None fork_called = 0 - fork_res = 0 + master = 0 + wait_called = 0 def mock_run(self, *args, **kwargs): self.check_args = args self.check_kwargs = kwargs + if 'zero_byte_fps' in kwargs: + self.check_device_dir = kwargs.get('device_dirs') def mock_sleep(self): raise StopForever('stop') def mock_fork(self): self.fork_called += 1 - return self.fork_res + if self.master: + return self.fork_called + else: + return 0 + + def mock_wait(self): + self.wait_called += 1 + return (self.wait_called, 0) + + for i in string.ascii_letters[2:26]: + mkdirs(os.path.join(self.devices, 'sd%s' % i)) my_auditor = auditor.ObjectAuditor(dict(devices=self.devices, mount_check='false', zero_byte_files_per_second=89)) mocker = ObjectAuditorMock() - my_auditor.run_once = mocker.mock_run + my_auditor.run_audit = mocker.mock_run my_auditor._sleep = mocker.mock_sleep was_fork = os.fork + was_wait = os.wait try: os.fork = mocker.mock_fork + os.wait = mocker.mock_wait self.assertRaises(StopForever, my_auditor.run_forever, zero_byte_fps=50) self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 50) self.assertEquals(mocker.fork_called, 0) - self.assertRaises(StopForever, 
my_auditor.run_forever) + self.assertRaises(SystemExit, my_auditor.run_forever) self.assertEquals(mocker.fork_called, 1) + self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89) + self.assertEquals(mocker.check_device_dir, None) self.assertEquals(mocker.check_args, ()) - mocker.fork_res = 1 - self.assertRaises(StopForever, my_auditor.run_forever) - self.assertEquals(mocker.fork_called, 2) + device_list = ['sd%s' % i for i in string.ascii_letters[2:10]] + device_string = ','.join(device_list) + device_string_bogus = device_string + ',bogus' + + mocker.fork_called = 0 + self.assertRaises(SystemExit, my_auditor.run_once, + devices=device_string_bogus) + self.assertEquals(mocker.fork_called, 1) self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89) + self.assertEquals(sorted(mocker.check_device_dir), device_list) + + mocker.master = 1 + + mocker.fork_called = 0 + self.assertRaises(StopForever, my_auditor.run_forever) + # Fork is called 3 times since the zbf process is forked twice + self.assertEquals(mocker.fork_called, 3) + self.assertEquals(mocker.wait_called, 3) finally: os.fork = was_fork + os.wait = was_wait if __name__ == '__main__': unittest.main()