Merge "Treat all invalid frag indexes the same"

This commit is contained in:
Zuul 2019-01-26 13:29:34 +00:00 committed by Gerrit Code Review
commit b608c9cda7
3 changed files with 89 additions and 85 deletions

View File

@ -749,11 +749,12 @@ class BaseDiskFileManager(object):
rv = '%s%s' % (rv, ext)
return rv
def parse_on_disk_filename(self, filename):
def parse_on_disk_filename(self, filename, policy):
"""
Parse an on disk file name.
:param filename: the file name including extension
:param policy: storage policy used to store the file
:returns: a dict, with keys for timestamp, ext and ctype_timestamp:
* timestamp is a :class:`~swift.common.utils.Timestamp`
@ -860,7 +861,8 @@ class BaseDiskFileManager(object):
return self._split_list(
file_info_list, lambda x: x['timestamp'] >= timestamp)
def get_ondisk_files(self, files, datadir, verify=True, **kwargs):
def get_ondisk_files(self, files, datadir, verify=True, policy=None,
**kwargs):
"""
Given a simple list of files names, determine the files that constitute
a valid fileset i.e. a set of files that defines the state of an
@ -881,6 +883,8 @@ class BaseDiskFileManager(object):
:param datadir: directory name files are from.
:param verify: if True verify that the ondisk file contract has not
been violated, otherwise do not verify.
:param policy: storage policy used to store the files. Used to
validate fragment indexes for EC policies.
:returns: a dict that will contain keys:
ts_file -> path to a .ts file or None
data_file -> path to a .data file or None
@ -914,7 +918,7 @@ class BaseDiskFileManager(object):
for afile in files:
# Categorize files by extension
try:
file_info = self.parse_on_disk_filename(afile)
file_info = self.parse_on_disk_filename(afile, policy)
file_info['filename'] = afile
exts[file_info['ext']].append(file_info)
except DiskFileError as e:
@ -1069,10 +1073,11 @@ class BaseDiskFileManager(object):
"""
raise NotImplementedError
def _hash_suffix_dir(self, path):
def _hash_suffix_dir(self, path, policy):
"""
:param path: full path to directory
:param policy: storage policy used
"""
if six.PY2:
hashes = defaultdict(md5)
@ -1099,7 +1104,8 @@ class BaseDiskFileManager(object):
for hsh in path_contents:
hsh_path = join(path, hsh)
try:
ondisk_info = self.cleanup_ondisk_files(hsh_path)
ondisk_info = self.cleanup_ondisk_files(
hsh_path, policy=policy)
except OSError as err:
if err.errno == errno.ENOTDIR:
partition_path = dirname(path)
@ -1153,11 +1159,12 @@ class BaseDiskFileManager(object):
raise PathNotDir()
return hashes
def _hash_suffix(self, path):
def _hash_suffix(self, path, policy=None):
"""
Performs reclamation and returns an md5 of all (remaining) files.
:param path: full path to directory
:param policy: storage policy used to store the files
:raises PathNotDir: if given path is not a valid directory
:raises OSError: for non-ENOTDIR errors
"""
@ -1233,7 +1240,8 @@ class BaseDiskFileManager(object):
if not hash_:
suffix_dir = join(partition_path, suffix)
try:
hashes[suffix] = self._hash_suffix(suffix_dir)
hashes[suffix] = self._hash_suffix(
suffix_dir, policy=policy)
hashed += 1
except PathNotDir:
del hashes[suffix]
@ -2404,7 +2412,7 @@ class BaseDiskFile(object):
files = []
# gather info about the valid files to use to open the DiskFile
file_info = self._get_ondisk_files(files)
file_info = self._get_ondisk_files(files, self.policy)
self._data_file = file_info.get('data_file')
if not self._data_file:
@ -2459,11 +2467,12 @@ class BaseDiskFile(object):
self._logger.increment('quarantines')
return DiskFileQuarantined(msg)
def _get_ondisk_files(self, files):
def _get_ondisk_files(self, files, policy=None):
"""
Determine the on-disk files to use.
:param files: a list of files in the object's dir
:param policy: storage policy used to store the files
:returns: dict of files to use having keys 'data_file', 'ts_file',
'meta_file'
"""
@ -2858,8 +2867,9 @@ class DiskFile(BaseDiskFile):
reader_cls = DiskFileReader
writer_cls = DiskFileWriter
def _get_ondisk_files(self, files):
self._ondisk_info = self.manager.get_ondisk_files(files, self._datadir)
def _get_ondisk_files(self, files, policy=None):
self._ondisk_info = self.manager.get_ondisk_files(
files, self._datadir, policy=policy)
return self._ondisk_info
@ -2909,16 +2919,17 @@ class DiskFileManager(BaseDiskFileManager):
hashes[None].update(
file_info['timestamp'].internal + file_info['ext'])
def _hash_suffix(self, path):
def _hash_suffix(self, path, policy=None):
"""
Performs reclamation and returns an md5 of all (remaining) files.
:param path: full path to directory
:param policy: storage policy used to store the files
:raises PathNotDir: if given path is not a valid directory
:raises OSError: for non-ENOTDIR errors
:returns: md5 of files in suffix
"""
hashes = self._hash_suffix_dir(path)
hashes = self._hash_suffix_dir(path, policy)
return hashes[None].hexdigest()
@ -3089,7 +3100,8 @@ class ECDiskFileWriter(BaseDiskFileWriter):
# sure that the fragment index is included in object sysmeta.
fi = metadata.setdefault('X-Object-Sysmeta-Ec-Frag-Index',
self._diskfile._frag_index)
fi = self.manager.validate_fragment_index(fi)
fi = self.manager.validate_fragment_index(
fi, self._diskfile.policy)
self._diskfile._frag_index = fi
# defer cleanup until commit() writes makes diskfile durable
cleanup = False
@ -3106,7 +3118,8 @@ class ECDiskFile(BaseDiskFile):
frag_index = kwargs.get('frag_index')
self._frag_index = None
if frag_index is not None:
self._frag_index = self.manager.validate_fragment_index(frag_index)
self._frag_index = self.manager.validate_fragment_index(
frag_index, self.policy)
self._frag_prefs = self._validate_frag_prefs(kwargs.get('frag_prefs'))
self._durable_frag_set = None
@ -3175,17 +3188,18 @@ class ECDiskFile(BaseDiskFile):
return dict([(ts, [info['frag_index'] for info in frag_set])
for ts, frag_set in frag_sets.items()])
def _get_ondisk_files(self, files):
def _get_ondisk_files(self, files, policy=None):
"""
The only difference between this method and the replication policy
DiskFile method is passing in the frag_index and frag_prefs kwargs to
our manager's get_ondisk_files method.
:param files: list of file names
:param policy: storage policy used to store the files
"""
self._ondisk_info = self.manager.get_ondisk_files(
files, self._datadir, frag_index=self._frag_index,
frag_prefs=self._frag_prefs)
frag_prefs=self._frag_prefs, policy=policy)
return self._ondisk_info
def purge(self, timestamp, frag_index):
@ -3226,12 +3240,13 @@ class ECDiskFileManager(BaseDiskFileManager):
diskfile_cls = ECDiskFile
policy = EC_POLICY
def validate_fragment_index(self, frag_index):
def validate_fragment_index(self, frag_index, policy=None):
"""
Return int representation of frag_index, or raise a DiskFileError if
frag_index is not a whole number.
:param frag_index: a fragment archive index
:param policy: storage policy used to validate the index against
"""
try:
frag_index = int(str(frag_index))
@ -3241,6 +3256,11 @@ class ECDiskFileManager(BaseDiskFileManager):
if frag_index < 0:
raise DiskFileError(
'Fragment index must not be negative: %s' % frag_index)
if policy and frag_index >= policy.ec_ndata + policy.ec_nparity:
msg = 'Fragment index must be less than %d for a %d+%d policy: %s'
raise DiskFileError(msg % (
policy.ec_ndata + policy.ec_nparity,
policy.ec_ndata, policy.ec_nparity, frag_index))
return frag_index
def make_on_disk_filename(self, timestamp, ext=None, frag_index=None,
@ -3273,7 +3293,7 @@ class ECDiskFileManager(BaseDiskFileManager):
return super(ECDiskFileManager, self).make_on_disk_filename(
timestamp, ext, ctype_timestamp, *a, **kw)
def parse_on_disk_filename(self, filename):
def parse_on_disk_filename(self, filename, policy):
"""
Returns timestamp(s) and other info extracted from a policy specific
file name. For EC policy the data file name includes a fragment index
@ -3312,7 +3332,7 @@ class ECDiskFileManager(BaseDiskFileManager):
except IndexError:
# expect validate_fragment_index raise DiskFileError
pass
frag_index = self.validate_fragment_index(frag_index)
frag_index = self.validate_fragment_index(frag_index, policy)
try:
durable = parts[2] == 'd'
except IndexError:
@ -3324,7 +3344,8 @@ class ECDiskFileManager(BaseDiskFileManager):
'ctype_timestamp': None,
'durable': durable
}
rv = super(ECDiskFileManager, self).parse_on_disk_filename(filename)
rv = super(ECDiskFileManager, self).parse_on_disk_filename(
filename, policy)
rv['frag_index'] = None
return rv
@ -3545,11 +3566,12 @@ class ECDiskFileManager(BaseDiskFileManager):
file_info = ondisk_info['durable_frag_set'][0]
hashes[None].update(file_info['timestamp'].internal + '.durable')
def _hash_suffix(self, path):
def _hash_suffix(self, path, policy=None):
"""
Performs reclamation and returns an md5 of all (remaining) files.
:param path: full path to directory
:param policy: storage policy used to store the files
:raises PathNotDir: if given path is not a valid directory
:raises OSError: for non-ENOTDIR errors
:returns: dict of md5 hex digests
@ -3558,5 +3580,5 @@ class ECDiskFileManager(BaseDiskFileManager):
# here we flatten out the hashers hexdigest into a dictionary instead
# of just returning the one hexdigest for the whole suffix
hash_per_fi = self._hash_suffix_dir(path)
hash_per_fi = self._hash_suffix_dir(path, policy)
return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())

View File

@ -968,6 +968,11 @@ class ObjectReconstructor(Daemon):
# primary who's node_index matches the data's frag_index. With
# duplicated EC frags a revert job must sync to all primary nodes
# that should be holding this frag_index.
if fi >= len(part_nodes):
self.logger.warning(
'Bad fragment index %r for suffixes %r under %s',
fi, data_fi_to_suffixes[fi], part_path)
continue
nodes_sync_to = []
node_index = fi
for n in range(policy.ec_duplication_factor):

View File

@ -902,7 +902,7 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
expected = dict(
data_file=None, meta_file=None, ctype_file=None, ts_file=None)
for policy in POLICIES:
for frag_index in (0, None, '14'):
for frag_index in (0, None, '13'):
# check manager
df_mgr = self.df_router[policy]
datadir = os.path.join('/srv/node/sdb1/',
@ -1656,14 +1656,14 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
Timestamp('1234567890.00001', offset=17)):
for ext in ('.meta', '.data', '.ts'):
fname = '%s%s' % (ts.internal, ext)
info = mgr.parse_on_disk_filename(fname)
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
self.assertEqual(ts, info['timestamp'])
self.assertEqual(ext, info['ext'])
def test_parse_on_disk_filename_errors(self):
mgr = self.df_router[POLICIES.default]
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename('junk')
mgr.parse_on_disk_filename('junk', POLICIES.default)
self.assertEqual("Invalid Timestamp value in filename 'junk'",
str(cm.exception))
@ -2489,9 +2489,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
for ts in (Timestamp('1234567890.00001'),
Timestamp('1234567890.00001', offset=17)):
# non-durable data file
for frag in (0, 2, 14):
for frag in (0, 2, 13):
fname = '%s#%s.data' % (ts.internal, frag)
info = mgr.parse_on_disk_filename(fname)
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
self.assertEqual(ts, info['timestamp'])
self.assertEqual('.data', info['ext'])
self.assertEqual(frag, info['frag_index'])
@ -2499,9 +2499,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
self.assertEqual(mgr.make_on_disk_filename(**info), fname)
# durable data file
for frag in (0, 2, 14):
for frag in (0, 2, 13):
fname = '%s#%s#d.data' % (ts.internal, frag)
info = mgr.parse_on_disk_filename(fname)
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
self.assertEqual(ts, info['timestamp'])
self.assertEqual('.data', info['ext'])
self.assertEqual(frag, info['frag_index'])
@ -2510,9 +2510,9 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
# data file with unexpected suffix marker, not an error in case
# alternative marker suffixes added in future
for frag in (0, 2, 14):
for frag in (0, 2, 13):
fname = '%s#%s#junk.data' % (ts.internal, frag)
info = mgr.parse_on_disk_filename(fname)
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
self.assertEqual(ts, info['timestamp'])
self.assertEqual('.data', info['ext'])
self.assertEqual(frag, info['frag_index'])
@ -2522,7 +2522,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
for ext in ('.meta', '.durable', '.ts'):
fname = '%s%s' % (ts.internal, ext)
info = mgr.parse_on_disk_filename(fname)
info = mgr.parse_on_disk_filename(fname, POLICIES.default)
self.assertEqual(ts, info['timestamp'])
self.assertEqual(ext, info['ext'])
self.assertIsNone(info['frag_index'])
@ -2534,7 +2534,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
Timestamp('1234567890.00001', offset=17)):
fname = '%s.data' % ts.internal
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename(fname)
mgr.parse_on_disk_filename(fname, POLICIES.default)
self.assertTrue(str(cm.exception).startswith("Bad fragment index"))
expected = {
@ -2552,18 +2552,18 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
for frag, msg in expected.items():
fname = '%s#%s.data' % (ts.internal, frag)
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename(fname)
mgr.parse_on_disk_filename(fname, POLICIES.default)
self.assertIn(msg, str(cm.exception).lower())
# durable data file
for frag, msg in expected.items():
fname = '%s#%s#d.data' % (ts.internal, frag)
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename(fname)
mgr.parse_on_disk_filename(fname, POLICIES.default)
self.assertIn(msg, str(cm.exception).lower())
with self.assertRaises(DiskFileError) as cm:
mgr.parse_on_disk_filename('junk')
mgr.parse_on_disk_filename('junk', POLICIES.default)
self.assertEqual("Invalid Timestamp value in filename 'junk'",
str(cm.exception))
@ -2571,14 +2571,15 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
mgr = self.df_router[POLICIES.default]
for ts in (Timestamp('1234567890.00001'),
Timestamp('1234567890.00001', offset=17)):
for frag in (0, '0', 2, '2', 14, '14'):
for frag in (0, '0', 2, '2', 13, '13'):
for durable in (True, False):
expected = _make_datafilename(
ts, POLICIES.default, frag_index=frag, durable=durable)
actual = mgr.make_on_disk_filename(
ts, '.data', frag_index=frag, durable=durable)
self.assertEqual(expected, actual)
parsed = mgr.parse_on_disk_filename(actual)
parsed = mgr.parse_on_disk_filename(
actual, POLICIES.default)
self.assertEqual(parsed, {
'timestamp': ts,
'frag_index': int(frag),
@ -2600,7 +2601,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
actual = mgr.make_on_disk_filename(
ts, ext, frag_index=frag)
self.assertEqual(expected, actual)
parsed = mgr.parse_on_disk_filename(actual)
parsed = mgr.parse_on_disk_filename(
actual, POLICIES.default)
self.assertEqual(parsed, {
'timestamp': ts,
'frag_index': None,
@ -2618,19 +2620,14 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
def test_make_on_disk_filename_with_bad_frag_index(self):
mgr = self.df_router[POLICIES.default]
ts = Timestamp('1234567890.00001')
try:
with self.assertRaises(DiskFileError):
# .data requires a frag_index kwarg
mgr.make_on_disk_filename(ts, '.data')
self.fail('Expected DiskFileError for missing frag_index')
except DiskFileError:
pass
for frag in (None, 'foo', '1.314', 1.314, -2, '-2'):
try:
with self.assertRaises(DiskFileError):
mgr.make_on_disk_filename(ts, '.data', frag_index=frag)
self.fail('Expected DiskFileError for frag_index %s' % frag)
except DiskFileError:
pass
for ext in ('.meta', '.durable', '.ts'):
expected = '%s%s' % (ts.internal, ext)
# bad frag index should be ignored
@ -2649,7 +2646,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
actual = mgr.make_on_disk_filename(
t_meta, '.meta', ctype_timestamp=t_type)
self.assertEqual(expected, actual)
parsed = mgr.parse_on_disk_filename(actual)
parsed = mgr.parse_on_disk_filename(actual, POLICIES.default)
self.assertEqual(parsed, {
'timestamp': t_meta,
'frag_index': None,
@ -4007,25 +4004,18 @@ class DiskFileMixin(BaseDiskFileTestMixin):
with mock.patch("swift.obj.diskfile.mkstemp",
mock.MagicMock(side_effect=OSError(
e, os.strerror(e)))):
try:
with self.assertRaises(DiskFileNoSpace):
with df.create(size=200):
pass
except DiskFileNoSpace:
pass
else:
self.fail("Expected exception DiskFileNoSpace")
# Other OSErrors must not be raised as DiskFileNoSpace
with mock.patch("swift.obj.diskfile.mkstemp",
mock.MagicMock(side_effect=OSError(
errno.EACCES, os.strerror(errno.EACCES)))):
try:
with self.assertRaises(OSError) as raised:
with df.create(size=200):
pass
except OSError:
pass
else:
self.fail("Expected exception OSError")
self.assertEqual(raised.exception.errno, errno.EACCES)
def test_create_close_oserror(self):
# This is a horrible hack so you can run this test in isolation.
@ -4039,12 +4029,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
with mock.patch("swift.obj.diskfile.os.close",
mock.MagicMock(side_effect=OSError(
errno.EACCES, os.strerror(errno.EACCES)))):
try:
with df.create(size=200):
pass
except Exception as err:
self.fail("Unexpected exception raised: %r" % err)
else:
with df.create(size=200):
pass
def test_write_metadata(self):
@ -4547,12 +4532,9 @@ class DiskFileMixin(BaseDiskFileTestMixin):
self._create_ondisk_file(df, 'B', ext='.data', timestamp=6)
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
df = self._simple_get_diskfile()
try:
with self.assertRaises(DiskFileDeleted) as raised:
df.open()
except DiskFileDeleted as d:
self.assertEqual(d.timestamp, Timestamp(10).internal)
else:
self.fail("Expected DiskFileDeleted exception")
self.assertEqual(raised.exception.timestamp, Timestamp(10).internal)
def test_ondisk_search_loop_meta_ts_data(self):
df = self._simple_get_diskfile()
@ -4563,12 +4545,9 @@ class DiskFileMixin(BaseDiskFileTestMixin):
self._create_ondisk_file(df, 'B', ext='.data', timestamp=6)
self._create_ondisk_file(df, 'A', ext='.data', timestamp=5)
df = self._simple_get_diskfile()
try:
with self.assertRaises(DiskFileDeleted) as raised:
df.open()
except DiskFileDeleted as d:
self.assertEqual(d.timestamp, Timestamp(8).internal)
else:
self.fail("Expected DiskFileDeleted exception")
self.assertEqual(raised.exception.timestamp, Timestamp(8).internal)
def _test_ondisk_search_loop_meta_data_ts(self, legacy_durable=False):
df = self._simple_get_diskfile()
@ -4910,10 +4889,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
ts = time()
with mock.patch(
self._manager_mock('cleanup_ondisk_files'), mock_cleanup):
try:
df.delete(ts)
except OSError:
self.fail("OSError raised when it should have been swallowed")
# Expect to swallow the OSError
df.delete(ts)
exp_name = '%s.ts' % str(Timestamp(ts).internal)
dl = os.listdir(df._datadir)
self.assertEqual(len(dl), file_count + 1)
@ -5313,7 +5290,7 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
def test_data_file_has_frag_index(self):
policy = POLICIES.default
for good_value in (0, '0', 2, '2', 14, '14'):
for good_value in (0, '0', 2, '2', 13, '13'):
# frag_index set by constructor arg
ts = self.ts()
expected = [_make_datafilename(
@ -5331,7 +5308,7 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
ts, policy, good_value, durable=True)]
meta = {'X-Object-Sysmeta-Ec-Frag-Index': good_value}
df = self._get_open_disk_file(ts=ts, policy=policy,
frag_index='99',
frag_index='3',
extra_metadata=meta)
self.assertEqual(expected, sorted(os.listdir(df._datadir)))
actual = df.get_metadata().get('X-Object-Sysmeta-Ec-Frag-Index')
@ -5353,7 +5330,7 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
# the X-Object-Sysmeta-Ec-Frag-Index should *only* be set when
# the .data file is written.
policy = POLICIES.default
orig_frag_index = 14
orig_frag_index = 13
# frag_index set by constructor arg
ts = self.ts()
expected = [_make_datafilename(
@ -5416,7 +5393,7 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
def test_data_file_errors_bad_frag_index(self):
policy = POLICIES.default
df_mgr = self.df_router[policy]
for bad_value in ('foo', '-2', -2, '3.14', 3.14):
for bad_value in ('foo', '-2', -2, '3.14', 3.14, '14', 14, '999'):
# check that bad frag_index set by constructor arg raises error
# as soon as diskfile is constructed, before data is written
self.assertRaises(DiskFileError, self._simple_get_diskfile,
@ -6892,7 +6869,7 @@ class TestSuffixHashes(unittest.TestCase):
# creates pkl file
df_mgr.get_hashes('sda1', '0', [], policy)
mock_consolidate_hashes.assert_called_once()
self.assertEqual([mock.call(suffix_dir)],
self.assertEqual([mock.call(suffix_dir, policy=policy)],
mock_hash_suffix.call_args_list)
# second object in path
df2 = self.get_different_suffix_df(df)
@ -6903,7 +6880,7 @@ class TestSuffixHashes(unittest.TestCase):
# updates pkl file
df_mgr.get_hashes('sda1', '0', [], policy)
mock_consolidate_hashes.assert_called_once()
self.assertEqual([mock.call(suffix_dir2)],
self.assertEqual([mock.call(suffix_dir2, policy=policy)],
mock_hash_suffix.call_args_list)
def test_consolidate_hashes_raises_exception(self):