From a2f4046624190584305eab77bd1fbb623caa7be3 Mon Sep 17 00:00:00 2001 From: Kota Tsuyuzaki Date: Tue, 28 Feb 2017 00:52:06 -0800 Subject: [PATCH] Small fixes for ec duplication To address Alistair's comment at https://review.openstack.org/#/c/219165. This includes: - Fix reconstructor log message to avoid redundant frag index info - Fix incorrect FabricatedRing setting to have ec_k + ec_m replicas - Use policy.ec_n_unique_fragments for testing frag index election - Plus some various minor cleanup and docs additions Huge refactoring around TestECMixin at the test/unit/proxy/test_server.py is in https://review.openstack.org/#/c/440466/ to clarify the change. Co-Authored-By: Alistair Coles Change-Id: I8096202f5f8d91296963f7a409a29d57fa7828e4 --- swift/obj/reconstructor.py | 24 +++++----- test/unit/obj/test_reconstructor.py | 64 +++++++++++++++---------- test/unit/proxy/controllers/test_obj.py | 13 +++-- 3 files changed, 60 insertions(+), 41 deletions(-) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 3434a026e4..8601e8d1e2 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -193,17 +193,14 @@ class ObjectReconstructor(Daemon): return True def _full_path(self, node, part, path, policy): - frag_index = (policy.get_backend_index(node['index']) - if 'index' in node else 'handoff') return '%(replication_ip)s:%(replication_port)s' \ '/%(device)s/%(part)s%(path)s ' \ - 'policy#%(policy)d frag#%(frag_index)s' % { + 'policy#%(policy)d' % { 'replication_ip': node['replication_ip'], 'replication_port': node['replication_port'], 'device': node['device'], 'part': part, 'path': path, 'policy': policy, - 'frag_index': frag_index, } def _get_response(self, node, part, path, headers, policy): @@ -308,8 +305,9 @@ class ObjectReconstructor(Daemon): # the remote node to our target instead of rebuild. But # instead we ignore it. self.logger.debug( - 'Found existing frag #%s while rebuilding #%s from %s', - unique_index, fi_to_rebuild, self._full_path( + 'Found existing frag #%s at %s while rebuilding to %s', + fi_to_rebuild, resp.full_path, + self._full_path( node, job['partition'], datafile_metadata['name'], job['policy'])) continue @@ -330,10 +328,11 @@ class ObjectReconstructor(Daemon): if etag != etag_buckets.setdefault(timestamp, etag): self.logger.error( - 'Mixed Etag (%s, %s) for %s', + 'Mixed Etag (%s, %s) for %s frag#%s', etag, etag_buckets[timestamp], self._full_path(node, job['partition'], - datafile_metadata['name'], job['policy'])) + datafile_metadata['name'], job['policy']), + fi_to_rebuild) continue if unique_index not in buckets[timestamp]: @@ -349,21 +348,22 @@ class ObjectReconstructor(Daemon): etag = etag_buckets[timestamp] self.logger.error( 'Unable to get enough responses (%s/%s) ' - 'to reconstruct %s with ETag %s' % ( + 'to reconstruct %s frag#%s with ETag %s' % ( len(resp), job['policy'].ec_ndata, self._full_path(node, job['partition'], datafile_metadata['name'], job['policy']), - etag)) + fi_to_rebuild, etag)) if error_resp_count: self.logger.error( 'Unable to get enough responses (%s error responses) ' - 'to reconstruct %s' % ( + 'to reconstruct %s frag#%s' % ( error_resp_count, self._full_path(node, job['partition'], datafile_metadata['name'], - job['policy']))) + job['policy']), + fi_to_rebuild)) raise DiskFileError('Unable to reconstruct EC archive') diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 16881432f7..78d1246597 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -645,14 +645,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): def test_get_response(self): part = self.part_nums[0] - node = POLICIES[1].object_ring.get_part_nodes(int(part))[0] + node = self.policy.object_ring.get_part_nodes(int(part))[0] def do_test(stat_code): with mocked_http_conn(stat_code): resp = self.reconstructor._get_response(node, part, path='nada', headers={}, - policy=POLICIES[1]) + policy=self.policy) return resp resp = do_test(200) @@ -1031,9 +1031,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): stat_method, stat_prefix = stat_key self.assertStatCount(stat_method, stat_prefix, expected) # part 2 should be totally empty - policy = POLICIES[1] - hash_gen = self.reconstructor._df_router[policy].yield_hashes( - 'sda1', '2', policy) + hash_gen = self.reconstructor._df_router[self.policy].yield_hashes( + 'sda1', '2', self.policy) for path, hash_, ts in hash_gen: self.fail('found %s with %s in %s' % (hash_, ts, path)) # but the partition directory and hashes pkl still exist @@ -1180,7 +1179,7 @@ class TestObjectReconstructor(unittest.TestCase): self.policy.object_ring.max_more_nodes = \ self.policy.object_ring.replicas self.ts_iter = make_timestamp_iter() - self.fabricated_ring = FabricatedRing() + self.fabricated_ring = FabricatedRing(replicas=14, devices=28) def _configure_reconstructor(self, **kwargs): self.conf.update(kwargs) @@ -1885,8 +1884,7 @@ class TestObjectReconstructor(unittest.TestCase): } # since this part doesn't belong on us it doesn't matter what # frag_index we have - frag_index = self.policy.get_backend_index( - random.randint(0, ring.replicas - 1)) + frag_index = random.randint(0, self.policy.ec_n_unique_fragments - 1) stub_hashes = { '123': {frag_index: 'hash', None: 'hash'}, 'abc': {None: 'hash'}, @@ -1940,9 +1938,9 @@ class TestObjectReconstructor(unittest.TestCase): 'part_path': part_path, } frag_index = self.policy.get_backend_index(node_index) - other_frag_index = self.policy.get_backend_index( - random.choice( - [f for f in range(ring.replicas) if f != node_index])) + other_frag_index = random.choice( + [f for f in range(self.policy.ec_n_unique_fragments) + if f != node_index]) stub_hashes = { '123': {frag_index: 'hash', None: 'hash'}, '456': {other_frag_index: 'hash', None: 'hash'}, @@ -3232,6 +3230,9 @@ class TestObjectReconstructor(unittest.TestCase): self.fail( "no expected etag %s found: %s" % (list(ec_archive_dict), error_line)) + # remove the found etag which should not be found in the + # following error lines + del ec_archive_dict[(expected_etag, ts)] expected = 'Unable to get enough responses (%s/10) to ' \ 'reconstruct 10.0.0.1:1001/sdb/0/a/c/o policy#0 ' \ @@ -3244,12 +3245,15 @@ class TestObjectReconstructor(unittest.TestCase): self.assertFalse(self.logger.get_lines_for_level('warning')) def test_reconstruct_fa_finds_itself_does_not_fail(self): + # verify that reconstruction of a missing frag can cope with finding + # that missing frag in the responses it gets from other nodes while + # attempting to rebuild the missing frag job = { 'partition': 0, 'policy': self.policy, } part_nodes = self.policy.object_ring.get_part_nodes(0) - node = part_nodes[1] + broken_node = random.randint(0, self.policy.ec_ndata - 1) metadata = { 'name': '/a/c/o', 'Content-Length': 0, @@ -3263,7 +3267,7 @@ class TestObjectReconstructor(unittest.TestCase): # instead of popping the broken body, we'll just leave it in the list # of responses and take away something else. - broken_body = ec_archive_bodies[1] + broken_body = ec_archive_bodies[broken_node] ec_archive_bodies = ec_archive_bodies[:-1] def make_header(body): @@ -3276,22 +3280,21 @@ class TestObjectReconstructor(unittest.TestCase): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, node, metadata) + job, part_nodes[broken_node], metadata) fixed_body = ''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(md5(fixed_body).hexdigest(), md5(broken_body).hexdigest()) - # no error and warning + # no error, no warning self.assertFalse(self.logger.get_lines_for_level('error')) self.assertFalse(self.logger.get_lines_for_level('warning')) # the found own frag will be reported in the debug message debug_log_lines = self.logger.get_lines_for_level('debug') - self.assertEqual(2, len(debug_log_lines)) + # redundant frag found once in first ec_ndata responses self.assertIn( - 'Found existing frag #1 while rebuilding #1 from', + 'Found existing frag #%s at' % broken_node, debug_log_lines[0]) - # ... and then, it should be skipped in the responses # N.B. in the future, we could avoid those check because # definitely sending the copy rather than reconstruct will @@ -3301,11 +3304,12 @@ class TestObjectReconstructor(unittest.TestCase): # liberasurecode[1]. # 1: https://github.com/openstack/liberasurecode/blob/ # master/src/erasurecode.c#L870 - expected_prefix = 'Reconstruct frag #1 with frag indexes' - self.assertIn(expected_prefix, debug_log_lines[1]) + log_prefix = 'Reconstruct frag #%s with frag indexes' % broken_node + self.assertIn(log_prefix, debug_log_lines[1]) + self.assertFalse(debug_log_lines[2:]) got_frag_index_list = json.loads( - debug_log_lines[1][len(expected_prefix):]) - self.assertNotIn(1, got_frag_index_list) + debug_log_lines[1][len(log_prefix):]) + self.assertNotIn(broken_node, got_frag_index_list) def test_reconstruct_fa_finds_duplicate_does_not_fail(self): job = { @@ -3369,11 +3373,13 @@ class TestObjectReconstructor(unittest.TestCase): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + ts = make_timestamp_iter() + timestamp = next(ts) metadata = { 'name': '/a/c/o', 'Content-Length': 0, 'ETag': 'etag', - 'X-Timestamp': '1234567890.12345' + 'X-Timestamp': timestamp.normal } test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] @@ -3384,7 +3390,9 @@ class TestObjectReconstructor(unittest.TestCase): def make_header(body): headers = get_header_frag_index(self, body) - headers.update({'X-Object-Sysmeta-Ec-Etag': etag}) + headers.update( + {'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': timestamp.internal}) return headers def test_missing_header(missing_header, expected_warning): @@ -3392,6 +3400,10 @@ class TestObjectReconstructor(unittest.TestCase): responses = [(200, body, make_header(body)) for body in ec_archive_bodies] + # To drop the header from the response[0], set None as the value + # explicitly instead of deleting the key because if no key exists + # in the dict, fake_http_connect will insert some key/value pairs + # automatically (e.g. X-Backend-Timestamp) responses[0][2].update({missing_header: None}) codes, body_iter, headers = zip(*responses) @@ -3412,7 +3424,7 @@ class TestObjectReconstructor(unittest.TestCase): self.assertIn(expected_warning, warning_log_lines) message_base = \ - "Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0 frag#0" + "Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0" test_missing_header( 'X-Object-Sysmeta-Ec-Etag', @@ -3479,7 +3491,7 @@ class TestObjectReconstructor(unittest.TestCase): self.assertEqual(1, len(warning_log_lines)) expected_message = \ "Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \ - "policy#0 frag#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)" + "policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)" self.assertIn(expected_message, warning_log_lines) for value in ('None', 'invalid'): diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 16789b5ccc..ad54832f4b 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -4304,6 +4304,9 @@ class TestECDuplicationObjController( controller = self.controller_cls( self.app, 'a', 'c', 'o') + # sanity, tester should set unique frag index to the variable + self.assertEqual( + unique, self.policy.get_backend_index(unique)) # create a dummy list of putters, check no handoffs putters = [] for index in range(self.policy.object_ring.replica_count): @@ -4327,9 +4330,13 @@ class TestECDuplicationObjController( # pop one more fragment too to make one missing hole putters.pop(one_more_missing) - # then determine chunk, we have 26 putters here and unique frag - # index 0 missing 2 copies and unique frag index 1 missing 1 copy - # i.e. the handoff node should be assigned to unique frag index 1 + # we have 26 putters here and frag index "unique" missing 2 + # copies. (i.e. missing "unique" and "duplicated" which + # should be same unique index). Then "one_more_missing" frag + # is different from both of the "unique" and "duplicated" + # but it's only 1 copy missing so that the handoff node should + # be assigned to either "unique" or "duplicated" prior to + # "one_more_missing" got = controller._determine_chunk_destinations(putters, self.policy) # N.B. len(putters) is now len(expected - 2) due to pop twice self.assertEqual(len(putters), len(got))