Merge "Add some extra logging when EC decode fails"
This commit is contained in:
commit
b92bd23299
@ -1417,6 +1417,7 @@ class ECAppIter(object):
|
||||
queue.put(None)
|
||||
frag_iter.close()
|
||||
|
||||
segments_decoded = 0
|
||||
with ContextPool(len(fragment_iters)) as pool:
|
||||
for frag_iter, queue in zip(fragment_iters, queues):
|
||||
pool.spawn(put_fragments_in_queue, frag_iter, queue,
|
||||
@ -1445,11 +1446,17 @@ class ECAppIter(object):
|
||||
break
|
||||
try:
|
||||
segment = self.policy.pyeclib_driver.decode(fragments)
|
||||
except ECDriverError:
|
||||
self.logger.exception("Error decoding fragments for %r",
|
||||
quote(self.path))
|
||||
except ECDriverError as err:
|
||||
self.logger.error(
|
||||
"Error decoding fragments for %r. "
|
||||
"Segments decoded: %d, "
|
||||
"Lengths: [%s]: %s" % (
|
||||
quote(self.path), segments_decoded,
|
||||
', '.join(map(str, map(len, fragments))),
|
||||
str(err)))
|
||||
raise
|
||||
|
||||
segments_decoded += 1
|
||||
yield segment
|
||||
|
||||
def app_iter_range(self, start, end):
|
||||
|
@ -47,7 +47,6 @@ from swift.proxy.controllers.base import \
|
||||
get_container_info as _real_get_container_info
|
||||
from swift.common.storage_policy import POLICIES, ECDriverError, \
|
||||
StoragePolicy, ECStoragePolicy
|
||||
|
||||
from test.debug_logger import debug_logger
|
||||
from test.unit import (
|
||||
FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus,
|
||||
@ -4677,33 +4676,64 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
# sanity check responses1
|
||||
orig_decode = self.policy.pyeclib_driver.decode
|
||||
captured_fragments = []
|
||||
|
||||
def mock_decode(fragments):
|
||||
captured_fragments.append(fragments)
|
||||
return orig_decode(fragments)
|
||||
|
||||
# sanity check responses1 and capture frag lengths
|
||||
responses = responses1[:self.policy.ec_ndata]
|
||||
status_codes, body_iter, headers = zip(*responses)
|
||||
with set_http_connect(*status_codes, body_iter=body_iter,
|
||||
headers=headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(
|
||||
md5(resp.body, usedforsecurity=False).hexdigest(),
|
||||
etag1)
|
||||
with mock.patch.object(
|
||||
self.policy.pyeclib_driver, 'decode', mock_decode):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
# read body while decode is mocked
|
||||
self.assertEqual(
|
||||
md5(resp.body, usedforsecurity=False).hexdigest(),
|
||||
etag1)
|
||||
fragment_lengths1 = [[len(frag) for frag in frags]
|
||||
for frags in captured_fragments]
|
||||
self.assertEqual( # sanity check
|
||||
len(ec_archive_bodies1[0]),
|
||||
sum([length for length in [lengths[0]
|
||||
for lengths in fragment_lengths1]]))
|
||||
|
||||
# sanity check responses2
|
||||
# sanity check responses2 and capture frag lengths
|
||||
captured_fragments = []
|
||||
responses = responses2[:self.policy.ec_ndata]
|
||||
status_codes, body_iter, headers = zip(*responses)
|
||||
with set_http_connect(*status_codes, body_iter=body_iter,
|
||||
headers=headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(
|
||||
md5(resp.body, usedforsecurity=False).hexdigest(),
|
||||
etag2)
|
||||
with mock.patch.object(
|
||||
self.policy.pyeclib_driver, 'decode', mock_decode):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
# read body while decode is mocked
|
||||
self.assertEqual(
|
||||
md5(resp.body, usedforsecurity=False).hexdigest(),
|
||||
etag2)
|
||||
fragment_lengths2 = [[len(frag) for frag in frags]
|
||||
for frags in captured_fragments]
|
||||
self.assertEqual( # sanity check
|
||||
len(ec_archive_bodies2[0]),
|
||||
sum([length for length in [lengths[0]
|
||||
for lengths in fragment_lengths2]]))
|
||||
|
||||
# now mix the responses a bit
|
||||
mix_index = random.randint(0, self.policy.ec_ndata - 1)
|
||||
mixed_responses = responses1[:self.policy.ec_ndata]
|
||||
mixed_responses[mix_index] = responses2[mix_index]
|
||||
|
||||
num_segments = len(fragment_lengths1)
|
||||
mixed_lengths = fragment_lengths1[num_segments - 1]
|
||||
mixed_lengths[mix_index] = fragment_lengths2[
|
||||
num_segments - 1][mix_index]
|
||||
|
||||
status_codes, body_iter, headers = zip(*mixed_responses)
|
||||
with set_http_connect(*status_codes, body_iter=body_iter,
|
||||
headers=headers):
|
||||
@ -4720,8 +4750,11 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
msg = error_lines[0]
|
||||
self.assertIn('Error decoding fragments', msg)
|
||||
self.assertIn('/a/c/o', msg)
|
||||
log_msg_args, log_msg_kwargs = self.logger.log_dict['error'][0]
|
||||
self.assertEqual(log_msg_kwargs['exc_info'][0], ECDriverError)
|
||||
self.assertIn('Segments decoded: %d' % (num_segments - 1), msg)
|
||||
self.assertIn(
|
||||
"[%s]" % ", ".join([str(length) for length in mixed_lengths]), msg)
|
||||
self.assertIn("Invalid fragment payload in ECPyECLibDriver.decode",
|
||||
msg)
|
||||
|
||||
def test_GET_read_timeout(self):
|
||||
# verify EC GET behavior when initial batch of nodes time out then
|
||||
@ -4984,6 +5017,43 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
do_test(490 + 490 + 458)
|
||||
do_test(490 + 490 + 490 + 458)
|
||||
|
||||
def test_GET_trigger_ec_metadata_check_failure(self):
|
||||
# verify that a warning is logged when there are only k - 1 fragment
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-333]
|
||||
etag = md5(test_data).hexdigest()
|
||||
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
|
||||
bad_bodies = [b'd' * segment_size] * (self.policy.ec_nparity + 1)
|
||||
ec_archive_bodies = \
|
||||
ec_archive_bodies[:self.policy.ec_ndata - 1] + bad_bodies
|
||||
|
||||
self.logger.clear()
|
||||
headers = {
|
||||
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(test_data),
|
||||
}
|
||||
responses = [
|
||||
(200, body, self._add_frag_index(i, headers))
|
||||
for i, body in enumerate(ec_archive_bodies)]
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
status_codes, body_iter, headers = zip(
|
||||
*responses[:self.policy.ec_ndata])
|
||||
with set_http_connect(*status_codes, body_iter=body_iter,
|
||||
headers=headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 500)
|
||||
self.assertNotEqual(md5(resp.body).hexdigest(), etag)
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
expected_log_line = (
|
||||
"Error decoding fragments for '/a/c/o'. "
|
||||
"Segments decoded: 0, Lengths: "
|
||||
"[490, 490, 490, 490, 490, 490, 490, 490, 490, 490]: "
|
||||
"pyeclib_c_decode ERROR: Fragment integrity check failed. "
|
||||
"Please inspect syslog for liberasurecode error report.")
|
||||
self.assertEqual(expected_log_line, error_lines[0])
|
||||
|
||||
def test_GET_read_timeout_resume_mixed_etag(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data2 = (b'blah1' * segment_size)[:-333]
|
||||
|
Loading…
x
Reference in New Issue
Block a user