diff --git a/swift/common/bufferedhttp.py b/swift/common/bufferedhttp.py index 2b3ec1609d..c7acccc27c 100644 --- a/swift/common/bufferedhttp.py +++ b/swift/common/bufferedhttp.py @@ -62,6 +62,7 @@ class BufferedHTTPResponse(HTTPResponse): self.chunk_left = _UNKNOWN # bytes left to read in current chunk self.length = _UNKNOWN # number of bytes left in response self.will_close = _UNKNOWN # conn will close at end of response + self._readline_buffer = '' def expect_response(self): if self.fp: @@ -79,6 +80,48 @@ class BufferedHTTPResponse(HTTPResponse): self.msg = HTTPMessage(self.fp, 0) self.msg.fp = None + def read(self, amt=None): + if not self._readline_buffer: + return HTTPResponse.read(self, amt) + + if amt is None: + # Unbounded read: send anything we have buffered plus whatever + # is left. + buffered = self._readline_buffer + self._readline_buffer = '' + return buffered + HTTPResponse.read(self, amt) + elif amt <= len(self._readline_buffer): + # Bounded read that we can satisfy entirely from our buffer + res = self._readline_buffer[:amt] + self._readline_buffer = self._readline_buffer[amt:] + return res + else: + # Bounded read that wants more bytes than we have + smaller_amt = amt - len(self._readline_buffer) + buf = self._readline_buffer + self._readline_buffer = '' + return buf + HTTPResponse.read(self, smaller_amt) + + def readline(self, size=1024): + # You'd think Python's httplib would provide this, but it doesn't. + # It does, however, provide a comment in the HTTPResponse class: + # + # # XXX It would be nice to have readline and __iter__ for this, + # # too. + # + # Yes, it certainly would. + while ('\n' not in self._readline_buffer + and len(self._readline_buffer) < size): + read_size = size - len(self._readline_buffer) + chunk = HTTPResponse.read(self, read_size) + if not chunk: + break + self._readline_buffer += chunk + + line, newline, rest = self._readline_buffer.partition('\n') + self._readline_buffer = rest + return line + newline + def nuke_from_orbit(self): """ Terminate the socket with extreme prejudice. diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index b1edadee39..abc09bf36b 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -57,6 +57,10 @@ class SuffixSyncError(SwiftException): pass +class RangeAlreadyComplete(SwiftException): + pass + + class DiskFileError(SwiftException): pass diff --git a/swift/common/swob.py b/swift/common/swob.py index c2e3afb4e8..39f0c0e3cb 100644 --- a/swift/common/swob.py +++ b/swift/common/swob.py @@ -1089,13 +1089,14 @@ def content_range_header(start, stop, size): def multi_range_iterator(ranges, content_type, boundary, size, sub_iter_gen): for start, stop in ranges: - yield ''.join(['\r\n--', boundary, '\r\n', + yield ''.join(['--', boundary, '\r\n', 'Content-Type: ', content_type, '\r\n']) yield content_range_header(start, stop, size) + '\r\n\r\n' sub_iter = sub_iter_gen(start, stop) for chunk in sub_iter: yield chunk - yield '\r\n--' + boundary + '--\r\n' + yield '\r\n' + yield '--' + boundary + '--' class Response(object): @@ -1177,21 +1178,37 @@ class Response(object): self.content_type = ''.join(['multipart/byteranges;', 'boundary=', self.boundary]) - # This section calculate the total size of the targeted response - # The value 12 is the length of total bytes of hyphen, new line - # form feed for each section header. 
The value 8 is the length of
-                # total bytes of hyphen, new line, form feed characters for the
-                # closing boundary which appears only once
-                section_header_fixed_len = 12 + (len(self.boundary) +
-                                                 len('Content-Type: ') +
-                                                 len(content_type) +
-                                                 len('Content-Range: bytes '))
+                # This section calculates the total size of the response.
+                section_header_fixed_len = (
+                    # --boundary\r\n
+                    len(self.boundary) + 4
+                    # Content-Type: <type>\r\n
+                    + len('Content-Type: ') + len(content_type) + 2
+                    # Content-Range: <value>\r\n; <value> accounted for later
+                    + len('Content-Range: ') + 2
+                    # \r\n at end of headers
+                    + 2)
+
                 body_size = 0
                 for start, end in ranges:
                     body_size += section_header_fixed_len
-                    body_size += len(str(start) + '-' + str(end - 1) + '/' +
-                                     str(content_size)) + (end - start)
-                body_size += 8 + len(self.boundary)
+
+                    # length of the value of Content-Range, not including the \r\n
+                    # since that's already accounted for
+                    cr = content_range_header_value(start, end, content_size)
+                    body_size += len(cr)
+
+                    # the actual bytes (note: this range is half-open, i.e. begins
+                    # with byte <start> and ends with byte <end>, so there's no
+                    # fencepost error here)
+                    body_size += (end - start)
+
+                    # \r\n prior to --boundary
+                    body_size += 2
+
+                # --boundary-- terminates the message
+                body_size += len(self.boundary) + 4
+
                 self.content_length = body_size
                 self.content_range = None
             return content_size, content_type
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 11a97d126b..856065a680 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -25,6 +25,7 @@ import operator
 import os
 import pwd
 import re
+import rfc822
 import sys
 import threading as stdlib_threading
 import time
@@ -3181,7 +3182,7 @@ def parse_content_type(content_type):
         ('text/plain', [('charset, 'UTF-8'), ('level', '1')])
 
     :param content_type: content_type to parse
-    :returns: a typle containing (content type, list of k, v parameter tuples)
+    :returns: a tuple containing (content type, list of k, v parameter tuples)
     """
     parm_list = []
     if ';' in content_type:
@@ -3313,7 +3314,9 @@ class _MultipartMimeFileLikeObject(object):
 
 def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
     """
     Given a multi-part-mime-encoded input file object and boundary,
-    yield file-like objects for each part.
+    yield file-like objects for each part. Note that this does not
+    split each part into headers and body; the caller is responsible
+    for doing that if necessary.
 
     :param wsgi_input: The file-like object to read from.
     :param boundary: The mime boundary to separate new file-like
     :param read_chunk_size: The number of bytes of the file read
@@ -3324,6 +3327,9 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
     boundary = '--' + boundary
     blen = len(boundary) + 2  # \r\n
     got = wsgi_input.readline(blen)
+    while got == '\r\n':
+        got = wsgi_input.readline(blen)
+
     if got.strip() != boundary:
         raise swift.common.exceptions.MimeInvalid(
             'invalid starting boundary: wanted %r, got %r', (boundary, got))
@@ -3338,6 +3344,174 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
             input_buffer = it.input_buffer
 
 
+def mime_to_document_iters(input_file, boundary, read_chunk_size=4096):
+    """
+    Takes a file-like object containing a multipart MIME document and
+    returns an iterator of (headers, body-file) tuples.
+
+    :param input_file: file-like object with the MIME doc in it
+    :param boundary: MIME boundary, sans dashes
+                     (e.g. 
"divider", not "--divider") + :param read_chunk_size: size of strings read via input_file.read() + """ + doc_files = iter_multipart_mime_documents(input_file, boundary, + read_chunk_size) + for i, doc_file in enumerate(doc_files): + # this consumes the headers and leaves just the body in doc_file + headers = rfc822.Message(doc_file, 0) + yield (headers, doc_file) + + +def document_iters_to_multipart_byteranges(ranges_iter, boundary): + """ + Takes an iterator of range iters and yields a multipart/byteranges MIME + document suitable for sending as the body of a multi-range 206 response. + + See document_iters_to_http_response_body for parameter descriptions. + """ + + divider = "--" + boundary + "\r\n" + terminator = "--" + boundary + "--" + + for range_spec in ranges_iter: + start_byte = range_spec["start_byte"] + end_byte = range_spec["end_byte"] + entity_length = range_spec.get("entity_length", "*") + content_type = range_spec["content_type"] + part_iter = range_spec["part_iter"] + + part_header = ''.join(( + divider, + "Content-Type: ", str(content_type), "\r\n", + "Content-Range: ", "bytes %d-%d/%s\r\n" % ( + start_byte, end_byte, entity_length), + "\r\n" + )) + yield part_header + + for chunk in part_iter: + yield chunk + yield "\r\n" + yield terminator + + +def document_iters_to_http_response_body(ranges_iter, boundary, multipart, + logger): + """ + Takes an iterator of range iters and turns it into an appropriate + HTTP response body, whether that's multipart/byteranges or not. + + This is almost, but not quite, the inverse of + http_response_to_document_iters(). This function only yields chunks of + the body, not any headers. + + :param ranges_iter: an iterator of dictionaries, one per range. + Each dictionary must contain at least the following key: + "part_iter": iterator yielding the bytes in the range + + Additionally, if multipart is True, then the following other keys + are required: + + "start_byte": index of the first byte in the range + "end_byte": index of the last byte in the range + "content_type": value for the range's Content-Type header + + Finally, there is one optional key that is used in the + multipart/byteranges case: + + "entity_length": length of the requested entity (not necessarily + equal to the response length). If omitted, "*" will be used. + + Each part_iter will be exhausted prior to calling next(ranges_iter). + + :param boundary: MIME boundary to use, sans dashes (e.g. "boundary", not + "--boundary"). + :param multipart: True if the response should be multipart/byteranges, + False otherwise. This should be True if and only if you have 2 or + more ranges. + :param logger: a logger + """ + if multipart: + return document_iters_to_multipart_byteranges(ranges_iter, boundary) + else: + try: + response_body_iter = next(ranges_iter)['part_iter'] + except StopIteration: + return '' + + # We need to make sure ranges_iter does not get garbage-collected + # before response_body_iter is exhausted. The reason is that + # ranges_iter has a finally block that calls close_swift_conn, and + # so if that finally block fires before we read response_body_iter, + # there's nothing there. 
+ def string_along(useful_iter, useless_iter_iter, logger): + for x in useful_iter: + yield x + + try: + next(useless_iter_iter) + except StopIteration: + pass + else: + logger.warn("More than one part in a single-part response?") + + return string_along(response_body_iter, ranges_iter, logger) + + +def multipart_byteranges_to_document_iters(input_file, boundary, + read_chunk_size=4096): + """ + Takes a file-like object containing a multipart/byteranges MIME document + (see RFC 7233, Appendix A) and returns an iterator of (first-byte, + last-byte, length, document-headers, body-file) 5-tuples. + + :param input_file: file-like object with the MIME doc in it + :param boundary: MIME boundary, sans dashes + (e.g. "divider", not "--divider") + :param read_chunk_size: size of strings read via input_file.read() + """ + for headers, body in mime_to_document_iters(input_file, boundary, + read_chunk_size): + first_byte, last_byte, length = parse_content_range( + headers.getheader('content-range')) + yield (first_byte, last_byte, length, headers.items(), body) + + +def http_response_to_document_iters(response, read_chunk_size=4096): + """ + Takes a successful object-GET HTTP response and turns it into an + iterator of (first-byte, last-byte, length, headers, body-file) + 5-tuples. + + The response must either be a 200 or a 206; if you feed in a 204 or + something similar, this probably won't work. + + :param response: HTTP response, like from bufferedhttp.http_connect(), + not a swob.Response. + """ + if response.status == 200: + # Single "range" that's the whole object + content_length = int(response.getheader('Content-Length')) + return iter([(0, content_length - 1, content_length, + response.getheaders(), response)]) + + content_type, params_list = parse_content_type( + response.getheader('Content-Type')) + if content_type != 'multipart/byteranges': + # Single range; no MIME framing, just the bytes. The start and end + # byte indices are in the Content-Range header. + start, end, length = parse_content_range( + response.getheader('Content-Range')) + return iter([(start, end, length, response.getheaders(), response)]) + else: + # Multiple ranges; the response body is a multipart/byteranges MIME + # document, and we have to parse it using the MIME boundary + # extracted from the Content-Type header. + params = dict(params_list) + return multipart_byteranges_to_document_iters( + response, params['boundary'], read_chunk_size) + + #: Regular expression to match form attributes. 
ATTRIBUTES_RE = re.compile(r'(\w+)=(".*?"|[^";]+)(; ?|$)') diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 2fb7cd945e..953a85af58 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -28,7 +28,6 @@ import os import time import functools import inspect -import logging import operator from sys import exc_info from swift import gettext_ as _ @@ -40,10 +39,11 @@ from eventlet.timeout import Timeout from swift.common.wsgi import make_pre_authed_env from swift.common.utils import Timestamp, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ - GreenAsyncPile, quorum_size, parse_content_range + GreenAsyncPile, quorum_size, parse_content_type, \ + http_response_to_document_iters, document_iters_to_http_response_body from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ - ConnectionTimeout + ConnectionTimeout, RangeAlreadyComplete from swift.common.http import is_informational, is_success, is_redirection, \ is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \ HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \ @@ -613,10 +613,9 @@ def bytes_to_skip(record_size, range_start): return (record_size - (range_start % record_size)) % record_size -class GetOrHeadHandler(object): - +class ResumingGetter(object): def __init__(self, app, req, server_type, node_iter, partition, path, - backend_headers, client_chunk_size=None): + backend_headers, client_chunk_size=None, newest=None): self.app = app self.node_iter = node_iter self.server_type = server_type @@ -632,7 +631,10 @@ class GetOrHeadHandler(object): self.req_method = req.method self.req_path = req.path self.req_query_string = req.query_string - self.newest = config_true_value(req.headers.get('x-newest', 'f')) + if newest is None: + self.newest = config_true_value(req.headers.get('x-newest', 'f')) + else: + self.newest = newest # populated when finding source self.statuses = [] @@ -640,6 +642,9 @@ class GetOrHeadHandler(object): self.bodies = [] self.source_headers = [] + # populated from response headers + self.start_byte = self.end_byte = self.length = None + def fast_forward(self, num_bytes): """ Will skip num_bytes into the current ranges. @@ -648,57 +653,89 @@ class GetOrHeadHandler(object): this request. This will change the Range header so that the next req will start where it left off. 
- :raises NotImplementedError: if this is a multirange request :raises ValueError: if invalid range header :raises HTTPRequestedRangeNotSatisfiable: if begin + num_bytes - > end of range + > end of range + 1 + :raises RangeAlreadyComplete: if begin + num_bytes == end of range + 1 """ if 'Range' in self.backend_headers: req_range = Range(self.backend_headers['Range']) - if len(req_range.ranges) > 1: - raise NotImplementedError() - begin, end = req_range.ranges.pop() + begin, end = req_range.ranges[0] if begin is None: # this is a -50 range req (last 50 bytes of file) end -= num_bytes else: begin += num_bytes - if end and begin > end: + if end and begin == end + 1: + # we sent out exactly the first range's worth of bytes, so + # we're done with it + raise RangeAlreadyComplete() + elif end and begin > end: raise HTTPRequestedRangeNotSatisfiable() - req_range.ranges = [(begin, end)] + elif end and begin: + req_range.ranges = [(begin, end)] + req_range.ranges[1:] + elif end: + req_range.ranges = [(None, end)] + req_range.ranges[1:] + else: + req_range.ranges = [(begin, None)] + req_range.ranges[1:] + self.backend_headers['Range'] = str(req_range) else: self.backend_headers['Range'] = 'bytes=%d-' % num_bytes - def learn_size_from_content_range(self, start, end): + def pop_range(self): + """ + Remove the first byterange from our Range header. + + This is used after a byterange has been completely sent to the + client; this way, should we need to resume the download from another + object server, we do not re-fetch byteranges that the client already + has. + + If we have no Range header, this is a no-op. + """ + if 'Range' in self.backend_headers: + req_range = Range(self.backend_headers['Range']) + begin, end = req_range.ranges.pop(0) + if len(req_range.ranges) > 0: + self.backend_headers['Range'] = str(req_range) + else: + self.backend_headers.pop('Range') + + def learn_size_from_content_range(self, start, end, length): """ If client_chunk_size is set, makes sure we yield things starting on chunk boundaries based on the Content-Range header in the response. - Sets our first Range header to the value learned from the - Content-Range header in the response; if we were given a + Sets our Range header's first byterange to the value learned from + the Content-Range header in the response; if we were given a fully-specified range (e.g. "bytes=123-456"), this is a no-op. If we were given a half-specified range (e.g. "bytes=123-" or "bytes=-456"), then this changes the Range header to a semantically-equivalent one *and* it lets us resume on a proper boundary instead of just in the middle of a piece somewhere. - - If the original request is for more than one range, this does not - affect our backend Range header, since we don't support resuming one - of those anyway. 
""" + if length == 0: + return + if self.client_chunk_size: self.skip_bytes = bytes_to_skip(self.client_chunk_size, start) if 'Range' in self.backend_headers: - req_range = Range(self.backend_headers['Range']) + try: + req_range = Range(self.backend_headers['Range']) + new_ranges = [(start, end)] + req_range.ranges[1:] + except ValueError: + new_ranges = [(start, end)] + else: + new_ranges = [(start, end)] - if len(req_range.ranges) > 1: - return - - self.backend_headers['Range'] = "bytes=%d-%d" % (start, end) + self.backend_headers['Range'] = ( + "bytes=" + (",".join("%s-%s" % (s if s is not None else '', + e if e is not None else '') + for s, e in new_ranges))) def is_good_source(self, src): """ @@ -712,106 +749,183 @@ class GetOrHeadHandler(object): return True return is_success(src.status) or is_redirection(src.status) - def _make_app_iter(self, req, node, source): - """ - Returns an iterator over the contents of the source (via its read - func). There is also quite a bit of cleanup to ensure garbage - collection works and the underlying socket of the source is closed. + def response_parts_iter(self, req): + source, node = self._get_source_and_node() + it = None + if source: + it = self._get_response_parts_iter(req, node, source) + return it + + def _get_response_parts_iter(self, req, node, source): + # Someday we can replace this [mess] with python 3's "nonlocal" + source = [source] + node = [node] - :param req: incoming request object - :param source: The httplib.Response object this iterator should read - from. - :param node: The node the source is reading from, for logging purposes. - """ try: - nchunks = 0 client_chunk_size = self.client_chunk_size - bytes_consumed_from_backend = 0 node_timeout = self.app.node_timeout if self.server_type == 'Object': node_timeout = self.app.recoverable_node_timeout - buf = '' - while True: - try: - with ChunkReadTimeout(node_timeout): - chunk = source.read(self.app.object_chunk_size) - nchunks += 1 - buf += chunk - except ChunkReadTimeout: - exc_type, exc_value, exc_traceback = exc_info() - if self.newest or self.server_type != 'Object': - raise exc_type, exc_value, exc_traceback + + # This is safe; it sets up a generator but does not call next() + # on it, so no IO is performed. + parts_iter = [ + http_response_to_document_iters( + source[0], read_chunk_size=self.app.object_chunk_size)] + + def get_next_doc_part(): + while True: try: - self.fast_forward(bytes_consumed_from_backend) - except (NotImplementedError, HTTPException, ValueError): - raise exc_type, exc_value, exc_traceback - buf = '' - new_source, new_node = self._get_source_and_node() - if new_source: - self.app.exception_occurred( - node, _('Object'), - _('Trying to read during GET (retrying)'), - level=logging.ERROR, exc_info=( - exc_type, exc_value, exc_traceback)) - # Close-out the connection as best as possible. - if getattr(source, 'swift_conn', None): - close_swift_conn(source) - source = new_source - node = new_node - continue - else: - raise exc_type, exc_value, exc_traceback + # This call to next() performs IO when we have a + # multipart/byteranges response; it reads the MIME + # boundary and part headers. + # + # If we don't have a multipart/byteranges response, + # but just a 200 or a single-range 206, then this + # performs no IO, and either just returns source or + # raises StopIteration. 
+ with ChunkReadTimeout(node_timeout): + # if StopIteration is raised, it escapes and is + # handled elsewhere + start_byte, end_byte, length, headers, part = next( + parts_iter[0]) + return (start_byte, end_byte, length, headers, part) + except ChunkReadTimeout: + new_source, new_node = self._get_source_and_node() + if new_source: + self.app.exception_occurred( + node[0], _('Object'), + _('Trying to read during GET (retrying)')) + # Close-out the connection as best as possible. + if getattr(source[0], 'swift_conn', None): + close_swift_conn(source[0]) + source[0] = new_source + node[0] = new_node + # This is safe; it sets up a generator but does + # not call next() on it, so no IO is performed. + parts_iter[0] = http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) + else: + raise StopIteration() - if buf and self.skip_bytes: - if self.skip_bytes < len(buf): - buf = buf[self.skip_bytes:] - bytes_consumed_from_backend += self.skip_bytes - self.skip_bytes = 0 - else: - self.skip_bytes -= len(buf) - bytes_consumed_from_backend += len(buf) + def iter_bytes_from_response_part(part_file): + nchunks = 0 + buf = '' + bytes_used_from_backend = 0 + while True: + try: + with ChunkReadTimeout(node_timeout): + chunk = part_file.read(self.app.object_chunk_size) + nchunks += 1 + buf += chunk + except ChunkReadTimeout: + exc_type, exc_value, exc_traceback = exc_info() + if self.newest or self.server_type != 'Object': + raise exc_type, exc_value, exc_traceback + try: + self.fast_forward(bytes_used_from_backend) + except (HTTPException, ValueError): + raise exc_type, exc_value, exc_traceback + except RangeAlreadyComplete: + break buf = '' + new_source, new_node = self._get_source_and_node() + if new_source: + self.app.exception_occurred( + node[0], _('Object'), + _('Trying to read during GET (retrying)')) + # Close-out the connection as best as possible. + if getattr(source[0], 'swift_conn', None): + close_swift_conn(source[0]) + source[0] = new_source + node[0] = new_node + # This is safe; it just sets up a generator but + # does not call next() on it, so no IO is + # performed. + parts_iter[0] = http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) - if not chunk: - if buf: - with ChunkWriteTimeout(self.app.client_timeout): - bytes_consumed_from_backend += len(buf) - yield buf - buf = '' - break + try: + _junk, _junk, _junk, _junk, part_file = \ + get_next_doc_part() + except StopIteration: + # Tried to find a new node from which to + # finish the GET, but failed. There's + # nothing more to do here. 
+ return + else: + raise exc_type, exc_value, exc_traceback + else: + if buf and self.skip_bytes: + if self.skip_bytes < len(buf): + buf = buf[self.skip_bytes:] + bytes_used_from_backend += self.skip_bytes + self.skip_bytes = 0 + else: + self.skip_bytes -= len(buf) + bytes_used_from_backend += len(buf) + buf = '' - if client_chunk_size is not None: - while len(buf) >= client_chunk_size: - client_chunk = buf[:client_chunk_size] - buf = buf[client_chunk_size:] - with ChunkWriteTimeout(self.app.client_timeout): - yield client_chunk - bytes_consumed_from_backend += len(client_chunk) - else: - with ChunkWriteTimeout(self.app.client_timeout): - yield buf - bytes_consumed_from_backend += len(buf) - buf = '' + if not chunk: + if buf: + with ChunkWriteTimeout( + self.app.client_timeout): + bytes_used_from_backend += len(buf) + yield buf + buf = '' + break - # This is for fairness; if the network is outpacing the CPU, - # we'll always be able to read and write data without - # encountering an EWOULDBLOCK, and so eventlet will not switch - # greenthreads on its own. We do it manually so that clients - # don't starve. - # - # The number 5 here was chosen by making stuff up. It's not - # every single chunk, but it's not too big either, so it seemed - # like it would probably be an okay choice. - # - # Note that we may trampoline to other greenthreads more often - # than once every 5 chunks, depending on how blocking our - # network IO is; the explicit sleep here simply provides a - # lower bound on the rate of trampolining. - if nchunks % 5 == 0: - sleep() + if client_chunk_size is not None: + while len(buf) >= client_chunk_size: + client_chunk = buf[:client_chunk_size] + buf = buf[client_chunk_size:] + with ChunkWriteTimeout( + self.app.client_timeout): + yield client_chunk + bytes_used_from_backend += len(client_chunk) + else: + with ChunkWriteTimeout(self.app.client_timeout): + yield buf + bytes_used_from_backend += len(buf) + buf = '' + + # This is for fairness; if the network is outpacing + # the CPU, we'll always be able to read and write + # data without encountering an EWOULDBLOCK, and so + # eventlet will not switch greenthreads on its own. + # We do it manually so that clients don't starve. + # + # The number 5 here was chosen by making stuff up. + # It's not every single chunk, but it's not too big + # either, so it seemed like it would probably be an + # okay choice. + # + # Note that we may trampoline to other greenthreads + # more often than once every 5 chunks, depending on + # how blocking our network IO is; the explicit sleep + # here simply provides a lower bound on the rate of + # trampolining. + if nchunks % 5 == 0: + sleep() + + try: + while True: + start_byte, end_byte, length, headers, part = \ + get_next_doc_part() + self.learn_size_from_content_range( + start_byte, end_byte, length) + part_iter = iter_bytes_from_response_part(part) + yield {'start_byte': start_byte, 'end_byte': end_byte, + 'entity_length': length, 'headers': headers, + 'part_iter': part_iter} + self.pop_range() + except StopIteration: + return except ChunkReadTimeout: - self.app.exception_occurred(node, _('Object'), + self.app.exception_occurred(node[0], _('Object'), _('Trying to read during GET')) raise except ChunkWriteTimeout: @@ -827,8 +941,22 @@ class GetOrHeadHandler(object): raise finally: # Close-out the connection as best as possible. 
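# --- Illustrative aside (editor's sketch; not part of this patch) ---
# The client_chunk_size buffering above, reduced to its essence: chunks
# of arbitrary size from the backend are re-emitted on fixed boundaries,
# so an EC client reading fragment_size pieces always gets whole
# fragments. Self-contained sketch:
def repack(chunks, client_chunk_size):
    buf = ''
    for chunk in chunks:
        buf += chunk
        while len(buf) >= client_chunk_size:
            yield buf[:client_chunk_size]
            buf = buf[client_chunk_size:]
    if buf:
        yield buf


print(list(repack(['abcd', '12', '34efgh', 'i'], 4)))
# ['abcd', '1234', 'efgh', 'i']
# --- end aside ---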
- if getattr(source, 'swift_conn', None): - close_swift_conn(source) + if getattr(source[0], 'swift_conn', None): + close_swift_conn(source[0]) + + @property + def last_status(self): + if self.statuses: + return self.statuses[-1] + else: + return None + + @property + def last_headers(self): + if self.source_headers: + return self.source_headers[-1] + else: + return None def _get_source_and_node(self): self.statuses = [] @@ -869,7 +997,7 @@ class GetOrHeadHandler(object): self.statuses.append(HTTP_NOT_FOUND) self.reasons.append('') self.bodies.append('') - self.source_headers.append('') + self.source_headers.append([]) close_swift_conn(possible_source) else: if self.used_source_etag: @@ -883,13 +1011,13 @@ class GetOrHeadHandler(object): self.statuses.append(HTTP_NOT_FOUND) self.reasons.append('') self.bodies.append('') - self.source_headers.append('') + self.source_headers.append([]) continue self.statuses.append(possible_source.status) self.reasons.append(possible_source.reason) self.bodies.append('') - self.source_headers.append('') + self.source_headers.append(possible_source.getheaders()) sources.append((possible_source, node)) if not self.newest: # one good source is enough break @@ -923,6 +1051,44 @@ class GetOrHeadHandler(object): return source, node return None, None + +class GetOrHeadHandler(ResumingGetter): + def _make_app_iter(self, req, node, source): + """ + Returns an iterator over the contents of the source (via its read + func). There is also quite a bit of cleanup to ensure garbage + collection works and the underlying socket of the source is closed. + + :param req: incoming request object + :param source: The httplib.Response object this iterator should read + from. + :param node: The node the source is reading from, for logging purposes. + """ + + ct = source.getheader('Content-Type') + if ct: + content_type, content_type_attrs = parse_content_type(ct) + is_multipart = content_type == 'multipart/byteranges' + else: + is_multipart = False + + boundary = "dontcare" + if is_multipart: + # we need some MIME boundary; fortunately, the object server has + # furnished one for us, so we'll just re-use it + boundary = dict(content_type_attrs)["boundary"] + + parts_iter = self._get_response_parts_iter(req, node, source) + + def add_content_type(response_part): + response_part["content_type"] = \ + HeaderKeyDict(response_part["headers"]).get("Content-Type") + return response_part + + return document_iters_to_http_response_body( + (add_content_type(pi) for pi in parts_iter), + boundary, is_multipart, self.app.logger) + def get_working_response(self, req): source, node = self._get_source_and_node() res = None @@ -932,10 +1098,6 @@ class GetOrHeadHandler(object): update_headers(res, source.getheaders()) if req.method == 'GET' and \ source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): - cr = res.headers.get('Content-Range') - if cr: - start, end, total = parse_content_range(cr) - self.learn_size_from_content_range(start, end) res.app_iter = self._make_app_iter(req, node, source) # See NOTE: swift_conn at top of file about this. 
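# --- Illustrative aside (editor's sketch; not part of this patch) ---
# What the boundary-reuse logic in _make_app_iter() above sees, using
# parse_content_type() from this patch's swift.common.utils changes
# (the boundary value here is made up):
from swift.common.utils import parse_content_type

ct = 'multipart/byteranges; boundary=f0f655e1a1c3e577'
content_type, attrs = parse_content_type(ct)
if content_type == 'multipart/byteranges':
    boundary = dict(attrs)['boundary']
    print(boundary)  # f0f655e1a1c3e577
# --- end aside ---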
res.swift_conn = source.swift_conn diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index e2ae7a325a..10e83bcad7 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -43,7 +43,8 @@ from swift.common.utils import ( clean_content_type, config_true_value, ContextPool, csv_append, GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp, normalize_delete_at_timestamp, public, get_expirer_container, - quorum_size) + document_iters_to_http_response_body, parse_content_range, + quorum_size, reiterate) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ check_copy_from_header, check_destination_header, \ @@ -62,11 +63,12 @@ from swift.common.http import ( from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY, ECDriverError, PolicyError) from swift.proxy.controllers.base import Controller, delay_denial, \ - cors_validation + cors_validation, ResumingGetter from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPServerError, HTTPServiceUnavailable, Request, HeaderKeyDict, \ - HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException + HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException, \ + HTTPRequestedRangeNotSatisfiable, Range from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \ remove_items, copy_header_subset, close_if_possible @@ -1137,119 +1139,350 @@ class ECAppIter(object): WSGI iterable that decodes EC fragment archives (or portions thereof) into the original object (or portions thereof). - :param path: path for the request + :param path: object's path, sans v1 (e.g. /a/c/o) :param policy: storage policy for this object - :param internal_app_iters: list of the WSGI iterables from object server - GET responses for fragment archives. For an M+K erasure code, the - caller must supply M such iterables. + :param internal_parts_iters: list of the response-document-parts + iterators for the backend GET responses. For an M+K erasure code, + the caller must supply M such iterables. :param range_specs: list of dictionaries describing the ranges requested by the client. Each dictionary contains the start and end of the client's requested byte range as well as the start and end of the EC segments containing that byte range. + :param fa_length: length of the fragment archive, in bytes, if the + response is a 200. If it's a 206, then this is ignored. + :param obj_length: length of the object, in bytes. Learned from the headers in the GET response from the object server. 
:param logger: a logger """ - def __init__(self, path, policy, internal_app_iters, range_specs, - obj_length, logger): + def __init__(self, path, policy, internal_parts_iters, range_specs, + fa_length, obj_length, logger): self.path = path self.policy = policy - self.internal_app_iters = internal_app_iters + self.internal_parts_iters = internal_parts_iters self.range_specs = range_specs - self.obj_length = obj_length + self.fa_length = fa_length + self.obj_length = obj_length if obj_length is not None else 0 self.boundary = '' self.logger = logger + self.mime_boundary = None + self.learned_content_type = None + self.stashed_iter = None + def close(self): - for it in self.internal_app_iters: + for it in self.internal_parts_iters: close_if_possible(it) - def __iter__(self): - segments_iter = self.decode_segments_from_fragments() + def kickoff(self, req, resp): + """ + Start pulling data from the backends so that we can learn things like + the real Content-Type that might only be in the multipart/byteranges + response body. Update our response accordingly. - if len(self.range_specs) == 0: - # plain GET; just yield up segments - for seg in segments_iter: - yield seg - return + Also, this is the first point at which we can learn the MIME + boundary that our response has in the headers. We grab that so we + can also use it in the body. - if len(self.range_specs) > 1: - raise NotImplementedError("multi-range GETs not done yet") + :returns: None + :raises: HTTPException on error + """ + self.mime_boundary = resp.boundary - for range_spec in self.range_specs: - client_start = range_spec['client_start'] - client_end = range_spec['client_end'] - segment_start = range_spec['segment_start'] - segment_end = range_spec['segment_end'] + self.stashed_iter = reiterate(self._real_iter(req, resp.headers)) + + if self.learned_content_type is not None: + resp.content_type = self.learned_content_type + resp.content_length = self.obj_length + + def _next_range(self): + # Each FA part should have approximately the same headers. We really + # only care about Content-Range and Content-Type, and that'll be the + # same for all the different FAs. + frag_iters = [] + headers = None + for parts_iter in self.internal_parts_iters: + part_info = next(parts_iter) + frag_iters.append(part_info['part_iter']) + headers = part_info['headers'] + headers = HeaderKeyDict(headers) + return headers, frag_iters + + def _actual_range(self, req_start, req_end, entity_length): + try: + rng = Range("bytes=%s-%s" % ( + req_start if req_start is not None else '', + req_end if req_end is not None else '')) + except ValueError: + return (None, None) + + rfl = rng.ranges_for_length(entity_length) + if not rfl: + return (None, None) + else: + # ranges_for_length() adds 1 to the last byte's position + # because webob once made a mistake + return (rfl[0][0], rfl[0][1] - 1) + + def _fill_out_range_specs_from_obj_length(self, range_specs): + # Add a few fields to each range spec: + # + # * resp_client_start, resp_client_end: the actual bytes that will + # be delivered to the client for the requested range. This may + # differ from the requested bytes if, say, the requested range + # overlaps the end of the object. + # + # * resp_segment_start, resp_segment_end: the actual offsets of the + # segments that will be decoded for the requested range. These + # differ from resp_client_start/end in that these are aligned + # to segment boundaries, while resp_client_start/end are not + # necessarily so. 
+ # + # * satisfiable: a boolean indicating whether the range is + # satisfiable or not (i.e. the requested range overlaps the + # object in at least one byte). + # + # This is kept separate from _fill_out_range_specs_from_fa_length() + # because this computation can be done with just the response + # headers from the object servers (in particular + # X-Object-Sysmeta-Ec-Content-Length), while the computation in + # _fill_out_range_specs_from_fa_length() requires the beginnings of + # the response bodies. + for spec in range_specs: + cstart, cend = self._actual_range( + spec['req_client_start'], + spec['req_client_end'], + self.obj_length) + spec['resp_client_start'] = cstart + spec['resp_client_end'] = cend + spec['satisfiable'] = (cstart is not None and cend is not None) + + sstart, send = self._actual_range( + spec['req_segment_start'], + spec['req_segment_end'], + self.obj_length) seg_size = self.policy.ec_segment_size - is_suffix = client_start is None + if spec['req_segment_start'] is None and sstart % seg_size != 0: + # Segment start may, in the case of a suffix request, need + # to be rounded up (not down!) to the nearest segment boundary. + # This reflects the trimming of leading garbage (partial + # fragments) from the retrieved fragments. + sstart += seg_size - (sstart % seg_size) - if is_suffix: - # Suffix byte ranges (i.e. requests for the last N bytes of - # an object) are likely to end up not on a segment boundary. - client_range_len = client_end - client_start = max(self.obj_length - client_range_len, 0) - client_end = self.obj_length - 1 + spec['resp_segment_start'] = sstart + spec['resp_segment_end'] = send - # may be mid-segment; if it is, then everything up to the - # first segment boundary is garbage, and is discarded before - # ever getting into this function. - unaligned_segment_start = max(self.obj_length - segment_end, 0) - alignment_offset = ( - (seg_size - (unaligned_segment_start % seg_size)) - % seg_size) - segment_start = unaligned_segment_start + alignment_offset - segment_end = self.obj_length - 1 - else: - # It's entirely possible that the client asked for a range that - # includes some bytes we have and some we don't; for example, a - # range of bytes 1000-20000000 on a 1500-byte object. - segment_end = (min(segment_end, self.obj_length - 1) - if segment_end is not None - else self.obj_length - 1) - client_end = (min(client_end, self.obj_length - 1) - if client_end is not None - else self.obj_length - 1) + def _fill_out_range_specs_from_fa_length(self, fa_length, range_specs): + # Add two fields to each range spec: + # + # * resp_fragment_start, resp_fragment_end: the start and end of + # the fragments that compose this byterange. These values are + # aligned to fragment boundaries. + # + # This way, ECAppIter has the knowledge it needs to correlate + # response byteranges with requested ones for when some byteranges + # are omitted from the response entirely and also to put the right + # Content-Range headers in a multipart/byteranges response. + for spec in range_specs: + fstart, fend = self._actual_range( + spec['req_fragment_start'], + spec['req_fragment_end'], + fa_length) + spec['resp_fragment_start'] = fstart + spec['resp_fragment_end'] = fend - num_segments = int( - math.ceil(float(segment_end + 1 - segment_start) - / self.policy.ec_segment_size)) - # We get full segments here, but the client may have requested a - # byte range that begins or ends in the middle of a segment. 
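# --- Illustrative aside (editor's sketch; not part of this patch) ---
# The suffix-request rounding above, with made-up numbers: the retrieved
# fragments decode to data that may begin mid-segment, and everything
# before the next segment boundary is partial-fragment garbage that gets
# trimmed, so the effective segment start is rounded *up*:
seg_size = 1024
obj_length = 2500                  # last segment is short (452 bytes)
retrieved = 2 * seg_size           # suffix GET came back as whole segments
sstart = obj_length - retrieved    # 452: mid-segment, leading garbage
if sstart % seg_size != 0:
    sstart += seg_size - (sstart % seg_size)
print(sstart)  # 1024: decoding really starts at a segment boundary
# --- end aside ---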
- # Thus, we have some amount of overrun (extra decoded bytes) - # that we trim off so the client gets exactly what they - # requested. - start_overrun = client_start - segment_start - end_overrun = segment_end - client_end + def __iter__(self): + if self.stashed_iter is not None: + return iter(self.stashed_iter) + else: + raise ValueError("Failed to call kickoff() before __iter__()") - for i, next_seg in enumerate(segments_iter): - # We may have a start_overrun of more than one segment in - # the case of suffix-byte-range requests. However, we never - # have an end_overrun of more than one segment. - if start_overrun > 0: - seglen = len(next_seg) - if seglen <= start_overrun: - start_overrun -= seglen - continue - else: - next_seg = next_seg[start_overrun:] - start_overrun = 0 + def _real_iter(self, req, resp_headers): + if not self.range_specs: + client_asked_for_range = False + range_specs = [{ + 'req_client_start': 0, + 'req_client_end': (None if self.obj_length is None + else self.obj_length - 1), + 'resp_client_start': 0, + 'resp_client_end': (None if self.obj_length is None + else self.obj_length - 1), + 'req_segment_start': 0, + 'req_segment_end': (None if self.obj_length is None + else self.obj_length - 1), + 'resp_segment_start': 0, + 'resp_segment_end': (None if self.obj_length is None + else self.obj_length - 1), + 'req_fragment_start': 0, + 'req_fragment_end': self.fa_length - 1, + 'resp_fragment_start': 0, + 'resp_fragment_end': self.fa_length - 1, + 'satisfiable': self.obj_length > 0, + }] + else: + client_asked_for_range = True + range_specs = self.range_specs - if i == (num_segments - 1) and end_overrun: - next_seg = next_seg[:-end_overrun] + self._fill_out_range_specs_from_obj_length(range_specs) - yield next_seg + multipart = (len([rs for rs in range_specs if rs['satisfiable']]) > 1) + # Multipart responses are not required to be in the same order as + # the Range header; the parts may be in any order the server wants. + # Further, if multiple ranges are requested and only some are + # satisfiable, then only the satisfiable ones appear in the response + # at all. Thus, we cannot simply iterate over range_specs in order; + # we must use the Content-Range header from each part to figure out + # what we've been given. + # + # We do, however, make the assumption that all the object-server + # responses have their ranges in the same order. Otherwise, a + # streaming decode would be impossible. - def decode_segments_from_fragments(self): + def convert_ranges_iter(): + seen_first_headers = False + ranges_for_resp = {} + + while True: + # this'll raise StopIteration and exit the loop + next_range = self._next_range() + + headers, frag_iters = next_range + content_type = headers['Content-Type'] + + content_range = headers.get('Content-Range') + if content_range is not None: + fa_start, fa_end, fa_length = parse_content_range( + content_range) + elif self.fa_length <= 0: + fa_start = None + fa_end = None + fa_length = 0 + else: + fa_start = 0 + fa_end = self.fa_length - 1 + fa_length = self.fa_length + + if not seen_first_headers: + # This is the earliest we can possibly do this. On a + # 200 or 206-single-byterange response, we can learn + # the FA's length from the HTTP response headers. + # However, on a 206-multiple-byteranges response, we + # don't learn it until the first part of the + # response body, in the headers of the first MIME + # part. 
+ # + # Similarly, the content type of a + # 206-multiple-byteranges response is + # "multipart/byteranges", not the object's actual + # content type. + self._fill_out_range_specs_from_fa_length( + fa_length, range_specs) + + satisfiable = False + for range_spec in range_specs: + satisfiable |= range_spec['satisfiable'] + key = (range_spec['resp_fragment_start'], + range_spec['resp_fragment_end']) + ranges_for_resp.setdefault(key, []).append(range_spec) + + # The client may have asked for an unsatisfiable set of + # ranges, but when converted to fragments, the object + # servers see it as satisfiable. For example, imagine a + # request for bytes 800-900 of a 750-byte object with a + # 1024-byte segment size. The object servers will see a + # request for bytes 0-${fragsize-1}, and that's + # satisfiable, so they return 206. It's not until we + # learn the object size that we can check for this + # condition. + # + # Note that some unsatisfiable ranges *will* be caught + # by the object servers, like bytes 1800-1900 of a + # 100-byte object with 1024-byte segments. That's not + # what we're dealing with here, though. + if client_asked_for_range and not satisfiable: + raise HTTPRequestedRangeNotSatisfiable( + request=req, headers=resp_headers) + self.learned_content_type = content_type + seen_first_headers = True + + range_spec = ranges_for_resp[(fa_start, fa_end)].pop(0) + seg_iter = self._decode_segments_from_fragments(frag_iters) + if not range_spec['satisfiable']: + # This'll be small; just a single small segment. Discard + # it. + for x in seg_iter: + pass + continue + + byterange_iter = self._iter_one_range(range_spec, seg_iter) + + converted = { + "start_byte": range_spec["resp_client_start"], + "end_byte": range_spec["resp_client_end"], + "content_type": content_type, + "part_iter": byterange_iter} + + if self.obj_length is not None: + converted["entity_length"] = self.obj_length + yield converted + + return document_iters_to_http_response_body( + convert_ranges_iter(), self.mime_boundary, multipart, self.logger) + + def _iter_one_range(self, range_spec, segment_iter): + client_start = range_spec['resp_client_start'] + client_end = range_spec['resp_client_end'] + segment_start = range_spec['resp_segment_start'] + segment_end = range_spec['resp_segment_end'] + + # It's entirely possible that the client asked for a range that + # includes some bytes we have and some we don't; for example, a + # range of bytes 1000-20000000 on a 1500-byte object. + segment_end = (min(segment_end, self.obj_length - 1) + if segment_end is not None + else self.obj_length - 1) + client_end = (min(client_end, self.obj_length - 1) + if client_end is not None + else self.obj_length - 1) + num_segments = int( + math.ceil(float(segment_end + 1 - segment_start) + / self.policy.ec_segment_size)) + # We get full segments here, but the client may have requested a + # byte range that begins or ends in the middle of a segment. + # Thus, we have some amount of overrun (extra decoded bytes) + # that we trim off so the client gets exactly what they + # requested. + start_overrun = client_start - segment_start + end_overrun = segment_end - client_end + + for i, next_seg in enumerate(segment_iter): + # We may have a start_overrun of more than one segment in + # the case of suffix-byte-range requests. However, we never + # have an end_overrun of more than one segment. 
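# --- Illustrative aside (editor's sketch; not part of this patch) ---
# The trimming performed by the loop below, in isolation: decoded
# segments are whole, but the client's byterange may begin and end
# mid-segment, so the overruns are sliced off. Self-contained sketch:
def trim(segments, start_overrun, end_overrun, num_segments):
    for i, seg in enumerate(segments):
        if start_overrun > 0:
            if len(seg) <= start_overrun:
                start_overrun -= len(seg)
                continue
            seg = seg[start_overrun:]
            start_overrun = 0
        if i == (num_segments - 1) and end_overrun:
            seg = seg[:-end_overrun]
        yield seg


# bytes 5-9 of 'abcdefghijkl' stored as three 4-byte segments:
print(''.join(trim(['abcd', 'efgh', 'ijkl'], 5, 2, 3)))  # fghij
# --- end aside ---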
+            if start_overrun > 0:
+                seglen = len(next_seg)
+                if seglen <= start_overrun:
+                    start_overrun -= seglen
+                    continue
+                else:
+                    next_seg = next_seg[start_overrun:]
+                    start_overrun = 0
+
+            if i == (num_segments - 1) and end_overrun:
+                next_seg = next_seg[:-end_overrun]
+
+            yield next_seg
+
+    def _decode_segments_from_fragments(self, fragment_iters):
         # Decodes the fragments from the object servers and yields one
         # segment at a time.
-        queues = [Queue(1) for _junk in range(len(self.internal_app_iters))]
+        queues = [Queue(1) for _junk in range(len(fragment_iters))]
 
         def put_fragments_in_queue(frag_iter, queue):
             try:
@@ -1262,7 +1495,8 @@ class ECAppIter(object):
                 pass
             except ChunkReadTimeout:
                 # unable to resume in GetOrHeadHandler
-                pass
+                self.logger.exception("Timeout fetching fragments for %r" %
+                                      self.path)
             except:  # noqa
                 self.logger.exception("Exception fetching fragments for %r" %
                                       self.path)
@@ -1270,14 +1504,13 @@ class ECAppIter(object):
                 queue.resize(2)  # ensure there's room
                 queue.put(None)
 
-        with ContextPool(len(self.internal_app_iters)) as pool:
-            for app_iter, queue in zip(
-                    self.internal_app_iters, queues):
-                pool.spawn(put_fragments_in_queue, app_iter, queue)
+        with ContextPool(len(fragment_iters)) as pool:
+            for frag_iter, queue in zip(fragment_iters, queues):
+                pool.spawn(put_fragments_in_queue, frag_iter, queue)
 
             while True:
                 fragments = []
-                for qi, queue in enumerate(queues):
+                for queue in queues:
                     fragment = queue.get()
                     queue.task_done()
                     fragments.append(fragment)
@@ -1302,8 +1535,8 @@ class ECAppIter(object):
     def app_iter_range(self, start, end):
         return self
 
-    def app_iter_ranges(self, content_type, boundary, content_size):
-        self.boundary = boundary
+    def app_iter_ranges(self, ranges, content_type, boundary, content_size):
+        return self
 
 
 def client_range_to_segment_range(client_start, client_end, segment_size):
@@ -1750,6 +1983,71 @@ def trailing_metadata(policy, client_obj_hasher,
 
 @ObjectControllerRouter.register(EC_POLICY)
 class ECObjectController(BaseObjectController):
+    def _fragment_GET_request(self, req, node_iter, partition, policy):
+        """
+        Makes a GET request for a fragment.
+        """
+        backend_headers = self.generate_request_headers(
+            req, additional=req.headers)
+
+        getter = ResumingGetter(self.app, req, 'Object', node_iter,
+                                partition, req.swift_entity_path,
+                                backend_headers,
+                                client_chunk_size=policy.fragment_size,
+                                newest=False)
+        return (getter, getter.response_parts_iter(req))
+
+    def _convert_range(self, req, policy):
+        """
+        Take the requested range(s) from the client and convert them to
+        range(s) to be sent to the object servers.
+
+        This includes widening requested ranges to full segments, then
+        converting those ranges to fragments so that we retrieve the
+        minimum number of fragments from the object server.
+
+        Mutates the request passed in.
+
+        Returns a list of range specs (dictionaries with the different byte
+        indices in them).
+        """
+        # Since segments and fragments have different sizes, we need
+        # to modify the Range header sent to the object servers to
+        # make sure we get the right fragments out of the fragment
+        # archives.
+        segment_size = policy.ec_segment_size
+        fragment_size = policy.fragment_size
+
+        range_specs = []
+        new_ranges = []
+        for client_start, client_end in req.range.ranges:
+            # TODO: coalesce ranges that overlap segments. For
+            # example, "bytes=0-10,20-30,40-50" with a 64 KiB
+            # segment size will result in a Range header in the
+            # object request of "bytes=0-65535,0-65535,0-65535",
+            # which is wasteful. 
We should be smarter and only + # request that first segment once. + segment_start, segment_end = client_range_to_segment_range( + client_start, client_end, segment_size) + + fragment_start, fragment_end = \ + segment_range_to_fragment_range( + segment_start, segment_end, + segment_size, fragment_size) + + new_ranges.append((fragment_start, fragment_end)) + range_specs.append({'req_client_start': client_start, + 'req_client_end': client_end, + 'req_segment_start': segment_start, + 'req_segment_end': segment_end, + 'req_fragment_start': fragment_start, + 'req_fragment_end': fragment_end}) + + req.range = "bytes=" + ",".join( + "%s-%s" % (s if s is not None else "", + e if e is not None else "") + for s, e in new_ranges) + return range_specs def _get_or_head_response(self, req, node_iter, partition, policy): req.headers.setdefault("X-Backend-Etag-Is-At", @@ -1767,63 +2065,35 @@ class ECObjectController(BaseObjectController): range_specs = [] if req.range: orig_range = req.range - # Since segments and fragments have different sizes, we need - # to modify the Range header sent to the object servers to - # make sure we get the right fragments out of the fragment - # archives. - segment_size = policy.ec_segment_size - fragment_size = policy.fragment_size - - range_specs = [] - new_ranges = [] - for client_start, client_end in req.range.ranges: - - segment_start, segment_end = client_range_to_segment_range( - client_start, client_end, segment_size) - - fragment_start, fragment_end = \ - segment_range_to_fragment_range( - segment_start, segment_end, - segment_size, fragment_size) - - new_ranges.append((fragment_start, fragment_end)) - range_specs.append({'client_start': client_start, - 'client_end': client_end, - 'segment_start': segment_start, - 'segment_end': segment_end}) - - req.range = "bytes=" + ",".join( - "%s-%s" % (s if s is not None else "", - e if e is not None else "") - for s, e in new_ranges) + range_specs = self._convert_range(req, policy) node_iter = GreenthreadSafeIterator(node_iter) num_gets = policy.ec_ndata with ContextPool(num_gets) as pool: pile = GreenAsyncPile(pool) for _junk in range(num_gets): - pile.spawn(self.GETorHEAD_base, - req, 'Object', node_iter, partition, - req.swift_entity_path, - client_chunk_size=policy.fragment_size) + pile.spawn(self._fragment_GET_request, + req, node_iter, partition, + policy) - responses = list(pile) - good_responses = [] - bad_responses = [] - for response in responses: - if is_success(response.status_int): - good_responses.append(response) + gets = list(pile) + good_gets = [] + bad_gets = [] + for get, parts_iter in gets: + if is_success(get.last_status): + good_gets.append((get, parts_iter)) else: - bad_responses.append(response) + bad_gets.append((get, parts_iter)) req.range = orig_range - if len(good_responses) == num_gets: + if len(good_gets) == num_gets: # If these aren't all for the same object, then error out so # at least the client doesn't get garbage. We can do a lot # better here with more work, but this'll work for now. 
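# --- Illustrative aside (editor's sketch; not part of this patch) ---
# The widening that _convert_range() above performs, as standalone
# arithmetic. These helpers only approximate client_range_to_segment_range()
# and segment_range_to_fragment_range(); they are not the module's exact
# implementations, and the fragment size below is made up:
def to_segment_range(client_start, client_end, segment_size):
    seg_start = client_start - (client_start % segment_size)
    seg_end = client_end + (segment_size - 1 - client_end % segment_size)
    return seg_start, seg_end


def to_fragment_range(seg_start, seg_end, segment_size, fragment_size):
    frag_start = seg_start // segment_size * fragment_size
    frag_end = (seg_end + 1) // segment_size * fragment_size - 1
    return frag_start, frag_end


seg = to_segment_range(1000, 2499999, 1048576)  # 1 MiB segments
print(seg)                                      # (0, 3145727)
print(to_fragment_range(seg[0], seg[1], 1048576, 262144))
# (0, 786431): three whole segments' worth of fragment-archive bytes
# --- end aside ---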
found_obj_etags = set( - resp.headers['X-Object-Sysmeta-Ec-Etag'] - for resp in good_responses) + HeaderKeyDict( + getter.last_headers)['X-Object-Sysmeta-Ec-Etag'] + for getter, _junk in good_gets) if len(found_obj_etags) > 1: self.app.logger.debug( "Returning 503 for %s; found too many etags (%s)", @@ -1833,30 +2103,41 @@ class ECObjectController(BaseObjectController): # we found enough pieces to decode the object, so now let's # decode the object - resp_headers = HeaderKeyDict(good_responses[0].headers.items()) + resp_headers = HeaderKeyDict( + good_gets[0][0].source_headers[-1]) resp_headers.pop('Content-Range', None) eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length') obj_length = int(eccl) if eccl is not None else None + # This is only true if we didn't get a 206 response, but + # that's the only time this is used anyway. + fa_length = int(resp_headers['Content-Length']) + + app_iter = ECAppIter( + req.swift_entity_path, + policy, + [iterator for getter, iterator in good_gets], + range_specs, fa_length, obj_length, + self.app.logger) resp = Response( request=req, headers=resp_headers, conditional_response=True, - app_iter=ECAppIter( - req.swift_entity_path, - policy, - [r.app_iter for r in good_responses], - range_specs, - obj_length, - logger=self.app.logger)) + app_iter=app_iter) + app_iter.kickoff(req, resp) else: + statuses = [] + reasons = [] + bodies = [] + headers = [] + for getter, body_parts_iter in bad_gets: + statuses.extend(getter.statuses) + reasons.extend(getter.reasons) + bodies.extend(getter.bodies) + headers.extend(getter.source_headers) resp = self.best_response( - req, - [r.status_int for r in bad_responses], - [r.status.split(' ', 1)[1] for r in bad_responses], - [r.body for r in bad_responses], - 'Object', - headers=[r.headers for r in bad_responses]) + req, statuses, reasons, bodies, 'Object', + headers=headers) self._fix_response_headers(resp) return resp diff --git a/test/unit/common/test_swob.py b/test/unit/common/test_swob.py index 7015abb8eb..5dcd230849 100644 --- a/test/unit/common/test_swob.py +++ b/test/unit/common/test_swob.py @@ -1208,8 +1208,7 @@ class TestResponse(unittest.TestCase): ('0123456789112345678' '92123456789'))) - self.assert_(re.match(('\r\n' - '--[a-f0-9]{32}\r\n' + self.assert_(re.match(('--[a-f0-9]{32}\r\n' 'Content-Type: text/plain\r\n' 'Content-Range: bytes ' '0-9/100\r\n\r\n0123456789\r\n' @@ -1221,7 +1220,7 @@ class TestResponse(unittest.TestCase): 'Content-Type: text/plain\r\n' 'Content-Range: bytes ' '20-29/100\r\n\r\n2123456789\r\n' - '--[a-f0-9]{32}--\r\n'), content)) + '--[a-f0-9]{32}--'), content)) def test_multi_response_iter(self): def test_app(environ, start_response): diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 113b712ab1..33298a6fcb 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -60,7 +60,7 @@ from swift.common.exceptions import (Timeout, MessageTimeout, MimeInvalid, ThreadPoolDead) from swift.common import utils from swift.common.container_sync_realms import ContainerSyncRealms -from swift.common.swob import Request, Response +from swift.common.swob import Request, Response, HeaderKeyDict from test.unit import FakeLogger @@ -4722,6 +4722,18 @@ class TestIterMultipartMimeDocuments(unittest.TestCase): exc = err self.assertTrue(exc is not None) + def test_leading_crlfs(self): + it = utils.iter_multipart_mime_documents( + StringIO('\r\n\r\n\r\n--unique\r\nabcdefg\r\n' + '--unique\r\nhijkl\r\n--unique--'), + 'unique') + fp = it.next() + 
self.assertEquals(fp.read(65536), 'abcdefg') + self.assertEquals(fp.read(), '') + fp = it.next() + self.assertEquals(fp.read(), 'hijkl') + self.assertRaises(StopIteration, it.next) + def test_broken_mid_stream(self): # We go ahead and accept whatever is sent instead of rejecting the # whole request, in case the partial form is still useful. @@ -4777,6 +4789,156 @@ class TestIterMultipartMimeDocuments(unittest.TestCase): self.assertTrue(exc is not None) +class FakeResponse(object): + def __init__(self, status, headers, body): + self.status = status + self.headers = HeaderKeyDict(headers) + self.body = StringIO(body) + + def getheader(self, header_name): + return str(self.headers.get(header_name, '')) + + def getheaders(self): + return self.headers.items() + + def read(self, length=None): + return self.body.read(length) + + def readline(self, length=None): + return self.body.readline(length) + + +class TestHTTPResponseToDocumentIters(unittest.TestCase): + def test_200(self): + fr = FakeResponse( + 200, + {'Content-Length': '10', 'Content-Type': 'application/lunch'}, + 'sandwiches') + + doc_iters = utils.http_response_to_document_iters(fr) + first_byte, last_byte, length, headers, body = next(doc_iters) + self.assertEqual(first_byte, 0) + self.assertEqual(last_byte, 9) + self.assertEqual(length, 10) + header_dict = HeaderKeyDict(headers) + self.assertEqual(header_dict.get('Content-Length'), '10') + self.assertEqual(header_dict.get('Content-Type'), 'application/lunch') + self.assertEqual(body.read(), 'sandwiches') + + self.assertRaises(StopIteration, next, doc_iters) + + def test_206_single_range(self): + fr = FakeResponse( + 206, + {'Content-Length': '8', 'Content-Type': 'application/lunch', + 'Content-Range': 'bytes 1-8/10'}, + 'andwiche') + + doc_iters = utils.http_response_to_document_iters(fr) + first_byte, last_byte, length, headers, body = next(doc_iters) + self.assertEqual(first_byte, 1) + self.assertEqual(last_byte, 8) + self.assertEqual(length, 10) + header_dict = HeaderKeyDict(headers) + self.assertEqual(header_dict.get('Content-Length'), '8') + self.assertEqual(header_dict.get('Content-Type'), 'application/lunch') + self.assertEqual(body.read(), 'andwiche') + + self.assertRaises(StopIteration, next, doc_iters) + + def test_206_multiple_ranges(self): + fr = FakeResponse( + 206, + {'Content-Type': 'multipart/byteranges; boundary=asdfasdfasdf'}, + ("--asdfasdfasdf\r\n" + "Content-Type: application/lunch\r\n" + "Content-Range: bytes 0-3/10\r\n" + "\r\n" + "sand\r\n" + "--asdfasdfasdf\r\n" + "Content-Type: application/lunch\r\n" + "Content-Range: bytes 6-9/10\r\n" + "\r\n" + "ches\r\n" + "--asdfasdfasdf--")) + + doc_iters = utils.http_response_to_document_iters(fr) + + first_byte, last_byte, length, headers, body = next(doc_iters) + self.assertEqual(first_byte, 0) + self.assertEqual(last_byte, 3) + self.assertEqual(length, 10) + header_dict = HeaderKeyDict(headers) + self.assertEqual(header_dict.get('Content-Type'), 'application/lunch') + self.assertEqual(body.read(), 'sand') + + first_byte, last_byte, length, headers, body = next(doc_iters) + self.assertEqual(first_byte, 6) + self.assertEqual(last_byte, 9) + self.assertEqual(length, 10) + header_dict = HeaderKeyDict(headers) + self.assertEqual(header_dict.get('Content-Type'), 'application/lunch') + self.assertEqual(body.read(), 'ches') + + self.assertRaises(StopIteration, next, doc_iters) + + +class TestDocumentItersToHTTPResponseBody(unittest.TestCase): + def test_no_parts(self): + body = utils.document_iters_to_http_response_body( + 
+            iter([]), 'dontcare',
+            multipart=False, logger=FakeLogger())
+        self.assertEqual(body, '')
+
+    def test_single_part(self):
+        body = "time flies like an arrow; fruit flies like a banana"
+        doc_iters = [{'part_iter': iter(StringIO(body).read, '')}]
+
+        resp_body = ''.join(
+            utils.document_iters_to_http_response_body(
+                iter(doc_iters), 'dontcare',
+                multipart=False, logger=FakeLogger()))
+        self.assertEqual(resp_body, body)
+
+    def test_multiple_parts(self):
+        part1 = "two peanuts were walking down a railroad track"
+        part2 = "and one was a salted. ... peanut."
+
+        doc_iters = [{
+            'start_byte': 88,
+            'end_byte': 133,
+            'content_type': 'application/peanut',
+            'entity_length': 1024,
+            'part_iter': iter(StringIO(part1).read, ''),
+        }, {
+            'start_byte': 500,
+            'end_byte': 532,
+            'content_type': 'application/salted',
+            'entity_length': 1024,
+            'part_iter': iter(StringIO(part2).read, ''),
+        }]
+
+        resp_body = ''.join(
+            utils.document_iters_to_http_response_body(
+                iter(doc_iters), 'boundaryboundary',
+                multipart=True, logger=FakeLogger()))
+        self.assertEqual(resp_body, (
+            "--boundaryboundary\r\n" +
+            # This is a little too strict; we don't actually care that the
+            # headers are in this order, but the test is much more legible
+            # this way.
+            "Content-Type: application/peanut\r\n" +
+            "Content-Range: bytes 88-133/1024\r\n" +
+            "\r\n" +
+            part1 + "\r\n" +
+            "--boundaryboundary\r\n"
+            "Content-Type: application/salted\r\n" +
+            "Content-Range: bytes 500-532/1024\r\n" +
+            "\r\n" +
+            part2 + "\r\n" +
+            "--boundaryboundary--"))
+
+
 class TestPairs(unittest.TestCase):
     def test_pairs(self):
         items = [10, 20, 30, 40, 50, 60]
diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py
index 0ebd96eabd..d3fafd8b91 100644
--- a/test/unit/proxy/controllers/test_base.py
+++ b/test/unit/proxy/controllers/test_base.py
@@ -658,6 +658,7 @@ class TestFuncs(unittest.TestCase):
         class TestSource(object):
             def __init__(self, chunks):
                 self.chunks = list(chunks)
+                self.status = 200
 
             def read(self, _read_size):
                 if self.chunks:
@@ -665,6 +666,13 @@
                 else:
                     return ''
 
+            def getheader(self, header):
+                if header.lower() == "content-length":
+                    return str(sum(len(c) for c in self.chunks))
+
+            def getheaders(self):
+                return [('content-length', self.getheader('content-length'))]
+
         source = TestSource((
             'abcd', '1234', 'abc', 'd1', '234abcd1234abcd1', '2'))
         req = Request.blank('/v1/a/c/o')
@@ -682,6 +690,7 @@ class TestFuncs(unittest.TestCase):
         class TestSource(object):
             def __init__(self, chunks):
                 self.chunks = list(chunks)
+                self.status = 200
 
             def read(self, _read_size):
                 if self.chunks:
@@ -693,6 +702,14 @@
                 else:
                     return ''
 
+            def getheader(self, header):
+                if header.lower() == "content-length":
+                    return str(sum(len(c) for c in self.chunks
+                                   if c is not None))
+
+            def getheaders(self):
+                return [('content-length', self.getheader('content-length'))]
+
         node = {'ip': '1.2.3.4', 'port': 6000, 'device': 'sda'}
 
         source1 = TestSource(['abcd', '1234', 'abc', None])
@@ -707,7 +724,6 @@
                               lambda: (source2, node)):
             client_chunks = list(app_iter)
         self.assertEqual(client_chunks, ['abcd1234', 'efgh5678'])
-        self.assertEqual(handler.backend_headers['Range'], 'bytes=8-')
 
     def test_bytes_to_skip(self):
         # if you start at the beginning, skip nothing
diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py
index c28a3625aa..d16f66c34b 100755
--- a/test/unit/proxy/controllers/test_obj.py
+++ b/test/unit/proxy/controllers/test_obj.py
@@ -939,8 +939,7 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
     def test_GET_simple_x_newest(self):
         req = swift.common.swob.Request.blank('/v1/a/c/o',
                                               headers={'X-Newest': 'true'})
-        codes = [200] * self.replicas()
-        codes += [404] * self.obj_ring.max_more_nodes
+        codes = [200] * self.policy.ec_ndata
         with set_http_connect(*codes):
             resp = req.get_response(self.app)
         self.assertEquals(resp.status_int, 200)
@@ -976,7 +975,8 @@
         node_fragments = zip(*fragment_payloads)
         self.assertEqual(len(node_fragments), self.replicas())  # sanity
-        responses = [(200, ''.join(node_fragments[i]), {})
+        headers = {'X-Object-Sysmeta-Ec-Content-Length': str(len(real_body))}
+        responses = [(200, ''.join(node_fragments[i]), headers)
                      for i in range(POLICIES.default.ec_ndata)]
         status_codes, body_iter, headers = zip(*responses)
         with set_http_connect(*status_codes, body_iter=body_iter,
@@ -1260,8 +1260,7 @@
                                                'X-Copy-From': 'c2/o'})
 
         # c2 get
-        codes = [200] * self.replicas()
-        codes += [404] * self.obj_ring.max_more_nodes
+        codes = [404, 200] * self.policy.ec_ndata
         headers = {
             'X-Object-Sysmeta-Ec-Content-Length': 0,
         }
@@ -1318,9 +1317,11 @@
         ec_archive_bodies1 = self._make_ec_archive_bodies(test_data1)
         ec_archive_bodies2 = self._make_ec_archive_bodies(test_data2)
 
-        headers1 = {'X-Object-Sysmeta-Ec-Etag': etag1}
+        headers1 = {'X-Object-Sysmeta-Ec-Etag': etag1,
+                    'X-Object-Sysmeta-Ec-Content-Length': '333'}
         # here we're going to *lie* and say the etag here matches
-        headers2 = {'X-Object-Sysmeta-Ec-Etag': etag1}
+        headers2 = {'X-Object-Sysmeta-Ec-Etag': etag1,
+                    'X-Object-Sysmeta-Ec-Content-Length': '333'}
 
         responses1 = [(200, body, headers1)
                       for body in ec_archive_bodies1]
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 3b0115bbfc..5f85c55e4a 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -14,10 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import email.parser
 import logging
 import math
 import os
 import pickle
+import rfc822
 import sys
 import unittest
 from contextlib import closing, contextmanager, nested
@@ -40,7 +42,8 @@
 import random
 import mock
 
 from eventlet import sleep, spawn, wsgi, listen, Timeout
-from swift.common.utils import hash_path, json, storage_directory, public
+from swift.common.utils import hash_path, json, storage_directory, \
+    parse_content_type, iter_multipart_mime_documents, public
 
 from test.unit import (
     connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing,
@@ -1378,6 +1381,331 @@ class TestObjectController(unittest.TestCase):
             self.assertEqual(res.status_int, 200)
             self.assertEqual(res.body, obj)
 
+    @unpatch_policies
+    def test_GET_ranges(self):
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        obj = (''.join(
+            ('beans lots of beans lots of beans lots of beans yeah %04d ' % i)
+            for i in range(100)))
+
+        path = '/v1/a/c/o.beans'
+        fd.write('PUT %s HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'Content-Length: %s\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (path, str(len(obj)), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        # one byte range
+        req = Request.blank(
+            path,
+            environ={'REQUEST_METHOD': 'GET'},
+            headers={'Content-Type': 'application/octet-stream',
+                     'Range': 'bytes=10-200'})
+        res = req.get_response(prosrv)
+        self.assertEqual(res.status_int, 206)
+        self.assertEqual(res.body, obj[10:201])
+
+        # multiple byte ranges
+        req = Request.blank(
+            path,
+            environ={'REQUEST_METHOD': 'GET'},
+            headers={'Content-Type': 'application/octet-stream',
+                     'Range': 'bytes=10-200,1000-1099,4123-4523'})
+        res = req.get_response(prosrv)
+        self.assertEqual(res.status_int, 206)
+        ct, params = parse_content_type(res.headers['Content-Type'])
+        self.assertEqual(ct, 'multipart/byteranges')
+
+        boundary = dict(params).get('boundary')
+        self.assertTrue(boundary is not None)
+
+        got_mime_docs = []
+        for mime_doc_fh in iter_multipart_mime_documents(StringIO(res.body),
+                                                         boundary):
+            headers = HeaderKeyDict(rfc822.Message(mime_doc_fh, 0).items())
+            body = mime_doc_fh.read()
+            got_mime_docs.append((headers, body))
+        self.assertEqual(len(got_mime_docs), 3)
+
+        first_range_headers = got_mime_docs[0][0]
+        first_range_body = got_mime_docs[0][1]
+        self.assertEqual(first_range_headers['Content-Range'],
+                         'bytes 10-200/5800')
+        self.assertEqual(first_range_body, obj[10:201])
+
+        second_range_headers = got_mime_docs[1][0]
+        second_range_body = got_mime_docs[1][1]
+        self.assertEqual(second_range_headers['Content-Range'],
+                         'bytes 1000-1099/5800')
+        self.assertEqual(second_range_body, obj[1000:1100])
+
+        third_range_headers = got_mime_docs[2][0]
+        third_range_body = got_mime_docs[2][1]
+        self.assertEqual(third_range_headers['Content-Range'],
+                         'bytes 4123-4523/5800')
+        self.assertEqual(third_range_body, obj[4123:4524])
+
+    @unpatch_policies
+    def test_GET_ranges_resuming(self):
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        obj = (''.join(
+            ('Smurf! The smurfing smurf is completely smurfed. %03d ' % i)
+            for i in range(1000)))
+
+        path = '/v1/a/c/o.smurfs'
+        fd.write('PUT %s HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'Content-Length: %s\r\n'
+                 'Content-Type: application/smurftet-stream\r\n'
+                 '\r\n%s' % (path, str(len(obj)), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        kaboomed = [0]
+        bytes_before_timeout = [None]
+
+        class FileLikeKaboom(object):
+            def __init__(self, inner_file_like):
+                self.inner_file_like = inner_file_like
+
+            # close(), etc.
+            def __getattr__(self, attr):
+                return getattr(self.inner_file_like, attr)
+
+            def readline(self, *a, **kw):
+                if bytes_before_timeout[0] <= 0:
+                    kaboomed[0] += 1
+                    raise ChunkReadTimeout(None)
+                result = self.inner_file_like.readline(*a, **kw)
+                if len(result) > bytes_before_timeout[0]:
+                    result = result[:bytes_before_timeout[0]]
+                bytes_before_timeout[0] -= len(result)
+                return result
+
+            def read(self, length=None):
+                result = self.inner_file_like.read(length)
+                if bytes_before_timeout[0] <= 0:
+                    kaboomed[0] += 1
+                    raise ChunkReadTimeout(None)
+                if len(result) > bytes_before_timeout[0]:
+                    result = result[:bytes_before_timeout[0]]
+                bytes_before_timeout[0] -= len(result)
+                return result
+
+        orig_hrtdi = proxy_base.http_response_to_document_iters
+
+        # Use this to mock out http_response_to_document_iters. On the first
+        # call, the result will be sabotaged to blow up with
+        # ChunkReadTimeout after some number of bytes are read. On
+        # subsequent calls, no sabotage will be added.
+
+        def sabotaged_hrtdi(*a, **kw):
+            resp_parts = orig_hrtdi(*a, **kw)
+            for sb, eb, l, h, range_file in resp_parts:
+                if bytes_before_timeout[0] <= 0:
+                    # simulate being unable to read MIME part of
+                    # multipart/byteranges response
+                    kaboomed[0] += 1
+                    raise ChunkReadTimeout(None)
+                boomer = FileLikeKaboom(range_file)
+                yield sb, eb, l, h, boomer
+
+        sabotaged = [False]
+
+        def single_sabotage_hrtdi(*a, **kw):
+            if not sabotaged[0]:
+                sabotaged[0] = True
+                return sabotaged_hrtdi(*a, **kw)
+            else:
+                return orig_hrtdi(*a, **kw)
+
+        # We want sort of an end-to-end test of object resuming, so what we
+        # do is mock out stuff so the proxy thinks it only read a certain
+        # number of bytes before it got a timeout.
+        bytes_before_timeout[0] = 300
+        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
+                               single_sabotage_hrtdi):
+            req = Request.blank(
+                path,
+                environ={'REQUEST_METHOD': 'GET'},
+                headers={'Content-Type': 'application/octet-stream',
+                         'Range': 'bytes=0-500'})
+            res = req.get_response(prosrv)
+            body = res.body  # read the whole thing
+        self.assertEqual(kaboomed[0], 1)  # sanity check
+        self.assertEqual(res.status_int, 206)
+        self.assertEqual(len(body), 501)
+        self.assertEqual(body, obj[:501])
+
+        # Sanity-check for multi-range resume: make sure we actually break
+        # in the middle of the second byterange. This test is partially
+        # about what happens when all the object servers break at once, and
+        # partially about validating all these mocks we do. After all, the
+        # point of resuming is that the client can't tell anything went
+        # wrong, so we need a test where we can't resume and something
+        # *does* go wrong so we can observe it.
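+        # (An illustrative aside, ours rather than code under test: on a
+        # ChunkReadTimeout the proxy should re-request only what it has
+        # not yet handed to the client, roughly
+        #
+        #     new_range = 'bytes=%d-%d' % (start + bytes_served, end)
+        #
+        # so dying after 300 bytes of 'bytes=0-500' leads to a re-GET of
+        # 'bytes=300-500'. That is what the single-range case above just
+        # exercised; the perma-broken mock below defeats resuming on
+        # purpose.)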
+        bytes_before_timeout[0] = 700
+        kaboomed[0] = 0
+        sabotaged[0] = False
+        prosrv._error_limiting = {}  # clear out errors
+        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
+                               sabotaged_hrtdi):  # perma-broken
+            req = Request.blank(
+                path,
+                environ={'REQUEST_METHOD': 'GET'},
+                headers={'Range': 'bytes=0-500,1000-1500,2000-2500'})
+            res = req.get_response(prosrv)
+            body = ''
+            try:
+                for chunk in res.app_iter:
+                    body += chunk
+            except ChunkReadTimeout:
+                pass
+
+        self.assertEqual(res.status_int, 206)
+        self.assertTrue(kaboomed[0] > 0)  # sanity check
+
+        ct, params = parse_content_type(res.headers['Content-Type'])
+        self.assertEqual(ct, 'multipart/byteranges')  # sanity check
+        boundary = dict(params).get('boundary')
+        self.assertTrue(boundary is not None)  # sanity check
+        got_byteranges = []
+        for mime_doc_fh in iter_multipart_mime_documents(StringIO(body),
+                                                         boundary):
+            rfc822.Message(mime_doc_fh, 0)
+            body = mime_doc_fh.read()
+            got_byteranges.append(body)
+
+        self.assertEqual(len(got_byteranges), 2)
+        self.assertEqual(len(got_byteranges[0]), 501)
+        self.assertEqual(len(got_byteranges[1]), 199)  # partial
+
+        # Multi-range resume, resuming in the middle of the first byterange
+        bytes_before_timeout[0] = 300
+        kaboomed[0] = 0
+        sabotaged[0] = False
+        prosrv._error_limiting = {}  # clear out errors
+        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
+                               single_sabotage_hrtdi):
+            req = Request.blank(
+                path,
+                environ={'REQUEST_METHOD': 'GET'},
+                headers={'Range': 'bytes=0-500,1000-1500,2000-2500'})
+            res = req.get_response(prosrv)
+            body = ''.join(res.app_iter)
+
+        self.assertEqual(res.status_int, 206)
+        self.assertEqual(kaboomed[0], 1)  # sanity check
+
+        ct, params = parse_content_type(res.headers['Content-Type'])
+        self.assertEqual(ct, 'multipart/byteranges')  # sanity check
+        boundary = dict(params).get('boundary')
+        self.assertTrue(boundary is not None)  # sanity check
+        got_byteranges = []
+        for mime_doc_fh in iter_multipart_mime_documents(StringIO(body),
+                                                         boundary):
+            rfc822.Message(mime_doc_fh, 0)
+            body = mime_doc_fh.read()
+            got_byteranges.append(body)
+
+        self.assertEqual(len(got_byteranges), 3)
+        self.assertEqual(len(got_byteranges[0]), 501)
+        self.assertEqual(got_byteranges[0], obj[:501])
+        self.assertEqual(len(got_byteranges[1]), 501)
+        self.assertEqual(got_byteranges[1], obj[1000:1501])
+        self.assertEqual(len(got_byteranges[2]), 501)
+        self.assertEqual(got_byteranges[2], obj[2000:2501])
+
+        # Multi-range resume, first GET dies in the middle of the second set
+        # of MIME headers
+        bytes_before_timeout[0] = 501
+        kaboomed[0] = 0
+        sabotaged[0] = False
+        prosrv._error_limiting = {}  # clear out errors
+        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
+                               single_sabotage_hrtdi):
+            req = Request.blank(
+                path,
+                environ={'REQUEST_METHOD': 'GET'},
+                headers={'Range': 'bytes=0-500,1000-1500,2000-2500'})
+            res = req.get_response(prosrv)
+            body = ''.join(res.app_iter)
+
+        self.assertEqual(res.status_int, 206)
+        self.assertTrue(kaboomed[0] >= 1)  # sanity check
+
+        ct, params = parse_content_type(res.headers['Content-Type'])
+        self.assertEqual(ct, 'multipart/byteranges')  # sanity check
+        boundary = dict(params).get('boundary')
+        self.assertTrue(boundary is not None)  # sanity check
+        got_byteranges = []
+        for mime_doc_fh in iter_multipart_mime_documents(StringIO(body),
+                                                         boundary):
+            rfc822.Message(mime_doc_fh, 0)
+            body = mime_doc_fh.read()
+            got_byteranges.append(body)
+
+        self.assertEqual(len(got_byteranges), 3)
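+        # Even though the first GET died while reading the second part's
+        # MIME headers, the client should still see three complete
+        # byteranges; the resumed GET picks up where the client-visible
+        # body left off.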
+        self.assertEqual(len(got_byteranges[0]), 501)
+        self.assertEqual(got_byteranges[0], obj[:501])
+        self.assertEqual(len(got_byteranges[1]), 501)
+        self.assertEqual(got_byteranges[1], obj[1000:1501])
+        self.assertEqual(len(got_byteranges[2]), 501)
+        self.assertEqual(got_byteranges[2], obj[2000:2501])
+
+        # Multi-range resume, first GET dies in the middle of the second
+        # byterange
+        bytes_before_timeout[0] = 750
+        kaboomed[0] = 0
+        sabotaged[0] = False
+        prosrv._error_limiting = {}  # clear out errors
+        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
+                               single_sabotage_hrtdi):
+            req = Request.blank(
+                path,
+                environ={'REQUEST_METHOD': 'GET'},
+                headers={'Range': 'bytes=0-500,1000-1500,2000-2500'})
+            res = req.get_response(prosrv)
+            body = ''.join(res.app_iter)
+
+        self.assertEqual(res.status_int, 206)
+        self.assertTrue(kaboomed[0] >= 1)  # sanity check
+
+        ct, params = parse_content_type(res.headers['Content-Type'])
+        self.assertEqual(ct, 'multipart/byteranges')  # sanity check
+        boundary = dict(params).get('boundary')
+        self.assertTrue(boundary is not None)  # sanity check
+        got_byteranges = []
+        for mime_doc_fh in iter_multipart_mime_documents(StringIO(body),
+                                                         boundary):
+            rfc822.Message(mime_doc_fh, 0)
+            body = mime_doc_fh.read()
+            got_byteranges.append(body)
+
+        self.assertEqual(len(got_byteranges), 3)
+        self.assertEqual(len(got_byteranges[0]), 501)
+        self.assertEqual(got_byteranges[0], obj[:501])
+        self.assertEqual(len(got_byteranges[1]), 501)
+        self.assertEqual(got_byteranges[1], obj[1000:1501])
+        self.assertEqual(len(got_byteranges[2]), 501)
+        self.assertEqual(got_byteranges[2], obj[2000:2501])
+
     @unpatch_policies
     def test_PUT_ec(self):
         policy = POLICIES[3]
@@ -1872,6 +2200,12 @@ class TestObjectController(unittest.TestCase):
                 yield next(inner_iter)
             raise Exception("doom ba doom")
 
+        def explodey_doc_parts_iter(inner_iter_iter):
+            for item in inner_iter_iter:
+                item = item.copy()  # paranoia about mutable data
+                item['part_iter'] = explodey_iter(item['part_iter'])
+                yield item
+
         real_ec_app_iter = swift.proxy.controllers.obj.ECAppIter
 
         def explodey_ec_app_iter(path, policy, iterators, *a, **kw):
@@ -1882,7 +2216,7 @@
             # the client when things go wrong.
             return real_ec_app_iter(
                 path, policy,
-                [explodey_iter(i) for i in iterators],
+                [explodey_doc_parts_iter(i) for i in iterators],
                 *a, **kw)
 
         with mock.patch("swift.proxy.controllers.obj.ECAppIter",
@@ -6325,7 +6659,7 @@ class TestObjectECRangedGET(unittest.TestCase):
                  'Connection: close\r\n'
                  'Content-Length: %d\r\n'
                  'X-Storage-Token: t\r\n'
-                 'Content-Type: application/octet-stream\r\n'
+                 'Content-Type: donuts\r\n'
                 '\r\n%s' % (obj_name, len(obj), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
@@ -6359,8 +6693,44 @@ class TestObjectECRangedGET(unittest.TestCase):
                 break
             gotten_obj += buf
 
+        # if we get this wrong, clients will either get truncated data or
+        # they'll hang waiting for bytes that aren't coming, so it warrants
+        # being asserted for every test case
+        if 'Content-Length' in headers:
+            self.assertEqual(int(headers['Content-Length']), len(gotten_obj))
+
+        # likewise, if we say MIME and don't send MIME or vice versa,
+        # clients will be horribly confused
+        if headers.get('Content-Type', '').startswith('multipart/byteranges'):
+            self.assertEqual(gotten_obj[:2], "--")
+        else:
+            # In general, this isn't true, as you can start an object with
+            # "--". However, in this test, we don't start any objects with
+            # "--", or even include "--" in their contents anywhere.
+            self.assertNotEqual(gotten_obj[:2], "--")
+
         return (status_code, headers, gotten_obj)
 
+    def _parse_multipart(self, content_type, body):
+        parser = email.parser.FeedParser()
+        parser.feed("Content-Type: %s\r\n\r\n" % content_type)
+        parser.feed(body)
+        root_message = parser.close()
+        self.assertTrue(root_message.is_multipart())
+        byteranges = root_message.get_payload()
+        self.assertFalse(root_message.defects)
+        for i, message in enumerate(byteranges):
+            self.assertFalse(message.defects, "Part %d had defects" % i)
+            self.assertFalse(message.is_multipart(),
+                             "Nested multipart at %d" % i)
+        return byteranges
+
+    def test_bogus(self):
+        status, headers, gotten_obj = self._get_obj("tacos=3-5")
+        self.assertEqual(status, 200)
+        self.assertEqual(len(gotten_obj), len(self.obj))
+        self.assertEqual(gotten_obj, self.obj)
+
     def test_unaligned(self):
         # One segment's worth of data, but straddling two segment boundaries
         # (so it has data from three segments)
@@ -6372,7 +6742,7 @@ class TestObjectECRangedGET(unittest.TestCase):
         self.assertEqual(gotten_obj, self.obj[3783:7879])
 
     def test_aligned_left(self):
-        # Firts byte is aligned to a segment boundary, last byte is not
+        # First byte is aligned to a segment boundary, last byte is not
         status, headers, gotten_obj = self._get_obj("bytes=0-5500")
         self.assertEqual(status, 206)
         self.assertEqual(headers['Content-Length'], "5501")
@@ -6540,6 +6910,168 @@ class TestObjectECRangedGET(unittest.TestCase):
         self.assertEqual(len(gotten_obj), len(self.tiny_obj))
         self.assertEqual(gotten_obj, self.tiny_obj)
 
+    def test_multiple_ranges(self):
+        status, headers, gotten_obj = self._get_obj(
+            "bytes=0-100,4490-5010", self.obj_name)
+        self.assertEqual(status, 206)
+        self.assertEqual(headers["Content-Length"], str(len(gotten_obj)))
+
+        content_type, content_type_params = parse_content_type(
+            headers['Content-Type'])
+        content_type_params = dict(content_type_params)
+
+        self.assertEqual(content_type, 'multipart/byteranges')
+        boundary = content_type_params.get('boundary')
+        self.assertTrue(boundary is not None)
+
+        got_byteranges = self._parse_multipart(headers['Content-Type'],
+                                               gotten_obj)
+        self.assertEqual(len(got_byteranges), 2)
+        first_byterange, second_byterange = got_byteranges
+
+        self.assertEqual(first_byterange['Content-Range'],
+                         'bytes 0-100/14513')
+        self.assertEqual(first_byterange.get_payload(), self.obj[:101])
+
+        self.assertEqual(second_byterange['Content-Range'],
+                         'bytes 4490-5010/14513')
+        self.assertEqual(second_byterange.get_payload(), self.obj[4490:5011])
+
+    def test_multiple_ranges_overlapping_in_segment(self):
+        status, headers, gotten_obj = self._get_obj(
+            "bytes=0-9,20-29,40-49,60-69,80-89")
+        self.assertEqual(status, 206)
+        got_byteranges = self._parse_multipart(headers['Content-Type'],
+                                               gotten_obj)
+        self.assertEqual(len(got_byteranges), 5)
+
+    def test_multiple_ranges_off_end(self):
+        status, headers, gotten_obj = self._get_obj(
+            "bytes=0-10,14500-14513")  # there is no byte 14513, only 0-14512
+        self.assertEqual(status, 206)
+        got_byteranges = self._parse_multipart(headers['Content-Type'],
+                                               gotten_obj)
+        self.assertEqual(len(got_byteranges), 2)
+        self.assertEqual(got_byteranges[0]['Content-Range'],
+                         "bytes 0-10/14513")
+        self.assertEqual(got_byteranges[1]['Content-Range'],
+                         "bytes 14500-14512/14513")
+
+    def test_multiple_ranges_suffix_off_end(self):
+        status, headers, gotten_obj = self._get_obj(
+            "bytes=0-10,-13")
+        self.assertEqual(status, 206)
+        got_byteranges = self._parse_multipart(headers['Content-Type'],
+                                               gotten_obj)
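+        # a suffix range of -13 on a 14513-byte object resolves to the
+        # last 13 bytes, i.e. bytes 14500-14512, as asserted just below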
+        self.assertEqual(len(got_byteranges), 2)
+        self.assertEqual(got_byteranges[0]['Content-Range'],
+                         "bytes 0-10/14513")
+        self.assertEqual(got_byteranges[1]['Content-Range'],
+                         "bytes 14500-14512/14513")
+
+    def test_multiple_ranges_one_barely_unsatisfiable(self):
+        # The thing about 14515-14520 is that it comes from the last segment
+        # in the object. When we turn this range into a fragment range,
+        # it'll be for the last fragment, so the object servers see
+        # something satisfiable.
+        #
+        # Basically, we'll get 3 byteranges from the object server, but we
+        # have to filter out the unsatisfiable one on our own.
+        status, headers, gotten_obj = self._get_obj(
+            "bytes=0-10,14515-14520,40-50")
+        self.assertEqual(status, 206)
+        got_byteranges = self._parse_multipart(headers['Content-Type'],
+                                               gotten_obj)
+        self.assertEqual(len(got_byteranges), 2)
+        self.assertEqual(got_byteranges[0]['Content-Range'],
+                         "bytes 0-10/14513")
+        self.assertEqual(got_byteranges[0].get_payload(), self.obj[0:11])
+        self.assertEqual(got_byteranges[1]['Content-Range'],
+                         "bytes 40-50/14513")
+        self.assertEqual(got_byteranges[1].get_payload(), self.obj[40:51])
+
+    def test_multiple_ranges_some_unsatisfiable(self):
+        status, headers, gotten_obj = self._get_obj(
+            "bytes=0-100,4090-5010,999999-9999999", self.obj_name)
+        self.assertEqual(status, 206)
+
+        content_type, content_type_params = parse_content_type(
+            headers['Content-Type'])
+        content_type_params = dict(content_type_params)
+
+        self.assertEqual(content_type, 'multipart/byteranges')
+        boundary = content_type_params.get('boundary')
+        self.assertTrue(boundary is not None)
+
+        got_byteranges = self._parse_multipart(headers['Content-Type'],
+                                               gotten_obj)
+        self.assertEqual(len(got_byteranges), 2)
+        first_byterange, second_byterange = got_byteranges
+
+        self.assertEqual(first_byterange['Content-Range'],
+                         'bytes 0-100/14513')
+        self.assertEqual(first_byterange.get_payload(), self.obj[:101])
+
+        self.assertEqual(second_byterange['Content-Range'],
+                         'bytes 4090-5010/14513')
+        self.assertEqual(second_byterange.get_payload(), self.obj[4090:5011])
+
+    def test_two_ranges_one_unsatisfiable(self):
+        status, headers, gotten_obj = self._get_obj(
+            "bytes=0-100,999999-9999999", self.obj_name)
+        self.assertEqual(status, 206)
+
+        content_type, content_type_params = parse_content_type(
+            headers['Content-Type'])
+
+        # According to RFC 7233, this could be either a multipart/byteranges
+        # response with one part or it could be a single-part response (just
+        # the bytes, no MIME). We're locking it down here: single-part
+        # response. That's what replicated objects do, and we don't want any
+        # client-visible differences between EC objects and replicated ones.
+        self.assertEqual(content_type, 'donuts')
+        self.assertEqual(gotten_obj, self.obj[:101])
+
+    def test_two_ranges_one_unsatisfiable_same_segment(self):
+        # Like test_two_ranges_one_unsatisfiable(), but where both ranges
+        # fall within the same EC segment.
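+        #
+        # (An illustrative sketch, not Swift's exact code: a client range
+        # is rounded out to whole segments and then scaled to fragment
+        # offsets, roughly
+        #
+        #     frag_start = (start // seg_size) * frag_size
+        #     frag_end = ((end // seg_size) + 1) * frag_size - 1
+        #
+        # so two client ranges inside one segment yield the same fragment
+        # range, and the object servers see a single satisfiable request.)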
+        status, headers, gotten_obj = self._get_obj(
+            "bytes=14500-14510,14520-14530")
+
+        self.assertEqual(status, 206)
+
+        content_type, content_type_params = parse_content_type(
+            headers['Content-Type'])
+
+        self.assertEqual(content_type, 'donuts')
+        self.assertEqual(gotten_obj, self.obj[14500:14511])
+
+    def test_multiple_ranges_some_unsatisfiable_out_of_order(self):
+        status, headers, gotten_obj = self._get_obj(
+            "bytes=0-100,99999998-99999999,4090-5010", self.obj_name)
+        self.assertEqual(status, 206)
+
+        content_type, content_type_params = parse_content_type(
+            headers['Content-Type'])
+        content_type_params = dict(content_type_params)
+
+        self.assertEqual(content_type, 'multipart/byteranges')
+        boundary = content_type_params.get('boundary')
+        self.assertTrue(boundary is not None)
+
+        got_byteranges = self._parse_multipart(headers['Content-Type'],
+                                               gotten_obj)
+        self.assertEqual(len(got_byteranges), 2)
+        first_byterange, second_byterange = got_byteranges
+
+        self.assertEqual(first_byterange['Content-Range'],
+                         'bytes 0-100/14513')
+        self.assertEqual(first_byterange.get_payload(), self.obj[:101])
+
+        self.assertEqual(second_byterange['Content-Range'],
+                         'bytes 4090-5010/14513')
+        self.assertEqual(second_byterange.get_payload(), self.obj[4090:5011])
+
 
 @patch_policies([
     StoragePolicy(0, 'zero', True, object_ring=FakeRing(base_port=3000)),