From 1d03803a85ca50272071725518c7110e1b2dacb1 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Tue, 15 Mar 2016 17:09:21 -0700 Subject: [PATCH] Auditor will clean up stale rsync tempfiles DiskFile already fills in the _ondisk_info attribute when it tries to open a diskfile - even if the DiskFile's fileset is not valid or deleted. During this process the rsync tempfiles would be discovered and logged, but no-one would attempt to clean them up - even if they were really old. Instead of logging and ignoring unexpected files when validating a DiskFile fileset, we'll add unexpected files to the unexpected key in the _ondisk_info attribute. With a little bit of re-organization in the auditor's object_audit method to get things into a single return path, we can add an unconditional check for unexpected files and remove those that are "old enough". Since the replicator will kill any rsync processes that are running longer than the configured rsync_timeout, we know that any rsync tempfiles older than this can be deleted. Split unlink_older_than in common.utils into two functions to allow an explicit list of previously discovered paths to be passed in to avoid an extra listdir. Since the getmtime handling already ignores OSError, there's less concern of a race condition where a previously discovered unexpected file is reaped by rsync while we're attempting to clean it up. Update the docs on the new config option. Closes-Bug: #1554005 Change-Id: Id67681cb77f605e3491b8afcb9c69d769e154283 --- doc/manpages/object-server.conf.5 | 3 + doc/source/deployment_guide.rst | 5 + etc/object-server.conf-sample | 7 ++ swift/common/utils.py | 17 ++- swift/obj/auditor.py | 74 ++++++++++--- swift/obj/diskfile.py | 14 ++- swift/obj/replicator.py | 4 +- test/unit/common/test_utils.py | 81 ++++++++++++++ test/unit/obj/test_auditor.py | 176 ++++++++++++++++++++++++++++-- 9 files changed, 344 insertions(+), 37 deletions(-) diff --git a/doc/manpages/object-server.conf.5 b/doc/manpages/object-server.conf.5 index ac1a8889e3..2e58de32fb 100644 --- a/doc/manpages/object-server.conf.5 +++ b/doc/manpages/object-server.conf.5 @@ -499,6 +499,9 @@ and ensure that swift has read/write. The default is /var/cache/swift. Takes a comma separated list of ints. If set, the object auditor will increment a counter for every object whose size is <= to the given break points and report the result after a full scan. +.IP \fBrsync_tempfile_timeout\fR Time elapsed in seconds before rsync tempfiles will be unlinked. Config value of "auto" will try to use object-replicator's rsync_timeout + 900 or fall-back to 86400 (1 day). .RE diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 9743aef8b2..ec86c90c93 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -738,6 +738,11 @@ concurrency 1 The number of parallel processes zero_byte_files_per_second 50 object_size_stats recon_cache_path /var/cache/swift Path to recon cache +rsync_tempfile_timeout auto Time elapsed in seconds before rsync + tempfiles will be unlinked. Config value + of "auto" try to use object-replicator's + rsync_timeout + 900 or fallback to 86400 + (1 day). 
=========================== =================== ========================================== ------------------------------ diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 80731584ee..e01193bff2 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -306,6 +306,13 @@ use = egg:swift#recon # points and report the result after a full scan. # object_size_stats = +# The auditor will cleanup old rsync tempfiles after they are "old +# enough" to delete. You can configure the time elapsed in seconds +# before rsync tempfiles will be unlinked, or the default value of +# "auto" try to use object-replicator's rsync_timeout + 900 and fallback +# to 86400 (1 day). +# rsync_tempfile_timeout = auto + # Note: Put it at the beginning of the pipleline to profile all middleware. But # it is safer to put this after healthcheck. [filter:xprofile] diff --git a/swift/common/utils.py b/swift/common/utils.py index e975bf1ad2..210dd9f4e0 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -2122,10 +2122,21 @@ def unlink_older_than(path, mtime): Remove any file in a given path that that was last modified before mtime. :param path: path to remove file from - :mtime: timestamp of oldest file to keep + :param mtime: timestamp of oldest file to keep """ - for fname in listdir(path): - fpath = os.path.join(path, fname) + filepaths = map(functools.partial(os.path.join, path), listdir(path)) + return unlink_paths_older_than(filepaths, mtime) + + +def unlink_paths_older_than(filepaths, mtime): + """ + Remove any files from the given list that that were + last modified before mtime. + + :param filepaths: a list of strings, the full paths of files to check + :param mtime: timestamp of oldest file to keep + """ + for fpath in filepaths: try: if os.path.getmtime(fpath) < mtime: os.unlink(fpath) diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index db6e4f4988..3b5f2de785 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -18,18 +18,23 @@ import os import sys import time import signal +import re from random import shuffle from swift import gettext_ as _ from contextlib import closing from eventlet import Timeout -from swift.obj import diskfile -from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \ - list_from_csv, listdir +from swift.obj import diskfile, replicator +from swift.common.utils import ( + get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir, + unlink_paths_older_than, readconf, config_auto_int_value) from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist from swift.common.daemon import Daemon from swift.common.storage_policy import POLICIES +# This matches rsync tempfiles, like "..data.Xy095a" +RE_RSYNC_TEMPFILE = re.compile(r'^\..*\.([a-zA-Z0-9_]){6}$') + class AuditorWorker(object): """Walk through file system to audit objects""" @@ -42,6 +47,27 @@ class AuditorWorker(object): self.max_files_per_second = float(conf.get('files_per_second', 20)) self.max_bytes_per_second = float(conf.get('bytes_per_second', 10000000)) + try: + # ideally unless ops overrides the rsync_tempfile_timeout in the + # auditor section we can base our behavior on whatever they + # configure for their replicator + replicator_config = readconf(self.conf['__file__'], + 'object-replicator') + except (KeyError, SystemExit): + # if we can't parse the real config (generally a KeyError on + # __file__, or SystemExit on no object-replicator section) we use + # a very conservative default + default 
= 86400 + else: + replicator_rsync_timeout = int(replicator_config.get( + 'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT)) + # Here we can do some light math for ops and use the *replicator's* + # rsync_timeout (plus 15 mins to avoid deleting local tempfiles + # before the remote replicator kills it's rsync) + default = replicator_rsync_timeout + 900 + self.rsync_tempfile_timeout = config_auto_int_value( + self.conf.get('rsync_tempfile_timeout'), default) + self.auditor_type = 'ALL' self.zero_byte_only_at_fps = zero_byte_only_at_fps if self.zero_byte_only_at_fps: @@ -200,34 +226,46 @@ class AuditorWorker(object): raise DiskFileQuarantined(msg) diskfile_mgr = self.diskfile_router[location.policy] + # this method doesn't normally raise errors, even if the audit + # location does not exist; if this raises an unexpected error it + # will get logged in failsafe + df = diskfile_mgr.get_diskfile_from_audit_location(location) + reader = None try: - df = diskfile_mgr.get_diskfile_from_audit_location(location) with df.open(): metadata = df.get_metadata() obj_size = int(metadata['Content-Length']) if self.stats_sizes: self.record_stats(obj_size) - if self.zero_byte_only_at_fps and obj_size: - self.passes += 1 - return - reader = df.reader(_quarantine_hook=raise_dfq) - with closing(reader): - for chunk in reader: - chunk_len = len(chunk) - self.bytes_running_time = ratelimit_sleep( - self.bytes_running_time, - self.max_bytes_per_second, - incr_by=chunk_len) - self.bytes_processed += chunk_len - self.total_bytes_processed += chunk_len + if obj_size and not self.zero_byte_only_at_fps: + reader = df.reader(_quarantine_hook=raise_dfq) + if reader: + with closing(reader): + for chunk in reader: + chunk_len = len(chunk) + self.bytes_running_time = ratelimit_sleep( + self.bytes_running_time, + self.max_bytes_per_second, + incr_by=chunk_len) + self.bytes_processed += chunk_len + self.total_bytes_processed += chunk_len except DiskFileNotExist: - return + pass except DiskFileQuarantined as err: self.quarantines += 1 self.logger.error(_('ERROR Object %(obj)s failed audit and was' ' quarantined: %(err)s'), {'obj': location, 'err': err}) self.passes += 1 + # _ondisk_info attr is initialized to None and filled in by open + ondisk_info_dict = df._ondisk_info or {} + if 'unexpected' in ondisk_info_dict: + is_rsync_tempfile = lambda fpath: RE_RSYNC_TEMPFILE.match( + os.path.basename(fpath)) + rsync_tempfile_paths = filter(is_rsync_tempfile, + ondisk_info_dict['unexpected']) + mtime = time.time() - self.rsync_tempfile_timeout + unlink_paths_older_than(rsync_tempfile_paths, mtime) class ObjectAuditor(Daemon): diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 881371b07c..7ea2f25109 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -741,7 +741,10 @@ class BaseDiskFileManager(object): # dicts for the files having that extension. The file_info dicts are of # the form returned by parse_on_disk_filename, with the filename added. # Each list is sorted in reverse timestamp order. - # + + # the results dict is used to collect results of file filtering + results = {} + # The exts dict will be modified during subsequent processing as files # are removed to be discarded or ignored. 
exts = defaultdict(list) @@ -752,16 +755,15 @@ class BaseDiskFileManager(object): file_info['filename'] = afile exts[file_info['ext']].append(file_info) except DiskFileError as e: - self.logger.warning('Unexpected file %s: %s' % - (os.path.join(datadir or '', afile), e)) + file_path = os.path.join(datadir or '', afile) + self.logger.warning('Unexpected file %s: %s', + file_path, e) + results.setdefault('unexpected', []).append(file_path) for ext in exts: # For each extension sort files into reverse chronological order. exts[ext] = sorted( exts[ext], key=lambda info: info['timestamp'], reverse=True) - # the results dict is used to collect results of file filtering - results = {} - if exts.get('.ts'): # non-tombstones older than or equal to latest tombstone are # obsolete diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index aa38407d35..ff89b3213a 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -41,6 +41,7 @@ from swift.obj import ssync_sender from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir from swift.common.storage_policy import POLICIES, REPL_POLICY +DEFAULT_RSYNC_TIMEOUT = 900 hubs.use_hub(get_hub()) @@ -76,7 +77,8 @@ class ObjectReplicator(Daemon): self.partition_times = [] self.interval = int(conf.get('interval') or conf.get('run_pause') or 30) - self.rsync_timeout = int(conf.get('rsync_timeout', 900)) + self.rsync_timeout = int(conf.get('rsync_timeout', + DEFAULT_RSYNC_TIMEOUT)) self.rsync_io_timeout = conf.get('rsync_io_timeout', '30') self.rsync_bwlimit = conf.get('rsync_bwlimit', '0') self.rsync_compress = config_true_value( diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 14e3aa8696..409990ad4a 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -18,6 +18,7 @@ from __future__ import print_function from test.unit import temptree import ctypes +import contextlib import errno import eventlet import eventlet.event @@ -3422,6 +3423,86 @@ class ResellerConfReader(unittest.TestCase): self.assertEqual('pre2_group', options['PRE2_'].get('require_group')) +class TestUnlinkOlder(unittest.TestCase): + + def setUp(self): + self.tempdir = mkdtemp() + self.mtime = {} + + def tearDown(self): + rmtree(self.tempdir, ignore_errors=True) + + def touch(self, fpath, mtime=None): + self.mtime[fpath] = mtime or time.time() + open(fpath, 'w') + + @contextlib.contextmanager + def high_resolution_getmtime(self): + orig_getmtime = os.path.getmtime + + def mock_getmtime(fpath): + mtime = self.mtime.get(fpath) + if mtime is None: + mtime = orig_getmtime(fpath) + return mtime + + with mock.patch('os.path.getmtime', mock_getmtime): + yield + + def test_unlink_older_than_path_not_exists(self): + path = os.path.join(self.tempdir, 'does-not-exist') + # just make sure it doesn't blow up + utils.unlink_older_than(path, time.time()) + + def test_unlink_older_than_file(self): + path = os.path.join(self.tempdir, 'some-file') + self.touch(path) + with self.assertRaises(OSError) as ctx: + utils.unlink_older_than(path, time.time()) + self.assertEqual(ctx.exception.errno, errno.ENOTDIR) + + def test_unlink_older_than_now(self): + self.touch(os.path.join(self.tempdir, 'test')) + with self.high_resolution_getmtime(): + utils.unlink_older_than(self.tempdir, time.time()) + self.assertEqual([], os.listdir(self.tempdir)) + + def test_unlink_not_old_enough(self): + start = time.time() + self.touch(os.path.join(self.tempdir, 'test')) + with self.high_resolution_getmtime(): + 
utils.unlink_older_than(self.tempdir, start) + self.assertEqual(['test'], os.listdir(self.tempdir)) + + def test_unlink_mixed(self): + self.touch(os.path.join(self.tempdir, 'first')) + cutoff = time.time() + self.touch(os.path.join(self.tempdir, 'second')) + with self.high_resolution_getmtime(): + utils.unlink_older_than(self.tempdir, cutoff) + self.assertEqual(['second'], os.listdir(self.tempdir)) + + def test_unlink_paths(self): + paths = [] + for item in ('first', 'second', 'third'): + path = os.path.join(self.tempdir, item) + self.touch(path) + paths.append(path) + # don't unlink everyone + with self.high_resolution_getmtime(): + utils.unlink_paths_older_than(paths[:2], time.time()) + self.assertEqual(['third'], os.listdir(self.tempdir)) + + def test_unlink_empty_paths(self): + # just make sure it doesn't blow up + utils.unlink_paths_older_than([], time.time()) + + def test_unlink_not_exists_paths(self): + path = os.path.join(self.tempdir, 'does-not-exist') + # just make sure it doesn't blow up + utils.unlink_paths_older_than([path], time.time()) + + class TestSwiftInfo(unittest.TestCase): def tearDown(self): diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index fed9c4b211..db37e49ab1 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -22,15 +22,18 @@ import string from shutil import rmtree from hashlib import md5 from tempfile import mkdtemp -from test.unit import FakeLogger, patch_policies, make_timestamp_iter, \ - DEFAULT_TEST_EC_TYPE -from swift.obj import auditor -from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \ - get_data_dir, DiskFileManager, ECDiskFileManager, AuditLocation, \ - clear_auditor_status, get_auditor_status -from swift.common.utils import mkdirs, normalize_timestamp, Timestamp -from swift.common.storage_policy import ECStoragePolicy, StoragePolicy, \ - POLICIES +import textwrap +from test.unit import (FakeLogger, patch_policies, make_timestamp_iter, + DEFAULT_TEST_EC_TYPE) +from swift.obj import auditor, replicator +from swift.obj.diskfile import ( + DiskFile, write_metadata, invalidate_hash, get_data_dir, + DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status, + get_auditor_status) +from swift.common.utils import ( + mkdirs, normalize_timestamp, Timestamp, readconf) +from swift.common.storage_policy import ( + ECStoragePolicy, StoragePolicy, POLICIES) _mocked_policies = [ @@ -275,6 +278,161 @@ class TestAuditor(unittest.TestCase): policy=POLICIES.legacy)) self.assertEqual(auditor_worker.errors, 1) + def test_audit_location_gets_quarantined(self): + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + + location = AuditLocation(self.disk_file._datadir, 'sda', '0', + policy=self.disk_file.policy) + + # instead of a datadir, we'll make a file! + mkdirs(os.path.dirname(self.disk_file._datadir)) + open(self.disk_file._datadir, 'w') + + # after we turn the crank ... + auditor_worker.object_audit(location) + + # ... 
it should get quarantined + self.assertFalse(os.path.exists(self.disk_file._datadir)) + self.assertEqual(1, auditor_worker.quarantines) + + def test_rsync_tempfile_timeout_auto_option(self): + # if we don't have access to the replicator config section we'll use + # our default + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400) + # if the rsync_tempfile_timeout option is set explicitly we use that + self.conf['rsync_tempfile_timeout'] = '1800' + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 1800) + # if we have a real config we can be a little smarter + config_path = os.path.join(self.testdir, 'objserver.conf') + stub_config = """ + [object-auditor] + rsync_tempfile_timeout = auto + """ + with open(config_path, 'w') as f: + f.write(textwrap.dedent(stub_config)) + # the Daemon loader will hand the object-auditor config to the + # auditor who will build the workers from it + conf = readconf(config_path, 'object-auditor') + auditor_worker = auditor.AuditorWorker(conf, self.logger, + self.rcache, self.devices) + # if there is no object-replicator section we still have to fall back + # to default because we can't parse the config for that section! + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400) + stub_config = """ + [object-replicator] + [object-auditor] + rsync_tempfile_timeout = auto + """ + with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f: + f.write(textwrap.dedent(stub_config)) + conf = readconf(config_path, 'object-auditor') + auditor_worker = auditor.AuditorWorker(conf, self.logger, + self.rcache, self.devices) + # if the object-replicator section will parse but does not override + # the default rsync_timeout we assume the default rsync_timeout value + # and add 15mins + self.assertEqual(auditor_worker.rsync_tempfile_timeout, + replicator.DEFAULT_RSYNC_TIMEOUT + 900) + stub_config = """ + [DEFAULT] + reclaim_age = 1209600 + [object-replicator] + rsync_timeout = 3600 + [object-auditor] + rsync_tempfile_timeout = auto + """ + with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f: + f.write(textwrap.dedent(stub_config)) + conf = readconf(config_path, 'object-auditor') + auditor_worker = auditor.AuditorWorker(conf, self.logger, + self.rcache, self.devices) + # if there is an object-replicator section with a rsync_timeout + # configured we'll use that value (3600) + 900 + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600 + 900) + + def test_inprogress_rsync_tempfiles_get_cleaned_up(self): + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + + location = AuditLocation(self.disk_file._datadir, 'sda', '0', + policy=self.disk_file.policy) + + data = 'VERIFY' + etag = md5() + timestamp = str(normalize_timestamp(time.time())) + with self.disk_file.create() as writer: + writer.write(data) + etag.update(data) + metadata = { + 'ETag': etag.hexdigest(), + 'X-Timestamp': timestamp, + 'Content-Length': str(os.fstat(writer._fd).st_size), + } + writer.put(metadata) + writer.commit(Timestamp(timestamp)) + + datafilename = None + datadir_files = os.listdir(self.disk_file._datadir) + for filename in datadir_files: + if filename.endswith('.data'): + datafilename = filename + break + else: + self.fail('Did not find .data file in %r: %r' % + (self.disk_file._datadir, datadir_files)) + rsynctempfile_path = 
os.path.join(self.disk_file._datadir, + '.%s.9ILVBL' % datafilename) + open(rsynctempfile_path, 'w') + # sanity check we have an extra file + rsync_files = os.listdir(self.disk_file._datadir) + self.assertEqual(len(datadir_files) + 1, len(rsync_files)) + + # and after we turn the crank ... + auditor_worker.object_audit(location) + + # ... we've still got the rsync file + self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir)) + + # and we'll keep it - depending on the rsync_tempfile_timeout + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 86400) + self.conf['rsync_tempfile_timeout'] = '3600' + auditor_worker = auditor.AuditorWorker(self.conf, self.logger, + self.rcache, self.devices) + self.assertEqual(auditor_worker.rsync_tempfile_timeout, 3600) + now = time.time() + 1900 + with mock.patch('swift.obj.auditor.time.time', + return_value=now): + auditor_worker.object_audit(location) + self.assertEqual(rsync_files, os.listdir(self.disk_file._datadir)) + + # but *tomorrow* when we run + tomorrow = time.time() + 86400 + with mock.patch('swift.obj.auditor.time.time', + return_value=tomorrow): + auditor_worker.object_audit(location) + + # ... we'll totally clean that stuff up! + self.assertEqual(datadir_files, os.listdir(self.disk_file._datadir)) + + # but if we have some random crazy file in there + random_crazy_file_path = os.path.join(self.disk_file._datadir, + '.random.crazy.file') + open(random_crazy_file_path, 'w') + + tomorrow = time.time() + 86400 + with mock.patch('swift.obj.auditor.time.time', + return_value=tomorrow): + auditor_worker.object_audit(location) + + # that's someone elses problem + self.assertIn(os.path.basename(random_crazy_file_path), + os.listdir(self.disk_file._datadir)) + def test_generic_exception_handling(self): auditor_worker = auditor.AuditorWorker(self.conf, self.logger, self.rcache, self.devices)
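
A reviewer who wants to poke at the new cleanup behavior without a full Swift checkout can reproduce it with a few lines of stdlib Python. The sketch below is illustrative only: the regex and the age-based unlink mirror the RE_RSYNC_TEMPFILE and unlink_paths_older_than hunks above, while the datadir path, the sample filenames and the 3600-second timeout are made-up stand-ins rather than anything read from a real object-server config.

import os
import re
import time

# Same pattern the patch adds as RE_RSYNC_TEMPFILE: a leading dot, any
# characters, then a final dot and the six-character suffix rsync appends
# to its tempfiles (e.g. ".1457983456.09852.data.9ILVBL").
RE_RSYNC_TEMPFILE = re.compile(r'^\..*\.([a-zA-Z0-9_]){6}$')


def unlink_paths_older_than(filepaths, mtime):
    # sketch of the new swift.common.utils helper: unlink each path whose
    # mtime is older than the cutoff, ignoring files that already vanished
    for fpath in filepaths:
        try:
            if os.path.getmtime(fpath) < mtime:
                os.unlink(fpath)
        except OSError:
            pass


def reap_stale_rsync_tempfiles(unexpected_paths, rsync_tempfile_timeout):
    # mirrors the auditor's new post-audit step: keep only the paths whose
    # basename looks like an rsync tempfile, then unlink the old ones
    rsync_tempfile_paths = [
        fpath for fpath in unexpected_paths
        if RE_RSYNC_TEMPFILE.match(os.path.basename(fpath))
    ]
    cutoff = time.time() - rsync_tempfile_timeout
    unlink_paths_older_than(rsync_tempfile_paths, cutoff)


if __name__ == '__main__':
    # hypothetical paths for illustration only - not a real Swift datadir
    datadir = '/tmp/demo-datadir'
    unexpected = [
        os.path.join(datadir, '.1457983456.09852.data.9ILVBL'),  # rsync tempfile
        os.path.join(datadir, '.random.crazy.file'),              # left alone
    ]
    # with the "auto" default this would be the replicator's rsync_timeout
    # plus 900, or 86400 when no object-replicator section can be read
    reap_stale_rsync_tempfiles(unexpected, rsync_tempfile_timeout=3600)

The 900-second pad on top of the replicator's rsync_timeout in the "auto" case matches the comment in the auditor hunk: it keeps the auditor from unlinking a local tempfile before a remote replicator's still-running rsync has been killed by its own timeout.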