From d17ee8ba14cc88e832da80a939a4129b9eb908f0 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Fri, 25 Sep 2020 11:46:00 -0500 Subject: [PATCH] Pass thread_locals through to frag queue feeder ... so that we don't loose txn-id on error messages Change-Id: Ic9bd98be115b8d97cf5ab879e01cee25aad497af --- swift/proxy/controllers/obj.py | 6 ++-- test/unit/proxy/controllers/test_obj.py | 45 +++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index cd924b0c61..53ae1856f2 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -1388,7 +1388,8 @@ class ECAppIter(object): # segment at a time. queues = [Queue(1) for _junk in range(len(fragment_iters))] - def put_fragments_in_queue(frag_iter, queue): + def put_fragments_in_queue(frag_iter, queue, logger_thread_locals): + self.logger.thread_locals = logger_thread_locals try: for fragment in frag_iter: if fragment.startswith(b' '): @@ -1411,7 +1412,8 @@ class ECAppIter(object): with ContextPool(len(fragment_iters)) as pool: for frag_iter, queue in zip(fragment_iters, queues): - pool.spawn(put_fragments_in_queue, frag_iter, queue) + pool.spawn(put_fragments_in_queue, frag_iter, queue, + self.logger.thread_locals) while True: fragments = [] diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 3f73dd8d5a..04315e1ee5 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -4173,6 +4173,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertIn('retrying', line) for line in error_lines[nparity:]: self.assertIn('ChunkReadTimeout (0.01s)', line) + for line in self.logger.logger.records['ERROR']: + self.assertIn(req.headers['x-trans-id'], line) def test_GET_read_timeout_resume(self): segment_size = self.policy.ec_segment_size @@ -4203,8 +4205,45 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): error_lines = self.logger.get_lines_for_level('error') self.assertEqual(1, len(error_lines)) self.assertIn('retrying', error_lines[0]) - retry_line = self.logger.logger.records['ERROR'][0] - self.assertIn(req.headers['x-trans-id'], retry_line) + for line in self.logger.logger.records['ERROR']: + self.assertIn(req.headers['x-trans-id'], line) + + def test_GET_read_timeout_fails(self): + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-333] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + headers = { + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Content-Length': len(test_data), + } + self.app.recoverable_node_timeout = 0.05 + # first one is slow + responses = [(200, SlowBody(ec_archive_bodies[0], 0.1), + self._add_frag_index(0, headers))] + # ... the rest are fine + responses += [(200, body, self._add_frag_index(i, headers)) + for i, body in enumerate(ec_archive_bodies[1:], start=1)] + + req = swob.Request.blank('/v1/a/c/o') + + status_codes, body_iter, headers = zip( + *responses[:self.policy.ec_ndata]) + # I don't know why fast_forward would blow up, but if it does we + # re-raise the ChunkReadTimeout and still want a txn-id + with set_http_connect(*status_codes, body_iter=body_iter, + headers=headers), \ + mock.patch( + 'swift.proxy.controllers.obj.ECFragGetter.fast_forward', + side_effect=ValueError()): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertNotEqual(md5(resp.body).hexdigest(), etag) + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(1, len(error_lines)) + self.assertIn('Timeout fetching', error_lines[0]) + for line in self.logger.logger.records['ERROR']: + self.assertIn(req.headers['x-trans-id'], line) def test_GET_read_timeout_resume_mixed_etag(self): segment_size = self.policy.ec_segment_size @@ -4294,6 +4333,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(2, len(error_lines)) for line in error_lines: self.assertIn('retrying', line) + for line in self.logger.logger.records['ERROR']: + self.assertIn(req.headers['x-trans-id'], line) etag2_conns = [] for conn in log.responses: if conn.headers.get('X-Object-Sysmeta-Ec-Etag') == etag2: