audit zero byte files quickly without true value
This commit is contained in:
parent
b2e0b926a4
commit
7728904dda
@ -24,7 +24,7 @@ if __name__ == '__main__':
|
||||
conf_file, options = parse_options(usage="Usage: %prog CONFIG_FILE PLUGIN")
|
||||
try:
|
||||
plugin = options['extra_args'][0]
|
||||
except IndexError:
|
||||
except (IndexError, KeyError):
|
||||
print "Error: missing plugin name"
|
||||
sys.exit(1)
|
||||
|
||||
|
@ -17,7 +17,13 @@
|
||||
from swift.obj.auditor import ObjectAuditor
|
||||
from swift.common.utils import parse_options
|
||||
from swift.common.daemon import run_daemon
|
||||
from optparse import OptionParser
|
||||
|
||||
if __name__ == '__main__':
|
||||
conf_file, options = parse_options(once=True)
|
||||
parser = OptionParser("%prog CONFIG [options]")
|
||||
parser.add_option('-z', '--zero_byte_only', default=False,
|
||||
action='store_true', help='Audit only zero byte files')
|
||||
parser.add_option('-f', '--zero_byte_fps',
|
||||
help='Override zero byte files per second in config.')
|
||||
conf_file, options = parse_options(parser=parser, once=True)
|
||||
run_daemon(ObjectAuditor, conf_file, **options)
|
||||
|
@ -288,3 +288,16 @@ A graceful shutdown or reload will finish any current requests before
|
||||
completely stopping the old service. There is also a special case of
|
||||
`swift-init all <command>`, which will run the command for all swift services.
|
||||
|
||||
--------------
|
||||
Object Auditor
|
||||
--------------
|
||||
|
||||
On system failures, the XFS file system can sometimes truncate files it's
|
||||
trying to write and produce zero byte files. The object-auditor will catch
|
||||
these problems but in the case of a system crash it would be advisable to run
|
||||
an extra, less rate limited sweep to check for these specific files. You can
|
||||
run this command as follows:
|
||||
`swift-object-auditor /path/to/object-server/config/file.conf once -z -f 1000`
|
||||
"-z" will check for only zero-byte files and "-f" overrides the
|
||||
zero_byte_files_per_second to a 1000 from the config file, which by default is
|
||||
only 50.
|
||||
|
@ -41,22 +41,19 @@ class Daemon(object):
|
||||
utils.validate_configuration()
|
||||
utils.capture_stdio(self.logger, **kwargs)
|
||||
utils.drop_privileges(self.conf.get('user', 'swift'))
|
||||
|
||||
def kill_children(*args):
|
||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||
os.killpg(0, signal.SIGTERM)
|
||||
sys.exit()
|
||||
|
||||
signal.signal(signal.SIGTERM, kill_children)
|
||||
|
||||
if once:
|
||||
self.run_once()
|
||||
self.run_once(**kwargs)
|
||||
else:
|
||||
self.run_forever()
|
||||
self.run_forever(**kwargs)
|
||||
|
||||
|
||||
def run_daemon(klass, conf_file, section_name='',
|
||||
once=False, **kwargs):
|
||||
def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
|
||||
"""
|
||||
Loads settings from conf, then instantiates daemon "klass" and runs the
|
||||
daemon with the specified once kwarg. The section_name will be derived
|
||||
|
@ -488,11 +488,11 @@ def capture_stdio(logger, **kwargs):
|
||||
sys.stderr = LoggerFileObject(logger)
|
||||
|
||||
|
||||
def parse_options(usage="%prog CONFIG [options]", once=False, test_args=None):
|
||||
def parse_options(parser=None, once=False, test_args=None):
|
||||
"""
|
||||
Parse standard swift server/daemon options with optparse.OptionParser.
|
||||
|
||||
:param usage: String describing usage
|
||||
:param parser: OptionParser to use. If not sent one will be created.
|
||||
:param once: Boolean indicating the "once" option is available
|
||||
:param test_args: Override sys.argv; used in testing
|
||||
|
||||
@ -501,7 +501,8 @@ def parse_options(usage="%prog CONFIG [options]", once=False, test_args=None):
|
||||
|
||||
:raises SystemExit: First arg (CONFIG) is required, file must exist
|
||||
"""
|
||||
parser = OptionParser(usage)
|
||||
if not parser:
|
||||
parser = OptionParser(usage="%prog CONFIG [options]")
|
||||
parser.add_option("-v", "--verbose", default=False, action="store_true",
|
||||
help="log to console")
|
||||
if once:
|
||||
@ -530,7 +531,8 @@ def parse_options(usage="%prog CONFIG [options]", once=False, test_args=None):
|
||||
extra_args.append(arg)
|
||||
|
||||
options = vars(options)
|
||||
options['extra_args'] = extra_args
|
||||
if extra_args:
|
||||
options['extra_args'] = extra_args
|
||||
return config, options
|
||||
|
||||
|
||||
|
@ -21,23 +21,34 @@ from random import random
|
||||
from swift.obj import server as object_server
|
||||
from swift.obj.replicator import invalidate_hash
|
||||
from swift.common.utils import get_logger, renamer, audit_location_generator, \
|
||||
ratelimit_sleep
|
||||
ratelimit_sleep, TRUE_VALUES
|
||||
from swift.common.exceptions import AuditException
|
||||
from swift.common.daemon import Daemon
|
||||
|
||||
|
||||
class ObjectAuditor(Daemon):
|
||||
"""Audit objects."""
|
||||
|
||||
def __init__(self, conf):
|
||||
class AuditorWorker(object):
|
||||
"""Walk through file system to audit object"""
|
||||
def __init__(self, conf, zero_byte_file_worker=False, zero_byte_fps=None):
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf, log_route='object-auditor')
|
||||
self.devices = conf.get('devices', '/srv/node')
|
||||
self.mount_check = conf.get('mount_check', 'true').lower() in \
|
||||
('true', 't', '1', 'on', 'yes', 'y')
|
||||
TRUE_VALUES
|
||||
self.max_files_per_second = float(conf.get('files_per_second', 20))
|
||||
self.max_bytes_per_second = float(conf.get('bytes_per_second',
|
||||
10000000))
|
||||
self.auditor_type = 'ALL'
|
||||
self.fasttrack_zero_byte_files = conf.get(
|
||||
'fasttrack_zero_byte_files', 'False').lower() in TRUE_VALUES
|
||||
self.zero_byte_file_worker = zero_byte_file_worker
|
||||
if self.zero_byte_file_worker:
|
||||
self.fasttrack_zero_byte_files = True
|
||||
if zero_byte_fps:
|
||||
self.max_files_per_second = float(zero_byte_fps)
|
||||
else:
|
||||
self.max_files_per_second = float(
|
||||
conf.get('zero_byte_files_per_second', 50))
|
||||
self.auditor_type = 'ZBF'
|
||||
self.log_time = int(conf.get('log_time', 3600))
|
||||
self.files_running_time = 0
|
||||
self.bytes_running_time = 0
|
||||
@ -48,18 +59,13 @@ class ObjectAuditor(Daemon):
|
||||
self.quarantines = 0
|
||||
self.errors = 0
|
||||
|
||||
def run_forever(self):
|
||||
"""Run the object audit until stopped."""
|
||||
while True:
|
||||
self.run_once('forever')
|
||||
self.total_bytes_processed = 0
|
||||
self.total_files_processed = 0
|
||||
time.sleep(30)
|
||||
|
||||
def run_once(self, mode='once'):
|
||||
"""Run the object audit once."""
|
||||
self.logger.info(_('Begin object audit "%s" mode' % mode))
|
||||
def audit_all_objects(self, mode='once'):
|
||||
self.logger.info(_('Begin object audit "%s" mode (%s)' %
|
||||
(mode, self.auditor_type)))
|
||||
begin = reported = time.time()
|
||||
self.total_bytes_processed = 0
|
||||
self.total_files_processed = 0
|
||||
files_running_time = 0
|
||||
all_locs = audit_location_generator(self.devices,
|
||||
object_server.DATADIR,
|
||||
mount_check=self.mount_check,
|
||||
@ -71,9 +77,11 @@ class ObjectAuditor(Daemon):
|
||||
self.total_files_processed += 1
|
||||
if time.time() - reported >= self.log_time:
|
||||
self.logger.info(_(
|
||||
'Since %(start_time)s: Locally: %(passes)d passed audit, '
|
||||
'Object audit (%(type)s). '
|
||||
'Since %(start_time)s: Locally: %(passes)d passed, '
|
||||
'%(quars)d quarantined, %(errors)d errors '
|
||||
'files/sec: %(frate).2f , bytes/sec: %(brate).2f') % {
|
||||
'type': self.auditor_type,
|
||||
'start_time': time.ctime(reported),
|
||||
'passes': self.passes,
|
||||
'quars': self.quarantines,
|
||||
@ -88,9 +96,11 @@ class ObjectAuditor(Daemon):
|
||||
self.bytes_processed = 0
|
||||
elapsed = time.time() - begin
|
||||
self.logger.info(_(
|
||||
'Object audit "%(mode)s" mode completed: %(elapsed).02fs. '
|
||||
'Object audit (%(type)s) "%(mode)s" mode '
|
||||
'completed: %(elapsed).02fs. '
|
||||
'Total files/sec: %(frate).2f , '
|
||||
'Total bytes/sec: %(brate).2f ') % {
|
||||
'type': self.auditor_type,
|
||||
'mode': mode,
|
||||
'elapsed': elapsed,
|
||||
'frate': self.total_files_processed / elapsed,
|
||||
@ -98,7 +108,7 @@ class ObjectAuditor(Daemon):
|
||||
|
||||
def object_audit(self, path, device, partition):
|
||||
"""
|
||||
Audits the given object path
|
||||
Audits the given object path.
|
||||
|
||||
:param path: a path to an object
|
||||
:param device: the device the path is on
|
||||
@ -119,11 +129,14 @@ class ObjectAuditor(Daemon):
|
||||
if df.data_file is None:
|
||||
# file is deleted, we found the tombstone
|
||||
return
|
||||
if os.path.getsize(df.data_file) != \
|
||||
int(df.metadata['Content-Length']):
|
||||
obj_size = os.path.getsize(df.data_file)
|
||||
if obj_size != int(df.metadata['Content-Length']):
|
||||
raise AuditException('Content-Length of %s does not match '
|
||||
'file size of %s' % (int(df.metadata['Content-Length']),
|
||||
os.path.getsize(df.data_file)))
|
||||
if self.fasttrack_zero_byte_files and \
|
||||
bool(self.zero_byte_file_worker) == bool(obj_size):
|
||||
return
|
||||
etag = md5()
|
||||
for chunk in df:
|
||||
self.bytes_running_time = ratelimit_sleep(
|
||||
@ -150,3 +163,34 @@ class ObjectAuditor(Daemon):
|
||||
self.logger.exception(_('ERROR Trying to audit %s'), path)
|
||||
return
|
||||
self.passes += 1
|
||||
|
||||
|
||||
class ObjectAuditor(Daemon):
|
||||
"""Audit objects."""
|
||||
|
||||
def __init__(self, conf, **options):
|
||||
self.conf = conf
|
||||
self.logger = get_logger(conf, 'object-auditor')
|
||||
self.fasttrack_zero_byte_files = conf.get(
|
||||
'fasttrack_zero_byte_files', 'False').lower() in TRUE_VALUES
|
||||
|
||||
def run_forever(self, zero_byte_only=False, zero_byte_fps=None):
|
||||
"""Run the object audit until stopped."""
|
||||
zero_byte_pid = 1
|
||||
if zero_byte_only or self.fasttrack_zero_byte_files:
|
||||
zero_byte_pid = os.fork()
|
||||
if zero_byte_pid == 0:
|
||||
while True:
|
||||
self.run_once(mode='forever', zero_byte_only=True,
|
||||
zero_byte_fps=zero_byte_fps)
|
||||
time.sleep(30)
|
||||
else:
|
||||
while not zero_byte_only:
|
||||
self.run_once(mode='forever')
|
||||
time.sleep(30)
|
||||
|
||||
def run_once(self, mode='once', zero_byte_only=False, zero_byte_fps=None):
|
||||
"""Run the object audit once."""
|
||||
worker = AuditorWorker(self.conf, zero_byte_file_worker=zero_byte_only,
|
||||
zero_byte_fps=zero_byte_fps)
|
||||
worker.audit_all_objects(mode=mode)
|
||||
|
@ -274,26 +274,16 @@ class TestUtils(unittest.TestCase):
|
||||
stde = StringIO()
|
||||
utils.sys.stdout = stdo
|
||||
utils.sys.stderr = stde
|
||||
err_msg = """Usage: test usage
|
||||
|
||||
Error: missing config file argument
|
||||
"""
|
||||
test_args = []
|
||||
self.assertRaises(SystemExit, utils.parse_options, 'test usage', True,
|
||||
test_args)
|
||||
self.assertEquals(stdo.getvalue(), err_msg)
|
||||
self.assertRaises(SystemExit, utils.parse_options, once=True,
|
||||
test_args=[])
|
||||
self.assert_(stdo.getvalue().find('missing config file') >= 0)
|
||||
|
||||
# verify conf file must exist, context manager will delete temp file
|
||||
with NamedTemporaryFile() as f:
|
||||
conf_file = f.name
|
||||
err_msg += """Usage: test usage
|
||||
|
||||
Error: unable to locate %s
|
||||
""" % conf_file
|
||||
test_args = [conf_file]
|
||||
self.assertRaises(SystemExit, utils.parse_options, 'test usage', True,
|
||||
test_args)
|
||||
self.assertEquals(stdo.getvalue(), err_msg)
|
||||
self.assertRaises(SystemExit, utils.parse_options, once=True,
|
||||
test_args=[conf_file])
|
||||
self.assert_(stdo.getvalue().find('unable to locate') >= 0)
|
||||
|
||||
# reset stdio
|
||||
utils.sys.stdout = orig_stdout
|
||||
|
@ -60,7 +60,7 @@ class TestAuditor(unittest.TestCase):
|
||||
unit.xattr_data = {}
|
||||
|
||||
def test_object_audit_extra_data(self):
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor = auditor.AuditorWorker(self.conf)
|
||||
cur_part = '0'
|
||||
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
|
||||
data = '0' * 1024
|
||||
@ -90,7 +90,7 @@ class TestAuditor(unittest.TestCase):
|
||||
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||
|
||||
def test_object_audit_diff_data(self):
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor = auditor.AuditorWorker(self.conf)
|
||||
cur_part = '0'
|
||||
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
|
||||
data = '0' * 1024
|
||||
@ -133,7 +133,7 @@ class TestAuditor(unittest.TestCase):
|
||||
fp.write('0' * 1024)
|
||||
fp.close()
|
||||
invalidate_hash(os.path.dirname(disk_file.datadir))
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor = auditor.AuditorWorker(self.conf)
|
||||
pre_quarantines = self.auditor.quarantines
|
||||
self.auditor.object_audit(
|
||||
os.path.join(disk_file.datadir, timestamp + '.data'),
|
||||
@ -141,7 +141,7 @@ class TestAuditor(unittest.TestCase):
|
||||
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||
|
||||
def test_object_audit_bad_args(self):
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor = auditor.AuditorWorker(self.conf)
|
||||
pre_errors = self.auditor.errors
|
||||
self.auditor.object_audit(5, 'sda', '0')
|
||||
self.assertEquals(self.auditor.errors, pre_errors + 1)
|
||||
@ -150,7 +150,7 @@ class TestAuditor(unittest.TestCase):
|
||||
self.assertEquals(self.auditor.errors, pre_errors) # just returns
|
||||
|
||||
def test_object_run_once_pass(self):
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor = auditor.AuditorWorker(self.conf)
|
||||
self.auditor.log_time = 0
|
||||
cur_part = '0'
|
||||
timestamp = str(normalize_timestamp(time.time()))
|
||||
@ -169,11 +169,11 @@ class TestAuditor(unittest.TestCase):
|
||||
}
|
||||
disk_file.put(fd, tmppath, metadata)
|
||||
disk_file.close()
|
||||
self.auditor.run_once()
|
||||
self.auditor.audit_all_objects()
|
||||
self.assertEquals(self.auditor.quarantines, pre_quarantines)
|
||||
|
||||
def test_object_run_once_no_sda(self):
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor = auditor.AuditorWorker(self.conf)
|
||||
cur_part = '0'
|
||||
timestamp = str(normalize_timestamp(time.time()))
|
||||
pre_quarantines = self.auditor.quarantines
|
||||
@ -192,11 +192,11 @@ class TestAuditor(unittest.TestCase):
|
||||
disk_file.put(fd, tmppath, metadata)
|
||||
disk_file.close()
|
||||
os.write(fd, 'extra_data')
|
||||
self.auditor.run_once()
|
||||
self.auditor.audit_all_objects()
|
||||
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||
|
||||
def test_object_run_once_multi_devices(self):
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor = auditor.AuditorWorker(self.conf)
|
||||
cur_part = '0'
|
||||
timestamp = str(normalize_timestamp(time.time()))
|
||||
pre_quarantines = self.auditor.quarantines
|
||||
@ -214,7 +214,7 @@ class TestAuditor(unittest.TestCase):
|
||||
}
|
||||
disk_file.put(fd, tmppath, metadata)
|
||||
disk_file.close()
|
||||
self.auditor.run_once()
|
||||
self.auditor.audit_all_objects()
|
||||
disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob')
|
||||
data = '1' * 10
|
||||
etag = md5()
|
||||
@ -230,9 +230,64 @@ class TestAuditor(unittest.TestCase):
|
||||
disk_file.put(fd, tmppath, metadata)
|
||||
disk_file.close()
|
||||
os.write(fd, 'extra_data')
|
||||
self.auditor.run_once()
|
||||
self.auditor.audit_all_objects()
|
||||
self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
|
||||
|
||||
def test_object_run_fast_track_non_zero(self):
|
||||
self.conf['fasttrack_zero_byte_files'] = 'yes'
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor.log_time = 0
|
||||
cur_part = '0'
|
||||
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
|
||||
data = '0' * 1024
|
||||
etag = md5()
|
||||
with disk_file.mkstemp() as (fd, tmppath):
|
||||
os.write(fd, data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': str(normalize_timestamp(time.time())),
|
||||
'Content-Length': str(os.fstat(fd).st_size),
|
||||
}
|
||||
disk_file.put(fd, tmppath, metadata)
|
||||
etag = md5()
|
||||
etag.update('1' + '0' * 1023)
|
||||
etag = etag.hexdigest()
|
||||
metadata['ETag'] = etag
|
||||
write_metadata(fd, metadata)
|
||||
|
||||
quarantine_path = os.path.join(self.devices,
|
||||
'sda', 'quarantined', 'objects')
|
||||
self.auditor.run_once(zero_byte_only=True)
|
||||
self.assertFalse(os.path.isdir(quarantine_path))
|
||||
self.auditor.run_once()
|
||||
self.assertTrue(os.path.isdir(quarantine_path))
|
||||
|
||||
def test_object_run_fast_track_zero(self):
|
||||
self.conf['fasttrack_zero_byte_files'] = 'yes'
|
||||
self.auditor = auditor.ObjectAuditor(self.conf)
|
||||
self.auditor.log_time = 0
|
||||
cur_part = '0'
|
||||
disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
|
||||
etag = md5()
|
||||
with disk_file.mkstemp() as (fd, tmppath):
|
||||
etag = etag.hexdigest()
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': str(normalize_timestamp(time.time())),
|
||||
'Content-Length': 10,
|
||||
}
|
||||
disk_file.put(fd, tmppath, metadata)
|
||||
etag = md5()
|
||||
etag = etag.hexdigest()
|
||||
metadata['ETag'] = etag
|
||||
write_metadata(fd, metadata)
|
||||
quarantine_path = os.path.join(self.devices,
|
||||
'sda', 'quarantined', 'objects')
|
||||
self.auditor.run_once()
|
||||
self.assertTrue(os.path.isdir(quarantine_path))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user