Merge "Use correct headers in reconstructor requests"

This commit is contained in:
Zuul 2018-11-01 02:39:49 +00:00 committed by Gerrit Code Review
commit dd2448f708
2 changed files with 63 additions and 5 deletions

View File

@ -653,13 +653,15 @@ class ObjectReconstructor(Daemon):
return suffixes
def rehash_remote(self, node, job, suffixes):
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
try:
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(sorted(suffixes)),
headers=self.headers)
headers=headers)
conn.getresponse().read()
except (Exception, Timeout):
self.logger.exception(
@ -680,12 +682,14 @@ class ObjectReconstructor(Daemon):
"""
# get hashes from the remote node
remote_suffixes = None
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
try:
with Timeout(self.http_timeout):
resp = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'', headers=self.headers).getresponse()
'', headers=headers).getresponse()
if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.logger.error(
_('%s responded as unmounted'),
@ -774,7 +778,6 @@ class ObjectReconstructor(Daemon):
:param: the job dict, with the keys defined in ``_get_job_info``
"""
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
begin = time.time()
if job['job_type'] == REVERT:
self._revert(job, begin)

View File

@ -3382,6 +3382,60 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
v, job[k], k)
self.assertEqual(v, job[k], msg)
def test_rehash_remote(self):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), '1')
utils.mkdirs(part_path)
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': 1,
'part_path': part_path,
}
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
node = job['sync_to'][0]
# process_job used to try and modify the instance base headers
self.reconstructor.headers['X-Backend-Storage-Policy-Index'] = \
int(POLICIES[1])
# ... which doesn't work out under concurrency with multiple policies
self.assertNotEqual(
self.reconstructor.headers['X-Backend-Storage-Policy-Index'],
int(job['policy']))
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.rehash_remote(node, job, [])
self.assertEqual([int(job['policy'])], [
r['headers']['X-Backend-Storage-Policy-Index']
for r in request_log.requests])
def test_get_suffixes_to_sync(self):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), '1')
utils.mkdirs(part_path)
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': 1,
'part_path': part_path,
}
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
node = job['sync_to'][0]
# process_job used to try and modify the instance base headers
self.reconstructor.headers['X-Backend-Storage-Policy-Index'] = \
int(POLICIES[1])
# ... which doesn't work out under concurrency with multiple policies
self.assertNotEqual(
self.reconstructor.headers['X-Backend-Storage-Policy-Index'],
int(job['policy']))
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor._get_suffixes_to_sync(job, node)
self.assertEqual([int(job['policy'])], [
r['headers']['X-Backend-Storage-Policy-Index']
for r in request_log.requests])
def test_get_suffix_delta(self):
# different
local_suff = {'123': {None: 'abc', 0: 'def'}}
@ -4831,8 +4885,9 @@ class TestReconstructFragmentArchiveUTF8(TestReconstructFragmentArchive):
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,
ec_segment_size=4096,
ec_duplication_factor=2)],
fake_ring_args=[{'replicas': 28}])
ec_duplication_factor=2),
StoragePolicy(1, name='other')],
fake_ring_args=[{'replicas': 28}, {'replicas': 3}])
class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
def setUp(self):
super(TestObjectReconstructorECDuplicationFactor, self).setUp()