relinker: Rehash the parts actually touched when relinking
This has a few different benefits:

* When re-running the relinker, we won't try to rehash partitions
  outside the expanded range.
* When running on a small, sparse cluster (like in dev or testing),
  we won't try to rehash an empty new partition.
* If we find old files from an earlier part power increase and link
  them into the correct partition, we'll rehash the one we actually
  linked to.
* If we relink a file during cleanup that was missed during relink,
  we'll rehash it rather than waiting for the replicator to do it.

Related-Change: I9aace80088cd00d02c418fe4d782b662fb5c8bcf
Change-Id: I3c91127e19156af7a707ad84c5a89727df87f2f1
commit 1db2816119 (parent 4ce907a4ae)
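For context, here is a minimal, standalone sketch (not part of the change) of
the partition arithmetic the relinker relies on: a hash maps to the partition
given by its top `part_power` bits, so raising the part power by one splits
old partition P into new partitions 2P and 2P+1, and the partition number can
be read straight out of an object's on-disk path. The helpers below
(`partition_for_hash`, `partition_from_path`) are simplified stand-ins for
illustration, not the Swift functions themselves.

    import os
    import struct
    from hashlib import md5

    def partition_for_hash(hex_hash, part_power):
        # Top `part_power` bits of the hash (same idea as
        # swift.common.utils.get_partition_for_hash shown in the diff below).
        raw = bytes.fromhex(hex_hash)
        return struct.unpack_from('>I', raw)[0] >> (32 - part_power)

    def partition_from_path(devices, path):
        # Simplified stand-in for the new helper; assumes the usual
        # <devices>/<device>/objects/<partition>/... layout.
        components = path[len(devices):].strip(os.sep).split(os.sep)
        return int(components[2])

    hex_hash = md5(b'/a/c/o').hexdigest()
    old_part = partition_for_hash(hex_hash, 10)   # before the increase
    new_part = partition_for_hash(hex_hash, 11)   # after part power + 1
    assert new_part in (2 * old_part, 2 * old_part + 1)
    assert partition_from_path(
        '/srv/node', '/srv/node/d1/objects/70/c77/abcd/1234.data') == 70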
--- a/swift/cli/relinker.py
+++ b/swift/cli/relinker.py
@@ -29,7 +29,7 @@ from swift.common.utils import replace_partition_in_path, config_true_value, \
     audit_location_generator, get_logger, readconf, drop_privileges, \
     RateLimitedIterator, lock_path, PrefixLoggerAdapter, distribute_evenly, \
     non_negative_float, non_negative_int, config_auto_int_value, \
-    dump_recon_cache
+    dump_recon_cache, get_partition_from_path
 from swift.obj import diskfile

@@ -118,6 +118,7 @@ class Relinker(object):
         self.devices_data = recursive_defaultdict()
         self.policy_count = 0
         self.pid = os.getpid()
+        self.linked_into_partitions = set()

     def _aggregate_dev_policy_stats(self):
         for dev_data in self.devices_data.values():
@@ -283,9 +284,11 @@ class Relinker(object):

     def hook_pre_partition(self, partition_path):
         self.pre_partition_errors = self.total_errors
+        self.linked_into_partitions = set()

     def hook_post_partition(self, partition_path):
-        datadir_path, part = os.path.split(os.path.abspath(partition_path))
+        datadir_path, partition = os.path.split(
+            os.path.abspath(partition_path))
         device_path, datadir_name = os.path.split(datadir_path)
         device = os.path.basename(device_path)
         state_tmp_file = os.path.join(
@@ -315,15 +318,15 @@ class Relinker(object):
         # shift to the new partition space and rehash
         # |0                              2N|
         # |                IIJJKKLLMMNNOOPP|
-        partition = int(part)
-        if not self.do_cleanup and partition >= 2 ** (
-                self.states['part_power'] - 1):
-            for new_part in (2 * partition, 2 * partition + 1):
+        for dirty_partition in self.linked_into_partitions:
+            if self.do_cleanup or \
+                    dirty_partition >= 2 ** self.states['part_power']:
                 self.diskfile_mgr.get_hashes(
-                    device, new_part, [], self.policy)
-        elif self.do_cleanup:
+                    device, dirty_partition, [], self.policy)
+
+        if self.do_cleanup:
             hashes = self.diskfile_mgr.get_hashes(
-                device, partition, [], self.policy)
+                device, int(partition), [], self.policy)
             # In any reasonably-large cluster, we'd expect all old
             # partitions P to be empty after cleanup (i.e., it's unlikely
             # that there's another partition Q := P//2 that also has data
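A standalone restatement (illustrative only, with hypothetical names) of the
decision the new loop in the hunk above makes: during cleanup every partition
we linked into gets rehashed, while during relink only the partitions at or
above 2 ** part_power need an explicit rehash, since those fall outside the
range the relink pass itself walks.

    def partitions_to_rehash(linked_into_partitions, part_power, do_cleanup):
        # Mirrors the condition in hook_post_partition above: rehash every
        # dirty partition during cleanup, but during relink only those at or
        # above 2 ** part_power, which the normal partition walk won't reach.
        return sorted(p for p in linked_into_partitions
                      if do_cleanup or p >= 2 ** part_power)

    # With part_power = 10 (partitions 0..1023 before the increase):
    assert partitions_to_rehash({700, 1400}, 10, do_cleanup=False) == [1400]
    assert partitions_to_rehash({700, 1400}, 10, do_cleanup=True) == [700, 1400]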
@@ -359,7 +362,7 @@ class Relinker(object):
         # in case the process is interrupted and needs to resume, or there
         # were errors and the relinker needs to run again.
         if self.pre_partition_errors == self.total_errors:
-            self.states["state"][part] = True
+            self.states["state"][partition] = True
             with open(state_tmp_file, 'wt') as f:
                 json.dump(self.states, f)
                 os.fsync(f.fileno())
@@ -507,6 +510,8 @@ class Relinker(object):
                 self.stats['errors'] += 1
                 missing_links += 1
         if created_links:
+            self.linked_into_partitions.add(get_partition_from_path(
+                self.conf['devices'], new_hash_path))
             diskfile.invalidate_hash(os.path.dirname(new_hash_path))

         if self.do_cleanup and not missing_links:
@@ -529,6 +534,9 @@ class Relinker(object):
                 self.logger.debug("Removed %s", old_file)

         if rehash:
+            # Even though we're invalidating the suffix, don't update
+            # self.linked_into_partitions -- we only care about them for
+            # relinking into the new part-power space
             try:
                 diskfile.invalidate_hash(os.path.dirname(hash_path))
             except Exception as exc:
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -5965,6 +5965,22 @@ def get_partition_for_hash(hex_hash, part_power):
     return struct.unpack_from('>I', raw_hash)[0] >> part_shift


+def get_partition_from_path(devices, path):
+    """
+    :param devices: directory where devices are mounted (e.g. /srv/node)
+    :param path: full path to a object file or hashdir
+    :returns: the (integer) partition from the path
+    """
+    offset_parts = devices.rstrip(os.sep).split(os.sep)
+    path_components = path.split(os.sep)
+    if offset_parts == path_components[:len(offset_parts)]:
+        offset = len(offset_parts)
+    else:
+        raise ValueError('Path %r is not under device dir %r' % (
+            path, devices))
+    return int(path_components[offset + 2])
+
+
 def replace_partition_in_path(devices, path, part_power):
     """
     Takes a path and a partition power and returns the same path, but with the
@@ -5973,8 +5989,6 @@ def replace_partition_in_path(devices, path, part_power):
     :param devices: directory where devices are mounted (e.g. /srv/node)
     :param path: full path to a object file or hashdir
     :param part_power: partition power to compute correct partition number
-    :param is_hash_dir: if True then ``path`` is the path to a hash dir,
-        otherwise ``path`` is the path to a file in a hash dir.
     :returns: Path with re-computed partition power
     """
     offset_parts = devices.rstrip(os.sep).split(os.sep)
--- a/test/unit/cli/test_relinker.py
+++ b/test/unit/cli/test_relinker.py
@@ -2120,7 +2120,7 @@ class TestRelinker(unittest.TestCase):
             (('meta', 1),),
             None,
             None,
-            (('data', 0), ('meta', 1), ('meta', 2)))
+            (('data', 0), ('meta', 2)))
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(2 files, 2 linked, 2 removed, 0 errors)',
@@ -2131,7 +2131,7 @@ class TestRelinker(unittest.TestCase):
             (('data', 0),),
             None,
             None,
-            (('data', 0), ('ts', 2),))
+            (('ts', 2),))
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(1 files, 1 linked, 1 removed, 0 errors)',
@@ -2142,7 +2142,7 @@ class TestRelinker(unittest.TestCase):
             (('ts', 0),),
             None,
             None,
-            (('ts', 0), ('data', 1), ('meta', 2)))
+            (('data', 1), ('meta', 2)))
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(2 files, 2 linked, 2 removed, 0 errors)',
@@ -3212,7 +3212,8 @@ class TestRelinker(unittest.TestCase):
         self.assertEqual([], self.logger.get_lines_for_level('error'))

     def test_cleanup_not_yet_relinked(self):
-        # force rehash of new partition to not happen during cleanup
+        # force new partition to be above range of partitions visited during
+        # cleanup
         self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
         self._common_test_cleanup(relink=False)
         with self._mock_relinker():
@@ -3234,11 +3235,48 @@ class TestRelinker(unittest.TestCase):
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(1 files, 1 linked, 1 removed, 0 errors)', info_lines)
-        # suffix should be invalidated in new partition
+        # suffix should be invalidated and rehashed in new partition
         hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
         self.assertTrue(os.path.exists(hashes_invalid))
         with open(hashes_invalid, 'r') as fd:
-            self.assertEqual(str(self.suffix), fd.read().strip())
+            self.assertEqual('', fd.read().strip())
+        self.assertEqual([], self.logger.get_lines_for_level('error'))
+
+    def test_cleanup_not_yet_relinked_low(self):
+        # force new partition to be in the range of partitions visited during
+        # cleanup, but not exist until after cleanup would have visited it
+        self._setup_object(lambda part: part < 2 ** (PART_POWER - 1))
+        self._common_test_cleanup(relink=False)
+        self.assertFalse(os.path.isfile(self.expected_file))
+        self.assertFalse(os.path.exists(self.next_part_dir))
+        # Relinker processes partitions in reverse order; as a result, the
+        # "normal" rehash during cleanup won't hit this, since it doesn't
+        # exist yet -- but when we finish processing the old partition,
+        # we'll loop back around.
+        with self._mock_relinker():
+            self.assertEqual(0, relinker.main([
+                'cleanup',
+                '--swift-dir', self.testdir,
+                '--devices', self.devices,
+                '--skip-mount',
+            ]))
+
+        self.assertTrue(os.path.isfile(self.expected_file))  # link created
+        # old partition should be cleaned up
+        self.assertFalse(os.path.exists(self.part_dir))
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+        self.assertIn(
+            'Relinking (cleanup) created link: %s to %s'
+            % (self.objname, self.expected_file),
+            self.logger.get_lines_for_level('debug'))
+        info_lines = self.logger.get_lines_for_level('info')
+        self.assertIn('1 hash dirs processed (cleanup=True) '
+                      '(1 files, 1 linked, 1 removed, 0 errors)', info_lines)
+        # suffix should be invalidated and rehashed in new partition
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual('', fd.read().strip())
         self.assertEqual([], self.logger.get_lines_for_level('error'))

     def test_cleanup_same_object_different_inode_in_new_partition(self):
@@ -3302,7 +3340,8 @@ class TestRelinker(unittest.TestCase):
         self.assertEqual(0, res)
         # old partition should be cleaned up
         self.assertFalse(os.path.exists(self.part_dir))
-        self.assertTrue(os.path.isfile(older_obj_file))  # older file intact
+        # which is also going to clean up the older file
+        self.assertFalse(os.path.isfile(older_obj_file))
         self.assertTrue(os.path.isfile(self.expected_file))  # link created
         self.assertIn(
             'Relinking (cleanup) created link: %s to %s'
@@ -3312,11 +3351,11 @@ class TestRelinker(unittest.TestCase):
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(1 files, 1 linked, 1 removed, 0 errors)', info_lines)
-        # suffix should be invalidated in new partition
+        # suffix should be invalidated and rehashed in new partition
         hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
         self.assertTrue(os.path.exists(hashes_invalid))
         with open(hashes_invalid, 'r') as fd:
-            self.assertEqual(str(self.suffix), fd.read().strip())
+            self.assertEqual('', fd.read().strip())
         self.assertEqual([], self.logger.get_lines_for_level('error'))

     def test_cleanup_deleted(self):
@@ -3613,11 +3652,11 @@ class TestRelinker(unittest.TestCase):
                 '--skip-mount',
             ]))
         warning_lines = self.logger.get_lines_for_level('warning')
-        # once for cleanup_ondisk_files in old and once once for the
-        # get_ondisk_files of union of files; the new partition did not exist
-        # at start of cleanup so is not rehashed
-        self.assertEqual(2, len(warning_lines),
-                         'Expected 2 log lines, got %r' % warning_lines)
+        # once for cleanup_ondisk_files in old, again for the get_ondisk_files
+        # of union of files, and one last time when the new partition gets
+        # rehashed at the end of processing the old one
+        self.assertEqual(3, len(warning_lines),
+                         'Expected 3 log lines, got %r' % warning_lines)
         for line in warning_lines:
             self.assertIn('Bad fragment index: None', line, warning_lines)
         self.assertIn(
@@ -3666,12 +3705,8 @@ class TestRelinker(unittest.TestCase):
             ]))
         expected = [('invalidate', self.next_suffix_dir)]
         if self.part >= 2 ** (PART_POWER - 1):
-            expected.extend([
-                ('get_hashes', self.existing_device, self.next_part & ~1,
-                 [], POLICIES[0]),
-                ('get_hashes', self.existing_device, self.next_part | 1,
-                 [], POLICIES[0]),
-            ])
+            expected.append(('get_hashes', self.existing_device,
+                             self.next_part, [], POLICIES[0]))

         self.assertEqual(calls, expected)
         # Depending on partition, there may or may not be a get_hashes here
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -4446,6 +4446,22 @@ cluster_dfw1 = http://dfw1.host/v1/
         self.assertEqual(0, utils.get_partition_for_hash(hex_hash, 0))
         self.assertEqual(0, utils.get_partition_for_hash(hex_hash, -1))

+    def test_get_partition_from_path(self):
+        def do_test(path):
+            self.assertEqual(utils.get_partition_from_path('/s/n', path), 70)
+            self.assertEqual(utils.get_partition_from_path('/s/n/', path), 70)
+            path += '/'
+            self.assertEqual(utils.get_partition_from_path('/s/n', path), 70)
+            self.assertEqual(utils.get_partition_from_path('/s/n/', path), 70)
+
+        do_test('/s/n/d/o/70/c77/af088baea4806dcaba30bf07d9e64c77/f')
+        # also works with a hashdir
+        do_test('/s/n/d/o/70/c77/af088baea4806dcaba30bf07d9e64c77')
+        # or suffix dir
+        do_test('/s/n/d/o/70/c77')
+        # or even the part dir itself
+        do_test('/s/n/d/o/70')
+
     def test_replace_partition_in_path(self):
         # Check for new part = part * 2
         old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77/f'