From 4a4d899680736c85d2982e23e1f6becedab6ea7f Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Wed, 4 Nov 2020 18:01:42 -0600 Subject: [PATCH] Refactor EC multipart/byteranges control flow The multipart document handling in the proxy is consumed via iteration, but the error handling code is not consistent with how it applies conversions of IO errors/timeouts and retry failures to StopIteration. In an effort to make the code more obvious and easier to debug and maintain I've added comments and additional tests as well as tightening up StopIteration exception handling. Co-Authored-By: Alistair Coles Change-Id: I0654815543be3df059eb2875d9b3669dbd97f5b4 --- swift/proxy/controllers/obj.py | 41 +- test/unit/__init__.py | 20 +- test/unit/proxy/controllers/test_obj.py | 525 ++++++++++++++++++++++++ 3 files changed, 566 insertions(+), 20 deletions(-) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index a28e60eaa3..9265e683d7 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -2504,19 +2504,19 @@ class ECFragGetter(object): def get_next_doc_part(): while True: + # the loop here is to resume if trying to parse + # multipart/byteranges response raises a ChunkReadTimeout + # and resets the parts_iter try: - # This call to next() performs IO when we have a - # multipart/byteranges response; it reads the MIME - # boundary and part headers. - # - # If we don't have a multipart/byteranges response, - # but just a 200 or a single-range 206, then this - # performs no IO, and either just returns source or - # raises StopIteration. with WatchdogTimeout(self.app.watchdog, node_timeout, ChunkReadTimeout): - # if StopIteration is raised, it escapes and is - # handled elsewhere + # If we don't have a multipart/byteranges response, + # but just a 200 or a single-range 206, then this + # performs no IO, and just returns source (or + # raises StopIteration). + # Otherwise, this call to next() performs IO when + # we have a multipart/byteranges response; as it + # will read the MIME boundary and part headers. start_byte, end_byte, length, headers, part = next( parts_iter[0]) return (start_byte, end_byte, length, headers, part) @@ -2537,7 +2537,7 @@ class ECFragGetter(object): new_source, read_chunk_size=self.app.object_chunk_size) else: - raise StopIteration() + raise def iter_bytes_from_response_part(part_file, nbytes): nchunks = 0 @@ -2579,14 +2579,13 @@ class ECFragGetter(object): parts_iter[0] = http_response_to_document_iters( new_source, read_chunk_size=self.app.object_chunk_size) - try: _junk, _junk, _junk, _junk, part_file = \ get_next_doc_part() except StopIteration: - # Tried to find a new node from which to - # finish the GET, but failed. There's - # nothing more we can do here. + # it's not clear to me how to make + # get_next_doc_part raise StopIteration for the + # first doc part of a new request six.reraise(exc_type, exc_value, exc_traceback) part_file = ByteCountEnforcer(part_file, nbytes) else: @@ -2652,8 +2651,14 @@ class ECFragGetter(object): part_iter = None try: while True: - start_byte, end_byte, length, headers, part = \ - get_next_doc_part() + try: + start_byte, end_byte, length, headers, part = \ + get_next_doc_part() + except StopIteration: + # it seems this is the only way out of the loop; not + # sure why the req.environ update is always needed + req.environ['swift.non_client_disconnect'] = True + break # note: learn_size_from_content_range() sets # self.skip_bytes self.learn_size_from_content_range( @@ -2670,8 +2675,6 @@ class ECFragGetter(object): 'entity_length': length, 'headers': headers, 'part_iter': part_iter} self.pop_range() - except StopIteration: - req.environ['swift.non_client_disconnect'] = True finally: if part_iter: part_iter.close() diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 5b768c4a4e..a091bc97e4 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -1038,16 +1038,29 @@ def requires_o_tmpfile_support_in_tmp(func): class StubResponse(object): - def __init__(self, status, body=b'', headers=None, frag_index=None): + def __init__(self, status, body=b'', headers=None, frag_index=None, + slowdown=None): self.status = status self.body = body self.readable = BytesIO(body) + try: + self._slowdown = iter(slowdown) + except TypeError: + self._slowdown = iter([slowdown]) self.headers = HeaderKeyDict(headers) if frag_index is not None: self.headers['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index fake_reason = ('Fake', 'This response is a lie.') self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0] + def slowdown(self): + try: + wait = next(self._slowdown) + except StopIteration: + wait = None + if wait is not None: + eventlet.sleep(wait) + def nuke_from_orbit(self): if hasattr(self, 'swift_conn'): self.swift_conn.close() @@ -1061,8 +1074,13 @@ class StubResponse(object): return self.headers.items() def read(self, amt=0): + self.slowdown() return self.readable.read(amt) + def readline(self, size=-1): + self.slowdown() + return self.readable.readline(size) + def __repr__(self): info = ['Status: %s' % self.status] if self.headers: diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 083d3784ec..c137de9f0d 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -182,6 +182,7 @@ class BaseObjectControllerMixin(object): self.app = PatchedObjControllerApp( conf, account_ring=FakeRing(), container_ring=FakeRing(), logger=self.logger) + self.logger.clear() # startup/loading debug msgs not helpful # you can over-ride the container_info just by setting it on the app # (see PatchedObjControllerApp for details) @@ -4043,6 +4044,282 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 412) self.assertEqual(len(log), 2 * 8) + def test_GET_with_multirange(self): + self.app.object_chunk_size = 256 + test_body = b'test' * self.policy.ec_segment_size + ec_stub = make_ec_object_stub(test_body, self.policy, None) + frag_archives = ec_stub['frags'] + self.assertEqual(len(frag_archives[0]), 1960) + boundary = b'81eb9c110b32ced5fe' + + def make_mime_body(frag_archive): + return b'\r\n'.join([ + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 0-489/1960', + b'', + frag_archive[0:490], + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1470-1959/1960', + b'', + frag_archive[1470:], + b'--' + boundary + b'--', + ]) + + obj_resp_bodies = [make_mime_body(fa) for fa + in ec_stub['frags'][:self.policy.ec_ndata]] + + headers = { + 'Content-Type': b'multipart/byteranges;boundary=' + boundary, + 'Content-Length': len(obj_resp_bodies[0]), + 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + 'X-Timestamp': Timestamp(self.ts()).normal, + } + + responses = [ + StubResponse(206, body, headers, i) + for i, body in enumerate(obj_resp_bodies) + ] + + def get_response(req): + # there's some math going on here I don't quite understand, the + # fragment_size is 490 and there's like 4 of them because ec_body + # is 'test' * segment_size + self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959') + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=1000-2000,14000-15000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 206) + self.assertEqual(len(log), self.policy.ec_ndata) + resp_boundary = resp.headers['content-type'].rsplit('=', 1)[1].encode() + expected = b'\r\n'.join([ + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1000-2000/16384', + b'', + ec_stub['body'][1000:2001], + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 14000-15000/16384', + b'', + ec_stub['body'][14000:15001], + b'--' + resp_boundary + b'--', + ]) + self.assertEqual(resp.body, expected) + + def test_GET_with_multirange_slow_body(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + test_body = b'test' * self.policy.ec_segment_size + ec_stub = make_ec_object_stub(test_body, self.policy, None) + frag_archives = ec_stub['frags'] + self.assertEqual(len(frag_archives[0]), 1960) + boundary = b'81eb9c110b32ced5fe' + + def make_mime_body(frag_archive): + return b'\r\n'.join([ + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 0-489/1960', + b'', + frag_archive[0:490], + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1470-1959/1960', + b'', + frag_archive[1470:], + b'--' + boundary + b'--', + ]) + + obj_resp_bodies = [make_mime_body(fa) for fa + in ec_stub['frags'][:self.policy.ec_ndata + 1]] + + headers = { + 'Content-Type': b'multipart/byteranges;boundary=' + boundary, + 'Content-Length': len(obj_resp_bodies[0]), + 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + 'X-Timestamp': Timestamp(self.ts()).normal, + } + + responses = [ + StubResponse(206, body, headers, i, + # make the first one slow + slowdown=0.1 if i == 0 else None) + for i, body in enumerate(obj_resp_bodies) + ] + + def get_response(req): + # there's some math going on here I don't quite understand, the + # fragment_size is 490 and there's like 4 of them because ec_body + # is 'test' * segment_size + self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959') + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=1000-2000,14000-15000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 206) + self.assertEqual(len(log), self.policy.ec_ndata + 1) + resp_boundary = resp.headers['content-type'].rsplit('=', 1)[1].encode() + expected = b'\r\n'.join([ + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1000-2000/16384', + b'', + ec_stub['body'][1000:2001], + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 14000-15000/16384', + b'', + ec_stub['body'][14000:15001], + b'--' + resp_boundary + b'--', + ]) + self.assertEqual(resp.body, expected) + + def test_GET_with_multirange_unable_to_resume(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + test_body = b'test' * self.policy.ec_segment_size + ec_stub = make_ec_object_stub(test_body, self.policy, None) + frag_archives = ec_stub['frags'] + self.assertEqual(len(frag_archives[0]), 1960) + boundary = b'81eb9c110b32ced5fe' + + def make_mime_body(frag_archive): + return b'\r\n'.join([ + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 0-489/1960', + b'', + frag_archive[0:490], + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1470-1959/1960', + b'', + frag_archive[1470:], + b'--' + boundary + b'--', + ]) + + obj_resp_bodies = [make_mime_body(fa) for fa + # no extra good responses + in ec_stub['frags'][:self.policy.ec_ndata]] + + headers = { + 'Content-Type': b'multipart/byteranges;boundary=' + boundary, + 'Content-Length': len(obj_resp_bodies[0]), + 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + 'X-Timestamp': Timestamp(self.ts()).normal, + } + + responses = [ + StubResponse(206, body, headers, i, + # make the first one slow + slowdown=0.1 if i == 0 else None) + for i, body in enumerate(obj_resp_bodies) + ] + + def get_response(req): + # there's some math going on here I don't quite understand, the + # fragment_size is 490 and there's like 4 of them because ec_body + # is 'test' * segment_size + self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959') + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=1000-2000,14000-15000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 500) + self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2) + log_lines = self.app.logger.get_lines_for_level('error') + # not the most graceful ending + self.assertIn('Unhandled exception', log_lines[-1]) + + def test_GET_with_multirange_short_resume_body(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + test_body = b'test' * self.policy.ec_segment_size + ec_stub = make_ec_object_stub(test_body, self.policy, None) + frag_archives = ec_stub['frags'] + self.assertEqual(len(frag_archives[0]), 1960) + boundary = b'81eb9c110b32ced5fe' + + def make_mime_body(frag_archive): + return b'\r\n'.join([ + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 0-489/1960', + b'', + frag_archive[0:490], + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1470-1959/1960', + b'', + frag_archive[1470:], + b'--' + boundary + b'--', + ]) + + obj_resp_bodies = [make_mime_body(fa) for fa + # no extra good responses + in ec_stub['frags'][:self.policy.ec_ndata]] + + headers = { + 'Content-Type': b'multipart/byteranges;boundary=' + boundary, + 'Content-Length': len(obj_resp_bodies[0]), + 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + 'X-Timestamp': Timestamp(self.ts()).normal, + } + + responses = [ + StubResponse(206, body, headers, i, + # make the first one slow + slowdown=0.1 if i == 0 else None) + for i, body in enumerate(obj_resp_bodies) + ] + # add a short read response for the resume + short_body = obj_resp_bodies[0][:512] + responses.append(StubResponse(206, short_body, headers, 0)) + + def get_response(req): + # there's some math going on here I don't quite understand, the + # fragment_size is 490 and there's like 4 of them because ec_body + # is 'test' * segment_size + self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959') + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=1000-2000,14000-15000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + resp_boundary = resp.headers['content-type'].rsplit( + '=', 1)[1].encode() + expected = b'\r\n'.join([ + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1000-2000/16384', + b'', + b'', + b'--' + resp_boundary + b'--', + ]) + self.assertEqual(expected, resp.body) + self.assertEqual(resp.status_int, 206) + self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2) + log_lines = self.app.logger.get_lines_for_level('error') + self.assertIn("Trying to read object during GET (retrying)", + log_lines[0]) + # not the most graceful ending + self.assertIn("Exception fetching fragments for '/a/c/o'", + log_lines[-1]) + def test_GET_with_success_and_507_will_503(self): responses = [ # only 9 good nodes StubResponse(200), @@ -4091,6 +4368,206 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 404) self.assertEqual(len(log), 2 * self.replicas()) + def test_ranged_get(self): + self.app.object_chunk_size = 256 + segment_size = self.policy.ec_segment_size + frag_size = self.policy.fragment_size + data = (b'test' * segment_size)[:-492] + etag = md5(data).hexdigest() + archives = self._make_ec_archive_bodies(data) + frag_archive_size = len(archives[0]) + range_size = frag_size * 2 + headers = { + 'Content-Type': 'text/plain', + 'Content-Length': range_size, + 'Content-Range': 'bytes 0-%s/%s' % (range_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal + } + responses = [ + StubResponse(206, body[:range_size], headers, i) + for i, body in enumerate(archives[:self.policy.ec_ndata]) + ] + + obj_req_ranges = set() + + def get_response(req): + obj_req_ranges.add(req['headers']['Range']) + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=3000-5000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(obj_req_ranges, {'bytes=0-%s' % (range_size - 1)}) + self.assertEqual(resp.status_int, 206) + self.assertEqual(resp.headers['Content-Range'], + 'bytes 3000-5000/%s' % len(data)) + self.assertEqual(resp.body, data[3000:5001]) + self.assertEqual(len(log), self.policy.ec_ndata) + + def test_ranged_get_with_slow_resp(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + segment_size = self.policy.ec_segment_size + frag_size = self.policy.fragment_size + data = (b'test' * segment_size)[:-492] + etag = md5(data).hexdigest() + archives = self._make_ec_archive_bodies(data) + frag_archive_size = len(archives[0]) + range_size = frag_size * 2 + headers = { + 'Content-Type': 'text/plain', + 'Content-Length': range_size, + 'Content-Range': 'bytes 0-%s/%s' % (range_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal + } + responses = [ + StubResponse(206, body[:range_size], headers, i, + # the first body comes up slow + slowdown=0.1 if i == 0 else None) + for i, body in enumerate(archives[:self.policy.ec_ndata]) + ] + responses.append(StubResponse( + 206, archives[self.policy.ec_ndata][:range_size], + headers, self.policy.ec_ndata)) + + obj_req_ranges = set() + + def get_response(req): + obj_req_ranges.add(req['headers']['Range']) + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=3000-5000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.body, data[3000:5001]) + self.assertEqual(resp.status_int, 206) + self.assertEqual(obj_req_ranges, {'bytes=0-%s' % (range_size - 1)}) + self.assertEqual(resp.headers['Content-Range'], + 'bytes 3000-5000/%s' % len(data)) + self.assertEqual(len(log), self.policy.ec_ndata + 1) + + def test_ranged_get_with_short_resp(self): + self.app.object_chunk_size = 256 + segment_size = self.policy.ec_segment_size + frag_size = self.policy.fragment_size + data = (b'test' * segment_size)[:-492] + etag = md5(data).hexdigest() + archives = self._make_ec_archive_bodies(data) + frag_archive_size = len(archives[0]) + range_size = frag_size * 2 + headers = { + 'Content-Type': 'text/plain', + 'Content-Length': range_size, + 'Content-Range': 'bytes 0-%s/%s' % (range_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal + } + responses = [ + StubResponse( + 206, + # the first body comes up short + body[:frag_size] if i == 0 else body[:range_size], + headers, i) + for i, body in enumerate(archives[:self.policy.ec_ndata]) + ] + responses.append(StubResponse( + 206, archives[self.policy.ec_ndata][frag_size:range_size], { + 'Content-Type': 'text/plain', + 'Content-Length': frag_size, + 'Content-Range': 'bytes %s-%s/%s' % ( + frag_size, range_size - 1, frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal, + }, self.policy.ec_ndata)) + + obj_req_ranges = [] + + def get_response(req): + obj_req_ranges.append(req['headers']['Range']) + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=3000-5000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.body, data[3000:5001]) + self.assertEqual(resp.status_int, 206) + self.assertEqual(obj_req_ranges, + ['bytes=0-%s' % (range_size - 1)] * + self.policy.ec_ndata + + ['bytes=%s-%s' % (frag_size, range_size - 1)]) + self.assertEqual(resp.headers['Content-Range'], + 'bytes 3000-5000/%s' % len(data)) + self.assertEqual(len(log), self.policy.ec_ndata + 1) + + def test_ranged_get_with_short_resp_timeout(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + segment_size = self.policy.ec_segment_size + frag_size = self.policy.fragment_size + data = (b'test' * segment_size)[:-492] + etag = md5(data).hexdigest() + archives = self._make_ec_archive_bodies(data) + frag_archive_size = len(archives[0]) + range_size = frag_size * 2 + headers = { + 'Content-Type': 'text/plain', + 'Content-Length': range_size, + 'Content-Range': 'bytes 0-%s/%s' % (range_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal + } + responses = [ + StubResponse( + 206, body[:range_size], headers, i, + # the first body slows down after awhile + slowdown=[None] * 3 + [0.1] if i == 0 else None) + for i, body in enumerate(archives[:self.policy.ec_ndata]) + ] + responses.append(StubResponse( + 206, archives[self.policy.ec_ndata][frag_size:range_size], { + 'Content-Type': 'text/plain', + 'Content-Length': frag_size, + 'Content-Range': 'bytes %s-%s/%s' % ( + frag_size, range_size - 1, frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal, + }, self.policy.ec_ndata)) + + obj_req_ranges = [] + + def get_response(req): + obj_req_ranges.append(req['headers']['Range']) + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=3000-5000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.body, data[3000:5001]) + self.assertEqual(resp.status_int, 206) + self.assertEqual(['bytes=0-%s' % (range_size - 1)] * + self.policy.ec_ndata + + ['bytes=%s-%s' % (frag_size, range_size - 1)], + obj_req_ranges) + self.assertEqual(resp.headers['Content-Range'], + 'bytes 3000-5000/%s' % len(data)) + self.assertEqual(len(log), self.policy.ec_ndata + 1) + def test_GET_mixed_ranged_responses_success(self): segment_size = self.policy.ec_segment_size frag_size = self.policy.fragment_size @@ -4241,6 +4718,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(log_msg_kwargs['exc_info'][0], ECDriverError) def test_GET_read_timeout(self): + # verify EC GET behavior when initial batch of nodes time out then + # remaining primary nodes also time out and handoffs return 404 segment_size = self.policy.ec_segment_size test_data = (b'test' * segment_size)[:-333] etag = md5(test_data, usedforsecurity=False).hexdigest() @@ -4276,6 +4755,52 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): for line in self.logger.logger.records['ERROR']: self.assertIn(req.headers['x-trans-id'], line) + def test_GET_read_timeout_retrying_but_no_more_useful_nodes(self): + # verify EC GET behavior when initial batch of nodes time out then + # remaining nodes either return 404 or return data for different etag + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-333] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + headers = {'X-Object-Sysmeta-Ec-Etag': etag} + self.app.recoverable_node_timeout = 0.01 + ndata = self.policy.ec_ndata + # only ndata responses, all of which have SlowBody + responses = [ + (200, SlowBody(body, 0.1), self._add_frag_index(i, headers)) + for i, body in enumerate(ec_archive_bodies[:ndata]) + ] * self.policy.ec_duplication_factor + # 2 primaries return 404 + responses += [ + (404, '', {}), (404, '', {}) + ] * self.policy.ec_duplication_factor + # 2 primaries return different etag + headers2 = {'X-Object-Sysmeta-Ec-Etag': 'other_etag'} + responses += [ + (200, body, self._add_frag_index(i, headers2)) + for i, body in enumerate(ec_archive_bodies[ndata + 2:]) + ] * self.policy.ec_duplication_factor + + req = swob.Request.blank('/v1/a/c/o') + + # all other (handoff) responses are 404 + status_codes, body_iter, headers = zip(*responses + [ + (404, [b''], {}) for i in range( + self.policy.object_ring.max_more_nodes)]) + with mocked_http_conn(*status_codes, body_iter=body_iter, + headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + # do this inside the fake http context manager, it'll try to + # resume but won't be able to give us all the right bytes + self.assertNotEqual(md5(resp.body).hexdigest(), etag) + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(ndata, len(error_lines)) + for line in error_lines: + self.assertIn('ChunkReadTimeout (0.01s)', line) + for line in self.logger.logger.records['ERROR']: + self.assertIn(req.headers['x-trans-id'], line) + def test_GET_read_timeout_resume(self): segment_size = self.policy.ec_segment_size test_data = (b'test' * segment_size)[:-333]