Merge "Faster suffix invalidations on object PUT/DELETE"

Jenkins 2016-03-16 02:08:22 +00:00 committed by Gerrit Code Review
commit f2e344a4d9
2 changed files with 141 additions and 52 deletions
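In outline: object PUT/DELETE used to read, update, and re-pickle the whole hashes.pkl just to mark one suffix dirty; with this change the write path only appends the suffix to a small hashes.invalid log, and the log is folded back into hashes.pkl the next time get_hashes() runs. A minimal standalone sketch of that pattern (hypothetical names; a threading.Lock stands in for Swift's lock_path(), and most of the real code's error handling is elided):

import os
import pickle
from threading import Lock

lock = Lock()  # stand-in for swift.common.utils.lock_path(partition_dir)


def invalidate(partition_dir, suffix):
    # Fast path, hit on every object PUT/DELETE: one O(1) append,
    # no pickle load/dump of the whole partition's hashes.
    with lock:
        with open(os.path.join(partition_dir, 'hashes.invalid'), 'a') as fh:
            fh.write(suffix + '\n')


def consolidate(partition_dir):
    # Amortized path, hit from get_hashes(): fold the log into the pickle.
    pkl_path = os.path.join(partition_dir, 'hashes.pkl')
    inv_path = os.path.join(partition_dir, 'hashes.invalid')
    if not os.path.exists(pkl_path):
        return None
    with lock:
        with open(pkl_path, 'rb') as fh:
            hashes = pickle.load(fh)
        if os.path.exists(inv_path):
            with open(inv_path) as fh:
                for line in fh:
                    # None means "this suffix must be rehashed"
                    hashes[line.strip()] = None
            with open(pkl_path, 'wb') as fh:
                pickle.dump(hashes, fh, protocol=2)
            open(inv_path, 'w').close()  # log is now reflected in the pickle
        return hashes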

swift/obj/diskfile.py

@@ -72,6 +72,7 @@ from functools import partial
 PICKLE_PROTOCOL = 2
 ONE_WEEK = 604800
 HASH_FILE = 'hashes.pkl'
+HASH_INVALIDATIONS_FILE = 'hashes.invalid'
 METADATA_KEY = 'user.swift.metadata'
 DROP_CACHE_WINDOW = 1024 * 1024
 # These are system-set metadata keys that cannot be changed with a POST.
@@ -221,6 +222,73 @@ def quarantine_renamer(device_path, corrupted_file_path):
     return to_dir


+def consolidate_hashes(partition_dir):
+    """
+    Take what's in hashes.pkl and hashes.invalid, combine them, write the
+    result back to hashes.pkl, and clear out hashes.invalid.
+
+    :param partition_dir: absolute path to partition dir containing
+                          hashes.pkl and hashes.invalid
+
+    :returns: the hashes, or None if there's no hashes.pkl.
+    """
+    hashes_file = join(partition_dir, HASH_FILE)
+    invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)
+
+    if not os.path.exists(hashes_file):
+        if os.path.exists(invalidations_file):
+            # no hashes at all -> everything's invalid, so empty the file
+            # with the invalid suffixes in it, if it exists
+            try:
+                with open(invalidations_file, 'wb'):
+                    pass
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+        return None
+
+    with lock_path(partition_dir):
+        try:
+            with open(hashes_file, 'rb') as hashes_fp:
+                pickled_hashes = hashes_fp.read()
+        except (IOError, OSError):
+            hashes = {}
+        else:
+            try:
+                hashes = pickle.loads(pickled_hashes)
+            except Exception:
+                # pickle.loads() can raise a wide variety of exceptions when
+                # given invalid input depending on the way in which the
+                # input is invalid.
+                hashes = None
+
+        modified = False
+        try:
+            with open(invalidations_file, 'rb') as inv_fh:
+                for line in inv_fh:
+                    suffix = line.strip()
+                    if hashes is not None and hashes.get(suffix) is not None:
+                        hashes[suffix] = None
+                        modified = True
+        except (IOError, OSError) as e:
+            if e.errno != errno.ENOENT:
+                raise
+
+        if modified:
+            write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
+
+        # Now that all the invalidations are reflected in hashes.pkl, it's
+        # safe to clear out the invalidations file.
+        try:
+            with open(invalidations_file, 'w') as inv_fh:
+                pass
+        except OSError as e:
+            if e.errno != errno.ENOENT:
+                raise
+
+        return hashes
+
+
 def invalidate_hash(suffix_dir):
     """
     Invalidates the hash for a suffix_dir in the partition's hashes file.
@@ -234,16 +302,11 @@ def invalidate_hash(suffix_dir):
     hashes_file = join(partition_dir, HASH_FILE)
     if not os.path.exists(hashes_file):
         return

+    invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)
     with lock_path(partition_dir):
-        try:
-            with open(hashes_file, 'rb') as fp:
-                hashes = pickle.load(fp)
-            if suffix in hashes and not hashes[suffix]:
-                return
-        except Exception:
-            return
-        hashes[suffix] = None
-        write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
+        with open(invalidations_file, 'ab') as inv_fh:
+            inv_fh.write(suffix + "\n")


 class AuditLocation(object):
@@ -395,6 +458,7 @@ class BaseDiskFileManager(object):
     diskfile_cls = None  # must be set by subclasses

     invalidate_hash = strip_self(invalidate_hash)
+    consolidate_hashes = strip_self(consolidate_hashes)
     quarantine_renamer = strip_self(quarantine_renamer)

     def __init__(self, conf, logger):
@@ -875,12 +939,22 @@ class BaseDiskFileManager(object):
             recalculate = []

         try:
-            with open(hashes_file, 'rb') as fp:
-                hashes = pickle.load(fp)
             mtime = getmtime(hashes_file)
+        except OSError as e:
+            if e.errno != errno.ENOENT:
+                raise
+
+        try:
+            hashes = self.consolidate_hashes(partition_path)
         except Exception:
             do_listdir = True
             force_rewrite = True
+        else:
+            if hashes is None:  # no hashes.pkl file; let's build it
+                do_listdir = True
+                force_rewrite = True
+                hashes = {}

         if do_listdir:
             for suff in os.listdir(partition_path):
                 if len(suff) == 3:
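The net effect of the diskfile.py hunks above: invalidate_hash on the write path is now a single locked append, while the read-merge-write cost moves into consolidate_hashes, which get_hashes() pays once per partition pass instead of once per object write. A quick round trip, continuing the hypothetical sketch from the top of this page (same invalidate/consolidate helpers and imports):

import tempfile

part = tempfile.mkdtemp()
with open(os.path.join(part, 'hashes.pkl'), 'wb') as fh:
    pickle.dump({'abc': 'fake-suffix-md5'}, fh, protocol=2)

invalidate(part, 'abc')           # cheap append to hashes.invalid
hashes = consolidate(part)        # merge the log back into hashes.pkl
assert hashes == {'abc': None}    # 'abc' is now marked for rehashing
assert open(os.path.join(part, 'hashes.invalid')).read() == ''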

test/unit/obj/test_diskfile.py

@@ -1649,7 +1649,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
             fname = '%s#%s.data' % (ts.internal, frag)
             with self.assertRaises(DiskFileError) as cm:
                 mgr.parse_on_disk_filename(fname)
-            self.assertTrue(msg in str(cm.exception).lower())
+            self.assertIn(msg, str(cm.exception).lower())

         with self.assertRaises(DiskFileError) as cm:
             mgr.parse_on_disk_filename('junk')

@@ -2287,7 +2287,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         # non-fast-post updateable keys are preserved
         self.assertEqual('text/garbage', df._metadata['Content-Type'])
         # original fast-post updateable keys are removed
-        self.assertTrue('X-Object-Meta-Key1' not in df._metadata)
+        self.assertNotIn('X-Object-Meta-Key1', df._metadata)
         # new fast-post updateable keys are added
         self.assertEqual('Value2', df._metadata['X-Object-Meta-Key2'])

@@ -2365,9 +2365,9 @@ class DiskFileMixin(BaseDiskFileTestMixin):
                                     'plain/text',
                                     '\r\n--someheader\r\n', 30)
         value = ''.join(it)
-        self.assertTrue('0123456789' in value)
-        self.assertTrue('1123456789' in value)
-        self.assertTrue('2123456789' in value)
+        self.assertIn('0123456789', value)
+        self.assertIn('1123456789', value)
+        self.assertIn('2123456789', value)
         self.assertEqual(quarantine_msgs, [])

     def test_disk_file_app_iter_ranges_w_quarantine(self):
@@ -2379,9 +2379,9 @@ class DiskFileMixin(BaseDiskFileTestMixin):
                                     'plain/text',
                                     '\r\n--someheader\r\n', 30)
         value = ''.join(it)
-        self.assertTrue('0123456789' in value)
-        self.assertTrue('1123456789' in value)
-        self.assertTrue('2123456789' in value)
+        self.assertIn('0123456789', value)
+        self.assertIn('1123456789', value)
+        self.assertIn('2123456789', value)
         self.assertEqual(quarantine_msgs,
                          ["Bytes read: 30, does not match metadata: 31"])

@@ -2393,7 +2393,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
                                     'plain/text',
                                     '\r\n--someheader\r\n', 30)
         value = ''.join(it)
-        self.assertTrue('0123456789' in value)
+        self.assertIn('0123456789', value)
         self.assertEqual(quarantine_msgs, [])

     def test_disk_file_app_iter_ranges_edges(self):

@@ -2403,8 +2403,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         it = reader.app_iter_ranges([(3, 10), (0, 2)], 'application/whatever',
                                     '\r\n--someheader\r\n', 30)
         value = ''.join(it)
-        self.assertTrue('3456789' in value)
-        self.assertTrue('01' in value)
+        self.assertIn('3456789', value)
+        self.assertIn('01', value)
         self.assertEqual(quarantine_msgs, [])

     def test_disk_file_large_app_iter_ranges(self):
@@ -2870,7 +2870,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         dl = os.listdir(df._datadir)
         self.assertEqual(len(dl), file_count + 1)
         exp_name = '%s.meta' % timestamp
-        self.assertTrue(exp_name in set(dl))
+        self.assertIn(exp_name, set(dl))

     def test_write_metadata_with_content_type(self):
         # if metadata has content-type then its time should be in file name

@@ -3180,8 +3180,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
             exp_name = '%s.ts' % ts.internal
             dl = os.listdir(df._datadir)
             self.assertEqual(len(dl), 1)
-            self.assertTrue(exp_name in set(dl),
-                            'Expected file %s missing in %s' % (exp_name, dl))
+            self.assertIn(exp_name, set(dl))
             # cleanup before next policy
             os.unlink(os.path.join(df._datadir, exp_name))

@@ -3192,7 +3191,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         exp_name = '%s.ts' % str(Timestamp(ts).internal)
         dl = os.listdir(df._datadir)
         self.assertEqual(len(dl), 1)
-        self.assertTrue(exp_name in set(dl))
+        self.assertIn(exp_name, set(dl))
         df = self._simple_get_diskfile()
         self.assertRaises(DiskFileDeleted, df.open)

@@ -3203,7 +3202,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         exp_name = '%s.ts' % str(Timestamp(ts).internal)
         dl = os.listdir(df._datadir)
         self.assertEqual(len(dl), 1)
-        self.assertTrue(exp_name in set(dl))
+        self.assertIn(exp_name, set(dl))
         # it's pickle-format, so removing the last byte is sufficient to
         # corrupt it
         ts_fullpath = os.path.join(df._datadir, exp_name)
@@ -3255,8 +3254,8 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         self.assertEqual(reader._fp, None)
         error_lines = df._logger.get_lines_for_level('error')
         self.assertEqual(len(error_lines), 1)
-        self.assertTrue('close failure' in error_lines[0])
-        self.assertTrue('Bad' in error_lines[0])
+        self.assertIn('close failure', error_lines[0])
+        self.assertIn('Bad', error_lines[0])

     def test_mount_checking(self):

@@ -3314,10 +3313,10 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         self._create_ondisk_file(df, '', ext='.ts', timestamp=5)
         df = self._simple_get_diskfile()
         with df.open():
-            self.assertTrue('X-Timestamp' in df._metadata)
+            self.assertIn('X-Timestamp', df._metadata)
             self.assertEqual(df._metadata['X-Timestamp'],
                              Timestamp(10).internal)
-            self.assertTrue('deleted' not in df._metadata)
+            self.assertNotIn('deleted', df._metadata)

     def test_ondisk_search_loop_multiple_meta_data(self):
         df = self._simple_get_diskfile()

@@ -3379,10 +3378,10 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         self._create_ondisk_file(df, '', ext='.meta', timestamp=5)
         df = self._simple_get_diskfile()
         with df.open():
-            self.assertTrue('X-Timestamp' in df._metadata)
+            self.assertIn('X-Timestamp', df._metadata)
             self.assertEqual(df._metadata['X-Timestamp'],
                              Timestamp(10).internal)
-            self.assertTrue('deleted' not in df._metadata)
+            self.assertNotIn('deleted', df._metadata)

     def test_ondisk_search_loop_wayward_files_ignored(self):
         df = self._simple_get_diskfile()

@@ -3398,10 +3397,10 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         self._create_ondisk_file(df, '', ext='.meta', timestamp=5)
         df = self._simple_get_diskfile()
         with df.open():
-            self.assertTrue('X-Timestamp' in df._metadata)
+            self.assertIn('X-Timestamp', df._metadata)
             self.assertEqual(df._metadata['X-Timestamp'],
                              Timestamp(10).internal)
-            self.assertTrue('deleted' not in df._metadata)
+            self.assertNotIn('deleted', df._metadata)

     def test_ondisk_search_loop_listdir_error(self):
         df = self._simple_get_diskfile()
@@ -3435,7 +3434,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
                 pass
         reader.close()
         log_lines = df._logger.get_lines_for_level('error')
-        self.assertTrue('a very special error' in log_lines[-1])
+        self.assertIn('a very special error', log_lines[-1])

     def test_diskfile_names(self):
         df = self._simple_get_diskfile()

@@ -3459,7 +3458,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         exp_name = '%s.ts' % str(Timestamp(ts).internal)
         dl = os.listdir(df._datadir)
         self.assertEqual(len(dl), 1)
-        self.assertTrue(exp_name in set(dl))
+        self.assertIn(exp_name, set(dl))
         df = self._simple_get_diskfile()
         exc = None
         try:

@@ -3491,7 +3490,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         exp_name = '%s.ts' % str(Timestamp(ts).internal)
         dl = os.listdir(df._datadir)
         self.assertEqual(len(dl), 1)
-        self.assertTrue(exp_name in set(dl))
+        self.assertIn(exp_name, set(dl))
         df = self._simple_get_diskfile()
         exc = None
         try:

@@ -3602,7 +3601,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
             exp_name = '%s.ts' % str(Timestamp(ts).internal)
             dl = os.listdir(df._datadir)
             self.assertEqual(len(dl), file_count + 1)
-            self.assertTrue(exp_name in set(dl))
+            self.assertIn(exp_name, set(dl))

     def _system_can_zero_copy(self):
         if not splice.available:
@@ -3645,7 +3644,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         self.assertFalse(reader.can_zero_copy_send())
         log_lines = df_mgr.logger.get_lines_for_level('warning')
-        self.assertTrue('MD5 sockets' in log_lines[-1])
+        self.assertIn('MD5 sockets', log_lines[-1])

     def test_tee_to_md5_pipe_length_mismatch(self):
         if not self._system_can_zero_copy():

@@ -3756,7 +3755,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
             self.fail("Expected exception DiskFileNoSpace")
         self.assertTrue(_m_fallocate.called)
         self.assertTrue(_m_unlink.called)
-        self.assertTrue('error' not in self.logger.all_log_lines())
+        self.assertNotIn('error', self.logger.all_log_lines())

     def test_create_unlink_cleanup_renamer_fails(self):
         # Test cleanup when renamer fails

@@ -3783,7 +3782,7 @@ class DiskFileMixin(BaseDiskFileTestMixin):
         self.assertFalse(writer.put_succeeded)
         self.assertTrue(_m_renamer.called)
         self.assertTrue(_m_unlink.called)
-        self.assertTrue('error' not in self.logger.all_log_lines())
+        self.assertNotIn('error', self.logger.all_log_lines())

     def test_create_unlink_cleanup_logging(self):
         # Test logging of os.unlink() failures.
@@ -4761,12 +4760,15 @@ class TestSuffixHashes(unittest.TestCase):
             part_path = os.path.join(self.devices, 'sda1',
                                      diskfile.get_data_dir(policy), '0')
             hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
+            inv_file = os.path.join(
+                part_path, diskfile.HASH_INVALIDATIONS_FILE)
             self.assertFalse(os.path.exists(hashes_file))  # sanity
             with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
                 df_mgr.invalidate_hash(suffix_dir)
             self.assertFalse(mock_lock.called)
-            # does not create file
+            # does not create files
             self.assertFalse(os.path.exists(hashes_file))
+            self.assertFalse(os.path.exists(inv_file))

     def test_invalidate_hash_file_exists(self):
         for policy in self.iter_policies():

@@ -4778,19 +4780,32 @@ class TestSuffixHashes(unittest.TestCase):
             suffix_dir = os.path.dirname(df._datadir)
             suffix = os.path.basename(suffix_dir)
             hashes = df_mgr.get_hashes('sda1', '0', [], policy)
-            self.assertTrue(suffix in hashes)  # sanity
+            self.assertIn(suffix, hashes)  # sanity
             # sanity check hashes file
             part_path = os.path.join(self.devices, 'sda1',
                                      diskfile.get_data_dir(policy), '0')
             hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
+            invalidations_file = os.path.join(
+                part_path, diskfile.HASH_INVALIDATIONS_FILE)
             with open(hashes_file, 'rb') as f:
                 self.assertEqual(hashes, pickle.load(f))
+
             # invalidate the hash
             with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
                 df_mgr.invalidate_hash(suffix_dir)
             self.assertTrue(mock_lock.called)
+            with open(invalidations_file, 'rb') as f:
+                self.assertEqual(suffix + "\n", f.read())
+
+            # consolidate the hash and the invalidations
+            with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
+                hashes = df_mgr.consolidate_hashes(part_path)
+            self.assertIsNone(hashes.get(suffix))
             with open(hashes_file, 'rb') as f:
-                self.assertEqual({suffix: None}, pickle.load(f))
+                self.assertEqual(hashes, pickle.load(f))
+            with open(invalidations_file, 'rb') as f:
+                self.assertEqual("", f.read())

     # invalidate_hash tests - error handling
@@ -4817,7 +4832,7 @@ class TestSuffixHashes(unittest.TestCase):
                 self.assertEqual(f.read(), 'asdf')
             # ... but get_hashes will
             hashes = df_mgr.get_hashes('sda1', '0', [], policy)
-            self.assertTrue(suffix in hashes)
+            self.assertIn(suffix, hashes)

     # get_hashes tests - hash_suffix behaviors

@@ -5244,7 +5259,7 @@ class TestSuffixHashes(unittest.TestCase):
             self.assertTrue(os.path.exists(hsh_path))  # sanity
             # get_hashes will cleanup empty hsh_path and leave valid one
             hashes = df_mgr.get_hashes('sda1', '0', [], policy)
-            self.assertTrue(suffix in hashes)
+            self.assertIn(suffix, hashes)
             self.assertTrue(os.path.exists(df._datadir))
             for hsh_path in empty_hsh_paths:
                 self.assertFalse(os.path.exists(hsh_path))

@@ -5470,7 +5485,7 @@ class TestSuffixHashes(unittest.TestCase):
             # get_hashes will find the untracked suffix dir
             self.assertFalse(os.path.exists(hashes_file))  # sanity
             hashes = df_mgr.get_hashes(self.existing_device, '0', [], policy)
-            self.assertTrue(suffix in hashes)
+            self.assertIn(suffix, hashes)
             # ... and create a hashes pickle for it
             self.assertTrue(os.path.exists(hashes_file))

@@ -5500,7 +5515,7 @@ class TestSuffixHashes(unittest.TestCase):
             # ... unless remote end asks for a recalc
             hashes = df_mgr.get_hashes(self.existing_device, '0', [suffix],
                                        policy)
-            self.assertTrue(suffix in hashes)
+            self.assertIn(suffix, hashes)

     def test_get_hashes_does_not_rehash_known_suffix_dirs(self):
         for policy in self.iter_policies():
@@ -5512,7 +5527,7 @@ class TestSuffixHashes(unittest.TestCase):
             df.delete(timestamp)
             # create the baseline hashes file
             hashes = df_mgr.get_hashes(self.existing_device, '0', [], policy)
-            self.assertTrue(suffix in hashes)
+            self.assertIn(suffix, hashes)
             # now change the contents of the suffix w/o calling
             # invalidate_hash
             rmtree(df._datadir)

@@ -5694,8 +5709,8 @@ class TestSuffixHashes(unittest.TestCase):
                                      diskfile.get_data_dir(policy), '0')
             open(os.path.join(part_dir, 'bad'), 'w').close()
             hashes = df_mgr.get_hashes(self.existing_device, '0', [], policy)
-            self.assertTrue(suffix in hashes)
-            self.assertFalse('bad' in hashes)
+            self.assertIn(suffix, hashes)
+            self.assertNotIn('bad', hashes)

     def test_get_hashes_hash_suffix_other_oserror(self):
         for policy in self.iter_policies():