diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index b7d677a5b6..3b30b014ec 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -30,10 +30,6 @@ class SwiftException(Exception): pass -class AuditException(SwiftException): - pass - - class DiskFileError(SwiftException): pass diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index 2fce5703f6..265b3b188a 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -17,21 +17,19 @@ import os import time from swift import gettext_ as _ from contextlib import closing - from eventlet import Timeout from swift.obj import diskfile -from swift.common.utils import get_logger, audit_location_generator, \ - ratelimit_sleep, dump_recon_cache, list_from_csv, json -from swift.common.exceptions import AuditException, DiskFileQuarantined, \ - DiskFileNotExist +from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \ + list_from_csv, json +from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist from swift.common.daemon import Daemon SLEEP_BETWEEN_AUDITS = 30 class AuditorWorker(object): - """Walk through file system to audit object""" + """Walk through file system to audit objects""" def __init__(self, conf, logger, zero_byte_only_at_fps=0): self.conf = conf @@ -72,13 +70,10 @@ class AuditorWorker(object): total_quarantines = 0 total_errors = 0 time_auditing = 0 - all_locs = audit_location_generator( - self.devices, diskfile.DATADIR, '.data', - mount_check=self.diskfile_mgr.mount_check, - logger=self.logger) - for path, device, partition in all_locs: + all_locs = self.diskfile_mgr.object_audit_location_generator() + for location in all_locs: loop_time = time.time() - self.failsafe_object_audit(path, device, partition) + self.failsafe_object_audit(location) self.logger.timing_since('timing', loop_time) self.files_running_time = ratelimit_sleep( self.files_running_time, self.max_files_per_second) @@ -151,75 +146,54 @@ class AuditorWorker(object): else: self.stats_buckets["OVER"] += 1 - def failsafe_object_audit(self, path, device, partition): + def failsafe_object_audit(self, location): """ Entrypoint to object_audit, with a failsafe generic exception handler. """ try: - self.object_audit(path, device, partition) + self.object_audit(location) except (Exception, Timeout): self.logger.increment('errors') self.errors += 1 - self.logger.exception(_('ERROR Trying to audit %s'), path) + self.logger.exception(_('ERROR Trying to audit %s'), location) - def object_audit(self, path, device, partition): + def object_audit(self, location): """ - Audits the given object path. + Audits the given object location. - :param path: a path to an object - :param device: the device the path is on - :param partition: the partition the path is on + :param location: an audit location + (from diskfile.object_audit_location_generator) """ + def raise_dfq(msg): + raise DiskFileQuarantined(msg) + try: - try: - name = diskfile.read_metadata(path)['name'] - except (Exception, Timeout) as exc: - raise AuditException('Error when reading metadata: %s' % exc) - _junk, account, container, obj = name.split('/', 3) - df = self.diskfile_mgr.get_diskfile( - device, partition, account, container, obj) - try: - with df.open(): - metadata = df.get_metadata() - obj_size = int(metadata['Content-Length']) - if self.stats_sizes: - self.record_stats(obj_size) - if self.zero_byte_only_at_fps and obj_size: - self.passes += 1 - return - reader = df.reader() - with closing(reader): - for chunk in reader: - chunk_len = len(chunk) - self.bytes_running_time = ratelimit_sleep( - self.bytes_running_time, - self.max_bytes_per_second, - incr_by=chunk_len) - self.bytes_processed += chunk_len - self.total_bytes_processed += chunk_len - if reader.was_quarantined: - self.quarantines += 1 - self.logger.error(_('ERROR Object %(obj)s failed audit and' - ' was quarantined: %(err)s'), - {'obj': path, - 'err': reader.was_quarantined}) + df = self.diskfile_mgr.get_diskfile_from_audit_location(location) + with df.open(): + metadata = df.get_metadata() + obj_size = int(metadata['Content-Length']) + if self.stats_sizes: + self.record_stats(obj_size) + if self.zero_byte_only_at_fps and obj_size: + self.passes += 1 return - except DiskFileNotExist: - return + reader = df.reader(_quarantine_hook=raise_dfq) + with closing(reader): + for chunk in reader: + chunk_len = len(chunk) + self.bytes_running_time = ratelimit_sleep( + self.bytes_running_time, + self.max_bytes_per_second, + incr_by=chunk_len) + self.bytes_processed += chunk_len + self.total_bytes_processed += chunk_len + except DiskFileNotExist: + return except DiskFileQuarantined as err: self.quarantines += 1 self.logger.error(_('ERROR Object %(obj)s failed audit and was' ' quarantined: %(err)s'), - {'obj': path, 'err': err}) - except AuditException as err: - self.logger.increment('quarantines') - self.quarantines += 1 - self.logger.error(_('ERROR Object %(obj)s failed audit and will' - ' be quarantined: %(err)s'), - {'obj': path, 'err': err}) - diskfile.quarantine_renamer( - os.path.join(self.devices, device), path) - return + {'obj': location, 'err': err}) self.passes += 1 diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index db75aef860..351e63ad34 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -40,6 +40,7 @@ import hashlib import logging import traceback from os.path import basename, dirname, exists, getmtime, join +from random import shuffle from tempfile import mkstemp from contextlib import contextmanager from collections import defaultdict @@ -52,7 +53,7 @@ from swift.common.constraints import check_mount from swift.common.utils import mkdirs, normalize_timestamp, \ storage_directory, hash_path, renamer, fallocate, fsync, \ fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \ - config_true_value + config_true_value, listdir from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir @@ -290,6 +291,67 @@ def get_hashes(partition_dir, recalculate=None, do_listdir=False, return hashed, hashes +class AuditLocation(object): + """ + Represents an object location to be audited. + + Other than being a bucket of data, the only useful thing this does is + stringify to a filesystem path so the auditor's logs look okay. + """ + + def __init__(self, path, device, partition): + self.path, self.device, self.partition = path, device, partition + + def __str__(self): + return str(self.path) + + +def object_audit_location_generator(devices, mount_check=True, logger=None): + """ + Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all + objects stored under that directory. The AuditLocation only knows the path + to the hash directory, not to the .data file therein (if any). This is to + avoid a double listdir(hash_dir); the DiskFile object will always do one, + so we don't. + + :param devices: parent directory of the devices to be audited + :param mount_check: flag to check if a mount check should be performed + on devices + :param logger: a logger object + """ + device_dirs = listdir(devices) + # randomize devices in case of process restart before sweep completed + shuffle(device_dirs) + for device in device_dirs: + if mount_check and not \ + os.path.ismount(os.path.join(devices, device)): + if logger: + logger.debug( + _('Skipping %s as it is not mounted'), device) + continue + datadir_path = os.path.join(devices, device, DATADIR) + partitions = listdir(datadir_path) + for partition in partitions: + part_path = os.path.join(datadir_path, partition) + try: + suffixes = listdir(part_path) + except OSError as e: + if e.errno != errno.ENOTDIR: + raise + continue + for asuffix in suffixes: + suff_path = os.path.join(part_path, asuffix) + try: + hashes = listdir(suff_path) + except OSError as e: + if e.errno != errno.ENOTDIR: + raise + continue + for hsh in hashes: + hsh_path = os.path.join(suff_path, hsh) + yield AuditLocation(hsh_path, device, partition) + + class DiskFileManager(object): """ Management class for devices, providing common place for shared parameters @@ -332,16 +394,19 @@ class DiskFileManager(object): """ return os.path.join(self.devices, device) - def get_dev_path(self, device): + def get_dev_path(self, device, mount_check=None): """ Return the path to a device, checking to see that it is a proper mount point based on a configuration parameter. :param device: name of target device + :param mount_check: whether or not to check mountedness of device. + Defaults to bool(self.mount_check). :returns: full path to the device, None if the path to the device is not a proper mount point. """ - if self.mount_check and not check_mount(self.devices, device): + should_check = self.mount_check if mount_check is None else mount_check + if should_check and not check_mount(self.devices, device): dev_path = None else: dev_path = os.path.join(self.devices, device) @@ -368,6 +433,16 @@ class DiskFileManager(object): return DiskFile(self, dev_path, self.threadpools[device], partition, account, container, obj, **kwargs) + def object_audit_location_generator(self): + return object_audit_location_generator(self.devices, self.mount_check, + self.logger) + + def get_diskfile_from_audit_location(self, audit_location): + dev_path = self.get_dev_path(audit_location.device, mount_check=False) + return DiskFile.from_hash_dir( + self, audit_location.path, dev_path, + audit_location.partition) + def get_hashes(self, device, partition, suffix): dev_path = self.get_dev_path(device) if not dev_path: @@ -516,12 +591,13 @@ class DiskFileReader(object): :param keep_cache_size: maximum object size that will be kept in cache :param device_path: on-disk device path, used when quarantining an obj :param logger: logger caller wants this object to use + :param quarantine_hook: 1-arg callable called w/reason when quarantined :param iter_hook: called when __iter__ returns a chunk :param keep_cache: should resulting reads be kept in the buffer cache """ def __init__(self, fp, data_file, obj_size, etag, threadpool, disk_chunk_size, keep_cache_size, device_path, logger, - iter_hook=None, keep_cache=False): + quarantine_hook, iter_hook=None, keep_cache=False): # Parameter tracking self._fp = fp self._data_file = data_file @@ -531,6 +607,7 @@ class DiskFileReader(object): self._disk_chunk_size = disk_chunk_size self._device_path = device_path self._logger = logger + self._quarantine_hook = quarantine_hook self._iter_hook = iter_hook if keep_cache: # Caller suggests we keep this in cache, only do it if the @@ -546,8 +623,6 @@ class DiskFileReader(object): self._read_to_eof = False self._suppress_file_closing = False self._quarantined_dir = None - # Currently referenced by the object Auditor only - self.was_quarantined = '' def __iter__(self): """Returns an iterator over the data file.""" @@ -630,7 +705,7 @@ class DiskFileReader(object): self._quarantined_dir = self._threadpool.run_in_thread( quarantine_renamer, self._device_path, self._data_file) self._logger.increment('quarantines') - self.was_quarantined = msg + self._quarantine_hook(msg) def _handle_close_quarantine(self): """Check if file needs to be quarantined""" @@ -655,6 +730,8 @@ class DiskFileReader(object): try: if self._started_at_0 and self._read_to_eof: self._handle_close_quarantine() + except DiskFileQuarantined: + raise except (Exception, Timeout) as e: self._logger.error(_( 'ERROR DiskFile %(data_file)s' @@ -689,23 +766,35 @@ class DiskFile(object): """ def __init__(self, mgr, device_path, threadpool, partition, - account, container, obj): + account=None, container=None, obj=None, _datadir=None): self._mgr = mgr self._device_path = device_path self._threadpool = threadpool or ThreadPool(nthreads=0) self._logger = mgr.logger self._disk_chunk_size = mgr.disk_chunk_size self._bytes_per_sync = mgr.bytes_per_sync - self._name = '/' + '/'.join((account, container, obj)) - name_hash = hash_path(account, container, obj) - self._datadir = join( - device_path, storage_directory(DATADIR, partition, name_hash)) + if account and container and obj: + self._name = '/' + '/'.join((account, container, obj)) + else: + # gets populated when we read the metadata + self._name = None self._tmpdir = join(device_path, 'tmp') self._metadata = None self._data_file = None self._fp = None self._quarantined_dir = None + if _datadir: + self._datadir = _datadir + else: + name_hash = hash_path(account, container, obj) + self._datadir = join( + device_path, storage_directory(DATADIR, partition, name_hash)) + + @classmethod + def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition): + return cls(mgr, device_path, None, partition, _datadir=hash_dir_path) + def open(self): """ Open the object. @@ -767,7 +856,7 @@ class DiskFile(object): def _quarantine(self, data_file, msg): """ - Quarantine a file; responsible for incrementing the associated loggers + Quarantine a file; responsible for incrementing the associated logger's count of quarantines. :param data_file: full path of data file to quarantine @@ -804,8 +893,18 @@ class DiskFile(object): try: files = sorted(os.listdir(self._datadir), reverse=True) except OSError as err: - if err.errno != errno.ENOENT: - raise DiskFileError() + if err.errno == errno.ENOTDIR: + # If there's a file here instead of a directory, quarantine + # it; something's gone wrong somewhere. + self._quarantine( + # hack: quarantine_renamer actually renames the directory + # enclosing the filename you give it, but here we just + # want this one file and not its parent. + os.path.join(self._datadir, "made-up-filename"), + "Expected directory, found file at %s" % self._datadir) + elif err.errno != errno.ENOENT: + raise DiskFileError( + "Error listing directory %s: %s" % (self._datadir, err)) # The data directory does not exist, so the object cannot exist. else: for afile in files: @@ -864,6 +963,14 @@ class DiskFile(object): exc.timestamp = metadata['X-Timestamp'] return exc + def _verify_name_matches_hash(self, data_file): + hash_from_fs = os.path.basename(self._datadir) + hash_from_name = hash_path(self._name.lstrip('/')) + if hash_from_fs != hash_from_name: + self._quarantine( + data_file, + "Hash of name in metadata does not match directory name") + def _verify_data_file(self, data_file, fp): """ Verify the metadata's name value matches what we think the object is @@ -962,6 +1069,12 @@ class DiskFile(object): self._metadata.update(sys_metadata) else: self._metadata = datafile_metadata + if self._name is None: + # If we don't know our name, we were just given a hash dir at + # instantiation, so we'd better validate that the name hashes back + # to us + self._name = self._metadata['name'] + self._verify_name_matches_hash(data_file) self._verify_data_file(data_file, fp) return fp @@ -990,7 +1103,8 @@ class DiskFile(object): with self.open(): return self.get_metadata() - def reader(self, iter_hook=None, keep_cache=False): + def reader(self, iter_hook=None, keep_cache=False, + _quarantine_hook=lambda m: None): """ Return a :class:`swift.common.swob.Response` class compatible "`app_iter`" object as defined by @@ -1002,12 +1116,17 @@ class DiskFile(object): :param iter_hook: called when __iter__ returns a chunk :param keep_cache: caller's preference for keeping data read in the OS buffer cache + :param _quarantine_hook: 1-arg callable called when obj quarantined; + the arg is the reason for quarantine. + Default is to ignore it. + Not needed by the REST layer. :returns: a :class:`swift.obj.diskfile.DiskFileReader` object """ dr = DiskFileReader( self._fp, self._data_file, int(self._metadata['Content-Length']), self._metadata['ETag'], self._threadpool, self._disk_chunk_size, self._mgr.keep_cache_size, self._device_path, self._logger, + quarantine_hook=_quarantine_hook, iter_hook=iter_hook, keep_cache=keep_cache) # At this point the reader object is now responsible for closing # the file pointer. diff --git a/swift/obj/server.py b/swift/obj/server.py index 0e902183a7..6756fef6be 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -489,8 +489,8 @@ class ObjectController(object): ('X-Auth-Token' not in request.headers and 'X-Storage-Token' not in request.headers)) response = Response( - app_iter=disk_file.reader(iter_hook=sleep, - keep_cache=keep_cache), + app_iter=disk_file.reader( + iter_hook=sleep, keep_cache=keep_cache), request=request, conditional_response=True) response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index afe2eac4ac..f97cd97dd1 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -327,7 +327,7 @@ def _set_info_cache(app, env, account, container, resp): :param app: the application object :param account: the unquoted account name - :param container: the unquoted containr name or None + :param container: the unquoted container name or None :param resp: the response received or None if info cache should be cleared """ diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index 115ffe7658..eb50effbec 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -24,7 +24,7 @@ from tempfile import mkdtemp from test.unit import FakeLogger from swift.obj import auditor from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \ - DATADIR, DiskFileManager + DATADIR, DiskFileManager, AuditLocation from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ storage_directory @@ -77,14 +77,12 @@ class TestAuditor(unittest.TestCase): pre_quarantines = auditor_worker.quarantines auditor_worker.object_audit( - os.path.join(self.disk_file._datadir, timestamp + '.data'), - 'sda', '0') + AuditLocation(self.disk_file._datadir, 'sda', '0')) self.assertEquals(auditor_worker.quarantines, pre_quarantines) os.write(writer._fd, 'extra_data') auditor_worker.object_audit( - os.path.join(self.disk_file._datadir, timestamp + '.data'), - 'sda', '0') + AuditLocation(self.disk_file._datadir, 'sda', '0')) self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1) def test_object_audit_diff_data(self): @@ -108,8 +106,7 @@ class TestAuditor(unittest.TestCase): self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o') auditor_worker.object_audit( - os.path.join(self.disk_file._datadir, timestamp + '.data'), - 'sda', '0') + AuditLocation(self.disk_file._datadir, 'sda', '0')) self.assertEquals(auditor_worker.quarantines, pre_quarantines) etag = md5() etag.update('1' + '0' * 1023) @@ -121,8 +118,7 @@ class TestAuditor(unittest.TestCase): writer.put(metadata) auditor_worker.object_audit( - os.path.join(self.disk_file._datadir, timestamp + '.data'), - 'sda', '0') + AuditLocation(self.disk_file._datadir, 'sda', '0')) self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1) def test_object_audit_no_meta(self): @@ -136,8 +132,7 @@ class TestAuditor(unittest.TestCase): auditor_worker = auditor.AuditorWorker(self.conf, self.logger) pre_quarantines = auditor_worker.quarantines auditor_worker.object_audit( - os.path.join(self.disk_file._datadir, timestamp + '.data'), - 'sda', '0') + AuditLocation(self.disk_file._datadir, 'sda', '0')) self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1) def test_object_audit_will_not_swallow_errors_in_tests(self): @@ -150,10 +145,10 @@ class TestAuditor(unittest.TestCase): def blowup(*args): raise NameError('tpyo') - with mock.patch('swift.obj.diskfile.DiskFile', - blowup): + with mock.patch.object(DiskFileManager, + 'get_diskfile_from_audit_location', blowup): self.assertRaises(NameError, auditor_worker.object_audit, - path, 'sda', '0') + AuditLocation(os.path.dirname(path), 'sda', '0')) def test_failsafe_object_audit_will_swallow_errors_in_tests(self): timestamp = str(normalize_timestamp(time.time())) @@ -167,7 +162,8 @@ class TestAuditor(unittest.TestCase): raise NameError('tpyo') with mock.patch('swift.obj.diskfile.DiskFile', blowup): - auditor_worker.failsafe_object_audit(path, 'sda', '0') + auditor_worker.failsafe_object_audit( + AuditLocation(os.path.dirname(path), 'sda', '0')) self.assertEquals(auditor_worker.errors, 1) def test_generic_exception_handling(self): @@ -308,6 +304,7 @@ class TestAuditor(unittest.TestCase): if not os.path.exists(dir_path): mkdirs(dir_path) fp = open(ts_file_path, 'w') + write_metadata(fp, {'X-Timestamp': '99999', 'name': '/a/c/o'}) fp.close() etag = md5() @@ -362,8 +359,8 @@ class TestAuditor(unittest.TestCase): def test_with_tombstone(self): ts_file_path = self.setup_bad_zero_byte(with_ts=True) - self.auditor.run_once() self.assertTrue(ts_file_path.endswith('ts')) + self.auditor.run_once() self.assertTrue(os.path.exists(ts_file_path)) def test_sleeper(self): diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 72852a7cfe..804d989523 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -34,7 +34,7 @@ from contextlib import closing from gzip import GzipFile from eventlet import tpool -from test.unit import FakeLogger, mock as unit_mock +from test.unit import FakeLogger, mock as unit_mock, temptree from swift.obj import diskfile from swift.common import utils @@ -345,6 +345,129 @@ class TestDiskFileModuleMethods(unittest.TestCase): [file3]) +class TestObjectAuditLocationGenerator(unittest.TestCase): + def _make_file(self, path): + try: + os.makedirs(os.path.dirname(path)) + except OSError as err: + if err.errno != errno.EEXIST: + raise + + with open(path, 'w'): + pass + + def test_finding_of_hashdirs(self): + with temptree([]) as tmpdir: + # the good + os.makedirs(os.path.join(tmpdir, "sdp", "objects", "1519", "aca", + "5c1fdc1ffb12e5eaf84edc30d8b67aca")) + os.makedirs(os.path.join(tmpdir, "sdp", "objects", "1519", "aca", + "fdfd184d39080020bc8b487f8a7beaca")) + os.makedirs(os.path.join(tmpdir, "sdp", "objects", "1519", "df2", + "b0fe7af831cc7b1af5bf486b1c841df2")) + os.makedirs(os.path.join(tmpdir, "sdp", "objects", "9720", "ca5", + "4a943bc72c2e647c4675923d58cf4ca5")) + os.makedirs(os.path.join(tmpdir, "sdq", "objects", "3071", "8eb", + "fcd938702024c25fef6c32fef05298eb")) + + # the bad + self._make_file(os.path.join(tmpdir, "sdp", "objects", "1519", + "fed")) + self._make_file(os.path.join(tmpdir, "sdq", "objects", "9876")) + + # the empty + os.makedirs(os.path.join(tmpdir, "sdr")) + os.makedirs(os.path.join(tmpdir, "sds", "objects")) + os.makedirs(os.path.join(tmpdir, "sdt", "objects", "9601")) + os.makedirs(os.path.join(tmpdir, "sdu", "objects", "6499", "f80")) + + # the irrelevant + os.makedirs(os.path.join(tmpdir, "sdv", "accounts", "77", "421", + "4b8c86149a6d532f4af018578fd9f421")) + os.makedirs(os.path.join(tmpdir, "sdw", "containers", "28", "51e", + "4f9eee668b66c6f0250bfa3c7ab9e51e")) + + locations = [(loc.path, loc.device, loc.partition) + for loc in diskfile.object_audit_location_generator( + devices=tmpdir, mount_check=False)] + locations.sort() + + self.assertEqual( + locations, + [(os.path.join(tmpdir, "sdp", "objects", "1519", "aca", + "5c1fdc1ffb12e5eaf84edc30d8b67aca"), + "sdp", "1519"), + (os.path.join(tmpdir, "sdp", "objects", "1519", "aca", + "fdfd184d39080020bc8b487f8a7beaca"), + "sdp", "1519"), + (os.path.join(tmpdir, "sdp", "objects", "1519", "df2", + "b0fe7af831cc7b1af5bf486b1c841df2"), + "sdp", "1519"), + (os.path.join(tmpdir, "sdp", "objects", "9720", "ca5", + "4a943bc72c2e647c4675923d58cf4ca5"), + "sdp", "9720"), + (os.path.join(tmpdir, "sdq", "objects", "3071", "8eb", + "fcd938702024c25fef6c32fef05298eb"), + "sdq", "3071")]) + + def test_skipping_unmounted_devices(self): + def mock_ismount(path): + return path.endswith('sdp') + + with mock.patch('os.path.ismount', mock_ismount): + with temptree([]) as tmpdir: + os.makedirs(os.path.join(tmpdir, "sdp", "objects", + "2607", "df3", + "ec2871fe724411f91787462f97d30df3")) + os.makedirs(os.path.join(tmpdir, "sdq", "objects", + "9785", "a10", + "4993d582f41be9771505a8d4cb237a10")) + + locations = [ + (loc.path, loc.device, loc.partition) + for loc in diskfile.object_audit_location_generator( + devices=tmpdir, mount_check=True)] + locations.sort() + + self.assertEqual( + locations, + [(os.path.join(tmpdir, "sdp", "objects", + "2607", "df3", + "ec2871fe724411f91787462f97d30df3"), + "sdp", "2607")]) + + def test_only_catch_expected_errors(self): + # Crazy exceptions should still escape object_audit_location_generator + # so that errors get logged and a human can see what's going wrong; + # only normal FS corruption should be skipped over silently. + + def list_locations(dirname): + return [(loc.path, loc.device, loc.partition) + for loc in diskfile.object_audit_location_generator( + devices=dirname, mount_check=False)] + + real_listdir = os.listdir + + def splode_if_endswith(suffix): + def sploder(path): + if path.endswith(suffix): + raise OSError(errno.ELIBBAD, "don't try to ad-lib") + else: + return real_listdir(path) + return sploder + + with temptree([]) as tmpdir: + os.makedirs(os.path.join(tmpdir, "sdf", "objects", + "2607", "b54", + "fe450ec990a88cc4b252b181bab04b54")) + with mock.patch('os.listdir', splode_if_endswith("sdf/objects")): + self.assertRaises(OSError, list_locations, tmpdir) + with mock.patch('os.listdir', splode_if_endswith("2607")): + self.assertRaises(OSError, list_locations, tmpdir) + with mock.patch('os.listdir', splode_if_endswith("b54")): + self.assertRaises(OSError, list_locations, tmpdir) + + class TestDiskFile(unittest.TestCase): """Test swift.obj.diskfile.DiskFile""" @@ -387,10 +510,14 @@ class TestDiskFile(unittest.TestCase): xattr.setxattr(f.fileno(), diskfile.METADATA_KEY, pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL)) - def _create_test_file(self, data, timestamp=None, metadata=None): - df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o') + def _create_test_file(self, data, timestamp=None, metadata=None, + account='a', container='c', object='o'): + if metadata is None: + metadata = {} + metadata.setdefault('name', '/%s/%s/%s' % (account, container, object)) + df = self.df_mgr.get_diskfile('sda', '0', account, container, object) self._create_ondisk_file(df, data, timestamp, metadata) - df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o') + df = self.df_mgr.get_diskfile('sda', '0', account, container, object) df.open() return df @@ -435,10 +562,11 @@ class TestDiskFile(unittest.TestCase): def test_disk_file_app_iter_corners(self): df = self._create_test_file('1234567890') - reader = df.reader() + quarantine_msgs = [] + reader = df.reader(_quarantine_hook=quarantine_msgs.append) self.assertEquals(''.join(reader.app_iter_range(0, None)), '1234567890') - self.assertFalse(reader.was_quarantined) + self.assertEquals(quarantine_msgs, []) df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o') with df.open(): reader = df.reader() @@ -446,19 +574,21 @@ class TestDiskFile(unittest.TestCase): def test_disk_file_app_iter_partial_closes(self): df = self._create_test_file('1234567890') - reader = df.reader() + quarantine_msgs = [] + reader = df.reader(_quarantine_hook=quarantine_msgs.append) it = reader.app_iter_range(0, 5) - self.assertFalse(reader.was_quarantined) + self.assertEquals(quarantine_msgs, []) self.assertEqual(''.join(it), '12345') self.assertTrue(reader._fp is None) def test_disk_file_app_iter_ranges(self): df = self._create_test_file('012345678911234567892123456789') - reader = df.reader() + quarantine_msgs = [] + reader = df.reader(_quarantine_hook=quarantine_msgs.append) it = reader.app_iter_ranges([(0, 10), (10, 20), (20, 30)], 'plain/text', '\r\n--someheader\r\n', 30) - self.assertFalse(reader.was_quarantined) + self.assertEquals(quarantine_msgs, []) value = ''.join(it) self.assert_('0123456789' in value) self.assert_('1123456789' in value) @@ -466,11 +596,12 @@ class TestDiskFile(unittest.TestCase): def test_disk_file_app_iter_ranges_edges(self): df = self._create_test_file('012345678911234567892123456789') - reader = df.reader() + quarantine_msgs = [] + reader = df.reader(_quarantine_hook=quarantine_msgs.append) it = reader.app_iter_ranges([(3, 10), (0, 2)], 'application/whatever', '\r\n--someheader\r\n', 30) value = ''.join(it) - self.assertFalse(reader.was_quarantined) + self.assertEquals(quarantine_msgs, []) self.assert_('3456789' in value) self.assert_('01' in value) @@ -480,7 +611,8 @@ class TestDiskFile(unittest.TestCase): long_str = '01234567890' * 65536 target_strs = ['3456789', long_str[0:65590]] df = self._create_test_file(long_str) - reader = df.reader() + quarantine_msgs = [] + reader = df.reader(_quarantine_hook=quarantine_msgs.append) it = reader.app_iter_ranges([(3, 10), (0, 65590)], 'plain/text', '5e816ff8b8b8e9a5d355497e5d9e0301', 655360) @@ -493,7 +625,7 @@ class TestDiskFile(unittest.TestCase): '5e816ff8b8b8e9a5d355497e5d9e0301\r\n']) value = header + ''.join(it) - self.assertFalse(reader.was_quarantined) + self.assertEquals(quarantine_msgs, []) parts = map(lambda p: p.get_payload(decode=True), email.message_from_string(value).walk())[1:3] @@ -504,7 +636,8 @@ class TestDiskFile(unittest.TestCase): # When ranges passed into the method is either empty array or None, # this method will yield empty string df = self._create_test_file('012345678911234567892123456789') - reader = df.reader() + quarantine_msgs = [] + reader = df.reader(_quarantine_hook=quarantine_msgs.append) it = reader.app_iter_ranges([], 'application/whatever', '\r\n--someheader\r\n', 100) self.assertEqual(''.join(it), '') @@ -514,7 +647,7 @@ class TestDiskFile(unittest.TestCase): reader = df.reader() it = reader.app_iter_ranges(None, 'app/something', '\r\n--someheader\r\n', 150) - self.assertFalse(reader.was_quarantined) + self.assertEquals(quarantine_msgs, []) self.assertEqual(''.join(it), '') def test_disk_file_mkstemp_creates_dir(self): @@ -657,9 +790,10 @@ class TestDiskFile(unittest.TestCase): open_exc = invalid_type in ('Content-Length', 'Bad-Content-Length', 'Corrupt-Xattrs', 'Truncated-Xattrs') reader = None + quarantine_msgs = [] try: df = self._get_open_disk_file(**kwargs) - reader = df.reader() + reader = df.reader(_quarantine_hook=quarantine_msgs.append) except DiskFileQuarantined as err: if not open_exc: self.fail( @@ -675,7 +809,7 @@ class TestDiskFile(unittest.TestCase): self.fail("Unexpected DiskFileQuarantine raised: :%r" % err) else: if not open_exc: - self.assertTrue(reader.was_quarantined) + self.assertEqual(1, len(quarantine_msgs)) verify(invalid_type=invalid_type, obj_name='1') @@ -798,6 +932,27 @@ class TestDiskFile(unittest.TestCase): else: self.fail("Expected DiskFileQuarantined exception") + def test_quarantine_hashdir_not_a_directory(self): + df = self._create_test_file('1234567890', account="abc", + container='123', object='xyz') + hashdir = df._datadir + rmtree(hashdir) + with open(hashdir, 'w'): + pass + + df = self.df_mgr.get_diskfile('sda', '0', 'abc', '123', 'xyz') + try: + df.open() + except DiskFileQuarantined: + pass + else: + self.fail("Expected DiskFileQuarantined, didn't get it") + + # make sure the right thing got quarantined; the suffix dir should not + # have moved, as that could have many objects in it + self.assertFalse(os.path.exists(hashdir)) + self.assertTrue(os.path.exists(os.path.dirname(hashdir))) + def test_write_metadata(self): df = self._create_test_file('1234567890') timestamp = normalize_timestamp(time()) @@ -847,6 +1002,29 @@ class TestDiskFile(unittest.TestCase): self.assertRaises(DiskFileNotExist, df.open) self.assertFalse(os.path.exists(ts_fullpath)) + def test_from_audit_location(self): + hashdir = self._create_test_file( + 'blah blah', + account='three', container='blind', object='mice')._datadir + df = self.df_mgr.get_diskfile_from_audit_location( + diskfile.AuditLocation(hashdir, 'sda1', '0')) + df.open() + self.assertEqual(df._name, '/three/blind/mice') + + def test_from_audit_location_with_mismatched_hash(self): + hashdir = self._create_test_file( + 'blah blah', + account='this', container='is', object='right')._datadir + + datafile = os.path.join(hashdir, os.listdir(hashdir)[0]) + meta = diskfile.read_metadata(datafile) + meta['name'] = '/this/is/wrong' + diskfile.write_metadata(datafile, meta) + + df = self.df_mgr.get_diskfile_from_audit_location( + diskfile.AuditLocation(hashdir, 'sda1', '0')) + self.assertRaises(DiskFileQuarantined, df.open) + def test_close_error(self): def mock_handle_close_quarantine():