Treat all invalid frag indexes the same
Note that we also want to prevent any IndexErrors that might arise from pickled data already on disk. Change-Id: If56190134edddb4b778e41a2cdd360008fbfb0e4 Closes-Bug: 1772378
This commit is contained in:
parent
6e664de7df
commit
43412c73de
@ -748,11 +748,12 @@ class BaseDiskFileManager(object):
|
|||||||
rv = '%s%s' % (rv, ext)
|
rv = '%s%s' % (rv, ext)
|
||||||
return rv
|
return rv
|
||||||
|
|
||||||
def parse_on_disk_filename(self, filename):
|
def parse_on_disk_filename(self, filename, policy):
|
||||||
"""
|
"""
|
||||||
Parse an on disk file name.
|
Parse an on disk file name.
|
||||||
|
|
||||||
:param filename: the file name including extension
|
:param filename: the file name including extension
|
||||||
|
:param policy: storage policy used to store the file
|
||||||
:returns: a dict, with keys for timestamp, ext and ctype_timestamp:
|
:returns: a dict, with keys for timestamp, ext and ctype_timestamp:
|
||||||
|
|
||||||
* timestamp is a :class:`~swift.common.utils.Timestamp`
|
* timestamp is a :class:`~swift.common.utils.Timestamp`
|
||||||
@ -859,7 +860,8 @@ class BaseDiskFileManager(object):
|
|||||||
return self._split_list(
|
return self._split_list(
|
||||||
file_info_list, lambda x: x['timestamp'] >= timestamp)
|
file_info_list, lambda x: x['timestamp'] >= timestamp)
|
||||||
|
|
||||||
def get_ondisk_files(self, files, datadir, verify=True, **kwargs):
|
def get_ondisk_files(self, files, datadir, verify=True, policy=None,
|
||||||
|
**kwargs):
|
||||||
"""
|
"""
|
||||||
Given a simple list of files names, determine the files that constitute
|
Given a simple list of files names, determine the files that constitute
|
||||||
a valid fileset i.e. a set of files that defines the state of an
|
a valid fileset i.e. a set of files that defines the state of an
|
||||||
@ -880,6 +882,8 @@ class BaseDiskFileManager(object):
|
|||||||
:param datadir: directory name files are from.
|
:param datadir: directory name files are from.
|
||||||
:param verify: if True verify that the ondisk file contract has not
|
:param verify: if True verify that the ondisk file contract has not
|
||||||
been violated, otherwise do not verify.
|
been violated, otherwise do not verify.
|
||||||
|
:param policy: storage policy used to store the files. Used to
|
||||||
|
validate fragment indexes for EC policies.
|
||||||
:returns: a dict that will contain keys:
|
:returns: a dict that will contain keys:
|
||||||
ts_file -> path to a .ts file or None
|
ts_file -> path to a .ts file or None
|
||||||
data_file -> path to a .data file or None
|
data_file -> path to a .data file or None
|
||||||
@ -913,7 +917,7 @@ class BaseDiskFileManager(object):
|
|||||||
for afile in files:
|
for afile in files:
|
||||||
# Categorize files by extension
|
# Categorize files by extension
|
||||||
try:
|
try:
|
||||||
file_info = self.parse_on_disk_filename(afile)
|
file_info = self.parse_on_disk_filename(afile, policy)
|
||||||
file_info['filename'] = afile
|
file_info['filename'] = afile
|
||||||
exts[file_info['ext']].append(file_info)
|
exts[file_info['ext']].append(file_info)
|
||||||
except DiskFileError as e:
|
except DiskFileError as e:
|
||||||
@ -1068,10 +1072,11 @@ class BaseDiskFileManager(object):
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def _hash_suffix_dir(self, path):
|
def _hash_suffix_dir(self, path, policy):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
:param path: full path to directory
|
:param path: full path to directory
|
||||||
|
:param policy: storage policy used
|
||||||
"""
|
"""
|
||||||
hashes = defaultdict(md5)
|
hashes = defaultdict(md5)
|
||||||
try:
|
try:
|
||||||
@ -1083,7 +1088,8 @@ class BaseDiskFileManager(object):
|
|||||||
for hsh in path_contents:
|
for hsh in path_contents:
|
||||||
hsh_path = join(path, hsh)
|
hsh_path = join(path, hsh)
|
||||||
try:
|
try:
|
||||||
ondisk_info = self.cleanup_ondisk_files(hsh_path)
|
ondisk_info = self.cleanup_ondisk_files(
|
||||||
|
hsh_path, policy=policy)
|
||||||
except OSError as err:
|
except OSError as err:
|
||||||
if err.errno == errno.ENOTDIR:
|
if err.errno == errno.ENOTDIR:
|
||||||
partition_path = dirname(path)
|
partition_path = dirname(path)
|
||||||
@ -1137,11 +1143,12 @@ class BaseDiskFileManager(object):
|
|||||||
raise PathNotDir()
|
raise PathNotDir()
|
||||||
return hashes
|
return hashes
|
||||||
|
|
||||||
def _hash_suffix(self, path):
|
def _hash_suffix(self, path, policy=None):
|
||||||
"""
|
"""
|
||||||
Performs reclamation and returns an md5 of all (remaining) files.
|
Performs reclamation and returns an md5 of all (remaining) files.
|
||||||
|
|
||||||
:param path: full path to directory
|
:param path: full path to directory
|
||||||
|
:param policy: storage policy used to store the files
|
||||||
:raises PathNotDir: if given path is not a valid directory
|
:raises PathNotDir: if given path is not a valid directory
|
||||||
:raises OSError: for non-ENOTDIR errors
|
:raises OSError: for non-ENOTDIR errors
|
||||||
"""
|
"""
|
||||||
@ -1217,7 +1224,8 @@ class BaseDiskFileManager(object):
|
|||||||
if not hash_:
|
if not hash_:
|
||||||
suffix_dir = join(partition_path, suffix)
|
suffix_dir = join(partition_path, suffix)
|
||||||
try:
|
try:
|
||||||
hashes[suffix] = self._hash_suffix(suffix_dir)
|
hashes[suffix] = self._hash_suffix(
|
||||||
|
suffix_dir, policy=policy)
|
||||||
hashed += 1
|
hashed += 1
|
||||||
except PathNotDir:
|
except PathNotDir:
|
||||||
del hashes[suffix]
|
del hashes[suffix]
|
||||||
@ -2375,7 +2383,7 @@ class BaseDiskFile(object):
|
|||||||
files = []
|
files = []
|
||||||
|
|
||||||
# gather info about the valid files to use to open the DiskFile
|
# gather info about the valid files to use to open the DiskFile
|
||||||
file_info = self._get_ondisk_files(files)
|
file_info = self._get_ondisk_files(files, self.policy)
|
||||||
|
|
||||||
self._data_file = file_info.get('data_file')
|
self._data_file = file_info.get('data_file')
|
||||||
if not self._data_file:
|
if not self._data_file:
|
||||||
@ -2430,11 +2438,12 @@ class BaseDiskFile(object):
|
|||||||
self._logger.increment('quarantines')
|
self._logger.increment('quarantines')
|
||||||
return DiskFileQuarantined(msg)
|
return DiskFileQuarantined(msg)
|
||||||
|
|
||||||
def _get_ondisk_files(self, files):
|
def _get_ondisk_files(self, files, policy=None):
|
||||||
"""
|
"""
|
||||||
Determine the on-disk files to use.
|
Determine the on-disk files to use.
|
||||||
|
|
||||||
:param files: a list of files in the object's dir
|
:param files: a list of files in the object's dir
|
||||||
|
:param policy: storage policy used to store the files
|
||||||
:returns: dict of files to use having keys 'data_file', 'ts_file',
|
:returns: dict of files to use having keys 'data_file', 'ts_file',
|
||||||
'meta_file'
|
'meta_file'
|
||||||
"""
|
"""
|
||||||
@ -2829,8 +2838,9 @@ class DiskFile(BaseDiskFile):
|
|||||||
reader_cls = DiskFileReader
|
reader_cls = DiskFileReader
|
||||||
writer_cls = DiskFileWriter
|
writer_cls = DiskFileWriter
|
||||||
|
|
||||||
def _get_ondisk_files(self, files):
|
def _get_ondisk_files(self, files, policy=None):
|
||||||
self._ondisk_info = self.manager.get_ondisk_files(files, self._datadir)
|
self._ondisk_info = self.manager.get_ondisk_files(
|
||||||
|
files, self._datadir, policy=policy)
|
||||||
return self._ondisk_info
|
return self._ondisk_info
|
||||||
|
|
||||||
|
|
||||||
@ -2880,16 +2890,17 @@ class DiskFileManager(BaseDiskFileManager):
|
|||||||
hashes[None].update(
|
hashes[None].update(
|
||||||
file_info['timestamp'].internal + file_info['ext'])
|
file_info['timestamp'].internal + file_info['ext'])
|
||||||
|
|
||||||
def _hash_suffix(self, path):
|
def _hash_suffix(self, path, policy=None):
|
||||||
"""
|
"""
|
||||||
Performs reclamation and returns an md5 of all (remaining) files.
|
Performs reclamation and returns an md5 of all (remaining) files.
|
||||||
|
|
||||||
:param path: full path to directory
|
:param path: full path to directory
|
||||||
|
:param policy: storage policy used to store the files
|
||||||
:raises PathNotDir: if given path is not a valid directory
|
:raises PathNotDir: if given path is not a valid directory
|
||||||
:raises OSError: for non-ENOTDIR errors
|
:raises OSError: for non-ENOTDIR errors
|
||||||
:returns: md5 of files in suffix
|
:returns: md5 of files in suffix
|
||||||
"""
|
"""
|
||||||
hashes = self._hash_suffix_dir(path)
|
hashes = self._hash_suffix_dir(path, policy)
|
||||||
return hashes[None].hexdigest()
|
return hashes[None].hexdigest()
|
||||||
|
|
||||||
|
|
||||||
@ -3060,7 +3071,8 @@ class ECDiskFileWriter(BaseDiskFileWriter):
|
|||||||
# sure that the fragment index is included in object sysmeta.
|
# sure that the fragment index is included in object sysmeta.
|
||||||
fi = metadata.setdefault('X-Object-Sysmeta-Ec-Frag-Index',
|
fi = metadata.setdefault('X-Object-Sysmeta-Ec-Frag-Index',
|
||||||
self._diskfile._frag_index)
|
self._diskfile._frag_index)
|
||||||
fi = self.manager.validate_fragment_index(fi)
|
fi = self.manager.validate_fragment_index(
|
||||||
|
fi, self._diskfile.policy)
|
||||||
self._diskfile._frag_index = fi
|
self._diskfile._frag_index = fi
|
||||||
# defer cleanup until commit() writes makes diskfile durable
|
# defer cleanup until commit() writes makes diskfile durable
|
||||||
cleanup = False
|
cleanup = False
|
||||||
@ -3077,7 +3089,8 @@ class ECDiskFile(BaseDiskFile):
|
|||||||
frag_index = kwargs.get('frag_index')
|
frag_index = kwargs.get('frag_index')
|
||||||
self._frag_index = None
|
self._frag_index = None
|
||||||
if frag_index is not None:
|
if frag_index is not None:
|
||||||
self._frag_index = self.manager.validate_fragment_index(frag_index)
|
self._frag_index = self.manager.validate_fragment_index(
|
||||||
|
frag_index, self.policy)
|
||||||
self._frag_prefs = self._validate_frag_prefs(kwargs.get('frag_prefs'))
|
self._frag_prefs = self._validate_frag_prefs(kwargs.get('frag_prefs'))
|
||||||
self._durable_frag_set = None
|
self._durable_frag_set = None
|
||||||
|
|
||||||
@ -3146,17 +3159,18 @@ class ECDiskFile(BaseDiskFile):
|
|||||||
return dict([(ts, [info['frag_index'] for info in frag_set])
|
return dict([(ts, [info['frag_index'] for info in frag_set])
|
||||||
for ts, frag_set in frag_sets.items()])
|
for ts, frag_set in frag_sets.items()])
|
||||||
|
|
||||||
def _get_ondisk_files(self, files):
|
def _get_ondisk_files(self, files, policy=None):
|
||||||
"""
|
"""
|
||||||
The only difference between this method and the replication policy
|
The only difference between this method and the replication policy
|
||||||
DiskFile method is passing in the frag_index and frag_prefs kwargs to
|
DiskFile method is passing in the frag_index and frag_prefs kwargs to
|
||||||
our manager's get_ondisk_files method.
|
our manager's get_ondisk_files method.
|
||||||
|
|
||||||
:param files: list of file names
|
:param files: list of file names
|
||||||
|
:param policy: storage policy used to store the files
|
||||||
"""
|
"""
|
||||||
self._ondisk_info = self.manager.get_ondisk_files(
|
self._ondisk_info = self.manager.get_ondisk_files(
|
||||||
files, self._datadir, frag_index=self._frag_index,
|
files, self._datadir, frag_index=self._frag_index,
|
||||||
frag_prefs=self._frag_prefs)
|
frag_prefs=self._frag_prefs, policy=policy)
|
||||||
return self._ondisk_info
|
return self._ondisk_info
|
||||||
|
|
||||||
def purge(self, timestamp, frag_index):
|
def purge(self, timestamp, frag_index):
|
||||||
@ -3197,12 +3211,13 @@ class ECDiskFileManager(BaseDiskFileManager):
|
|||||||
diskfile_cls = ECDiskFile
|
diskfile_cls = ECDiskFile
|
||||||
policy = EC_POLICY
|
policy = EC_POLICY
|
||||||
|
|
||||||
def validate_fragment_index(self, frag_index):
|
def validate_fragment_index(self, frag_index, policy=None):
|
||||||
"""
|
"""
|
||||||
Return int representation of frag_index, or raise a DiskFileError if
|
Return int representation of frag_index, or raise a DiskFileError if
|
||||||
frag_index is not a whole number.
|
frag_index is not a whole number.
|
||||||
|
|
||||||
:param frag_index: a fragment archive index
|
:param frag_index: a fragment archive index
|
||||||
|
:param policy: storage policy used to validate the index against
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
frag_index = int(str(frag_index))
|
frag_index = int(str(frag_index))
|
||||||
@ -3212,6 +3227,11 @@ class ECDiskFileManager(BaseDiskFileManager):
|
|||||||
if frag_index < 0:
|
if frag_index < 0:
|
||||||
raise DiskFileError(
|
raise DiskFileError(
|
||||||
'Fragment index must not be negative: %s' % frag_index)
|
'Fragment index must not be negative: %s' % frag_index)
|
||||||
|
if policy and frag_index >= policy.ec_ndata + policy.ec_nparity:
|
||||||
|
msg = 'Fragment index must be less than %d for a %d+%d policy: %s'
|
||||||
|
raise DiskFileError(msg % (
|
||||||
|
policy.ec_ndata + policy.ec_nparity,
|
||||||
|
policy.ec_ndata, policy.ec_nparity, frag_index))
|
||||||
return frag_index
|
return frag_index
|
||||||
|
|
||||||
def make_on_disk_filename(self, timestamp, ext=None, frag_index=None,
|
def make_on_disk_filename(self, timestamp, ext=None, frag_index=None,
|
||||||
@ -3244,7 +3264,7 @@ class ECDiskFileManager(BaseDiskFileManager):
|
|||||||
return super(ECDiskFileManager, self).make_on_disk_filename(
|
return super(ECDiskFileManager, self).make_on_disk_filename(
|
||||||
timestamp, ext, ctype_timestamp, *a, **kw)
|
timestamp, ext, ctype_timestamp, *a, **kw)
|
||||||
|
|
||||||
def parse_on_disk_filename(self, filename):
|
def parse_on_disk_filename(self, filename, policy):
|
||||||
"""
|
"""
|
||||||
Returns timestamp(s) and other info extracted from a policy specific
|
Returns timestamp(s) and other info extracted from a policy specific
|
||||||
file name. For EC policy the data file name includes a fragment index
|
file name. For EC policy the data file name includes a fragment index
|
||||||
@ -3283,7 +3303,7 @@ class ECDiskFileManager(BaseDiskFileManager):
|
|||||||
except IndexError:
|
except IndexError:
|
||||||
# expect validate_fragment_index raise DiskFileError
|
# expect validate_fragment_index raise DiskFileError
|
||||||
pass
|
pass
|
||||||
frag_index = self.validate_fragment_index(frag_index)
|
frag_index = self.validate_fragment_index(frag_index, policy)
|
||||||
try:
|
try:
|
||||||
durable = parts[2] == 'd'
|
durable = parts[2] == 'd'
|
||||||
except IndexError:
|
except IndexError:
|
||||||
@ -3295,7 +3315,8 @@ class ECDiskFileManager(BaseDiskFileManager):
|
|||||||
'ctype_timestamp': None,
|
'ctype_timestamp': None,
|
||||||
'durable': durable
|
'durable': durable
|
||||||
}
|
}
|
||||||
rv = super(ECDiskFileManager, self).parse_on_disk_filename(filename)
|
rv = super(ECDiskFileManager, self).parse_on_disk_filename(
|
||||||
|
filename, policy)
|
||||||
rv['frag_index'] = None
|
rv['frag_index'] = None
|
||||||
return rv
|
return rv
|
||||||
|
|
||||||
@ -3516,11 +3537,12 @@ class ECDiskFileManager(BaseDiskFileManager):
|
|||||||
file_info = ondisk_info['durable_frag_set'][0]
|
file_info = ondisk_info['durable_frag_set'][0]
|
||||||
hashes[None].update(file_info['timestamp'].internal + '.durable')
|
hashes[None].update(file_info['timestamp'].internal + '.durable')
|
||||||
|
|
||||||
def _hash_suffix(self, path):
|
def _hash_suffix(self, path, policy=None):
|
||||||
"""
|
"""
|
||||||
Performs reclamation and returns an md5 of all (remaining) files.
|
Performs reclamation and returns an md5 of all (remaining) files.
|
||||||
|
|
||||||
:param path: full path to directory
|
:param path: full path to directory
|
||||||
|
:param policy: storage policy used to store the files
|
||||||
:raises PathNotDir: if given path is not a valid directory
|
:raises PathNotDir: if given path is not a valid directory
|
||||||
:raises OSError: for non-ENOTDIR errors
|
:raises OSError: for non-ENOTDIR errors
|
||||||
:returns: dict of md5 hex digests
|
:returns: dict of md5 hex digests
|
||||||
@ -3529,5 +3551,5 @@ class ECDiskFileManager(BaseDiskFileManager):
|
|||||||
# here we flatten out the hashers hexdigest into a dictionary instead
|
# here we flatten out the hashers hexdigest into a dictionary instead
|
||||||
# of just returning the one hexdigest for the whole suffix
|
# of just returning the one hexdigest for the whole suffix
|
||||||
|
|
||||||
hash_per_fi = self._hash_suffix_dir(path)
|
hash_per_fi = self._hash_suffix_dir(path, policy)
|
||||||
return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())
|
return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())
|
||||||
|
@ -966,6 +966,11 @@ class ObjectReconstructor(Daemon):
|
|||||||
# primary who's node_index matches the data's frag_index. With
|
# primary who's node_index matches the data's frag_index. With
|
||||||
# duplicated EC frags a revert job must sync to all primary nodes
|
# duplicated EC frags a revert job must sync to all primary nodes
|
||||||
# that should be holding this frag_index.
|
# that should be holding this frag_index.
|
||||||
|
if fi >= len(part_nodes):
|
||||||
|
self.logger.warning(
|
||||||
|
'Bad fragment index %r for suffixes %r under %s',
|
||||||
|
fi, data_fi_to_suffixes[fi], part_path)
|
||||||
|
continue
|
||||||
nodes_sync_to = []
|
nodes_sync_to = []
|
||||||
node_index = fi
|
node_index = fi
|
||||||
for n in range(policy.ec_duplication_factor):
|
for n in range(policy.ec_duplication_factor):
|
||||||
@ -1195,7 +1200,7 @@ class ObjectReconstructor(Daemon):
|
|||||||
with Timeout(self.lockup_timeout):
|
with Timeout(self.lockup_timeout):
|
||||||
self.run_pool.waitall()
|
self.run_pool.waitall()
|
||||||
except (Exception, Timeout):
|
except (Exception, Timeout):
|
||||||
self.logger.exception(_("Exception in top-level"
|
self.logger.exception(_("Exception in top-level "
|
||||||
"reconstruction loop"))
|
"reconstruction loop"))
|
||||||
self.kill_coros()
|
self.kill_coros()
|
||||||
finally:
|
finally:
|
||||||
|
@ -902,7 +902,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
|
|||||||
expected = dict(
|
expected = dict(
|
||||||
data_file=None, meta_file=None, ctype_file=None, ts_file=None)
|
data_file=None, meta_file=None, ctype_file=None, ts_file=None)
|
||||||
for policy in POLICIES:
|
for policy in POLICIES:
|
||||||
for frag_index in (0, None, '14'):
|
for frag_index in (0, None, '13'):
|
||||||
# check manager
|
# check manager
|
||||||
df_mgr = self.df_router[policy]
|
df_mgr = self.df_router[policy]
|
||||||
datadir = os.path.join('/srv/node/sdb1/',
|
datadir = os.path.join('/srv/node/sdb1/',
|
||||||
@ -1629,14 +1629,14 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
Timestamp('1234567890.00001', offset=17)):
|
Timestamp('1234567890.00001', offset=17)):
|
||||||
for ext in ('.meta', '.data', '.ts'):
|
for ext in ('.meta', '.data', '.ts'):
|
||||||
fname = '%s%s' % (ts.internal, ext)
|
fname = '%s%s' % (ts.internal, ext)
|
||||||
info = mgr.parse_on_disk_filename(fname)
|
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
|
||||||
self.assertEqual(ts, info['timestamp'])
|
self.assertEqual(ts, info['timestamp'])
|
||||||
self.assertEqual(ext, info['ext'])
|
self.assertEqual(ext, info['ext'])
|
||||||
|
|
||||||
def test_parse_on_disk_filename_errors(self):
|
def test_parse_on_disk_filename_errors(self):
|
||||||
mgr = self.df_router[POLICIES.default]
|
mgr = self.df_router[POLICIES.default]
|
||||||
with self.assertRaises(DiskFileError) as cm:
|
with self.assertRaises(DiskFileError) as cm:
|
||||||
mgr.parse_on_disk_filename('junk')
|
mgr.parse_on_disk_filename('junk', POLICIES.default)
|
||||||
self.assertEqual("Invalid Timestamp value in filename 'junk'",
|
self.assertEqual("Invalid Timestamp value in filename 'junk'",
|
||||||
str(cm.exception))
|
str(cm.exception))
|
||||||
|
|
||||||
@ -2462,9 +2462,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
for ts in (Timestamp('1234567890.00001'),
|
for ts in (Timestamp('1234567890.00001'),
|
||||||
Timestamp('1234567890.00001', offset=17)):
|
Timestamp('1234567890.00001', offset=17)):
|
||||||
# non-durable data file
|
# non-durable data file
|
||||||
for frag in (0, 2, 14):
|
for frag in (0, 2, 13):
|
||||||
fname = '%s#%s.data' % (ts.internal, frag)
|
fname = '%s#%s.data' % (ts.internal, frag)
|
||||||
info = mgr.parse_on_disk_filename(fname)
|
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
|
||||||
self.assertEqual(ts, info['timestamp'])
|
self.assertEqual(ts, info['timestamp'])
|
||||||
self.assertEqual('.data', info['ext'])
|
self.assertEqual('.data', info['ext'])
|
||||||
self.assertEqual(frag, info['frag_index'])
|
self.assertEqual(frag, info['frag_index'])
|
||||||
@ -2472,9 +2472,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
self.assertEqual(mgr.make_on_disk_filename(**info), fname)
|
self.assertEqual(mgr.make_on_disk_filename(**info), fname)
|
||||||
|
|
||||||
# durable data file
|
# durable data file
|
||||||
for frag in (0, 2, 14):
|
for frag in (0, 2, 13):
|
||||||
fname = '%s#%s#d.data' % (ts.internal, frag)
|
fname = '%s#%s#d.data' % (ts.internal, frag)
|
||||||
info = mgr.parse_on_disk_filename(fname)
|
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
|
||||||
self.assertEqual(ts, info['timestamp'])
|
self.assertEqual(ts, info['timestamp'])
|
||||||
self.assertEqual('.data', info['ext'])
|
self.assertEqual('.data', info['ext'])
|
||||||
self.assertEqual(frag, info['frag_index'])
|
self.assertEqual(frag, info['frag_index'])
|
||||||
@ -2483,9 +2483,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
|
|
||||||
# data file with unexpected suffix marker, not an error in case
|
# data file with unexpected suffix marker, not an error in case
|
||||||
# alternative marker suffixes added in future
|
# alternative marker suffixes added in future
|
||||||
for frag in (0, 2, 14):
|
for frag in (0, 2, 13):
|
||||||
fname = '%s#%s#junk.data' % (ts.internal, frag)
|
fname = '%s#%s#junk.data' % (ts.internal, frag)
|
||||||
info = mgr.parse_on_disk_filename(fname)
|
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
|
||||||
self.assertEqual(ts, info['timestamp'])
|
self.assertEqual(ts, info['timestamp'])
|
||||||
self.assertEqual('.data', info['ext'])
|
self.assertEqual('.data', info['ext'])
|
||||||
self.assertEqual(frag, info['frag_index'])
|
self.assertEqual(frag, info['frag_index'])
|
||||||
@ -2495,7 +2495,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
|
|
||||||
for ext in ('.meta', '.durable', '.ts'):
|
for ext in ('.meta', '.durable', '.ts'):
|
||||||
fname = '%s%s' % (ts.internal, ext)
|
fname = '%s%s' % (ts.internal, ext)
|
||||||
info = mgr.parse_on_disk_filename(fname)
|
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
|
||||||
self.assertEqual(ts, info['timestamp'])
|
self.assertEqual(ts, info['timestamp'])
|
||||||
self.assertEqual(ext, info['ext'])
|
self.assertEqual(ext, info['ext'])
|
||||||
self.assertIsNone(info['frag_index'])
|
self.assertIsNone(info['frag_index'])
|
||||||
@ -2507,7 +2507,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
Timestamp('1234567890.00001', offset=17)):
|
Timestamp('1234567890.00001', offset=17)):
|
||||||
fname = '%s.data' % ts.internal
|
fname = '%s.data' % ts.internal
|
||||||
with self.assertRaises(DiskFileError) as cm:
|
with self.assertRaises(DiskFileError) as cm:
|
||||||
mgr.parse_on_disk_filename(fname)
|
mgr.parse_on_disk_filename(fname, POLICIES.default)
|
||||||
self.assertTrue(str(cm.exception).startswith("Bad fragment index"))
|
self.assertTrue(str(cm.exception).startswith("Bad fragment index"))
|
||||||
|
|
||||||
expected = {
|
expected = {
|
||||||
@ -2525,18 +2525,18 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
for frag, msg in expected.items():
|
for frag, msg in expected.items():
|
||||||
fname = '%s#%s.data' % (ts.internal, frag)
|
fname = '%s#%s.data' % (ts.internal, frag)
|
||||||
with self.assertRaises(DiskFileError) as cm:
|
with self.assertRaises(DiskFileError) as cm:
|
||||||
mgr.parse_on_disk_filename(fname)
|
mgr.parse_on_disk_filename(fname, POLICIES.default)
|
||||||
self.assertIn(msg, str(cm.exception).lower())
|
self.assertIn(msg, str(cm.exception).lower())
|
||||||
|
|
||||||
# durable data file
|
# durable data file
|
||||||
for frag, msg in expected.items():
|
for frag, msg in expected.items():
|
||||||
fname = '%s#%s#d.data' % (ts.internal, frag)
|
fname = '%s#%s#d.data' % (ts.internal, frag)
|
||||||
with self.assertRaises(DiskFileError) as cm:
|
with self.assertRaises(DiskFileError) as cm:
|
||||||
mgr.parse_on_disk_filename(fname)
|
mgr.parse_on_disk_filename(fname, POLICIES.default)
|
||||||
self.assertIn(msg, str(cm.exception).lower())
|
self.assertIn(msg, str(cm.exception).lower())
|
||||||
|
|
||||||
with self.assertRaises(DiskFileError) as cm:
|
with self.assertRaises(DiskFileError) as cm:
|
||||||
mgr.parse_on_disk_filename('junk')
|
mgr.parse_on_disk_filename('junk', POLICIES.default)
|
||||||
self.assertEqual("Invalid Timestamp value in filename 'junk'",
|
self.assertEqual("Invalid Timestamp value in filename 'junk'",
|
||||||
str(cm.exception))
|
str(cm.exception))
|
||||||
|
|
||||||
@ -2544,14 +2544,15 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
mgr = self.df_router[POLICIES.default]
|
mgr = self.df_router[POLICIES.default]
|
||||||
for ts in (Timestamp('1234567890.00001'),
|
for ts in (Timestamp('1234567890.00001'),
|
||||||
Timestamp('1234567890.00001', offset=17)):
|
Timestamp('1234567890.00001', offset=17)):
|
||||||
for frag in (0, '0', 2, '2', 14, '14'):
|
for frag in (0, '0', 2, '2', 13, '13'):
|
||||||
for durable in (True, False):
|
for durable in (True, False):
|
||||||
expected = _make_datafilename(
|
expected = _make_datafilename(
|
||||||
ts, POLICIES.default, frag_index=frag, durable=durable)
|
ts, POLICIES.default, frag_index=frag, durable=durable)
|
||||||
actual = mgr.make_on_disk_filename(
|
actual = mgr.make_on_disk_filename(
|
||||||
ts, '.data', frag_index=frag, durable=durable)
|
ts, '.data', frag_index=frag, durable=durable)
|
||||||
self.assertEqual(expected, actual)
|
self.assertEqual(expected, actual)
|
||||||
parsed = mgr.parse_on_disk_filename(actual)
|
parsed = mgr.parse_on_disk_filename(
|
||||||
|
actual, POLICIES.default)
|
||||||
self.assertEqual(parsed, {
|
self.assertEqual(parsed, {
|
||||||
'timestamp': ts,
|
'timestamp': ts,
|
||||||
'frag_index': int(frag),
|
'frag_index': int(frag),
|
||||||
@ -2573,7 +2574,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
actual = mgr.make_on_disk_filename(
|
actual = mgr.make_on_disk_filename(
|
||||||
ts, ext, frag_index=frag)
|
ts, ext, frag_index=frag)
|
||||||
self.assertEqual(expected, actual)
|
self.assertEqual(expected, actual)
|
||||||
parsed = mgr.parse_on_disk_filename(actual)
|
parsed = mgr.parse_on_disk_filename(
|
||||||
|
actual, POLICIES.default)
|
||||||
self.assertEqual(parsed, {
|
self.assertEqual(parsed, {
|
||||||
'timestamp': ts,
|
'timestamp': ts,
|
||||||
'frag_index': None,
|
'frag_index': None,
|
||||||
@ -2591,19 +2593,14 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
def test_make_on_disk_filename_with_bad_frag_index(self):
|
def test_make_on_disk_filename_with_bad_frag_index(self):
|
||||||
mgr = self.df_router[POLICIES.default]
|
mgr = self.df_router[POLICIES.default]
|
||||||
ts = Timestamp('1234567890.00001')
|
ts = Timestamp('1234567890.00001')
|
||||||
try:
|
with self.assertRaises(DiskFileError):
|
||||||
# .data requires a frag_index kwarg
|
# .data requires a frag_index kwarg
|
||||||
mgr.make_on_disk_filename(ts, '.data')
|
mgr.make_on_disk_filename(ts, '.data')
|
||||||
self.fail('Expected DiskFileError for missing frag_index')
|
|
||||||
except DiskFileError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
for frag in (None, 'foo', '1.314', 1.314, -2, '-2'):
|
for frag in (None, 'foo', '1.314', 1.314, -2, '-2'):
|
||||||
try:
|
with self.assertRaises(DiskFileError):
|
||||||
mgr.make_on_disk_filename(ts, '.data', frag_index=frag)
|
mgr.make_on_disk_filename(ts, '.data', frag_index=frag)
|
||||||
self.fail('Expected DiskFileError for frag_index %s' % frag)
|
|
||||||
except DiskFileError:
|
|
||||||
pass
|
|
||||||
for ext in ('.meta', '.durable', '.ts'):
|
for ext in ('.meta', '.durable', '.ts'):
|
||||||
expected = '%s%s' % (ts.internal, ext)
|
expected = '%s%s' % (ts.internal, ext)
|
||||||
# bad frag index should be ignored
|
# bad frag index should be ignored
|
||||||
@ -2622,7 +2619,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
|
|||||||
actual = mgr.make_on_disk_filename(
|
actual = mgr.make_on_disk_filename(
|
||||||
t_meta, '.meta', ctype_timestamp=t_type)
|
t_meta, '.meta', ctype_timestamp=t_type)
|
||||||
self.assertEqual(expected, actual)
|
self.assertEqual(expected, actual)
|
||||||
parsed = mgr.parse_on_disk_filename(actual)
|
parsed = mgr.parse_on_disk_filename(actual, POLICIES.default)
|
||||||
self.assertEqual(parsed, {
|
self.assertEqual(parsed, {
|
||||||
'timestamp': t_meta,
|
'timestamp': t_meta,
|
||||||
'frag_index': None,
|
'frag_index': None,
|
||||||
@ -3980,25 +3977,18 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
|||||||
with mock.patch("swift.obj.diskfile.mkstemp",
|
with mock.patch("swift.obj.diskfile.mkstemp",
|
||||||
mock.MagicMock(side_effect=OSError(
|
mock.MagicMock(side_effect=OSError(
|
||||||
e, os.strerror(e)))):
|
e, os.strerror(e)))):
|
||||||
try:
|
with self.assertRaises(DiskFileNoSpace):
|
||||||
with df.create(size=200):
|
with df.create(size=200):
|
||||||
pass
|
pass
|
||||||
except DiskFileNoSpace:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
self.fail("Expected exception DiskFileNoSpace")
|
|
||||||
|
|
||||||
# Other OSErrors must not be raised as DiskFileNoSpace
|
# Other OSErrors must not be raised as DiskFileNoSpace
|
||||||
with mock.patch("swift.obj.diskfile.mkstemp",
|
with mock.patch("swift.obj.diskfile.mkstemp",
|
||||||
mock.MagicMock(side_effect=OSError(
|
mock.MagicMock(side_effect=OSError(
|
||||||
errno.EACCES, os.strerror(errno.EACCES)))):
|
errno.EACCES, os.strerror(errno.EACCES)))):
|
||||||
try:
|
with self.assertRaises(OSError) as raised:
|
||||||
with df.create(size=200):
|
with df.create(size=200):
|
||||||
pass
|
pass
|
||||||
except OSError:
|
self.assertEqual(raised.exception.errno, errno.EACCES)
|
||||||
pass
|
|
||||||
else:
|
|
||||||
self.fail("Expected exception OSError")
|
|
||||||
|
|
||||||
def test_create_close_oserror(self):
|
def test_create_close_oserror(self):
|
||||||
# This is a horrible hack so you can run this test in isolation.
|
# This is a horrible hack so you can run this test in isolation.
|
||||||
@ -4012,12 +4002,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
|||||||
with mock.patch("swift.obj.diskfile.os.close",
|
with mock.patch("swift.obj.diskfile.os.close",
|
||||||
mock.MagicMock(side_effect=OSError(
|
mock.MagicMock(side_effect=OSError(
|
||||||
errno.EACCES, os.strerror(errno.EACCES)))):
|
errno.EACCES, os.strerror(errno.EACCES)))):
|
||||||
try:
|
with df.create(size=200):
|
||||||
with df.create(size=200):
|
|
||||||
pass
|
|
||||||
except Exception as err:
|
|
||||||
self.fail("Unexpected exception raised: %r" % err)
|
|
||||||
else:
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def test_write_metadata(self):
|
def test_write_metadata(self):
|
||||||
@ -4520,12 +4505,9 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
|||||||
self._create_ondisk_file(df, 'B', ext='.data', timestamp=6)
|
self._create_ondisk_file(df, 'B', ext='.data', timestamp=6)
|
||||||
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
|
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
|
||||||
df = self._simple_get_diskfile()
|
df = self._simple_get_diskfile()
|
||||||
try:
|
with self.assertRaises(DiskFileDeleted) as raised:
|
||||||
df.open()
|
df.open()
|
||||||
except DiskFileDeleted as d:
|
self.assertEqual(raised.exception.timestamp, Timestamp(10).internal)
|
||||||
self.assertEqual(d.timestamp, Timestamp(10).internal)
|
|
||||||
else:
|
|
||||||
self.fail("Expected DiskFileDeleted exception")
|
|
||||||
|
|
||||||
def test_ondisk_search_loop_meta_ts_data(self):
|
def test_ondisk_search_loop_meta_ts_data(self):
|
||||||
df = self._simple_get_diskfile()
|
df = self._simple_get_diskfile()
|
||||||
@ -4536,12 +4518,9 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
|||||||
self._create_ondisk_file(df, 'B', ext='.data', timestamp=6)
|
self._create_ondisk_file(df, 'B', ext='.data', timestamp=6)
|
||||||
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
|
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
|
||||||
df = self._simple_get_diskfile()
|
df = self._simple_get_diskfile()
|
||||||
try:
|
with self.assertRaises(DiskFileDeleted) as raised:
|
||||||
df.open()
|
df.open()
|
||||||
except DiskFileDeleted as d:
|
self.assertEqual(raised.exception.timestamp, Timestamp(8).internal)
|
||||||
self.assertEqual(d.timestamp, Timestamp(8).internal)
|
|
||||||
else:
|
|
||||||
self.fail("Expected DiskFileDeleted exception")
|
|
||||||
|
|
||||||
def _test_ondisk_search_loop_meta_data_ts(self, legacy_durable=False):
|
def _test_ondisk_search_loop_meta_data_ts(self, legacy_durable=False):
|
||||||
df = self._simple_get_diskfile()
|
df = self._simple_get_diskfile()
|
||||||
@ -4883,10 +4862,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
|
|||||||
ts = time()
|
ts = time()
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
self._manager_mock('cleanup_ondisk_files'), mock_cleanup):
|
self._manager_mock('cleanup_ondisk_files'), mock_cleanup):
|
||||||
try:
|
# Expect to swallow the OSError
|
||||||
df.delete(ts)
|
df.delete(ts)
|
||||||
except OSError:
|
|
||||||
self.fail("OSError raised when it should have been swallowed")
|
|
||||||
exp_name = '%s.ts' % str(Timestamp(ts).internal)
|
exp_name = '%s.ts' % str(Timestamp(ts).internal)
|
||||||
dl = os.listdir(df._datadir)
|
dl = os.listdir(df._datadir)
|
||||||
self.assertEqual(len(dl), file_count + 1)
|
self.assertEqual(len(dl), file_count + 1)
|
||||||
@ -5286,7 +5263,7 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
|
|||||||
|
|
||||||
def test_data_file_has_frag_index(self):
|
def test_data_file_has_frag_index(self):
|
||||||
policy = POLICIES.default
|
policy = POLICIES.default
|
||||||
for good_value in (0, '0', 2, '2', 14, '14'):
|
for good_value in (0, '0', 2, '2', 13, '13'):
|
||||||
# frag_index set by constructor arg
|
# frag_index set by constructor arg
|
||||||
ts = self.ts()
|
ts = self.ts()
|
||||||
expected = [_make_datafilename(
|
expected = [_make_datafilename(
|
||||||
@ -5304,7 +5281,7 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
|
|||||||
ts, policy, good_value, durable=True)]
|
ts, policy, good_value, durable=True)]
|
||||||
meta = {'X-Object-Sysmeta-Ec-Frag-Index': good_value}
|
meta = {'X-Object-Sysmeta-Ec-Frag-Index': good_value}
|
||||||
df = self._get_open_disk_file(ts=ts, policy=policy,
|
df = self._get_open_disk_file(ts=ts, policy=policy,
|
||||||
frag_index='99',
|
frag_index='3',
|
||||||
extra_metadata=meta)
|
extra_metadata=meta)
|
||||||
self.assertEqual(expected, sorted(os.listdir(df._datadir)))
|
self.assertEqual(expected, sorted(os.listdir(df._datadir)))
|
||||||
actual = df.get_metadata().get('X-Object-Sysmeta-Ec-Frag-Index')
|
actual = df.get_metadata().get('X-Object-Sysmeta-Ec-Frag-Index')
|
||||||
@ -5326,7 +5303,7 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
|
|||||||
# the X-Object-Sysmeta-Ec-Frag-Index should *only* be set when
|
# the X-Object-Sysmeta-Ec-Frag-Index should *only* be set when
|
||||||
# the .data file is written.
|
# the .data file is written.
|
||||||
policy = POLICIES.default
|
policy = POLICIES.default
|
||||||
orig_frag_index = 14
|
orig_frag_index = 13
|
||||||
# frag_index set by constructor arg
|
# frag_index set by constructor arg
|
||||||
ts = self.ts()
|
ts = self.ts()
|
||||||
expected = [_make_datafilename(
|
expected = [_make_datafilename(
|
||||||
@ -5389,7 +5366,7 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
|
|||||||
def test_data_file_errors_bad_frag_index(self):
|
def test_data_file_errors_bad_frag_index(self):
|
||||||
policy = POLICIES.default
|
policy = POLICIES.default
|
||||||
df_mgr = self.df_router[policy]
|
df_mgr = self.df_router[policy]
|
||||||
for bad_value in ('foo', '-2', -2, '3.14', 3.14):
|
for bad_value in ('foo', '-2', -2, '3.14', 3.14, '14', 14, '999'):
|
||||||
# check that bad frag_index set by constructor arg raises error
|
# check that bad frag_index set by constructor arg raises error
|
||||||
# as soon as diskfile is constructed, before data is written
|
# as soon as diskfile is constructed, before data is written
|
||||||
self.assertRaises(DiskFileError, self._simple_get_diskfile,
|
self.assertRaises(DiskFileError, self._simple_get_diskfile,
|
||||||
@ -6865,7 +6842,7 @@ class TestSuffixHashes(unittest.TestCase):
|
|||||||
# creates pkl file
|
# creates pkl file
|
||||||
df_mgr.get_hashes('sda1', '0', [], policy)
|
df_mgr.get_hashes('sda1', '0', [], policy)
|
||||||
mock_consolidate_hashes.assert_called_once()
|
mock_consolidate_hashes.assert_called_once()
|
||||||
self.assertEqual([mock.call(suffix_dir)],
|
self.assertEqual([mock.call(suffix_dir, policy=policy)],
|
||||||
mock_hash_suffix.call_args_list)
|
mock_hash_suffix.call_args_list)
|
||||||
# second object in path
|
# second object in path
|
||||||
df2 = self.get_different_suffix_df(df)
|
df2 = self.get_different_suffix_df(df)
|
||||||
@ -6876,7 +6853,7 @@ class TestSuffixHashes(unittest.TestCase):
|
|||||||
# updates pkl file
|
# updates pkl file
|
||||||
df_mgr.get_hashes('sda1', '0', [], policy)
|
df_mgr.get_hashes('sda1', '0', [], policy)
|
||||||
mock_consolidate_hashes.assert_called_once()
|
mock_consolidate_hashes.assert_called_once()
|
||||||
self.assertEqual([mock.call(suffix_dir2)],
|
self.assertEqual([mock.call(suffix_dir2, policy=policy)],
|
||||||
mock_hash_suffix.call_args_list)
|
mock_hash_suffix.call_args_list)
|
||||||
|
|
||||||
def test_consolidate_hashes_raises_exception(self):
|
def test_consolidate_hashes_raises_exception(self):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user