From 7728904dda966087e29356f8062b843002157a1b Mon Sep 17 00:00:00 2001 From: David Goetz Date: Mon, 14 Feb 2011 20:25:40 +0000 Subject: [PATCH] audit zero byte files quickly without true value --- bin/swift-log-uploader | 2 +- bin/swift-object-auditor | 8 +++- doc/source/admin_guide.rst | 13 +++++ swift/common/daemon.py | 9 ++-- swift/common/utils.py | 10 ++-- swift/obj/auditor.py | 88 +++++++++++++++++++++++++--------- test/unit/common/test_utils.py | 22 +++------ test/unit/obj/test_auditor.py | 77 ++++++++++++++++++++++++----- 8 files changed, 168 insertions(+), 61 deletions(-) diff --git a/bin/swift-log-uploader b/bin/swift-log-uploader index 7c36e2c2cc..94ff8fcc16 100755 --- a/bin/swift-log-uploader +++ b/bin/swift-log-uploader @@ -24,7 +24,7 @@ if __name__ == '__main__': conf_file, options = parse_options(usage="Usage: %prog CONFIG_FILE PLUGIN") try: plugin = options['extra_args'][0] - except IndexError: + except (IndexError, KeyError): print "Error: missing plugin name" sys.exit(1) diff --git a/bin/swift-object-auditor b/bin/swift-object-auditor index c7371bdfeb..2c1211b8d3 100755 --- a/bin/swift-object-auditor +++ b/bin/swift-object-auditor @@ -17,7 +17,13 @@ from swift.obj.auditor import ObjectAuditor from swift.common.utils import parse_options from swift.common.daemon import run_daemon +from optparse import OptionParser if __name__ == '__main__': - conf_file, options = parse_options(once=True) + parser = OptionParser("%prog CONFIG [options]") + parser.add_option('-z', '--zero_byte_only', default=False, + action='store_true', help='Audit only zero byte files') + parser.add_option('-f', '--zero_byte_fps', + help='Override zero byte files per second in config.') + conf_file, options = parse_options(parser=parser, once=True) run_daemon(ObjectAuditor, conf_file, **options) diff --git a/doc/source/admin_guide.rst b/doc/source/admin_guide.rst index 068501a0b2..890f5f0ff8 100644 --- a/doc/source/admin_guide.rst +++ b/doc/source/admin_guide.rst @@ -288,3 +288,16 @@ A graceful shutdown or reload will finish any current requests before completely stopping the old service. There is also a special case of `swift-init all `, which will run the command for all swift services. +-------------- +Object Auditor +-------------- + +On system failures, the XFS file system can sometimes truncate files it's +trying to write and produce zero byte files. The object-auditor will catch +these problems but in the case of a system crash it would be advisable to run +an extra, less rate limited sweep to check for these specific files. You can +run this command as follows: +`swift-object-auditor /path/to/object-server/config/file.conf once -z -f 1000` +"-z" will check for only zero-byte files and "-f" overrides the +zero_byte_files_per_second to a 1000 from the config file, which by default is +only 50. diff --git a/swift/common/daemon.py b/swift/common/daemon.py index 91230e4d2b..d4af63e197 100644 --- a/swift/common/daemon.py +++ b/swift/common/daemon.py @@ -41,22 +41,19 @@ class Daemon(object): utils.validate_configuration() utils.capture_stdio(self.logger, **kwargs) utils.drop_privileges(self.conf.get('user', 'swift')) - def kill_children(*args): signal.signal(signal.SIGTERM, signal.SIG_IGN) os.killpg(0, signal.SIGTERM) sys.exit() signal.signal(signal.SIGTERM, kill_children) - if once: - self.run_once() + self.run_once(**kwargs) else: - self.run_forever() + self.run_forever(**kwargs) -def run_daemon(klass, conf_file, section_name='', - once=False, **kwargs): +def run_daemon(klass, conf_file, section_name='', once=False, **kwargs): """ Loads settings from conf, then instantiates daemon "klass" and runs the daemon with the specified once kwarg. The section_name will be derived diff --git a/swift/common/utils.py b/swift/common/utils.py index feeb89ab4d..70ed30e3f1 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -488,11 +488,11 @@ def capture_stdio(logger, **kwargs): sys.stderr = LoggerFileObject(logger) -def parse_options(usage="%prog CONFIG [options]", once=False, test_args=None): +def parse_options(parser=None, once=False, test_args=None): """ Parse standard swift server/daemon options with optparse.OptionParser. - :param usage: String describing usage + :param parser: OptionParser to use. If not sent one will be created. :param once: Boolean indicating the "once" option is available :param test_args: Override sys.argv; used in testing @@ -501,7 +501,8 @@ def parse_options(usage="%prog CONFIG [options]", once=False, test_args=None): :raises SystemExit: First arg (CONFIG) is required, file must exist """ - parser = OptionParser(usage) + if not parser: + parser = OptionParser(usage="%prog CONFIG [options]") parser.add_option("-v", "--verbose", default=False, action="store_true", help="log to console") if once: @@ -530,7 +531,8 @@ def parse_options(usage="%prog CONFIG [options]", once=False, test_args=None): extra_args.append(arg) options = vars(options) - options['extra_args'] = extra_args + if extra_args: + options['extra_args'] = extra_args return config, options diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index 8ed05049f3..a2f6ae6ebb 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -21,23 +21,34 @@ from random import random from swift.obj import server as object_server from swift.obj.replicator import invalidate_hash from swift.common.utils import get_logger, renamer, audit_location_generator, \ - ratelimit_sleep + ratelimit_sleep, TRUE_VALUES from swift.common.exceptions import AuditException from swift.common.daemon import Daemon -class ObjectAuditor(Daemon): - """Audit objects.""" - - def __init__(self, conf): +class AuditorWorker(object): + """Walk through file system to audit object""" + def __init__(self, conf, zero_byte_file_worker=False, zero_byte_fps=None): self.conf = conf self.logger = get_logger(conf, log_route='object-auditor') self.devices = conf.get('devices', '/srv/node') self.mount_check = conf.get('mount_check', 'true').lower() in \ - ('true', 't', '1', 'on', 'yes', 'y') + TRUE_VALUES self.max_files_per_second = float(conf.get('files_per_second', 20)) self.max_bytes_per_second = float(conf.get('bytes_per_second', 10000000)) + self.auditor_type = 'ALL' + self.fasttrack_zero_byte_files = conf.get( + 'fasttrack_zero_byte_files', 'False').lower() in TRUE_VALUES + self.zero_byte_file_worker = zero_byte_file_worker + if self.zero_byte_file_worker: + self.fasttrack_zero_byte_files = True + if zero_byte_fps: + self.max_files_per_second = float(zero_byte_fps) + else: + self.max_files_per_second = float( + conf.get('zero_byte_files_per_second', 50)) + self.auditor_type = 'ZBF' self.log_time = int(conf.get('log_time', 3600)) self.files_running_time = 0 self.bytes_running_time = 0 @@ -48,18 +59,13 @@ class ObjectAuditor(Daemon): self.quarantines = 0 self.errors = 0 - def run_forever(self): - """Run the object audit until stopped.""" - while True: - self.run_once('forever') - self.total_bytes_processed = 0 - self.total_files_processed = 0 - time.sleep(30) - - def run_once(self, mode='once'): - """Run the object audit once.""" - self.logger.info(_('Begin object audit "%s" mode' % mode)) + def audit_all_objects(self, mode='once'): + self.logger.info(_('Begin object audit "%s" mode (%s)' % + (mode, self.auditor_type))) begin = reported = time.time() + self.total_bytes_processed = 0 + self.total_files_processed = 0 + files_running_time = 0 all_locs = audit_location_generator(self.devices, object_server.DATADIR, mount_check=self.mount_check, @@ -71,9 +77,11 @@ class ObjectAuditor(Daemon): self.total_files_processed += 1 if time.time() - reported >= self.log_time: self.logger.info(_( - 'Since %(start_time)s: Locally: %(passes)d passed audit, ' + 'Object audit (%(type)s). ' + 'Since %(start_time)s: Locally: %(passes)d passed, ' '%(quars)d quarantined, %(errors)d errors ' 'files/sec: %(frate).2f , bytes/sec: %(brate).2f') % { + 'type': self.auditor_type, 'start_time': time.ctime(reported), 'passes': self.passes, 'quars': self.quarantines, @@ -88,9 +96,11 @@ class ObjectAuditor(Daemon): self.bytes_processed = 0 elapsed = time.time() - begin self.logger.info(_( - 'Object audit "%(mode)s" mode completed: %(elapsed).02fs. ' + 'Object audit (%(type)s) "%(mode)s" mode ' + 'completed: %(elapsed).02fs. ' 'Total files/sec: %(frate).2f , ' 'Total bytes/sec: %(brate).2f ') % { + 'type': self.auditor_type, 'mode': mode, 'elapsed': elapsed, 'frate': self.total_files_processed / elapsed, @@ -98,7 +108,7 @@ class ObjectAuditor(Daemon): def object_audit(self, path, device, partition): """ - Audits the given object path + Audits the given object path. :param path: a path to an object :param device: the device the path is on @@ -119,11 +129,14 @@ class ObjectAuditor(Daemon): if df.data_file is None: # file is deleted, we found the tombstone return - if os.path.getsize(df.data_file) != \ - int(df.metadata['Content-Length']): + obj_size = os.path.getsize(df.data_file) + if obj_size != int(df.metadata['Content-Length']): raise AuditException('Content-Length of %s does not match ' 'file size of %s' % (int(df.metadata['Content-Length']), os.path.getsize(df.data_file))) + if self.fasttrack_zero_byte_files and \ + bool(self.zero_byte_file_worker) == bool(obj_size): + return etag = md5() for chunk in df: self.bytes_running_time = ratelimit_sleep( @@ -150,3 +163,34 @@ class ObjectAuditor(Daemon): self.logger.exception(_('ERROR Trying to audit %s'), path) return self.passes += 1 + + +class ObjectAuditor(Daemon): + """Audit objects.""" + + def __init__(self, conf, **options): + self.conf = conf + self.logger = get_logger(conf, 'object-auditor') + self.fasttrack_zero_byte_files = conf.get( + 'fasttrack_zero_byte_files', 'False').lower() in TRUE_VALUES + + def run_forever(self, zero_byte_only=False, zero_byte_fps=None): + """Run the object audit until stopped.""" + zero_byte_pid = 1 + if zero_byte_only or self.fasttrack_zero_byte_files: + zero_byte_pid = os.fork() + if zero_byte_pid == 0: + while True: + self.run_once(mode='forever', zero_byte_only=True, + zero_byte_fps=zero_byte_fps) + time.sleep(30) + else: + while not zero_byte_only: + self.run_once(mode='forever') + time.sleep(30) + + def run_once(self, mode='once', zero_byte_only=False, zero_byte_fps=None): + """Run the object audit once.""" + worker = AuditorWorker(self.conf, zero_byte_file_worker=zero_byte_only, + zero_byte_fps=zero_byte_fps) + worker.audit_all_objects(mode=mode) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 4217cb93ed..e8156342c0 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -274,26 +274,16 @@ class TestUtils(unittest.TestCase): stde = StringIO() utils.sys.stdout = stdo utils.sys.stderr = stde - err_msg = """Usage: test usage - -Error: missing config file argument -""" - test_args = [] - self.assertRaises(SystemExit, utils.parse_options, 'test usage', True, - test_args) - self.assertEquals(stdo.getvalue(), err_msg) + self.assertRaises(SystemExit, utils.parse_options, once=True, + test_args=[]) + self.assert_(stdo.getvalue().find('missing config file') >= 0) # verify conf file must exist, context manager will delete temp file with NamedTemporaryFile() as f: conf_file = f.name - err_msg += """Usage: test usage - -Error: unable to locate %s -""" % conf_file - test_args = [conf_file] - self.assertRaises(SystemExit, utils.parse_options, 'test usage', True, - test_args) - self.assertEquals(stdo.getvalue(), err_msg) + self.assertRaises(SystemExit, utils.parse_options, once=True, + test_args=[conf_file]) + self.assert_(stdo.getvalue().find('unable to locate') >= 0) # reset stdio utils.sys.stdout = orig_stdout diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index 14d58480dd..373e92b11c 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -60,7 +60,7 @@ class TestAuditor(unittest.TestCase): unit.xattr_data = {} def test_object_audit_extra_data(self): - self.auditor = auditor.ObjectAuditor(self.conf) + self.auditor = auditor.AuditorWorker(self.conf) cur_part = '0' disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') data = '0' * 1024 @@ -90,7 +90,7 @@ class TestAuditor(unittest.TestCase): self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) def test_object_audit_diff_data(self): - self.auditor = auditor.ObjectAuditor(self.conf) + self.auditor = auditor.AuditorWorker(self.conf) cur_part = '0' disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') data = '0' * 1024 @@ -133,7 +133,7 @@ class TestAuditor(unittest.TestCase): fp.write('0' * 1024) fp.close() invalidate_hash(os.path.dirname(disk_file.datadir)) - self.auditor = auditor.ObjectAuditor(self.conf) + self.auditor = auditor.AuditorWorker(self.conf) pre_quarantines = self.auditor.quarantines self.auditor.object_audit( os.path.join(disk_file.datadir, timestamp + '.data'), @@ -141,7 +141,7 @@ class TestAuditor(unittest.TestCase): self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) def test_object_audit_bad_args(self): - self.auditor = auditor.ObjectAuditor(self.conf) + self.auditor = auditor.AuditorWorker(self.conf) pre_errors = self.auditor.errors self.auditor.object_audit(5, 'sda', '0') self.assertEquals(self.auditor.errors, pre_errors + 1) @@ -150,7 +150,7 @@ class TestAuditor(unittest.TestCase): self.assertEquals(self.auditor.errors, pre_errors) # just returns def test_object_run_once_pass(self): - self.auditor = auditor.ObjectAuditor(self.conf) + self.auditor = auditor.AuditorWorker(self.conf) self.auditor.log_time = 0 cur_part = '0' timestamp = str(normalize_timestamp(time.time())) @@ -169,11 +169,11 @@ class TestAuditor(unittest.TestCase): } disk_file.put(fd, tmppath, metadata) disk_file.close() - self.auditor.run_once() + self.auditor.audit_all_objects() self.assertEquals(self.auditor.quarantines, pre_quarantines) def test_object_run_once_no_sda(self): - self.auditor = auditor.ObjectAuditor(self.conf) + self.auditor = auditor.AuditorWorker(self.conf) cur_part = '0' timestamp = str(normalize_timestamp(time.time())) pre_quarantines = self.auditor.quarantines @@ -192,11 +192,11 @@ class TestAuditor(unittest.TestCase): disk_file.put(fd, tmppath, metadata) disk_file.close() os.write(fd, 'extra_data') - self.auditor.run_once() + self.auditor.audit_all_objects() self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) def test_object_run_once_multi_devices(self): - self.auditor = auditor.ObjectAuditor(self.conf) + self.auditor = auditor.AuditorWorker(self.conf) cur_part = '0' timestamp = str(normalize_timestamp(time.time())) pre_quarantines = self.auditor.quarantines @@ -214,7 +214,7 @@ class TestAuditor(unittest.TestCase): } disk_file.put(fd, tmppath, metadata) disk_file.close() - self.auditor.run_once() + self.auditor.audit_all_objects() disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob') data = '1' * 10 etag = md5() @@ -230,9 +230,64 @@ class TestAuditor(unittest.TestCase): disk_file.put(fd, tmppath, metadata) disk_file.close() os.write(fd, 'extra_data') - self.auditor.run_once() + self.auditor.audit_all_objects() self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) + def test_object_run_fast_track_non_zero(self): + self.conf['fasttrack_zero_byte_files'] = 'yes' + self.auditor = auditor.ObjectAuditor(self.conf) + self.auditor.log_time = 0 + cur_part = '0' + disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') + data = '0' * 1024 + etag = md5() + with disk_file.mkstemp() as (fd, tmppath): + os.write(fd, data) + etag.update(data) + etag = etag.hexdigest() + metadata = { + 'ETag': etag, + 'X-Timestamp': str(normalize_timestamp(time.time())), + 'Content-Length': str(os.fstat(fd).st_size), + } + disk_file.put(fd, tmppath, metadata) + etag = md5() + etag.update('1' + '0' * 1023) + etag = etag.hexdigest() + metadata['ETag'] = etag + write_metadata(fd, metadata) + + quarantine_path = os.path.join(self.devices, + 'sda', 'quarantined', 'objects') + self.auditor.run_once(zero_byte_only=True) + self.assertFalse(os.path.isdir(quarantine_path)) + self.auditor.run_once() + self.assertTrue(os.path.isdir(quarantine_path)) + + def test_object_run_fast_track_zero(self): + self.conf['fasttrack_zero_byte_files'] = 'yes' + self.auditor = auditor.ObjectAuditor(self.conf) + self.auditor.log_time = 0 + cur_part = '0' + disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') + etag = md5() + with disk_file.mkstemp() as (fd, tmppath): + etag = etag.hexdigest() + metadata = { + 'ETag': etag, + 'X-Timestamp': str(normalize_timestamp(time.time())), + 'Content-Length': 10, + } + disk_file.put(fd, tmppath, metadata) + etag = md5() + etag = etag.hexdigest() + metadata['ETag'] = etag + write_metadata(fd, metadata) + quarantine_path = os.path.join(self.devices, + 'sda', 'quarantined', 'objects') + self.auditor.run_once() + self.assertTrue(os.path.isdir(quarantine_path)) + if __name__ == '__main__': unittest.main()