diff --git a/doc/manpages/object-server.conf.5 b/doc/manpages/object-server.conf.5 index ac1a8889e3..2e58de32fb 100644 --- a/doc/manpages/object-server.conf.5 +++ b/doc/manpages/object-server.conf.5 @@ -499,6 +499,9 @@ and ensure that swift has read/write. The default is /var/cache/swift. Takes a comma separated list of ints. If set, the object auditor will increment a counter for every object whose size is <= to the given break points and report the result after a full scan. +.IP \fBrsync_tempfile_timeout\fR
Time elapsed in seconds before rsync tempfiles will be unlinked. Config value of "auto"
+will try to use object-replicator's rsync_timeout + 900 or fall back to 86400 (1 day).
.RE diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 9743aef8b2..ec86c90c93 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -738,6 +738,11 @@ concurrency 1 The number of parallel processes zero_byte_files_per_second 50 object_size_stats recon_cache_path /var/cache/swift Path to recon cache +rsync_tempfile_timeout auto Time elapsed in seconds before rsync + tempfiles will be unlinked. Config value + of "auto" will try to use object-replicator's + rsync_timeout + 900 or fall back to 86400 + (1 day). =========================== =================== ========================================== ------------------------------ diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 80731584ee..e01193bff2 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -306,6 +306,13 @@ use = egg:swift#recon # points and report the result after a full scan. # object_size_stats = +# The auditor will clean up old rsync tempfiles after they are "old +# enough" to delete. You can configure the time elapsed in seconds +# before rsync tempfiles will be unlinked, or the default value of +# "auto" will try to use object-replicator's rsync_timeout + 900 and fall back +# to 86400 (1 day). +# rsync_tempfile_timeout = auto + # Note: Put it at the beginning of the pipleline to profile all middleware. But # it is safer to put this after healthcheck. [filter:xprofile] diff --git a/swift/common/utils.py b/swift/common/utils.py index e975bf1ad2..210dd9f4e0 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2122,10 +2122,21 @@ def unlink_older_than(path, mtime): Remove any file in a given path that that was last modified before mtime. :param path: path to remove file from - :mtime: timestamp of oldest file to keep + :param mtime: timestamp of oldest file to keep """ - for fname in listdir(path): - fpath = os.path.join(path, fname) + filepaths = map(functools.partial(os.path.join, path), listdir(path)) + return unlink_paths_older_than(filepaths, mtime) + + +def unlink_paths_older_than(filepaths, mtime): + """ + Remove any files from the given list that were last modified before mtime.
+ + :param filepaths: a list of strings, the full paths of files to check + :param mtime: timestamp of oldest file to keep + """ + for fpath in filepaths: try: if os.path.getmtime(fpath) < mtime: os.unlink(fpath) diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index db6e4f4988..3b5f2de785 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -18,18 +18,23 @@ import os import sys import time import signal +import re from random import shuffle from swift import gettext_ as _ from contextlib import closing from eventlet import Timeout -from swift.obj import diskfile -from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \ - list_from_csv, listdir +from swift.obj import diskfile, replicator +from swift.common.utils import ( + get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir, + unlink_paths_older_than, readconf, config_auto_int_value) from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist from swift.common.daemon import Daemon from swift.common.storage_policy import POLICIES +# This matches rsync tempfiles, like "..data.Xy095a" +RE_RSYNC_TEMPFILE = re.compile(r'^\..*\.([a-zA-Z0-9_]){6}$') + class AuditorWorker(object): """Walk through file system to audit objects""" @@ -42,6 +47,27 @@ class AuditorWorker(object): self.max_files_per_second = float(conf.get('files_per_second', 20)) self.max_bytes_per_second = float(conf.get('bytes_per_second', 10000000)) + try: + # ideally unless ops overrides the rsync_tempfile_timeout in the + # auditor section we can base our behavior on whatever they + # configure for their replicator + replicator_config = readconf(self.conf['__file__'], + 'object-replicator') + except (KeyError, SystemExit): + # if we can't parse the real config (generally a KeyError on + # __file__, or SystemExit on no object-replicator section) we use + # a very conservative default + default = 86400 + else: + replicator_rsync_timeout = int(replicator_config.get( + 'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT)) + # Here we can do some light math for ops and use the *replicator's* + # rsync_timeout (plus 15 mins to avoid deleting local tempfiles + # before the remote replicator kills it's rsync) + default = replicator_rsync_timeout + 900 + self.rsync_tempfile_timeout = config_auto_int_value( + self.conf.get('rsync_tempfile_timeout'), default) + self.auditor_type = 'ALL' self.zero_byte_only_at_fps = zero_byte_only_at_fps if self.zero_byte_only_at_fps: @@ -200,34 +226,46 @@ class AuditorWorker(object): raise DiskFileQuarantined(msg) diskfile_mgr = self.diskfile_router[location.policy] + # this method doesn't normally raise errors, even if the audit + # location does not exist; if this raises an unexpected error it + # will get logged in failsafe + df = diskfile_mgr.get_diskfile_from_audit_location(location) + reader = None try: - df = diskfile_mgr.get_diskfile_from_audit_location(location) with df.open(): metadata = df.get_metadata() obj_size = int(metadata['Content-Length']) if self.stats_sizes: self.record_stats(obj_size) - if self.zero_byte_only_at_fps and obj_size: - self.passes += 1 - return - reader = df.reader(_quarantine_hook=raise_dfq) - with closing(reader): - for chunk in reader: - chunk_len = len(chunk) - self.bytes_running_time = ratelimit_sleep( - self.bytes_running_time, - self.max_bytes_per_second, - incr_by=chunk_len) - self.bytes_processed += chunk_len - self.total_bytes_processed += chunk_len + if obj_size and not self.zero_byte_only_at_fps: + reader = 
df.reader(_quarantine_hook=raise_dfq) + if reader: + with closing(reader): + for chunk in reader: + chunk_len = len(chunk) + self.bytes_running_time = ratelimit_sleep( + self.bytes_running_time, + self.max_bytes_per_second, + incr_by=chunk_len) + self.bytes_processed += chunk_len + self.total_bytes_processed += chunk_len except DiskFileNotExist: - return + pass except DiskFileQuarantined as err: self.quarantines += 1 self.logger.error(_('ERROR Object %(obj)s failed audit and was' ' quarantined: %(err)s'), {'obj': location, 'err': err}) self.passes += 1 + # _ondisk_info attr is initialized to None and filled in by open + ondisk_info_dict = df._ondisk_info or {} + if 'unexpected' in ondisk_info_dict: + is_rsync_tempfile = lambda fpath: RE_RSYNC_TEMPFILE.match( + os.path.basename(fpath)) + rsync_tempfile_paths = filter(is_rsync_tempfile, + ondisk_info_dict['unexpected']) + mtime = time.time() - self.rsync_tempfile_timeout + unlink_paths_older_than(rsync_tempfile_paths, mtime) class ObjectAuditor(Daemon): diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 881371b07c..7ea2f25109 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -741,7 +741,10 @@ class BaseDiskFileManager(object): # dicts for the files having that extension. The file_info dicts are of # the form returned by parse_on_disk_filename, with the filename added. # Each list is sorted in reverse timestamp order. - # + + # the results dict is used to collect results of file filtering + results = {} + # The exts dict will be modified during subsequent processing as files # are removed to be discarded or ignored. exts = defaultdict(list) @@ -752,16 +755,15 @@ class BaseDiskFileManager(object): file_info['filename'] = afile exts[file_info['ext']].append(file_info) except DiskFileError as e: - self.logger.warning('Unexpected file %s: %s' % - (os.path.join(datadir or '', afile), e)) + file_path = os.path.join(datadir or '', afile) + self.logger.warning('Unexpected file %s: %s', + file_path, e) + results.setdefault('unexpected', []).append(file_path) for ext in exts: # For each extension sort files into reverse chronological order. 
exts[ext] = sorted( exts[ext], key=lambda info: info['timestamp'], reverse=True) - # the results dict is used to collect results of file filtering - results = {} - if exts.get('.ts'): # non-tombstones older than or equal to latest tombstone are # obsolete diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index aa38407d35..ff89b3213a 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -41,6 +41,7 @@ from swift.obj import ssync_sender from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir from swift.common.storage_policy import POLICIES, REPL_POLICY +DEFAULT_RSYNC_TIMEOUT = 900 hubs.use_hub(get_hub()) @@ -76,7 +77,8 @@ class ObjectReplicator(Daemon): self.partition_times = [] self.interval = int(conf.get('interval') or conf.get('run_pause') or 30) - self.rsync_timeout = int(conf.get('rsync_timeout', 900)) + self.rsync_timeout = int(conf.get('rsync_timeout', + DEFAULT_RSYNC_TIMEOUT)) self.rsync_io_timeout = conf.get('rsync_io_timeout', '30') self.rsync_bwlimit = conf.get('rsync_bwlimit', '0') self.rsync_compress = config_true_value( diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 14e3aa8696..409990ad4a 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -18,6 +18,7 @@ from __future__ import print_function from test.unit import temptree import ctypes +import contextlib import errno import eventlet import eventlet.event @@ -3422,6 +3423,86 @@ class ResellerConfReader(unittest.TestCase): self.assertEqual('pre2_group', options['PRE2_'].get('require_group')) +class TestUnlinkOlder(unittest.TestCase): + + def setUp(self): + self.tempdir = mkdtemp() + self.mtime = {} + + def tearDown(self): + rmtree(self.tempdir, ignore_errors=True) + + def touch(self, fpath, mtime=None): + self.mtime[fpath] = mtime or time.time() + open(fpath, 'w') + + @contextlib.contextmanager + def high_resolution_getmtime(self): + orig_getmtime = os.path.getmtime + + def mock_getmtime(fpath): + mtime = self.mtime.get(fpath) + if mtime is None: + mtime = orig_getmtime(fpath) + return mtime + + with mock.patch('os.path.getmtime', mock_getmtime): + yield + + def test_unlink_older_than_path_not_exists(self): + path = os.path.join(self.tempdir, 'does-not-exist') + # just make sure it doesn't blow up + utils.unlink_older_than(path, time.time()) + + def test_unlink_older_than_file(self): + path = os.path.join(self.tempdir, 'some-file') + self.touch(path) + with self.assertRaises(OSError) as ctx: + utils.unlink_older_than(path, time.time()) + self.assertEqual(ctx.exception.errno, errno.ENOTDIR) + + def test_unlink_older_than_now(self): + self.touch(os.path.join(self.tempdir, 'test')) + with self.high_resolution_getmtime(): + utils.unlink_older_than(self.tempdir, time.time()) + self.assertEqual([], os.listdir(self.tempdir)) + + def test_unlink_not_old_enough(self): + start = time.time() + self.touch(os.path.join(self.tempdir, 'test')) + with self.high_resolution_getmtime(): + utils.unlink_older_than(self.tempdir, start) + self.assertEqual(['test'], os.listdir(self.tempdir)) + + def test_unlink_mixed(self): + self.touch(os.path.join(self.tempdir, 'first')) + cutoff = time.time() + self.touch(os.path.join(self.tempdir, 'second')) + with self.high_resolution_getmtime(): + utils.unlink_older_than(self.tempdir, cutoff) + self.assertEqual(['second'], os.listdir(self.tempdir)) + + def test_unlink_paths(self): + paths = [] + for item in ('first', 'second', 'third'): + path = os.path.join(self.tempdir, item) + self.touch(path) + 
paths.append(path) + # don't unlink everyone + with self.high_resolution_getmtime(): + utils.unlink_paths_older_than(paths[:2], time.time()) + self.assertEqual(['third'], os.listdir(self.tempdir)) + + def test_unlink_empty_paths(self): + # just make sure it doesn't blow up + utils.unlink_paths_older_than([], time.time()) + + def test_unlink_not_exists_paths(self): + path = os.path.join(self.tempdir, 'does-not-exist') + # just make sure it doesn't blow up + utils.unlink_paths_older_than([path], time.time()) + + class TestSwiftInfo(unittest.TestCase): def tearDown(self): diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index fed9c4b211..db37e49ab1 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -22,15 +22,18 @@ import string from shutil import rmtree from hashlib import md5 from tempfile import mkdtemp -from test.unit import FakeLogger, patch_policies, make_timestamp_iter, \ - DEFAULT_TEST_EC_TYPE -from swift.obj import auditor -from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \ - get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation, \ - clear_auditor_status, get_auditor_status -from swift.common.utils import mkdirs, normalize_timestamp, Timestamp -from swift.common.storage_policy import ECStoragePolicy, StoragePolicy, \ - POLICIES +import textwrap +from test.unit import (FakeLogger, patch_policies, make_timestamp_iter, + DEFAULT_TEST_EC_TYPE) +from swift.obj import auditor, replicator +from swift.obj.diskfile import ( + DiskFile, write_metadata, invalidate_hash, get_data_dir, + DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status, + get_auditor_status) +from swift.common.utils import ( + mkdirs, normalize_timestamp, Timestamp, readconf) +from swift.common.storage_policy import ( + ECStoragePolicy, StoragePolicy, POLICIES) _mocked_policies = [ @@ -275,6 +278,161 @@ class TestAuditor(unittest.TestCase): policy=POLICIES.legacy)) self.assertEqual(auditor_worker.errors, 1) + def test_audit_location_gets_quarantined(self): + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + + location = AuditLocation(self.disk_file._datadir, 'sda', '0', + policy=self.disk_file.policy) + + # instead of a datadir, we'll make a file! + mkdirs(os.path.dirname(self.disk_file._datadir)) + open(self.disk_file._datadir, 'w') + + # after we turn the crank ... + auditor_worker.object_audit(location) + + # ... 
it should get quarantined + self.assertFalse(os.path.exists(self.disk_file._datadir)) + self.assertEqual(1, auditor_worker.quarantines) + + def test_rsync_tempfile_timeout_auto_option(self): + # if we don't have access to the replicator config section we'll use + # our default + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400) + # if the rsync_tempfile_timeout option is set explicitly we use that + self.conf['rsync_tempfile_timeout'] = '1800' + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 1800) + # if we have a real config we can be a little smarter + config_path = os.path.join(self.testdir, 'objserver.conf') + stub_config = """ + [object-auditor] + rsync_tempfile_timeout = auto + """ + with open(config_path, 'w') as f: + f.write(textwrap.dedent(stub_config)) + # the Daemon loader will hand the object-auditor config to the + # auditor who will build the workers from it + conf = readconf(config_path, 'object-auditor') + auditor_worker = auditor.AuditorWorker(conf, self.logger, + self.rcache, self.devices) + # if there is no object-replicator section we still have to fall back + # to default because we can't parse the config for that section! + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400) + stub_config = """ + [object-replicator] + [object-auditor] + rsync_tempfile_timeout = auto + """ + with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f: + f.write(textwrap.dedent(stub_config)) + conf = readconf(config_path, 'object-auditor') + auditor_worker = auditor.AuditorWorker(conf, self.logger, + self.rcache, self.devices) + # if the object-replicator section will parse but does not override + # the default rsync_timeout we assume the default rsync_timeout value + # and add 15mins + self.assertEqual(auditor_worker.rsync_tempfile_timeout, + replicator.DEFAULT_RSYNC_TIMEOUT + 900) + stub_config = """ + [DEFAULT] + reclaim_age = 1209600 + [object-replicator] + rsync_timeout = 3600 + [object-auditor] + rsync_tempfile_timeout = auto + """ + with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f: + f.write(textwrap.dedent(stub_config)) + conf = readconf(config_path, 'object-auditor') + auditor_worker = auditor.AuditorWorker(conf, self.logger, + self.rcache, self.devices) + # if there is an object-replicator section with a rsync_timeout + # configured we'll use that value (3600) + 900 + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600 + 900) + + def test_inprogress_rsync_tempfiles_get_cleaned_up(self): + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + + location = AuditLocation(self.disk_file._datadir, 'sda', '0', + policy=self.disk_file.policy) + + data = 'VERIFY' + etag = md5() + timestamp = str(normalize_timestamp(time.time())) + with self.disk_file.create() as writer: + writer.write(data) + etag.update(data) + metadata = { + 'ETag': etag.hexdigest(), + 'X-Timestamp': timestamp, + 'Content-Length': str(os.fstat(writer._fd).st_size), + } + writer.put(metadata) + writer.commit(Timestamp(timestamp)) + + datafilename = None + datadir_files = os.listdir(self.disk_file._datadir) + for filename in datadir_files: + if filename.endswith('.data'): + datafilename = filename + break + else: + self.fail('Did not find .data file in %r: %r' % + (self.disk_file._datadir, datadir_files)) + rsynctempfile_path = 
os.path.join(self.disk_file._datadir, + '.%s.9ILVBL' % datafilename) + open(rsynctempfile_path, 'w') + # sanity check we have an extra file + rsync_files = os.listdir(self.disk_file._datadir) + self.assertEqual(len(datadir_files) + 1, len(rsync_files)) + + # and after we turn the crank ... + auditor_worker.object_audit(location) + + # ... we've still got the rsync file + self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir)) + + # and we'll keep it - depending on the rsync_tempfile_timeout + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400) + self.conf['rsync_tempfile_timeout'] = '3600' + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600) + now = time.time() + 1900 + with mock.patch('swift.obj.auditor.time.time', + return_value=now): + auditor_worker.object_audit(location) + self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir)) + + # but *tomorrow* when we run + tomorrow = time.time() + 86400 + with mock.patch('swift.obj.auditor.time.time', + return_value=tomorrow): + auditor_worker.object_audit(location) + + # ... we'll totally clean that stuff up! + self.assertEqual(datadir_files, os.listdir(self.disk_file._datadir)) + + # but if we have some random crazy file in there + random_crazy_file_path = os.path.join(self.disk_file._datadir, + '.random.crazy.file') + open(random_crazy_file_path, 'w') + + tomorrow = time.time() + 86400 + with mock.patch('swift.obj.auditor.time.time', + return_value=tomorrow): + auditor_worker.object_audit(location) + + # that's someone elses problem + self.assertIn(os.path.basename(random_crazy_file_path), + os.listdir(self.disk_file._datadir)) + def test_generic_exception_handling(self): auditor_worker = auditor.AuditorWorker(self.conf, self.logger, self.rcache, self.devices)
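As an illustration of how the "auto" value documented above gets resolved, here is a minimal sketch that mirrors the logic this change adds to AuditorWorker.__init__. The standalone helper name resolve_rsync_tempfile_timeout is invented for the example; only readconf, config_auto_int_value, and replicator.DEFAULT_RSYNC_TIMEOUT come from the patched tree.

```python
# Sketch only: mirrors the AuditorWorker.__init__ logic added in this change.
# The helper name resolve_rsync_tempfile_timeout is hypothetical.
from swift.common.utils import config_auto_int_value, readconf
from swift.obj import replicator


def resolve_rsync_tempfile_timeout(conf):
    """Return the effective rsync_tempfile_timeout in seconds."""
    try:
        # prefer whatever the operator configured for the replicator
        replicator_conf = readconf(conf['__file__'], 'object-replicator')
    except (KeyError, SystemExit):
        # no usable config file (KeyError on __file__) or no
        # [object-replicator] section (SystemExit): conservative default
        default = 86400
    else:
        rsync_timeout = int(replicator_conf.get(
            'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT))
        # give the remote replicator 15 extra minutes to kill its own rsync
        default = rsync_timeout + 900
    # "auto" (or no value at all) resolves to the computed default;
    # an explicit number always wins
    return config_auto_int_value(conf.get('rsync_tempfile_timeout'), default)
```

With rsync_timeout = 3600 in the [object-replicator] section and rsync_tempfile_timeout = auto this resolves to 3600 + 900 = 4500 seconds; without a readable object-replicator section it falls back to 86400 seconds (one day), matching the expectations in test_rsync_tempfile_timeout_auto_option above.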
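The RE_RSYNC_TEMPFILE pattern matches rsync's partial-transfer naming scheme: a leading dot, the original file name, another dot, and six random suffix characters. A quick standalone check with a few made-up names (only the first two would be treated as rsync tempfiles):

```python
import re

# same pattern as RE_RSYNC_TEMPFILE in swift/obj/auditor.py: a leading dot,
# the original filename, a dot, then rsync's six random suffix characters
RE_RSYNC_TEMPFILE = re.compile(r'^\..*\.([a-zA-Z0-9_]){6}$')

# example names are invented for illustration
for name in ('..data.Xy095a',                  # rsync tempfile -> match
             '.1471012345.67890.data.6MvQzX',  # rsync tempfile -> match
             '1471012345.67890.data',          # real datafile  -> no match
             'hashes.pkl'):                    # other file     -> no match
    print('%-32s %s' % (name, bool(RE_RSYNC_TEMPFILE.match(name))))
```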
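Finally, a small usage sketch for the new unlink_paths_older_than helper, assuming a swift tree with this patch applied; the temporary directory, file names, and ages are invented for the demo. Paths older than the cutoff are unlinked, newer ones are kept, and paths that no longer exist are ignored (as the tests above exercise):

```python
import os
import time
import tempfile

# requires a swift tree with this patch applied
from swift.common.utils import unlink_paths_older_than

tmpdir = tempfile.mkdtemp()
old_path = os.path.join(tmpdir, '..data.Xy095a')
new_path = os.path.join(tmpdir, '..data.fresh1')
for path in (old_path, new_path):
    open(path, 'w').close()

# pretend the first tempfile was left behind two hours ago
two_hours_ago = time.time() - 7200
os.utime(old_path, (two_hours_ago, two_hours_ago))

# anything last modified before the cutoff is unlinked;
# paths that do not exist are simply skipped
cutoff = time.time() - 3600
unlink_paths_older_than([old_path, new_path, '/does/not/exist'], cutoff)

print(os.listdir(tmpdir))  # ['..data.fresh1']
```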