Merge "Silence "Client disconnected" warnings on reads."
This commit is contained in:
commit
c2f5e30c86
@ -738,6 +738,7 @@ class ResumingGetter(object):
|
|||||||
self.backend_headers = backend_headers
|
self.backend_headers = backend_headers
|
||||||
self.client_chunk_size = client_chunk_size
|
self.client_chunk_size = client_chunk_size
|
||||||
self.skip_bytes = 0
|
self.skip_bytes = 0
|
||||||
|
self.bytes_used_from_backend = 0
|
||||||
self.used_nodes = []
|
self.used_nodes = []
|
||||||
self.used_source_etag = ''
|
self.used_source_etag = ''
|
||||||
self.concurrency = concurrency
|
self.concurrency = concurrency
|
||||||
@ -933,7 +934,6 @@ class ResumingGetter(object):
|
|||||||
def iter_bytes_from_response_part(part_file):
|
def iter_bytes_from_response_part(part_file):
|
||||||
nchunks = 0
|
nchunks = 0
|
||||||
buf = ''
|
buf = ''
|
||||||
bytes_used_from_backend = 0
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with ChunkReadTimeout(node_timeout):
|
with ChunkReadTimeout(node_timeout):
|
||||||
@ -945,7 +945,7 @@ class ResumingGetter(object):
|
|||||||
if self.newest or self.server_type != 'Object':
|
if self.newest or self.server_type != 'Object':
|
||||||
six.reraise(exc_type, exc_value, exc_traceback)
|
six.reraise(exc_type, exc_value, exc_traceback)
|
||||||
try:
|
try:
|
||||||
self.fast_forward(bytes_used_from_backend)
|
self.fast_forward(self.bytes_used_from_backend)
|
||||||
except (HTTPException, ValueError):
|
except (HTTPException, ValueError):
|
||||||
six.reraise(exc_type, exc_value, exc_traceback)
|
six.reraise(exc_type, exc_value, exc_traceback)
|
||||||
except RangeAlreadyComplete:
|
except RangeAlreadyComplete:
|
||||||
@ -982,18 +982,18 @@ class ResumingGetter(object):
|
|||||||
if buf and self.skip_bytes:
|
if buf and self.skip_bytes:
|
||||||
if self.skip_bytes < len(buf):
|
if self.skip_bytes < len(buf):
|
||||||
buf = buf[self.skip_bytes:]
|
buf = buf[self.skip_bytes:]
|
||||||
bytes_used_from_backend += self.skip_bytes
|
self.bytes_used_from_backend += self.skip_bytes
|
||||||
self.skip_bytes = 0
|
self.skip_bytes = 0
|
||||||
else:
|
else:
|
||||||
self.skip_bytes -= len(buf)
|
self.skip_bytes -= len(buf)
|
||||||
bytes_used_from_backend += len(buf)
|
self.bytes_used_from_backend += len(buf)
|
||||||
buf = ''
|
buf = ''
|
||||||
|
|
||||||
if not chunk:
|
if not chunk:
|
||||||
if buf:
|
if buf:
|
||||||
with ChunkWriteTimeout(
|
with ChunkWriteTimeout(
|
||||||
self.app.client_timeout):
|
self.app.client_timeout):
|
||||||
bytes_used_from_backend += len(buf)
|
self.bytes_used_from_backend += len(buf)
|
||||||
yield buf
|
yield buf
|
||||||
buf = ''
|
buf = ''
|
||||||
break
|
break
|
||||||
@ -1004,12 +1004,13 @@ class ResumingGetter(object):
|
|||||||
buf = buf[client_chunk_size:]
|
buf = buf[client_chunk_size:]
|
||||||
with ChunkWriteTimeout(
|
with ChunkWriteTimeout(
|
||||||
self.app.client_timeout):
|
self.app.client_timeout):
|
||||||
|
self.bytes_used_from_backend += \
|
||||||
|
len(client_chunk)
|
||||||
yield client_chunk
|
yield client_chunk
|
||||||
bytes_used_from_backend += len(client_chunk)
|
|
||||||
else:
|
else:
|
||||||
with ChunkWriteTimeout(self.app.client_timeout):
|
with ChunkWriteTimeout(self.app.client_timeout):
|
||||||
|
self.bytes_used_from_backend += len(buf)
|
||||||
yield buf
|
yield buf
|
||||||
bytes_used_from_backend += len(buf)
|
|
||||||
buf = ''
|
buf = ''
|
||||||
|
|
||||||
# This is for fairness; if the network is outpacing
|
# This is for fairness; if the network is outpacing
|
||||||
@ -1038,6 +1039,7 @@ class ResumingGetter(object):
|
|||||||
get_next_doc_part()
|
get_next_doc_part()
|
||||||
self.learn_size_from_content_range(
|
self.learn_size_from_content_range(
|
||||||
start_byte, end_byte, length)
|
start_byte, end_byte, length)
|
||||||
|
self.bytes_used_from_backend = 0
|
||||||
part_iter = iter_bytes_from_response_part(part)
|
part_iter = iter_bytes_from_response_part(part)
|
||||||
yield {'start_byte': start_byte, 'end_byte': end_byte,
|
yield {'start_byte': start_byte, 'end_byte': end_byte,
|
||||||
'entity_length': length, 'headers': headers,
|
'entity_length': length, 'headers': headers,
|
||||||
@ -1059,8 +1061,18 @@ class ResumingGetter(object):
|
|||||||
self.app.client_timeout)
|
self.app.client_timeout)
|
||||||
self.app.logger.increment('client_timeouts')
|
self.app.logger.increment('client_timeouts')
|
||||||
except GeneratorExit:
|
except GeneratorExit:
|
||||||
if not req.environ.get('swift.non_client_disconnect'):
|
warn = True
|
||||||
|
try:
|
||||||
|
req_range = Range(self.backend_headers['Range'])
|
||||||
|
except ValueError:
|
||||||
|
req_range = None
|
||||||
|
if req_range and len(req_range.ranges) == 1:
|
||||||
|
begin, end = req_range.ranges[0]
|
||||||
|
if end - begin + 1 == self.bytes_used_from_backend:
|
||||||
|
warn = False
|
||||||
|
if not req.environ.get('swift.non_client_disconnect') and warn:
|
||||||
self.app.logger.warning(_('Client disconnected on read'))
|
self.app.logger.warning(_('Client disconnected on read'))
|
||||||
|
raise
|
||||||
except Exception:
|
except Exception:
|
||||||
self.app.logger.exception(_('Trying to send to client'))
|
self.app.logger.exception(_('Trying to send to client'))
|
||||||
raise
|
raise
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
import itertools
|
import itertools
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import unittest
|
import unittest
|
||||||
|
import mock
|
||||||
from mock import patch
|
from mock import patch
|
||||||
from swift.proxy.controllers.base import headers_to_container_info, \
|
from swift.proxy.controllers.base import headers_to_container_info, \
|
||||||
headers_to_account_info, headers_to_object_info, get_container_info, \
|
headers_to_account_info, headers_to_object_info, get_container_info, \
|
||||||
@ -830,6 +831,44 @@ class TestFuncs(unittest.TestCase):
|
|||||||
client_chunks = list(app_iter)
|
client_chunks = list(app_iter)
|
||||||
self.assertEqual(client_chunks, ['abcd1234', 'efgh5678'])
|
self.assertEqual(client_chunks, ['abcd1234', 'efgh5678'])
|
||||||
|
|
||||||
|
def test_disconnected_warning(self):
|
||||||
|
self.app.logger = mock.Mock()
|
||||||
|
req = Request.blank('/v1/a/c/o')
|
||||||
|
|
||||||
|
class TestSource(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.headers = {'content-type': 'text/plain',
|
||||||
|
'content-length': len(self.read(-1))}
|
||||||
|
self.status = 200
|
||||||
|
|
||||||
|
def read(self, _read_size):
|
||||||
|
return 'the cake is a lie'
|
||||||
|
|
||||||
|
def getheader(self, header):
|
||||||
|
return self.headers.get(header.lower())
|
||||||
|
|
||||||
|
def getheaders(self):
|
||||||
|
return self.headers
|
||||||
|
|
||||||
|
source = TestSource()
|
||||||
|
|
||||||
|
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}
|
||||||
|
handler = GetOrHeadHandler(
|
||||||
|
self.app, req, 'Object', None, None, None, {})
|
||||||
|
app_iter = handler._make_app_iter(req, node, source)
|
||||||
|
app_iter.close()
|
||||||
|
self.app.logger.warning.assert_called_once_with(
|
||||||
|
'Client disconnected on read')
|
||||||
|
|
||||||
|
self.app.logger = mock.Mock()
|
||||||
|
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}
|
||||||
|
handler = GetOrHeadHandler(
|
||||||
|
self.app, req, 'Object', None, None, None, {})
|
||||||
|
app_iter = handler._make_app_iter(req, node, source)
|
||||||
|
next(app_iter)
|
||||||
|
app_iter.close()
|
||||||
|
self.app.logger.warning.assert_not_called()
|
||||||
|
|
||||||
def test_bytes_to_skip(self):
|
def test_bytes_to_skip(self):
|
||||||
# if you start at the beginning, skip nothing
|
# if you start at the beginning, skip nothing
|
||||||
self.assertEqual(bytes_to_skip(1024, 0), 0)
|
self.assertEqual(bytes_to_skip(1024, 0), 0)
|
||||||
|
Loading…
Reference in New Issue
Block a user