Use correct headers in reconstructor requests

As long as the reconstructor collects parts from all policies each job
must be considered to have it's storage policy index and we can't use
global state for policy specific headers.  It's good hygiene to avoid
mutating the global state regardless.

Under load with multiple policies we observed essentially empty handoff
parts "re-appearing" on nodes until adding these changes.

Closes-Bug: #1671180
Change-Id: Id0e5f2743e05d81da7b26b2f05c90ba3c68e4d72
This commit is contained in:
Clay Gerrard 2018-10-30 11:38:53 -05:00
parent 2cfe31551d
commit 441df4fc93
2 changed files with 63 additions and 5 deletions

View File

@ -653,13 +653,15 @@ class ObjectReconstructor(Daemon):
return suffixes
def rehash_remote(self, node, job, suffixes):
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
try:
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(sorted(suffixes)),
headers=self.headers)
headers=headers)
conn.getresponse().read()
except (Exception, Timeout):
self.logger.exception(
@ -680,12 +682,14 @@ class ObjectReconstructor(Daemon):
"""
# get hashes from the remote node
remote_suffixes = None
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
try:
with Timeout(self.http_timeout):
resp = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'', headers=self.headers).getresponse()
'', headers=headers).getresponse()
if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.logger.error(
_('%s responded as unmounted'),
@ -774,7 +778,6 @@ class ObjectReconstructor(Daemon):
:param: the job dict, with the keys defined in ``_get_job_info``
"""
self.headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
begin = time.time()
if job['job_type'] == REVERT:
self._revert(job, begin)

View File

@ -3382,6 +3382,60 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
v, job[k], k)
self.assertEqual(v, job[k], msg)
def test_rehash_remote(self):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), '1')
utils.mkdirs(part_path)
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': 1,
'part_path': part_path,
}
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
node = job['sync_to'][0]
# process_job used to try and modify the instance base headers
self.reconstructor.headers['X-Backend-Storage-Policy-Index'] = \
int(POLICIES[1])
# ... which doesn't work out under concurrency with multiple policies
self.assertNotEqual(
self.reconstructor.headers['X-Backend-Storage-Policy-Index'],
int(job['policy']))
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.rehash_remote(node, job, [])
self.assertEqual([int(job['policy'])], [
r['headers']['X-Backend-Storage-Policy-Index']
for r in request_log.requests])
def test_get_suffixes_to_sync(self):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), '1')
utils.mkdirs(part_path)
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': 1,
'part_path': part_path,
}
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
node = job['sync_to'][0]
# process_job used to try and modify the instance base headers
self.reconstructor.headers['X-Backend-Storage-Policy-Index'] = \
int(POLICIES[1])
# ... which doesn't work out under concurrency with multiple policies
self.assertNotEqual(
self.reconstructor.headers['X-Backend-Storage-Policy-Index'],
int(job['policy']))
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor._get_suffixes_to_sync(job, node)
self.assertEqual([int(job['policy'])], [
r['headers']['X-Backend-Storage-Policy-Index']
for r in request_log.requests])
def test_get_suffix_delta(self):
# different
local_suff = {'123': {None: 'abc', 0: 'def'}}
@ -4831,8 +4885,9 @@ class TestReconstructFragmentArchiveUTF8(TestReconstructFragmentArchive):
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,
ec_segment_size=4096,
ec_duplication_factor=2)],
fake_ring_args=[{'replicas': 28}])
ec_duplication_factor=2),
StoragePolicy(1, name='other')],
fake_ring_args=[{'replicas': 28}, {'replicas': 3}])
class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
def setUp(self):
super(TestObjectReconstructorECDuplicationFactor, self).setUp()