diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index 8f84b06c7a..81ecc6589a 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -252,6 +252,13 @@ class ObjectReconstructor(Daemon):
             if not resp:
                 continue
             resp.headers = HeaderKeyDict(resp.getheaders())
+            if str(fi_to_rebuild) == \
+                    resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index'):
+                continue
+            if resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') in set(
+                    r.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
+                    for r in responses):
+                continue
             responses.append(resp)
             etag = sorted(responses, reverse=True,
                           key=lambda r: Timestamp(
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index a52e64bd1a..f753de00ce 100755
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -23,6 +23,7 @@ import time
 import shutil
 import re
 import random
+import struct
 from eventlet import Timeout
 
 from contextlib import closing, nested, contextmanager
@@ -120,6 +121,14 @@ def count_stats(logger, key, metric):
     return count
 
 
+def get_header_frag_index(self, body):
+    metadata = self.policy.pyeclib_driver.get_metadata(body)
+    frag_index = struct.unpack('h', metadata[:2])[0]
+    return {
+        'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
+    }
+
+
 @patch_policies([StoragePolicy(0, name='zero', is_default=True),
                  ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand',
                                  ec_ndata=2, ec_nparity=1)])
@@ -2406,9 +2415,13 @@ class TestObjectReconstructor(unittest.TestCase):
 
         broken_body = ec_archive_bodies.pop(1)
 
-        responses = list((200, body) for body in ec_archive_bodies)
-        headers = {'X-Object-Sysmeta-Ec-Etag': etag}
-        codes, body_iter = zip(*responses)
+        responses = list()
+        for body in ec_archive_bodies:
+            headers = get_header_frag_index(self, body)
+            headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
+            responses.append((200, body, headers))
+
+        codes, body_iter, headers = zip(*responses)
         with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
             df = self.reconstructor.reconstruct_fa(
                 job, node, metadata)
@@ -2436,17 +2449,21 @@ class TestObjectReconstructor(unittest.TestCase):
 
         broken_body = ec_archive_bodies.pop(4)
 
-        base_responses = list((200, body) for body in ec_archive_bodies)
+        base_responses = list()
+        for body in ec_archive_bodies:
+            headers = get_header_frag_index(self, body)
+            headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
+            base_responses.append((200, body, headers))
+
         # since we're already missing a fragment a +2 scheme can only support
         # one additional failure at a time
         for error in (Timeout(), 404, Exception('kaboom!')):
-            responses = list(base_responses)
+            responses = base_responses
             error_index = random.randint(0, len(responses) - 1)
-            responses[error_index] = (error, '')
-            headers = {'X-Object-Sysmeta-Ec-Etag': etag}
-            codes, body_iter = zip(*responses)
+            responses[error_index] = (error, '', '')
+            codes, body_iter, headers_iter = zip(*responses)
             with mocked_http_conn(*codes, body_iter=body_iter,
-                                  headers=headers):
+                                  headers=headers_iter):
                 df = self.reconstructor.reconstruct_fa(
                     job, node, dict(metadata))
                 fixed_body = ''.join(df.reader())
@@ -2476,16 +2493,19 @@ class TestObjectReconstructor(unittest.TestCase):
         # the scheme is 10+4, so this gets a parity node
         broken_body = ec_archive_bodies.pop(-4)
 
-        base_responses = list((200, body) for body in ec_archive_bodies)
+        responses = list()
+        for body in ec_archive_bodies:
+            headers = get_header_frag_index(self, body)
+            headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
+            responses.append((200, body, headers))
+
         for error in (Timeout(), 404, Exception('kaboom!')):
-            responses = list(base_responses)
             # grab a data node index
             error_index = random.randint(0, self.policy.ec_ndata - 1)
-            responses[error_index] = (error, '')
-            headers = {'X-Object-Sysmeta-Ec-Etag': etag}
-            codes, body_iter = zip(*responses)
+            responses[error_index] = (error, '', '')
+            codes, body_iter, headers_iter = zip(*responses)
             with mocked_http_conn(*codes, body_iter=body_iter,
-                                  headers=headers):
+                                  headers=headers_iter):
                 df = self.reconstructor.reconstruct_fa(
                     job, node, dict(metadata))
                 fixed_body = ''.join(df.reader())
@@ -2532,23 +2552,28 @@ class TestObjectReconstructor(unittest.TestCase):
         ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(1)
-
         ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
         # bad response
-        bad_response = (200, '', {
+        bad_headers = {
             'X-Object-Sysmeta-Ec-Etag': 'some garbage',
             'X-Backend-Timestamp': next(ts).internal,
-        })
+        }
 
         # good responses
-        headers = {
-            'X-Object-Sysmeta-Ec-Etag': etag,
-            'X-Backend-Timestamp': next(ts).internal
-        }
-        responses = [(200, body, headers)
-                     for body in ec_archive_bodies]
+        responses = list()
+        t1 = next(ts).internal
+        for body in ec_archive_bodies:
+            headers = get_header_frag_index(self, body)
+            headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
+                            'X-Backend-Timestamp': t1})
+            responses.append((200, body, headers))
+
         # mixed together
-        error_index = random.randint(0, len(responses) - 2)
+        error_index = random.randint(0, self.policy.ec_ndata)
+        error_headers = get_header_frag_index(self,
+                                              (responses[error_index])[1])
+        error_headers.update(bad_headers)
+        bad_response = (200, '', bad_headers)
         responses[error_index] = bad_response
         codes, body_iter, headers = zip(*responses)
         with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
@@ -2577,18 +2602,19 @@ class TestObjectReconstructor(unittest.TestCase):
         ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
 
         broken_body = ec_archive_bodies.pop(1)
-
         ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+
         # good responses
-        headers = {
-            'X-Object-Sysmeta-Ec-Etag': etag,
-            'X-Backend-Timestamp': next(ts).internal
-        }
-        responses = [(200, body, headers)
-                     for body in ec_archive_bodies]
-        codes, body_iter, headers = zip(*responses)
+        responses = list()
+        t0 = next(ts).internal
+        for body in ec_archive_bodies:
+            headers = get_header_frag_index(self, body)
+            headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
+                            'X-Backend-Timestamp': t0})
+            responses.append((200, body, headers))
 
         # sanity check before negative test
+        codes, body_iter, headers = zip(*responses)
         with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
             df = self.reconstructor.reconstruct_fa(
                 job, node, dict(metadata))
@@ -2598,17 +2624,100 @@ class TestObjectReconstructor(unittest.TestCase):
                             md5(broken_body).hexdigest())

         # one newer etag can spoil the bunch
-        new_response = (200, '', {
-            'X-Object-Sysmeta-Ec-Etag': 'some garbage',
-            'X-Backend-Timestamp': next(ts).internal,
-        })
         new_index = random.randint(0, len(responses) - self.policy.ec_nparity)
+        new_headers = get_header_frag_index(self, (responses[new_index])[1])
+        new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage',
+                            'X-Backend-Timestamp': next(ts).internal})
+        new_response = (200, '', new_headers)
         responses[new_index] = new_response
         codes, body_iter, headers = zip(*responses)
         with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
             self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
                               job, node, dict(metadata))
 
+    def test_reconstruct_fa_finds_itself_does_not_fail(self):
+        job = {
+            'partition': 0,
+            'policy': self.policy,
+        }
+        part_nodes = self.policy.object_ring.get_part_nodes(0)
+        node = part_nodes[1]
+        metadata = {
+            'name': '/a/c/o',
+            'Content-Length': 0,
+            'ETag': 'etag',
+        }
+
+        test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
+        etag = md5(test_data).hexdigest()
+        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+
+        # instead of popping the broken body, we'll just leave it in the list
+        # of responses and take away something else.
+        broken_body = ec_archive_bodies[1]
+        ec_archive_bodies = ec_archive_bodies[:-1]
+
+        def make_header(body):
+            metadata = self.policy.pyeclib_driver.get_metadata(body)
+            frag_index = struct.unpack('h', metadata[:2])[0]
+            return {
+                'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
+                'X-Object-Sysmeta-Ec-Etag': etag,
+            }
+
+        responses = [(200, body, make_header(body))
+                     for body in ec_archive_bodies]
+        codes, body_iter, headers = zip(*responses)
+        with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
+            df = self.reconstructor.reconstruct_fa(
+                job, node, metadata)
+            fixed_body = ''.join(df.reader())
+            self.assertEqual(len(fixed_body), len(broken_body))
+            self.assertEqual(md5(fixed_body).hexdigest(),
+                             md5(broken_body).hexdigest())
+
+    def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
+        job = {
+            'partition': 0,
+            'policy': self.policy,
+        }
+        part_nodes = self.policy.object_ring.get_part_nodes(0)
+        node = part_nodes[1]
+        metadata = {
+            'name': '/a/c/o',
+            'Content-Length': 0,
+            'ETag': 'etag',
+        }
+
+        test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
+        etag = md5(test_data).hexdigest()
+        ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
+
+        broken_body = ec_archive_bodies.pop(1)
+        # add some duplicates
+        num_duplicates = self.policy.ec_nparity - 1
+        ec_archive_bodies = (ec_archive_bodies[:num_duplicates] +
+                             ec_archive_bodies)[:-num_duplicates]
+
+        def make_header(body):
+            metadata = self.policy.pyeclib_driver.get_metadata(body)
+            frag_index = struct.unpack('h', metadata[:2])[0]
+            return {
+                'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
+                'X-Object-Sysmeta-Ec-Etag': etag,
+            }
+
+        responses = [(200, body, make_header(body))
+                     for body in ec_archive_bodies]
+        codes, body_iter, headers = zip(*responses)
+        with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
+            df = self.reconstructor.reconstruct_fa(
+                job, node, metadata)
+            fixed_body = ''.join(df.reader())
+            self.assertEqual(len(fixed_body), len(broken_body))
+            self.assertEqual(md5(fixed_body).hexdigest(),
+                             md5(broken_body).hexdigest())
+
 
 if __name__ == '__main__':
     unittest.main()