ssync: Tolerate more hang-ups
Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Change-Id: I6e62f7cc0474087edfd7f0ca133c75dce71cc795 Closes-Bug: #1889951
This commit is contained in:
parent
155af38d1b
commit
f54634e950
@ -27,6 +27,10 @@ from swift.common import request_helpers
|
||||
from swift.common.utils import Timestamp
|
||||
|
||||
|
||||
class SsyncClientDisconnected(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def decode_missing(line):
|
||||
"""
|
||||
Parse a string of the form generated by
|
||||
@ -174,6 +178,9 @@ class Receiver(object):
|
||||
finally:
|
||||
if self.app.replication_semaphore:
|
||||
self.app.replication_semaphore.release()
|
||||
except SsyncClientDisconnected:
|
||||
self.app.logger.error('ssync client disconnected')
|
||||
self.disconnect = True
|
||||
except exceptions.LockTimeout as err:
|
||||
self.app.logger.debug(
|
||||
'%s/%s/%s SSYNC LOCK TIMEOUT: %s' % (
|
||||
@ -353,6 +360,9 @@ class Receiver(object):
|
||||
fill up and block everything.
|
||||
"""
|
||||
line = self._readline('missing_check start')
|
||||
if not line:
|
||||
# Guess they hung up
|
||||
raise SsyncClientDisconnected
|
||||
if line.strip() != b':MISSING_CHECK: START':
|
||||
raise Exception(
|
||||
'Looking for :MISSING_CHECK: START got %r' % line[:1024])
|
||||
@ -413,6 +423,9 @@ class Receiver(object):
|
||||
of place partition, for example.
|
||||
"""
|
||||
line = self._readline('updates start')
|
||||
if not line:
|
||||
# Guess they hung up waiting for us to process the missing check
|
||||
raise SsyncClientDisconnected
|
||||
if line.strip() != b':UPDATES: START':
|
||||
raise Exception('Looking for :UPDATES: START got %r' % line[:1024])
|
||||
successes = 0
|
||||
|
@ -64,7 +64,9 @@ class TestReceiver(unittest.TestCase):
|
||||
'replication_concurrency_per_device': '0',
|
||||
'log_requests': 'false'}
|
||||
utils.mkdirs(os.path.join(self.testdir, 'device', 'partition'))
|
||||
self.controller = server.ObjectController(self.conf)
|
||||
self.logger = debug_logger()
|
||||
self.controller = server.ObjectController(
|
||||
self.conf, logger=self.logger)
|
||||
self.controller.bytes_per_sync = 1
|
||||
|
||||
self.account1 = 'a'
|
||||
@ -417,15 +419,12 @@ class TestReceiver(unittest.TestCase):
|
||||
req = swob.Request.blank(
|
||||
'/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
|
||||
resp = req.get_response(self.controller)
|
||||
if six.PY2:
|
||||
got = b"''"
|
||||
else:
|
||||
got = b"b''"
|
||||
self.assertEqual(self.body_lines(resp.body), [
|
||||
b':ERROR: 0 "Looking for :MISSING_CHECK: START got %s"' % got])
|
||||
self.assertEqual(resp.body, b'\r\n')
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
mocked_replication_semaphore.acquire.assert_called_once_with(0)
|
||||
mocked_replication_semaphore.release.assert_called_once_with()
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(['ssync client disconnected'], error_lines)
|
||||
|
||||
with mock.patch.object(
|
||||
self.controller, 'replication_semaphore') as \
|
||||
@ -441,7 +440,7 @@ class TestReceiver(unittest.TestCase):
|
||||
self.assertFalse(mocked_replication_semaphore.acquire.called)
|
||||
self.assertFalse(mocked_replication_semaphore.release.called)
|
||||
|
||||
def test_SSYNC_mount_check(self):
|
||||
def test_SSYNC_mount_check_isdir(self):
|
||||
with mock.patch.object(self.controller, 'replication_semaphore'), \
|
||||
mock.patch.object(
|
||||
self.controller._diskfile_router[POLICIES.legacy],
|
||||
@ -450,15 +449,13 @@ class TestReceiver(unittest.TestCase):
|
||||
req = swob.Request.blank(
|
||||
'/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
|
||||
resp = req.get_response(self.controller)
|
||||
if six.PY2:
|
||||
got = b"''"
|
||||
else:
|
||||
got = b"b''"
|
||||
self.assertEqual(self.body_lines(resp.body), [
|
||||
b':ERROR: 0 "Looking for :MISSING_CHECK: START got %s"' % got])
|
||||
self.assertEqual(resp.body, b'\r\n')
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual([], mocks['ismount'].call_args_list)
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(['ssync client disconnected'], error_lines)
|
||||
|
||||
def test_SSYNC_mount_check(self):
|
||||
with mock.patch.object(self.controller, 'replication_semaphore'), \
|
||||
mock.patch.object(
|
||||
self.controller._diskfile_router[POLICIES.legacy],
|
||||
@ -482,16 +479,13 @@ class TestReceiver(unittest.TestCase):
|
||||
req = swob.Request.blank(
|
||||
'/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
|
||||
resp = req.get_response(self.controller)
|
||||
if six.PY2:
|
||||
got = b"''"
|
||||
else:
|
||||
got = b"b''"
|
||||
self.assertEqual(self.body_lines(resp.body), [
|
||||
b':ERROR: 0 "Looking for :MISSING_CHECK: START got %s"' % got])
|
||||
self.assertEqual(resp.body, b'\r\n')
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual([mock.call(os.path.join(
|
||||
self.controller._diskfile_router[POLICIES.legacy].devices,
|
||||
'device'))] * 2, mocks['ismount'].call_args_list)
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(['ssync client disconnected'], error_lines)
|
||||
|
||||
def test_SSYNC_Exception(self):
|
||||
|
||||
@ -1037,6 +1031,24 @@ class TestReceiver(unittest.TestCase):
|
||||
self.assertFalse(self.controller.logger.error.called)
|
||||
self.assertFalse(self.controller.logger.exception.called)
|
||||
|
||||
def test_UPDATES_no_start(self):
|
||||
# verify behavior when the sender disconnects and does not send
|
||||
# ':UPDATES: START' e.g. if a sender timeout pops while waiting for
|
||||
# receiver response to missing checks
|
||||
self.controller.logger = mock.MagicMock()
|
||||
req = swob.Request.blank(
|
||||
'/device/partition',
|
||||
environ={'REQUEST_METHOD': 'SSYNC'},
|
||||
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n')
|
||||
req.remote_addr = '2.3.4.5'
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[b':MISSING_CHECK: START', b':MISSING_CHECK: END'])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.controller.logger.error.assert_called_once_with(
|
||||
'ssync client disconnected')
|
||||
|
||||
def test_UPDATES_timeout(self):
|
||||
|
||||
class _Wrapper(io.BytesIO):
|
||||
|
Loading…
x
Reference in New Issue
Block a user