From 44b76a1b1b83ac9563010f1ddfca5fca76e567bf Mon Sep 17 00:00:00 2001 From: Minwoo Bae Date: Thu, 18 Jun 2015 14:21:06 -0500 Subject: [PATCH] EC Reconstructor: Do not reconstruct existing fragments. The EC reconstructor needs to verify that the fragment needing to be reconstructed does not reside in the collection of node responses. Otherwise, resources will be spent unnecessarily reconstructing the fragment. Moreover, this could cause a segfault on some backends. This change adds the necessary verification steps to make sure that a fragment will only be rebuilt in the case it is missing from the other fragment archives. Added some tests to provide coverage for these scenarios. Change-Id: I91f3d4af52cbc66c9f7ce00726f247b5462e66f9 Closes-Bug: #1452553 --- swift/obj/reconstructor.py | 7 ++ test/unit/obj/test_reconstructor.py | 183 ++++++++++++++++++++++------ 2 files changed, 153 insertions(+), 37 deletions(-) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index f9aa5f15d8..5b282de87c 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -249,6 +249,13 @@ class ObjectReconstructor(Daemon): if not resp: continue resp.headers = HeaderKeyDict(resp.getheaders()) + if str(fi_to_rebuild) == \ + resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index'): + continue + if resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') in set( + r.headers.get('X-Object-Sysmeta-Ec-Frag-Index') + for r in responses): + continue responses.append(resp) etag = sorted(responses, reverse=True, key=lambda r: Timestamp( diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 321ea3751d..60b2b4ff6b 100755 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -23,6 +23,7 @@ import time import shutil import re import random +import struct from eventlet import Timeout from contextlib import closing, nested, contextmanager @@ -126,6 +127,14 @@ def count_stats(logger, key, metric): return count +def get_header_frag_index(self, body): + metadata = self.policy.pyeclib_driver.get_metadata(body) + frag_index = struct.unpack('h', metadata[:2])[0] + return { + 'X-Object-Sysmeta-Ec-Frag-Index': frag_index, + } + + @patch_policies([StoragePolicy(0, name='zero', is_default=True), ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand', ec_ndata=2, ec_nparity=1)]) @@ -2309,9 +2318,13 @@ class TestObjectReconstructor(unittest.TestCase): broken_body = ec_archive_bodies.pop(1) - responses = list((200, body) for body in ec_archive_bodies) - headers = {'X-Object-Sysmeta-Ec-Etag': etag} - codes, body_iter = zip(*responses) + responses = list() + for body in ec_archive_bodies: + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etag}) + responses.append((200, body, headers)) + + codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( job, node, metadata) @@ -2339,17 +2352,21 @@ class TestObjectReconstructor(unittest.TestCase): broken_body = ec_archive_bodies.pop(4) - base_responses = list((200, body) for body in ec_archive_bodies) + base_responses = list() + for body in ec_archive_bodies: + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etag}) + base_responses.append((200, body, headers)) + # since we're already missing a fragment a +2 scheme can only support # one additional failure at a time for error in (Timeout(), 404, Exception('kaboom!')): - responses = list(base_responses) + responses = base_responses error_index = random.randint(0, len(responses) - 1) - responses[error_index] = (error, '') - headers = {'X-Object-Sysmeta-Ec-Etag': etag} - codes, body_iter = zip(*responses) + responses[error_index] = (error, '', '') + codes, body_iter, headers_iter = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, - headers=headers): + headers=headers_iter): df = self.reconstructor.reconstruct_fa( job, node, dict(metadata)) fixed_body = ''.join(df.reader()) @@ -2379,16 +2396,19 @@ class TestObjectReconstructor(unittest.TestCase): # the scheme is 10+4, so this gets a parity node broken_body = ec_archive_bodies.pop(-4) - base_responses = list((200, body) for body in ec_archive_bodies) + responses = list() + for body in ec_archive_bodies: + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etag}) + responses.append((200, body, headers)) + for error in (Timeout(), 404, Exception('kaboom!')): - responses = list(base_responses) # grab a data node index error_index = random.randint(0, self.policy.ec_ndata - 1) - responses[error_index] = (error, '') - headers = {'X-Object-Sysmeta-Ec-Etag': etag} - codes, body_iter = zip(*responses) + responses[error_index] = (error, '', '') + codes, body_iter, headers_iter = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, - headers=headers): + headers=headers_iter): df = self.reconstructor.reconstruct_fa( job, node, dict(metadata)) fixed_body = ''.join(df.reader()) @@ -2435,23 +2455,28 @@ class TestObjectReconstructor(unittest.TestCase): ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) broken_body = ec_archive_bodies.pop(1) - ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) # bad response - bad_response = (200, '', { + bad_headers = { 'X-Object-Sysmeta-Ec-Etag': 'some garbage', 'X-Backend-Timestamp': next(ts).internal, - }) + } # good responses - headers = { - 'X-Object-Sysmeta-Ec-Etag': etag, - 'X-Backend-Timestamp': next(ts).internal - } - responses = [(200, body, headers) - for body in ec_archive_bodies] + responses = list() + t1 = next(ts).internal + for body in ec_archive_bodies: + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': t1}) + responses.append((200, body, headers)) + # mixed together - error_index = random.randint(0, len(responses) - 2) + error_index = random.randint(0, self.policy.ec_ndata) + error_headers = get_header_frag_index(self, + (responses[error_index])[1]) + error_headers.update(bad_headers) + bad_response = (200, '', bad_headers) responses[error_index] = bad_response codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): @@ -2480,18 +2505,19 @@ class TestObjectReconstructor(unittest.TestCase): ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) broken_body = ec_archive_bodies.pop(1) - ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) + # good responses - headers = { - 'X-Object-Sysmeta-Ec-Etag': etag, - 'X-Backend-Timestamp': next(ts).internal - } - responses = [(200, body, headers) - for body in ec_archive_bodies] - codes, body_iter, headers = zip(*responses) + responses = list() + t0 = next(ts).internal + for body in ec_archive_bodies: + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': t0}) + responses.append((200, body, headers)) # sanity check before negative test + codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( job, node, dict(metadata)) @@ -2501,17 +2527,100 @@ class TestObjectReconstructor(unittest.TestCase): md5(broken_body).hexdigest()) # one newer etag can spoil the bunch - new_response = (200, '', { - 'X-Object-Sysmeta-Ec-Etag': 'some garbage', - 'X-Backend-Timestamp': next(ts).internal, - }) new_index = random.randint(0, len(responses) - self.policy.ec_nparity) + new_headers = get_header_frag_index(self, (responses[new_index])[1]) + new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage', + 'X-Backend-Timestamp': next(ts).internal}) + new_response = (200, '', new_headers) responses[new_index] = new_response codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, job, node, dict(metadata)) + def test_reconstruct_fa_finds_itself_does_not_fail(self): + job = { + 'partition': 0, + 'policy': self.policy, + } + part_nodes = self.policy.object_ring.get_part_nodes(0) + node = part_nodes[1] + metadata = { + 'name': '/a/c/o', + 'Content-Length': 0, + 'ETag': 'etag', + } + + test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] + etag = md5(test_data).hexdigest() + ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) + + # instead of popping the broken body, we'll just leave it in the list + # of responses and take away something else. + broken_body = ec_archive_bodies[1] + ec_archive_bodies = ec_archive_bodies[:-1] + + def make_header(body): + metadata = self.policy.pyeclib_driver.get_metadata(body) + frag_index = struct.unpack('h', metadata[:2])[0] + return { + 'X-Object-Sysmeta-Ec-Frag-Index': frag_index, + 'X-Object-Sysmeta-Ec-Etag': etag, + } + + responses = [(200, body, make_header(body)) + for body in ec_archive_bodies] + codes, body_iter, headers = zip(*responses) + with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): + df = self.reconstructor.reconstruct_fa( + job, node, metadata) + fixed_body = ''.join(df.reader()) + self.assertEqual(len(fixed_body), len(broken_body)) + self.assertEqual(md5(fixed_body).hexdigest(), + md5(broken_body).hexdigest()) + + def test_reconstruct_fa_finds_duplicate_does_not_fail(self): + job = { + 'partition': 0, + 'policy': self.policy, + } + part_nodes = self.policy.object_ring.get_part_nodes(0) + node = part_nodes[1] + metadata = { + 'name': '/a/c/o', + 'Content-Length': 0, + 'ETag': 'etag', + } + + test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] + etag = md5(test_data).hexdigest() + ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) + + broken_body = ec_archive_bodies.pop(1) + # add some duplicates + num_duplicates = self.policy.ec_nparity - 1 + ec_archive_bodies = (ec_archive_bodies[:num_duplicates] + + ec_archive_bodies)[:-num_duplicates] + + def make_header(body): + metadata = self.policy.pyeclib_driver.get_metadata(body) + frag_index = struct.unpack('h', metadata[:2])[0] + return { + 'X-Object-Sysmeta-Ec-Frag-Index': frag_index, + 'X-Object-Sysmeta-Ec-Etag': etag, + } + + responses = [(200, body, make_header(body)) + for body in ec_archive_bodies] + codes, body_iter, headers = zip(*responses) + with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): + df = self.reconstructor.reconstruct_fa( + job, node, metadata) + fixed_body = ''.join(df.reader()) + self.assertEqual(len(fixed_body), len(broken_body)) + self.assertEqual(md5(fixed_body).hexdigest(), + md5(broken_body).hexdigest()) + if __name__ == '__main__': unittest.main()