Merge "Auditor will clean up stale rsync tempfiles"

This commit is contained in:
Jenkins 2016-03-23 22:21:51 +00:00 committed by Gerrit Code Review
commit 4be3701805
9 changed files with 344 additions and 37 deletions

View File

@ -499,6 +499,9 @@ and ensure that swift has read/write. The default is /var/cache/swift.
Takes a comma separated list of ints. If set, the object auditor will
increment a counter for every object whose size is <= to the given break
points and report the result after a full scan.
.IP \fBrsync_tempfile_timeout\fR
Time elapsed in seconds before rsync tempfiles will be unlinked. Config value of "auto"
will try to use object-replicator's rsync_timeout + 900 or fall-back to 86400 (1 day).
.RE

View File

@ -738,6 +738,11 @@ concurrency 1 The number of parallel processes
zero_byte_files_per_second 50
object_size_stats
recon_cache_path /var/cache/swift Path to recon cache
rsync_tempfile_timeout auto Time elapsed in seconds before rsync
tempfiles will be unlinked. Config value
of "auto" try to use object-replicator's
rsync_timeout + 900 or fallback to 86400
(1 day).
=========================== =================== ==========================================
------------------------------

View File

@ -306,6 +306,13 @@ use = egg:swift#recon
# points and report the result after a full scan.
# object_size_stats =
# The auditor will cleanup old rsync tempfiles after they are "old
# enough" to delete. You can configure the time elapsed in seconds
# before rsync tempfiles will be unlinked, or the default value of
# "auto" try to use object-replicator's rsync_timeout + 900 and fallback
# to 86400 (1 day).
# rsync_tempfile_timeout = auto
# Note: Put it at the beginning of the pipleline to profile all middleware. But
# it is safer to put this after healthcheck.
[filter:xprofile]

View File

@ -2122,10 +2122,21 @@ def unlink_older_than(path, mtime):
Remove any file in a given path that that was last modified before mtime.
:param path: path to remove file from
:mtime: timestamp of oldest file to keep
:param mtime: timestamp of oldest file to keep
"""
for fname in listdir(path):
fpath = os.path.join(path, fname)
filepaths = map(functools.partial(os.path.join, path), listdir(path))
return unlink_paths_older_than(filepaths, mtime)
def unlink_paths_older_than(filepaths, mtime):
"""
Remove any files from the given list that that were
last modified before mtime.
:param filepaths: a list of strings, the full paths of files to check
:param mtime: timestamp of oldest file to keep
"""
for fpath in filepaths:
try:
if os.path.getmtime(fpath) < mtime:
os.unlink(fpath)

View File

@ -18,18 +18,23 @@ import os
import sys
import time
import signal
import re
from random import shuffle
from swift import gettext_ as _
from contextlib import closing
from eventlet import Timeout
from swift.obj import diskfile
from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \
list_from_csv, listdir
from swift.obj import diskfile, replicator
from swift.common.utils import (
get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
unlink_paths_older_than, readconf, config_auto_int_value)
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES
# This matches rsync tempfiles, like ".<timestamp>.data.Xy095a"
RE_RSYNC_TEMPFILE = re.compile(r'^\..*\.([a-zA-Z0-9_]){6}$')
class AuditorWorker(object):
"""Walk through file system to audit objects"""
@ -42,6 +47,27 @@ class AuditorWorker(object):
self.max_files_per_second = float(conf.get('files_per_second', 20))
self.max_bytes_per_second = float(conf.get('bytes_per_second',
10000000))
try:
# ideally unless ops overrides the rsync_tempfile_timeout in the
# auditor section we can base our behavior on whatever they
# configure for their replicator
replicator_config = readconf(self.conf['__file__'],
'object-replicator')
except (KeyError, SystemExit):
# if we can't parse the real config (generally a KeyError on
# __file__, or SystemExit on no object-replicator section) we use
# a very conservative default
default = 86400
else:
replicator_rsync_timeout = int(replicator_config.get(
'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT))
# Here we can do some light math for ops and use the *replicator's*
# rsync_timeout (plus 15 mins to avoid deleting local tempfiles
# before the remote replicator kills it's rsync)
default = replicator_rsync_timeout + 900
self.rsync_tempfile_timeout = config_auto_int_value(
self.conf.get('rsync_tempfile_timeout'), default)
self.auditor_type = 'ALL'
self.zero_byte_only_at_fps = zero_byte_only_at_fps
if self.zero_byte_only_at_fps:
@ -200,34 +226,46 @@ class AuditorWorker(object):
raise DiskFileQuarantined(msg)
diskfile_mgr = self.diskfile_router[location.policy]
# this method doesn't normally raise errors, even if the audit
# location does not exist; if this raises an unexpected error it
# will get logged in failsafe
df = diskfile_mgr.get_diskfile_from_audit_location(location)
reader = None
try:
df = diskfile_mgr.get_diskfile_from_audit_location(location)
with df.open():
metadata = df.get_metadata()
obj_size = int(metadata['Content-Length'])
if self.stats_sizes:
self.record_stats(obj_size)
if self.zero_byte_only_at_fps and obj_size:
self.passes += 1
return
reader = df.reader(_quarantine_hook=raise_dfq)
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time,
self.max_bytes_per_second,
incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
if obj_size and not self.zero_byte_only_at_fps:
reader = df.reader(_quarantine_hook=raise_dfq)
if reader:
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
self.bytes_running_time = ratelimit_sleep(
self.bytes_running_time,
self.max_bytes_per_second,
incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
except DiskFileNotExist:
return
pass
except DiskFileQuarantined as err:
self.quarantines += 1
self.logger.error(_('ERROR Object %(obj)s failed audit and was'
' quarantined: %(err)s'),
{'obj': location, 'err': err})
self.passes += 1
# _ondisk_info attr is initialized to None and filled in by open
ondisk_info_dict = df._ondisk_info or {}
if 'unexpected' in ondisk_info_dict:
is_rsync_tempfile = lambda fpath: RE_RSYNC_TEMPFILE.match(
os.path.basename(fpath))
rsync_tempfile_paths = filter(is_rsync_tempfile,
ondisk_info_dict['unexpected'])
mtime = time.time() - self.rsync_tempfile_timeout
unlink_paths_older_than(rsync_tempfile_paths, mtime)
class ObjectAuditor(Daemon):

View File

@ -741,7 +741,10 @@ class BaseDiskFileManager(object):
# dicts for the files having that extension. The file_info dicts are of
# the form returned by parse_on_disk_filename, with the filename added.
# Each list is sorted in reverse timestamp order.
#
# the results dict is used to collect results of file filtering
results = {}
# The exts dict will be modified during subsequent processing as files
# are removed to be discarded or ignored.
exts = defaultdict(list)
@ -752,16 +755,15 @@ class BaseDiskFileManager(object):
file_info['filename'] = afile
exts[file_info['ext']].append(file_info)
except DiskFileError as e:
self.logger.warning('Unexpected file %s: %s' %
(os.path.join(datadir or '', afile), e))
file_path = os.path.join(datadir or '', afile)
self.logger.warning('Unexpected file %s: %s',
file_path, e)
results.setdefault('unexpected', []).append(file_path)
for ext in exts:
# For each extension sort files into reverse chronological order.
exts[ext] = sorted(
exts[ext], key=lambda info: info['timestamp'], reverse=True)
# the results dict is used to collect results of file filtering
results = {}
if exts.get('.ts'):
# non-tombstones older than or equal to latest tombstone are
# obsolete

View File

@ -41,6 +41,7 @@ from swift.obj import ssync_sender
from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir
from swift.common.storage_policy import POLICIES, REPL_POLICY
DEFAULT_RSYNC_TIMEOUT = 900
hubs.use_hub(get_hub())
@ -76,7 +77,8 @@ class ObjectReplicator(Daemon):
self.partition_times = []
self.interval = int(conf.get('interval') or
conf.get('run_pause') or 30)
self.rsync_timeout = int(conf.get('rsync_timeout', 900))
self.rsync_timeout = int(conf.get('rsync_timeout',
DEFAULT_RSYNC_TIMEOUT))
self.rsync_io_timeout = conf.get('rsync_io_timeout', '30')
self.rsync_bwlimit = conf.get('rsync_bwlimit', '0')
self.rsync_compress = config_true_value(

View File

@ -18,6 +18,7 @@ from __future__ import print_function
from test.unit import temptree
import ctypes
import contextlib
import errno
import eventlet
import eventlet.event
@ -3422,6 +3423,86 @@ class ResellerConfReader(unittest.TestCase):
self.assertEqual('pre2_group', options['PRE2_'].get('require_group'))
class TestUnlinkOlder(unittest.TestCase):
def setUp(self):
self.tempdir = mkdtemp()
self.mtime = {}
def tearDown(self):
rmtree(self.tempdir, ignore_errors=True)
def touch(self, fpath, mtime=None):
self.mtime[fpath] = mtime or time.time()
open(fpath, 'w')
@contextlib.contextmanager
def high_resolution_getmtime(self):
orig_getmtime = os.path.getmtime
def mock_getmtime(fpath):
mtime = self.mtime.get(fpath)
if mtime is None:
mtime = orig_getmtime(fpath)
return mtime
with mock.patch('os.path.getmtime', mock_getmtime):
yield
def test_unlink_older_than_path_not_exists(self):
path = os.path.join(self.tempdir, 'does-not-exist')
# just make sure it doesn't blow up
utils.unlink_older_than(path, time.time())
def test_unlink_older_than_file(self):
path = os.path.join(self.tempdir, 'some-file')
self.touch(path)
with self.assertRaises(OSError) as ctx:
utils.unlink_older_than(path, time.time())
self.assertEqual(ctx.exception.errno, errno.ENOTDIR)
def test_unlink_older_than_now(self):
self.touch(os.path.join(self.tempdir, 'test'))
with self.high_resolution_getmtime():
utils.unlink_older_than(self.tempdir, time.time())
self.assertEqual([], os.listdir(self.tempdir))
def test_unlink_not_old_enough(self):
start = time.time()
self.touch(os.path.join(self.tempdir, 'test'))
with self.high_resolution_getmtime():
utils.unlink_older_than(self.tempdir, start)
self.assertEqual(['test'], os.listdir(self.tempdir))
def test_unlink_mixed(self):
self.touch(os.path.join(self.tempdir, 'first'))
cutoff = time.time()
self.touch(os.path.join(self.tempdir, 'second'))
with self.high_resolution_getmtime():
utils.unlink_older_than(self.tempdir, cutoff)
self.assertEqual(['second'], os.listdir(self.tempdir))
def test_unlink_paths(self):
paths = []
for item in ('first', 'second', 'third'):
path = os.path.join(self.tempdir, item)
self.touch(path)
paths.append(path)
# don't unlink everyone
with self.high_resolution_getmtime():
utils.unlink_paths_older_than(paths[:2], time.time())
self.assertEqual(['third'], os.listdir(self.tempdir))
def test_unlink_empty_paths(self):
# just make sure it doesn't blow up
utils.unlink_paths_older_than([], time.time())
def test_unlink_not_exists_paths(self):
path = os.path.join(self.tempdir, 'does-not-exist')
# just make sure it doesn't blow up
utils.unlink_paths_older_than([path], time.time())
class TestSwiftInfo(unittest.TestCase):
def tearDown(self):

View File

@ -22,15 +22,18 @@ import string
from shutil import rmtree
from hashlib import md5
from tempfile import mkdtemp
from test.unit import FakeLogger, patch_policies, make_timestamp_iter, \
DEFAULT_TEST_EC_TYPE
from swift.obj import auditor
from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation, \
clear_auditor_status, get_auditor_status
from swift.common.utils import mkdirs, normalize_timestamp, Timestamp
from swift.common.storage_policy import ECStoragePolicy, StoragePolicy, \
POLICIES
import textwrap
from test.unit import (FakeLogger, patch_policies, make_timestamp_iter,
DEFAULT_TEST_EC_TYPE)
from swift.obj import auditor, replicator
from swift.obj.diskfile import (
DiskFile, write_metadata, invalidate_hash, get_data_dir,
DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
get_auditor_status)
from swift.common.utils import (
mkdirs, normalize_timestamp, Timestamp, readconf)
from swift.common.storage_policy import (
ECStoragePolicy, StoragePolicy, POLICIES)
_mocked_policies = [
@ -275,6 +278,161 @@ class TestAuditor(unittest.TestCase):
policy=POLICIES.legacy))
self.assertEqual(auditor_worker.errors, 1)
def test_audit_location_gets_quarantined(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
location = AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=self.disk_file.policy)
# instead of a datadir, we'll make a file!
mkdirs(os.path.dirname(self.disk_file._datadir))
open(self.disk_file._datadir, 'w')
# after we turn the crank ...
auditor_worker.object_audit(location)
# ... it should get quarantined
self.assertFalse(os.path.exists(self.disk_file._datadir))
self.assertEqual(1, auditor_worker.quarantines)
def test_rsync_tempfile_timeout_auto_option(self):
# if we don't have access to the replicator config section we'll use
# our default
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
# if the rsync_tempfile_timeout option is set explicitly we use that
self.conf['rsync_tempfile_timeout'] = '1800'
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 1800)
# if we have a real config we can be a little smarter
config_path = os.path.join(self.testdir, 'objserver.conf')
stub_config = """
[object-auditor]
rsync_tempfile_timeout = auto
"""
with open(config_path, 'w') as f:
f.write(textwrap.dedent(stub_config))
# the Daemon loader will hand the object-auditor config to the
# auditor who will build the workers from it
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
# if there is no object-replicator section we still have to fall back
# to default because we can't parse the config for that section!
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
stub_config = """
[object-replicator]
[object-auditor]
rsync_tempfile_timeout = auto
"""
with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
# if the object-replicator section will parse but does not override
# the default rsync_timeout we assume the default rsync_timeout value
# and add 15mins
self.assertEqual(auditor_worker.rsync_tempfile_timeout,
replicator.DEFAULT_RSYNC_TIMEOUT + 900)
stub_config = """
[DEFAULT]
reclaim_age = 1209600
[object-replicator]
rsync_timeout = 3600
[object-auditor]
rsync_tempfile_timeout = auto
"""
with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
f.write(textwrap.dedent(stub_config))
conf = readconf(config_path, 'object-auditor')
auditor_worker = auditor.AuditorWorker(conf, self.logger,
self.rcache, self.devices)
# if there is an object-replicator section with a rsync_timeout
# configured we'll use that value (3600) + 900
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600 + 900)
def test_inprogress_rsync_tempfiles_get_cleaned_up(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
location = AuditLocation(self.disk_file._datadir, 'sda', '0',
policy=self.disk_file.policy)
data = 'VERIFY'
etag = md5()
timestamp = str(normalize_timestamp(time.time()))
with self.disk_file.create() as writer:
writer.write(data)
etag.update(data)
metadata = {
'ETag': etag.hexdigest(),
'X-Timestamp': timestamp,
'Content-Length': str(os.fstat(writer._fd).st_size),
}
writer.put(metadata)
writer.commit(Timestamp(timestamp))
datafilename = None
datadir_files = os.listdir(self.disk_file._datadir)
for filename in datadir_files:
if filename.endswith('.data'):
datafilename = filename
break
else:
self.fail('Did not find .data file in %r: %r' %
(self.disk_file._datadir, datadir_files))
rsynctempfile_path = os.path.join(self.disk_file._datadir,
'.%s.9ILVBL' % datafilename)
open(rsynctempfile_path, 'w')
# sanity check we have an extra file
rsync_files = os.listdir(self.disk_file._datadir)
self.assertEqual(len(datadir_files) + 1, len(rsync_files))
# and after we turn the crank ...
auditor_worker.object_audit(location)
# ... we've still got the rsync file
self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
# and we'll keep it - depending on the rsync_tempfile_timeout
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400)
self.conf['rsync_tempfile_timeout'] = '3600'
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600)
now = time.time() + 1900
with mock.patch('swift.obj.auditor.time.time',
return_value=now):
auditor_worker.object_audit(location)
self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir))
# but *tomorrow* when we run
tomorrow = time.time() + 86400
with mock.patch('swift.obj.auditor.time.time',
return_value=tomorrow):
auditor_worker.object_audit(location)
# ... we'll totally clean that stuff up!
self.assertEqual(datadir_files, os.listdir(self.disk_file._datadir))
# but if we have some random crazy file in there
random_crazy_file_path = os.path.join(self.disk_file._datadir,
'.random.crazy.file')
open(random_crazy_file_path, 'w')
tomorrow = time.time() + 86400
with mock.patch('swift.obj.auditor.time.time',
return_value=tomorrow):
auditor_worker.object_audit(location)
# that's someone elses problem
self.assertIn(os.path.basename(random_crazy_file_path),
os.listdir(self.disk_file._datadir))
def test_generic_exception_handling(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)