Merge "ec: Close down some unused responses more quickly"
This commit is contained in:
commit
06091172c2
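In short: ECGetResponseBucket grows a close_conns() helper, ECGetResponseCollection.choose_best_bucket() closes every losing bucket as soon as a winner is picked, ECFragGetter keeps its current source and node as instance attributes so the connection can be closed from outside, and the test helpers learn to record which fake connections got closed. A toy model of the new bucket behaviour (a sketch only; the real scoring and bookkeeping live in the diff below):

    # Toy classes -- ECGetResponseBucket/ECGetResponseCollection in the diff do the real work.
    class Bucket(object):
        def __init__(self, name, score):
            self.name, self.score, self.closed = name, score, False

        def close_conns(self):
            self.closed = True

    class Collection(object):
        def __init__(self, buckets):
            self.buckets = buckets

        def choose_best_bucket(self):
            best = max(self.buckets, key=lambda b: b.score)
            for bucket in self.buckets:
                if bucket is not best:
                    bucket.close_conns()  # it's now or never
            return best

    buckets = Collection([Bucket('stale', 1), Bucket('fresh', 2)])
    best = buckets.choose_best_bucket()
    assert best.name == 'fresh' and not best.closed
    assert [b.closed for b in buckets.buckets] == [True, False]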
@@ -2087,6 +2087,14 @@ class ECGetResponseBucket(object):
         result = self.policy.ec_ndata - (len(self.get_responses()) + len(alts))
         return max(result, 0)
 
+    def close_conns(self):
+        """
+        Close bucket's responses; they won't be used for a client response.
+        """
+        for getter, frag_iter in self.get_responses():
+            if getattr(getter.source, 'swift_conn', None):
+                close_swift_conn(getter.source)
+
     def __str__(self):
         # return a string summarising bucket state, useful for debugging.
         return '<%s, %s, %s, %s(%s), %s>' \
@@ -2227,6 +2235,15 @@ class ECGetResponseCollection(object):
                 return bucket
         return self.least_bad_bucket
 
+    def choose_best_bucket(self):
+        best_bucket = self.best_bucket
+        # it's now or never -- close down any other requests
+        for bucket in self.buckets.values():
+            if bucket is best_bucket:
+                continue
+            bucket.close_conns()
+        return best_bucket
+
     @property
     def least_bad_bucket(self):
         """
@@ -2343,6 +2360,7 @@ class ECFragGetter(object):
         self.client_chunk_size = policy.fragment_size
         self.skip_bytes = 0
         self.bytes_used_from_backend = 0
+        self.source = None
 
     def fast_forward(self, num_bytes):
         """
@@ -2452,19 +2470,15 @@ class ECFragGetter(object):
 
     def response_parts_iter(self, req):
         try:
-            source, node = next(self.source_and_node_iter)
+            self.source, self.node = next(self.source_and_node_iter)
         except StopIteration:
             return
         it = None
-        if source:
-            it = self._get_response_parts_iter(req, node, source)
+        if self.source:
+            it = self._get_response_parts_iter(req)
         return it
 
-    def _get_response_parts_iter(self, req, node, source):
-        # Someday we can replace this [mess] with python 3's "nonlocal"
-        source = [source]
-        node = [node]
-
+    def _get_response_parts_iter(self, req):
         try:
             client_chunk_size = self.client_chunk_size
             node_timeout = self.app.recoverable_node_timeout
@@ -2473,7 +2487,7 @@ class ECFragGetter(object):
             # on it, so no IO is performed.
             parts_iter = [
                 http_response_to_document_iters(
-                    source[0], read_chunk_size=self.app.object_chunk_size)]
+                    self.source, read_chunk_size=self.app.object_chunk_size)]
 
             def get_next_doc_part():
                 while True:
@@ -2497,13 +2511,13 @@ class ECFragGetter(object):
                         new_source, new_node = self._dig_for_source_and_node()
                         if new_source:
                             self.app.error_occurred(
-                                node[0], _('Trying to read object during '
+                                self.node, _('Trying to read object during '
                                            'GET (retrying)'))
                             # Close-out the connection as best as possible.
-                            if getattr(source[0], 'swift_conn', None):
-                                close_swift_conn(source[0])
-                            source[0] = new_source
-                            node[0] = new_node
+                            if getattr(self.source, 'swift_conn', None):
+                                close_swift_conn(self.source)
+                            self.source = new_source
+                            self.node = new_node
                             # This is safe; it sets up a generator but does
                             # not call next() on it, so no IO is performed.
                             parts_iter[0] = http_response_to_document_iters(
@@ -2539,13 +2553,13 @@ class ECFragGetter(object):
                         new_source, new_node = self._dig_for_source_and_node()
                         if new_source:
                             self.app.error_occurred(
-                                node[0], _('Trying to read object during '
+                                self.node, _('Trying to read object during '
                                            'GET (retrying)'))
                             # Close-out the connection as best as possible.
-                            if getattr(source[0], 'swift_conn', None):
-                                close_swift_conn(source[0])
-                            source[0] = new_source
-                            node[0] = new_node
+                            if getattr(self.source, 'swift_conn', None):
+                                close_swift_conn(self.source)
+                            self.source = new_source
+                            self.node = new_node
                             # This is safe; it just sets up a generator but
                             # does not call next() on it, so no IO is
                             # performed.
@@ -2650,7 +2664,7 @@ class ECFragGetter(object):
                     part_iter.close()
 
         except ChunkReadTimeout:
-            self.app.exception_occurred(node[0], _('Object'),
+            self.app.exception_occurred(self.node, _('Object'),
                                         _('Trying to read during GET'))
             raise
         except ChunkWriteTimeout:
@@ -2677,8 +2691,8 @@ class ECFragGetter(object):
             raise
         finally:
             # Close-out the connection as best as possible.
-            if getattr(source[0], 'swift_conn', None):
-                close_swift_conn(source[0])
+            if getattr(self.source, 'swift_conn', None):
+                close_swift_conn(self.source)
 
     @property
     def last_status(self):
@@ -2727,6 +2741,7 @@ class ECFragGetter(object):
                     not Timestamp(src_headers.get('x-backend-timestamp', 0)):
                 # throw out 5XX and 404s from handoff nodes unless the data is
                 # really on disk and had been DELETEd
+                conn.close()
                 return None
 
             self.status = possible_source.status
@@ -2737,6 +2752,7 @@ class ECFragGetter(object):
                 return possible_source
             else:
                 self.body = possible_source.read()
+                conn.close()
 
                 if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
                     self.app.error_limit(node, _('ERROR Insufficient Storage'))
@@ -2941,7 +2957,7 @@ class ECObjectController(BaseObjectController):
             # Put this back, since we *may* need it for kickoff()/_fix_response()
             # (but note that _fix_ranges() may also pop it back off before then)
             req.range = orig_range
-            best_bucket = buckets.best_bucket
+            best_bucket = buckets.choose_best_bucket()
             if best_bucket.shortfall <= 0 and best_bucket.durable:
                 # headers can come from any of the getters
                 resp_headers = best_bucket.headers
@@ -2981,6 +2997,7 @@ class ECObjectController(BaseObjectController):
             reasons = []
             bodies = []
             headers = []
+            best_bucket.close_conns()
             for status, bad_bucket in buckets.bad_buckets.items():
                 for getter, _parts_iter in bad_bucket.get_responses():
                     if best_bucket.durable:
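The ECFragGetter hunks earlier in this file swap the old single-element-list trick (source = [source]; node = [node], which the removed comment flags as a stand-in for Python 3's "nonlocal") for plain self.source / self.node attributes, which is what lets close_conns() reach a getter's live connection from outside. A generic, self-contained sketch of that pattern change (toy names, not Swift code):

    class FakeSource(object):
        # stand-in for a backend response with a connection to close
        def __init__(self, data):
            self.data = list(data)
            self.closed = False

        def read(self):
            return self.data.pop(0) if self.data else None

        def close(self):
            self.closed = True

    class Getter(object):
        def __init__(self, source):
            self.source = source  # was: source = [source] inside a closure

        def read(self):
            return self.source.read()

        def close(self):
            self.source.close()  # outside code can now shut the connection

    g = Getter(FakeSource(['a', 'b']))
    assert g.read() == 'a'
    g.close()  # roughly what ECGetResponseBucket.close_conns() does per getter
    assert g.source.closed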
@@ -1029,6 +1029,10 @@ def fake_http_connect(*code_iter, **kwargs):
         def getheader(self, name, default=None):
             return HeaderKeyDict(self.getheaders()).get(name, default)
 
+        def nuke_from_orbit(self):
+            # wrapped connections from buffered_http have this helper
+            self.close()
+
         def close(self):
             self.closed = True
 
@@ -1091,10 +1095,13 @@ def fake_http_connect(*code_iter, **kwargs):
             body = static_body or b''
         else:
             body = next(body_iter)
-        return FakeConn(status, etag, body=body, timestamp=timestamp,
+        conn = FakeConn(status, etag, body=body, timestamp=timestamp,
                         headers=headers, expect_headers=expect_headers,
                         connection_id=i, give_send=kwargs.get('give_send'),
                         give_expect=kwargs.get('give_expect'))
+        if 'capture_connections' in kwargs:
+            kwargs['capture_connections'].append(conn)
+        return conn
 
     connect.unexpected_requests = unexpected_requests
     connect.code_iter = code_iter
@@ -1105,6 +1112,7 @@ def fake_http_connect(*code_iter, **kwargs):
 @contextmanager
 def mocked_http_conn(*args, **kwargs):
     requests = []
+    responses = []
 
     def capture_requests(ip, port, method, path, headers, qs, ssl):
         if six.PY2 and not isinstance(ip, bytes):
@@ -1120,8 +1128,10 @@ def mocked_http_conn(*args, **kwargs):
         }
         requests.append(req)
     kwargs.setdefault('give_connect', capture_requests)
+    kwargs['capture_connections'] = responses
     fake_conn = fake_http_connect(*args, **kwargs)
     fake_conn.requests = requests
+    fake_conn.responses = responses
     with mocklib.patch('swift.common.bufferedhttp.http_connect_raw',
                        new=fake_conn):
         yield fake_conn
@@ -1185,6 +1195,10 @@ class StubResponse(object):
         fake_reason = ('Fake', 'This response is a lie.')
         self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0]
 
+    def nuke_from_orbit(self):
+        if hasattr(self, 'swift_conn'):
+            self.swift_conn.close()
+
     def getheader(self, header_name, default=None):
         return self.headers.get(header_name, default)
 
@@ -2061,6 +2061,7 @@ def capture_http_requests(get_response):
             self.req = req
             self.resp = None
             self.path = "/"
+            self.closed = False
 
         def getresponse(self):
             self.resp = get_response(self.req)
@@ -2075,6 +2076,9 @@ def capture_http_requests(get_response):
         def endheaders(self):
             pass
 
+        def close(self):
+            self.closed = True
+
     class ConnectionLog(object):
 
         def __init__(self):
@@ -2838,7 +2842,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
 
         self.assertEqual(resp.status_int, 200)
         self.assertEqual(resp.headers['etag'], obj2['etag'])
+        closed_conn = defaultdict(set)
+        for conn in log:
+            etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
+            closed_conn[etag].add(conn.closed)
+        self.assertEqual({
+            obj1['etag']: {True},
+            obj2['etag']: {False},
+        }, closed_conn)
         self.assertEqual(md5(resp.body).hexdigest(), obj2['etag'])
+        self.assertEqual({True}, {conn.closed for conn in log})
 
         collected_responses = defaultdict(set)
         for conn in log:
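The test hunks from here on mostly repeat one bookkeeping pattern: walk the captured connections and map each backend etag to the set of closed flags seen for it, so a single assertEqual can state which responses were shut down. A self-contained sketch of just that pattern (toy connection objects; the real ones come from capture_http_requests):

    from collections import defaultdict

    class LoggedConn(object):
        # toy stand-in for the conns yielded by capture_http_requests
        def __init__(self, etag, closed):
            self.etag = etag
            self.closed = closed

    log = [LoggedConn('etag-1', True), LoggedConn('etag-1', True),
           LoggedConn('etag-2', False)]

    closed_conn = defaultdict(set)
    for conn in log:
        closed_conn[conn.etag].add(conn.closed)

    # unused responses (etag-1) were all closed; the one being served was not
    assert dict(closed_conn) == {'etag-1': {True}, 'etag-2': {False}}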
@@ -3011,6 +3024,15 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
         with capture_http_requests(fake_response) as log:
             resp = req.get_response(self.app)
 
+        closed_conn = defaultdict(set)
+        for conn in log:
+            etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
+            closed_conn[etag].add(conn.closed)
+        self.assertEqual({
+            obj1['etag']: {True},
+            obj2['etag']: {True},
+            None: {True},
+        }, dict(closed_conn))
         self.assertEqual(resp.status_int, 503)
 
         collected_responses = defaultdict(set)
@@ -3160,10 +3182,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
 
         collected_etags = set()
         collected_status = set()
+        closed_conn = defaultdict(set)
         for conn in log:
             etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
             collected_etags.add(etag)
             collected_status.add(conn.resp.status)
+            closed_conn[etag].add(conn.closed)
 
         # default node_iter will exhaust at 2 * replicas
         self.assertEqual(len(log), 2 * self.replicas())
@@ -3171,6 +3195,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
             {obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']},
             collected_etags)
         self.assertEqual({200}, collected_status)
+        self.assertEqual({
+            obj1['etag']: {True},
+            obj2['etag']: {True},
+            obj3['etag']: {True},
+            obj4['etag']: {True},
+        }, closed_conn)
 
     def test_GET_with_mixed_durable_and_nondurable_frags_will_503(self):
         # all nodes have a frag but there is no one set that reaches quorum,
@@ -3220,12 +3250,14 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
 
         self.assertEqual(resp.status_int, 503)
 
+        closed_conn = defaultdict(set)
         collected_etags = set()
         collected_status = set()
         for conn in log:
             etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
             collected_etags.add(etag)
             collected_status.add(conn.resp.status)
+            closed_conn[etag].add(conn.closed)
 
         # default node_iter will exhaust at 2 * replicas
         self.assertEqual(len(log), 2 * self.replicas())
@@ -3233,6 +3265,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
             {obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']},
             collected_etags)
         self.assertEqual({200}, collected_status)
+        self.assertEqual({
+            obj1['etag']: {True},
+            obj2['etag']: {True},
+            obj3['etag']: {True},
+            obj4['etag']: {True},
+        }, closed_conn)
 
     def test_GET_with_mixed_durable_frags_and_no_quorum_will_503(self):
         # all nodes have a frag but there is no one set that reaches quorum,
@@ -3282,12 +3320,17 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
 
         self.assertEqual(resp.status_int, 503)
 
+        for conn in log:
+            etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
+
         collected_etags = set()
         collected_status = set()
+        closed_conn = defaultdict(set)
         for conn in log:
             etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
             collected_etags.add(etag)
             collected_status.add(conn.resp.status)
+            closed_conn[etag].add(conn.closed)
 
         # default node_iter will exhaust at 2 * replicas
         self.assertEqual(len(log), 2 * self.replicas())
@@ -3295,6 +3338,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
             {obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']},
             collected_etags)
         self.assertEqual({200}, collected_status)
+        self.assertEqual({
+            obj1['etag']: {True},
+            obj2['etag']: {True},
+            obj3['etag']: {True},
+            obj4['etag']: {True},
+        }, closed_conn)
 
     def test_GET_with_quorum_durable_files(self):
         # verify that only (ec_nparity + 1) nodes need to be durable for a GET
@@ -3448,9 +3497,18 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
         fake_response = self._fake_ec_node_response(list(node_frags))
 
         req = swob.Request.blank('/v1/a/c/o')
-        with capture_http_requests(fake_response):
+        with capture_http_requests(fake_response) as log:
             resp = req.get_response(self.app)
 
+        closed_conn = defaultdict(set)
+        for conn in log:
+            etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
+            closed_conn[etag].add(conn.closed)
+        self.assertEqual({
+            obj1['etag']: {False},
+            obj2['etag']: {True},
+        }, closed_conn)
+
         self.assertEqual(resp.status_int, 200)
         self.assertEqual(resp.headers['etag'], obj1['etag'])
         self.assertEqual(md5(resp.body).hexdigest(), obj1['etag'])
@@ -3542,6 +3600,15 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
         with capture_http_requests(fake_response) as log:
             resp = req.get_response(self.app)
 
+        closed_conn = defaultdict(set)
+        for conn in log:
+            etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
+            closed_conn[etag].add(conn.closed)
+        self.assertEqual({
+            obj1['etag']: {True},
+            obj2['etag']: {False},
+        }, closed_conn)
+
         self.assertEqual(resp.status_int, 200)
         self.assertEqual(resp.headers['etag'], obj2['etag'])
         self.assertEqual(md5(resp.body).hexdigest(), obj2['etag'])
@@ -3934,6 +4001,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
         with capture_http_requests(get_response) as log:
             resp = req.get_response(self.app)
 
+        closed_conn = defaultdict(set)
+        for conn in log:
+            etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
+            closed_conn[etag].add(conn.closed)
+        self.assertEqual({
+            old_etag: {True},
+            new_etag: {False},
+            None: {True},
+        }, dict(closed_conn))
+
         self.assertEqual(resp.status_int, 200)
         self.assertEqual(resp.body, new_data[:segment_size])
         self.assertEqual(len(log), self.policy.ec_ndata + 10)
@@ -4146,8 +4223,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
         self.app.recoverable_node_timeout = 0.01
         req = swob.Request.blank('/v1/a/c/o')
         status_codes, body_iter, headers = zip(*responses)
-        with set_http_connect(*status_codes, body_iter=body_iter,
-                              headers=headers):
+        with mocked_http_conn(*status_codes, body_iter=body_iter,
+                              headers=headers) as log:
             resp = req.get_response(self.app)
         self.assertEqual(resp.status_int, 200)
         self.assertEqual(md5(resp.body).hexdigest(), etag1)
@@ -4155,6 +4232,13 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
         self.assertEqual(2, len(error_lines))
         for line in error_lines:
             self.assertIn('retrying', line)
+        etag2_conns = []
+        for conn in log.responses:
+            if conn.headers.get('X-Object-Sysmeta-Ec-Etag') == etag2:
+                etag2_conns.append(conn)
+        self.assertEqual(
+            ([True] * 8) + [False],  # the resumed etag2 doesn't get closed
+            [conn.closed for conn in etag2_conns])
 
     def test_fix_response_HEAD(self):
         headers = {'X-Object-Sysmeta-Ec-Content-Length': '10',