diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 55b77e33e2..78bd87f229 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -27,6 +27,10 @@ from swift.common import request_helpers from swift.common.utils import Timestamp +class SsyncClientDisconnected(Exception): + pass + + def decode_missing(line): """ Parse a string of the form generated by @@ -174,6 +178,9 @@ class Receiver(object): finally: if self.app.replication_semaphore: self.app.replication_semaphore.release() + except SsyncClientDisconnected: + self.app.logger.error('ssync client disconnected') + self.disconnect = True except exceptions.LockTimeout as err: self.app.logger.debug( '%s/%s/%s SSYNC LOCK TIMEOUT: %s' % ( @@ -353,6 +360,9 @@ class Receiver(object): fill up and block everything. """ line = self._readline('missing_check start') + if not line: + # Guess they hung up + raise SsyncClientDisconnected if line.strip() != b':MISSING_CHECK: START': raise Exception( 'Looking for :MISSING_CHECK: START got %r' % line[:1024]) @@ -413,6 +423,9 @@ class Receiver(object): of place partition, for example. """ line = self._readline('updates start') + if not line: + # Guess they hung up waiting for us to process the missing check + raise SsyncClientDisconnected if line.strip() != b':UPDATES: START': raise Exception('Looking for :UPDATES: START got %r' % line[:1024]) successes = 0 diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 994b582e67..674b040b7b 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -64,7 +64,9 @@ class TestReceiver(unittest.TestCase): 'replication_concurrency_per_device': '0', 'log_requests': 'false'} utils.mkdirs(os.path.join(self.testdir, 'device', 'partition')) - self.controller = server.ObjectController(self.conf) + self.logger = debug_logger() + self.controller = server.ObjectController( + self.conf, logger=self.logger) self.controller.bytes_per_sync = 1 self.account1 = 'a' @@ -417,15 +419,12 @@ class TestReceiver(unittest.TestCase): req = swob.Request.blank( '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'}) resp = req.get_response(self.controller) - if six.PY2: - got = b"''" - else: - got = b"b''" - self.assertEqual(self.body_lines(resp.body), [ - b':ERROR: 0 "Looking for :MISSING_CHECK: START got %s"' % got]) + self.assertEqual(resp.body, b'\r\n') self.assertEqual(resp.status_int, 200) mocked_replication_semaphore.acquire.assert_called_once_with(0) mocked_replication_semaphore.release.assert_called_once_with() + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(['ssync client disconnected'], error_lines) with mock.patch.object( self.controller, 'replication_semaphore') as \ @@ -441,7 +440,7 @@ class TestReceiver(unittest.TestCase): self.assertFalse(mocked_replication_semaphore.acquire.called) self.assertFalse(mocked_replication_semaphore.release.called) - def test_SSYNC_mount_check(self): + def test_SSYNC_mount_check_isdir(self): with mock.patch.object(self.controller, 'replication_semaphore'), \ mock.patch.object( self.controller._diskfile_router[POLICIES.legacy], @@ -450,15 +449,13 @@ class TestReceiver(unittest.TestCase): req = swob.Request.blank( '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'}) resp = req.get_response(self.controller) - if six.PY2: - got = b"''" - else: - got = b"b''" - self.assertEqual(self.body_lines(resp.body), [ - b':ERROR: 0 "Looking for :MISSING_CHECK: START got %s"' % got]) + self.assertEqual(resp.body, b'\r\n') self.assertEqual(resp.status_int, 200) self.assertEqual([], mocks['ismount'].call_args_list) + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(['ssync client disconnected'], error_lines) + def test_SSYNC_mount_check(self): with mock.patch.object(self.controller, 'replication_semaphore'), \ mock.patch.object( self.controller._diskfile_router[POLICIES.legacy], @@ -482,16 +479,13 @@ class TestReceiver(unittest.TestCase): req = swob.Request.blank( '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'}) resp = req.get_response(self.controller) - if six.PY2: - got = b"''" - else: - got = b"b''" - self.assertEqual(self.body_lines(resp.body), [ - b':ERROR: 0 "Looking for :MISSING_CHECK: START got %s"' % got]) + self.assertEqual(resp.body, b'\r\n') self.assertEqual(resp.status_int, 200) self.assertEqual([mock.call(os.path.join( self.controller._diskfile_router[POLICIES.legacy].devices, 'device'))] * 2, mocks['ismount'].call_args_list) + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(['ssync client disconnected'], error_lines) def test_SSYNC_Exception(self): @@ -1037,6 +1031,24 @@ class TestReceiver(unittest.TestCase): self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.exception.called) + def test_UPDATES_no_start(self): + # verify behavior when the sender disconnects and does not send + # ':UPDATES: START' e.g. if a sender timeout pops while waiting for + # receiver response to missing checks + self.controller.logger = mock.MagicMock() + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'SSYNC'}, + body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n') + req.remote_addr = '2.3.4.5' + resp = req.get_response(self.controller) + self.assertEqual( + self.body_lines(resp.body), + [b':MISSING_CHECK: START', b':MISSING_CHECK: END']) + self.assertEqual(resp.status_int, 200) + self.controller.logger.error.assert_called_once_with( + 'ssync client disconnected') + def test_UPDATES_timeout(self): class _Wrapper(io.BytesIO):