diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index c74c06a4df..64497552e3 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -369,14 +369,13 @@ class ObjectReconstructor(Daemon): :returns: a DiskFile like class for use by ssync :raises DiskFileError: if the fragment archive cannot be reconstructed """ - - part_nodes = job['policy'].object_ring.get_part_nodes( - job['partition']) - part_nodes.remove(node) + # don't try and fetch a fragment from the node we're rebuilding to + part_nodes = [n for n in job['policy'].object_ring.get_part_nodes( + job['partition']) if n['id'] != node['id']] # the fragment index we need to reconstruct is the position index # of the node we're rebuilding to within the primary part list - fi_to_rebuild = job['policy'].get_backend_index(node['index']) + fi_to_rebuild = node['backend_index'] # KISS send out connection requests to all nodes, see what sticks. # Use fragment preferences header to tell other nodes that we want @@ -829,6 +828,8 @@ class ObjectReconstructor(Daemon): syncd_with += 1 continue + node['backend_index'] = job['policy'].get_backend_index( + node['index']) # ssync any out-of-sync suffixes with the remote node success, _ = ssync_sender( self, node, job, suffixes)() @@ -850,6 +851,8 @@ class ObjectReconstructor(Daemon): syncd_with = 0 reverted_objs = {} for node in job['sync_to']: + node['backend_index'] = job['policy'].get_backend_index( + node['index']) success, in_sync_objs = ssync_sender( self, node, job, job['suffixes'])() if success: diff --git a/swift/obj/server.py b/swift/obj/server.py index f5897327a7..302831b58b 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -766,9 +766,8 @@ class ObjectController(BaseStorageServer): except ValueError as e: raise HTTPBadRequest(body=str(e), request=request, content_type='text/plain') - # SSYNC will include Frag-Index header for subrequests to primary - # nodes; handoff nodes should 409 subrequests to over-write an - # existing data fragment until they offloaded the existing fragment + # SSYNC will include Frag-Index header for subrequests, in which case + # get_diskfile will ignore non-matching on-disk data files frag_index = request.headers.get('X-Backend-Ssync-Frag-Index') next_part_power = request.headers.get('X-Backend-Next-Part-Power') try: diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 68844dd01e..ebce8569ba 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -219,7 +219,7 @@ class Receiver(object): self.device, self.partition, self.policy = \ request_helpers.get_name_and_placement(self.request, 2, 2, False) - self.frag_index = self.node_index = None + self.frag_index = None if self.request.headers.get('X-Backend-Ssync-Frag-Index'): try: self.frag_index = int( @@ -228,19 +228,6 @@ class Receiver(object): raise swob.HTTPBadRequest( 'Invalid X-Backend-Ssync-Frag-Index %r' % self.request.headers['X-Backend-Ssync-Frag-Index']) - if self.request.headers.get('X-Backend-Ssync-Node-Index'): - try: - self.node_index = int( - self.request.headers['X-Backend-Ssync-Node-Index']) - except ValueError: - raise swob.HTTPBadRequest( - 'Invalid X-Backend-Ssync-Node-Index %r' % - self.request.headers['X-Backend-Ssync-Node-Index']) - if self.node_index != self.frag_index: - # a primary node should only receive it's own fragments - raise swob.HTTPBadRequest( - 'Frag-Index (%s) != Node-Index (%s)' % ( - self.frag_index, self.node_index)) utils.validate_device_partition(self.device, self.partition) self.diskfile_mgr = self.app._diskfile_router[self.policy] if not self.diskfile_mgr.get_dev_path(self.device): @@ -476,9 +463,9 @@ class Receiver(object): raise Exception('Invalid subrequest method %s' % method) subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy) subreq.headers['X-Backend-Replication'] = 'True' - if self.node_index is not None: + if self.frag_index is not None: # primary node should not 409 if it has a non-primary fragment - subreq.headers['X-Backend-Ssync-Frag-Index'] = self.node_index + subreq.headers['X-Backend-Ssync-Frag-Index'] = self.frag_index if replication_headers: subreq.headers['X-Backend-Replication-Headers'] = \ ' '.join(replication_headers) diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index c628c531d5..da9ca63f1d 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -231,21 +231,14 @@ class Sender(object): connection.putheader('Transfer-Encoding', 'chunked') connection.putheader('X-Backend-Storage-Policy-Index', int(self.job['policy'])) - # a sync job must use the node's index for the frag_index of the - # rebuilt fragments instead of the frag_index from the job which - # will be rebuilding them - frag_index = self.node.get('index', self.job.get('frag_index')) - if frag_index is None: - # replication jobs will not have a frag_index key; - # reconstructor jobs with only tombstones will have a - # frag_index key explicitly set to the value of None - in both - # cases on the wire we write the empty string which - # ssync_receiver will translate to None - frag_index = '' - connection.putheader('X-Backend-Ssync-Frag-Index', frag_index) - # a revert job to a handoff will not have a node index - connection.putheader('X-Backend-Ssync-Node-Index', - self.node.get('index', '')) + # a sync job must use the node's backend_index for the frag_index + # of the rebuilt fragments instead of the frag_index from the job + # which will be rebuilding them + frag_index = self.node.get('backend_index') + if frag_index is not None: + connection.putheader('X-Backend-Ssync-Frag-Index', frag_index) + # Node-Index header is for backwards compat 2.4.0-2.20.0 + connection.putheader('X-Backend-Ssync-Node-Index', frag_index) connection.endheaders() with exceptions.MessageTimeout( self.daemon.node_timeout, 'connect receive'): diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 7173390f3a..6e48f92213 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -3628,6 +3628,92 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): set(c['suffixes']), ) for c in ssync_calls)) + def test_sync_duplicates_to_remote_region(self): + partition = 0 + part_nodes = self.policy.object_ring.get_part_nodes(partition) + # in the non-duplicate case we just pick a random node + local_dev = random.choice(part_nodes[-14:]) + frag_index = self.policy.get_backend_index(local_dev['index']) + sync_to = object_reconstructor._get_partners( + local_dev['index'], part_nodes) + part_path = os.path.join(self.devices, self.local_dev['device'], + diskfile.get_data_dir(self.policy), + str(partition)) + # setup left and right hashes + stub_hashes = { + '123': {frag_index: 'hash', None: 'hash'}, + 'abc': {frag_index: 'hash', None: 'hash'}, + } + # left hand side is in sync + left_frag_index = self.policy.get_backend_index(sync_to[0]['index']) + left_hashes = { + '123': {left_frag_index: 'hash', None: 'hash'}, + 'abc': {left_frag_index: 'hash', None: 'hash'}, + } + # right hand side needs sync + right_frag_index = self.policy.get_backend_index(sync_to[1]['index']) + right_hashes = { + '123': {right_frag_index: 'hash', None: 'hash'}, + 'abc': {right_frag_index: 'hashX', None: 'hash'}, + } + + job = { + 'job_type': object_reconstructor.SYNC, + 'frag_index': frag_index, + 'suffixes': stub_hashes.keys(), + 'sync_to': sync_to, + 'partition': partition, + 'path': part_path, + 'hashes': stub_hashes, + 'policy': self.policy, + 'local_dev': self.local_dev, + 'device': self.local_dev['device'], + } + + responses = [ + (200, pickle.dumps(left_hashes)), + (200, pickle.dumps(right_hashes)), + (200, pickle.dumps(right_hashes)), + ] + codes, body_iter = zip(*responses) + + # we're going to dip our mocks into the ssync layer a bit + ssync_resp = mock.MagicMock() + ssync_resp.status = 200 + ssync_resp.readline.side_effect = [ + ':MISSING_CHECK: START', + ':MISSING_CHECK: END', + ':UPDATES: START', + ':UPDATES: END', + ] + + ssync_headers = [] + + def capture_headers(name, value): + ssync_headers.append((name, value)) + + ssync_conn = mock.MagicMock() + ssync_conn.getresponse.return_value = ssync_resp + ssync_conn.putheader = capture_headers + + with mock.patch('swift.obj.ssync_sender.SsyncBufferedHTTPConnection', + return_value=ssync_conn), \ + mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes', + return_value=(None, stub_hashes)), \ + mock.patch('swift.obj.diskfile.ECDiskFileManager.yield_hashes', + return_value=iter([])), \ + mocked_http_conn(*codes, body_iter=body_iter): + self.reconstructor.process_job(job) + + # ... to make sure it sets up our headers correctly + self.assertEqual(ssync_headers, [ + ('Transfer-Encoding', 'chunked'), + ('X-Backend-Storage-Policy-Index', 0), + ('X-Backend-Ssync-Frag-Index', right_frag_index), + # we include this for backwards compat + ('X-Backend-Ssync-Node-Index', right_frag_index), + ]) + def test_process_job_sync_missing_durable(self): partition = 0 part_nodes = self.policy.object_ring.get_part_nodes(partition) @@ -4101,9 +4187,10 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual(self.reconstructor.handoffs_remaining, 0) def test_process_job_revert_cleanup_tombstone(self): - sync_to = [random.choice([n for n in self.policy.object_ring.devs - if n != self.local_dev])] partition = 0 + sync_to = [random.choice([ + n for n in self.policy.object_ring.get_part_nodes(partition) + if n['id'] != self.local_dev['id']])] part_path = os.path.join(self.devices, self.local_dev['device'], diskfile.get_data_dir(self.policy), @@ -4205,6 +4292,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4268,6 +4356,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[4] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4304,6 +4393,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[4] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4349,6 +4439,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[-4] + node['backend_index'] = self.policy.get_backend_index(node['index']) # make up some data (trim some amount to make it unaligned with # segment size) @@ -4385,6 +4476,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) policy = self.policy possible_errors = [Timeout(), Exception('kaboom!')] @@ -4414,6 +4506,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) policy = self.policy codes = [404 for i in range(policy.object_ring.replicas - 1)] @@ -4438,6 +4531,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4488,6 +4582,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4542,6 +4637,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4603,6 +4699,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] ec_archive_dict = dict() @@ -4677,7 +4774,9 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): 'policy': self.policy, } part_nodes = self.policy.object_ring.get_part_nodes(0) - broken_node = random.randint(0, self.policy.ec_ndata - 1) + broken_index = random.randint(0, self.policy.ec_ndata - 1) + node = part_nodes[broken_index] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4685,7 +4784,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): # instead of popping the broken body, we'll just leave it in the list # of responses and take away something else. - broken_body = ec_archive_bodies[broken_node] + broken_body = ec_archive_bodies[broken_index] ec_archive_bodies = ec_archive_bodies[:-1] def make_header(body): @@ -4698,7 +4797,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): codes, body_iter, headers = zip(*responses) with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( - job, part_nodes[broken_node], self.obj_metadata) + job, node, self.obj_metadata) fixed_body = ''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(md5(fixed_body).hexdigest(), @@ -4711,7 +4810,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): debug_log_lines = self.logger.get_lines_for_level('debug') # redundant frag found once in first ec_ndata responses self.assertIn( - 'Found existing frag #%s at' % broken_node, + 'Found existing frag #%s at' % broken_index, debug_log_lines[0]) # N.B. in the future, we could avoid those check because @@ -4722,12 +4821,12 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): # liberasurecode[1]. # 1: https://github.com/openstack/liberasurecode/blob/ # master/src/erasurecode.c#L870 - log_prefix = 'Reconstruct frag #%s with frag indexes' % broken_node + log_prefix = 'Reconstruct frag #%s with frag indexes' % broken_index self.assertIn(log_prefix, debug_log_lines[1]) self.assertFalse(debug_log_lines[2:]) got_frag_index_list = json.loads( debug_log_lines[1][len(log_prefix):]) - self.assertNotIn(broken_node, got_frag_index_list) + self.assertNotIn(broken_index, got_frag_index_list) def test_reconstruct_fa_finds_duplicate_does_not_fail(self): job = { @@ -4736,6 +4835,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4785,6 +4885,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4854,6 +4955,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) test_data = ('rebuild' * self.policy.ec_segment_size)[:-777] etag = md5(test_data).hexdigest() @@ -4928,6 +5030,7 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor): } part_nodes = self.policy.object_ring.get_part_nodes(0) node = part_nodes[index] + node['backend_index'] = self.policy.get_backend_index(node['index']) metadata = { 'name': '/a/c/o', 'Content-Length': 0, diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index 5cf1067cde..1c6b1b20c7 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -312,6 +312,7 @@ class TestBaseSsyncEC(TestBaseSsync): self.policy = POLICIES.default self.logger = debug_logger('test-ssync-sender') self.daemon = ObjectReconstructor(self.daemon_conf, self.logger) + self.rx_node['backend_index'] = 0 def _get_object_data(self, path, frag_index=None, **kwargs): # return a frag archive for given object name and frag index. @@ -378,7 +379,6 @@ class TestSsyncEC(TestBaseSsyncEC): 'policy': policy, 'frag_index': frag_index} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -485,7 +485,6 @@ class TestSsyncEC(TestBaseSsyncEC): 'policy': policy, 'frag_index': frag_index} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -583,7 +582,6 @@ class TestSsyncEC(TestBaseSsyncEC): 'frag_index': frag_index, 'sync_diskfile_builder': fake_reconstruct_fa} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -667,10 +665,11 @@ class TestSsyncEC(TestBaseSsyncEC): def test_send_invalid_frag_index(self): policy = POLICIES.default - job = {'frag_index': 'Not a number', + job = {'frag_index': 'No one cares', 'device': self.device, 'partition': self.partition, 'policy': policy} + self.rx_node['backend_index'] = 'Not a number' sender = ssync_sender.Sender( self.daemon, self.rx_node, job, ['abc']) success, _ = sender() @@ -713,7 +712,6 @@ class TestSsyncEC(TestBaseSsyncEC): 'policy': policy, 'frag_index': frag_index} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -808,7 +806,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): os.path.basename(os.path.dirname(df._datadir))) self.job_node = dict(self.rx_node) - self.job_node['index'] = self.rx_node_index + self.job_node['id'] = 0 self.frag_length = int( self.tx_objs['o1'][0].get_metadata()['Content-Length']) @@ -1082,7 +1080,6 @@ class TestSsyncReplication(TestBaseSsync): def test_sync(self): policy = POLICIES.default - rx_node_index = 0 # create sender side diskfiles... tx_objs = {} @@ -1136,7 +1133,6 @@ class TestSsyncReplication(TestBaseSsync): 'partition': self.partition, 'policy': policy} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -1204,7 +1200,6 @@ class TestSsyncReplication(TestBaseSsync): def test_meta_file_sync(self): policy = POLICIES.default - rx_node_index = 0 # create diskfiles... tx_objs = {} @@ -1309,7 +1304,6 @@ class TestSsyncReplication(TestBaseSsync): 'partition': self.partition, 'policy': policy} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -1352,7 +1346,6 @@ class TestSsyncReplication(TestBaseSsync): def test_expired_object(self): # verify that expired objects sync policy = POLICIES.default - rx_node_index = 0 tx_df_mgr = self.daemon._df_router[policy] t1 = next(self.ts_iter) obj_name = 'o1' @@ -1370,7 +1363,6 @@ class TestSsyncReplication(TestBaseSsync): 'partition': self.partition, 'policy': policy} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -1387,7 +1379,6 @@ class TestSsyncReplication(TestBaseSsync): def _check_no_longer_expired_object(self, obj_name, df, policy): # verify that objects with x-delete-at metadata that are not expired # can be sync'd - rx_node_index = 0 def do_ssync(): # create ssync sender instance... @@ -1396,7 +1387,6 @@ class TestSsyncReplication(TestBaseSsync): 'partition': self.partition, 'policy': policy} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -1480,7 +1470,6 @@ class TestSsyncReplication(TestBaseSsync): # verify that the sender does sync a data file to a legacy receiver, # but does not PUT meta file content to a legacy receiver policy = POLICIES.default - rx_node_index = 0 # create diskfiles... tx_df_mgr = self.daemon._df_router[policy] @@ -1504,7 +1493,6 @@ class TestSsyncReplication(TestBaseSsync): 'partition': self.partition, 'policy': policy} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -1563,7 +1551,6 @@ class TestSsyncReplication(TestBaseSsync): def test_content_type_sync(self): policy = POLICIES.default - rx_node_index = 0 # create diskfiles... tx_objs = {} @@ -1675,7 +1662,6 @@ class TestSsyncReplication(TestBaseSsync): 'partition': self.partition, 'policy': policy} node = dict(self.rx_node) - node.update({'index': rx_node_index}) sender = ssync_sender.Sender(self.daemon, node, job, suffixes) # wrap connection from tx to rx to capture ssync messages... sender.connect, trace = self.make_connect_wrapper(sender) @@ -1721,5 +1707,6 @@ class TestSsyncReplication(TestBaseSsync): self.device, self.partition, suffixes, policy) self.assertEqual(tx_hashes, rx_hashes) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 90c309ae18..1818cb6453 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -211,7 +211,6 @@ class TestReceiver(unittest.TestCase): ':UPDATES: START', ':UPDATES: END']) self.assertEqual(rcvr.policy, POLICIES[1]) self.assertEqual(rcvr.frag_index, 7) - self.assertIsNone(rcvr.node_index) @unit.patch_policies() def test_Receiver_with_only_node_index_header(self): @@ -226,13 +225,17 @@ class TestReceiver(unittest.TestCase): body=':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n' ':UPDATES: START\r\n:UPDATES: END\r\n') - with self.assertRaises(HTTPException) as e: - ssync_receiver.Receiver(self.controller, req) - self.assertEqual(e.exception.status_int, 400) - # if a node index is included - it *must* be - # the same value of frag index - self.assertEqual(e.exception.body, - 'Frag-Index (None) != Node-Index (7)') + rcvr = ssync_receiver.Receiver(self.controller, req) + body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()] + self.assertEqual( + body_lines, + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(rcvr.policy, POLICIES[1]) + # we used to require the reconstructor to send the frag_index twice as + # two different headers because of evolutionary reasons, now we ignore + # node_index + self.assertEqual(rcvr.frag_index, None) @unit.patch_policies() def test_Receiver_with_matched_indexes(self): @@ -256,7 +259,6 @@ class TestReceiver(unittest.TestCase): ':UPDATES: START', ':UPDATES: END']) self.assertEqual(rcvr.policy, POLICIES[1]) self.assertEqual(rcvr.frag_index, 7) - self.assertEqual(rcvr.node_index, 7) @unit.patch_policies() def test_Receiver_with_invalid_indexes(self): @@ -289,8 +291,16 @@ class TestReceiver(unittest.TestCase): body=':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n' ':UPDATES: START\r\n:UPDATES: END\r\n') - self.assertRaises(HTTPException, ssync_receiver.Receiver, - self.controller, req) + rcvr = ssync_receiver.Receiver(self.controller, req) + body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()] + self.assertEqual( + body_lines, + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(rcvr.policy, POLICIES[1]) + # node_index if provided should always match frag_index; but if they + # differ, frag_index takes precedence + self.assertEqual(rcvr.frag_index, 7) def test_SSYNC_replication_lock_fail(self): def _mock(path, policy, partition): @@ -2057,7 +2067,8 @@ class TestSsyncRxServer(unittest.TestCase): sender = ssync_sender.Sender(self.daemon, node, job, ['abc']) # kick off the sender and let the error trigger failure - with mock.patch('swift.obj.ssync_receiver.Receiver.initialize_request')\ + with mock.patch( + 'swift.obj.ssync_receiver.Receiver.initialize_request') \ as mock_initialize_request: mock_initialize_request.side_effect = \ swob.HTTPInternalServerError() diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index f2a2aa54af..38b2792c39 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -182,7 +182,7 @@ class TestSender(BaseTest): def test_connect(self): node = dict(replication_ip='1.2.3.4', replication_port=5678, - device='sda1', index=0) + device='sda1', backend_index=0) job = dict(partition='9', policy=POLICIES[1]) self.sender = ssync_sender.Sender(self.daemon, node, job, None) self.sender.suffixes = ['abc'] @@ -236,8 +236,6 @@ class TestSender(BaseTest): 'putheader': [ mock.call('Transfer-Encoding', 'chunked'), mock.call('X-Backend-Storage-Policy-Index', 1), - mock.call('X-Backend-Ssync-Frag-Index', 9), - mock.call('X-Backend-Ssync-Node-Index', ''), ], 'endheaders': [mock.call()], } @@ -270,8 +268,6 @@ class TestSender(BaseTest): 'putheader': [ mock.call('Transfer-Encoding', 'chunked'), mock.call('X-Backend-Storage-Policy-Index', 0), - mock.call('X-Backend-Ssync-Frag-Index', ''), - mock.call('X-Backend-Ssync-Node-Index', ''), ], 'endheaders': [mock.call()], } @@ -304,8 +300,40 @@ class TestSender(BaseTest): 'putheader': [ mock.call('Transfer-Encoding', 'chunked'), mock.call('X-Backend-Storage-Policy-Index', 1), - mock.call('X-Backend-Ssync-Frag-Index', ''), - mock.call('X-Backend-Ssync-Node-Index', ''), + ], + 'endheaders': [mock.call()], + } + for method_name, expected_calls in expectations.items(): + mock_method = getattr(mock_conn, method_name) + self.assertEqual(expected_calls, mock_method.mock_calls, + 'connection method "%s" got %r not %r' % ( + method_name, mock_method.mock_calls, + expected_calls)) + + def test_connect_handoff_none_frag_to_primary(self): + node = dict(replication_ip='1.2.3.4', replication_port=5678, + device='sda1', backend_index=42) + job = dict(partition='9', policy=POLICIES[1], frag_index=None) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) + self.sender.suffixes = ['abc'] + with mock.patch( + 'swift.obj.ssync_sender.SsyncBufferedHTTPConnection' + ) as mock_conn_class: + mock_conn = mock_conn_class.return_value + mock_resp = mock.MagicMock() + mock_resp.status = 200 + mock_conn.getresponse.return_value = mock_resp + self.sender.connect() + mock_conn_class.assert_called_once_with('1.2.3.4:5678') + expectations = { + 'putrequest': [ + mock.call('SSYNC', '/sda1/9'), + ], + 'putheader': [ + mock.call('Transfer-Encoding', 'chunked'), + mock.call('X-Backend-Storage-Policy-Index', 1), + mock.call('X-Backend-Ssync-Frag-Index', 42), + mock.call('X-Backend-Ssync-Node-Index', 42), ], 'endheaders': [mock.call()], } @@ -339,8 +367,6 @@ class TestSender(BaseTest): 'putheader': [ mock.call('Transfer-Encoding', 'chunked'), mock.call('X-Backend-Storage-Policy-Index', 1), - mock.call('X-Backend-Ssync-Frag-Index', ''), - mock.call('X-Backend-Ssync-Node-Index', ''), ], 'endheaders': [mock.call()], }