Fixed regression in consolidate_hashes

The regression occurs when a new file is stored to a new suffix in a
non-empty partition. The suffix is appended to the invalidations file
but never makes it into the hashes pickle file. When the partition is
replicated, replication of the suffix completes only on the first and
then every tenth run of the replicator. Rsync runs for every new
suffix, because the destination does not report a hash for the new
suffix even though the suffix content is already in the same state.
This bug was introduced in 2.7.0.

Co-Authored-By: Alistair Coles <alistair.coles@hpe.com>
Change-Id: Ie2700f6e6171f2ecfa7d07b0f18b79e90cbf1c8a
Closes-Bug: #1634967
Authored by Pavel Kvasnička on 2016-11-23 16:41:11 +01:00; committed by Alistair Coles
parent 0873b7c03e
commit 8ac432fff3
2 changed files with 128 additions and 39 deletions
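
For context: each object partition directory carries two pieces of hashing state, hashes.pkl (a pickle mapping each suffix to its cached hash, or None when stale) and hashes.invalid (a plain-text list of suffixes whose cached hash can no longer be trusted). The sketch below is a simplified model of the consolidation step only; Swift's locking, error handling and write-back are omitted, and the function name is illustrative rather than Swift's API:

import os
import pickle

HASH_FILE = 'hashes.pkl'
HASH_INVALIDATIONS_FILE = 'hashes.invalid'

def consolidate_hashes_sketch(partition_dir):
    # load the cached per-suffix hashes, if the pickle exists
    try:
        with open(os.path.join(partition_dir, HASH_FILE), 'rb') as f:
            hashes = pickle.load(f)
    except (IOError, OSError):
        hashes = None
    # every suffix listed in the invalidations file must lose its cached
    # hash so get_hashes() recalculates it -- including suffixes that
    # hashes.pkl has never seen, which is the case this commit fixes
    with open(os.path.join(partition_dir, HASH_INVALIDATIONS_FILE)) as f:
        for line in f:
            suffix = line.strip()
            if hashes is not None and hashes.get(suffix, '') is not None:
                hashes[suffix] = None
    return hashes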

swift/obj/diskfile.py

@@ -276,7 +276,8 @@ def consolidate_hashes(partition_dir):
             with open(invalidations_file, 'rb') as inv_fh:
                 for line in inv_fh:
                     suffix = line.strip()
-                    if hashes is not None and hashes.get(suffix) is not None:
+                    if hashes is not None and \
+                            hashes.get(suffix, '') is not None:
                         hashes[suffix] = None
                         modified = True
         except (IOError, OSError) as e:
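
The entire fix is the '' default passed to dict.get(). Against a toy dict (illustrative hash values only), the old and new conditions diverge exactly when a suffix is missing from hashes.pkl:

hashes = {'abc': 'd41d8cd98f00b204e9800998ecf8427e'}  # loaded from hashes.pkl
suffix = 'def'  # just invalidated, but never recorded in the pickle

# old condition: a missing key returns None, so the suffix is skipped
# and never written back to hashes.pkl
hashes.get(suffix) is not None         # False -> suffix silently dropped

# new condition: only keys already set to None (already invalidated)
# are skipped; unknown suffixes now get recorded as invalidated
hashes.get(suffix, '') is not None     # True -> hashes[suffix] = None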

test/unit/obj/test_diskfile.py

@@ -6030,9 +6030,11 @@ class TestSuffixHashes(unittest.TestCase):
         self.assertFalse(os.path.exists(hashes_file))
         self.assertFalse(os.path.exists(inv_file))
 
-    def test_invalidate_hash_file_exists(self):
+    def test_invalidate_hash_empty_file_exists(self):
         for policy in self.iter_policies():
             df_mgr = self.df_router[policy]
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            self.assertEqual(hashes, {})
             # create something to hash
             df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
                                      policy=policy)
@@ -6041,6 +6043,34 @@ class TestSuffixHashes(unittest.TestCase):
             suffix = os.path.basename(suffix_dir)
             hashes = df_mgr.get_hashes('sda1', '0', [], policy)
             self.assertIn(suffix, hashes)  # sanity
 
     def test_invalidate_hash_consolidation(self):
+        def assert_consolidation(suffixes):
+            # verify that suffixes are invalidated after consolidation
+            with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
+                hashes = df_mgr.consolidate_hashes(part_path)
+            self.assertTrue(mock_lock.called)
+            for suffix in suffixes:
+                self.assertIn(suffix, hashes)
+                self.assertIsNone(hashes[suffix])
+            with open(hashes_file, 'rb') as f:
+                self.assertEqual(hashes, pickle.load(f))
+            with open(invalidations_file, 'rb') as f:
+                self.assertEqual("", f.read())
+            return hashes
+
         for policy in self.iter_policies():
             df_mgr = self.df_router[policy]
             # create something to hash
             df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
                                      policy=policy)
             df.delete(self.ts())
             suffix_dir = os.path.dirname(df._datadir)
             suffix = os.path.basename(suffix_dir)
+            original_hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            self.assertIn(suffix, original_hashes)  # sanity
+            self.assertIsNotNone(original_hashes[suffix])
             # sanity check hashes file
             part_path = os.path.join(self.devices, 'sda1',
                                      diskfile.get_data_dir(policy), '0')
@@ -6048,24 +6078,57 @@ class TestSuffixHashes(unittest.TestCase):
             invalidations_file = os.path.join(
                 part_path, diskfile.HASH_INVALIDATIONS_FILE)
             with open(hashes_file, 'rb') as f:
-                self.assertEqual(hashes, pickle.load(f))
+                self.assertEqual(original_hashes, pickle.load(f))
+            self.assertFalse(os.path.exists(invalidations_file))
             # invalidate the hash
             with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
                 df_mgr.invalidate_hash(suffix_dir)
             self.assertTrue(mock_lock.called)
             # suffix should be in invalidations file
             with open(invalidations_file, 'rb') as f:
                 self.assertEqual(suffix + "\n", f.read())
             # hashes file is unchanged
             with open(hashes_file, 'rb') as f:
+                self.assertEqual(original_hashes, pickle.load(f))
             # consolidate the hash and the invalidations
-            with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
-                hashes = df_mgr.consolidate_hashes(part_path)
-            self.assertIsNone(hashes.get(suffix))
+            hashes = assert_consolidation([suffix])
+
+            # invalidate a different suffix hash in same partition but not in
+            # existing hashes.pkl
+            i = 0
+            while True:
+                df2 = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o%d' % i,
+                                          policy=policy)
+                i += 1
+                suffix_dir2 = os.path.dirname(df2._datadir)
+                if suffix_dir != suffix_dir2:
+                    break
+            df2.delete(self.ts())
+            suffix2 = os.path.basename(suffix_dir2)
+            # suffix2 should be in invalidations file
+            with open(invalidations_file, 'rb') as f:
+                self.assertEqual(suffix2 + "\n", f.read())
+            # hashes file is not yet changed
+            with open(hashes_file, 'rb') as f:
+                self.assertEqual(hashes, pickle.load(f))
+            # consolidate hashes
+            hashes = assert_consolidation([suffix, suffix2])
+
+            # invalidating suffix2 multiple times is ok
+            df2.delete(self.ts())
+            df2.delete(self.ts())
+            # suffix2 should be in invalidations file
+            with open(invalidations_file, 'rb') as f:
-                self.assertEqual("", f.read())
+                self.assertEqual("%s\n%s\n" % (suffix2, suffix2), f.read())
+            # hashes file is not yet changed
+            with open(hashes_file, 'rb') as f:
+                self.assertEqual(hashes, pickle.load(f))
+            # consolidate hashes
+            assert_consolidation([suffix, suffix2])
 
     # invalidate_hash tests - error handling
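
The test above pins down the lifecycle of hashes.invalid: invalidate_hash() only appends suffix lines (duplicates included), and consolidation marks every listed suffix as None in hashes.pkl before emptying the file. A toy in-memory model of those assertions, with hypothetical suffix names:

def invalidate(inv_lines, suffix):
    # invalidate_hash just appends; duplicate entries are harmless
    inv_lines.append(suffix)

def consolidate(hashes, inv_lines):
    # each listed suffix is marked stale in the pickle, then the
    # invalidations list is emptied
    for suffix in inv_lines:
        if hashes.get(suffix, '') is not None:
            hashes[suffix] = None
    del inv_lines[:]
    return hashes

hashes, inv = {'abc': None}, []   # state after a first consolidation
invalidate(inv, 'f00')            # df2.delete() ...
invalidate(inv, 'f00')            # ... called twice
assert inv == ['f00', 'f00']
consolidate(hashes, inv)
assert hashes == {'abc': None, 'f00': None} and inv == []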
@@ -6208,63 +6271,88 @@ class TestSuffixHashes(unittest.TestCase):
             hashes = df_mgr.get_hashes('sda1', '0', [], policy)
             self.assertEqual(hashes, {})
 
-    def test_hash_suffix_one_reclaim_tombstone_with_hash_pkl(self):
+    def test_hash_suffix_ts_cleanup_after_recalc(self):
         for policy in self.iter_policies():
             df_mgr = self.df_router[policy]
             df = df_mgr.get_diskfile(
                 'sda1', '0', 'a', 'c', 'o', policy=policy)
             suffix_dir = os.path.dirname(df._datadir)
-            part_dir = os.path.dirname(suffix_dir)
-            hash_file = os.path.join(part_dir, diskfile.HASH_FILE)
+            suffix = os.path.basename(suffix_dir)
             # scale back reclaim age a bit
             df_mgr.reclaim_age = 1000
-            # write a tombstone that's just a *little* older
-            old_time = time() - 1001
+            # write a valid tombstone
+            old_time = time() - 500
             timestamp = Timestamp(old_time)
             df.delete(timestamp.internal)
             hashes = df_mgr.get_hashes('sda1', '0', [], policy)
-            # sanity
-            self.assertEqual(hashes, {})
-            self.assertFalse(os.path.exists(df._datadir))
+            self.assertIn(suffix, hashes)
+            self.assertIsNotNone(hashes[suffix])
-            hash_timestamp = os.stat(hash_file).st_mtime
-            # if hash.pkl exists, that .ts file is not reclaimed
-            df = df_mgr.get_diskfile(
-                'sda1', '0', 'a', 'c', 'o', policy=policy)
-            df.delete(timestamp.internal)
-            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
-            # This was a cached value so the value looks empty
-            self.assertEqual(hashes, {})
-            # and the hash.pkl is not touched
-            self.assertEqual(hash_timestamp, os.stat(hash_file).st_mtime)
-            # and we still have tombstone entry
+            # we have tombstone entry
             tombstone = '%s.ts' % timestamp.internal
             self.assertTrue(os.path.exists(df._datadir))
             self.assertIn(tombstone, os.listdir(df._datadir))
+            # lower reclaim age to force tombstone reclaiming
+            df_mgr.reclaim_age = 200
+            # not cleaning up because suffix not invalidated
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            self.assertTrue(os.path.exists(df._datadir))
+            self.assertIn(tombstone, os.listdir(df._datadir))
+            self.assertIn(suffix, hashes)
+            self.assertIsNotNone(hashes[suffix])
+            # recalculating suffix hash cause cleanup
+            hashes = df_mgr.get_hashes('sda1', '0', [suffix], policy)
+            self.assertEqual(hashes, {})
+            self.assertFalse(os.path.exists(df._datadir))
+
+    def test_hash_suffix_ts_cleanup_after_invalidate_hash(self):
+        for policy in self.iter_policies():
+            df_mgr = self.df_router[policy]
+            df = df_mgr.get_diskfile(
+                'sda1', '0', 'a', 'c', 'o', policy=policy)
+            suffix_dir = os.path.dirname(df._datadir)
+            suffix = os.path.basename(suffix_dir)
+            # scale back reclaim age a bit
+            df_mgr.reclaim_age = 1000
+            # write a valid tombstone
+            old_time = time() - 500
+            timestamp = Timestamp(old_time)
+            df.delete(timestamp.internal)
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            self.assertIn(suffix, hashes)
+            self.assertIsNotNone(hashes[suffix])
+            # we have tombstone entry
+            tombstone = '%s.ts' % timestamp.internal
+            self.assertTrue(os.path.exists(df._datadir))
+            self.assertIn(tombstone, os.listdir(df._datadir))
+            # lower reclaim age to force tombstone reclaiming
+            df_mgr.reclaim_age = 200
+            # not cleaning up because suffix not invalidated
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            self.assertTrue(os.path.exists(df._datadir))
+            self.assertIn(tombstone, os.listdir(df._datadir))
+            self.assertIn(suffix, hashes)
+            self.assertIsNotNone(hashes[suffix])
+            # However if we call invalidate_hash for the suffix dir,
+            # get_hashes can reclaim the tombstone
+            with mock.patch('swift.obj.diskfile.lock_path'):
+                df_mgr.invalidate_hash(suffix_dir)
+            # updating invalidated hashes cause cleanup
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            self.assertEqual(hashes, {})
-            # If we have no other objects in the suffix, get_hashes
-            # doesn't reclaim anything
-            self.assertTrue(os.path.exists(df._datadir))
-            self.assertIn(tombstone, os.listdir(df._datadir))
-            self.assertEqual(hash_timestamp, os.stat(hash_file).st_mtime)
-            # *BUT* if suffix value is given to recalc, it can force to recaim!
-            suffix = os.path.dirname(suffix_dir)
-            hashes = df_mgr.get_hashes('sda1', '0', [suffix], policy)
-            self.assertFalse(os.path.exists(df._datadir))
-            # hash.pkl was updated
-            self.assertGreater(os.stat(hash_file).st_mtime, hash_timestamp)
 
     def test_hash_suffix_one_reclaim_and_one_valid_tombstone(self):
         for policy in self.iter_policies():
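
Together, the two new tests document when tombstones are actually reclaimed: .ts files older than reclaim_age are unlinked only while a suffix hash is being recalculated, which happens when the suffix is passed to get_hashes() explicitly or has been invalidated; a cached hashes.pkl entry never triggers cleanup. A condensed decision sketch, with names of our own choosing rather than Swift's API:

import time

DEFAULT_RECLAIM_AGE = 604800  # Swift's default reclaim age: one week

def needs_rehash(suffix, cached_hashes, recalculate, invalidated):
    # a suffix is rehashed -- and only then are its expired tombstones
    # reclaimed -- when no trusted cached hash exists for it
    return (suffix in recalculate or suffix in invalidated
            or cached_hashes.get(suffix) is None)

def tombstone_expired(tombstone_timestamp, reclaim_age=DEFAULT_RECLAIM_AGE):
    # a .ts file older than reclaim_age may be unlinked during a rehash
    return time.time() - tombstone_timestamp > reclaim_age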