ssync: don't log tracebacks for client disconnects
Previously a traceback was logged when the ssync receiver experienced a problem reading the wsgi input. Now this is logged as an error but without a traceback. Change-Id: Ifadbe603d5fa5473397a32e14ae1b8023cb53eb2 Closes-Bug: #1889951
This commit is contained in:
parent
e22cad666a
commit
155af38d1b
@ -142,6 +142,7 @@ class Receiver(object):
|
||||
"""
|
||||
# The general theme for functions __call__ calls is that they should
|
||||
# raise exceptions.MessageTimeout for client timeouts (logged locally),
|
||||
# exceptions.ChunkReadError for client disconnects (logged locally),
|
||||
# swob.HTTPException classes for exceptions to return to the caller but
|
||||
# not log locally (unmounted, for example), and any other Exceptions
|
||||
# will be logged with a full stack trace.
|
||||
@ -185,6 +186,11 @@ class Receiver(object):
|
||||
self.request.remote_addr, self.device, self.partition,
|
||||
err))
|
||||
yield (':ERROR: %d %r\n' % (408, str(err))).encode('utf8')
|
||||
except exceptions.ChunkReadError as err:
|
||||
self.app.logger.error(
|
||||
'%s/%s/%s read failed in ssync.Receiver: %s' % (
|
||||
self.request.remote_addr, self.device, self.partition,
|
||||
err))
|
||||
except swob.HTTPException as err:
|
||||
body = b''.join(err({}, lambda *args: None))
|
||||
yield (':ERROR: %d %r\n' % (
|
||||
@ -237,6 +243,17 @@ class Receiver(object):
|
||||
raise swob.HTTPInsufficientStorage(drive=self.device)
|
||||
self.fp = self.request.environ['wsgi.input']
|
||||
|
||||
def _readline(self, context):
|
||||
# try to read a line from the wsgi input; annotate any timeout or read
|
||||
# errors with a description of the calling context
|
||||
with exceptions.MessageTimeout(
|
||||
self.app.client_timeout, context):
|
||||
try:
|
||||
line = self.fp.readline(self.app.network_chunk_size)
|
||||
except (eventlet.wsgi.ChunkReadError, IOError) as err:
|
||||
raise exceptions.ChunkReadError('%s: %s' % (context, err))
|
||||
return line
|
||||
|
||||
def _check_local(self, remote, make_durable=True):
|
||||
"""
|
||||
Parse local diskfile and return results of current
|
||||
@ -335,18 +352,14 @@ class Receiver(object):
|
||||
have to read while it writes to ensure network buffers don't
|
||||
fill up and block everything.
|
||||
"""
|
||||
with exceptions.MessageTimeout(
|
||||
self.app.client_timeout, 'missing_check start'):
|
||||
line = self.fp.readline(self.app.network_chunk_size)
|
||||
line = self._readline('missing_check start')
|
||||
if line.strip() != b':MISSING_CHECK: START':
|
||||
raise Exception(
|
||||
'Looking for :MISSING_CHECK: START got %r' % line[:1024])
|
||||
object_hashes = []
|
||||
nlines = 0
|
||||
while True:
|
||||
with exceptions.MessageTimeout(
|
||||
self.app.client_timeout, 'missing_check line'):
|
||||
line = self.fp.readline(self.app.network_chunk_size)
|
||||
line = self._readline('missing_check line')
|
||||
if not line or line.strip() == b':MISSING_CHECK: END':
|
||||
break
|
||||
want = self._check_missing(line)
|
||||
@ -399,17 +412,13 @@ class Receiver(object):
|
||||
success. This is so the sender knows if it can remove an out
|
||||
of place partition, for example.
|
||||
"""
|
||||
with exceptions.MessageTimeout(
|
||||
self.app.client_timeout, 'updates start'):
|
||||
line = self.fp.readline(self.app.network_chunk_size)
|
||||
line = self._readline('updates start')
|
||||
if line.strip() != b':UPDATES: START':
|
||||
raise Exception('Looking for :UPDATES: START got %r' % line[:1024])
|
||||
successes = 0
|
||||
failures = 0
|
||||
while True:
|
||||
with exceptions.MessageTimeout(
|
||||
self.app.client_timeout, 'updates line'):
|
||||
line = self.fp.readline(self.app.network_chunk_size)
|
||||
line = self._readline('updates line')
|
||||
if not line or line.strip() == b':UPDATES: END':
|
||||
break
|
||||
# Read first line METHOD PATH of subrequest.
|
||||
@ -421,8 +430,7 @@ class Receiver(object):
|
||||
content_length = None
|
||||
replication_headers = []
|
||||
while True:
|
||||
with exceptions.MessageTimeout(self.app.client_timeout):
|
||||
line = self.fp.readline(self.app.network_chunk_size)
|
||||
line = self._readline('updates line')
|
||||
if not line:
|
||||
raise Exception(
|
||||
'Got no headers for %s %s' % (method, path))
|
||||
|
@ -1324,11 +1324,11 @@ class TestReceiver(unittest.TestCase):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[b':MISSING_CHECK: START', b':MISSING_CHECK: END',
|
||||
b":ERROR: 0 'Early termination for PUT /a/c/o'"])
|
||||
[b':MISSING_CHECK: START', b':MISSING_CHECK: END'])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.controller.logger.exception.assert_called_once_with(
|
||||
'None/device/partition EXCEPTION in ssync.Receiver')
|
||||
self.controller.logger.error.assert_called_once_with(
|
||||
'None/device/partition read failed in ssync.Receiver: '
|
||||
'Early termination for PUT /a/c/o')
|
||||
|
||||
def test_UPDATES_failures(self):
|
||||
|
||||
@ -2069,6 +2069,7 @@ class TestSsyncRxServer(unittest.TestCase):
|
||||
|
||||
self.conf = {
|
||||
'devices': self.devices,
|
||||
'mount_check': 'false',
|
||||
'swift_dir': self.tempdir,
|
||||
}
|
||||
self.rx_logger = debug_logger('test-object-server')
|
||||
@ -2135,6 +2136,69 @@ class TestSsyncRxServer(unittest.TestCase):
|
||||
# sanity check that the receiver did not proceed to missing_check
|
||||
self.assertFalse(mock_missing_check.called)
|
||||
|
||||
def test_SSYNC_read_error(self):
|
||||
# verify that read errors from wsgi reader are caught and reported
|
||||
def do_send(data):
|
||||
self.rx_logger.clear()
|
||||
self.connection = bufferedhttp.BufferedHTTPConnection(
|
||||
'127.0.0.1:%s' % self.rx_port)
|
||||
self.connection.putrequest('SSYNC', '/sda1/0')
|
||||
self.connection.putheader('Transfer-Encoding', 'chunked')
|
||||
self.connection.putheader('X-Backend-Storage-Policy-Index',
|
||||
int(POLICIES[0]))
|
||||
self.connection.endheaders()
|
||||
resp = self.connection.getresponse()
|
||||
self.assertEqual(200, resp.status)
|
||||
resp.close()
|
||||
self.connection.send(data)
|
||||
self.connection.close()
|
||||
for sleep_time in (0, 0.1, 1):
|
||||
lines = self.rx_logger.get_lines_for_level('error')
|
||||
if lines:
|
||||
return lines
|
||||
eventlet.sleep(sleep_time)
|
||||
return []
|
||||
|
||||
# check read errors during missing_check phase
|
||||
error_lines = do_send(b'')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('missing_check start: invalid literal', error_lines[0])
|
||||
|
||||
error_lines = do_send(b'1\r\n')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('missing_check start: unexpected end of file',
|
||||
error_lines[0])
|
||||
|
||||
error_lines = do_send(b'17\r\n:MISSING_CHECK: START\r\n\r\nx\r\n')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('missing_check line: invalid literal', error_lines[0])
|
||||
|
||||
error_lines = do_send(b'17\r\n:MISSING_CHECK: START\r\n\r\n12\r\n')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('missing_check line: unexpected end of file',
|
||||
error_lines[0])
|
||||
|
||||
# check read errors during updates phase
|
||||
with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check'):
|
||||
error_lines = do_send(b'')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('updates start: invalid literal', error_lines[0])
|
||||
|
||||
with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check'):
|
||||
error_lines = do_send(b'1\r\n')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('updates start: unexpected end of file', error_lines[0])
|
||||
|
||||
with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check'):
|
||||
error_lines = do_send(b'11\r\n:UPDATES: START\r\n\r\nx\r\n')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('updates line: invalid literal', error_lines[0])
|
||||
|
||||
with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check'):
|
||||
error_lines = do_send(b'11\r\n:UPDATES: START\r\n\r\n12\r\n')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('updates line: unexpected end of file', error_lines[0])
|
||||
|
||||
def test_SSYNC_invalid_policy(self):
|
||||
valid_indices = sorted([int(policy) for policy in POLICIES])
|
||||
bad_index = valid_indices[-1] + 1
|
||||
|
Loading…
x
Reference in New Issue
Block a user