diff --git a/swift/cli/relinker.py b/swift/cli/relinker.py index 08c11dbd8f..2baa5f3b09 100644 --- a/swift/cli/relinker.py +++ b/swift/cli/relinker.py @@ -29,7 +29,7 @@ from swift.common.utils import replace_partition_in_path, config_true_value, \ audit_location_generator, get_logger, readconf, drop_privileges, \ RateLimitedIterator, lock_path, PrefixLoggerAdapter, distribute_evenly, \ non_negative_float, non_negative_int, config_auto_int_value, \ - dump_recon_cache + dump_recon_cache, get_partition_from_path from swift.obj import diskfile @@ -118,6 +118,7 @@ class Relinker(object): self.devices_data = recursive_defaultdict() self.policy_count = 0 self.pid = os.getpid() + self.linked_into_partitions = set() def _aggregate_dev_policy_stats(self): for dev_data in self.devices_data.values(): @@ -283,9 +284,11 @@ class Relinker(object): def hook_pre_partition(self, partition_path): self.pre_partition_errors = self.total_errors + self.linked_into_partitions = set() def hook_post_partition(self, partition_path): - datadir_path, part = os.path.split(os.path.abspath(partition_path)) + datadir_path, partition = os.path.split( + os.path.abspath(partition_path)) device_path, datadir_name = os.path.split(datadir_path) device = os.path.basename(device_path) state_tmp_file = os.path.join( @@ -315,15 +318,15 @@ class Relinker(object): # shift to the new partition space and rehash # |0 2N| # | IIJJKKLLMMNNOOPP| - partition = int(part) - if not self.do_cleanup and partition >= 2 ** ( - self.states['part_power'] - 1): - for new_part in (2 * partition, 2 * partition + 1): + for dirty_partition in self.linked_into_partitions: + if self.do_cleanup or \ + dirty_partition >= 2 ** self.states['part_power']: self.diskfile_mgr.get_hashes( - device, new_part, [], self.policy) - elif self.do_cleanup: + device, dirty_partition, [], self.policy) + + if self.do_cleanup: hashes = self.diskfile_mgr.get_hashes( - device, partition, [], self.policy) + device, int(partition), [], self.policy) # In any reasonably-large cluster, we'd expect all old # partitions P to be empty after cleanup (i.e., it's unlikely # that there's another partition Q := P//2 that also has data @@ -359,7 +362,7 @@ class Relinker(object): # in case the process is interrupted and needs to resume, or there # were errors and the relinker needs to run again. if self.pre_partition_errors == self.total_errors: - self.states["state"][part] = True + self.states["state"][partition] = True with open(state_tmp_file, 'wt') as f: json.dump(self.states, f) os.fsync(f.fileno()) @@ -507,6 +510,8 @@ class Relinker(object): self.stats['errors'] += 1 missing_links += 1 if created_links: + self.linked_into_partitions.add(get_partition_from_path( + self.conf['devices'], new_hash_path)) diskfile.invalidate_hash(os.path.dirname(new_hash_path)) if self.do_cleanup and not missing_links: @@ -529,6 +534,9 @@ class Relinker(object): self.logger.debug("Removed %s", old_file) if rehash: + # Even though we're invalidating the suffix, don't update + # self.linked_into_partitions -- we only care about them for + # relinking into the new part-power space try: diskfile.invalidate_hash(os.path.dirname(hash_path)) except Exception as exc: diff --git a/swift/common/utils.py b/swift/common/utils.py index 404d384c9d..950ef41a68 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -5982,6 +5982,22 @@ def get_partition_for_hash(hex_hash, part_power): return struct.unpack_from('>I', raw_hash)[0] >> part_shift +def get_partition_from_path(devices, path): + """ + :param devices: directory where devices are mounted (e.g. /srv/node) + :param path: full path to a object file or hashdir + :returns: the (integer) partition from the path + """ + offset_parts = devices.rstrip(os.sep).split(os.sep) + path_components = path.split(os.sep) + if offset_parts == path_components[:len(offset_parts)]: + offset = len(offset_parts) + else: + raise ValueError('Path %r is not under device dir %r' % ( + path, devices)) + return int(path_components[offset + 2]) + + def replace_partition_in_path(devices, path, part_power): """ Takes a path and a partition power and returns the same path, but with the @@ -5990,8 +6006,6 @@ def replace_partition_in_path(devices, path, part_power): :param devices: directory where devices are mounted (e.g. /srv/node) :param path: full path to a object file or hashdir :param part_power: partition power to compute correct partition number - :param is_hash_dir: if True then ``path`` is the path to a hash dir, - otherwise ``path`` is the path to a file in a hash dir. :returns: Path with re-computed partition power """ offset_parts = devices.rstrip(os.sep).split(os.sep) diff --git a/test/unit/cli/test_relinker.py b/test/unit/cli/test_relinker.py index 7abde673cd..6eac14aad7 100644 --- a/test/unit/cli/test_relinker.py +++ b/test/unit/cli/test_relinker.py @@ -2120,7 +2120,7 @@ class TestRelinker(unittest.TestCase): (('meta', 1),), None, None, - (('data', 0), ('meta', 1), ('meta', 2))) + (('data', 0), ('meta', 2))) info_lines = self.logger.get_lines_for_level('info') self.assertIn('1 hash dirs processed (cleanup=True) ' '(2 files, 2 linked, 2 removed, 0 errors)', @@ -2131,7 +2131,7 @@ class TestRelinker(unittest.TestCase): (('data', 0),), None, None, - (('data', 0), ('ts', 2),)) + (('ts', 2),)) info_lines = self.logger.get_lines_for_level('info') self.assertIn('1 hash dirs processed (cleanup=True) ' '(1 files, 1 linked, 1 removed, 0 errors)', @@ -2142,7 +2142,7 @@ class TestRelinker(unittest.TestCase): (('ts', 0),), None, None, - (('ts', 0), ('data', 1), ('meta', 2))) + (('data', 1), ('meta', 2))) info_lines = self.logger.get_lines_for_level('info') self.assertIn('1 hash dirs processed (cleanup=True) ' '(2 files, 2 linked, 2 removed, 0 errors)', @@ -3212,7 +3212,8 @@ class TestRelinker(unittest.TestCase): self.assertEqual([], self.logger.get_lines_for_level('error')) def test_cleanup_not_yet_relinked(self): - # force rehash of new partition to not happen during cleanup + # force new partition to be above range of partitions visited during + # cleanup self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1)) self._common_test_cleanup(relink=False) with self._mock_relinker(): @@ -3234,11 +3235,48 @@ class TestRelinker(unittest.TestCase): info_lines = self.logger.get_lines_for_level('info') self.assertIn('1 hash dirs processed (cleanup=True) ' '(1 files, 1 linked, 1 removed, 0 errors)', info_lines) - # suffix should be invalidated in new partition + # suffix should be invalidated and rehashed in new partition hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid') self.assertTrue(os.path.exists(hashes_invalid)) with open(hashes_invalid, 'r') as fd: - self.assertEqual(str(self.suffix), fd.read().strip()) + self.assertEqual('', fd.read().strip()) + self.assertEqual([], self.logger.get_lines_for_level('error')) + + def test_cleanup_not_yet_relinked_low(self): + # force new partition to be in the range of partitions visited during + # cleanup, but not exist until after cleanup would have visited it + self._setup_object(lambda part: part < 2 ** (PART_POWER - 1)) + self._common_test_cleanup(relink=False) + self.assertFalse(os.path.isfile(self.expected_file)) + self.assertFalse(os.path.exists(self.next_part_dir)) + # Relinker processes partitions in reverse order; as a result, the + # "normal" rehash during cleanup won't hit this, since it doesn't + # exist yet -- but when we finish processing the old partition, + # we'll loop back around. + with self._mock_relinker(): + self.assertEqual(0, relinker.main([ + 'cleanup', + '--swift-dir', self.testdir, + '--devices', self.devices, + '--skip-mount', + ])) + + self.assertTrue(os.path.isfile(self.expected_file)) # link created + # old partition should be cleaned up + self.assertFalse(os.path.exists(self.part_dir)) + self.assertEqual([], self.logger.get_lines_for_level('warning')) + self.assertIn( + 'Relinking (cleanup) created link: %s to %s' + % (self.objname, self.expected_file), + self.logger.get_lines_for_level('debug')) + info_lines = self.logger.get_lines_for_level('info') + self.assertIn('1 hash dirs processed (cleanup=True) ' + '(1 files, 1 linked, 1 removed, 0 errors)', info_lines) + # suffix should be invalidated and rehashed in new partition + hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid') + self.assertTrue(os.path.exists(hashes_invalid)) + with open(hashes_invalid, 'r') as fd: + self.assertEqual('', fd.read().strip()) self.assertEqual([], self.logger.get_lines_for_level('error')) def test_cleanup_same_object_different_inode_in_new_partition(self): @@ -3302,7 +3340,8 @@ class TestRelinker(unittest.TestCase): self.assertEqual(0, res) # old partition should be cleaned up self.assertFalse(os.path.exists(self.part_dir)) - self.assertTrue(os.path.isfile(older_obj_file)) # older file intact + # which is also going to clean up the older file + self.assertFalse(os.path.isfile(older_obj_file)) self.assertTrue(os.path.isfile(self.expected_file)) # link created self.assertIn( 'Relinking (cleanup) created link: %s to %s' @@ -3312,11 +3351,11 @@ class TestRelinker(unittest.TestCase): info_lines = self.logger.get_lines_for_level('info') self.assertIn('1 hash dirs processed (cleanup=True) ' '(1 files, 1 linked, 1 removed, 0 errors)', info_lines) - # suffix should be invalidated in new partition + # suffix should be invalidated and rehashed in new partition hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid') self.assertTrue(os.path.exists(hashes_invalid)) with open(hashes_invalid, 'r') as fd: - self.assertEqual(str(self.suffix), fd.read().strip()) + self.assertEqual('', fd.read().strip()) self.assertEqual([], self.logger.get_lines_for_level('error')) def test_cleanup_deleted(self): @@ -3613,11 +3652,11 @@ class TestRelinker(unittest.TestCase): '--skip-mount', ])) warning_lines = self.logger.get_lines_for_level('warning') - # once for cleanup_ondisk_files in old and once once for the - # get_ondisk_files of union of files; the new partition did not exist - # at start of cleanup so is not rehashed - self.assertEqual(2, len(warning_lines), - 'Expected 2 log lines, got %r' % warning_lines) + # once for cleanup_ondisk_files in old, again for the get_ondisk_files + # of union of files, and one last time when the new partition gets + # rehashed at the end of processing the old one + self.assertEqual(3, len(warning_lines), + 'Expected 3 log lines, got %r' % warning_lines) for line in warning_lines: self.assertIn('Bad fragment index: None', line, warning_lines) self.assertIn( @@ -3666,12 +3705,8 @@ class TestRelinker(unittest.TestCase): ])) expected = [('invalidate', self.next_suffix_dir)] if self.part >= 2 ** (PART_POWER - 1): - expected.extend([ - ('get_hashes', self.existing_device, self.next_part & ~1, - [], POLICIES[0]), - ('get_hashes', self.existing_device, self.next_part | 1, - [], POLICIES[0]), - ]) + expected.append(('get_hashes', self.existing_device, + self.next_part, [], POLICIES[0])) self.assertEqual(calls, expected) # Depending on partition, there may or may not be a get_hashes here diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index a6d1741752..4f2cabc5d2 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -4468,6 +4468,22 @@ cluster_dfw1 = http://dfw1.host/v1/ self.assertEqual(0, utils.get_partition_for_hash(hex_hash, 0)) self.assertEqual(0, utils.get_partition_for_hash(hex_hash, -1)) + def test_get_partition_from_path(self): + def do_test(path): + self.assertEqual(utils.get_partition_from_path('/s/n', path), 70) + self.assertEqual(utils.get_partition_from_path('/s/n/', path), 70) + path += '/' + self.assertEqual(utils.get_partition_from_path('/s/n', path), 70) + self.assertEqual(utils.get_partition_from_path('/s/n/', path), 70) + + do_test('/s/n/d/o/70/c77/af088baea4806dcaba30bf07d9e64c77/f') + # also works with a hashdir + do_test('/s/n/d/o/70/c77/af088baea4806dcaba30bf07d9e64c77') + # or suffix dir + do_test('/s/n/d/o/70/c77') + # or even the part dir itself + do_test('/s/n/d/o/70') + def test_replace_partition_in_path(self): # Check for new part = part * 2 old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77/f'