Merge "Fix race when consolidating new partition"
This commit is contained in:
commit
b3e69acb43
@ -283,23 +283,11 @@ def consolidate_hashes(partition_dir):
|
|||||||
:param suffix_dir: absolute path to partition dir containing hashes.pkl
|
:param suffix_dir: absolute path to partition dir containing hashes.pkl
|
||||||
and hashes.invalid
|
and hashes.invalid
|
||||||
|
|
||||||
:returns: the hashes, or None if there's no hashes.pkl.
|
:returns: a dict, the suffix hashes (if any), the key 'valid' will be False
|
||||||
|
if hashes.pkl is corrupt, cannot be read or does not exist
|
||||||
"""
|
"""
|
||||||
hashes_file = join(partition_dir, HASH_FILE)
|
|
||||||
invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)
|
invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)
|
||||||
|
|
||||||
if not os.path.exists(hashes_file):
|
|
||||||
if os.path.exists(invalidations_file):
|
|
||||||
# no hashes at all -> everything's invalid, so empty the file with
|
|
||||||
# the invalid suffixes in it, if it exists
|
|
||||||
try:
|
|
||||||
with open(invalidations_file, 'wb'):
|
|
||||||
pass
|
|
||||||
except OSError as e:
|
|
||||||
if e.errno != errno.ENOENT:
|
|
||||||
raise
|
|
||||||
return None
|
|
||||||
|
|
||||||
with lock_path(partition_dir):
|
with lock_path(partition_dir):
|
||||||
hashes = read_hashes(partition_dir)
|
hashes = read_hashes(partition_dir)
|
||||||
|
|
||||||
@ -1069,9 +1057,6 @@ class BaseDiskFileManager(object):
|
|||||||
self.logger.warning('Unable to read %r', hashes_file,
|
self.logger.warning('Unable to read %r', hashes_file,
|
||||||
exc_info=True)
|
exc_info=True)
|
||||||
|
|
||||||
if orig_hashes is None:
|
|
||||||
# consolidate_hashes returns None if hashes.pkl does not exist
|
|
||||||
orig_hashes = {'valid': False}
|
|
||||||
if not orig_hashes['valid']:
|
if not orig_hashes['valid']:
|
||||||
# This is the only path to a valid hashes from invalid read (e.g.
|
# This is the only path to a valid hashes from invalid read (e.g.
|
||||||
# does not exist, corrupt, etc.). Moreover, in order to write this
|
# does not exist, corrupt, etc.). Moreover, in order to write this
|
||||||
|
@ -6166,7 +6166,7 @@ class TestSuffixHashes(unittest.TestCase):
|
|||||||
def test_invalidate_hash_racing_get_hashes_diff_suffix_existing_part(self):
|
def test_invalidate_hash_racing_get_hashes_diff_suffix_existing_part(self):
|
||||||
self._test_invalidate_hash_racing_get_hashes_diff_suffix(True)
|
self._test_invalidate_hash_racing_get_hashes_diff_suffix(True)
|
||||||
|
|
||||||
def test_hash_invalidations_survive_racing_get_hashes_same_suffix(self):
|
def _check_hash_invalidations_race_get_hashes_same_suffix(self, existing):
|
||||||
# verify that when two processes concurrently call get_hashes, then any
|
# verify that when two processes concurrently call get_hashes, then any
|
||||||
# concurrent hash invalidation will survive and be consolidated on a
|
# concurrent hash invalidation will survive and be consolidated on a
|
||||||
# subsequent call to get_hashes (i.e. ensure first get_hashes process
|
# subsequent call to get_hashes (i.e. ensure first get_hashes process
|
||||||
@ -6177,6 +6177,7 @@ class TestSuffixHashes(unittest.TestCase):
|
|||||||
for policy in self.iter_policies():
|
for policy in self.iter_policies():
|
||||||
df_mgr = self.df_router[policy]
|
df_mgr = self.df_router[policy]
|
||||||
orig_hash_suffix = df_mgr._hash_suffix
|
orig_hash_suffix = df_mgr._hash_suffix
|
||||||
|
if existing:
|
||||||
# create hashes.pkl
|
# create hashes.pkl
|
||||||
df_mgr.get_hashes('sda1', '0', [], policy)
|
df_mgr.get_hashes('sda1', '0', [], policy)
|
||||||
|
|
||||||
@ -6210,7 +6211,10 @@ class TestSuffixHashes(unittest.TestCase):
|
|||||||
# simulate another process calling get_hashes but failing
|
# simulate another process calling get_hashes but failing
|
||||||
# after hash invalidation have been consolidated
|
# after hash invalidation have been consolidated
|
||||||
hashes = df_mgr.consolidate_hashes(part_dir)
|
hashes = df_mgr.consolidate_hashes(part_dir)
|
||||||
|
if existing:
|
||||||
self.assertTrue(hashes['valid'])
|
self.assertTrue(hashes['valid'])
|
||||||
|
else:
|
||||||
|
self.assertFalse(hashes['valid'])
|
||||||
# get the updated suffix hash...
|
# get the updated suffix hash...
|
||||||
non_local['hash'] = orig_hash_suffix(suffix_dir)
|
non_local['hash'] = orig_hash_suffix(suffix_dir)
|
||||||
return result
|
return result
|
||||||
@ -6229,6 +6233,12 @@ class TestSuffixHashes(unittest.TestCase):
|
|||||||
# so hashes should have the latest suffix hash...
|
# so hashes should have the latest suffix hash...
|
||||||
self.assertEqual(hashes[suffix], non_local['hash'])
|
self.assertEqual(hashes[suffix], non_local['hash'])
|
||||||
|
|
||||||
|
def test_hash_invalidations_race_get_hashes_same_suffix_new(self):
|
||||||
|
self._check_hash_invalidations_race_get_hashes_same_suffix(False)
|
||||||
|
|
||||||
|
def test_hash_invalidations_race_get_hashes_same_suffix_existing(self):
|
||||||
|
self._check_hash_invalidations_race_get_hashes_same_suffix(True)
|
||||||
|
|
||||||
def _check_unpickle_error_and_get_hashes_failure(self, existing):
|
def _check_unpickle_error_and_get_hashes_failure(self, existing):
|
||||||
for policy in self.iter_policies():
|
for policy in self.iter_policies():
|
||||||
df_mgr = self.df_router[policy]
|
df_mgr = self.df_router[policy]
|
||||||
|
@ -853,9 +853,12 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||||||
|
|
||||||
self.assertFalse(os.path.exists(pol_1_part_1_path))
|
self.assertFalse(os.path.exists(pol_1_part_1_path))
|
||||||
warnings = self.reconstructor.logger.get_lines_for_level('warning')
|
warnings = self.reconstructor.logger.get_lines_for_level('warning')
|
||||||
self.assertEqual(1, len(warnings))
|
self.assertEqual(2, len(warnings))
|
||||||
self.assertIn(pol_1_part_1_path, warnings[0])
|
# first warning is due to get_hashes failing to take lock on non-dir
|
||||||
self.assertIn('not a directory', warnings[0].lower())
|
self.assertIn(pol_1_part_1_path + '/hashes.pkl', warnings[0])
|
||||||
|
self.assertIn('unable to read', warnings[0].lower())
|
||||||
|
self.assertIn(pol_1_part_1_path, warnings[1])
|
||||||
|
self.assertIn('not a directory', warnings[1].lower())
|
||||||
|
|
||||||
def test_ignores_status_file(self):
|
def test_ignores_status_file(self):
|
||||||
# Following fd86d5a, the auditor will leave status files on each device
|
# Following fd86d5a, the auditor will leave status files on each device
|
||||||
|
Loading…
Reference in New Issue
Block a user