diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index ba600ae1af..3895cb356a 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -1747,7 +1747,12 @@ class MIMEPutter(Putter):
                    mime_boundary, multiphase=need_multiphase)
 
 
-def chunk_transformer(policy, nstreams):
+def chunk_transformer(policy):
+    """
+    A generator to transform a source chunk to erasure coded chunks for each
+    `send` call. The number of erasure coded chunks is equal to
+    policy.ec_n_unique_fragments.
+    """
     segment_size = policy.ec_segment_size
 
     buf = collections.deque()
@@ -1775,9 +1780,8 @@ def chunk_transformer(policy, nstreams):
 
             frags_by_byte_order = []
             for chunk_to_encode in chunks_to_encode:
-                encoded_chunks = policy.pyeclib_driver.encode(chunk_to_encode)
-                send_chunks = encoded_chunks * policy.ec_duplication_factor
-                frags_by_byte_order.append(send_chunks)
+                frags_by_byte_order.append(
+                    policy.pyeclib_driver.encode(chunk_to_encode))
             # Sequential calls to encode() have given us a list that
             # looks like this:
             #
@@ -1802,9 +1806,9 @@ def chunk_transformer(policy, nstreams):
     last_bytes = ''.join(buf)
     if last_bytes:
         last_frags = policy.pyeclib_driver.encode(last_bytes)
-        yield last_frags * policy.ec_duplication_factor
+        yield last_frags
     else:
-        yield [''] * nstreams
+        yield [''] * policy.ec_n_unique_fragments
 
 
 def trailing_metadata(policy, client_obj_hasher,
@@ -2330,28 +2334,29 @@ class ECObjectController(BaseObjectController):
     def _determine_chunk_destinations(self, putters, policy):
         """
         Given a list of putters, return a dict where the key is the putter
-        and the value is the node index to use.
+        and the value is the frag index to use.
 
-        This is done so that we line up handoffs using the same node index
+        This is done so that we line up handoffs using the same frag index
         (in the primary part list) as the primary that the handoff is
         standing in for. This lets erasure-code fragment archives wind up
         on the preferred local primary nodes when possible.
 
         :param putters: a list of swift.proxy.controllers.obj.MIMEPutter
                         instance
-        :param policy: A policy instance
+        :param policy: an ECStoragePolicy instance
         """
-        # Give each putter a "chunk index": the index of the
+        # Give each putter a "frag index": the index of the
         # transformed chunk that we'll send to it.
         #
         # For primary nodes, that's just its index (primary 0 gets
         # chunk 0, primary 1 gets chunk 1, and so on). For handoffs,
         # we assign the chunk index of a missing primary.
         handoff_conns = []
-        chunk_index = {}
+        putter_to_frag_index = {}
         for p in putters:
             if p.node_index is not None:
-                chunk_index[p] = p.node_index
+                putter_to_frag_index[p] = policy.get_backend_index(
+                    p.node_index)
             else:
                 handoff_conns.append(p)
 
@@ -2362,35 +2367,33 @@ class ECObjectController(BaseObjectController):
         # returns 507, in which case a handoff is used to replace it.
 
         # lack_list is a dict of list to keep hole indexes
-        # e.g. if we have 2 holes for index 0 with ec_duplication_factor=2
+        # e.g. if we have 2 holes for frag index 0 with ec_duplication_factor=2
         # lack_list is like {0: [0], 1: [0]}, and then, if 1 hole found
-        # for index 1, lack_list will be {0: [0, 1], 1: [0]}.
+        # for frag index 1, lack_list will be {0: [0, 1], 1: [0]}.
         # After that, holes will be filled from bigger key
         # (i.e. 1:[0] at first)
 
-        # Grouping all missing fragment indexes for each unique_index
-        unique_index_to_holes = collections.defaultdict(list)
-        available_indexes = chunk_index.values()
-        for node_index in range(policy.object_ring.replica_count):
-            if node_index not in available_indexes:
-                unique_index = policy.get_backend_index(node_index)
-                unique_index_to_holes[unique_index].append(node_index)
-
-        # Set the missing index to lack_list
+        # Grouping all missing fragment indexes for each frag_index
+        available_indexes = putter_to_frag_index.values()
         lack_list = collections.defaultdict(list)
-        for unique_index, holes in unique_index_to_holes.items():
-            for lack_tier, hole_node_index in enumerate(holes):
-                lack_list[lack_tier].append(hole_node_index)
+        for frag_index in range(policy.ec_n_unique_fragments):
+            # Set the missing index to lack_list
+            available_count = available_indexes.count(frag_index)
+            # N.B. it should be duplication_factor >= lack >= 0
+            lack = policy.ec_duplication_factor - available_count
+            # now we are missing one or more nodes to store the frag index
+            for lack_tier in range(lack):
+                lack_list[lack_tier].append(frag_index)
 
         # Extract the lack_list to a flat list
         holes = []
        for lack_tier, indexes in sorted(lack_list.items(), reverse=True):
             holes.extend(indexes)
 
-        # Fill chunk_index list with the hole list
+        # Fill putter_to_frag_index list with the hole list
         for hole, p in zip(holes, handoff_conns):
-            chunk_index[p] = hole
-        return chunk_index
+            putter_to_frag_index[p] = hole
+        return putter_to_frag_index
 
     def _transfer_data(self, req, policy, data_source, putters, nodes,
                        min_conns, etag_hasher):
@@ -2400,15 +2403,15 @@ class ECObjectController(BaseObjectController):
         This method was added in the PUT method extraction change
         """
         bytes_transferred = 0
-        chunk_transform = chunk_transformer(policy, len(nodes))
+        chunk_transform = chunk_transformer(policy)
         chunk_transform.send(None)
-        chunk_hashers = collections.defaultdict(md5)
+        frag_hashers = collections.defaultdict(md5)
 
         def send_chunk(chunk):
             # Note: there's two different hashers in here. etag_hasher is
             # hashing the original object so that we can validate the ETag
             # that the client sent (and etag_hasher is None if the client
-            # didn't send one). The hasher in chunk_hashers is hashing the
+            # didn't send one). The hasher in frag_hashers is hashing the
             # fragment archive being sent to the client; this lets us guard
             # against data corruption on the network between proxy and
             # object server.
@@ -2420,11 +2423,17 @@ class ECObjectController(BaseObjectController):
                 # or whatever we're doing, the transform will give us None.
                 return
 
+            updated_frag_indexes = set()
             for putter in list(putters):
-                ci = chunk_index[putter]
-                backend_chunk = backend_chunks[ci]
+                frag_index = putter_to_frag_index[putter]
+                backend_chunk = backend_chunks[frag_index]
                 if not putter.failed:
-                    chunk_hashers[ci].update(backend_chunk)
+                    # N.B. the same frag_index appears more than once
+                    # when ec_duplication_factor >= 2, so only feed the
+                    # chunk to the hasher the first time we see it.
+                    if frag_index not in updated_frag_indexes:
+                        frag_hashers[frag_index].update(backend_chunk)
+                        updated_frag_indexes.add(frag_index)
                     putter.send_chunk(backend_chunk)
                 else:
                     putter.close()
@@ -2437,9 +2446,9 @@ class ECObjectController(BaseObjectController):
 
         try:
             with ContextPool(len(putters)) as pool:
-                # build our chunk index dict to place handoffs in the
+                # build our putter_to_frag_index dict to place handoffs in the
                 # same part nodes index as the primaries they are covering
-                chunk_index = self._determine_chunk_destinations(
+                putter_to_frag_index = self._determine_chunk_destinations(
                     putters, policy)
 
                 for putter in putters:
@@ -2487,14 +2496,14 @@ class ECObjectController(BaseObjectController):
                 footers = {(k, v) for k, v in footers.items()
                            if not k.lower().startswith('x-object-sysmeta-ec-')}
                 for putter in putters:
-                    ci = chunk_index[putter]
+                    frag_index = putter_to_frag_index[putter]
                     # Update any footers set by middleware with EC footers
                     trail_md = trailing_metadata(
                         policy, etag_hasher,
-                        bytes_transferred, policy.get_backend_index(ci))
+                        bytes_transferred, frag_index)
                     trail_md.update(footers)
                     # Etag footer must always be hash of what we sent
-                    trail_md['Etag'] = chunk_hashers[ci].hexdigest()
+                    trail_md['Etag'] = frag_hashers[frag_index].hexdigest()
                     putter.end_of_object_data(footer_metadata=trail_md)
 
                 for putter in putters:
diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py
index a73608edd3..00c2744e03 100644
--- a/test/unit/proxy/controllers/test_obj.py
+++ b/test/unit/proxy/controllers/test_obj.py
@@ -3663,21 +3663,20 @@ class TestECFunctions(unittest.TestCase):
                 ec_segment_size=segment_size,
                 ec_duplication_factor=dup)
             expected = policy.pyeclib_driver.encode(orig_chunk)
-            transform = obj.chunk_transformer(
-                policy, policy.object_ring.replica_count)
+            transform = obj.chunk_transformer(policy)
             transform.send(None)
             backend_chunks = transform.send(orig_chunk)
             self.assertIsNotNone(backend_chunks)  # sanity
             self.assertEqual(
-                len(backend_chunks), policy.object_ring.replica_count)
-            self.assertEqual(expected * dup, backend_chunks)
+                len(backend_chunks), policy.ec_n_unique_fragments)
+            self.assertEqual(expected, backend_chunks)
 
             # flush out last chunk buffer
             backend_chunks = transform.send('')
             self.assertEqual(
-                len(backend_chunks), policy.object_ring.replica_count)
-            self.assertEqual([''] * policy.object_ring.replica_count,
+                len(backend_chunks), policy.ec_n_unique_fragments)
+            self.assertEqual([''] * policy.ec_n_unique_fragments,
                              backend_chunks)
 
         do_test(1)
         do_test(2)
@@ -3693,8 +3692,7 @@ class TestECFunctions(unittest.TestCase):
                 ec_segment_size=1024,
                 ec_duplication_factor=dup)
             expected = policy.pyeclib_driver.encode(last_chunk)
-            transform = obj.chunk_transformer(
-                policy, policy.object_ring.replica_count)
+            transform = obj.chunk_transformer(policy)
             transform.send(None)
 
             transform.send(last_chunk)
@@ -3702,8 +3700,9 @@ class TestECFunctions(unittest.TestCase):
             backend_chunks = transform.send('')
 
             self.assertEqual(
-                len(backend_chunks), policy.object_ring.replica_count)
-            self.assertEqual(expected * dup, backend_chunks)
+                len(backend_chunks), policy.ec_n_unique_fragments)
+            self.assertEqual(expected, backend_chunks)
+
 
         do_test(1)
         do_test(2)
@@ -4310,12 +4309,13 @@ class TestECDuplicationObjController(
         got = controller._determine_chunk_destinations(putters, self.policy)
         expected = {}
         for i, p in enumerate(putters):
-            expected[p] = i
+            expected[p] = self.policy.get_backend_index(i)
         # sanity
         self.assertEqual(got, expected)
 
         # now lets make an unique fragment as handoffs
-        putters[unique].node_index = None
+        handoff_putter = putters[unique]
+        handoff_putter.node_index = None
 
         # and then, pop a fragment which has same fragment index with unique
         self.assertEqual(
@@ -4329,21 +4329,20 @@ class TestECDuplicationObjController(
         # index 0 missing 2 copies and unique frag index 1 missing 1 copy
         # i.e. the handoff node should be assigned to unique frag index 1
         got = controller._determine_chunk_destinations(putters, self.policy)
-        self.assertEqual(len(expected) - 2, len(got))
-
-        # index one_more_missing should not be choosen
-        self.assertNotIn(one_more_missing, got.values())
-        # either index unique or duplicated should be in the got dict
-        self.assertTrue(
-            any([unique in got.values(), duplicated in got.values()]))
-        # but it's not both
-        self.assertFalse(
-            all([unique in got.values(), duplicated in got.values()]))
+        # N.B. len(putters) is now len(expected) - 2 because we popped twice
+        self.assertEqual(len(putters), len(got))
+        # sanity: the handoff putter has no node index
+        self.assertIsNone(handoff_putter.node_index)
+        self.assertEqual(got[handoff_putter], unique)
+        # sanity: all putters except handoff_putter have a node_index
+        self.assertTrue(all(
+            [putter.node_index for putter in got if
+             putter != handoff_putter]))
 
     def test_determine_chunk_destinations_prioritize_more_missing(self):
-        # drop 0, 14 and 1 should work
+        # dropping node_index 0, 14 and 1 should work
        self._test_determine_chunk_destinations_prioritize(0, 14, 1)
-        # drop 1, 15 and 0 should work, too
+        # dropping node_index 1, 15 and 0 should work, too
         self._test_determine_chunk_destinations_prioritize(1, 15, 0)
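
For readers following the handoff-assignment change above, here is a minimal standalone sketch of the hole-filling strategy that _determine_chunk_destinations now implements. It is illustrative only: plain integers stand in for putters, and the ec_n_unique_fragments / ec_duplication_factor values and the available_indexes list are hypothetical, hard-coded inputs rather than anything taken from a live proxy.

import collections

# Hypothetical EC layout: 4 unique frag indexes, each stored twice
ec_n_unique_fragments = 4
ec_duplication_factor = 2

# frag indexes already covered by connected primary putters; frag 0 has
# no connected copies and frag 1 is missing one copy (made-up values)
available_indexes = [1, 2, 2, 3, 3]

lack_list = collections.defaultdict(list)
for frag_index in range(ec_n_unique_fragments):
    # how many copies of this frag index still have no putter
    lack = ec_duplication_factor - available_indexes.count(frag_index)
    for lack_tier in range(lack):
        lack_list[lack_tier].append(frag_index)

# flatten, visiting the "most missing" tiers first
holes = []
for lack_tier, indexes in sorted(lack_list.items(), reverse=True):
    holes.extend(indexes)

print(holes)  # -> [0, 0, 1]

Running the sketch prints [0, 0, 1]: the frag index missing both of its copies is handed to handoff putters before the one missing a single copy, which mirrors the lack_list example given in the patch comment.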
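
The updated_frag_indexes bookkeeping in send_chunk can likewise be shown in isolation. The sketch below is a simplified stand-in, not the proxy code: putters are reduced to hypothetical (name, frag_index) tuples and chunks are byte strings, but it demonstrates why each unique frag index should be fed to its hasher only once per chunk when ec_duplication_factor >= 2, since the footer Etag would otherwise hash the duplicated fragment's bytes twice.

import collections
from hashlib import md5

# hypothetical putters, reduced to (name, frag_index); frag index 0 is
# duplicated across two putters, as with ec_duplication_factor=2
putters = [('primary-0', 0), ('primary-1', 1), ('primary-2', 0)]
frag_hashers = collections.defaultdict(md5)

def send_chunk(backend_chunks):
    updated_frag_indexes = set()
    for _name, frag_index in putters:
        chunk = backend_chunks[frag_index]
        # feed each unique frag index to its hasher exactly once per call,
        # even though several putters receive the same fragment
        if frag_index not in updated_frag_indexes:
            frag_hashers[frag_index].update(chunk)
            updated_frag_indexes.add(frag_index)
        # (the real code would also send the chunk to the putter here)

send_chunk([b'frag-zero', b'frag-one'])
for frag_index, hasher in sorted(frag_hashers.items()):
    print(frag_index, hasher.hexdigest())

Even though two putters share frag index 0, its hasher sees b'frag-zero' exactly once, so the digest matches what a single object server will receive for that fragment archive.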