Merge "proxy: error limit the correct node when resuming GETs"
This commit is contained in:
commit
83bab150b7
@ -1030,6 +1030,31 @@ class GetterBase(object):
|
||||
self.source = None
|
||||
self.source_parts_iter = None
|
||||
|
||||
def _get_source_and_node(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _replace_source_and_node(self, err_msg):
|
||||
# be defensive against _get_source_and_node modifying self.source
|
||||
# or self.node...
|
||||
old_source = self.source
|
||||
old_node = self.node
|
||||
|
||||
new_source, new_node = self._get_source_and_node()
|
||||
if not new_source:
|
||||
return False
|
||||
|
||||
self.app.error_occurred(old_node, err_msg)
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(old_source, 'swift_conn', None):
|
||||
close_swift_conn(old_source)
|
||||
self.source = new_source
|
||||
self.node = new_node
|
||||
# This is safe; it sets up a generator but does
|
||||
# not call next() on it, so no IO is performed.
|
||||
self.source_parts_iter = http_response_to_document_iters(
|
||||
new_source, read_chunk_size=self.app.object_chunk_size)
|
||||
return True
|
||||
|
||||
def fast_forward(self, num_bytes):
|
||||
"""
|
||||
Will skip num_bytes into the current ranges.
|
||||
@ -1207,22 +1232,8 @@ class GetOrHeadHandler(GetterBase):
|
||||
self.source_parts_iter)
|
||||
return (start_byte, end_byte, length, headers, part)
|
||||
except ChunkReadTimeout:
|
||||
new_source, new_node = self._get_source_and_node()
|
||||
if new_source:
|
||||
self.app.error_occurred(
|
||||
self.node, 'Trying to read object during '
|
||||
'GET (retrying)')
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(self.source, 'swift_conn', None):
|
||||
close_swift_conn(self.source)
|
||||
self.source = new_source
|
||||
self.node = new_node
|
||||
# This is safe; it sets up a generator but does
|
||||
# not call next() on it, so no IO is performed.
|
||||
self.source_parts_iter = http_response_to_document_iters(
|
||||
new_source,
|
||||
read_chunk_size=self.app.object_chunk_size)
|
||||
else:
|
||||
if not self._replace_source_and_node(
|
||||
'Trying to read object during GET (retrying)'):
|
||||
raise StopIteration()
|
||||
|
||||
def iter_bytes_from_response_part(self, part_file, nbytes):
|
||||
@ -1249,24 +1260,8 @@ class GetOrHeadHandler(GetterBase):
|
||||
except RangeAlreadyComplete:
|
||||
break
|
||||
buf = b''
|
||||
new_source, new_node = self._get_source_and_node()
|
||||
if new_source:
|
||||
self.app.error_occurred(
|
||||
self.node, 'Trying to read object during '
|
||||
'GET (retrying)')
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(self.source, 'swift_conn', None):
|
||||
close_swift_conn(self.source)
|
||||
self.source = new_source
|
||||
self.node = new_node
|
||||
# This is safe; it just sets up a generator but
|
||||
# does not call next() on it, so no IO is
|
||||
# performed.
|
||||
self.source_parts_iter = \
|
||||
http_response_to_document_iters(
|
||||
new_source,
|
||||
read_chunk_size=self.app.object_chunk_size)
|
||||
|
||||
if self._replace_source_and_node(
|
||||
'Trying to read object during GET (retrying)'):
|
||||
try:
|
||||
_junk, _junk, _junk, _junk, part_file = \
|
||||
self.get_next_doc_part()
|
||||
|
@ -2568,23 +2568,10 @@ class ECFragGetter(GetterBase):
|
||||
self.source_parts_iter)
|
||||
return (start_byte, end_byte, length, headers, part)
|
||||
except ChunkReadTimeout:
|
||||
new_source, new_node = self._dig_for_source_and_node()
|
||||
if not new_source:
|
||||
if not self._replace_source_and_node(
|
||||
'Trying to read next part of EC multi-part GET '
|
||||
'(retrying)'):
|
||||
raise
|
||||
self.app.error_occurred(
|
||||
self.node, 'Trying to read next part of '
|
||||
'EC multi-part GET (retrying)')
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(self.source, 'swift_conn', None):
|
||||
close_swift_conn(self.source)
|
||||
self.source = new_source
|
||||
self.node = new_node
|
||||
# This is safe; it sets up a generator but does
|
||||
# not call next() on it, so no IO is performed.
|
||||
self.source_parts_iter = \
|
||||
http_response_to_document_iters(
|
||||
new_source,
|
||||
read_chunk_size=self.app.object_chunk_size)
|
||||
|
||||
def iter_bytes_from_response_part(self, part_file, nbytes):
|
||||
buf = b''
|
||||
@ -2610,24 +2597,8 @@ class ECFragGetter(GetterBase):
|
||||
except RangeAlreadyComplete:
|
||||
break
|
||||
buf = b''
|
||||
old_node = self.node
|
||||
new_source, new_node = self._dig_for_source_and_node()
|
||||
if new_source:
|
||||
self.app.error_occurred(
|
||||
old_node, 'Trying to read EC fragment '
|
||||
'during GET (retrying)')
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(self.source, 'swift_conn', None):
|
||||
close_swift_conn(self.source)
|
||||
self.source = new_source
|
||||
self.node = new_node
|
||||
# This is safe; it just sets up a generator but
|
||||
# does not call next() on it, so no IO is
|
||||
# performed.
|
||||
self.source_parts_iter = \
|
||||
http_response_to_document_iters(
|
||||
new_source,
|
||||
read_chunk_size=self.app.object_chunk_size)
|
||||
if self._replace_source_and_node(
|
||||
'Trying to read EC fragment during GET (retrying)'):
|
||||
try:
|
||||
_junk, _junk, _junk, _junk, part_file = \
|
||||
self.get_next_doc_part()
|
||||
@ -2814,13 +2785,12 @@ class ECFragGetter(GetterBase):
|
||||
node, self.app.recoverable_node_timeout)
|
||||
|
||||
if source:
|
||||
self.node = node
|
||||
yield source, node
|
||||
else:
|
||||
yield None, None
|
||||
self.status = self.reason = self.body = self.source_headers = None
|
||||
|
||||
def _dig_for_source_and_node(self):
|
||||
def _get_source_and_node(self):
|
||||
# capture last used etag before continuation
|
||||
used_etag = self.last_headers.get('X-Object-Sysmeta-EC-ETag')
|
||||
for source, node in self.source_and_node_iter:
|
||||
|
@ -1575,6 +1575,54 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
||||
self.assertIn('Accept-Ranges', resp.headers)
|
||||
self.assertNotIn('Connection', resp.headers)
|
||||
|
||||
def test_GET_slow_read(self):
|
||||
self.app.recoverable_node_timeout = 0.01
|
||||
self.app.client_timeout = 0.1
|
||||
self.app.object_chunk_size = 10
|
||||
body = b'test'
|
||||
etag = md5(body, usedforsecurity=False).hexdigest()
|
||||
headers = {
|
||||
'Etag': etag,
|
||||
'Content-Length': len(body),
|
||||
'X-Timestamp': Timestamp(self.ts()).normal,
|
||||
}
|
||||
responses = [(200, body, headers)] * 2
|
||||
status_codes, body_iter, headers = zip(*responses)
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
# make the first response slow...
|
||||
read_sleeps = [0.1, 0]
|
||||
with mocked_http_conn(*status_codes, body_iter=body_iter,
|
||||
headers=headers, slow=read_sleeps) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
_ = resp.body
|
||||
self.assertEqual(len(log.requests), 2)
|
||||
|
||||
def make_key(r):
|
||||
r['device'] = r['path'].split('/')[1]
|
||||
return '%(ip)s:%(port)s/%(device)s' % r
|
||||
# the first node got errors incr'd
|
||||
expected_error_limiting = {
|
||||
make_key(log.requests[0]): {
|
||||
'errors': 1,
|
||||
'last_error': mock.ANY,
|
||||
}
|
||||
}
|
||||
actual = {}
|
||||
for n in self.app.get_object_ring(int(self.policy)).devs:
|
||||
node_key = self.app.error_limiter.node_key(n)
|
||||
stats = self.app.error_limiter.stats.get(node_key) or {}
|
||||
if stats:
|
||||
actual[self.app.error_limiter.node_key(n)] = stats
|
||||
self.assertEqual(actual, expected_error_limiting)
|
||||
for read_line in self.app.logger.get_lines_for_level('error'):
|
||||
self.assertIn("Trying to read object during GET (retrying)",
|
||||
read_line)
|
||||
self.assertEqual(
|
||||
len(self.logger.logger.records['ERROR']), 1,
|
||||
'Expected 1 ERROR lines, got %r' % (
|
||||
self.logger.logger.records['ERROR'], ))
|
||||
|
||||
def test_GET_transfer_encoding_chunked(self):
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
with set_http_connect(200, headers={'transfer-encoding': 'chunked'}):
|
||||
@ -6710,7 +6758,7 @@ class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase):
|
||||
def test_iter_bytes_from_response_part_insufficient_bytes(self):
|
||||
part = FileLikeIter([b'some', b'thing'])
|
||||
it = self.getter.iter_bytes_from_response_part(part, nbytes=100)
|
||||
with mock.patch.object(self.getter, '_dig_for_source_and_node',
|
||||
with mock.patch.object(self.getter, '_get_source_and_node',
|
||||
return_value=(None, None)):
|
||||
with self.assertRaises(ShortReadError) as cm:
|
||||
b''.join(it)
|
||||
@ -6722,7 +6770,7 @@ class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase):
|
||||
self.app.recoverable_node_timeout = 0.05
|
||||
self.app.client_timeout = 0.8
|
||||
it = self.getter.iter_bytes_from_response_part(part, nbytes=9)
|
||||
with mock.patch.object(self.getter, '_dig_for_source_and_node',
|
||||
with mock.patch.object(self.getter, '_get_source_and_node',
|
||||
return_value=(None, None)):
|
||||
with mock.patch.object(part, 'read',
|
||||
side_effect=[b'some', ChunkReadTimeout(9)]):
|
||||
|
Loading…
x
Reference in New Issue
Block a user