proxy: Include thread_locals when spawning _fragment_GET_request
Otherwise, we miss out on transaction id and client IP information when timeouts pop. Closes-Bug: #1892421 Change-Id: I6dea3ccf780bcc703db8447a2ef13c33838ff12d
This commit is contained in:
parent
06091172c2
commit
d5625abf60
@ -2348,7 +2348,7 @@ def is_good_source(status):
|
||||
class ECFragGetter(object):
|
||||
|
||||
def __init__(self, app, req, node_iter, partition, policy, path,
|
||||
backend_headers, header_provider=None):
|
||||
backend_headers, header_provider, logger_thread_locals):
|
||||
self.app = app
|
||||
self.req = req
|
||||
self.node_iter = node_iter
|
||||
@ -2361,6 +2361,7 @@ class ECFragGetter(object):
|
||||
self.skip_bytes = 0
|
||||
self.bytes_used_from_backend = 0
|
||||
self.source = None
|
||||
self.logger_thread_locals = logger_thread_locals
|
||||
|
||||
def fast_forward(self, num_bytes):
|
||||
"""
|
||||
@ -2705,8 +2706,8 @@ class ECFragGetter(object):
|
||||
else:
|
||||
return HeaderKeyDict()
|
||||
|
||||
def _make_node_request(self, node, node_timeout, logger_thread_locals):
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
def _make_node_request(self, node, node_timeout):
|
||||
self.app.logger.thread_locals = self.logger_thread_locals
|
||||
req_headers = dict(self.backend_headers)
|
||||
ip, port = get_ip_port(node, req_headers)
|
||||
req_headers.update(self.header_provider())
|
||||
@ -2774,8 +2775,7 @@ class ECFragGetter(object):
|
||||
self.status = self.reason = self.body = self.source_headers = None
|
||||
for node in self.node_iter:
|
||||
source = self._make_node_request(
|
||||
node, self.app.recoverable_node_timeout,
|
||||
self.app.logger.thread_locals)
|
||||
node, self.app.recoverable_node_timeout)
|
||||
|
||||
if source:
|
||||
self.node = node
|
||||
@ -2796,17 +2796,19 @@ class ECFragGetter(object):
|
||||
|
||||
@ObjectControllerRouter.register(EC_POLICY)
|
||||
class ECObjectController(BaseObjectController):
|
||||
def _fragment_GET_request(self, req, node_iter, partition, policy,
|
||||
header_provider=None):
|
||||
def _fragment_GET_request(
|
||||
self, req, node_iter, partition, policy,
|
||||
header_provider, logger_thread_locals):
|
||||
"""
|
||||
Makes a GET request for a fragment.
|
||||
"""
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
backend_headers = self.generate_request_headers(
|
||||
req, additional=req.headers)
|
||||
|
||||
getter = ECFragGetter(self.app, req, node_iter, partition,
|
||||
policy, req.swift_entity_path, backend_headers,
|
||||
header_provider=header_provider)
|
||||
header_provider, logger_thread_locals)
|
||||
return (getter, getter.response_parts_iter(req))
|
||||
|
||||
def _convert_range(self, req, policy):
|
||||
@ -2862,7 +2864,7 @@ class ECObjectController(BaseObjectController):
|
||||
return range_specs
|
||||
|
||||
def feed_remaining_primaries(self, safe_iter, pile, req, partition, policy,
|
||||
buckets, feeder_q):
|
||||
buckets, feeder_q, logger_thread_locals):
|
||||
timeout = self.app.get_policy_options(policy).concurrency_timeout
|
||||
while True:
|
||||
try:
|
||||
@ -2873,7 +2875,8 @@ class ECObjectController(BaseObjectController):
|
||||
# primary we won't find out until the next pass
|
||||
pile.spawn(self._fragment_GET_request,
|
||||
req, safe_iter, partition,
|
||||
policy, buckets.get_extra_headers)
|
||||
policy, buckets.get_extra_headers,
|
||||
logger_thread_locals)
|
||||
else:
|
||||
# ran out of primaries
|
||||
break
|
||||
@ -2915,13 +2918,15 @@ class ECObjectController(BaseObjectController):
|
||||
for node_count in range(ec_request_count):
|
||||
pile.spawn(self._fragment_GET_request,
|
||||
req, safe_iter, partition,
|
||||
policy, buckets.get_extra_headers)
|
||||
policy, buckets.get_extra_headers,
|
||||
self.app.logger.thread_locals)
|
||||
|
||||
feeder_q = None
|
||||
if self.app.get_policy_options(policy).concurrent_gets:
|
||||
feeder_q = Queue()
|
||||
pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
|
||||
partition, policy, buckets, feeder_q)
|
||||
partition, policy, buckets, feeder_q,
|
||||
self.app.logger.thread_locals)
|
||||
|
||||
extra_requests = 0
|
||||
# max_extra_requests is an arbitrary hard limit for spawning extra
|
||||
@ -2948,9 +2953,9 @@ class ECObjectController(BaseObjectController):
|
||||
if requests_available and (
|
||||
buckets.shortfall > pile._pending or bad_resp):
|
||||
extra_requests += 1
|
||||
pile.spawn(self._fragment_GET_request,
|
||||
req, safe_iter, partition,
|
||||
policy, buckets.get_extra_headers)
|
||||
pile.spawn(self._fragment_GET_request, req, safe_iter,
|
||||
partition, policy, buckets.get_extra_headers,
|
||||
self.app.logger.thread_locals)
|
||||
if feeder_q:
|
||||
feeder_q.put('stop')
|
||||
|
||||
|
@ -674,10 +674,13 @@ class DebugLogger(FakeLogger):
|
||||
FakeLogger.__init__(self, *args, **kwargs)
|
||||
self.formatter = DebugSwiftLogFormatter(
|
||||
"%(server)s %(levelname)s: %(message)s")
|
||||
self.records = defaultdict(list)
|
||||
|
||||
def handle(self, record):
|
||||
self._handle(record)
|
||||
print(self.formatter.format(record))
|
||||
formatted = self.formatter.format(record)
|
||||
print(formatted)
|
||||
self.records[record.levelname].append(formatted)
|
||||
|
||||
|
||||
class DebugLogAdapter(utils.LogAdapter):
|
||||
|
@ -2443,7 +2443,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
feeder_q.get.side_effect = feeder_timeout
|
||||
controller.feed_remaining_primaries(
|
||||
safe_iter, pile, req, 0, self.policy,
|
||||
mock.MagicMock(), feeder_q)
|
||||
mock.MagicMock(), feeder_q, mock.MagicMock())
|
||||
expected_timeout = self.app.get_policy_options(
|
||||
self.policy).concurrency_timeout
|
||||
expected_call = mock.call(timeout=expected_timeout)
|
||||
@ -2454,12 +2454,21 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
def test_GET_timeout(self):
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
self.app.recoverable_node_timeout = 0.01
|
||||
codes = [FakeStatus(404, response_sleep=1.0)] + \
|
||||
codes = [FakeStatus(404, response_sleep=1.0)] * 2 + \
|
||||
[200] * (self.policy.ec_ndata)
|
||||
with mocked_http_conn(*codes) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(self.policy.ec_ndata + 1, len(log.requests))
|
||||
self.assertEqual(self.policy.ec_ndata + 2, len(log.requests))
|
||||
self.assertEqual(
|
||||
len(self.logger.logger.records['ERROR']), 2,
|
||||
'Expected 2 ERROR lines, got %r' % (
|
||||
self.logger.logger.records['ERROR'], ))
|
||||
for retry_line in self.logger.logger.records['ERROR']:
|
||||
self.assertIn('ERROR with Object server', retry_line)
|
||||
self.assertIn('Trying to GET', retry_line)
|
||||
self.assertIn('Timeout (0.01s)', retry_line)
|
||||
self.assertIn(req.headers['x-trans-id'], retry_line)
|
||||
|
||||
def test_GET_with_slow_primaries(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
@ -4143,6 +4152,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(error_lines))
|
||||
self.assertIn('retrying', error_lines[0])
|
||||
retry_line = self.logger.logger.records['ERROR'][0]
|
||||
self.assertIn(req.headers['x-trans-id'], retry_line)
|
||||
|
||||
def test_GET_read_timeout_resume_mixed_etag(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
|
Loading…
x
Reference in New Issue
Block a user