Refactor EC multipart/byteranges control flow
The multipart document handling in the proxy is consumed via iteration, but the error handling code is not consistent with how it applies conversions of IO errors/timeouts and retry failures to StopIteration. In an effort to make the code more obvious and easier to debug and maintain I've added comments and additional tests as well as tightening up StopIteration exception handling. Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Change-Id: I0654815543be3df059eb2875d9b3669dbd97f5b4
This commit is contained in:
parent
c9b5d44e9e
commit
4a4d899680
@ -2504,19 +2504,19 @@ class ECFragGetter(object):
|
||||
|
||||
def get_next_doc_part():
|
||||
while True:
|
||||
# the loop here is to resume if trying to parse
|
||||
# multipart/byteranges response raises a ChunkReadTimeout
|
||||
# and resets the parts_iter
|
||||
try:
|
||||
# This call to next() performs IO when we have a
|
||||
# multipart/byteranges response; it reads the MIME
|
||||
# boundary and part headers.
|
||||
#
|
||||
# If we don't have a multipart/byteranges response,
|
||||
# but just a 200 or a single-range 206, then this
|
||||
# performs no IO, and either just returns source or
|
||||
# raises StopIteration.
|
||||
with WatchdogTimeout(self.app.watchdog, node_timeout,
|
||||
ChunkReadTimeout):
|
||||
# if StopIteration is raised, it escapes and is
|
||||
# handled elsewhere
|
||||
# If we don't have a multipart/byteranges response,
|
||||
# but just a 200 or a single-range 206, then this
|
||||
# performs no IO, and just returns source (or
|
||||
# raises StopIteration).
|
||||
# Otherwise, this call to next() performs IO when
|
||||
# we have a multipart/byteranges response; as it
|
||||
# will read the MIME boundary and part headers.
|
||||
start_byte, end_byte, length, headers, part = next(
|
||||
parts_iter[0])
|
||||
return (start_byte, end_byte, length, headers, part)
|
||||
@ -2537,7 +2537,7 @@ class ECFragGetter(object):
|
||||
new_source,
|
||||
read_chunk_size=self.app.object_chunk_size)
|
||||
else:
|
||||
raise StopIteration()
|
||||
raise
|
||||
|
||||
def iter_bytes_from_response_part(part_file, nbytes):
|
||||
nchunks = 0
|
||||
@ -2579,14 +2579,13 @@ class ECFragGetter(object):
|
||||
parts_iter[0] = http_response_to_document_iters(
|
||||
new_source,
|
||||
read_chunk_size=self.app.object_chunk_size)
|
||||
|
||||
try:
|
||||
_junk, _junk, _junk, _junk, part_file = \
|
||||
get_next_doc_part()
|
||||
except StopIteration:
|
||||
# Tried to find a new node from which to
|
||||
# finish the GET, but failed. There's
|
||||
# nothing more we can do here.
|
||||
# it's not clear to me how to make
|
||||
# get_next_doc_part raise StopIteration for the
|
||||
# first doc part of a new request
|
||||
six.reraise(exc_type, exc_value, exc_traceback)
|
||||
part_file = ByteCountEnforcer(part_file, nbytes)
|
||||
else:
|
||||
@ -2652,8 +2651,14 @@ class ECFragGetter(object):
|
||||
part_iter = None
|
||||
try:
|
||||
while True:
|
||||
start_byte, end_byte, length, headers, part = \
|
||||
get_next_doc_part()
|
||||
try:
|
||||
start_byte, end_byte, length, headers, part = \
|
||||
get_next_doc_part()
|
||||
except StopIteration:
|
||||
# it seems this is the only way out of the loop; not
|
||||
# sure why the req.environ update is always needed
|
||||
req.environ['swift.non_client_disconnect'] = True
|
||||
break
|
||||
# note: learn_size_from_content_range() sets
|
||||
# self.skip_bytes
|
||||
self.learn_size_from_content_range(
|
||||
@ -2670,8 +2675,6 @@ class ECFragGetter(object):
|
||||
'entity_length': length, 'headers': headers,
|
||||
'part_iter': part_iter}
|
||||
self.pop_range()
|
||||
except StopIteration:
|
||||
req.environ['swift.non_client_disconnect'] = True
|
||||
finally:
|
||||
if part_iter:
|
||||
part_iter.close()
|
||||
|
@ -1038,16 +1038,29 @@ def requires_o_tmpfile_support_in_tmp(func):
|
||||
|
||||
class StubResponse(object):
|
||||
|
||||
def __init__(self, status, body=b'', headers=None, frag_index=None):
|
||||
def __init__(self, status, body=b'', headers=None, frag_index=None,
|
||||
slowdown=None):
|
||||
self.status = status
|
||||
self.body = body
|
||||
self.readable = BytesIO(body)
|
||||
try:
|
||||
self._slowdown = iter(slowdown)
|
||||
except TypeError:
|
||||
self._slowdown = iter([slowdown])
|
||||
self.headers = HeaderKeyDict(headers)
|
||||
if frag_index is not None:
|
||||
self.headers['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
|
||||
fake_reason = ('Fake', 'This response is a lie.')
|
||||
self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0]
|
||||
|
||||
def slowdown(self):
|
||||
try:
|
||||
wait = next(self._slowdown)
|
||||
except StopIteration:
|
||||
wait = None
|
||||
if wait is not None:
|
||||
eventlet.sleep(wait)
|
||||
|
||||
def nuke_from_orbit(self):
|
||||
if hasattr(self, 'swift_conn'):
|
||||
self.swift_conn.close()
|
||||
@ -1061,8 +1074,13 @@ class StubResponse(object):
|
||||
return self.headers.items()
|
||||
|
||||
def read(self, amt=0):
|
||||
self.slowdown()
|
||||
return self.readable.read(amt)
|
||||
|
||||
def readline(self, size=-1):
|
||||
self.slowdown()
|
||||
return self.readable.readline(size)
|
||||
|
||||
def __repr__(self):
|
||||
info = ['Status: %s' % self.status]
|
||||
if self.headers:
|
||||
|
@ -182,6 +182,7 @@ class BaseObjectControllerMixin(object):
|
||||
self.app = PatchedObjControllerApp(
|
||||
conf, account_ring=FakeRing(),
|
||||
container_ring=FakeRing(), logger=self.logger)
|
||||
self.logger.clear() # startup/loading debug msgs not helpful
|
||||
|
||||
# you can over-ride the container_info just by setting it on the app
|
||||
# (see PatchedObjControllerApp for details)
|
||||
@ -4043,6 +4044,282 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 412)
|
||||
self.assertEqual(len(log), 2 * 8)
|
||||
|
||||
def test_GET_with_multirange(self):
|
||||
self.app.object_chunk_size = 256
|
||||
test_body = b'test' * self.policy.ec_segment_size
|
||||
ec_stub = make_ec_object_stub(test_body, self.policy, None)
|
||||
frag_archives = ec_stub['frags']
|
||||
self.assertEqual(len(frag_archives[0]), 1960)
|
||||
boundary = b'81eb9c110b32ced5fe'
|
||||
|
||||
def make_mime_body(frag_archive):
|
||||
return b'\r\n'.join([
|
||||
b'--' + boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 0-489/1960',
|
||||
b'',
|
||||
frag_archive[0:490],
|
||||
b'--' + boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 1470-1959/1960',
|
||||
b'',
|
||||
frag_archive[1470:],
|
||||
b'--' + boundary + b'--',
|
||||
])
|
||||
|
||||
obj_resp_bodies = [make_mime_body(fa) for fa
|
||||
in ec_stub['frags'][:self.policy.ec_ndata]]
|
||||
|
||||
headers = {
|
||||
'Content-Type': b'multipart/byteranges;boundary=' + boundary,
|
||||
'Content-Length': len(obj_resp_bodies[0]),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']),
|
||||
'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'],
|
||||
'X-Timestamp': Timestamp(self.ts()).normal,
|
||||
}
|
||||
|
||||
responses = [
|
||||
StubResponse(206, body, headers, i)
|
||||
for i, body in enumerate(obj_resp_bodies)
|
||||
]
|
||||
|
||||
def get_response(req):
|
||||
# there's some math going on here I don't quite understand, the
|
||||
# fragment_size is 490 and there's like 4 of them because ec_body
|
||||
# is 'test' * segment_size
|
||||
self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959')
|
||||
return responses.pop(0) if responses else StubResponse(404)
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||
'Range': 'bytes=1000-2000,14000-15000'})
|
||||
with capture_http_requests(get_response) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual(len(log), self.policy.ec_ndata)
|
||||
resp_boundary = resp.headers['content-type'].rsplit('=', 1)[1].encode()
|
||||
expected = b'\r\n'.join([
|
||||
b'--' + resp_boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 1000-2000/16384',
|
||||
b'',
|
||||
ec_stub['body'][1000:2001],
|
||||
b'--' + resp_boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 14000-15000/16384',
|
||||
b'',
|
||||
ec_stub['body'][14000:15001],
|
||||
b'--' + resp_boundary + b'--',
|
||||
])
|
||||
self.assertEqual(resp.body, expected)
|
||||
|
||||
def test_GET_with_multirange_slow_body(self):
|
||||
self.app.object_chunk_size = 256
|
||||
self.app.recoverable_node_timeout = 0.01
|
||||
test_body = b'test' * self.policy.ec_segment_size
|
||||
ec_stub = make_ec_object_stub(test_body, self.policy, None)
|
||||
frag_archives = ec_stub['frags']
|
||||
self.assertEqual(len(frag_archives[0]), 1960)
|
||||
boundary = b'81eb9c110b32ced5fe'
|
||||
|
||||
def make_mime_body(frag_archive):
|
||||
return b'\r\n'.join([
|
||||
b'--' + boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 0-489/1960',
|
||||
b'',
|
||||
frag_archive[0:490],
|
||||
b'--' + boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 1470-1959/1960',
|
||||
b'',
|
||||
frag_archive[1470:],
|
||||
b'--' + boundary + b'--',
|
||||
])
|
||||
|
||||
obj_resp_bodies = [make_mime_body(fa) for fa
|
||||
in ec_stub['frags'][:self.policy.ec_ndata + 1]]
|
||||
|
||||
headers = {
|
||||
'Content-Type': b'multipart/byteranges;boundary=' + boundary,
|
||||
'Content-Length': len(obj_resp_bodies[0]),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']),
|
||||
'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'],
|
||||
'X-Timestamp': Timestamp(self.ts()).normal,
|
||||
}
|
||||
|
||||
responses = [
|
||||
StubResponse(206, body, headers, i,
|
||||
# make the first one slow
|
||||
slowdown=0.1 if i == 0 else None)
|
||||
for i, body in enumerate(obj_resp_bodies)
|
||||
]
|
||||
|
||||
def get_response(req):
|
||||
# there's some math going on here I don't quite understand, the
|
||||
# fragment_size is 490 and there's like 4 of them because ec_body
|
||||
# is 'test' * segment_size
|
||||
self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959')
|
||||
return responses.pop(0) if responses else StubResponse(404)
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||
'Range': 'bytes=1000-2000,14000-15000'})
|
||||
with capture_http_requests(get_response) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual(len(log), self.policy.ec_ndata + 1)
|
||||
resp_boundary = resp.headers['content-type'].rsplit('=', 1)[1].encode()
|
||||
expected = b'\r\n'.join([
|
||||
b'--' + resp_boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 1000-2000/16384',
|
||||
b'',
|
||||
ec_stub['body'][1000:2001],
|
||||
b'--' + resp_boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 14000-15000/16384',
|
||||
b'',
|
||||
ec_stub['body'][14000:15001],
|
||||
b'--' + resp_boundary + b'--',
|
||||
])
|
||||
self.assertEqual(resp.body, expected)
|
||||
|
||||
def test_GET_with_multirange_unable_to_resume(self):
|
||||
self.app.object_chunk_size = 256
|
||||
self.app.recoverable_node_timeout = 0.01
|
||||
test_body = b'test' * self.policy.ec_segment_size
|
||||
ec_stub = make_ec_object_stub(test_body, self.policy, None)
|
||||
frag_archives = ec_stub['frags']
|
||||
self.assertEqual(len(frag_archives[0]), 1960)
|
||||
boundary = b'81eb9c110b32ced5fe'
|
||||
|
||||
def make_mime_body(frag_archive):
|
||||
return b'\r\n'.join([
|
||||
b'--' + boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 0-489/1960',
|
||||
b'',
|
||||
frag_archive[0:490],
|
||||
b'--' + boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 1470-1959/1960',
|
||||
b'',
|
||||
frag_archive[1470:],
|
||||
b'--' + boundary + b'--',
|
||||
])
|
||||
|
||||
obj_resp_bodies = [make_mime_body(fa) for fa
|
||||
# no extra good responses
|
||||
in ec_stub['frags'][:self.policy.ec_ndata]]
|
||||
|
||||
headers = {
|
||||
'Content-Type': b'multipart/byteranges;boundary=' + boundary,
|
||||
'Content-Length': len(obj_resp_bodies[0]),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']),
|
||||
'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'],
|
||||
'X-Timestamp': Timestamp(self.ts()).normal,
|
||||
}
|
||||
|
||||
responses = [
|
||||
StubResponse(206, body, headers, i,
|
||||
# make the first one slow
|
||||
slowdown=0.1 if i == 0 else None)
|
||||
for i, body in enumerate(obj_resp_bodies)
|
||||
]
|
||||
|
||||
def get_response(req):
|
||||
# there's some math going on here I don't quite understand, the
|
||||
# fragment_size is 490 and there's like 4 of them because ec_body
|
||||
# is 'test' * segment_size
|
||||
self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959')
|
||||
return responses.pop(0) if responses else StubResponse(404)
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||
'Range': 'bytes=1000-2000,14000-15000'})
|
||||
with capture_http_requests(get_response) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 500)
|
||||
self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2)
|
||||
log_lines = self.app.logger.get_lines_for_level('error')
|
||||
# not the most graceful ending
|
||||
self.assertIn('Unhandled exception', log_lines[-1])
|
||||
|
||||
def test_GET_with_multirange_short_resume_body(self):
|
||||
self.app.object_chunk_size = 256
|
||||
self.app.recoverable_node_timeout = 0.01
|
||||
test_body = b'test' * self.policy.ec_segment_size
|
||||
ec_stub = make_ec_object_stub(test_body, self.policy, None)
|
||||
frag_archives = ec_stub['frags']
|
||||
self.assertEqual(len(frag_archives[0]), 1960)
|
||||
boundary = b'81eb9c110b32ced5fe'
|
||||
|
||||
def make_mime_body(frag_archive):
|
||||
return b'\r\n'.join([
|
||||
b'--' + boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 0-489/1960',
|
||||
b'',
|
||||
frag_archive[0:490],
|
||||
b'--' + boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 1470-1959/1960',
|
||||
b'',
|
||||
frag_archive[1470:],
|
||||
b'--' + boundary + b'--',
|
||||
])
|
||||
|
||||
obj_resp_bodies = [make_mime_body(fa) for fa
|
||||
# no extra good responses
|
||||
in ec_stub['frags'][:self.policy.ec_ndata]]
|
||||
|
||||
headers = {
|
||||
'Content-Type': b'multipart/byteranges;boundary=' + boundary,
|
||||
'Content-Length': len(obj_resp_bodies[0]),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']),
|
||||
'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'],
|
||||
'X-Timestamp': Timestamp(self.ts()).normal,
|
||||
}
|
||||
|
||||
responses = [
|
||||
StubResponse(206, body, headers, i,
|
||||
# make the first one slow
|
||||
slowdown=0.1 if i == 0 else None)
|
||||
for i, body in enumerate(obj_resp_bodies)
|
||||
]
|
||||
# add a short read response for the resume
|
||||
short_body = obj_resp_bodies[0][:512]
|
||||
responses.append(StubResponse(206, short_body, headers, 0))
|
||||
|
||||
def get_response(req):
|
||||
# there's some math going on here I don't quite understand, the
|
||||
# fragment_size is 490 and there's like 4 of them because ec_body
|
||||
# is 'test' * segment_size
|
||||
self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959')
|
||||
return responses.pop(0) if responses else StubResponse(404)
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||
'Range': 'bytes=1000-2000,14000-15000'})
|
||||
with capture_http_requests(get_response) as log:
|
||||
resp = req.get_response(self.app)
|
||||
resp_boundary = resp.headers['content-type'].rsplit(
|
||||
'=', 1)[1].encode()
|
||||
expected = b'\r\n'.join([
|
||||
b'--' + resp_boundary,
|
||||
b'Content-Type: application/octet-stream',
|
||||
b'Content-Range: bytes 1000-2000/16384',
|
||||
b'',
|
||||
b'',
|
||||
b'--' + resp_boundary + b'--',
|
||||
])
|
||||
self.assertEqual(expected, resp.body)
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2)
|
||||
log_lines = self.app.logger.get_lines_for_level('error')
|
||||
self.assertIn("Trying to read object during GET (retrying)",
|
||||
log_lines[0])
|
||||
# not the most graceful ending
|
||||
self.assertIn("Exception fetching fragments for '/a/c/o'",
|
||||
log_lines[-1])
|
||||
|
||||
def test_GET_with_success_and_507_will_503(self):
|
||||
responses = [ # only 9 good nodes
|
||||
StubResponse(200),
|
||||
@ -4091,6 +4368,206 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertEqual(len(log), 2 * self.replicas())
|
||||
|
||||
def test_ranged_get(self):
|
||||
self.app.object_chunk_size = 256
|
||||
segment_size = self.policy.ec_segment_size
|
||||
frag_size = self.policy.fragment_size
|
||||
data = (b'test' * segment_size)[:-492]
|
||||
etag = md5(data).hexdigest()
|
||||
archives = self._make_ec_archive_bodies(data)
|
||||
frag_archive_size = len(archives[0])
|
||||
range_size = frag_size * 2
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': range_size,
|
||||
'Content-Range': 'bytes 0-%s/%s' % (range_size - 1,
|
||||
frag_archive_size),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Backend-Timestamp': Timestamp(self.ts()).internal
|
||||
}
|
||||
responses = [
|
||||
StubResponse(206, body[:range_size], headers, i)
|
||||
for i, body in enumerate(archives[:self.policy.ec_ndata])
|
||||
]
|
||||
|
||||
obj_req_ranges = set()
|
||||
|
||||
def get_response(req):
|
||||
obj_req_ranges.add(req['headers']['Range'])
|
||||
return responses.pop(0) if responses else StubResponse(404)
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||
'Range': 'bytes=3000-5000'})
|
||||
with capture_http_requests(get_response) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(obj_req_ranges, {'bytes=0-%s' % (range_size - 1)})
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual(resp.headers['Content-Range'],
|
||||
'bytes 3000-5000/%s' % len(data))
|
||||
self.assertEqual(resp.body, data[3000:5001])
|
||||
self.assertEqual(len(log), self.policy.ec_ndata)
|
||||
|
||||
def test_ranged_get_with_slow_resp(self):
|
||||
self.app.object_chunk_size = 256
|
||||
self.app.recoverable_node_timeout = 0.01
|
||||
segment_size = self.policy.ec_segment_size
|
||||
frag_size = self.policy.fragment_size
|
||||
data = (b'test' * segment_size)[:-492]
|
||||
etag = md5(data).hexdigest()
|
||||
archives = self._make_ec_archive_bodies(data)
|
||||
frag_archive_size = len(archives[0])
|
||||
range_size = frag_size * 2
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': range_size,
|
||||
'Content-Range': 'bytes 0-%s/%s' % (range_size - 1,
|
||||
frag_archive_size),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Backend-Timestamp': Timestamp(self.ts()).internal
|
||||
}
|
||||
responses = [
|
||||
StubResponse(206, body[:range_size], headers, i,
|
||||
# the first body comes up slow
|
||||
slowdown=0.1 if i == 0 else None)
|
||||
for i, body in enumerate(archives[:self.policy.ec_ndata])
|
||||
]
|
||||
responses.append(StubResponse(
|
||||
206, archives[self.policy.ec_ndata][:range_size],
|
||||
headers, self.policy.ec_ndata))
|
||||
|
||||
obj_req_ranges = set()
|
||||
|
||||
def get_response(req):
|
||||
obj_req_ranges.add(req['headers']['Range'])
|
||||
return responses.pop(0) if responses else StubResponse(404)
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||
'Range': 'bytes=3000-5000'})
|
||||
with capture_http_requests(get_response) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.body, data[3000:5001])
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual(obj_req_ranges, {'bytes=0-%s' % (range_size - 1)})
|
||||
self.assertEqual(resp.headers['Content-Range'],
|
||||
'bytes 3000-5000/%s' % len(data))
|
||||
self.assertEqual(len(log), self.policy.ec_ndata + 1)
|
||||
|
||||
def test_ranged_get_with_short_resp(self):
|
||||
self.app.object_chunk_size = 256
|
||||
segment_size = self.policy.ec_segment_size
|
||||
frag_size = self.policy.fragment_size
|
||||
data = (b'test' * segment_size)[:-492]
|
||||
etag = md5(data).hexdigest()
|
||||
archives = self._make_ec_archive_bodies(data)
|
||||
frag_archive_size = len(archives[0])
|
||||
range_size = frag_size * 2
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': range_size,
|
||||
'Content-Range': 'bytes 0-%s/%s' % (range_size - 1,
|
||||
frag_archive_size),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Backend-Timestamp': Timestamp(self.ts()).internal
|
||||
}
|
||||
responses = [
|
||||
StubResponse(
|
||||
206,
|
||||
# the first body comes up short
|
||||
body[:frag_size] if i == 0 else body[:range_size],
|
||||
headers, i)
|
||||
for i, body in enumerate(archives[:self.policy.ec_ndata])
|
||||
]
|
||||
responses.append(StubResponse(
|
||||
206, archives[self.policy.ec_ndata][frag_size:range_size], {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': frag_size,
|
||||
'Content-Range': 'bytes %s-%s/%s' % (
|
||||
frag_size, range_size - 1, frag_archive_size),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Backend-Timestamp': Timestamp(self.ts()).internal,
|
||||
}, self.policy.ec_ndata))
|
||||
|
||||
obj_req_ranges = []
|
||||
|
||||
def get_response(req):
|
||||
obj_req_ranges.append(req['headers']['Range'])
|
||||
return responses.pop(0) if responses else StubResponse(404)
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||
'Range': 'bytes=3000-5000'})
|
||||
with capture_http_requests(get_response) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.body, data[3000:5001])
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual(obj_req_ranges,
|
||||
['bytes=0-%s' % (range_size - 1)] *
|
||||
self.policy.ec_ndata +
|
||||
['bytes=%s-%s' % (frag_size, range_size - 1)])
|
||||
self.assertEqual(resp.headers['Content-Range'],
|
||||
'bytes 3000-5000/%s' % len(data))
|
||||
self.assertEqual(len(log), self.policy.ec_ndata + 1)
|
||||
|
||||
def test_ranged_get_with_short_resp_timeout(self):
|
||||
self.app.object_chunk_size = 256
|
||||
self.app.recoverable_node_timeout = 0.01
|
||||
segment_size = self.policy.ec_segment_size
|
||||
frag_size = self.policy.fragment_size
|
||||
data = (b'test' * segment_size)[:-492]
|
||||
etag = md5(data).hexdigest()
|
||||
archives = self._make_ec_archive_bodies(data)
|
||||
frag_archive_size = len(archives[0])
|
||||
range_size = frag_size * 2
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': range_size,
|
||||
'Content-Range': 'bytes 0-%s/%s' % (range_size - 1,
|
||||
frag_archive_size),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Backend-Timestamp': Timestamp(self.ts()).internal
|
||||
}
|
||||
responses = [
|
||||
StubResponse(
|
||||
206, body[:range_size], headers, i,
|
||||
# the first body slows down after awhile
|
||||
slowdown=[None] * 3 + [0.1] if i == 0 else None)
|
||||
for i, body in enumerate(archives[:self.policy.ec_ndata])
|
||||
]
|
||||
responses.append(StubResponse(
|
||||
206, archives[self.policy.ec_ndata][frag_size:range_size], {
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': frag_size,
|
||||
'Content-Range': 'bytes %s-%s/%s' % (
|
||||
frag_size, range_size - 1, frag_archive_size),
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Backend-Timestamp': Timestamp(self.ts()).internal,
|
||||
}, self.policy.ec_ndata))
|
||||
|
||||
obj_req_ranges = []
|
||||
|
||||
def get_response(req):
|
||||
obj_req_ranges.append(req['headers']['Range'])
|
||||
return responses.pop(0) if responses else StubResponse(404)
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||
'Range': 'bytes=3000-5000'})
|
||||
with capture_http_requests(get_response) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.body, data[3000:5001])
|
||||
self.assertEqual(resp.status_int, 206)
|
||||
self.assertEqual(['bytes=0-%s' % (range_size - 1)] *
|
||||
self.policy.ec_ndata +
|
||||
['bytes=%s-%s' % (frag_size, range_size - 1)],
|
||||
obj_req_ranges)
|
||||
self.assertEqual(resp.headers['Content-Range'],
|
||||
'bytes 3000-5000/%s' % len(data))
|
||||
self.assertEqual(len(log), self.policy.ec_ndata + 1)
|
||||
|
||||
def test_GET_mixed_ranged_responses_success(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
frag_size = self.policy.fragment_size
|
||||
@ -4241,6 +4718,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
self.assertEqual(log_msg_kwargs['exc_info'][0], ECDriverError)
|
||||
|
||||
def test_GET_read_timeout(self):
|
||||
# verify EC GET behavior when initial batch of nodes time out then
|
||||
# remaining primary nodes also time out and handoffs return 404
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-333]
|
||||
etag = md5(test_data, usedforsecurity=False).hexdigest()
|
||||
@ -4276,6 +4755,52 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
for line in self.logger.logger.records['ERROR']:
|
||||
self.assertIn(req.headers['x-trans-id'], line)
|
||||
|
||||
def test_GET_read_timeout_retrying_but_no_more_useful_nodes(self):
|
||||
# verify EC GET behavior when initial batch of nodes time out then
|
||||
# remaining nodes either return 404 or return data for different etag
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-333]
|
||||
etag = md5(test_data).hexdigest()
|
||||
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
|
||||
headers = {'X-Object-Sysmeta-Ec-Etag': etag}
|
||||
self.app.recoverable_node_timeout = 0.01
|
||||
ndata = self.policy.ec_ndata
|
||||
# only ndata responses, all of which have SlowBody
|
||||
responses = [
|
||||
(200, SlowBody(body, 0.1), self._add_frag_index(i, headers))
|
||||
for i, body in enumerate(ec_archive_bodies[:ndata])
|
||||
] * self.policy.ec_duplication_factor
|
||||
# 2 primaries return 404
|
||||
responses += [
|
||||
(404, '', {}), (404, '', {})
|
||||
] * self.policy.ec_duplication_factor
|
||||
# 2 primaries return different etag
|
||||
headers2 = {'X-Object-Sysmeta-Ec-Etag': 'other_etag'}
|
||||
responses += [
|
||||
(200, body, self._add_frag_index(i, headers2))
|
||||
for i, body in enumerate(ec_archive_bodies[ndata + 2:])
|
||||
] * self.policy.ec_duplication_factor
|
||||
|
||||
req = swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
# all other (handoff) responses are 404
|
||||
status_codes, body_iter, headers = zip(*responses + [
|
||||
(404, [b''], {}) for i in range(
|
||||
self.policy.object_ring.max_more_nodes)])
|
||||
with mocked_http_conn(*status_codes, body_iter=body_iter,
|
||||
headers=headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
# do this inside the fake http context manager, it'll try to
|
||||
# resume but won't be able to give us all the right bytes
|
||||
self.assertNotEqual(md5(resp.body).hexdigest(), etag)
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(ndata, len(error_lines))
|
||||
for line in error_lines:
|
||||
self.assertIn('ChunkReadTimeout (0.01s)', line)
|
||||
for line in self.logger.logger.records['ERROR']:
|
||||
self.assertIn(req.headers['x-trans-id'], line)
|
||||
|
||||
def test_GET_read_timeout_resume(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-333]
|
||||
|
Loading…
x
Reference in New Issue
Block a user