Merge "Add more detail to ECFragGetter logging"
This commit is contained in:
commit
079ffbd5ab
@ -1404,11 +1404,17 @@ class ECAppIter(object):
|
||||
pass
|
||||
except ChunkReadTimeout:
|
||||
# unable to resume in ECFragGetter
|
||||
self.logger.exception(_("Timeout fetching fragments for %r"),
|
||||
quote(self.path))
|
||||
self.logger.exception(
|
||||
"ChunkReadTimeout fetching fragments for %r",
|
||||
quote(self.path))
|
||||
except ChunkWriteTimeout:
|
||||
# slow client disconnect
|
||||
self.logger.exception(
|
||||
"ChunkWriteTimeout fetching fragments for %r",
|
||||
quote(self.path))
|
||||
except: # noqa
|
||||
self.logger.exception(_("Exception fetching fragments for"
|
||||
" %r"), quote(self.path))
|
||||
self.logger.exception("Exception fetching fragments for %r",
|
||||
quote(self.path))
|
||||
finally:
|
||||
queue.resize(2) # ensure there's room
|
||||
queue.put(None)
|
||||
@ -1437,8 +1443,8 @@ class ECAppIter(object):
|
||||
try:
|
||||
segment = self.policy.pyeclib_driver.decode(fragments)
|
||||
except ECDriverError:
|
||||
self.logger.exception(_("Error decoding fragments for"
|
||||
" %r"), quote(self.path))
|
||||
self.logger.exception("Error decoding fragments for %r",
|
||||
quote(self.path))
|
||||
raise
|
||||
|
||||
yield segment
|
||||
@ -2522,22 +2528,21 @@ class ECFragGetter(object):
|
||||
return (start_byte, end_byte, length, headers, part)
|
||||
except ChunkReadTimeout:
|
||||
new_source, new_node = self._dig_for_source_and_node()
|
||||
if new_source:
|
||||
self.app.error_occurred(
|
||||
self.node, _('Trying to read object during '
|
||||
'GET (retrying)'))
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(self.source, 'swift_conn', None):
|
||||
close_swift_conn(self.source)
|
||||
self.source = new_source
|
||||
self.node = new_node
|
||||
# This is safe; it sets up a generator but does
|
||||
# not call next() on it, so no IO is performed.
|
||||
parts_iter[0] = http_response_to_document_iters(
|
||||
new_source,
|
||||
read_chunk_size=self.app.object_chunk_size)
|
||||
else:
|
||||
if not new_source:
|
||||
raise
|
||||
self.app.error_occurred(
|
||||
self.node, 'Trying to read next part of '
|
||||
'EC multi-part GET (retrying)')
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(self.source, 'swift_conn', None):
|
||||
close_swift_conn(self.source)
|
||||
self.source = new_source
|
||||
self.node = new_node
|
||||
# This is safe; it sets up a generator but does
|
||||
# not call next() on it, so no IO is performed.
|
||||
parts_iter[0] = http_response_to_document_iters(
|
||||
new_source,
|
||||
read_chunk_size=self.app.object_chunk_size)
|
||||
|
||||
def iter_bytes_from_response_part(part_file, nbytes):
|
||||
nchunks = 0
|
||||
@ -2559,6 +2564,7 @@ class ECFragGetter(object):
|
||||
try:
|
||||
self.fast_forward(self.bytes_used_from_backend)
|
||||
except (HTTPException, ValueError):
|
||||
self.app.logger.exception('Unable to fast forward')
|
||||
six.reraise(exc_type, exc_value, exc_traceback)
|
||||
except RangeAlreadyComplete:
|
||||
break
|
||||
@ -2566,8 +2572,8 @@ class ECFragGetter(object):
|
||||
new_source, new_node = self._dig_for_source_and_node()
|
||||
if new_source:
|
||||
self.app.error_occurred(
|
||||
self.node, _('Trying to read object during '
|
||||
'GET (retrying)'))
|
||||
self.node, 'Trying to read EC fragment '
|
||||
'during GET (retrying)')
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(self.source, 'swift_conn', None):
|
||||
close_swift_conn(self.source)
|
||||
@ -2699,8 +2705,8 @@ class ECFragGetter(object):
|
||||
if end - begin + 1 == self.bytes_used_from_backend:
|
||||
warn = False
|
||||
if not req.environ.get('swift.non_client_disconnect') and warn:
|
||||
self.app.logger.warning('Client disconnected on read of %r',
|
||||
self.path)
|
||||
self.app.logger.warning(
|
||||
'Client disconnected on read of EC frag %r', self.path)
|
||||
raise
|
||||
except Exception:
|
||||
self.app.logger.exception(_('Trying to send to client'))
|
||||
@ -2743,7 +2749,7 @@ class ECFragGetter(object):
|
||||
except (Exception, Timeout):
|
||||
self.app.exception_occurred(
|
||||
node, 'Object',
|
||||
_('Trying to %(method)s %(path)s') %
|
||||
'Trying to %(method)s %(path)s' %
|
||||
{'method': self.req.method, 'path': self.req.path})
|
||||
return None
|
||||
|
||||
@ -2757,6 +2763,8 @@ class ECFragGetter(object):
|
||||
not Timestamp(src_headers.get('x-backend-timestamp', 0)):
|
||||
# throw out 5XX and 404s from handoff nodes unless the data is
|
||||
# really on disk and had been DELETEd
|
||||
self.app.logger.debug('Ignoring %s from handoff' %
|
||||
possible_source.status)
|
||||
conn.close()
|
||||
return None
|
||||
|
||||
@ -2778,6 +2786,10 @@ class ECFragGetter(object):
|
||||
'From Object Server') %
|
||||
{'status': possible_source.status,
|
||||
'body': self.body[:1024]})
|
||||
else:
|
||||
self.app.logger.debug(
|
||||
'Ignoring %s from primary' % possible_source.status)
|
||||
|
||||
return None
|
||||
|
||||
@property
|
||||
@ -2803,8 +2815,14 @@ class ECFragGetter(object):
|
||||
# capture last used etag before continuation
|
||||
used_etag = self.last_headers.get('X-Object-Sysmeta-EC-ETag')
|
||||
for source, node in self.source_and_node_iter:
|
||||
if source and is_good_source(source.status) and \
|
||||
source.getheader('X-Object-Sysmeta-EC-ETag') == used_etag:
|
||||
if not source:
|
||||
# _make_node_request only returns good sources
|
||||
continue
|
||||
if source.getheader('X-Object-Sysmeta-EC-ETag') != used_etag:
|
||||
self.app.logger.warning(
|
||||
'Skipping source (etag mismatch: got %s, expected %s)',
|
||||
source.getheader('X-Object-Sysmeta-EC-ETag'), used_etag)
|
||||
else:
|
||||
return source, node
|
||||
return None, None
|
||||
|
||||
|
@ -4314,8 +4314,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2)
|
||||
log_lines = self.app.logger.get_lines_for_level('error')
|
||||
self.assertIn("Trying to read object during GET (retrying)",
|
||||
log_lines[0])
|
||||
self.assertIn("Trying to read next part of EC multi-part "
|
||||
"GET (retrying)", log_lines[0])
|
||||
# not the most graceful ending
|
||||
self.assertIn("Exception fetching fragments for '/a/c/o'",
|
||||
log_lines[-1])
|
||||
@ -4755,6 +4755,44 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
for line in self.logger.logger.records['ERROR']:
|
||||
self.assertIn(req.headers['x-trans-id'], line)
|
||||
|
||||
def test_GET_write_timeout(self):
|
||||
# verify EC GET behavior when there's a timeout sending decoded frags
|
||||
# via the queue.
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-333]
|
||||
etag = md5(test_data, usedforsecurity=False).hexdigest()
|
||||
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
|
||||
headers = {'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Object-Sysmeta-Ec-Content-Length': '333'}
|
||||
ndata = self.policy.ec_ndata
|
||||
responses = [
|
||||
(200, body, self._add_frag_index(i, headers))
|
||||
for i, body in enumerate(ec_archive_bodies[:ndata])
|
||||
] * self.policy.ec_duplication_factor
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
status_codes, body_iter, headers = zip(*responses)
|
||||
self.app.client_timeout = 0.01
|
||||
with mocked_http_conn(*status_codes, body_iter=body_iter,
|
||||
headers=headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
resp_body = next(resp.app_iter)
|
||||
sleep(0.5) # lazy client
|
||||
# remaining resp truncated
|
||||
resp_body += b''.join(resp.app_iter)
|
||||
# we log errors
|
||||
log_lines = self.app.logger.get_lines_for_level('error')
|
||||
for line in log_lines:
|
||||
self.assertIn('ChunkWriteTimeout fetching fragments', line)
|
||||
# client gets a short read
|
||||
self.assertEqual(16051, len(test_data))
|
||||
self.assertEqual(8192, len(resp_body))
|
||||
self.assertNotEqual(
|
||||
md5(resp_body, usedforsecurity=False).hexdigest(),
|
||||
etag)
|
||||
|
||||
def test_GET_read_timeout_retrying_but_no_more_useful_nodes(self):
|
||||
# verify EC GET behavior when initial batch of nodes time out then
|
||||
# remaining nodes either return 404 or return data for different etag
|
||||
@ -4801,6 +4839,22 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
for line in self.logger.logger.records['ERROR']:
|
||||
self.assertIn(req.headers['x-trans-id'], line)
|
||||
|
||||
debug_lines = self.logger.get_lines_for_level('debug')
|
||||
nparity = self.policy.ec_nparity
|
||||
nhandoffs = self.policy.object_ring.max_more_nodes
|
||||
ignore_404 = ignore_404_handoff = 0
|
||||
for line in debug_lines:
|
||||
if 'Ignoring 404 from primary' in line:
|
||||
ignore_404 += 1
|
||||
if 'Ignoring 404 from handoff' in line:
|
||||
ignore_404_handoff += 1
|
||||
self.assertEqual(nparity - 2, ignore_404, debug_lines)
|
||||
self.assertEqual(nhandoffs, ignore_404_handoff, debug_lines)
|
||||
self.assertEqual(len(debug_lines), ignore_404_handoff + ignore_404)
|
||||
self.assertEqual(self.logger.get_lines_for_level('warning'), [
|
||||
'Skipping source (etag mismatch: got other_etag, '
|
||||
'expected %s)' % etag] * 2)
|
||||
|
||||
def test_GET_read_timeout_resume(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-333]
|
||||
@ -4867,8 +4921,9 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertNotEqual(md5(resp.body).hexdigest(), etag)
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('Timeout fetching', error_lines[0])
|
||||
self.assertEqual(2, len(error_lines))
|
||||
self.assertIn('Unable to fast forward', error_lines[0])
|
||||
self.assertIn('Timeout fetching', error_lines[1])
|
||||
for line in self.logger.logger.records['ERROR']:
|
||||
self.assertIn(req.headers['x-trans-id'], line)
|
||||
|
||||
|
@ -7471,8 +7471,9 @@ class BaseTestECObjectController(BaseTestObjectController):
|
||||
_test_servers[0].logger.get_lines_for_level('warning'))
|
||||
|
||||
# check for disconnect message!
|
||||
expected = ["Client disconnected on read of '/a/%s-discon/test'"
|
||||
% self.ec_policy.name] * 2
|
||||
expected = [
|
||||
"Client disconnected on read of EC frag '/a/%s-discon/test'"
|
||||
% self.ec_policy.name] * 2
|
||||
self.assertEqual(
|
||||
_test_servers[0].logger.get_lines_for_level('warning'),
|
||||
expected)
|
||||
|
Loading…
x
Reference in New Issue
Block a user