diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 2b54ab89d2..6eec06c50e 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -77,8 +77,7 @@ class RebuildingECDiskFileStream(object): self.datafile_metadata = datafile_metadata # the new FA is going to have the same length as others in the set - self._content_length = self.datafile_metadata['Content-Length'] - + self._content_length = int(self.datafile_metadata['Content-Length']) # update the FI and delete the ETag, the obj server will # recalc on the other side... self.datafile_metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 6859cff2fb..990bdb21a9 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -179,7 +179,7 @@ class Receiver(object): yield ':ERROR: %d %r\n' % (0, str(err)) except exceptions.MessageTimeout as err: self.app.logger.error( - '%s/%s/%s TIMEOUT in replication.Receiver: %s' % ( + '%s/%s/%s TIMEOUT in ssync.Receiver: %s' % ( self.request.remote_addr, self.device, self.partition, err)) yield ':ERROR: %d %r\n' % (408, str(err)) @@ -188,11 +188,11 @@ class Receiver(object): yield ':ERROR: %d %r\n' % (err.status_int, body) except Exception as err: self.app.logger.exception( - '%s/%s/%s EXCEPTION in replication.Receiver' % + '%s/%s/%s EXCEPTION in ssync.Receiver' % (self.request.remote_addr, self.device, self.partition)) yield ':ERROR: %d %r\n' % (0, str(err)) except Exception: - self.app.logger.exception('EXCEPTION in replication.Receiver') + self.app.logger.exception('EXCEPTION in ssync.Receiver') if self.disconnect: # This makes the socket close early so the remote side doesn't have # to send its whole request while the lower Eventlet-level just @@ -287,7 +287,7 @@ class Receiver(object): # if commit fails then log exception and fall back to wanting # a full update self.app.logger.exception( - '%s/%s/%s EXCEPTION in replication.Receiver while ' + '%s/%s/%s EXCEPTION in ssync.Receiver while ' 'attempting commit of %s' % (self.request.remote_addr, self.device, self.partition, df._datadir)) @@ -466,7 +466,7 @@ class Receiver(object): chunk = self.fp.read( min(left, self.app.network_chunk_size)) if not chunk: - raise Exception( + raise exceptions.ChunkReadError( 'Early termination for %s %s' % (method, path)) left -= len(chunk) yield chunk @@ -488,6 +488,9 @@ class Receiver(object): resp.status_int == http.HTTP_NOT_FOUND: successes += 1 else: + self.app.logger.warning( + 'ssync subrequest failed with %s: %s %s' % + (resp.status_int, method, subreq.path)) failures += 1 if failures >= self.app.replication_failure_threshold and ( not successes or diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index a509166b46..692dd45c5b 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -149,7 +149,7 @@ class Sender(object): # was originally written to shell out to rsync which would do # no such thing. self.daemon.logger.exception( - '%s:%s/%s/%s EXCEPTION in replication.Sender', + '%s:%s/%s/%s EXCEPTION in ssync.Sender', self.node.get('replication_ip'), self.node.get('replication_port'), self.node.get('device'), self.job.get('partition')) @@ -163,7 +163,7 @@ class Sender(object): # This particular exception handler does the minimal amount as it # would only get called if the above except Exception handler # failed (bad node or job data). - self.daemon.logger.exception('EXCEPTION in replication.Sender') + self.daemon.logger.exception('EXCEPTION in ssync.Sender') return False, {} def connect(self): @@ -350,6 +350,11 @@ class Sender(object): if want.get('data'): self.send_delete(url_path, err.timestamp) except exceptions.DiskFileError: + # DiskFileErrors are expected while opening the diskfile, + # before any data is read and sent. Since there is no partial + # state on the receiver it's ok to ignore this diskfile and + # continue. The diskfile may however be deleted after a + # successful ssync since it remains in the send_map. pass with exceptions.MessageTimeout( self.daemon.node_timeout, 'updates end'): @@ -404,10 +409,21 @@ class Sender(object): msg = '\r\n'.join(msg) + '\r\n\r\n' with exceptions.MessageTimeout(self.daemon.node_timeout, 'send_put'): self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) + bytes_read = 0 for chunk in df.reader(): + bytes_read += len(chunk) with exceptions.MessageTimeout( self.daemon.node_timeout, 'send_put chunk'): self.connection.send('%x\r\n%s\r\n' % (len(chunk), chunk)) + if bytes_read != df.content_length: + # Since we may now have partial state on the receiver we have to + # prevent the receiver finalising what may well be a bad or + # partially written diskfile. Unfortunately we have no other option + # than to pull the plug on this ssync session. If ssync supported + # multiphase PUTs like the proxy uses for EC we could send a bad + # etag in a footer of this subrequest, but that is not supported. + raise exceptions.ReplicationException( + 'Sent data length does not match content-length') def send_post(self, url_path, df): metadata = df.get_metafile_metadata() diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 150627f758..1738f0435a 100755 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -2506,7 +2506,7 @@ class TestObjectReconstructor(unittest.TestCase): node = part_nodes[1] metadata = { 'name': '/a/c/o', - 'Content-Length': 0, + 'Content-Length': '0', 'ETag': 'etag', } @@ -2539,6 +2539,7 @@ class TestObjectReconstructor(unittest.TestCase): *codes, body_iter=body_iter, headers=headers): df = self.reconstructor.reconstruct_fa( job, node, metadata) + self.assertEqual(0, df.content_length) fixed_body = ''.join(df.reader()) self.assertEqual(len(fixed_body), len(broken_body)) self.assertEqual(md5(fixed_body).hexdigest(), diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index 9ecc4ca333..b58cd4bbd2 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -29,9 +29,10 @@ from swift.common import utils from swift.common.storage_policy import POLICIES from swift.common.utils import Timestamp from swift.obj import ssync_sender, server -from swift.obj.reconstructor import RebuildingECDiskFileStream +from swift.obj.reconstructor import RebuildingECDiskFileStream, \ + ObjectReconstructor -from test.unit import patch_policies +from test.unit import patch_policies, debug_logger from test.unit.obj.common import BaseTest, FakeReplicator @@ -60,13 +61,14 @@ class TestBaseSsync(BaseTest): 'mount_check': 'false', 'replication_one_per_device': 'false', 'log_requests': 'false'} - self.rx_controller = server.ObjectController(conf) + self.rx_logger = debug_logger() + self.rx_controller = server.ObjectController(conf, self.rx_logger) self.ts_iter = (Timestamp(t) for t in itertools.count(int(time.time()))) self.rx_ip = '127.0.0.1' sock = eventlet.listen((self.rx_ip, 0)) self.rx_server = eventlet.spawn( - eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger()) + eventlet.wsgi.server, sock, self.rx_controller, self.rx_logger) self.rx_port = sock.getsockname()[1] self.rx_node = {'replication_ip': self.rx_ip, 'replication_port': self.rx_port, @@ -730,6 +732,317 @@ class TestSsyncEC(TestBaseSsync): self.assertFalse(results['rx_updates']) +class FakeResponse(object): + def __init__(self, frag_index, data): + self.headers = { + 'X-Object-Sysmeta-Ec-Frag-Index': str(frag_index), + 'X-Object-Sysmeta-Ec-Etag': 'the etag', + 'X-Backend-Timestamp': '1234567890.12345' + } + self.data = data + + def getheaders(self): + return self.headers + + def read(self, length): + if isinstance(self.data, Exception): + raise self.data + val = self.data + self.data = '' + return val + + +@patch_policies(with_ec_default=True) +class TestSsyncECReconstructorSyncJob(TestBaseSsync): + def setUp(self): + super(TestSsyncECReconstructorSyncJob, self).setUp() + self.policy = POLICIES.default + self.rx_node_index = 0 + self.tx_node_index = 1 + + # create sender side diskfiles... + self.tx_objs = {} + tx_df_mgr = self.daemon._diskfile_router[self.policy] + t1 = next(self.ts_iter) + self.tx_objs['o1'] = self._create_ondisk_files( + tx_df_mgr, 'o1', self.policy, t1, (self.tx_node_index,)) + t2 = next(self.ts_iter) + self.tx_objs['o2'] = self._create_ondisk_files( + tx_df_mgr, 'o2', self.policy, t2, (self.tx_node_index,)) + + self.suffixes = set() + for diskfiles in list(self.tx_objs.values()): + for df in diskfiles: + self.suffixes.add( + os.path.basename(os.path.dirname(df._datadir))) + + self.job_node = dict(self.rx_node) + self.job_node['index'] = self.rx_node_index + + self.frag_length = int( + self.tx_objs['o1'][0].get_metadata()['Content-Length']) + + def _test_reconstructor_sync_job(self, frag_responses): + # Helper method to mock reconstructor to consume given lists of fake + # responses while reconstructing a fragment for a sync type job. The + # tests verify that when the reconstructed fragment iter fails in some + # way then ssync does not mistakenly create fragments on the receiving + # node which have incorrect data. + # See https://bugs.launchpad.net/swift/+bug/1631144 + + # frag_responses is a list of two lists of responses to each + # reconstructor GET request for a fragment archive. The two items in + # the outer list are lists of responses for each of the two fragments + # to be reconstructed. Items in the inner lists are responses for each + # of the other fragments fetched during the reconstructor rebuild. + path_to_responses = {} + fake_get_response_calls = [] + + def fake_get_response(recon, node, part, path, headers, policy): + # select a list of fake responses for this path and return the next + # from the list + if path not in path_to_responses: + path_to_responses[path] = frag_responses.pop(0) + response = path_to_responses[path].pop() + fake_get_response_calls.append(path) + return response + + def fake_get_part_nodes(part): + # the reconstructor will try to remove the receiver node from the + # object ring part nodes, but the fake node we created for our + # receiver is not actually in the ring part nodes, so append it + # here simply so that the reconstructor does not fail to remove it. + return (self.policy.object_ring._get_part_nodes(part) + + [self.job_node]) + + def fake_reconstruct(self, policy, fragment_payload, frag_index): + # fake EC reconstruction by returning first frag, which is ok + # because all frags would be same length + return fragment_payload[0] + + with mock.patch( + 'swift.obj.reconstructor.ObjectReconstructor._get_response', + fake_get_response): + with mock.patch( + 'swift.obj.reconstructor.ObjectReconstructor._reconstruct', + fake_reconstruct): + with mock.patch.object( + self.policy.object_ring, 'get_part_nodes', + fake_get_part_nodes): + self.reconstructor = ObjectReconstructor( + {}, logger=debug_logger('test_reconstructor')) + job = { + 'device': self.device, + 'partition': self.partition, + 'policy': self.policy, + 'sync_diskfile_builder': + self.reconstructor.reconstruct_fa + } + sender = ssync_sender.Sender( + self.daemon, self.job_node, job, self.suffixes) + sender.connect, trace = self.make_connect_wrapper(sender) + sender() + return trace + + def test_sync_reconstructor_partial_rebuild(self): + # First fragment to sync gets partial content from reconstructor. + # Expect ssync job to exit early with no file written on receiver. + frag_responses = [ + [FakeResponse(i, 'x' * (self.frag_length - 1)) + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)], + [FakeResponse(i, 'y' * self.frag_length) + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]] + + self._test_reconstructor_sync_job(frag_responses) + msgs = [] + for obj_name in ('o1', 'o2'): + try: + df = self._open_rx_diskfile( + obj_name, self.policy, self.rx_node_index) + msgs.append('Unexpected rx diskfile for %r with content %r' % + (obj_name, ''.join([d for d in df.reader()]))) + except DiskFileNotExist: + pass # expected outcome + if msgs: + self.fail('Failed with:\n%s' % '\n'.join(msgs)) + log_lines = self.daemon.logger.get_lines_for_level('error') + self.assertIn('Sent data length does not match content-length', + log_lines[0]) + self.assertFalse(log_lines[1:]) + # trampoline for the receiver to write a log + eventlet.sleep(0) + log_lines = self.rx_logger.get_lines_for_level('warning') + self.assertIn('ssync subrequest failed with 499', + log_lines[0]) + self.assertFalse(log_lines[1:]) + self.assertFalse(self.rx_logger.get_lines_for_level('error')) + + def test_sync_reconstructor_no_rebuilt_content(self): + # First fragment to sync gets no content in any response to + # reconstructor. Expect ssync job to exit early with no file written on + # receiver. + frag_responses = [ + [FakeResponse(i, '') + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)], + [FakeResponse(i, 'y' * self.frag_length) + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]] + + self._test_reconstructor_sync_job(frag_responses) + msgs = [] + for obj_name in ('o1', 'o2'): + try: + df = self._open_rx_diskfile( + obj_name, self.policy, self.rx_node_index) + msgs.append('Unexpected rx diskfile for %r with content %r' % + (obj_name, ''.join([d for d in df.reader()]))) + except DiskFileNotExist: + pass # expected outcome + if msgs: + self.fail('Failed with:\n%s' % '\n'.join(msgs)) + log_lines = self.daemon.logger.get_lines_for_level('error') + self.assertIn('Sent data length does not match content-length', + log_lines[0]) + self.assertFalse(log_lines[1:]) + # trampoline for the receiver to write a log + eventlet.sleep(0) + log_lines = self.rx_logger.get_lines_for_level('warning') + self.assertIn('ssync subrequest failed with 499', + log_lines[0]) + self.assertFalse(log_lines[1:]) + self.assertFalse(self.rx_logger.get_lines_for_level('error')) + + def test_sync_reconstructor_exception_during_rebuild(self): + # First fragment to sync has some reconstructor get responses raise + # exception while rebuilding. Expect ssync job to exit early with no + # files written on receiver. + frag_responses = [ + # ec_ndata responses are ok, but one of these will be ignored as + # it is for the frag index being rebuilt + [FakeResponse(i, 'x' * self.frag_length) + for i in range(self.policy.ec_ndata)] + + # ec_nparity responses will raise an Exception - at least one of + # these will be used during rebuild + [FakeResponse(i, Exception('raised in response read method')) + for i in range(self.policy.ec_ndata, + self.policy.ec_ndata + self.policy.ec_nparity)], + # second set of response are all good + [FakeResponse(i, 'y' * self.frag_length) + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]] + + self._test_reconstructor_sync_job(frag_responses) + msgs = [] + for obj_name in ('o1', 'o2'): + try: + df = self._open_rx_diskfile( + obj_name, self.policy, self.rx_node_index) + msgs.append('Unexpected rx diskfile for %r with content %r' % + (obj_name, ''.join([d for d in df.reader()]))) + except DiskFileNotExist: + pass # expected outcome + if msgs: + self.fail('Failed with:\n%s' % '\n'.join(msgs)) + + log_lines = self.reconstructor.logger.get_lines_for_level('error') + self.assertIn('Error trying to rebuild', log_lines[0]) + log_lines = self.daemon.logger.get_lines_for_level('error') + self.assertIn('Sent data length does not match content-length', + log_lines[0]) + self.assertFalse(log_lines[1:]) + # trampoline for the receiver to write a log + eventlet.sleep(0) + log_lines = self.rx_logger.get_lines_for_level('warning') + self.assertIn('ssync subrequest failed with 499', + log_lines[0]) + self.assertFalse(log_lines[1:]) + self.assertFalse(self.rx_logger.get_lines_for_level('error')) + + def test_sync_reconstructor_no_responses(self): + # First fragment to sync gets no responses for reconstructor to rebuild + # with, nothing is sent to receiver so expect to skip that fragment and + # continue with second. + frag_responses = [ + [None + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)], + [FakeResponse(i, 'y' * self.frag_length) + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]] + + trace = self._test_reconstructor_sync_job(frag_responses) + results = self._analyze_trace(trace) + self.assertEqual(2, len(results['tx_missing'])) + self.assertEqual(2, len(results['rx_missing'])) + self.assertEqual(1, len(results['tx_updates'])) + self.assertFalse(results['rx_updates']) + self.assertEqual('PUT', results['tx_updates'][0].get('method')) + synced_obj_name = results['tx_updates'][0].get('path')[-2:] + + msgs = [] + obj_name = synced_obj_name + try: + df = self._open_rx_diskfile( + obj_name, self.policy, self.rx_node_index) + self.assertEqual('y' * self.frag_length, + ''.join([d for d in df.reader()])) + except DiskFileNotExist: + msgs.append('Missing rx diskfile for %r' % obj_name) + + obj_names = list(self.tx_objs) + obj_names.remove(synced_obj_name) + obj_name = obj_names[0] + try: + df = self._open_rx_diskfile( + obj_name, self.policy, self.rx_node_index) + msgs.append('Unexpected rx diskfile for %r with content %r' % + (obj_name, ''.join([d for d in df.reader()]))) + except DiskFileNotExist: + pass # expected outcome + if msgs: + self.fail('Failed with:\n%s' % '\n'.join(msgs)) + self.assertFalse(self.daemon.logger.get_lines_for_level('error')) + log_lines = self.reconstructor.logger.get_lines_for_level('error') + self.assertIn('Unable to get enough responses', log_lines[0]) + # trampoline for the receiver to write a log + eventlet.sleep(0) + self.assertFalse(self.rx_logger.get_lines_for_level('warning')) + self.assertFalse(self.rx_logger.get_lines_for_level('error')) + + def test_sync_reconstructor_rebuild_ok(self): + # Sanity test for this class of tests. Both fragments get a full + # complement of responses and rebuild correctly. + frag_responses = [ + [FakeResponse(i, 'x' * self.frag_length) + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)], + [FakeResponse(i, 'y' * self.frag_length) + for i in range(self.policy.ec_ndata + self.policy.ec_nparity)]] + + trace = self._test_reconstructor_sync_job(frag_responses) + results = self._analyze_trace(trace) + self.assertEqual(2, len(results['tx_missing'])) + self.assertEqual(2, len(results['rx_missing'])) + self.assertEqual(2, len(results['tx_updates'])) + self.assertFalse(results['rx_updates']) + msgs = [] + rx_frags = [] + for obj_name in self.tx_objs: + try: + df = self._open_rx_diskfile( + obj_name, self.policy, self.rx_node_index) + rx_frags.append(''.join([d for d in df.reader()])) + except DiskFileNotExist: + msgs.append('Missing rx diskfile for %r' % obj_name) + if msgs: + self.fail('Failed with:\n%s' % '\n'.join(msgs)) + self.assertIn('x' * self.frag_length, rx_frags) + self.assertIn('y' * self.frag_length, rx_frags) + self.assertFalse(rx_frags[2:]) + self.assertFalse(self.daemon.logger.get_lines_for_level('error')) + self.assertFalse( + self.reconstructor.logger.get_lines_for_level('error')) + # trampoline for the receiver to write a log + eventlet.sleep(0) + self.assertFalse(self.rx_logger.get_lines_for_level('warning')) + self.assertFalse(self.rx_logger.get_lines_for_level('error')) + + @patch_policies class TestSsyncReplication(TestBaseSsync): def test_sync(self): diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index b94cc13fd5..f6be6b1d84 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -445,7 +445,7 @@ class TestReceiver(unittest.TestCase): mock_wsgi_input.mock_socket) mock_wsgi_input.mock_socket.close.assert_called_once_with() self.controller.logger.exception.assert_called_once_with( - '1.2.3.4/device/partition EXCEPTION in replication.Receiver') + '1.2.3.4/device/partition EXCEPTION in ssync.Receiver') def test_SSYNC_Exception_Exception(self): @@ -481,7 +481,7 @@ class TestReceiver(unittest.TestCase): mock_wsgi_input.mock_socket) mock_wsgi_input.mock_socket.close.assert_called_once_with() self.controller.logger.exception.assert_called_once_with( - 'EXCEPTION in replication.Receiver') + 'EXCEPTION in ssync.Receiver') def test_MISSING_CHECK_timeout(self): @@ -522,7 +522,7 @@ class TestReceiver(unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertTrue(mock_shutdown_safe.called) self.controller.logger.error.assert_called_once_with( - '2.3.4.5/sda1/1 TIMEOUT in replication.Receiver: ' + '2.3.4.5/sda1/1 TIMEOUT in ssync.Receiver: ' '0.01 seconds: missing_check line') def test_MISSING_CHECK_other_exception(self): @@ -564,7 +564,7 @@ class TestReceiver(unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertTrue(mock_shutdown_safe.called) self.controller.logger.exception.assert_called_once_with( - '3.4.5.6/sda1/1 EXCEPTION in replication.Receiver') + '3.4.5.6/sda1/1 EXCEPTION in ssync.Receiver') def test_MISSING_CHECK_empty_list(self): @@ -773,7 +773,7 @@ class TestReceiver(unittest.TestCase): self.assertFalse(self.controller.logger.error.called) self.assertTrue(self.controller.logger.exception.called) self.assertIn( - 'EXCEPTION in replication.Receiver while attempting commit of', + 'EXCEPTION in ssync.Receiver while attempting commit of', self.controller.logger.exception.call_args[0][0]) def test_MISSING_CHECK_storage_policy(self): @@ -970,7 +970,7 @@ class TestReceiver(unittest.TestCase): mock_wsgi_input.mock_socket) mock_wsgi_input.mock_socket.close.assert_called_once_with() self.controller.logger.error.assert_called_once_with( - '2.3.4.5/device/partition TIMEOUT in replication.Receiver: ' + '2.3.4.5/device/partition TIMEOUT in ssync.Receiver: ' '0.01 seconds: updates line') def test_UPDATES_other_exception(self): @@ -1017,7 +1017,7 @@ class TestReceiver(unittest.TestCase): mock_wsgi_input.mock_socket) mock_wsgi_input.mock_socket.close.assert_called_once_with() self.controller.logger.exception.assert_called_once_with( - '3.4.5.6/device/partition EXCEPTION in replication.Receiver') + '3.4.5.6/device/partition EXCEPTION in ssync.Receiver') def test_UPDATES_no_problems_no_hard_disconnect(self): @@ -1071,7 +1071,7 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'need more than 1 value to unpack'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') with mock.patch.object( self.controller, 'DELETE', @@ -1093,7 +1093,7 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'need more than 1 value to unpack'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') def test_UPDATES_no_headers(self): self.controller.logger = mock.MagicMock() @@ -1110,7 +1110,7 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'Got no headers for DELETE /a/c/o'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') def test_UPDATES_bad_headers(self): self.controller.logger = mock.MagicMock() @@ -1128,7 +1128,7 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'need more than 1 value to unpack'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') self.controller.logger = mock.MagicMock() req = swob.Request.blank( @@ -1146,7 +1146,7 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'need more than 1 value to unpack'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') def test_UPDATES_bad_content_length(self): self.controller.logger = mock.MagicMock() @@ -1164,7 +1164,7 @@ class TestReceiver(unittest.TestCase): ':ERROR: 0 "invalid literal for int() with base 10: \'a\'"']) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') def test_UPDATES_content_length_with_DELETE(self): self.controller.logger = mock.MagicMock() @@ -1182,7 +1182,7 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'DELETE subrequest with content-length /a/c/o'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') def test_UPDATES_no_content_length_with_PUT(self): self.controller.logger = mock.MagicMock() @@ -1199,7 +1199,7 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'No content-length sent for PUT /a/c/o'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') def test_UPDATES_early_termination(self): self.controller.logger = mock.MagicMock() @@ -1217,7 +1217,7 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'Early termination for PUT /a/c/o'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') def test_UPDATES_failures(self): @@ -1250,6 +1250,9 @@ class TestReceiver(unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertFalse(self.controller.logger.exception.called) self.assertFalse(self.controller.logger.error.called) + self.assertTrue(self.controller.logger.warning.called) + self.assertEqual(3, self.controller.logger.warning.call_count) + self.controller.logger.clear() # failures hit threshold and no successes, so ratio is like infinity with mock.patch.object(self.controller, 'DELETE', _DELETE): @@ -1274,8 +1277,11 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'Too many 4 failures to 0 successes'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') self.assertFalse(self.controller.logger.error.called) + self.assertTrue(self.controller.logger.warning.called) + self.assertEqual(4, self.controller.logger.warning.call_count) + self.controller.logger.clear() # failures hit threshold and ratio hits 1.33333333333 with mock.patch.object(self.controller, 'DELETE', _DELETE): @@ -1304,6 +1310,9 @@ class TestReceiver(unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertFalse(self.controller.logger.exception.called) self.assertFalse(self.controller.logger.error.called) + self.assertTrue(self.controller.logger.warning.called) + self.assertEqual(4, self.controller.logger.warning.call_count) + self.controller.logger.clear() # failures hit threshold and ratio hits 2.0 with mock.patch.object(self.controller, 'DELETE', _DELETE): @@ -1329,8 +1338,11 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'Too many 4 failures to 2 successes'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') self.assertFalse(self.controller.logger.error.called) + self.assertTrue(self.controller.logger.warning.called) + self.assertEqual(4, self.controller.logger.warning.call_count) + self.controller.logger.clear() def test_UPDATES_PUT(self): _PUT_request = [None] @@ -1658,7 +1670,7 @@ class TestReceiver(unittest.TestCase): ":ERROR: 0 'Invalid subrequest method BONK'"]) self.assertEqual(resp.status_int, 200) self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in replication.Receiver') + 'None/device/partition EXCEPTION in ssync.Receiver') self.assertEqual(len(_BONK_request), 1) # sanity self.assertEqual(_BONK_request[0], None) @@ -1885,6 +1897,8 @@ class TestReceiver(unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertFalse(self.controller.logger.exception.called) self.assertFalse(self.controller.logger.error.called) + self.assertTrue(self.controller.logger.warning.called) + self.assertEqual(2, self.controller.logger.warning.call_count) self.assertEqual(len(_requests), 2) # sanity req = _requests.pop(0) self.assertEqual(req.path, '/device/partition/a/c/o1') diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index 40f7e25527..c7968c68f0 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -143,7 +143,7 @@ class TestSender(BaseTest): error_lines = self.daemon.logger.get_lines_for_level('error') for line in error_lines: self.assertTrue(line.startswith( - '1.2.3.4:5678/sda1/9 EXCEPTION in replication.Sender:')) + '1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:')) def test_call_catches_exception_handling_exception(self): job = node = None # Will cause inside exception handler to fail @@ -156,7 +156,7 @@ class TestSender(BaseTest): error_lines = self.daemon.logger.get_lines_for_level('error') for line in error_lines: self.assertTrue(line.startswith( - 'EXCEPTION in replication.Sender')) + 'EXCEPTION in ssync.Sender')) def test_call_calls_others(self): self.sender.suffixes = ['abc'] @@ -461,7 +461,10 @@ class TestSender(BaseTest): )) self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes self.sender.connect = mock.MagicMock() - self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock() + df = mock.MagicMock() + df.content_length = 0 + self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock( + return_value=df) self.sender.disconnect = mock.MagicMock() success, candidates = self.sender() self.assertTrue(success)