Merge "EC Reconstructor: Do not reconstruct existing fragments."

This commit is contained in:
Jenkins 2015-07-07 22:24:16 +00:00 committed by Gerrit Code Review
commit 131668f359
2 changed files with 153 additions and 37 deletions

View File

@ -252,6 +252,13 @@ class ObjectReconstructor(Daemon):
if not resp: if not resp:
continue continue
resp.headers = HeaderKeyDict(resp.getheaders()) resp.headers = HeaderKeyDict(resp.getheaders())
if str(fi_to_rebuild) == \
resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index'):
continue
if resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') in set(
r.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
for r in responses):
continue
responses.append(resp) responses.append(resp)
etag = sorted(responses, reverse=True, etag = sorted(responses, reverse=True,
key=lambda r: Timestamp( key=lambda r: Timestamp(

View File

@ -23,6 +23,7 @@ import time
import shutil import shutil
import re import re
import random import random
import struct
from eventlet import Timeout from eventlet import Timeout
from contextlib import closing, nested, contextmanager from contextlib import closing, nested, contextmanager
@ -120,6 +121,14 @@ def count_stats(logger, key, metric):
return count return count
def get_header_frag_index(self, body):
metadata = self.policy.pyeclib_driver.get_metadata(body)
frag_index = struct.unpack('h', metadata[:2])[0]
return {
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
}
@patch_policies([StoragePolicy(0, name='zero', is_default=True), @patch_policies([StoragePolicy(0, name='zero', is_default=True),
ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand', ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand',
ec_ndata=2, ec_nparity=1)]) ec_ndata=2, ec_nparity=1)])
@ -2406,9 +2415,13 @@ class TestObjectReconstructor(unittest.TestCase):
broken_body = ec_archive_bodies.pop(1) broken_body = ec_archive_bodies.pop(1)
responses = list((200, body) for body in ec_archive_bodies) responses = list()
headers = {'X-Object-Sysmeta-Ec-Etag': etag} for body in ec_archive_bodies:
codes, body_iter = zip(*responses) headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, metadata) job, node, metadata)
@ -2436,17 +2449,21 @@ class TestObjectReconstructor(unittest.TestCase):
broken_body = ec_archive_bodies.pop(4) broken_body = ec_archive_bodies.pop(4)
base_responses = list((200, body) for body in ec_archive_bodies) base_responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
base_responses.append((200, body, headers))
# since we're already missing a fragment a +2 scheme can only support # since we're already missing a fragment a +2 scheme can only support
# one additional failure at a time # one additional failure at a time
for error in (Timeout(), 404, Exception('kaboom!')): for error in (Timeout(), 404, Exception('kaboom!')):
responses = list(base_responses) responses = base_responses
error_index = random.randint(0, len(responses) - 1) error_index = random.randint(0, len(responses) - 1)
responses[error_index] = (error, '') responses[error_index] = (error, '', '')
headers = {'X-Object-Sysmeta-Ec-Etag': etag} codes, body_iter, headers_iter = zip(*responses)
codes, body_iter = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers): headers=headers_iter):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata)) job, node, dict(metadata))
fixed_body = ''.join(df.reader()) fixed_body = ''.join(df.reader())
@ -2476,16 +2493,19 @@ class TestObjectReconstructor(unittest.TestCase):
# the scheme is 10+4, so this gets a parity node # the scheme is 10+4, so this gets a parity node
broken_body = ec_archive_bodies.pop(-4) broken_body = ec_archive_bodies.pop(-4)
base_responses = list((200, body) for body in ec_archive_bodies) responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
for error in (Timeout(), 404, Exception('kaboom!')): for error in (Timeout(), 404, Exception('kaboom!')):
responses = list(base_responses)
# grab a data node index # grab a data node index
error_index = random.randint(0, self.policy.ec_ndata - 1) error_index = random.randint(0, self.policy.ec_ndata - 1)
responses[error_index] = (error, '') responses[error_index] = (error, '', '')
headers = {'X-Object-Sysmeta-Ec-Etag': etag} codes, body_iter, headers_iter = zip(*responses)
codes, body_iter = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers): headers=headers_iter):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata)) job, node, dict(metadata))
fixed_body = ''.join(df.reader()) fixed_body = ''.join(df.reader())
@ -2532,23 +2552,28 @@ class TestObjectReconstructor(unittest.TestCase):
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1) broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# bad response # bad response
bad_response = (200, '', { bad_headers = {
'X-Object-Sysmeta-Ec-Etag': 'some garbage', 'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal, 'X-Backend-Timestamp': next(ts).internal,
}) }
# good responses # good responses
headers = { responses = list()
'X-Object-Sysmeta-Ec-Etag': etag, t1 = next(ts).internal
'X-Backend-Timestamp': next(ts).internal for body in ec_archive_bodies:
} headers = get_header_frag_index(self, body)
responses = [(200, body, headers) headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
for body in ec_archive_bodies] 'X-Backend-Timestamp': t1})
responses.append((200, body, headers))
# mixed together # mixed together
error_index = random.randint(0, len(responses) - 2) error_index = random.randint(0, self.policy.ec_ndata)
error_headers = get_header_frag_index(self,
(responses[error_index])[1])
error_headers.update(bad_headers)
bad_response = (200, '', bad_headers)
responses[error_index] = bad_response responses[error_index] = bad_response
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
@ -2577,18 +2602,19 @@ class TestObjectReconstructor(unittest.TestCase):
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1) broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# good responses # good responses
headers = { responses = list()
'X-Object-Sysmeta-Ec-Etag': etag, t0 = next(ts).internal
'X-Backend-Timestamp': next(ts).internal for body in ec_archive_bodies:
} headers = get_header_frag_index(self, body)
responses = [(200, body, headers) headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
for body in ec_archive_bodies] 'X-Backend-Timestamp': t0})
codes, body_iter, headers = zip(*responses) responses.append((200, body, headers))
# sanity check before negative test # sanity check before negative test
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata)) job, node, dict(metadata))
@ -2598,17 +2624,100 @@ class TestObjectReconstructor(unittest.TestCase):
md5(broken_body).hexdigest()) md5(broken_body).hexdigest())
# one newer etag can spoil the bunch # one newer etag can spoil the bunch
new_response = (200, '', {
'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal,
})
new_index = random.randint(0, len(responses) - self.policy.ec_nparity) new_index = random.randint(0, len(responses) - self.policy.ec_nparity)
new_headers = get_header_frag_index(self, (responses[new_index])[1])
new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal})
new_response = (200, '', new_headers)
responses[new_index] = new_response responses[new_index] = new_response
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, dict(metadata)) job, node, dict(metadata))
def test_reconstruct_fa_finds_itself_does_not_fail(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
# instead of popping the broken body, we'll just leave it in the list
# of responses and take away something else.
broken_body = ec_archive_bodies[1]
ec_archive_bodies = ec_archive_bodies[:-1]
def make_header(body):
metadata = self.policy.pyeclib_driver.get_metadata(body)
frag_index = struct.unpack('h', metadata[:2])[0]
return {
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Object-Sysmeta-Ec-Etag': etag,
}
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
# add some duplicates
num_duplicates = self.policy.ec_nparity - 1
ec_archive_bodies = (ec_archive_bodies[:num_duplicates] +
ec_archive_bodies)[:-num_duplicates]
def make_header(body):
metadata = self.policy.pyeclib_driver.get_metadata(body)
frag_index = struct.unpack('h', metadata[:2])[0]
return {
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Object-Sysmeta-Ec-Etag': etag,
}
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()