From 441df4fc930db7fc82d89b8c6a4d28d935554518 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Tue, 30 Oct 2018 11:38:53 -0500 Subject: [PATCH] Use correct headers in reconstructor requests As long as the reconstructor collects parts from all policies each job must be considered to have its own storage policy index and we can't use global state for policy-specific headers. It's good hygiene to avoid mutating the global state regardless. Under load with multiple policies we observed essentially empty handoff parts "re-appearing" on nodes until adding these changes. Closes-Bug: #1671180 Change-Id: Id0e5f2743e05d81da7b26b2f05c90ba3c68e4d72 --- swift/obj/reconstructor.py | 9 +++-- test/unit/obj/test_reconstructor.py | 59 ++++++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 2a2de8ef22..17a00dc36a 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -653,13 +653,15 @@ class ObjectReconstructor(Daemon): return suffixes def rehash_remote(self, node, job, suffixes): + headers = self.headers.copy() + headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) try: with Timeout(self.http_timeout): conn = http_connect( node['replication_ip'], node['replication_port'], node['device'], job['partition'], 'REPLICATE', '/' + '-'.join(sorted(suffixes)), - headers=self.headers) + headers=headers) conn.getresponse().read() except (Exception, Timeout): self.logger.exception( @@ -680,12 +682,14 @@ class ObjectReconstructor(Daemon): """ # get hashes from the remote node remote_suffixes = None + headers = self.headers.copy() + headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) try: with Timeout(self.http_timeout): resp = http_connect( node['replication_ip'], node['replication_port'], node['device'], job['partition'], 'REPLICATE', - '', headers=self.headers).getresponse() + '', headers=headers).getresponse() if resp.status == HTTP_INSUFFICIENT_STORAGE: 
self.logger.error( _('%s responded as unmounted'), @@ -774,7 +778,6 @@ class ObjectReconstructor(Daemon): :param: the job dict, with the keys defined in ``_get_job_info`` """ - self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) begin = time.time() if job['job_type'] == REVERT: self._revert(job, begin) diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 33d75ed85e..3daad5661e 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -3382,6 +3382,60 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): v, job[k], k) self.assertEqual(v, job[k], msg) + def test_rehash_remote(self): + part_path = os.path.join(self.devices, self.local_dev['device'], + diskfile.get_data_dir(self.policy), '1') + utils.mkdirs(part_path) + part_info = { + 'local_dev': self.local_dev, + 'policy': self.policy, + 'partition': 1, + 'part_path': part_path, + } + jobs = self.reconstructor.build_reconstruction_jobs(part_info) + self.assertEqual(1, len(jobs)) + job = jobs[0] + node = job['sync_to'][0] + # process_job used to try and modify the instance base headers + self.reconstructor.headers['X-Backend-Storage-Policy-Index'] = \ + int(POLICIES[1]) + # ... 
which doesn't work out under concurrency with multiple policies + self.assertNotEqual( + self.reconstructor.headers['X-Backend-Storage-Policy-Index'], + int(job['policy'])) + with mocked_http_conn(200, body=pickle.dumps({})) as request_log: + self.reconstructor.rehash_remote(node, job, []) + self.assertEqual([int(job['policy'])], [ + r['headers']['X-Backend-Storage-Policy-Index'] + for r in request_log.requests]) + + def test_get_suffixes_to_sync(self): + part_path = os.path.join(self.devices, self.local_dev['device'], + diskfile.get_data_dir(self.policy), '1') + utils.mkdirs(part_path) + part_info = { + 'local_dev': self.local_dev, + 'policy': self.policy, + 'partition': 1, + 'part_path': part_path, + } + jobs = self.reconstructor.build_reconstruction_jobs(part_info) + self.assertEqual(1, len(jobs)) + job = jobs[0] + node = job['sync_to'][0] + # process_job used to try and modify the instance base headers + self.reconstructor.headers['X-Backend-Storage-Policy-Index'] = \ + int(POLICIES[1]) + # ... 
which doesn't work out under concurrency with multiple policies + self.assertNotEqual( + self.reconstructor.headers['X-Backend-Storage-Policy-Index'], + int(job['policy'])) + with mocked_http_conn(200, body=pickle.dumps({})) as request_log: + self.reconstructor._get_suffixes_to_sync(job, node) + self.assertEqual([int(job['policy'])], [ + r['headers']['X-Backend-Storage-Policy-Index'] + for r in request_log.requests]) + def test_get_suffix_delta(self): # different local_suff = {'123': {None: 'abc', 0: 'def'}} @@ -4831,8 +4885,9 @@ class TestReconstructFragmentArchiveUTF8(TestReconstructFragmentArchive): ec_type=DEFAULT_TEST_EC_TYPE, ec_ndata=10, ec_nparity=4, ec_segment_size=4096, - ec_duplication_factor=2)], - fake_ring_args=[{'replicas': 28}]) + ec_duplication_factor=2), + StoragePolicy(1, name='other')], + fake_ring_args=[{'replicas': 28}, {'replicas': 3}]) class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor): def setUp(self): super(TestObjectReconstructorECDuplicationFactor, self).setUp()