Add some extra logging when EC decode fails

When a pyeclib decode fails, we just get a stack trace in Swift. This
patch adds some extra details around the failed decode to give us a
better idea of why.

In this patch we now track the number of segments returned and the lengths
of each fragment, and also stitch the message of the exception returned
from liberasurecode. So all together a line looks something like:

  Nov 22 02:53:34 saio1 proxy-server[30535]: Error decoding fragments for \
  '/AUTH_test/ec/obj'. segments decoded: 0, lengths: [82, 82, 82, 82]: \
  pyeclib_c_decode ERROR: Fragment integrity check failed. Please inspect \
  syslog for liberasurecode error report. \
  (txn: txbabfabdd22ed4999834fa-00619b062e) (client_ip: 127.0.0.1)

NOTE: The fragments lengths could be done down in pyeclib
and passed back through the error, but doing in ECIter means it'll work
for all EC drivers.

Thre is a still a stack trace, as we always reraise when we encouter an
exception during the ECIter is in flight. But the log line before it
should now tell us why.

Change-Id: I04ee8149e895ec66a33b4f21fea3def770a248f5
This commit is contained in:
Matthew Oliver 2021-12-02 11:15:15 +11:00
parent d827b852b8
commit 9acc44b15c
2 changed files with 95 additions and 18 deletions

View File

@ -1417,6 +1417,7 @@ class ECAppIter(object):
queue.put(None)
frag_iter.close()
segments_decoded = 0
with ContextPool(len(fragment_iters)) as pool:
for frag_iter, queue in zip(fragment_iters, queues):
pool.spawn(put_fragments_in_queue, frag_iter, queue,
@ -1445,11 +1446,17 @@ class ECAppIter(object):
break
try:
segment = self.policy.pyeclib_driver.decode(fragments)
except ECDriverError:
self.logger.exception("Error decoding fragments for %r",
quote(self.path))
except ECDriverError as err:
self.logger.error(
"Error decoding fragments for %r. "
"Segments decoded: %d, "
"Lengths: [%s]: %s" % (
quote(self.path), segments_decoded,
', '.join(map(str, map(len, fragments))),
str(err)))
raise
segments_decoded += 1
yield segment
def app_iter_range(self, start, end):

View File

@ -47,7 +47,6 @@ from swift.proxy.controllers.base import \
get_container_info as _real_get_container_info
from swift.common.storage_policy import POLICIES, ECDriverError, \
StoragePolicy, ECStoragePolicy
from test.debug_logger import debug_logger
from test.unit import (
FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus,
@ -4677,33 +4676,64 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
req = swob.Request.blank('/v1/a/c/o')
# sanity check responses1
orig_decode = self.policy.pyeclib_driver.decode
captured_fragments = []
def mock_decode(fragments):
captured_fragments.append(fragments)
return orig_decode(fragments)
# sanity check responses1 and capture frag lengths
responses = responses1[:self.policy.ec_ndata]
status_codes, body_iter, headers = zip(*responses)
with set_http_connect(*status_codes, body_iter=body_iter,
headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(
md5(resp.body, usedforsecurity=False).hexdigest(),
etag1)
with mock.patch.object(
self.policy.pyeclib_driver, 'decode', mock_decode):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
# read body while decode is mocked
self.assertEqual(
md5(resp.body, usedforsecurity=False).hexdigest(),
etag1)
fragment_lengths1 = [[len(frag) for frag in frags]
for frags in captured_fragments]
self.assertEqual( # sanity check
len(ec_archive_bodies1[0]),
sum([length for length in [lengths[0]
for lengths in fragment_lengths1]]))
# sanity check responses2
# sanity check responses2 and capture frag lengths
captured_fragments = []
responses = responses2[:self.policy.ec_ndata]
status_codes, body_iter, headers = zip(*responses)
with set_http_connect(*status_codes, body_iter=body_iter,
headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(
md5(resp.body, usedforsecurity=False).hexdigest(),
etag2)
with mock.patch.object(
self.policy.pyeclib_driver, 'decode', mock_decode):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
# read body while decode is mocked
self.assertEqual(
md5(resp.body, usedforsecurity=False).hexdigest(),
etag2)
fragment_lengths2 = [[len(frag) for frag in frags]
for frags in captured_fragments]
self.assertEqual( # sanity check
len(ec_archive_bodies2[0]),
sum([length for length in [lengths[0]
for lengths in fragment_lengths2]]))
# now mix the responses a bit
mix_index = random.randint(0, self.policy.ec_ndata - 1)
mixed_responses = responses1[:self.policy.ec_ndata]
mixed_responses[mix_index] = responses2[mix_index]
num_segments = len(fragment_lengths1)
mixed_lengths = fragment_lengths1[num_segments - 1]
mixed_lengths[mix_index] = fragment_lengths2[
num_segments - 1][mix_index]
status_codes, body_iter, headers = zip(*mixed_responses)
with set_http_connect(*status_codes, body_iter=body_iter,
headers=headers):
@ -4720,8 +4750,11 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
msg = error_lines[0]
self.assertIn('Error decoding fragments', msg)
self.assertIn('/a/c/o', msg)
log_msg_args, log_msg_kwargs = self.logger.log_dict['error'][0]
self.assertEqual(log_msg_kwargs['exc_info'][0], ECDriverError)
self.assertIn('Segments decoded: %d' % (num_segments - 1), msg)
self.assertIn(
"[%s]" % ", ".join([str(length) for length in mixed_lengths]), msg)
self.assertIn("Invalid fragment payload in ECPyECLibDriver.decode",
msg)
def test_GET_read_timeout(self):
# verify EC GET behavior when initial batch of nodes time out then
@ -4984,6 +5017,43 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
do_test(490 + 490 + 458)
do_test(490 + 490 + 490 + 458)
def test_GET_trigger_ec_metadata_check_failure(self):
# verify that a warning is logged when there are only k - 1 fragment
segment_size = self.policy.ec_segment_size
test_data = (b'test' * segment_size)[:-333]
etag = md5(test_data).hexdigest()
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
bad_bodies = [b'd' * segment_size] * (self.policy.ec_nparity + 1)
ec_archive_bodies = \
ec_archive_bodies[:self.policy.ec_ndata - 1] + bad_bodies
self.logger.clear()
headers = {
'X-Object-Sysmeta-Ec-Etag': etag,
'X-Object-Sysmeta-Ec-Content-Length': len(test_data),
}
responses = [
(200, body, self._add_frag_index(i, headers))
for i, body in enumerate(ec_archive_bodies)]
req = swob.Request.blank('/v1/a/c/o')
status_codes, body_iter, headers = zip(
*responses[:self.policy.ec_ndata])
with set_http_connect(*status_codes, body_iter=body_iter,
headers=headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 500)
self.assertNotEqual(md5(resp.body).hexdigest(), etag)
error_lines = self.logger.get_lines_for_level('error')
expected_log_line = (
"Error decoding fragments for '/a/c/o'. "
"Segments decoded: 0, Lengths: "
"[490, 490, 490, 490, 490, 490, 490, 490, 490, 490]: "
"pyeclib_c_decode ERROR: Fragment integrity check failed. "
"Please inspect syslog for liberasurecode error report.")
self.assertEqual(expected_log_line, error_lines[0])
def test_GET_read_timeout_resume_mixed_etag(self):
segment_size = self.policy.ec_segment_size
test_data2 = (b'blah1' * segment_size)[:-333]