Merge "Alternate DiskFile constructor for efficient auditing."
This commit is contained in:
commit
deaddf003b
@ -30,10 +30,6 @@ class SwiftException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AuditException(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class DiskFileError(SwiftException):
|
||||
pass
|
||||
|
||||
|
@ -17,21 +17,19 @@ import os
|
||||
import time
|
||||
from swift import gettext_ as _
|
||||
from contextlib import closing
|
||||
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.obj import diskfile
|
||||
from swift.common.utils import get_logger, audit_location_generator, \
|
||||
ratelimit_sleep, dump_recon_cache, list_from_csv, json
|
||||
from swift.common.exceptions import AuditException, DiskFileQuarantined, \
|
||||
DiskFileNotExist
|
||||
from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \
|
||||
list_from_csv, json
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
|
||||
from swift.common.daemon import Daemon
|
||||
|
||||
SLEEP_BETWEEN_AUDITS = 30
|
||||
|
||||
|
||||
class AuditorWorker(object):
|
||||
"""Walk through file system to audit object"""
|
||||
"""Walk through file system to audit objects"""
|
||||
|
||||
def __init__(self, conf, logger, zero_byte_only_at_fps=0):
|
||||
self.conf = conf
|
||||
@ -72,13 +70,10 @@ class AuditorWorker(object):
|
||||
total_quarantines = 0
|
||||
total_errors = 0
|
||||
time_auditing = 0
|
||||
all_locs = audit_location_generator(
|
||||
self.devices, diskfile.DATADIR, '.data',
|
||||
mount_check=self.diskfile_mgr.mount_check,
|
||||
logger=self.logger)
|
||||
for path, device, partition in all_locs:
|
||||
all_locs = self.diskfile_mgr.object_audit_location_generator()
|
||||
for location in all_locs:
|
||||
loop_time = time.time()
|
||||
self.failsafe_object_audit(path, device, partition)
|
||||
self.failsafe_object_audit(location)
|
||||
self.logger.timing_since('timing', loop_time)
|
||||
self.files_running_time = ratelimit_sleep(
|
||||
self.files_running_time, self.max_files_per_second)
|
||||
@ -151,75 +146,54 @@ class AuditorWorker(object):
|
||||
else:
|
||||
self.stats_buckets["OVER"] += 1
|
||||
|
||||
def failsafe_object_audit(self, path, device, partition):
|
||||
def failsafe_object_audit(self, location):
|
||||
"""
|
||||
Entrypoint to object_audit, with a failsafe generic exception handler.
|
||||
"""
|
||||
try:
|
||||
self.object_audit(path, device, partition)
|
||||
self.object_audit(location)
|
||||
except (Exception, Timeout):
|
||||
self.logger.increment('errors')
|
||||
self.errors += 1
|
||||
self.logger.exception(_('ERROR Trying to audit %s'), path)
|
||||
self.logger.exception(_('ERROR Trying to audit %s'), location)
|
||||
|
||||
def object_audit(self, path, device, partition):
|
||||
def object_audit(self, location):
|
||||
"""
|
||||
Audits the given object path.
|
||||
Audits the given object location.
|
||||
|
||||
:param path: a path to an object
|
||||
:param device: the device the path is on
|
||||
:param partition: the partition the path is on
|
||||
:param location: an audit location
|
||||
(from diskfile.object_audit_location_generator)
|
||||
"""
|
||||
def raise_dfq(msg):
|
||||
raise DiskFileQuarantined(msg)
|
||||
|
||||
try:
|
||||
try:
|
||||
name = diskfile.read_metadata(path)['name']
|
||||
except (Exception, Timeout) as exc:
|
||||
raise AuditException('Error when reading metadata: %s' % exc)
|
||||
_junk, account, container, obj = name.split('/', 3)
|
||||
df = self.diskfile_mgr.get_diskfile(
|
||||
device, partition, account, container, obj)
|
||||
try:
|
||||
with df.open():
|
||||
metadata = df.get_metadata()
|
||||
obj_size = int(metadata['Content-Length'])
|
||||
if self.stats_sizes:
|
||||
self.record_stats(obj_size)
|
||||
if self.zero_byte_only_at_fps and obj_size:
|
||||
self.passes += 1
|
||||
return
|
||||
reader = df.reader()
|
||||
with closing(reader):
|
||||
for chunk in reader:
|
||||
chunk_len = len(chunk)
|
||||
self.bytes_running_time = ratelimit_sleep(
|
||||
self.bytes_running_time,
|
||||
self.max_bytes_per_second,
|
||||
incr_by=chunk_len)
|
||||
self.bytes_processed += chunk_len
|
||||
self.total_bytes_processed += chunk_len
|
||||
if reader.was_quarantined:
|
||||
self.quarantines += 1
|
||||
self.logger.error(_('ERROR Object %(obj)s failed audit and'
|
||||
' was quarantined: %(err)s'),
|
||||
{'obj': path,
|
||||
'err': reader.was_quarantined})
|
||||
df = self.diskfile_mgr.get_diskfile_from_audit_location(location)
|
||||
with df.open():
|
||||
metadata = df.get_metadata()
|
||||
obj_size = int(metadata['Content-Length'])
|
||||
if self.stats_sizes:
|
||||
self.record_stats(obj_size)
|
||||
if self.zero_byte_only_at_fps and obj_size:
|
||||
self.passes += 1
|
||||
return
|
||||
except DiskFileNotExist:
|
||||
return
|
||||
reader = df.reader(_quarantine_hook=raise_dfq)
|
||||
with closing(reader):
|
||||
for chunk in reader:
|
||||
chunk_len = len(chunk)
|
||||
self.bytes_running_time = ratelimit_sleep(
|
||||
self.bytes_running_time,
|
||||
self.max_bytes_per_second,
|
||||
incr_by=chunk_len)
|
||||
self.bytes_processed += chunk_len
|
||||
self.total_bytes_processed += chunk_len
|
||||
except DiskFileNotExist:
|
||||
return
|
||||
except DiskFileQuarantined as err:
|
||||
self.quarantines += 1
|
||||
self.logger.error(_('ERROR Object %(obj)s failed audit and was'
|
||||
' quarantined: %(err)s'),
|
||||
{'obj': path, 'err': err})
|
||||
except AuditException as err:
|
||||
self.logger.increment('quarantines')
|
||||
self.quarantines += 1
|
||||
self.logger.error(_('ERROR Object %(obj)s failed audit and will'
|
||||
' be quarantined: %(err)s'),
|
||||
{'obj': path, 'err': err})
|
||||
diskfile.quarantine_renamer(
|
||||
os.path.join(self.devices, device), path)
|
||||
return
|
||||
{'obj': location, 'err': err})
|
||||
self.passes += 1
|
||||
|
||||
|
||||
|
@ -40,6 +40,7 @@ import hashlib
|
||||
import logging
|
||||
import traceback
|
||||
from os.path import basename, dirname, exists, getmtime, join
|
||||
from random import shuffle
|
||||
from tempfile import mkstemp
|
||||
from contextlib import contextmanager
|
||||
from collections import defaultdict
|
||||
@ -52,7 +53,7 @@ from swift.common.constraints import check_mount
|
||||
from swift.common.utils import mkdirs, normalize_timestamp, \
|
||||
storage_directory, hash_path, renamer, fallocate, fsync, \
|
||||
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
|
||||
config_true_value
|
||||
config_true_value, listdir
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
|
||||
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
|
||||
DiskFileDeleted, DiskFileError, DiskFileNotOpen, PathNotDir
|
||||
@ -290,6 +291,67 @@ def get_hashes(partition_dir, recalculate=None, do_listdir=False,
|
||||
return hashed, hashes
|
||||
|
||||
|
||||
class AuditLocation(object):
    """
    A single on-disk location that the object auditor should examine.

    This is a plain record of three values -- ``path``, ``device`` and
    ``partition`` -- with no behaviour of its own beyond rendering as its
    path, which keeps the auditor's log lines readable.

    :param path: filesystem path of the location
    :param device: name of the device the location is on
    :param partition: partition the location belongs to
    """

    def __init__(self, path, device, partition):
        self.path = path
        self.device = device
        self.partition = partition

    def __str__(self):
        # Only the path is rendered; device and partition are left out.
        return str(self.path)
|
||||
|
||||
|
||||
def _listdir_skipping_non_dirs(path):
    """
    List ``path``, treating a path that is not a directory as empty.

    :param path: directory to list
    :returns: the result of listdir(path), or an empty list if the listing
              failed with ENOTDIR
    :raises OSError: if listdir fails with any errno other than ENOTDIR
    """
    try:
        return listdir(path)
    except OSError as err:
        if err.errno != errno.ENOTDIR:
            # Unexpected failures must escape so they get logged and a
            # human can see what is going wrong.
            raise
        return []


def object_audit_location_generator(devices, mount_check=True, logger=None):
    """
    Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
    objects stored under that directory. The AuditLocation only knows the path
    to the hash directory, not to the .data file therein (if any). This is to
    avoid a double listdir(hash_dir); the DiskFile object will always do one,
    so we don't.

    A regular file found where the object data directory, a partition
    directory or a suffix directory is expected is skipped silently; any
    other error raised while listing those directories propagates to the
    caller.

    :param devices: parent directory of the devices to be audited
    :param mount_check: flag to check if a mount check should be performed
                        on devices
    :param logger: a logger object
    :raises OSError: if a directory listing fails for a reason other than
                     the path not being a directory
    """
    device_dirs = listdir(devices)
    # randomize devices in case of process restart before sweep completed
    shuffle(device_dirs)
    for device in device_dirs:
        if mount_check and not \
                os.path.ismount(os.path.join(devices, device)):
            if logger:
                logger.debug(
                    _('Skipping %s as it is not mounted'), device)
            continue
        datadir_path = os.path.join(devices, device, DATADIR)
        # The data directory gets the same ENOTDIR tolerance as the levels
        # below it, so a stray file in the devices root cannot abort the
        # whole sweep.
        for partition in _listdir_skipping_non_dirs(datadir_path):
            part_path = os.path.join(datadir_path, partition)
            for asuffix in _listdir_skipping_non_dirs(part_path):
                suff_path = os.path.join(part_path, asuffix)
                for hsh in _listdir_skipping_non_dirs(suff_path):
                    hsh_path = os.path.join(suff_path, hsh)
                    yield AuditLocation(hsh_path, device, partition)
|
||||
|
||||
|
||||
class DiskFileManager(object):
|
||||
"""
|
||||
Management class for devices, providing common place for shared parameters
|
||||
@ -332,16 +394,19 @@ class DiskFileManager(object):
|
||||
"""
|
||||
return os.path.join(self.devices, device)
|
||||
|
||||
def get_dev_path(self, device):
|
||||
def get_dev_path(self, device, mount_check=None):
|
||||
"""
|
||||
Return the path to a device, checking to see that it is a proper mount
|
||||
point based on a configuration parameter.
|
||||
|
||||
:param device: name of target device
|
||||
:param mount_check: whether or not to check mountedness of device.
|
||||
Defaults to bool(self.mount_check).
|
||||
:returns: full path to the device, None if the path to the device is
|
||||
not a proper mount point.
|
||||
"""
|
||||
if self.mount_check and not check_mount(self.devices, device):
|
||||
should_check = self.mount_check if mount_check is None else mount_check
|
||||
if should_check and not check_mount(self.devices, device):
|
||||
dev_path = None
|
||||
else:
|
||||
dev_path = os.path.join(self.devices, device)
|
||||
@ -368,6 +433,16 @@ class DiskFileManager(object):
|
||||
return DiskFile(self, dev_path, self.threadpools[device],
|
||||
partition, account, container, obj, **kwargs)
|
||||
|
||||
def object_audit_location_generator(self):
    """
    Return a generator of audit locations for this manager's devices.

    Delegates to the module-level function of the same name, handing it
    this manager's devices path, mount_check setting and logger.

    :returns: the generator produced by the module-level
              object_audit_location_generator
    """
    return object_audit_location_generator(
        devices=self.devices,
        mount_check=self.mount_check,
        logger=self.logger)
|
||||
|
||||
def get_diskfile_from_audit_location(self, audit_location):
    """
    Build a DiskFile for the hash directory named by an audit location.

    The device path is resolved with the mount check explicitly disabled.
    NOTE(review): presumably because the location generator has already
    applied the mount check -- confirm.

    :param audit_location: object providing ``device``, ``path`` and
                           ``partition`` attributes
    :returns: the result of DiskFile.from_hash_dir for that location
    """
    device_path = self.get_dev_path(
        audit_location.device, mount_check=False)
    hash_dir = audit_location.path
    return DiskFile.from_hash_dir(
        self, hash_dir, device_path, audit_location.partition)
|
||||
|
||||
def get_hashes(self, device, partition, suffix):
|
||||
dev_path = self.get_dev_path(device)
|
||||
if not dev_path:
|
||||
@ -516,12 +591,13 @@ class DiskFileReader(object):
|
||||
:param keep_cache_size: maximum object size that will be kept in cache
|
||||
:param device_path: on-disk device path, used when quarantining an obj
|
||||
:param logger: logger caller wants this object to use
|
||||
:param quarantine_hook: 1-arg callable called w/reason when quarantined
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
:param keep_cache: should resulting reads be kept in the buffer cache
|
||||
"""
|
||||
def __init__(self, fp, data_file, obj_size, etag, threadpool,
|
||||
disk_chunk_size, keep_cache_size, device_path, logger,
|
||||
iter_hook=None, keep_cache=False):
|
||||
quarantine_hook, iter_hook=None, keep_cache=False):
|
||||
# Parameter tracking
|
||||
self._fp = fp
|
||||
self._data_file = data_file
|
||||
@ -531,6 +607,7 @@ class DiskFileReader(object):
|
||||
self._disk_chunk_size = disk_chunk_size
|
||||
self._device_path = device_path
|
||||
self._logger = logger
|
||||
self._quarantine_hook = quarantine_hook
|
||||
self._iter_hook = iter_hook
|
||||
if keep_cache:
|
||||
# Caller suggests we keep this in cache, only do it if the
|
||||
@ -546,8 +623,6 @@ class DiskFileReader(object):
|
||||
self._read_to_eof = False
|
||||
self._suppress_file_closing = False
|
||||
self._quarantined_dir = None
|
||||
# Currently referenced by the object Auditor only
|
||||
self.was_quarantined = ''
|
||||
|
||||
def __iter__(self):
|
||||
"""Returns an iterator over the data file."""
|
||||
@ -630,7 +705,7 @@ class DiskFileReader(object):
|
||||
self._quarantined_dir = self._threadpool.run_in_thread(
|
||||
quarantine_renamer, self._device_path, self._data_file)
|
||||
self._logger.increment('quarantines')
|
||||
self.was_quarantined = msg
|
||||
self._quarantine_hook(msg)
|
||||
|
||||
def _handle_close_quarantine(self):
|
||||
"""Check if file needs to be quarantined"""
|
||||
@ -655,6 +730,8 @@ class DiskFileReader(object):
|
||||
try:
|
||||
if self._started_at_0 and self._read_to_eof:
|
||||
self._handle_close_quarantine()
|
||||
except DiskFileQuarantined:
|
||||
raise
|
||||
except (Exception, Timeout) as e:
|
||||
self._logger.error(_(
|
||||
'ERROR DiskFile %(data_file)s'
|
||||
@ -689,23 +766,35 @@ class DiskFile(object):
|
||||
"""
|
||||
|
||||
def __init__(self, mgr, device_path, threadpool, partition,
|
||||
account, container, obj):
|
||||
account=None, container=None, obj=None, _datadir=None):
|
||||
self._mgr = mgr
|
||||
self._device_path = device_path
|
||||
self._threadpool = threadpool or ThreadPool(nthreads=0)
|
||||
self._logger = mgr.logger
|
||||
self._disk_chunk_size = mgr.disk_chunk_size
|
||||
self._bytes_per_sync = mgr.bytes_per_sync
|
||||
self._name = '/' + '/'.join((account, container, obj))
|
||||
name_hash = hash_path(account, container, obj)
|
||||
self._datadir = join(
|
||||
device_path, storage_directory(DATADIR, partition, name_hash))
|
||||
if account and container and obj:
|
||||
self._name = '/' + '/'.join((account, container, obj))
|
||||
else:
|
||||
# gets populated when we read the metadata
|
||||
self._name = None
|
||||
self._tmpdir = join(device_path, 'tmp')
|
||||
self._metadata = None
|
||||
self._data_file = None
|
||||
self._fp = None
|
||||
self._quarantined_dir = None
|
||||
|
||||
if _datadir:
|
||||
self._datadir = _datadir
|
||||
else:
|
||||
name_hash = hash_path(account, container, obj)
|
||||
self._datadir = join(
|
||||
device_path, storage_directory(DATADIR, partition, name_hash))
|
||||
|
||||
@classmethod
def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition):
    """
    Alternate constructor: build an instance from a hash directory path.

    No account, container or object name is supplied; the hash directory
    is passed through as ``_datadir`` instead.

    :param mgr: manager object handed to the constructor
    :param hash_dir_path: path of the hash directory to use as the datadir
    :param device_path: path of the device holding the hash directory
    :param partition: partition the hash directory belongs to
    :returns: a new instance of ``cls``
    """
    # The third positional argument is the threadpool; None is passed here.
    instance = cls(mgr, device_path, None, partition,
                   _datadir=hash_dir_path)
    return instance
|
||||
|
||||
def open(self):
|
||||
"""
|
||||
Open the object.
|
||||
@ -767,7 +856,7 @@ class DiskFile(object):
|
||||
|
||||
def _quarantine(self, data_file, msg):
|
||||
"""
|
||||
Quarantine a file; responsible for incrementing the associated loggers
|
||||
Quarantine a file; responsible for incrementing the associated logger's
|
||||
count of quarantines.
|
||||
|
||||
:param data_file: full path of data file to quarantine
|
||||
@ -804,8 +893,18 @@ class DiskFile(object):
|
||||
try:
|
||||
files = sorted(os.listdir(self._datadir), reverse=True)
|
||||
except OSError as err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise DiskFileError()
|
||||
if err.errno == errno.ENOTDIR:
|
||||
# If there's a file here instead of a directory, quarantine
|
||||
# it; something's gone wrong somewhere.
|
||||
self._quarantine(
|
||||
# hack: quarantine_renamer actually renames the directory
|
||||
# enclosing the filename you give it, but here we just
|
||||
# want this one file and not its parent.
|
||||
os.path.join(self._datadir, "made-up-filename"),
|
||||
"Expected directory, found file at %s" % self._datadir)
|
||||
elif err.errno != errno.ENOENT:
|
||||
raise DiskFileError(
|
||||
"Error listing directory %s: %s" % (self._datadir, err))
|
||||
# The data directory does not exist, so the object cannot exist.
|
||||
else:
|
||||
for afile in files:
|
||||
@ -864,6 +963,14 @@ class DiskFile(object):
|
||||
exc.timestamp = metadata['X-Timestamp']
|
||||
return exc
|
||||
|
||||
def _verify_name_matches_hash(self, data_file):
    """
    Quarantine the data file unless the object name hashes back to the
    name of the directory this DiskFile points at.

    :param data_file: data file path passed to _quarantine on a mismatch
    """
    dir_hash = os.path.basename(self._datadir)
    # The name is hashed with its leading '/' stripped.
    name_hash = hash_path(self._name.lstrip('/'))
    if dir_hash == name_hash:
        return
    self._quarantine(
        data_file,
        "Hash of name in metadata does not match directory name")
|
||||
|
||||
def _verify_data_file(self, data_file, fp):
|
||||
"""
|
||||
Verify the metadata's name value matches what we think the object is
|
||||
@ -962,6 +1069,12 @@ class DiskFile(object):
|
||||
self._metadata.update(sys_metadata)
|
||||
else:
|
||||
self._metadata = datafile_metadata
|
||||
if self._name is None:
|
||||
# If we don't know our name, we were just given a hash dir at
|
||||
# instantiation, so we'd better validate that the name hashes back
|
||||
# to us
|
||||
self._name = self._metadata['name']
|
||||
self._verify_name_matches_hash(data_file)
|
||||
self._verify_data_file(data_file, fp)
|
||||
return fp
|
||||
|
||||
@ -990,7 +1103,8 @@ class DiskFile(object):
|
||||
with self.open():
|
||||
return self.get_metadata()
|
||||
|
||||
def reader(self, iter_hook=None, keep_cache=False):
|
||||
def reader(self, iter_hook=None, keep_cache=False,
|
||||
_quarantine_hook=lambda m: None):
|
||||
"""
|
||||
Return a :class:`swift.common.swob.Response` class compatible
|
||||
"`app_iter`" object as defined by
|
||||
@ -1002,12 +1116,17 @@ class DiskFile(object):
|
||||
:param iter_hook: called when __iter__ returns a chunk
|
||||
:param keep_cache: caller's preference for keeping data read in the
|
||||
OS buffer cache
|
||||
:param _quarantine_hook: 1-arg callable called when obj quarantined;
|
||||
the arg is the reason for quarantine.
|
||||
Default is to ignore it.
|
||||
Not needed by the REST layer.
|
||||
:returns: a :class:`swift.obj.diskfile.DiskFileReader` object
|
||||
"""
|
||||
dr = DiskFileReader(
|
||||
self._fp, self._data_file, int(self._metadata['Content-Length']),
|
||||
self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
|
||||
self._mgr.keep_cache_size, self._device_path, self._logger,
|
||||
quarantine_hook=_quarantine_hook,
|
||||
iter_hook=iter_hook, keep_cache=keep_cache)
|
||||
# At this point the reader object is now responsible for closing
|
||||
# the file pointer.
|
||||
|
@ -489,8 +489,8 @@ class ObjectController(object):
|
||||
('X-Auth-Token' not in request.headers and
|
||||
'X-Storage-Token' not in request.headers))
|
||||
response = Response(
|
||||
app_iter=disk_file.reader(iter_hook=sleep,
|
||||
keep_cache=keep_cache),
|
||||
app_iter=disk_file.reader(
|
||||
iter_hook=sleep, keep_cache=keep_cache),
|
||||
request=request, conditional_response=True)
|
||||
response.headers['Content-Type'] = metadata.get(
|
||||
'Content-Type', 'application/octet-stream')
|
||||
|
@ -327,7 +327,7 @@ def _set_info_cache(app, env, account, container, resp):
|
||||
|
||||
:param app: the application object
|
||||
:param account: the unquoted account name
|
||||
:param container: the unquoted containr name or None
|
||||
:param container: the unquoted container name or None
|
||||
:param resp: the response received or None if info cache should be cleared
|
||||
"""
|
||||
|
||||
|
@ -24,7 +24,7 @@ from tempfile import mkdtemp
|
||||
from test.unit import FakeLogger
|
||||
from swift.obj import auditor
|
||||
from swift.obj.diskfile import DiskFile, write_metadata, invalidate_hash, \
|
||||
DATADIR, DiskFileManager
|
||||
DATADIR, DiskFileManager, AuditLocation
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||
storage_directory
|
||||
|
||||
@ -77,14 +77,12 @@ class TestAuditor(unittest.TestCase):
|
||||
pre_quarantines = auditor_worker.quarantines
|
||||
|
||||
auditor_worker.object_audit(
|
||||
os.path.join(self.disk_file._datadir, timestamp + '.data'),
|
||||
'sda', '0')
|
||||
AuditLocation(self.disk_file._datadir, 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines)
|
||||
|
||||
os.write(writer._fd, 'extra_data')
|
||||
auditor_worker.object_audit(
|
||||
os.path.join(self.disk_file._datadir, timestamp + '.data'),
|
||||
'sda', '0')
|
||||
AuditLocation(self.disk_file._datadir, 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
|
||||
|
||||
def test_object_audit_diff_data(self):
|
||||
@ -108,8 +106,7 @@ class TestAuditor(unittest.TestCase):
|
||||
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
|
||||
|
||||
auditor_worker.object_audit(
|
||||
os.path.join(self.disk_file._datadir, timestamp + '.data'),
|
||||
'sda', '0')
|
||||
AuditLocation(self.disk_file._datadir, 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines)
|
||||
etag = md5()
|
||||
etag.update('1' + '0' * 1023)
|
||||
@ -121,8 +118,7 @@ class TestAuditor(unittest.TestCase):
|
||||
writer.put(metadata)
|
||||
|
||||
auditor_worker.object_audit(
|
||||
os.path.join(self.disk_file._datadir, timestamp + '.data'),
|
||||
'sda', '0')
|
||||
AuditLocation(self.disk_file._datadir, 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
|
||||
|
||||
def test_object_audit_no_meta(self):
|
||||
@ -136,8 +132,7 @@ class TestAuditor(unittest.TestCase):
|
||||
auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
|
||||
pre_quarantines = auditor_worker.quarantines
|
||||
auditor_worker.object_audit(
|
||||
os.path.join(self.disk_file._datadir, timestamp + '.data'),
|
||||
'sda', '0')
|
||||
AuditLocation(self.disk_file._datadir, 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
|
||||
|
||||
def test_object_audit_will_not_swallow_errors_in_tests(self):
|
||||
@ -150,10 +145,10 @@ class TestAuditor(unittest.TestCase):
|
||||
|
||||
def blowup(*args):
|
||||
raise NameError('tpyo')
|
||||
with mock.patch('swift.obj.diskfile.DiskFile',
|
||||
blowup):
|
||||
with mock.patch.object(DiskFileManager,
|
||||
'get_diskfile_from_audit_location', blowup):
|
||||
self.assertRaises(NameError, auditor_worker.object_audit,
|
||||
path, 'sda', '0')
|
||||
AuditLocation(os.path.dirname(path), 'sda', '0'))
|
||||
|
||||
def test_failsafe_object_audit_will_swallow_errors_in_tests(self):
|
||||
timestamp = str(normalize_timestamp(time.time()))
|
||||
@ -167,7 +162,8 @@ class TestAuditor(unittest.TestCase):
|
||||
raise NameError('tpyo')
|
||||
with mock.patch('swift.obj.diskfile.DiskFile',
|
||||
blowup):
|
||||
auditor_worker.failsafe_object_audit(path, 'sda', '0')
|
||||
auditor_worker.failsafe_object_audit(
|
||||
AuditLocation(os.path.dirname(path), 'sda', '0'))
|
||||
self.assertEquals(auditor_worker.errors, 1)
|
||||
|
||||
def test_generic_exception_handling(self):
|
||||
@ -308,6 +304,7 @@ class TestAuditor(unittest.TestCase):
|
||||
if not os.path.exists(dir_path):
|
||||
mkdirs(dir_path)
|
||||
fp = open(ts_file_path, 'w')
|
||||
write_metadata(fp, {'X-Timestamp': '99999', 'name': '/a/c/o'})
|
||||
fp.close()
|
||||
|
||||
etag = md5()
|
||||
@ -362,8 +359,8 @@ class TestAuditor(unittest.TestCase):
|
||||
|
||||
def test_with_tombstone(self):
|
||||
ts_file_path = self.setup_bad_zero_byte(with_ts=True)
|
||||
self.auditor.run_once()
|
||||
self.assertTrue(ts_file_path.endswith('ts'))
|
||||
self.auditor.run_once()
|
||||
self.assertTrue(os.path.exists(ts_file_path))
|
||||
|
||||
def test_sleeper(self):
|
||||
|
@ -34,7 +34,7 @@ from contextlib import closing
|
||||
from gzip import GzipFile
|
||||
|
||||
from eventlet import tpool
|
||||
from test.unit import FakeLogger, mock as unit_mock
|
||||
from test.unit import FakeLogger, mock as unit_mock, temptree
|
||||
|
||||
from swift.obj import diskfile
|
||||
from swift.common import utils
|
||||
@ -345,6 +345,129 @@ class TestDiskFileModuleMethods(unittest.TestCase):
|
||||
[file3])
|
||||
|
||||
|
||||
class TestObjectAuditLocationGenerator(unittest.TestCase):
    """Tests for diskfile.object_audit_location_generator."""

    def _make_file(self, path):
        """Create an empty file at ``path``, making parent dirs as needed."""
        parent = os.path.dirname(path)
        try:
            os.makedirs(parent)
        except OSError as err:
            # An already-existing parent directory is fine.
            if err.errno != errno.EEXIST:
                raise

        with open(path, 'w'):
            pass

    def test_finding_of_hashdirs(self):
        # Listed in sorted order, which is the order the assertion expects.
        good = [
            ("sdp", "objects", "1519", "aca",
             "5c1fdc1ffb12e5eaf84edc30d8b67aca"),
            ("sdp", "objects", "1519", "aca",
             "fdfd184d39080020bc8b487f8a7beaca"),
            ("sdp", "objects", "1519", "df2",
             "b0fe7af831cc7b1af5bf486b1c841df2"),
            ("sdp", "objects", "9720", "ca5",
             "4a943bc72c2e647c4675923d58cf4ca5"),
            ("sdq", "objects", "3071", "8eb",
             "fcd938702024c25fef6c32fef05298eb"),
        ]
        # Regular files sitting where a directory is expected.
        bad = [
            ("sdp", "objects", "1519", "fed"),
            ("sdq", "objects", "9876"),
        ]
        # Directory trees that stop short of any hash directory.
        empty = [
            ("sdr",),
            ("sds", "objects"),
            ("sdt", "objects", "9601"),
            ("sdu", "objects", "6499", "f80"),
        ]
        # Trees that do not live under an "objects" directory.
        irrelevant = [
            ("sdv", "accounts", "77", "421",
             "4b8c86149a6d532f4af018578fd9f421"),
            ("sdw", "containers", "28", "51e",
             "4f9eee668b66c6f0250bfa3c7ab9e51e"),
        ]

        with temptree([]) as tmpdir:
            for parts in good:
                os.makedirs(os.path.join(tmpdir, *parts))
            for parts in bad:
                self._make_file(os.path.join(tmpdir, *parts))
            for parts in empty + irrelevant:
                os.makedirs(os.path.join(tmpdir, *parts))

            found = diskfile.object_audit_location_generator(
                devices=tmpdir, mount_check=False)
            locations = sorted(
                (loc.path, loc.device, loc.partition) for loc in found)

            # device is the first path component, partition the third
            expected = [(os.path.join(tmpdir, *parts), parts[0], parts[2])
                        for parts in good]
            self.assertEqual(locations, expected)

    def test_skipping_unmounted_devices(self):
        def mock_ismount(path):
            # Only "sdp" counts as mounted.
            return path.endswith('sdp')

        mounted = ("sdp", "objects", "2607", "df3",
                   "ec2871fe724411f91787462f97d30df3")
        unmounted = ("sdq", "objects", "9785", "a10",
                     "4993d582f41be9771505a8d4cb237a10")

        with mock.patch('os.path.ismount', mock_ismount):
            with temptree([]) as tmpdir:
                os.makedirs(os.path.join(tmpdir, *mounted))
                os.makedirs(os.path.join(tmpdir, *unmounted))

                found = diskfile.object_audit_location_generator(
                    devices=tmpdir, mount_check=True)
                locations = sorted(
                    (loc.path, loc.device, loc.partition) for loc in found)

                self.assertEqual(
                    locations,
                    [(os.path.join(tmpdir, *mounted), "sdp", "2607")])

    def test_only_catch_expected_errors(self):
        # Crazy exceptions should still escape object_audit_location_generator
        # so that errors get logged and a human can see what's going wrong;
        # only normal FS corruption should be skipped over silently.

        def list_locations(dirname):
            found = diskfile.object_audit_location_generator(
                devices=dirname, mount_check=False)
            return [(loc.path, loc.device, loc.partition) for loc in found]

        real_listdir = os.listdir

        def splode_if_endswith(suffix):
            def sploder(path):
                if not path.endswith(suffix):
                    return real_listdir(path)
                raise OSError(errno.ELIBBAD, "don't try to ad-lib")
            return sploder

        with temptree([]) as tmpdir:
            os.makedirs(os.path.join(tmpdir, "sdf", "objects",
                                     "2607", "b54",
                                     "fe450ec990a88cc4b252b181bab04b54"))
            # Fail in turn at the data dir, the partition and the suffix.
            for failing_suffix in ("sdf/objects", "2607", "b54"):
                with mock.patch('os.listdir',
                                splode_if_endswith(failing_suffix)):
                    self.assertRaises(OSError, list_locations, tmpdir)
|
||||
|
||||
|
||||
class TestDiskFile(unittest.TestCase):
|
||||
"""Test swift.obj.diskfile.DiskFile"""
|
||||
|
||||
@ -387,10 +510,14 @@ class TestDiskFile(unittest.TestCase):
|
||||
xattr.setxattr(f.fileno(), diskfile.METADATA_KEY,
|
||||
pickle.dumps(metadata, diskfile.PICKLE_PROTOCOL))
|
||||
|
||||
def _create_test_file(self, data, timestamp=None, metadata=None):
|
||||
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
|
||||
def _create_test_file(self, data, timestamp=None, metadata=None,
|
||||
account='a', container='c', object='o'):
|
||||
if metadata is None:
|
||||
metadata = {}
|
||||
metadata.setdefault('name', '/%s/%s/%s' % (account, container, object))
|
||||
df = self.df_mgr.get_diskfile('sda', '0', account, container, object)
|
||||
self._create_ondisk_file(df, data, timestamp, metadata)
|
||||
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
|
||||
df = self.df_mgr.get_diskfile('sda', '0', account, container, object)
|
||||
df.open()
|
||||
return df
|
||||
|
||||
@ -435,10 +562,11 @@ class TestDiskFile(unittest.TestCase):
|
||||
|
||||
def test_disk_file_app_iter_corners(self):
|
||||
df = self._create_test_file('1234567890')
|
||||
reader = df.reader()
|
||||
quarantine_msgs = []
|
||||
reader = df.reader(_quarantine_hook=quarantine_msgs.append)
|
||||
self.assertEquals(''.join(reader.app_iter_range(0, None)),
|
||||
'1234567890')
|
||||
self.assertFalse(reader.was_quarantined)
|
||||
self.assertEquals(quarantine_msgs, [])
|
||||
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
|
||||
with df.open():
|
||||
reader = df.reader()
|
||||
@ -446,19 +574,21 @@ class TestDiskFile(unittest.TestCase):
|
||||
|
||||
def test_disk_file_app_iter_partial_closes(self):
|
||||
df = self._create_test_file('1234567890')
|
||||
reader = df.reader()
|
||||
quarantine_msgs = []
|
||||
reader = df.reader(_quarantine_hook=quarantine_msgs.append)
|
||||
it = reader.app_iter_range(0, 5)
|
||||
self.assertFalse(reader.was_quarantined)
|
||||
self.assertEquals(quarantine_msgs, [])
|
||||
self.assertEqual(''.join(it), '12345')
|
||||
self.assertTrue(reader._fp is None)
|
||||
|
||||
def test_disk_file_app_iter_ranges(self):
|
||||
df = self._create_test_file('012345678911234567892123456789')
|
||||
reader = df.reader()
|
||||
quarantine_msgs = []
|
||||
reader = df.reader(_quarantine_hook=quarantine_msgs.append)
|
||||
it = reader.app_iter_ranges([(0, 10), (10, 20), (20, 30)],
|
||||
'plain/text',
|
||||
'\r\n--someheader\r\n', 30)
|
||||
self.assertFalse(reader.was_quarantined)
|
||||
self.assertEquals(quarantine_msgs, [])
|
||||
value = ''.join(it)
|
||||
self.assert_('0123456789' in value)
|
||||
self.assert_('1123456789' in value)
|
||||
@ -466,11 +596,12 @@ class TestDiskFile(unittest.TestCase):
|
||||
|
||||
def test_disk_file_app_iter_ranges_edges(self):
|
||||
df = self._create_test_file('012345678911234567892123456789')
|
||||
reader = df.reader()
|
||||
quarantine_msgs = []
|
||||
reader = df.reader(_quarantine_hook=quarantine_msgs.append)
|
||||
it = reader.app_iter_ranges([(3, 10), (0, 2)], 'application/whatever',
|
||||
'\r\n--someheader\r\n', 30)
|
||||
value = ''.join(it)
|
||||
self.assertFalse(reader.was_quarantined)
|
||||
self.assertEquals(quarantine_msgs, [])
|
||||
self.assert_('3456789' in value)
|
||||
self.assert_('01' in value)
|
||||
|
||||
@ -480,7 +611,8 @@ class TestDiskFile(unittest.TestCase):
|
||||
long_str = '01234567890' * 65536
|
||||
target_strs = ['3456789', long_str[0:65590]]
|
||||
df = self._create_test_file(long_str)
|
||||
reader = df.reader()
|
||||
quarantine_msgs = []
|
||||
reader = df.reader(_quarantine_hook=quarantine_msgs.append)
|
||||
it = reader.app_iter_ranges([(3, 10), (0, 65590)], 'plain/text',
|
||||
'5e816ff8b8b8e9a5d355497e5d9e0301', 655360)
|
||||
|
||||
@ -493,7 +625,7 @@ class TestDiskFile(unittest.TestCase):
|
||||
'5e816ff8b8b8e9a5d355497e5d9e0301\r\n'])
|
||||
|
||||
value = header + ''.join(it)
|
||||
self.assertFalse(reader.was_quarantined)
|
||||
self.assertEquals(quarantine_msgs, [])
|
||||
|
||||
parts = map(lambda p: p.get_payload(decode=True),
|
||||
email.message_from_string(value).walk())[1:3]
|
||||
@ -504,7 +636,8 @@ class TestDiskFile(unittest.TestCase):
|
||||
# When ranges passed into the method is either empty array or None,
|
||||
# this method will yield empty string
|
||||
df = self._create_test_file('012345678911234567892123456789')
|
||||
reader = df.reader()
|
||||
quarantine_msgs = []
|
||||
reader = df.reader(_quarantine_hook=quarantine_msgs.append)
|
||||
it = reader.app_iter_ranges([], 'application/whatever',
|
||||
'\r\n--someheader\r\n', 100)
|
||||
self.assertEqual(''.join(it), '')
|
||||
@ -514,7 +647,7 @@ class TestDiskFile(unittest.TestCase):
|
||||
reader = df.reader()
|
||||
it = reader.app_iter_ranges(None, 'app/something',
|
||||
'\r\n--someheader\r\n', 150)
|
||||
self.assertFalse(reader.was_quarantined)
|
||||
self.assertEquals(quarantine_msgs, [])
|
||||
self.assertEqual(''.join(it), '')
|
||||
|
||||
def test_disk_file_mkstemp_creates_dir(self):
|
||||
@ -657,9 +790,10 @@ class TestDiskFile(unittest.TestCase):
|
||||
open_exc = invalid_type in ('Content-Length', 'Bad-Content-Length',
|
||||
'Corrupt-Xattrs', 'Truncated-Xattrs')
|
||||
reader = None
|
||||
quarantine_msgs = []
|
||||
try:
|
||||
df = self._get_open_disk_file(**kwargs)
|
||||
reader = df.reader()
|
||||
reader = df.reader(_quarantine_hook=quarantine_msgs.append)
|
||||
except DiskFileQuarantined as err:
|
||||
if not open_exc:
|
||||
self.fail(
|
||||
@ -675,7 +809,7 @@ class TestDiskFile(unittest.TestCase):
|
||||
self.fail("Unexpected DiskFileQuarantine raised: :%r" % err)
|
||||
else:
|
||||
if not open_exc:
|
||||
self.assertTrue(reader.was_quarantined)
|
||||
self.assertEqual(1, len(quarantine_msgs))
|
||||
|
||||
verify(invalid_type=invalid_type, obj_name='1')
|
||||
|
||||
@ -798,6 +932,27 @@ class TestDiskFile(unittest.TestCase):
|
||||
else:
|
||||
self.fail("Expected DiskFileQuarantined exception")
|
||||
|
||||
def test_quarantine_hashdir_not_a_directory(self):
    # Replace an object's hash directory with a regular file, then check
    # that opening the diskfile raises DiskFileQuarantined and that only the
    # bogus entry is gone afterwards.
    df = self._create_test_file('1234567890', account="abc",
                                container='123', object='xyz')
    hashdir = df._datadir
    rmtree(hashdir)
    # Create an empty regular file where the hash directory used to be.
    with open(hashdir, 'w'):
        pass

    df = self.df_mgr.get_diskfile('sda', '0', 'abc', '123', 'xyz')
    self.assertRaises(DiskFileQuarantined, df.open)

    # make sure the right thing got quarantined; the suffix dir should not
    # have moved, as that could have many objects in it
    self.assertFalse(os.path.exists(hashdir))
    self.assertTrue(os.path.exists(os.path.dirname(hashdir)))
|
||||
|
||||
def test_write_metadata(self):
|
||||
df = self._create_test_file('1234567890')
|
||||
timestamp = normalize_timestamp(time())
|
||||
@ -847,6 +1002,29 @@ class TestDiskFile(unittest.TestCase):
|
||||
self.assertRaises(DiskFileNotExist, df.open)
|
||||
self.assertFalse(os.path.exists(ts_fullpath))
|
||||
|
||||
def test_from_audit_location(self):
    # Build a diskfile from an AuditLocation pointing at an existing hash
    # directory and check that opening it yields the object name the file
    # was created with.
    hashdir = self._create_test_file(
        'blah blah',
        account='three', container='blind', object='mice')._datadir
    df = self.df_mgr.get_diskfile_from_audit_location(
        diskfile.AuditLocation(hashdir, 'sda1', '0'))
    # Open through a with-block so the diskfile is closed again when the
    # test finishes instead of being left open.
    # NOTE(review): relies on df.open() being usable as a context manager,
    # as it is for diskfiles from get_diskfile() in this file -- confirm the
    # same holds for diskfiles built from an audit location.
    with df.open():
        self.assertEqual(df._name, '/three/blind/mice')
|
||||
|
||||
def test_from_audit_location_with_mismatched_hash(self):
    # Overwrite the stored 'name' metadata of an existing object, then check
    # that a diskfile built from the object's audit location raises
    # DiskFileQuarantined when opened.
    created = self._create_test_file(
        'blah blah', account='this', container='is', object='right')
    hashdir = created._datadir

    # The first entry listed in the hash directory is taken to be the
    # object's data file.
    first_entry = os.listdir(hashdir)[0]
    datafile = os.path.join(hashdir, first_entry)

    # Read the metadata, change only the name, and write it back.
    metadata = diskfile.read_metadata(datafile)
    metadata['name'] = '/this/is/wrong'
    diskfile.write_metadata(datafile, metadata)

    location = diskfile.AuditLocation(hashdir, 'sda1', '0')
    df = self.df_mgr.get_diskfile_from_audit_location(location)
    self.assertRaises(DiskFileQuarantined, df.open)
|
||||
|
||||
def test_close_error(self):
|
||||
|
||||
def mock_handle_close_quarantine():
|
||||
|
Loading…
x
Reference in New Issue
Block a user