Pass thread_locals through to frag queue feeder
... so that we don't loose txn-id on error messages Change-Id: Ic9bd98be115b8d97cf5ab879e01cee25aad497af
This commit is contained in:
parent
004052dc65
commit
d17ee8ba14
@ -1388,7 +1388,8 @@ class ECAppIter(object):
|
|||||||
# segment at a time.
|
# segment at a time.
|
||||||
queues = [Queue(1) for _junk in range(len(fragment_iters))]
|
queues = [Queue(1) for _junk in range(len(fragment_iters))]
|
||||||
|
|
||||||
def put_fragments_in_queue(frag_iter, queue):
|
def put_fragments_in_queue(frag_iter, queue, logger_thread_locals):
|
||||||
|
self.logger.thread_locals = logger_thread_locals
|
||||||
try:
|
try:
|
||||||
for fragment in frag_iter:
|
for fragment in frag_iter:
|
||||||
if fragment.startswith(b' '):
|
if fragment.startswith(b' '):
|
||||||
@ -1411,7 +1412,8 @@ class ECAppIter(object):
|
|||||||
|
|
||||||
with ContextPool(len(fragment_iters)) as pool:
|
with ContextPool(len(fragment_iters)) as pool:
|
||||||
for frag_iter, queue in zip(fragment_iters, queues):
|
for frag_iter, queue in zip(fragment_iters, queues):
|
||||||
pool.spawn(put_fragments_in_queue, frag_iter, queue)
|
pool.spawn(put_fragments_in_queue, frag_iter, queue,
|
||||||
|
self.logger.thread_locals)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
fragments = []
|
fragments = []
|
||||||
|
@ -4173,6 +4173,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
self.assertIn('retrying', line)
|
self.assertIn('retrying', line)
|
||||||
for line in error_lines[nparity:]:
|
for line in error_lines[nparity:]:
|
||||||
self.assertIn('ChunkReadTimeout (0.01s)', line)
|
self.assertIn('ChunkReadTimeout (0.01s)', line)
|
||||||
|
for line in self.logger.logger.records['ERROR']:
|
||||||
|
self.assertIn(req.headers['x-trans-id'], line)
|
||||||
|
|
||||||
def test_GET_read_timeout_resume(self):
|
def test_GET_read_timeout_resume(self):
|
||||||
segment_size = self.policy.ec_segment_size
|
segment_size = self.policy.ec_segment_size
|
||||||
@ -4203,8 +4205,45 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
error_lines = self.logger.get_lines_for_level('error')
|
error_lines = self.logger.get_lines_for_level('error')
|
||||||
self.assertEqual(1, len(error_lines))
|
self.assertEqual(1, len(error_lines))
|
||||||
self.assertIn('retrying', error_lines[0])
|
self.assertIn('retrying', error_lines[0])
|
||||||
retry_line = self.logger.logger.records['ERROR'][0]
|
for line in self.logger.logger.records['ERROR']:
|
||||||
self.assertIn(req.headers['x-trans-id'], retry_line)
|
self.assertIn(req.headers['x-trans-id'], line)
|
||||||
|
|
||||||
|
def test_GET_read_timeout_fails(self):
|
||||||
|
segment_size = self.policy.ec_segment_size
|
||||||
|
test_data = (b'test' * segment_size)[:-333]
|
||||||
|
etag = md5(test_data).hexdigest()
|
||||||
|
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
|
||||||
|
headers = {
|
||||||
|
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||||
|
'X-Object-Sysmeta-Ec-Content-Length': len(test_data),
|
||||||
|
}
|
||||||
|
self.app.recoverable_node_timeout = 0.05
|
||||||
|
# first one is slow
|
||||||
|
responses = [(200, SlowBody(ec_archive_bodies[0], 0.1),
|
||||||
|
self._add_frag_index(0, headers))]
|
||||||
|
# ... the rest are fine
|
||||||
|
responses += [(200, body, self._add_frag_index(i, headers))
|
||||||
|
for i, body in enumerate(ec_archive_bodies[1:], start=1)]
|
||||||
|
|
||||||
|
req = swob.Request.blank('/v1/a/c/o')
|
||||||
|
|
||||||
|
status_codes, body_iter, headers = zip(
|
||||||
|
*responses[:self.policy.ec_ndata])
|
||||||
|
# I don't know why fast_forward would blow up, but if it does we
|
||||||
|
# re-raise the ChunkReadTimeout and still want a txn-id
|
||||||
|
with set_http_connect(*status_codes, body_iter=body_iter,
|
||||||
|
headers=headers), \
|
||||||
|
mock.patch(
|
||||||
|
'swift.proxy.controllers.obj.ECFragGetter.fast_forward',
|
||||||
|
side_effect=ValueError()):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 200)
|
||||||
|
self.assertNotEqual(md5(resp.body).hexdigest(), etag)
|
||||||
|
error_lines = self.logger.get_lines_for_level('error')
|
||||||
|
self.assertEqual(1, len(error_lines))
|
||||||
|
self.assertIn('Timeout fetching', error_lines[0])
|
||||||
|
for line in self.logger.logger.records['ERROR']:
|
||||||
|
self.assertIn(req.headers['x-trans-id'], line)
|
||||||
|
|
||||||
def test_GET_read_timeout_resume_mixed_etag(self):
|
def test_GET_read_timeout_resume_mixed_etag(self):
|
||||||
segment_size = self.policy.ec_segment_size
|
segment_size = self.policy.ec_segment_size
|
||||||
@ -4294,6 +4333,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
self.assertEqual(2, len(error_lines))
|
self.assertEqual(2, len(error_lines))
|
||||||
for line in error_lines:
|
for line in error_lines:
|
||||||
self.assertIn('retrying', line)
|
self.assertIn('retrying', line)
|
||||||
|
for line in self.logger.logger.records['ERROR']:
|
||||||
|
self.assertIn(req.headers['x-trans-id'], line)
|
||||||
etag2_conns = []
|
etag2_conns = []
|
||||||
for conn in log.responses:
|
for conn in log.responses:
|
||||||
if conn.headers.get('X-Object-Sysmeta-Ec-Etag') == etag2:
|
if conn.headers.get('X-Object-Sysmeta-Ec-Etag') == etag2:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user