diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index a28e60eaa3..9265e683d7 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -2504,19 +2504,19 @@ class ECFragGetter(object): def get_next_doc_part(): while True: + # the loop here is to resume if trying to parse + # multipart/byteranges response raises a ChunkReadTimeout + # and resets the parts_iter try: - # This call to next() performs IO when we have a - # multipart/byteranges response; it reads the MIME - # boundary and part headers. - # - # If we don't have a multipart/byteranges response, - # but just a 200 or a single-range 206, then this - # performs no IO, and either just returns source or - # raises StopIteration. with WatchdogTimeout(self.app.watchdog, node_timeout, ChunkReadTimeout): - # if StopIteration is raised, it escapes and is - # handled elsewhere + # If we don't have a multipart/byteranges response, + # but just a 200 or a single-range 206, then this + # performs no IO, and just returns source (or + # raises StopIteration). + # Otherwise, this call to next() performs IO when + # we have a multipart/byteranges response; as it + # will read the MIME boundary and part headers. start_byte, end_byte, length, headers, part = next( parts_iter[0]) return (start_byte, end_byte, length, headers, part) @@ -2537,7 +2537,7 @@ class ECFragGetter(object): new_source, read_chunk_size=self.app.object_chunk_size) else: - raise StopIteration() + raise def iter_bytes_from_response_part(part_file, nbytes): nchunks = 0 @@ -2579,14 +2579,13 @@ class ECFragGetter(object): parts_iter[0] = http_response_to_document_iters( new_source, read_chunk_size=self.app.object_chunk_size) - try: _junk, _junk, _junk, _junk, part_file = \ get_next_doc_part() except StopIteration: - # Tried to find a new node from which to - # finish the GET, but failed. There's - # nothing more we can do here. + # it's not clear to me how to make + # get_next_doc_part raise StopIteration for the + # first doc part of a new request six.reraise(exc_type, exc_value, exc_traceback) part_file = ByteCountEnforcer(part_file, nbytes) else: @@ -2652,8 +2651,14 @@ class ECFragGetter(object): part_iter = None try: while True: - start_byte, end_byte, length, headers, part = \ - get_next_doc_part() + try: + start_byte, end_byte, length, headers, part = \ + get_next_doc_part() + except StopIteration: + # it seems this is the only way out of the loop; not + # sure why the req.environ update is always needed + req.environ['swift.non_client_disconnect'] = True + break # note: learn_size_from_content_range() sets # self.skip_bytes self.learn_size_from_content_range( @@ -2670,8 +2675,6 @@ class ECFragGetter(object): 'entity_length': length, 'headers': headers, 'part_iter': part_iter} self.pop_range() - except StopIteration: - req.environ['swift.non_client_disconnect'] = True finally: if part_iter: part_iter.close() diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 5b768c4a4e..a091bc97e4 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -1038,16 +1038,29 @@ def requires_o_tmpfile_support_in_tmp(func): class StubResponse(object): - def __init__(self, status, body=b'', headers=None, frag_index=None): + def __init__(self, status, body=b'', headers=None, frag_index=None, + slowdown=None): self.status = status self.body = body self.readable = BytesIO(body) + try: + self._slowdown = iter(slowdown) + except TypeError: + self._slowdown = iter([slowdown]) self.headers = HeaderKeyDict(headers) if frag_index is not None: self.headers['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index fake_reason = ('Fake', 'This response is a lie.') self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0] + def slowdown(self): + try: + wait = next(self._slowdown) + except StopIteration: + wait = None + if wait is not None: + eventlet.sleep(wait) + def nuke_from_orbit(self): if hasattr(self, 'swift_conn'): self.swift_conn.close() @@ -1061,8 +1074,13 @@ class StubResponse(object): return self.headers.items() def read(self, amt=0): + self.slowdown() return self.readable.read(amt) + def readline(self, size=-1): + self.slowdown() + return self.readable.readline(size) + def __repr__(self): info = ['Status: %s' % self.status] if self.headers: diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 083d3784ec..c137de9f0d 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -182,6 +182,7 @@ class BaseObjectControllerMixin(object): self.app = PatchedObjControllerApp( conf, account_ring=FakeRing(), container_ring=FakeRing(), logger=self.logger) + self.logger.clear() # startup/loading debug msgs not helpful # you can over-ride the container_info just by setting it on the app # (see PatchedObjControllerApp for details) @@ -4043,6 +4044,282 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 412) self.assertEqual(len(log), 2 * 8) + def test_GET_with_multirange(self): + self.app.object_chunk_size = 256 + test_body = b'test' * self.policy.ec_segment_size + ec_stub = make_ec_object_stub(test_body, self.policy, None) + frag_archives = ec_stub['frags'] + self.assertEqual(len(frag_archives[0]), 1960) + boundary = b'81eb9c110b32ced5fe' + + def make_mime_body(frag_archive): + return b'\r\n'.join([ + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 0-489/1960', + b'', + frag_archive[0:490], + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1470-1959/1960', + b'', + frag_archive[1470:], + b'--' + boundary + b'--', + ]) + + obj_resp_bodies = [make_mime_body(fa) for fa + in ec_stub['frags'][:self.policy.ec_ndata]] + + headers = { + 'Content-Type': b'multipart/byteranges;boundary=' + boundary, + 'Content-Length': len(obj_resp_bodies[0]), + 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + 'X-Timestamp': Timestamp(self.ts()).normal, + } + + responses = [ + StubResponse(206, body, headers, i) + for i, body in enumerate(obj_resp_bodies) + ] + + def get_response(req): + # there's some math going on here I don't quite understand, the + # fragment_size is 490 and there's like 4 of them because ec_body + # is 'test' * segment_size + self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959') + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=1000-2000,14000-15000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 206) + self.assertEqual(len(log), self.policy.ec_ndata) + resp_boundary = resp.headers['content-type'].rsplit('=', 1)[1].encode() + expected = b'\r\n'.join([ + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1000-2000/16384', + b'', + ec_stub['body'][1000:2001], + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 14000-15000/16384', + b'', + ec_stub['body'][14000:15001], + b'--' + resp_boundary + b'--', + ]) + self.assertEqual(resp.body, expected) + + def test_GET_with_multirange_slow_body(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + test_body = b'test' * self.policy.ec_segment_size + ec_stub = make_ec_object_stub(test_body, self.policy, None) + frag_archives = ec_stub['frags'] + self.assertEqual(len(frag_archives[0]), 1960) + boundary = b'81eb9c110b32ced5fe' + + def make_mime_body(frag_archive): + return b'\r\n'.join([ + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 0-489/1960', + b'', + frag_archive[0:490], + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1470-1959/1960', + b'', + frag_archive[1470:], + b'--' + boundary + b'--', + ]) + + obj_resp_bodies = [make_mime_body(fa) for fa + in ec_stub['frags'][:self.policy.ec_ndata + 1]] + + headers = { + 'Content-Type': b'multipart/byteranges;boundary=' + boundary, + 'Content-Length': len(obj_resp_bodies[0]), + 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + 'X-Timestamp': Timestamp(self.ts()).normal, + } + + responses = [ + StubResponse(206, body, headers, i, + # make the first one slow + slowdown=0.1 if i == 0 else None) + for i, body in enumerate(obj_resp_bodies) + ] + + def get_response(req): + # there's some math going on here I don't quite understand, the + # fragment_size is 490 and there's like 4 of them because ec_body + # is 'test' * segment_size + self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959') + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=1000-2000,14000-15000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 206) + self.assertEqual(len(log), self.policy.ec_ndata + 1) + resp_boundary = resp.headers['content-type'].rsplit('=', 1)[1].encode() + expected = b'\r\n'.join([ + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1000-2000/16384', + b'', + ec_stub['body'][1000:2001], + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 14000-15000/16384', + b'', + ec_stub['body'][14000:15001], + b'--' + resp_boundary + b'--', + ]) + self.assertEqual(resp.body, expected) + + def test_GET_with_multirange_unable_to_resume(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + test_body = b'test' * self.policy.ec_segment_size + ec_stub = make_ec_object_stub(test_body, self.policy, None) + frag_archives = ec_stub['frags'] + self.assertEqual(len(frag_archives[0]), 1960) + boundary = b'81eb9c110b32ced5fe' + + def make_mime_body(frag_archive): + return b'\r\n'.join([ + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 0-489/1960', + b'', + frag_archive[0:490], + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1470-1959/1960', + b'', + frag_archive[1470:], + b'--' + boundary + b'--', + ]) + + obj_resp_bodies = [make_mime_body(fa) for fa + # no extra good responses + in ec_stub['frags'][:self.policy.ec_ndata]] + + headers = { + 'Content-Type': b'multipart/byteranges;boundary=' + boundary, + 'Content-Length': len(obj_resp_bodies[0]), + 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + 'X-Timestamp': Timestamp(self.ts()).normal, + } + + responses = [ + StubResponse(206, body, headers, i, + # make the first one slow + slowdown=0.1 if i == 0 else None) + for i, body in enumerate(obj_resp_bodies) + ] + + def get_response(req): + # there's some math going on here I don't quite understand, the + # fragment_size is 490 and there's like 4 of them because ec_body + # is 'test' * segment_size + self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959') + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=1000-2000,14000-15000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 500) + self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2) + log_lines = self.app.logger.get_lines_for_level('error') + # not the most graceful ending + self.assertIn('Unhandled exception', log_lines[-1]) + + def test_GET_with_multirange_short_resume_body(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + test_body = b'test' * self.policy.ec_segment_size + ec_stub = make_ec_object_stub(test_body, self.policy, None) + frag_archives = ec_stub['frags'] + self.assertEqual(len(frag_archives[0]), 1960) + boundary = b'81eb9c110b32ced5fe' + + def make_mime_body(frag_archive): + return b'\r\n'.join([ + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 0-489/1960', + b'', + frag_archive[0:490], + b'--' + boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1470-1959/1960', + b'', + frag_archive[1470:], + b'--' + boundary + b'--', + ]) + + obj_resp_bodies = [make_mime_body(fa) for fa + # no extra good responses + in ec_stub['frags'][:self.policy.ec_ndata]] + + headers = { + 'Content-Type': b'multipart/byteranges;boundary=' + boundary, + 'Content-Length': len(obj_resp_bodies[0]), + 'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']), + 'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'], + 'X-Timestamp': Timestamp(self.ts()).normal, + } + + responses = [ + StubResponse(206, body, headers, i, + # make the first one slow + slowdown=0.1 if i == 0 else None) + for i, body in enumerate(obj_resp_bodies) + ] + # add a short read response for the resume + short_body = obj_resp_bodies[0][:512] + responses.append(StubResponse(206, short_body, headers, 0)) + + def get_response(req): + # there's some math going on here I don't quite understand, the + # fragment_size is 490 and there's like 4 of them because ec_body + # is 'test' * segment_size + self.assertEqual(req['headers']['Range'], 'bytes=0-489,1470-1959') + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=1000-2000,14000-15000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + resp_boundary = resp.headers['content-type'].rsplit( + '=', 1)[1].encode() + expected = b'\r\n'.join([ + b'--' + resp_boundary, + b'Content-Type: application/octet-stream', + b'Content-Range: bytes 1000-2000/16384', + b'', + b'', + b'--' + resp_boundary + b'--', + ]) + self.assertEqual(expected, resp.body) + self.assertEqual(resp.status_int, 206) + self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2) + log_lines = self.app.logger.get_lines_for_level('error') + self.assertIn("Trying to read object during GET (retrying)", + log_lines[0]) + # not the most graceful ending + self.assertIn("Exception fetching fragments for '/a/c/o'", + log_lines[-1]) + def test_GET_with_success_and_507_will_503(self): responses = [ # only 9 good nodes StubResponse(200), @@ -4091,6 +4368,206 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 404) self.assertEqual(len(log), 2 * self.replicas()) + def test_ranged_get(self): + self.app.object_chunk_size = 256 + segment_size = self.policy.ec_segment_size + frag_size = self.policy.fragment_size + data = (b'test' * segment_size)[:-492] + etag = md5(data).hexdigest() + archives = self._make_ec_archive_bodies(data) + frag_archive_size = len(archives[0]) + range_size = frag_size * 2 + headers = { + 'Content-Type': 'text/plain', + 'Content-Length': range_size, + 'Content-Range': 'bytes 0-%s/%s' % (range_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal + } + responses = [ + StubResponse(206, body[:range_size], headers, i) + for i, body in enumerate(archives[:self.policy.ec_ndata]) + ] + + obj_req_ranges = set() + + def get_response(req): + obj_req_ranges.add(req['headers']['Range']) + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=3000-5000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(obj_req_ranges, {'bytes=0-%s' % (range_size - 1)}) + self.assertEqual(resp.status_int, 206) + self.assertEqual(resp.headers['Content-Range'], + 'bytes 3000-5000/%s' % len(data)) + self.assertEqual(resp.body, data[3000:5001]) + self.assertEqual(len(log), self.policy.ec_ndata) + + def test_ranged_get_with_slow_resp(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + segment_size = self.policy.ec_segment_size + frag_size = self.policy.fragment_size + data = (b'test' * segment_size)[:-492] + etag = md5(data).hexdigest() + archives = self._make_ec_archive_bodies(data) + frag_archive_size = len(archives[0]) + range_size = frag_size * 2 + headers = { + 'Content-Type': 'text/plain', + 'Content-Length': range_size, + 'Content-Range': 'bytes 0-%s/%s' % (range_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal + } + responses = [ + StubResponse(206, body[:range_size], headers, i, + # the first body comes up slow + slowdown=0.1 if i == 0 else None) + for i, body in enumerate(archives[:self.policy.ec_ndata]) + ] + responses.append(StubResponse( + 206, archives[self.policy.ec_ndata][:range_size], + headers, self.policy.ec_ndata)) + + obj_req_ranges = set() + + def get_response(req): + obj_req_ranges.add(req['headers']['Range']) + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=3000-5000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.body, data[3000:5001]) + self.assertEqual(resp.status_int, 206) + self.assertEqual(obj_req_ranges, {'bytes=0-%s' % (range_size - 1)}) + self.assertEqual(resp.headers['Content-Range'], + 'bytes 3000-5000/%s' % len(data)) + self.assertEqual(len(log), self.policy.ec_ndata + 1) + + def test_ranged_get_with_short_resp(self): + self.app.object_chunk_size = 256 + segment_size = self.policy.ec_segment_size + frag_size = self.policy.fragment_size + data = (b'test' * segment_size)[:-492] + etag = md5(data).hexdigest() + archives = self._make_ec_archive_bodies(data) + frag_archive_size = len(archives[0]) + range_size = frag_size * 2 + headers = { + 'Content-Type': 'text/plain', + 'Content-Length': range_size, + 'Content-Range': 'bytes 0-%s/%s' % (range_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal + } + responses = [ + StubResponse( + 206, + # the first body comes up short + body[:frag_size] if i == 0 else body[:range_size], + headers, i) + for i, body in enumerate(archives[:self.policy.ec_ndata]) + ] + responses.append(StubResponse( + 206, archives[self.policy.ec_ndata][frag_size:range_size], { + 'Content-Type': 'text/plain', + 'Content-Length': frag_size, + 'Content-Range': 'bytes %s-%s/%s' % ( + frag_size, range_size - 1, frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal, + }, self.policy.ec_ndata)) + + obj_req_ranges = [] + + def get_response(req): + obj_req_ranges.append(req['headers']['Range']) + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=3000-5000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.body, data[3000:5001]) + self.assertEqual(resp.status_int, 206) + self.assertEqual(obj_req_ranges, + ['bytes=0-%s' % (range_size - 1)] * + self.policy.ec_ndata + + ['bytes=%s-%s' % (frag_size, range_size - 1)]) + self.assertEqual(resp.headers['Content-Range'], + 'bytes 3000-5000/%s' % len(data)) + self.assertEqual(len(log), self.policy.ec_ndata + 1) + + def test_ranged_get_with_short_resp_timeout(self): + self.app.object_chunk_size = 256 + self.app.recoverable_node_timeout = 0.01 + segment_size = self.policy.ec_segment_size + frag_size = self.policy.fragment_size + data = (b'test' * segment_size)[:-492] + etag = md5(data).hexdigest() + archives = self._make_ec_archive_bodies(data) + frag_archive_size = len(archives[0]) + range_size = frag_size * 2 + headers = { + 'Content-Type': 'text/plain', + 'Content-Length': range_size, + 'Content-Range': 'bytes 0-%s/%s' % (range_size - 1, + frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal + } + responses = [ + StubResponse( + 206, body[:range_size], headers, i, + # the first body slows down after awhile + slowdown=[None] * 3 + [0.1] if i == 0 else None) + for i, body in enumerate(archives[:self.policy.ec_ndata]) + ] + responses.append(StubResponse( + 206, archives[self.policy.ec_ndata][frag_size:range_size], { + 'Content-Type': 'text/plain', + 'Content-Length': frag_size, + 'Content-Range': 'bytes %s-%s/%s' % ( + frag_size, range_size - 1, frag_archive_size), + 'X-Object-Sysmeta-Ec-Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': Timestamp(self.ts()).internal, + }, self.policy.ec_ndata)) + + obj_req_ranges = [] + + def get_response(req): + obj_req_ranges.append(req['headers']['Range']) + return responses.pop(0) if responses else StubResponse(404) + + req = swob.Request.blank('/v1/a/c/o', headers={ + 'Range': 'bytes=3000-5000'}) + with capture_http_requests(get_response) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.body, data[3000:5001]) + self.assertEqual(resp.status_int, 206) + self.assertEqual(['bytes=0-%s' % (range_size - 1)] * + self.policy.ec_ndata + + ['bytes=%s-%s' % (frag_size, range_size - 1)], + obj_req_ranges) + self.assertEqual(resp.headers['Content-Range'], + 'bytes 3000-5000/%s' % len(data)) + self.assertEqual(len(log), self.policy.ec_ndata + 1) + def test_GET_mixed_ranged_responses_success(self): segment_size = self.policy.ec_segment_size frag_size = self.policy.fragment_size @@ -4241,6 +4718,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(log_msg_kwargs['exc_info'][0], ECDriverError) def test_GET_read_timeout(self): + # verify EC GET behavior when initial batch of nodes time out then + # remaining primary nodes also time out and handoffs return 404 segment_size = self.policy.ec_segment_size test_data = (b'test' * segment_size)[:-333] etag = md5(test_data, usedforsecurity=False).hexdigest() @@ -4276,6 +4755,52 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): for line in self.logger.logger.records['ERROR']: self.assertIn(req.headers['x-trans-id'], line) + def test_GET_read_timeout_retrying_but_no_more_useful_nodes(self): + # verify EC GET behavior when initial batch of nodes time out then + # remaining nodes either return 404 or return data for different etag + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-333] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + headers = {'X-Object-Sysmeta-Ec-Etag': etag} + self.app.recoverable_node_timeout = 0.01 + ndata = self.policy.ec_ndata + # only ndata responses, all of which have SlowBody + responses = [ + (200, SlowBody(body, 0.1), self._add_frag_index(i, headers)) + for i, body in enumerate(ec_archive_bodies[:ndata]) + ] * self.policy.ec_duplication_factor + # 2 primaries return 404 + responses += [ + (404, '', {}), (404, '', {}) + ] * self.policy.ec_duplication_factor + # 2 primaries return different etag + headers2 = {'X-Object-Sysmeta-Ec-Etag': 'other_etag'} + responses += [ + (200, body, self._add_frag_index(i, headers2)) + for i, body in enumerate(ec_archive_bodies[ndata + 2:]) + ] * self.policy.ec_duplication_factor + + req = swob.Request.blank('/v1/a/c/o') + + # all other (handoff) responses are 404 + status_codes, body_iter, headers = zip(*responses + [ + (404, [b''], {}) for i in range( + self.policy.object_ring.max_more_nodes)]) + with mocked_http_conn(*status_codes, body_iter=body_iter, + headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + # do this inside the fake http context manager, it'll try to + # resume but won't be able to give us all the right bytes + self.assertNotEqual(md5(resp.body).hexdigest(), etag) + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(ndata, len(error_lines)) + for line in error_lines: + self.assertIn('ChunkReadTimeout (0.01s)', line) + for line in self.logger.logger.records['ERROR']: + self.assertIn(req.headers['x-trans-id'], line) + def test_GET_read_timeout_resume(self): segment_size = self.policy.ec_segment_size test_data = (b'test' * segment_size)[:-333]