Merge "proxy: Include thread_locals when spawning _fragment_GET_request"
This commit is contained in:
commit
4b2d8c2258
@ -2346,7 +2346,7 @@ def is_good_source(status):
|
|||||||
class ECFragGetter(object):
|
class ECFragGetter(object):
|
||||||
|
|
||||||
def __init__(self, app, req, node_iter, partition, policy, path,
|
def __init__(self, app, req, node_iter, partition, policy, path,
|
||||||
backend_headers, header_provider=None):
|
backend_headers, header_provider, logger_thread_locals):
|
||||||
self.app = app
|
self.app = app
|
||||||
self.req = req
|
self.req = req
|
||||||
self.node_iter = node_iter
|
self.node_iter = node_iter
|
||||||
@ -2359,6 +2359,7 @@ class ECFragGetter(object):
|
|||||||
self.skip_bytes = 0
|
self.skip_bytes = 0
|
||||||
self.bytes_used_from_backend = 0
|
self.bytes_used_from_backend = 0
|
||||||
self.source = None
|
self.source = None
|
||||||
|
self.logger_thread_locals = logger_thread_locals
|
||||||
|
|
||||||
def fast_forward(self, num_bytes):
|
def fast_forward(self, num_bytes):
|
||||||
"""
|
"""
|
||||||
@ -2703,8 +2704,8 @@ class ECFragGetter(object):
|
|||||||
else:
|
else:
|
||||||
return HeaderKeyDict()
|
return HeaderKeyDict()
|
||||||
|
|
||||||
def _make_node_request(self, node, node_timeout, logger_thread_locals):
|
def _make_node_request(self, node, node_timeout):
|
||||||
self.app.logger.thread_locals = logger_thread_locals
|
self.app.logger.thread_locals = self.logger_thread_locals
|
||||||
req_headers = dict(self.backend_headers)
|
req_headers = dict(self.backend_headers)
|
||||||
ip, port = get_ip_port(node, req_headers)
|
ip, port = get_ip_port(node, req_headers)
|
||||||
req_headers.update(self.header_provider())
|
req_headers.update(self.header_provider())
|
||||||
@ -2772,8 +2773,7 @@ class ECFragGetter(object):
|
|||||||
self.status = self.reason = self.body = self.source_headers = None
|
self.status = self.reason = self.body = self.source_headers = None
|
||||||
for node in self.node_iter:
|
for node in self.node_iter:
|
||||||
source = self._make_node_request(
|
source = self._make_node_request(
|
||||||
node, self.app.recoverable_node_timeout,
|
node, self.app.recoverable_node_timeout)
|
||||||
self.app.logger.thread_locals)
|
|
||||||
|
|
||||||
if source:
|
if source:
|
||||||
self.node = node
|
self.node = node
|
||||||
@ -2794,17 +2794,19 @@ class ECFragGetter(object):
|
|||||||
|
|
||||||
@ObjectControllerRouter.register(EC_POLICY)
|
@ObjectControllerRouter.register(EC_POLICY)
|
||||||
class ECObjectController(BaseObjectController):
|
class ECObjectController(BaseObjectController):
|
||||||
def _fragment_GET_request(self, req, node_iter, partition, policy,
|
def _fragment_GET_request(
|
||||||
header_provider=None):
|
self, req, node_iter, partition, policy,
|
||||||
|
header_provider, logger_thread_locals):
|
||||||
"""
|
"""
|
||||||
Makes a GET request for a fragment.
|
Makes a GET request for a fragment.
|
||||||
"""
|
"""
|
||||||
|
self.app.logger.thread_locals = logger_thread_locals
|
||||||
backend_headers = self.generate_request_headers(
|
backend_headers = self.generate_request_headers(
|
||||||
req, additional=req.headers)
|
req, additional=req.headers)
|
||||||
|
|
||||||
getter = ECFragGetter(self.app, req, node_iter, partition,
|
getter = ECFragGetter(self.app, req, node_iter, partition,
|
||||||
policy, req.swift_entity_path, backend_headers,
|
policy, req.swift_entity_path, backend_headers,
|
||||||
header_provider=header_provider)
|
header_provider, logger_thread_locals)
|
||||||
return (getter, getter.response_parts_iter(req))
|
return (getter, getter.response_parts_iter(req))
|
||||||
|
|
||||||
def _convert_range(self, req, policy):
|
def _convert_range(self, req, policy):
|
||||||
@ -2860,7 +2862,7 @@ class ECObjectController(BaseObjectController):
|
|||||||
return range_specs
|
return range_specs
|
||||||
|
|
||||||
def feed_remaining_primaries(self, safe_iter, pile, req, partition, policy,
|
def feed_remaining_primaries(self, safe_iter, pile, req, partition, policy,
|
||||||
buckets, feeder_q):
|
buckets, feeder_q, logger_thread_locals):
|
||||||
timeout = self.app.get_policy_options(policy).concurrency_timeout
|
timeout = self.app.get_policy_options(policy).concurrency_timeout
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@ -2871,7 +2873,8 @@ class ECObjectController(BaseObjectController):
|
|||||||
# primary we won't find out until the next pass
|
# primary we won't find out until the next pass
|
||||||
pile.spawn(self._fragment_GET_request,
|
pile.spawn(self._fragment_GET_request,
|
||||||
req, safe_iter, partition,
|
req, safe_iter, partition,
|
||||||
policy, buckets.get_extra_headers)
|
policy, buckets.get_extra_headers,
|
||||||
|
logger_thread_locals)
|
||||||
else:
|
else:
|
||||||
# ran out of primaries
|
# ran out of primaries
|
||||||
break
|
break
|
||||||
@ -2914,13 +2917,15 @@ class ECObjectController(BaseObjectController):
|
|||||||
for node_count in range(ec_request_count):
|
for node_count in range(ec_request_count):
|
||||||
pile.spawn(self._fragment_GET_request,
|
pile.spawn(self._fragment_GET_request,
|
||||||
req, safe_iter, partition,
|
req, safe_iter, partition,
|
||||||
policy, buckets.get_extra_headers)
|
policy, buckets.get_extra_headers,
|
||||||
|
self.app.logger.thread_locals)
|
||||||
|
|
||||||
feeder_q = None
|
feeder_q = None
|
||||||
if self.app.get_policy_options(policy).concurrent_gets:
|
if self.app.get_policy_options(policy).concurrent_gets:
|
||||||
feeder_q = Queue()
|
feeder_q = Queue()
|
||||||
pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
|
pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
|
||||||
partition, policy, buckets, feeder_q)
|
partition, policy, buckets, feeder_q,
|
||||||
|
self.app.logger.thread_locals)
|
||||||
|
|
||||||
extra_requests = 0
|
extra_requests = 0
|
||||||
# max_extra_requests is an arbitrary hard limit for spawning extra
|
# max_extra_requests is an arbitrary hard limit for spawning extra
|
||||||
@ -2947,9 +2952,9 @@ class ECObjectController(BaseObjectController):
|
|||||||
if requests_available and (
|
if requests_available and (
|
||||||
buckets.shortfall > pile._pending or bad_resp):
|
buckets.shortfall > pile._pending or bad_resp):
|
||||||
extra_requests += 1
|
extra_requests += 1
|
||||||
pile.spawn(self._fragment_GET_request,
|
pile.spawn(self._fragment_GET_request, req, safe_iter,
|
||||||
req, safe_iter, partition,
|
partition, policy, buckets.get_extra_headers,
|
||||||
policy, buckets.get_extra_headers)
|
self.app.logger.thread_locals)
|
||||||
if feeder_q:
|
if feeder_q:
|
||||||
feeder_q.put('stop')
|
feeder_q.put('stop')
|
||||||
|
|
||||||
|
@ -674,10 +674,13 @@ class DebugLogger(FakeLogger):
|
|||||||
FakeLogger.__init__(self, *args, **kwargs)
|
FakeLogger.__init__(self, *args, **kwargs)
|
||||||
self.formatter = DebugSwiftLogFormatter(
|
self.formatter = DebugSwiftLogFormatter(
|
||||||
"%(server)s %(levelname)s: %(message)s")
|
"%(server)s %(levelname)s: %(message)s")
|
||||||
|
self.records = defaultdict(list)
|
||||||
|
|
||||||
def handle(self, record):
|
def handle(self, record):
|
||||||
self._handle(record)
|
self._handle(record)
|
||||||
print(self.formatter.format(record))
|
formatted = self.formatter.format(record)
|
||||||
|
print(formatted)
|
||||||
|
self.records[record.levelname].append(formatted)
|
||||||
|
|
||||||
|
|
||||||
class DebugLogAdapter(utils.LogAdapter):
|
class DebugLogAdapter(utils.LogAdapter):
|
||||||
|
@ -2494,7 +2494,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
feeder_q.get.side_effect = feeder_timeout
|
feeder_q.get.side_effect = feeder_timeout
|
||||||
controller.feed_remaining_primaries(
|
controller.feed_remaining_primaries(
|
||||||
safe_iter, pile, req, 0, self.policy,
|
safe_iter, pile, req, 0, self.policy,
|
||||||
mock.MagicMock(), feeder_q)
|
mock.MagicMock(), feeder_q, mock.MagicMock())
|
||||||
expected_timeout = self.app.get_policy_options(
|
expected_timeout = self.app.get_policy_options(
|
||||||
self.policy).concurrency_timeout
|
self.policy).concurrency_timeout
|
||||||
expected_call = mock.call(timeout=expected_timeout)
|
expected_call = mock.call(timeout=expected_timeout)
|
||||||
@ -2505,12 +2505,21 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
def test_GET_timeout(self):
|
def test_GET_timeout(self):
|
||||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||||
self.app.recoverable_node_timeout = 0.01
|
self.app.recoverable_node_timeout = 0.01
|
||||||
codes = [FakeStatus(404, response_sleep=1.0)] + \
|
codes = [FakeStatus(404, response_sleep=1.0)] * 2 + \
|
||||||
[200] * (self.policy.ec_ndata)
|
[200] * (self.policy.ec_ndata)
|
||||||
with mocked_http_conn(*codes) as log:
|
with mocked_http_conn(*codes) as log:
|
||||||
resp = req.get_response(self.app)
|
resp = req.get_response(self.app)
|
||||||
self.assertEqual(resp.status_int, 200)
|
self.assertEqual(resp.status_int, 200)
|
||||||
self.assertEqual(self.policy.ec_ndata + 1, len(log.requests))
|
self.assertEqual(self.policy.ec_ndata + 2, len(log.requests))
|
||||||
|
self.assertEqual(
|
||||||
|
len(self.logger.logger.records['ERROR']), 2,
|
||||||
|
'Expected 2 ERROR lines, got %r' % (
|
||||||
|
self.logger.logger.records['ERROR'], ))
|
||||||
|
for retry_line in self.logger.logger.records['ERROR']:
|
||||||
|
self.assertIn('ERROR with Object server', retry_line)
|
||||||
|
self.assertIn('Trying to GET', retry_line)
|
||||||
|
self.assertIn('Timeout (0.01s)', retry_line)
|
||||||
|
self.assertIn(req.headers['x-trans-id'], retry_line)
|
||||||
|
|
||||||
def test_GET_with_slow_primaries(self):
|
def test_GET_with_slow_primaries(self):
|
||||||
segment_size = self.policy.ec_segment_size
|
segment_size = self.policy.ec_segment_size
|
||||||
@ -4194,6 +4203,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
error_lines = self.logger.get_lines_for_level('error')
|
error_lines = self.logger.get_lines_for_level('error')
|
||||||
self.assertEqual(1, len(error_lines))
|
self.assertEqual(1, len(error_lines))
|
||||||
self.assertIn('retrying', error_lines[0])
|
self.assertIn('retrying', error_lines[0])
|
||||||
|
retry_line = self.logger.logger.records['ERROR'][0]
|
||||||
|
self.assertIn(req.headers['x-trans-id'], retry_line)
|
||||||
|
|
||||||
def test_GET_read_timeout_resume_mixed_etag(self):
|
def test_GET_read_timeout_resume_mixed_etag(self):
|
||||||
segment_size = self.policy.ec_segment_size
|
segment_size = self.policy.ec_segment_size
|
||||||
|
Loading…
x
Reference in New Issue
Block a user