From 8dcdaff64eee37fdbaf2e83bbe6fa07dbfe36bd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavel=20Kvasni=C4=8Dka?= Date: Fri, 25 Nov 2016 09:26:27 +0100 Subject: [PATCH] Fix non-deterministic suffix updates in hashes.pkl Right now the do_listdir option was set on every 10th replication run. Due to the randomness of the job listing this might update a given partition much less often than expected, for example with 1000 partitions per replicator only every ~70th run. Co-Authored-By: Alistair Coles Co-Authored-By: Clay Gerrard Co-Authored-By: Christian Schwede Related-Bug: #1634967 Closes-Bug: 1644807 Change-Id: Ib5c9dd17e40150450ec57a728ae8652fbc730af6 --- swift/obj/diskfile.py | 1 + swift/obj/replicator.py | 10 +++++- test/unit/obj/test_replicator.py | 60 ++++++++++++++++++++++++++++---- 3 files changed, 64 insertions(+), 7 deletions(-) diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 4c3213b0ce..f47fcdab28 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -1065,6 +1065,7 @@ class BaseDiskFileManager(object): if len(suff) == 3: hashes.setdefault(suff, None) modified = True + self.logger.debug('Run listdir on %s', partition_path) hashes.update((suffix, None) for suffix in recalculate) for suffix, hash_ in hashes.items(): if not hash_: diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 91914fe7ee..0aee76bfeb 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -46,6 +46,10 @@ DEFAULT_RSYNC_TIMEOUT = 900 hubs.use_hub(get_hub()) +def _do_listdir(partition, replication_cycle): + return (((partition + replication_cycle) % 10) == 0) + + class ObjectReplicator(Daemon): """ Replicate objects. @@ -74,6 +78,7 @@ class ObjectReplicator(Daemon): self.ring_check_interval = int(conf.get('ring_check_interval', 15)) self.next_check = time.time() + self.ring_check_interval self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7)) + self.replication_cycle = random.randint(0, 9) self.partition_times = [] self.interval = int(conf.get('interval') or conf.get('run_pause') or 30) @@ -404,7 +409,9 @@ class ObjectReplicator(Daemon): try: hashed, local_hash = tpool_reraise( self._diskfile_mgr._get_hashes, job['path'], - do_listdir=(self.replication_count % 10) == 0, + do_listdir=_do_listdir( + int(job['partition']), + self.replication_cycle), reclaim_age=self.reclaim_age) self.suffix_hash += hashed self.logger.update_stats('suffix.hashes', hashed) @@ -692,6 +699,7 @@ class ObjectReplicator(Daemon): self.suffix_hash = 0 self.replication_count = 0 self.last_replication_count = -1 + self.replication_cycle = (self.replication_cycle + 1) % 10 self.partition_times = [] self.my_replication_ips = self._get_my_replication_ips() self.all_devs_info = set() diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index e16056079e..c863e45a93 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -312,11 +312,20 @@ class TestObjectReplicator(unittest.TestCase): for node in nodes: process_arg_checker.append( (0, '', ['rsync', whole_path_from, rsync_mods])) + start = replicator.replication_cycle + self.assertGreaterEqual(start, 0) + self.assertLess(start, 9) with _mock_process(process_arg_checker): replicator.run_once() + self.assertEqual(start + 1, replicator.replication_cycle) self.assertFalse(process_errors) self.assertFalse(self.logger.get_lines_for_level('error')) object_replicator.http_connect = was_connector + with _mock_process(process_arg_checker): + for cycle in range(1, 10): + replicator.run_once() + self.assertEqual((start + 1 + cycle) % 10, + replicator.replication_cycle) # policy 1 def test_run_once_1(self): @@ -1695,9 +1704,10 @@ class TestObjectReplicator(unittest.TestCase): self.replicator.sync_method.assert_called_once_with( 'node', 'job', 'suffixes') - @mock.patch('swift.obj.replicator.tpool_reraise', autospec=True) + @mock.patch('swift.obj.replicator.tpool_reraise') @mock.patch('swift.obj.replicator.http_connect', autospec=True) - def test_update(self, mock_http, mock_tpool_reraise): + @mock.patch('swift.obj.replicator._do_listdir') + def test_update(self, mock_do_listdir, mock_http, mock_tpool_reraise): def set_default(self): self.replicator.suffix_count = 0 @@ -1715,11 +1725,23 @@ class TestObjectReplicator(unittest.TestCase): mock_http.return_value = answer = mock.MagicMock() answer.getresponse.return_value = resp = mock.MagicMock() - # Check uncorrect http_connect with status 507 and + # Check incorrect http_connect with status 507 and # count of attempts and call args resp.status = 507 error = '%(replication_ip)s/%(device)s responded as unmounted' expect = 'Error syncing partition: ' + expected_listdir_calls = [ + mock.call(int(job['partition']), + self.replicator.replication_cycle) + for job in jobs] + do_listdir_results = [False, False, True, False, True, False] + mock_do_listdir.side_effect = do_listdir_results + expected_tpool_calls = [ + mock.call(self.replicator._diskfile_mgr._get_hashes, job['path'], + do_listdir=do_listdir, + reclaim_age=self.replicator.reclaim_age) + for job, do_listdir in zip(jobs, do_listdir_results) + ] for job in jobs: set_default(self) ring = job['policy'].object_ring @@ -1749,8 +1771,11 @@ class TestObjectReplicator(unittest.TestCase): mock_http.assert_has_calls(reqs, any_order=True) mock_http.reset_mock() self.logger.clear() - - # Check uncorrect http_connect with status 400 != HTTP_OK + mock_do_listdir.assert_has_calls(expected_listdir_calls) + mock_tpool_reraise.assert_has_calls(expected_tpool_calls) + mock_do_listdir.side_effect = None + mock_do_listdir.return_value = False + # Check incorrect http_connect with status 400 != HTTP_OK resp.status = 400 error = 'Invalid response %(resp)s from %(ip)s' for job in jobs: @@ -1765,7 +1790,7 @@ class TestObjectReplicator(unittest.TestCase): self.logger.clear() # Check successful http_connection and exception with - # uncorrect pickle.loads(resp.read()) + # incorrect pickle.loads(resp.read()) resp.status = 200 expect = 'Error syncing with node: %r: ' for job in jobs: @@ -1896,6 +1921,29 @@ class TestObjectReplicator(unittest.TestCase): _m_os_path_exists.call_args_list[-2][0][0], os.path.join(job['path'])) + def test_do_listdir(self): + # Test if do_listdir is enabled for every 10th partition to rehash + # First number is the number of partitions in the job, list entries + # are the expected partition numbers per run + test_data = { + 9: [1, 0, 1, 1, 1, 1, 1, 1, 1, 1], + 29: [3, 2, 3, 3, 3, 3, 3, 3, 3, 3], + 111: [12, 11, 11, 11, 11, 11, 11, 11, 11, 11]} + + for partitions, expected in test_data.items(): + seen = [] + for phase in range(10): + invalidated = 0 + for partition in range(partitions): + if object_replicator._do_listdir(partition, phase): + seen.append(partition) + invalidated += 1 + # Every 10th partition is seen after each phase + self.assertEqual(expected[phase], invalidated) + + # After 10 cycles every partition is seen exactly once + self.assertEqual(sorted(range(partitions)), sorted(seen)) + if __name__ == '__main__': unittest.main()