Merge "EC: support multiple ranges for GET requests"
This commit is contained in:
commit
0cf95b0555
@@ -62,6 +62,7 @@ class BufferedHTTPResponse(HTTPResponse):
        self.chunk_left = _UNKNOWN  # bytes left to read in current chunk
        self.length = _UNKNOWN  # number of bytes left in response
        self.will_close = _UNKNOWN  # conn will close at end of response
        self._readline_buffer = ''

    def expect_response(self):
        if self.fp:
@@ -79,6 +80,48 @@ class BufferedHTTPResponse(HTTPResponse):
        self.msg = HTTPMessage(self.fp, 0)
        self.msg.fp = None

    def read(self, amt=None):
        if not self._readline_buffer:
            return HTTPResponse.read(self, amt)

        if amt is None:
            # Unbounded read: send anything we have buffered plus whatever
            # is left.
            buffered = self._readline_buffer
            self._readline_buffer = ''
            return buffered + HTTPResponse.read(self, amt)
        elif amt <= len(self._readline_buffer):
            # Bounded read that we can satisfy entirely from our buffer
            res = self._readline_buffer[:amt]
            self._readline_buffer = self._readline_buffer[amt:]
            return res
        else:
            # Bounded read that wants more bytes than we have
            smaller_amt = amt - len(self._readline_buffer)
            buf = self._readline_buffer
            self._readline_buffer = ''
            return buf + HTTPResponse.read(self, smaller_amt)

    def readline(self, size=1024):
        # You'd think Python's httplib would provide this, but it doesn't.
        # It does, however, provide a comment in the HTTPResponse class:
        #
        # # XXX It would be nice to have readline and __iter__ for this,
        # # too.
        #
        # Yes, it certainly would.
        while ('\n' not in self._readline_buffer
               and len(self._readline_buffer) < size):
            read_size = size - len(self._readline_buffer)
            chunk = HTTPResponse.read(self, read_size)
            if not chunk:
                break
            self._readline_buffer += chunk

        line, newline, rest = self._readline_buffer.partition('\n')
        self._readline_buffer = rest
        return line + newline

    def nuke_from_orbit(self):
        """
        Terminate the socket with extreme prejudice.
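A minimal sketch of the buffering contract these two methods establish (not part of the commit; resp is a hypothetical BufferedHTTPResponse whose remaining body is "Content-Type: text/plain\r\nrest of part"):

    resp.readline()  # returns 'Content-Type: text/plain\r\n'; leftover bytes
                     # stay in self._readline_buffer
    resp.read(4)     # returns 'rest' straight from the buffer, no socket IO
    resp.read()      # drains the buffer, then falls through to
                     # HTTPResponse.read() for the remainder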
@@ -57,6 +57,10 @@ class SuffixSyncError(SwiftException):
    pass


class RangeAlreadyComplete(SwiftException):
    pass


class DiskFileError(SwiftException):
    pass
@@ -1089,13 +1089,14 @@ def content_range_header(start, stop, size):

def multi_range_iterator(ranges, content_type, boundary, size, sub_iter_gen):
    for start, stop in ranges:
        yield ''.join(['\r\n--', boundary, '\r\n',
        yield ''.join(['--', boundary, '\r\n',
                       'Content-Type: ', content_type, '\r\n'])
        yield content_range_header(start, stop, size) + '\r\n\r\n'
        sub_iter = sub_iter_gen(start, stop)
        for chunk in sub_iter:
            yield chunk
    yield '\r\n--' + boundary + '--\r\n'
        yield '\r\n'
    yield '--' + boundary + '--'


class Response(object):
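For reference, a two-range response framed the new way looks like this on the wire (boundary and bytes illustrative; each '\r\n' shown as a line break):

    --boundary123
    Content-Type: text/plain
    Content-Range: bytes 0-4/21

    hello
    --boundary123
    Content-Type: text/plain
    Content-Range: bytes 16-20/21

    world
    --boundary123--

The old code emitted a leading '\r\n--boundary' before each part and a trailing '\r\n' after the closing delimiter; the new framing starts the first part immediately and ends with the bare '--boundary123--' terminator.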
@@ -1177,21 +1178,37 @@ class Response(object):
            self.content_type = ''.join(['multipart/byteranges;',
                                         'boundary=', self.boundary])

            # This section calculate the total size of the targeted response
            # The value 12 is the length of total bytes of hyphen, new line
            # form feed for each section header. The value 8 is the length of
            # total bytes of hyphen, new line, form feed characters for the
            # closing boundary which appears only once
            section_header_fixed_len = 12 + (len(self.boundary) +
                                             len('Content-Type: ') +
                                             len(content_type) +
                                             len('Content-Range: bytes '))
            # This section calculates the total size of the response.
            section_header_fixed_len = (
                # --boundary\r\n
                len(self.boundary) + 4
                # Content-Type: <type>\r\n
                + len('Content-Type: ') + len(content_type) + 2
                # Content-Range: <value>\r\n; <value> accounted for later
                + len('Content-Range: ') + 2
                # \r\n at end of headers
                + 2)

            body_size = 0
            for start, end in ranges:
                body_size += section_header_fixed_len
                body_size += len(str(start) + '-' + str(end - 1) + '/' +
                                 str(content_size)) + (end - start)
            body_size += 8 + len(self.boundary)

                # length of the value of Content-Range, not including the \r\n
                # since that's already accounted for
                cr = content_range_header_value(start, end, content_size)
                body_size += len(cr)

                # the actual bytes (note: this range is half-open, i.e. begins
                # with byte <start> and ends with byte <end - 1>, so there's no
                # fencepost error here)
                body_size += (end - start)

                # \r\n prior to --boundary
                body_size += 2

            # --boundary-- terminates the message
            body_size += len(self.boundary) + 4

            self.content_length = body_size
            self.content_range = None
        return content_size, content_type
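A worked instance of the new accounting (all values made up):

    boundary = 'deadbeef'            # 8 bytes
    content_type = 'text/plain'      # 10 bytes
    # '--deadbeef\r\n' is 8 + 4 = 12 bytes
    # 'Content-Type: text/plain\r\n' is 14 + 10 + 2 = 26 bytes
    # 'Content-Range: ' plus its '\r\n' is 15 + 2 = 17 bytes
    # header-terminating '\r\n' is 2 bytes
    section_header_fixed_len = 12 + 26 + 17 + 2     # == 57
    # for the single range (0, 5) of a 21-byte entity:
    #   len('bytes 0-4/21') == 12, the body is 5 bytes, trailing '\r\n' is 2
    # and '--deadbeef--' adds 8 + 4 at the very end:
    body_size = 57 + 12 + 5 + 2 + (8 + 4)           # == 88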
@@ -25,6 +25,7 @@ import operator
import os
import pwd
import re
import rfc822
import sys
import threading as stdlib_threading
import time
@@ -3181,7 +3182,7 @@ def parse_content_type(content_type):
        ('text/plain', [('charset, 'UTF-8'), ('level', '1')])

    :param content_type: content_type to parse
    :returns: a typle containing (content type, list of k, v parameter tuples)
    :returns: a tuple containing (content type, list of k, v parameter tuples)
    """
    parm_list = []
    if ';' in content_type:
@@ -3313,7 +3314,9 @@ class _MultipartMimeFileLikeObject(object):
def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
    """
    Given a multi-part-mime-encoded input file object and boundary,
    yield file-like objects for each part.
    yield file-like objects for each part. Note that this does not
    split each part into headers and body; the caller is responsible
    for doing that if necessary.

    :param wsgi_input: The file-like object to read from.
    :param boundary: The mime boundary to separate new file-like
@@ -3324,6 +3327,9 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
    boundary = '--' + boundary
    blen = len(boundary) + 2  # \r\n
    got = wsgi_input.readline(blen)
    while got == '\r\n':
        got = wsgi_input.readline(blen)

    if got.strip() != boundary:
        raise swift.common.exceptions.MimeInvalid(
            'invalid starting boundary: wanted %r, got %r', (boundary, got))
@@ -3338,6 +3344,174 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
    input_buffer = it.input_buffer


def mime_to_document_iters(input_file, boundary, read_chunk_size=4096):
    """
    Takes a file-like object containing a multipart MIME document and
    returns an iterator of (headers, body-file) tuples.

    :param input_file: file-like object with the MIME doc in it
    :param boundary: MIME boundary, sans dashes
        (e.g. "divider", not "--divider")
    :param read_chunk_size: size of strings read via input_file.read()
    """
    doc_files = iter_multipart_mime_documents(input_file, boundary,
                                              read_chunk_size)
    for i, doc_file in enumerate(doc_files):
        # this consumes the headers and leaves just the body in doc_file
        headers = rfc822.Message(doc_file, 0)
        yield (headers, doc_file)


def document_iters_to_multipart_byteranges(ranges_iter, boundary):
    """
    Takes an iterator of range iters and yields a multipart/byteranges MIME
    document suitable for sending as the body of a multi-range 206 response.

    See document_iters_to_http_response_body for parameter descriptions.
    """

    divider = "--" + boundary + "\r\n"
    terminator = "--" + boundary + "--"

    for range_spec in ranges_iter:
        start_byte = range_spec["start_byte"]
        end_byte = range_spec["end_byte"]
        entity_length = range_spec.get("entity_length", "*")
        content_type = range_spec["content_type"]
        part_iter = range_spec["part_iter"]

        part_header = ''.join((
            divider,
            "Content-Type: ", str(content_type), "\r\n",
            "Content-Range: ", "bytes %d-%d/%s\r\n" % (
                start_byte, end_byte, entity_length),
            "\r\n"
        ))
        yield part_header

        for chunk in part_iter:
            yield chunk
        yield "\r\n"
    yield terminator
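A minimal usage sketch (values invented) showing the range_spec shape this function expects:

    parts = iter([
        {'start_byte': 0, 'end_byte': 4, 'entity_length': 21,
         'content_type': 'text/plain', 'part_iter': iter(['hello'])},
        {'start_byte': 16, 'end_byte': 20, 'entity_length': 21,
         'content_type': 'text/plain', 'part_iter': iter(['world'])},
    ])
    body = ''.join(document_iters_to_multipart_byteranges(parts, 'mybound'))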
def document_iters_to_http_response_body(ranges_iter, boundary, multipart,
                                         logger):
    """
    Takes an iterator of range iters and turns it into an appropriate
    HTTP response body, whether that's multipart/byteranges or not.

    This is almost, but not quite, the inverse of
    http_response_to_document_iters(). This function only yields chunks of
    the body, not any headers.

    :param ranges_iter: an iterator of dictionaries, one per range.
        Each dictionary must contain at least the following key:

        "part_iter": iterator yielding the bytes in the range

        Additionally, if multipart is True, then the following other keys
        are required:

        "start_byte": index of the first byte in the range
        "end_byte": index of the last byte in the range
        "content_type": value for the range's Content-Type header

        Finally, there is one optional key that is used in the
        multipart/byteranges case:

        "entity_length": length of the requested entity (not necessarily
            equal to the response length). If omitted, "*" will be used.

        Each part_iter will be exhausted prior to calling next(ranges_iter).

    :param boundary: MIME boundary to use, sans dashes (e.g. "boundary", not
        "--boundary").
    :param multipart: True if the response should be multipart/byteranges,
        False otherwise. This should be True if and only if you have 2 or
        more ranges.
    :param logger: a logger
    """
    if multipart:
        return document_iters_to_multipart_byteranges(ranges_iter, boundary)
    else:
        try:
            response_body_iter = next(ranges_iter)['part_iter']
        except StopIteration:
            return ''

        # We need to make sure ranges_iter does not get garbage-collected
        # before response_body_iter is exhausted. The reason is that
        # ranges_iter has a finally block that calls close_swift_conn, and
        # so if that finally block fires before we read response_body_iter,
        # there's nothing there.
        def string_along(useful_iter, useless_iter_iter, logger):
            for x in useful_iter:
                yield x

            try:
                next(useless_iter_iter)
            except StopIteration:
                pass
            else:
                logger.warn("More than one part in a single-part response?")

        return string_along(response_body_iter, ranges_iter, logger)
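Sketch of the single-range path (assuming a stdlib logger); the lone part_iter is unwrapped and string_along() merely guards the producer's cleanup:

    import logging
    one = iter([{'part_iter': iter(['the whole response body'])}])
    body = document_iters_to_http_response_body(
        one, boundary='unused', multipart=False,
        logger=logging.getLogger(__name__))
    assert ''.join(body) == 'the whole response body'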
def multipart_byteranges_to_document_iters(input_file, boundary,
                                           read_chunk_size=4096):
    """
    Takes a file-like object containing a multipart/byteranges MIME document
    (see RFC 7233, Appendix A) and returns an iterator of (first-byte,
    last-byte, length, document-headers, body-file) 5-tuples.

    :param input_file: file-like object with the MIME doc in it
    :param boundary: MIME boundary, sans dashes
        (e.g. "divider", not "--divider")
    :param read_chunk_size: size of strings read via input_file.read()
    """
    for headers, body in mime_to_document_iters(input_file, boundary,
                                                read_chunk_size):
        first_byte, last_byte, length = parse_content_range(
            headers.getheader('content-range'))
        yield (first_byte, last_byte, length, headers.items(), body)


def http_response_to_document_iters(response, read_chunk_size=4096):
    """
    Takes a successful object-GET HTTP response and turns it into an
    iterator of (first-byte, last-byte, length, headers, body-file)
    5-tuples.

    The response must either be a 200 or a 206; if you feed in a 204 or
    something similar, this probably won't work.

    :param response: HTTP response, like from bufferedhttp.http_connect(),
        not a swob.Response.
    """
    if response.status == 200:
        # Single "range" that's the whole object
        content_length = int(response.getheader('Content-Length'))
        return iter([(0, content_length - 1, content_length,
                      response.getheaders(), response)])

    content_type, params_list = parse_content_type(
        response.getheader('Content-Type'))
    if content_type != 'multipart/byteranges':
        # Single range; no MIME framing, just the bytes. The start and end
        # byte indices are in the Content-Range header.
        start, end, length = parse_content_range(
            response.getheader('Content-Range'))
        return iter([(start, end, length, response.getheaders(), response)])
    else:
        # Multiple ranges; the response body is a multipart/byteranges MIME
        # document, and we have to parse it using the MIME boundary
        # extracted from the Content-Type header.
        params = dict(params_list)
        return multipart_byteranges_to_document_iters(
            response, params['boundary'], read_chunk_size)


#: Regular expression to match form attributes.
ATTRIBUTES_RE = re.compile(r'(\w+)=(".*?"|[^";]+)(; ?|$)')
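Consumption sketch (backend_response here stands in for a bufferedhttp response object):

    for first_byte, last_byte, length, headers, body_file in \
            http_response_to_document_iters(backend_response):
        while True:
            chunk = body_file.read(65536)
            if not chunk:
                break
            # ... forward this part's bytes before advancing the iterator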
@@ -28,7 +28,6 @@ import os
import time
import functools
import inspect
import logging
import operator
from sys import exc_info
from swift import gettext_ as _
@@ -40,10 +39,11 @@ from eventlet.timeout import Timeout
from swift.common.wsgi import make_pre_authed_env
from swift.common.utils import Timestamp, config_true_value, \
    public, split_path, list_from_csv, GreenthreadSafeIterator, \
    GreenAsyncPile, quorum_size, parse_content_range
    GreenAsyncPile, quorum_size, parse_content_type, \
    http_response_to_document_iters, document_iters_to_http_response_body
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
    ConnectionTimeout
    ConnectionTimeout, RangeAlreadyComplete
from swift.common.http import is_informational, is_success, is_redirection, \
    is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
    HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
@@ -613,10 +613,9 @@ def bytes_to_skip(record_size, range_start):
    return (record_size - (range_start % record_size)) % record_size


class GetOrHeadHandler(object):
class ResumingGetter(object):
    def __init__(self, app, req, server_type, node_iter, partition, path,
                 backend_headers, client_chunk_size=None):
                 backend_headers, client_chunk_size=None, newest=None):
        self.app = app
        self.node_iter = node_iter
        self.server_type = server_type
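Worked values for bytes_to_skip() above (chunk sizes hypothetical):

    bytes_to_skip(65536, 100000)  # == 31072; next boundary is byte 131072
    bytes_to_skip(65536, 65536)   # == 0; already aligned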
@@ -632,7 +631,10 @@ class GetOrHeadHandler(object):
        self.req_method = req.method
        self.req_path = req.path
        self.req_query_string = req.query_string
        self.newest = config_true_value(req.headers.get('x-newest', 'f'))
        if newest is None:
            self.newest = config_true_value(req.headers.get('x-newest', 'f'))
        else:
            self.newest = newest

        # populated when finding source
        self.statuses = []
@@ -640,6 +642,9 @@ class GetOrHeadHandler(object):
        self.bodies = []
        self.source_headers = []

        # populated from response headers
        self.start_byte = self.end_byte = self.length = None

    def fast_forward(self, num_bytes):
        """
        Will skip num_bytes into the current ranges.
@@ -648,57 +653,89 @@ class GetOrHeadHandler(object):
                           this request. This will change the Range header
                           so that the next req will start where it left off.

        :raises NotImplementedError: if this is a multirange request
        :raises ValueError: if invalid range header
        :raises HTTPRequestedRangeNotSatisfiable: if begin + num_bytes
                > end of range
                > end of range + 1
        :raises RangeAlreadyComplete: if begin + num_bytes == end of range + 1
        """
        if 'Range' in self.backend_headers:
            req_range = Range(self.backend_headers['Range'])

            if len(req_range.ranges) > 1:
                raise NotImplementedError()
            begin, end = req_range.ranges.pop()
            begin, end = req_range.ranges[0]
            if begin is None:
                # this is a -50 range req (last 50 bytes of file)
                end -= num_bytes
            else:
                begin += num_bytes
                if end and begin > end:
                if end and begin == end + 1:
                    # we sent out exactly the first range's worth of bytes, so
                    # we're done with it
                    raise RangeAlreadyComplete()
                elif end and begin > end:
                    raise HTTPRequestedRangeNotSatisfiable()
            req_range.ranges = [(begin, end)]
                elif end and begin:
                    req_range.ranges = [(begin, end)] + req_range.ranges[1:]
                elif end:
                    req_range.ranges = [(None, end)] + req_range.ranges[1:]
                else:
                    req_range.ranges = [(begin, None)] + req_range.ranges[1:]

            self.backend_headers['Range'] = str(req_range)
        else:
            self.backend_headers['Range'] = 'bytes=%d-' % num_bytes

    def learn_size_from_content_range(self, start, end):
    def pop_range(self):
        """
        Remove the first byterange from our Range header.

        This is used after a byterange has been completely sent to the
        client; this way, should we need to resume the download from another
        object server, we do not re-fetch byteranges that the client already
        has.

        If we have no Range header, this is a no-op.
        """
        if 'Range' in self.backend_headers:
            req_range = Range(self.backend_headers['Range'])
            begin, end = req_range.ranges.pop(0)
            if len(req_range.ranges) > 0:
                self.backend_headers['Range'] = str(req_range)
            else:
                self.backend_headers.pop('Range')

    def learn_size_from_content_range(self, start, end, length):
        """
        If client_chunk_size is set, makes sure we yield things starting on
        chunk boundaries based on the Content-Range header in the response.

        Sets our first Range header to the value learned from the
        Content-Range header in the response; if we were given a
        Sets our Range header's first byterange to the value learned from
        the Content-Range header in the response; if we were given a
        fully-specified range (e.g. "bytes=123-456"), this is a no-op.

        If we were given a half-specified range (e.g. "bytes=123-" or
        "bytes=-456"), then this changes the Range header to a
        semantically-equivalent one *and* it lets us resume on a proper
        boundary instead of just in the middle of a piece somewhere.

        If the original request is for more than one range, this does not
        affect our backend Range header, since we don't support resuming one
        of those anyway.
        """
        if length == 0:
            return

        if self.client_chunk_size:
            self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)

        if 'Range' in self.backend_headers:
            req_range = Range(self.backend_headers['Range'])
            try:
                req_range = Range(self.backend_headers['Range'])
                new_ranges = [(start, end)] + req_range.ranges[1:]
            except ValueError:
                new_ranges = [(start, end)]
        else:
            new_ranges = [(start, end)]

            if len(req_range.ranges) > 1:
                return

            self.backend_headers['Range'] = "bytes=%d-%d" % (start, end)
        self.backend_headers['Range'] = (
            "bytes=" + (",".join("%s-%s" % (s if s is not None else '',
                                            e if e is not None else '')
                                 for s, e in new_ranges)))

    def is_good_source(self, src):
        """
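A hypothetical walk-through of the resume bookkeeping these methods implement:

    # Range: bytes=0-99,200-299        (initial backend request)
    # ...connection dies after 30 bytes of the first range...
    # fast_forward(30)  -> Range: bytes=30-99,200-299
    # ...bytes 30-99 fully delivered to the client...
    # pop_range()       -> Range: bytes=200-299 (next node asks only for this)
    # fast_forward() landing exactly one byte past a range's end raises
    # RangeAlreadyComplete instead of re-requesting the finished range.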
@@ -712,106 +749,183 @@ class GetOrHeadHandler(object):
            return True
        return is_success(src.status) or is_redirection(src.status)

    def _make_app_iter(self, req, node, source):
        """
        Returns an iterator over the contents of the source (via its read
        func). There is also quite a bit of cleanup to ensure garbage
        collection works and the underlying socket of the source is closed.
    def response_parts_iter(self, req):
        source, node = self._get_source_and_node()
        it = None
        if source:
            it = self._get_response_parts_iter(req, node, source)
        return it

    def _get_response_parts_iter(self, req, node, source):
        # Someday we can replace this [mess] with python 3's "nonlocal"
        source = [source]
        node = [node]

        :param req: incoming request object
        :param source: The httplib.Response object this iterator should read
                       from.
        :param node: The node the source is reading from, for logging purposes.
        """
        try:
            nchunks = 0
            client_chunk_size = self.client_chunk_size
            bytes_consumed_from_backend = 0
            node_timeout = self.app.node_timeout
            if self.server_type == 'Object':
                node_timeout = self.app.recoverable_node_timeout
            buf = ''
            while True:
                try:
                    with ChunkReadTimeout(node_timeout):
                        chunk = source.read(self.app.object_chunk_size)
                        nchunks += 1
                        buf += chunk
                except ChunkReadTimeout:
                    exc_type, exc_value, exc_traceback = exc_info()
                    if self.newest or self.server_type != 'Object':
                        raise exc_type, exc_value, exc_traceback

            # This is safe; it sets up a generator but does not call next()
            # on it, so no IO is performed.
            parts_iter = [
                http_response_to_document_iters(
                    source[0], read_chunk_size=self.app.object_chunk_size)]

            def get_next_doc_part():
                while True:
                    try:
                        self.fast_forward(bytes_consumed_from_backend)
                    except (NotImplementedError, HTTPException, ValueError):
                        raise exc_type, exc_value, exc_traceback
                    buf = ''
                    new_source, new_node = self._get_source_and_node()
                    if new_source:
                        self.app.exception_occurred(
                            node, _('Object'),
                            _('Trying to read during GET (retrying)'),
                            level=logging.ERROR, exc_info=(
                                exc_type, exc_value, exc_traceback))
                        # Close-out the connection as best as possible.
                        if getattr(source, 'swift_conn', None):
                            close_swift_conn(source)
                        source = new_source
                        node = new_node
                        continue
                    else:
                        raise exc_type, exc_value, exc_traceback
                        # This call to next() performs IO when we have a
                        # multipart/byteranges response; it reads the MIME
                        # boundary and part headers.
                        #
                        # If we don't have a multipart/byteranges response,
                        # but just a 200 or a single-range 206, then this
                        # performs no IO, and either just returns source or
                        # raises StopIteration.
                        with ChunkReadTimeout(node_timeout):
                            # if StopIteration is raised, it escapes and is
                            # handled elsewhere
                            start_byte, end_byte, length, headers, part = next(
                                parts_iter[0])
                        return (start_byte, end_byte, length, headers, part)
                    except ChunkReadTimeout:
                        new_source, new_node = self._get_source_and_node()
                        if new_source:
                            self.app.exception_occurred(
                                node[0], _('Object'),
                                _('Trying to read during GET (retrying)'))
                            # Close-out the connection as best as possible.
                            if getattr(source[0], 'swift_conn', None):
                                close_swift_conn(source[0])
                            source[0] = new_source
                            node[0] = new_node
                            # This is safe; it sets up a generator but does
                            # not call next() on it, so no IO is performed.
                            parts_iter[0] = http_response_to_document_iters(
                                new_source,
                                read_chunk_size=self.app.object_chunk_size)
                        else:
                            raise StopIteration()

                if buf and self.skip_bytes:
                    if self.skip_bytes < len(buf):
                        buf = buf[self.skip_bytes:]
                        bytes_consumed_from_backend += self.skip_bytes
                        self.skip_bytes = 0
                    else:
                        self.skip_bytes -= len(buf)
                        bytes_consumed_from_backend += len(buf)
            def iter_bytes_from_response_part(part_file):
                nchunks = 0
                buf = ''
                bytes_used_from_backend = 0
                while True:
                    try:
                        with ChunkReadTimeout(node_timeout):
                            chunk = part_file.read(self.app.object_chunk_size)
                            nchunks += 1
                            buf += chunk
                    except ChunkReadTimeout:
                        exc_type, exc_value, exc_traceback = exc_info()
                        if self.newest or self.server_type != 'Object':
                            raise exc_type, exc_value, exc_traceback
                        try:
                            self.fast_forward(bytes_used_from_backend)
                        except (HTTPException, ValueError):
                            raise exc_type, exc_value, exc_traceback
                        except RangeAlreadyComplete:
                            break
                        buf = ''
                        new_source, new_node = self._get_source_and_node()
                        if new_source:
                            self.app.exception_occurred(
                                node[0], _('Object'),
                                _('Trying to read during GET (retrying)'))
                            # Close-out the connection as best as possible.
                            if getattr(source[0], 'swift_conn', None):
                                close_swift_conn(source[0])
                            source[0] = new_source
                            node[0] = new_node
                            # This is safe; it just sets up a generator but
                            # does not call next() on it, so no IO is
                            # performed.
                            parts_iter[0] = http_response_to_document_iters(
                                new_source,
                                read_chunk_size=self.app.object_chunk_size)

                if not chunk:
                    if buf:
                        with ChunkWriteTimeout(self.app.client_timeout):
                            bytes_consumed_from_backend += len(buf)
                            yield buf
                        buf = ''
                    break
                            try:
                                _junk, _junk, _junk, _junk, part_file = \
                                    get_next_doc_part()
                            except StopIteration:
                                # Tried to find a new node from which to
                                # finish the GET, but failed. There's
                                # nothing more to do here.
                                return
                        else:
                            raise exc_type, exc_value, exc_traceback
                    else:
                        if buf and self.skip_bytes:
                            if self.skip_bytes < len(buf):
                                buf = buf[self.skip_bytes:]
                                bytes_used_from_backend += self.skip_bytes
                                self.skip_bytes = 0
                            else:
                                self.skip_bytes -= len(buf)
                                bytes_used_from_backend += len(buf)
                                buf = ''

                if client_chunk_size is not None:
                    while len(buf) >= client_chunk_size:
                        client_chunk = buf[:client_chunk_size]
                        buf = buf[client_chunk_size:]
                        with ChunkWriteTimeout(self.app.client_timeout):
                            yield client_chunk
                        bytes_consumed_from_backend += len(client_chunk)
                else:
                    with ChunkWriteTimeout(self.app.client_timeout):
                        yield buf
                    bytes_consumed_from_backend += len(buf)
                    buf = ''
                        if not chunk:
                            if buf:
                                with ChunkWriteTimeout(
                                        self.app.client_timeout):
                                    bytes_used_from_backend += len(buf)
                                    yield buf
                                buf = ''
                            break

                # This is for fairness; if the network is outpacing the CPU,
                # we'll always be able to read and write data without
                # encountering an EWOULDBLOCK, and so eventlet will not switch
                # greenthreads on its own. We do it manually so that clients
                # don't starve.
                #
                # The number 5 here was chosen by making stuff up. It's not
                # every single chunk, but it's not too big either, so it seemed
                # like it would probably be an okay choice.
                #
                # Note that we may trampoline to other greenthreads more often
                # than once every 5 chunks, depending on how blocking our
                # network IO is; the explicit sleep here simply provides a
                # lower bound on the rate of trampolining.
                if nchunks % 5 == 0:
                    sleep()
                        if client_chunk_size is not None:
                            while len(buf) >= client_chunk_size:
                                client_chunk = buf[:client_chunk_size]
                                buf = buf[client_chunk_size:]
                                with ChunkWriteTimeout(
                                        self.app.client_timeout):
                                    yield client_chunk
                                bytes_used_from_backend += len(client_chunk)
                        else:
                            with ChunkWriteTimeout(self.app.client_timeout):
                                yield buf
                            bytes_used_from_backend += len(buf)
                            buf = ''

                        # This is for fairness; if the network is outpacing
                        # the CPU, we'll always be able to read and write
                        # data without encountering an EWOULDBLOCK, and so
                        # eventlet will not switch greenthreads on its own.
                        # We do it manually so that clients don't starve.
                        #
                        # The number 5 here was chosen by making stuff up.
                        # It's not every single chunk, but it's not too big
                        # either, so it seemed like it would probably be an
                        # okay choice.
                        #
                        # Note that we may trampoline to other greenthreads
                        # more often than once every 5 chunks, depending on
                        # how blocking our network IO is; the explicit sleep
                        # here simply provides a lower bound on the rate of
                        # trampolining.
                        if nchunks % 5 == 0:
                            sleep()

            try:
                while True:
                    start_byte, end_byte, length, headers, part = \
                        get_next_doc_part()
                    self.learn_size_from_content_range(
                        start_byte, end_byte, length)
                    part_iter = iter_bytes_from_response_part(part)
                    yield {'start_byte': start_byte, 'end_byte': end_byte,
                           'entity_length': length, 'headers': headers,
                           'part_iter': part_iter}
                    self.pop_range()
            except StopIteration:
                return

        except ChunkReadTimeout:
            self.app.exception_occurred(node, _('Object'),
            self.app.exception_occurred(node[0], _('Object'),
                                        _('Trying to read during GET'))
            raise
        except ChunkWriteTimeout:
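Each iteration of the new yield loop hands one part to the caller as a plain dict; an illustrative (made-up) instance:

    {'start_byte': 0, 'end_byte': 65535, 'entity_length': 1048576,
     'headers': [('Content-Type', 'application/octet-stream'), ...],
     'part_iter': <generator yielding that byterange's bytes>}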
@@ -827,8 +941,22 @@ class GetOrHeadHandler(object):
            raise
        finally:
            # Close-out the connection as best as possible.
            if getattr(source, 'swift_conn', None):
                close_swift_conn(source)
            if getattr(source[0], 'swift_conn', None):
                close_swift_conn(source[0])

    @property
    def last_status(self):
        if self.statuses:
            return self.statuses[-1]
        else:
            return None

    @property
    def last_headers(self):
        if self.source_headers:
            return self.source_headers[-1]
        else:
            return None

    def _get_source_and_node(self):
        self.statuses = []
@@ -869,7 +997,7 @@ class GetOrHeadHandler(object):
                    self.statuses.append(HTTP_NOT_FOUND)
                    self.reasons.append('')
                    self.bodies.append('')
                    self.source_headers.append('')
                    self.source_headers.append([])
                    close_swift_conn(possible_source)
                else:
                    if self.used_source_etag:
@@ -883,13 +1011,13 @@ class GetOrHeadHandler(object):
                            self.statuses.append(HTTP_NOT_FOUND)
                            self.reasons.append('')
                            self.bodies.append('')
                            self.source_headers.append('')
                            self.source_headers.append([])
                            continue

                    self.statuses.append(possible_source.status)
                    self.reasons.append(possible_source.reason)
                    self.bodies.append('')
                    self.source_headers.append('')
                    self.source_headers.append(possible_source.getheaders())
                    sources.append((possible_source, node))
                    if not self.newest:  # one good source is enough
                        break
@@ -923,6 +1051,44 @@ class GetOrHeadHandler(object):
            return source, node
        return None, None


class GetOrHeadHandler(ResumingGetter):
    def _make_app_iter(self, req, node, source):
        """
        Returns an iterator over the contents of the source (via its read
        func). There is also quite a bit of cleanup to ensure garbage
        collection works and the underlying socket of the source is closed.

        :param req: incoming request object
        :param source: The httplib.Response object this iterator should read
                       from.
        :param node: The node the source is reading from, for logging purposes.
        """

        ct = source.getheader('Content-Type')
        if ct:
            content_type, content_type_attrs = parse_content_type(ct)
            is_multipart = content_type == 'multipart/byteranges'
        else:
            is_multipart = False

        boundary = "dontcare"
        if is_multipart:
            # we need some MIME boundary; fortunately, the object server has
            # furnished one for us, so we'll just re-use it
            boundary = dict(content_type_attrs)["boundary"]

        parts_iter = self._get_response_parts_iter(req, node, source)

        def add_content_type(response_part):
            response_part["content_type"] = \
                HeaderKeyDict(response_part["headers"]).get("Content-Type")
            return response_part

        return document_iters_to_http_response_body(
            (add_content_type(pi) for pi in parts_iter),
            boundary, is_multipart, self.app.logger)

    def get_working_response(self, req):
        source, node = self._get_source_and_node()
        res = None
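The boundary reuse above leans on parse_content_type; an illustrative value:

    parse_content_type('multipart/byteranges; boundary=5e816ff16b4b')
    # -> ('multipart/byteranges', [('boundary', '5e816ff16b4b')])
    # so dict(content_type_attrs)['boundary'] recovers '5e816ff16b4b'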
@@ -932,10 +1098,6 @@ class GetOrHeadHandler(object):
            update_headers(res, source.getheaders())
            if req.method == 'GET' and \
                    source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
                cr = res.headers.get('Content-Range')
                if cr:
                    start, end, total = parse_content_range(cr)
                    self.learn_size_from_content_range(start, end)
                res.app_iter = self._make_app_iter(req, node, source)
            # See NOTE: swift_conn at top of file about this.
            res.swift_conn = source.swift_conn
@@ -43,7 +43,8 @@ from swift.common.utils import (
    clean_content_type, config_true_value, ContextPool, csv_append,
    GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp,
    normalize_delete_at_timestamp, public, get_expirer_container,
    quorum_size)
    document_iters_to_http_response_body, parse_content_range,
    quorum_size, reiterate)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
    check_copy_from_header, check_destination_header, \
@@ -62,11 +63,12 @@ from swift.common.http import (
from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
                                         ECDriverError, PolicyError)
from swift.proxy.controllers.base import Controller, delay_denial, \
    cors_validation
    cors_validation, ResumingGetter
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
    HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
    HTTPServerError, HTTPServiceUnavailable, Request, HeaderKeyDict, \
    HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException
    HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException, \
    HTTPRequestedRangeNotSatisfiable, Range
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
    remove_items, copy_header_subset, close_if_possible
@@ -1137,119 +1139,350 @@ class ECAppIter(object):
    WSGI iterable that decodes EC fragment archives (or portions thereof)
    into the original object (or portions thereof).

    :param path: path for the request
    :param path: object's path, sans v1 (e.g. /a/c/o)

    :param policy: storage policy for this object

    :param internal_app_iters: list of the WSGI iterables from object server
        GET responses for fragment archives. For an M+K erasure code, the
        caller must supply M such iterables.
    :param internal_parts_iters: list of the response-document-parts
        iterators for the backend GET responses. For an M+K erasure code,
        the caller must supply M such iterables.

    :param range_specs: list of dictionaries describing the ranges requested
        by the client. Each dictionary contains the start and end of the
        client's requested byte range as well as the start and end of the EC
        segments containing that byte range.

    :param fa_length: length of the fragment archive, in bytes, if the
        response is a 200. If it's a 206, then this is ignored.

    :param obj_length: length of the object, in bytes. Learned from the
        headers in the GET response from the object server.

    :param logger: a logger
    """
    def __init__(self, path, policy, internal_app_iters, range_specs,
                 obj_length, logger):
    def __init__(self, path, policy, internal_parts_iters, range_specs,
                 fa_length, obj_length, logger):
        self.path = path
        self.policy = policy
        self.internal_app_iters = internal_app_iters
        self.internal_parts_iters = internal_parts_iters
        self.range_specs = range_specs
        self.obj_length = obj_length
        self.fa_length = fa_length
        self.obj_length = obj_length if obj_length is not None else 0
        self.boundary = ''
        self.logger = logger

        self.mime_boundary = None
        self.learned_content_type = None
        self.stashed_iter = None

    def close(self):
        for it in self.internal_app_iters:
        for it in self.internal_parts_iters:
            close_if_possible(it)
    def __iter__(self):
        segments_iter = self.decode_segments_from_fragments()
    def kickoff(self, req, resp):
        """
        Start pulling data from the backends so that we can learn things like
        the real Content-Type that might only be in the multipart/byteranges
        response body. Update our response accordingly.

        if len(self.range_specs) == 0:
            # plain GET; just yield up segments
            for seg in segments_iter:
                yield seg
            return
        Also, this is the first point at which we can learn the MIME
        boundary that our response has in the headers. We grab that so we
        can also use it in the body.

        if len(self.range_specs) > 1:
            raise NotImplementedError("multi-range GETs not done yet")
        :returns: None
        :raises: HTTPException on error
        """
        self.mime_boundary = resp.boundary

        for range_spec in self.range_specs:
            client_start = range_spec['client_start']
            client_end = range_spec['client_end']
            segment_start = range_spec['segment_start']
            segment_end = range_spec['segment_end']
        self.stashed_iter = reiterate(self._real_iter(req, resp.headers))

        if self.learned_content_type is not None:
            resp.content_type = self.learned_content_type
        resp.content_length = self.obj_length

    def _next_range(self):
        # Each FA part should have approximately the same headers. We really
        # only care about Content-Range and Content-Type, and that'll be the
        # same for all the different FAs.
        frag_iters = []
        headers = None
        for parts_iter in self.internal_parts_iters:
            part_info = next(parts_iter)
            frag_iters.append(part_info['part_iter'])
            headers = part_info['headers']
        headers = HeaderKeyDict(headers)
        return headers, frag_iters
    def _actual_range(self, req_start, req_end, entity_length):
        try:
            rng = Range("bytes=%s-%s" % (
                req_start if req_start is not None else '',
                req_end if req_end is not None else ''))
        except ValueError:
            return (None, None)

        rfl = rng.ranges_for_length(entity_length)
        if not rfl:
            return (None, None)
        else:
            # ranges_for_length() adds 1 to the last byte's position
            # because webob once made a mistake
            return (rfl[0][0], rfl[0][1] - 1)
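The off-by-one handling in _actual_range() comes from swob's half-open convention:

    Range('bytes=0-49').ranges_for_length(100)  # -> [(0, 50)]
    # hence rfl[0][1] - 1 to get back to the last byte's index, 49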

    def _fill_out_range_specs_from_obj_length(self, range_specs):
        # Add a few fields to each range spec:
        #
        # * resp_client_start, resp_client_end: the actual bytes that will
        #   be delivered to the client for the requested range. This may
        #   differ from the requested bytes if, say, the requested range
        #   overlaps the end of the object.
        #
        # * resp_segment_start, resp_segment_end: the actual offsets of the
        #   segments that will be decoded for the requested range. These
        #   differ from resp_client_start/end in that these are aligned
        #   to segment boundaries, while resp_client_start/end are not
        #   necessarily so.
        #
        # * satisfiable: a boolean indicating whether the range is
        #   satisfiable or not (i.e. the requested range overlaps the
        #   object in at least one byte).
        #
        # This is kept separate from _fill_out_range_specs_from_fa_length()
        # because this computation can be done with just the response
        # headers from the object servers (in particular
        # X-Object-Sysmeta-Ec-Content-Length), while the computation in
        # _fill_out_range_specs_from_fa_length() requires the beginnings of
        # the response bodies.
        for spec in range_specs:
            cstart, cend = self._actual_range(
                spec['req_client_start'],
                spec['req_client_end'],
                self.obj_length)
            spec['resp_client_start'] = cstart
            spec['resp_client_end'] = cend
            spec['satisfiable'] = (cstart is not None and cend is not None)

            sstart, send = self._actual_range(
                spec['req_segment_start'],
                spec['req_segment_end'],
                self.obj_length)

            seg_size = self.policy.ec_segment_size
            is_suffix = client_start is None
            if spec['req_segment_start'] is None and sstart % seg_size != 0:
                # Segment start may, in the case of a suffix request, need
                # to be rounded up (not down!) to the nearest segment boundary.
                # This reflects the trimming of leading garbage (partial
                # fragments) from the retrieved fragments.
                sstart += seg_size - (sstart % seg_size)

            if is_suffix:
                # Suffix byte ranges (i.e. requests for the last N bytes of
                # an object) are likely to end up not on a segment boundary.
                client_range_len = client_end
                client_start = max(self.obj_length - client_range_len, 0)
                client_end = self.obj_length - 1
            spec['resp_segment_start'] = sstart
            spec['resp_segment_end'] = send

                # may be mid-segment; if it is, then everything up to the
                # first segment boundary is garbage, and is discarded before
                # ever getting into this function.
                unaligned_segment_start = max(self.obj_length - segment_end, 0)
                alignment_offset = (
                    (seg_size - (unaligned_segment_start % seg_size))
                    % seg_size)
                segment_start = unaligned_segment_start + alignment_offset
                segment_end = self.obj_length - 1
            else:
                # It's entirely possible that the client asked for a range that
                # includes some bytes we have and some we don't; for example, a
                # range of bytes 1000-20000000 on a 1500-byte object.
                segment_end = (min(segment_end, self.obj_length - 1)
                               if segment_end is not None
                               else self.obj_length - 1)
                client_end = (min(client_end, self.obj_length - 1)
                              if client_end is not None
                              else self.obj_length - 1)
    def _fill_out_range_specs_from_fa_length(self, fa_length, range_specs):
        # Add two fields to each range spec:
        #
        # * resp_fragment_start, resp_fragment_end: the start and end of
        #   the fragments that compose this byterange. These values are
        #   aligned to fragment boundaries.
        #
        # This way, ECAppIter has the knowledge it needs to correlate
        # response byteranges with requested ones for when some byteranges
        # are omitted from the response entirely and also to put the right
        # Content-Range headers in a multipart/byteranges response.
        for spec in range_specs:
            fstart, fend = self._actual_range(
                spec['req_fragment_start'],
                spec['req_fragment_end'],
                fa_length)
            spec['resp_fragment_start'] = fstart
            spec['resp_fragment_end'] = fend

            num_segments = int(
                math.ceil(float(segment_end + 1 - segment_start)
                          / self.policy.ec_segment_size))
            # We get full segments here, but the client may have requested a
            # byte range that begins or ends in the middle of a segment.
            # Thus, we have some amount of overrun (extra decoded bytes)
            # that we trim off so the client gets exactly what they
            # requested.
            start_overrun = client_start - segment_start
            end_overrun = segment_end - client_end
    def __iter__(self):
        if self.stashed_iter is not None:
            return iter(self.stashed_iter)
        else:
            raise ValueError("Failed to call kickoff() before __iter__()")

            for i, next_seg in enumerate(segments_iter):
                # We may have a start_overrun of more than one segment in
                # the case of suffix-byte-range requests. However, we never
                # have an end_overrun of more than one segment.
                if start_overrun > 0:
                    seglen = len(next_seg)
                    if seglen <= start_overrun:
                        start_overrun -= seglen
                        continue
                    else:
                        next_seg = next_seg[start_overrun:]
                        start_overrun = 0
    def _real_iter(self, req, resp_headers):
        if not self.range_specs:
            client_asked_for_range = False
            range_specs = [{
                'req_client_start': 0,
                'req_client_end': (None if self.obj_length is None
                                   else self.obj_length - 1),
                'resp_client_start': 0,
                'resp_client_end': (None if self.obj_length is None
                                    else self.obj_length - 1),
                'req_segment_start': 0,
                'req_segment_end': (None if self.obj_length is None
                                    else self.obj_length - 1),
                'resp_segment_start': 0,
                'resp_segment_end': (None if self.obj_length is None
                                     else self.obj_length - 1),
                'req_fragment_start': 0,
                'req_fragment_end': self.fa_length - 1,
                'resp_fragment_start': 0,
                'resp_fragment_end': self.fa_length - 1,
                'satisfiable': self.obj_length > 0,
            }]
        else:
            client_asked_for_range = True
            range_specs = self.range_specs

                if i == (num_segments - 1) and end_overrun:
                    next_seg = next_seg[:-end_overrun]
        self._fill_out_range_specs_from_obj_length(range_specs)

                yield next_seg
        multipart = (len([rs for rs in range_specs if rs['satisfiable']]) > 1)
        # Multipart responses are not required to be in the same order as
        # the Range header; the parts may be in any order the server wants.
        # Further, if multiple ranges are requested and only some are
        # satisfiable, then only the satisfiable ones appear in the response
        # at all. Thus, we cannot simply iterate over range_specs in order;
        # we must use the Content-Range header from each part to figure out
        # what we've been given.
        #
        # We do, however, make the assumption that all the object-server
        # responses have their ranges in the same order. Otherwise, a
        # streaming decode would be impossible.
    def decode_segments_from_fragments(self):
        def convert_ranges_iter():
            seen_first_headers = False
            ranges_for_resp = {}

            while True:
                # this'll raise StopIteration and exit the loop
                next_range = self._next_range()

                headers, frag_iters = next_range
                content_type = headers['Content-Type']

                content_range = headers.get('Content-Range')
                if content_range is not None:
                    fa_start, fa_end, fa_length = parse_content_range(
                        content_range)
                elif self.fa_length <= 0:
                    fa_start = None
                    fa_end = None
                    fa_length = 0
                else:
                    fa_start = 0
                    fa_end = self.fa_length - 1
                    fa_length = self.fa_length

                if not seen_first_headers:
                    # This is the earliest we can possibly do this. On a
                    # 200 or 206-single-byterange response, we can learn
                    # the FA's length from the HTTP response headers.
                    # However, on a 206-multiple-byteranges response, we
                    # don't learn it until the first part of the
                    # response body, in the headers of the first MIME
                    # part.
                    #
                    # Similarly, the content type of a
                    # 206-multiple-byteranges response is
                    # "multipart/byteranges", not the object's actual
                    # content type.
                    self._fill_out_range_specs_from_fa_length(
                        fa_length, range_specs)

                    satisfiable = False
                    for range_spec in range_specs:
                        satisfiable |= range_spec['satisfiable']
                        key = (range_spec['resp_fragment_start'],
                               range_spec['resp_fragment_end'])
                        ranges_for_resp.setdefault(key, []).append(range_spec)

                    # The client may have asked for an unsatisfiable set of
                    # ranges, but when converted to fragments, the object
                    # servers see it as satisfiable. For example, imagine a
                    # request for bytes 800-900 of a 750-byte object with a
                    # 1024-byte segment size. The object servers will see a
                    # request for bytes 0-${fragsize-1}, and that's
                    # satisfiable, so they return 206. It's not until we
                    # learn the object size that we can check for this
                    # condition.
                    #
                    # Note that some unsatisfiable ranges *will* be caught
                    # by the object servers, like bytes 1800-1900 of a
                    # 100-byte object with 1024-byte segments. That's not
                    # what we're dealing with here, though.
                    if client_asked_for_range and not satisfiable:
                        raise HTTPRequestedRangeNotSatisfiable(
                            request=req, headers=resp_headers)
                    self.learned_content_type = content_type
                    seen_first_headers = True

                range_spec = ranges_for_resp[(fa_start, fa_end)].pop(0)
                seg_iter = self._decode_segments_from_fragments(frag_iters)
                if not range_spec['satisfiable']:
                    # This'll be small; just a single small segment. Discard
                    # it.
                    for x in seg_iter:
                        pass
                    continue

                byterange_iter = self._iter_one_range(range_spec, seg_iter)

                converted = {
                    "start_byte": range_spec["resp_client_start"],
                    "end_byte": range_spec["resp_client_end"],
                    "content_type": content_type,
                    "part_iter": byterange_iter}

                if self.obj_length is not None:
                    converted["entity_length"] = self.obj_length
                yield converted

        return document_iters_to_http_response_body(
            convert_ranges_iter(), self.mime_boundary, multipart, self.logger)
    def _iter_one_range(self, range_spec, segment_iter):
        client_start = range_spec['resp_client_start']
        client_end = range_spec['resp_client_end']
        segment_start = range_spec['resp_segment_start']
        segment_end = range_spec['resp_segment_end']

        # It's entirely possible that the client asked for a range that
        # includes some bytes we have and some we don't; for example, a
        # range of bytes 1000-20000000 on a 1500-byte object.
        segment_end = (min(segment_end, self.obj_length - 1)
                       if segment_end is not None
                       else self.obj_length - 1)
        client_end = (min(client_end, self.obj_length - 1)
                      if client_end is not None
                      else self.obj_length - 1)
        num_segments = int(
            math.ceil(float(segment_end + 1 - segment_start)
                      / self.policy.ec_segment_size))
        # We get full segments here, but the client may have requested a
        # byte range that begins or ends in the middle of a segment.
        # Thus, we have some amount of overrun (extra decoded bytes)
        # that we trim off so the client gets exactly what they
        # requested.
        start_overrun = client_start - segment_start
        end_overrun = segment_end - client_end

        for i, next_seg in enumerate(segment_iter):
            # We may have a start_overrun of more than one segment in
            # the case of suffix-byte-range requests. However, we never
            # have an end_overrun of more than one segment.
            if start_overrun > 0:
                seglen = len(next_seg)
                if seglen <= start_overrun:
                    start_overrun -= seglen
                    continue
                else:
                    next_seg = next_seg[start_overrun:]
                    start_overrun = 0

            if i == (num_segments - 1) and end_overrun:
                next_seg = next_seg[:-end_overrun]

            yield next_seg

    def _decode_segments_from_fragments(self, fragment_iters):
        # Decodes the fragments from the object servers and yields one
        # segment at a time.
        queues = [Queue(1) for _junk in range(len(self.internal_app_iters))]
        queues = [Queue(1) for _junk in range(len(fragment_iters))]

        def put_fragments_in_queue(frag_iter, queue):
            try:
@@ -1262,7 +1495,8 @@ class ECAppIter(object):
                pass
            except ChunkReadTimeout:
                # unable to resume in GetOrHeadHandler
                pass
                self.logger.exception("Timeout fetching fragments for %r" %
                                      self.path)
            except:  # noqa
                self.logger.exception("Exception fetching fragments for %r" %
                                      self.path)
@@ -1270,14 +1504,13 @@ class ECAppIter(object):
                queue.resize(2)  # ensure there's room
                queue.put(None)

        with ContextPool(len(self.internal_app_iters)) as pool:
            for app_iter, queue in zip(
                    self.internal_app_iters, queues):
                pool.spawn(put_fragments_in_queue, app_iter, queue)
        with ContextPool(len(fragment_iters)) as pool:
            for frag_iter, queue in zip(fragment_iters, queues):
                pool.spawn(put_fragments_in_queue, frag_iter, queue)

            while True:
                fragments = []
                for qi, queue in enumerate(queues):
                for queue in queues:
                    fragment = queue.get()
                    queue.task_done()
                    fragments.append(fragment)
@@ -1302,8 +1535,8 @@ class ECAppIter(object):
    def app_iter_range(self, start, end):
        return self

    def app_iter_ranges(self, content_type, boundary, content_size):
        self.boundary = boundary
    def app_iter_ranges(self, ranges, content_type, boundary, content_size):
        return self


def client_range_to_segment_range(client_start, client_end, segment_size):
@@ -1750,6 +1983,71 @@ def trailing_metadata(policy, client_obj_hasher,

@ObjectControllerRouter.register(EC_POLICY)
class ECObjectController(BaseObjectController):
    def _fragment_GET_request(self, req, node_iter, partition, policy):
        """
        Makes a GET request for a fragment.
        """
        backend_headers = self.generate_request_headers(
            req, additional=req.headers)

        getter = ResumingGetter(self.app, req, 'Object', node_iter,
                                partition, req.swift_entity_path,
                                backend_headers,
                                client_chunk_size=policy.fragment_size,
                                newest=False)
        return (getter, getter.response_parts_iter(req))

def _convert_range(self, req, policy):
|
||||
"""
|
||||
Take the requested range(s) from the client and convert it to range(s)
|
||||
to be sent to the object servers.
|
||||
|
||||
This includes widening requested ranges to full segments, then
|
||||
converting those ranges to fragments so that we retrieve the minimum
|
||||
number of fragments from the object server.
|
||||
|
||||
Mutates the request passed in.
|
||||
|
||||
Returns a list of range specs (dictionaries with the different byte
|
||||
indices in them).
|
||||
"""
|
||||
# Since segments and fragments have different sizes, we need
|
||||
# to modify the Range header sent to the object servers to
|
||||
# make sure we get the right fragments out of the fragment
|
||||
# archives.
|
||||
segment_size = policy.ec_segment_size
|
||||
fragment_size = policy.fragment_size
|
||||
|
||||
range_specs = []
|
||||
new_ranges = []
|
||||
for client_start, client_end in req.range.ranges:
|
||||
# TODO: coalesce ranges that overlap segments. For
|
||||
# example, "bytes=0-10,20-30,40-50" with a 64 KiB
|
||||
# segment size will result in a a Range header in the
|
||||
# object request of "bytes=0-65535,0-65535,0-65535",
|
||||
# which is wasteful. We should be smarter and only
|
||||
# request that first segment once.
|
||||
segment_start, segment_end = client_range_to_segment_range(
|
||||
client_start, client_end, segment_size)
|
||||
|
||||
fragment_start, fragment_end = \
|
||||
segment_range_to_fragment_range(
|
||||
segment_start, segment_end,
|
||||
segment_size, fragment_size)
|
||||
|
||||
new_ranges.append((fragment_start, fragment_end))
|
||||
range_specs.append({'req_client_start': client_start,
|
||||
'req_client_end': client_end,
|
||||
'req_segment_start': segment_start,
|
||||
'req_segment_end': segment_end,
|
||||
'req_fragment_start': fragment_start,
|
||||
'req_fragment_end': fragment_end})
|
||||
|
||||
req.range = "bytes=" + ",".join(
|
||||
"%s-%s" % (s if s is not None else "",
|
||||
e if e is not None else "")
|
||||
for s, e in new_ranges)
|
||||
return range_specs
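
The arithmetic behind those two conversions, as a rough sketch. This is illustrative only: the real logic, including open-ended and suffix ranges like "bytes=-500", lives in client_range_to_segment_range() and segment_range_to_fragment_range().

def widen_to_segments(client_start, client_end, segment_size):
    # Round the start down and the end up to whole-segment boundaries.
    seg_start = (client_start // segment_size) * segment_size
    seg_end = ((client_end // segment_size) + 1) * segment_size - 1
    return seg_start, seg_end


def segments_to_fragments(seg_start, seg_end, segment_size, fragment_size):
    # Segment N of the object is stored as fragment N of each fragment
    # archive, so scale the offsets from segment units to fragment units.
    frag_start = (seg_start // segment_size) * fragment_size
    frag_end = ((seg_end + 1) // segment_size) * fragment_size - 1
    return frag_start, frag_end

# With 64 KiB segments and (say) 16 KiB fragments, "bytes=100000-100001":
# widen_to_segments(100000, 100001, 65536) == (65536, 131071)
# segments_to_fragments(65536, 131071, 65536, 16384) == (16384, 32767)

A two-byte client range thus costs a whole fragment from each archive, which is also why the TODO above wants overlapping segment ranges coalesced.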

    def _get_or_head_response(self, req, node_iter, partition, policy):
        req.headers.setdefault("X-Backend-Etag-Is-At",
@ -1767,63 +2065,35 @@ class ECObjectController(BaseObjectController):
        range_specs = []
        if req.range:
            orig_range = req.range
            # Since segments and fragments have different sizes, we need
            # to modify the Range header sent to the object servers to
            # make sure we get the right fragments out of the fragment
            # archives.
            segment_size = policy.ec_segment_size
            fragment_size = policy.fragment_size

            range_specs = []
            new_ranges = []
            for client_start, client_end in req.range.ranges:

                segment_start, segment_end = client_range_to_segment_range(
                    client_start, client_end, segment_size)

                fragment_start, fragment_end = \
                    segment_range_to_fragment_range(
                        segment_start, segment_end,
                        segment_size, fragment_size)

                new_ranges.append((fragment_start, fragment_end))
                range_specs.append({'client_start': client_start,
                                    'client_end': client_end,
                                    'segment_start': segment_start,
                                    'segment_end': segment_end})

            req.range = "bytes=" + ",".join(
                "%s-%s" % (s if s is not None else "",
                           e if e is not None else "")
                for s, e in new_ranges)
            range_specs = self._convert_range(req, policy)

        node_iter = GreenthreadSafeIterator(node_iter)
        num_gets = policy.ec_ndata
        with ContextPool(num_gets) as pool:
            pile = GreenAsyncPile(pool)
            for _junk in range(num_gets):
                pile.spawn(self.GETorHEAD_base,
                           req, 'Object', node_iter, partition,
                           req.swift_entity_path,
                           client_chunk_size=policy.fragment_size)
                pile.spawn(self._fragment_GET_request,
                           req, node_iter, partition,
                           policy)

            responses = list(pile)
            good_responses = []
            bad_responses = []
            for response in responses:
                if is_success(response.status_int):
                    good_responses.append(response)
            gets = list(pile)
            good_gets = []
            bad_gets = []
            for get, parts_iter in gets:
                if is_success(get.last_status):
                    good_gets.append((get, parts_iter))
                else:
                    bad_responses.append(response)
                    bad_gets.append((get, parts_iter))

        req.range = orig_range
        if len(good_responses) == num_gets:
        if len(good_gets) == num_gets:
            # If these aren't all for the same object, then error out so
            # at least the client doesn't get garbage. We can do a lot
            # better here with more work, but this'll work for now.
            found_obj_etags = set(
                resp.headers['X-Object-Sysmeta-Ec-Etag']
                for resp in good_responses)
                HeaderKeyDict(
                    getter.last_headers)['X-Object-Sysmeta-Ec-Etag']
                for getter, _junk in good_gets)
            if len(found_obj_etags) > 1:
                self.app.logger.debug(
                    "Returning 503 for %s; found too many etags (%s)",
@ -1833,30 +2103,41 @@ class ECObjectController(BaseObjectController):

            # we found enough pieces to decode the object, so now let's
            # decode the object
            resp_headers = HeaderKeyDict(good_responses[0].headers.items())
            resp_headers = HeaderKeyDict(
                good_gets[0][0].source_headers[-1])
            resp_headers.pop('Content-Range', None)
            eccl = resp_headers.get('X-Object-Sysmeta-Ec-Content-Length')
            obj_length = int(eccl) if eccl is not None else None

            # This is only true if we didn't get a 206 response, but
            # that's the only time this is used anyway.
            fa_length = int(resp_headers['Content-Length'])

            app_iter = ECAppIter(
                req.swift_entity_path,
                policy,
                [iterator for getter, iterator in good_gets],
                range_specs, fa_length, obj_length,
                self.app.logger)
            resp = Response(
                request=req,
                headers=resp_headers,
                conditional_response=True,
                app_iter=ECAppIter(
                    req.swift_entity_path,
                    policy,
                    [r.app_iter for r in good_responses],
                    range_specs,
                    obj_length,
                    logger=self.app.logger))
                app_iter=app_iter)
            app_iter.kickoff(req, resp)
        else:
            statuses = []
            reasons = []
            bodies = []
            headers = []
            for getter, body_parts_iter in bad_gets:
                statuses.extend(getter.statuses)
                reasons.extend(getter.reasons)
                bodies.extend(getter.bodies)
                headers.extend(getter.source_headers)
            resp = self.best_response(
                req,
                [r.status_int for r in bad_responses],
                [r.status.split(' ', 1)[1] for r in bad_responses],
                [r.body for r in bad_responses],
                'Object',
                headers=[r.headers for r in bad_responses])
                req, statuses, reasons, bodies, 'Object',
                headers=headers)

        self._fix_response_headers(resp)
        return resp

@ -1208,8 +1208,7 @@ class TestResponse(unittest.TestCase):
                              ('0123456789112345678'
                               '92123456789')))

        self.assert_(re.match(('\r\n'
                               '--[a-f0-9]{32}\r\n'
        self.assert_(re.match(('--[a-f0-9]{32}\r\n'
                               'Content-Type: text/plain\r\n'
                               'Content-Range: bytes '
                               '0-9/100\r\n\r\n0123456789\r\n'
@ -1221,7 +1220,7 @@ class TestResponse(unittest.TestCase):
                               'Content-Type: text/plain\r\n'
                               'Content-Range: bytes '
                               '20-29/100\r\n\r\n2123456789\r\n'
                               '--[a-f0-9]{32}--\r\n'), content))
                               '--[a-f0-9]{32}--'), content))

    def test_multi_response_iter(self):
        def test_app(environ, start_response):
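
What the updated assertions above pin down: the multipart/byteranges body no longer begins with a spurious CRLF ahead of the first boundary, and nothing follows the final close-delimiter. A two-range body now looks like this (sample only; CRLF line endings implied, boundary shortened for readability):

--boundary123
Content-Type: text/plain
Content-Range: bytes 0-9/100

0123456789
--boundary123
Content-Type: text/plain
Content-Range: bytes 20-29/100

2123456789
--boundary123--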

@ -60,7 +60,7 @@ from swift.common.exceptions import (Timeout, MessageTimeout,
                                     MimeInvalid, ThreadPoolDead)
from swift.common import utils
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.swob import Request, Response
from swift.common.swob import Request, Response, HeaderKeyDict
from test.unit import FakeLogger


@ -4722,6 +4722,18 @@ class TestIterMultipartMimeDocuments(unittest.TestCase):
            exc = err
        self.assertTrue(exc is not None)

    def test_leading_crlfs(self):
        it = utils.iter_multipart_mime_documents(
            StringIO('\r\n\r\n\r\n--unique\r\nabcdefg\r\n'
                     '--unique\r\nhijkl\r\n--unique--'),
            'unique')
        fp = it.next()
        self.assertEquals(fp.read(65536), 'abcdefg')
        self.assertEquals(fp.read(), '')
        fp = it.next()
        self.assertEquals(fp.read(), 'hijkl')
        self.assertRaises(StopIteration, it.next)

    def test_broken_mid_stream(self):
        # We go ahead and accept whatever is sent instead of rejecting the
        # whole request, in case the partial form is still useful.
@ -4777,6 +4789,156 @@ class TestIterMultipartMimeDocuments(unittest.TestCase):
        self.assertTrue(exc is not None)


class FakeResponse(object):
    def __init__(self, status, headers, body):
        self.status = status
        self.headers = HeaderKeyDict(headers)
        self.body = StringIO(body)

    def getheader(self, header_name):
        return str(self.headers.get(header_name, ''))

    def getheaders(self):
        return self.headers.items()

    def read(self, length=None):
        return self.body.read(length)

    def readline(self, length=None):
        return self.body.readline(length)


class TestHTTPResponseToDocumentIters(unittest.TestCase):
    def test_200(self):
        fr = FakeResponse(
            200,
            {'Content-Length': '10', 'Content-Type': 'application/lunch'},
            'sandwiches')

        doc_iters = utils.http_response_to_document_iters(fr)
        first_byte, last_byte, length, headers, body = next(doc_iters)
        self.assertEqual(first_byte, 0)
        self.assertEqual(last_byte, 9)
        self.assertEqual(length, 10)
        header_dict = HeaderKeyDict(headers)
        self.assertEqual(header_dict.get('Content-Length'), '10')
        self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
        self.assertEqual(body.read(), 'sandwiches')

        self.assertRaises(StopIteration, next, doc_iters)

    def test_206_single_range(self):
        fr = FakeResponse(
            206,
            {'Content-Length': '8', 'Content-Type': 'application/lunch',
             'Content-Range': 'bytes 1-8/10'},
            'andwiche')

        doc_iters = utils.http_response_to_document_iters(fr)
        first_byte, last_byte, length, headers, body = next(doc_iters)
        self.assertEqual(first_byte, 1)
        self.assertEqual(last_byte, 8)
        self.assertEqual(length, 10)
        header_dict = HeaderKeyDict(headers)
        self.assertEqual(header_dict.get('Content-Length'), '8')
        self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
        self.assertEqual(body.read(), 'andwiche')

        self.assertRaises(StopIteration, next, doc_iters)

    def test_206_multiple_ranges(self):
        fr = FakeResponse(
            206,
            {'Content-Type': 'multipart/byteranges; boundary=asdfasdfasdf'},
            ("--asdfasdfasdf\r\n"
             "Content-Type: application/lunch\r\n"
             "Content-Range: bytes 0-3/10\r\n"
             "\r\n"
             "sand\r\n"
             "--asdfasdfasdf\r\n"
             "Content-Type: application/lunch\r\n"
             "Content-Range: bytes 6-9/10\r\n"
             "\r\n"
             "ches\r\n"
             "--asdfasdfasdf--"))

        doc_iters = utils.http_response_to_document_iters(fr)

        first_byte, last_byte, length, headers, body = next(doc_iters)
        self.assertEqual(first_byte, 0)
        self.assertEqual(last_byte, 3)
        self.assertEqual(length, 10)
        header_dict = HeaderKeyDict(headers)
        self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
        self.assertEqual(body.read(), 'sand')

        first_byte, last_byte, length, headers, body = next(doc_iters)
        self.assertEqual(first_byte, 6)
        self.assertEqual(last_byte, 9)
        self.assertEqual(length, 10)
        header_dict = HeaderKeyDict(headers)
        self.assertEqual(header_dict.get('Content-Type'), 'application/lunch')
        self.assertEqual(body.read(), 'ches')

        self.assertRaises(StopIteration, next, doc_iters)
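
These tests hinge on reading Content-Range values such as "bytes 1-8/10" back into (first_byte, last_byte, entity_length). Swift ships its own parser for this; the sketch below is just the shape of the job, not that implementation.

import re


def parse_content_range(value):
    # "bytes 1-8/10" -> (1, 8, 10); raises on anything else, including
    # the unsatisfied-range form "bytes */10".
    m = re.match(r'^bytes (\d+)-(\d+)/(\d+)$', value)
    if not m:
        raise ValueError('invalid Content-Range: %r' % (value,))
    return tuple(int(x) for x in m.groups())

# parse_content_range('bytes 1-8/10') == (1, 8, 10)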

class TestDocumentItersToHTTPResponseBody(unittest.TestCase):
    def test_no_parts(self):
        body = utils.document_iters_to_http_response_body(
            iter([]), 'dontcare',
            multipart=False, logger=FakeLogger())
        self.assertEqual(body, '')

    def test_single_part(self):
        body = "time flies like an arrow; fruit flies like a banana"
        doc_iters = [{'part_iter': iter(StringIO(body).read, '')}]

        resp_body = ''.join(
            utils.document_iters_to_http_response_body(
                iter(doc_iters), 'dontcare',
                multipart=False, logger=FakeLogger()))
        self.assertEqual(resp_body, body)

    def test_multiple_parts(self):
        part1 = "two peanuts were walking down a railroad track"
        part2 = "and one was a salted. ... peanut."

        doc_iters = [{
            'start_byte': 88,
            'end_byte': 133,
            'content_type': 'application/peanut',
            'entity_length': 1024,
            'part_iter': iter(StringIO(part1).read, ''),
        }, {
            'start_byte': 500,
            'end_byte': 532,
            'content_type': 'application/salted',
            'entity_length': 1024,
            'part_iter': iter(StringIO(part2).read, ''),
        }]

        resp_body = ''.join(
            utils.document_iters_to_http_response_body(
                iter(doc_iters), 'boundaryboundary',
                multipart=True, logger=FakeLogger()))
        self.assertEqual(resp_body, (
            "--boundaryboundary\r\n" +
            # This is a little too strict; we don't actually care that the
            # headers are in this order, but the test is much more legible
            # this way.
            "Content-Type: application/peanut\r\n" +
            "Content-Range: bytes 88-133/1024\r\n" +
            "\r\n" +
            part1 + "\r\n" +
            "--boundaryboundary\r\n"
            "Content-Type: application/salted\r\n" +
            "Content-Range: bytes 500-532/1024\r\n" +
            "\r\n" +
            part2 + "\r\n" +
            "--boundaryboundary--"))

class TestPairs(unittest.TestCase):
    def test_pairs(self):
        items = [10, 20, 30, 40, 50, 60]

@ -658,6 +658,7 @@ class TestFuncs(unittest.TestCase):
        class TestSource(object):
            def __init__(self, chunks):
                self.chunks = list(chunks)
                self.status = 200

            def read(self, _read_size):
                if self.chunks:
@ -665,6 +666,13 @@ class TestFuncs(unittest.TestCase):
                else:
                    return ''

            def getheader(self, header):
                if header.lower() == "content-length":
                    return str(sum(len(c) for c in self.chunks))

            def getheaders(self):
                return [('content-length', self.getheader('content-length'))]

        source = TestSource((
            'abcd', '1234', 'abc', 'd1', '234abcd1234abcd1', '2'))
        req = Request.blank('/v1/a/c/o')
@ -682,6 +690,7 @@ class TestFuncs(unittest.TestCase):
        class TestSource(object):
            def __init__(self, chunks):
                self.chunks = list(chunks)
                self.status = 200

            def read(self, _read_size):
                if self.chunks:
@ -693,6 +702,14 @@ class TestFuncs(unittest.TestCase):
                else:
                    return ''

            def getheader(self, header):
                if header.lower() == "content-length":
                    return str(sum(len(c) for c in self.chunks
                                   if c is not None))

            def getheaders(self):
                return [('content-length', self.getheader('content-length'))]

        node = {'ip': '1.2.3.4', 'port': 6000, 'device': 'sda'}

        source1 = TestSource(['abcd', '1234', 'abc', None])
@ -707,7 +724,6 @@ class TestFuncs(unittest.TestCase):
                               lambda: (source2, node)):
            client_chunks = list(app_iter)
        self.assertEqual(client_chunks, ['abcd1234', 'efgh5678'])
        self.assertEqual(handler.backend_headers['Range'], 'bytes=8-')

    def test_bytes_to_skip(self):
        # if you start at the beginning, skip nothing

@ -939,8 +939,7 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
    def test_GET_simple_x_newest(self):
        req = swift.common.swob.Request.blank('/v1/a/c/o',
                                              headers={'X-Newest': 'true'})
        codes = [200] * self.replicas()
        codes += [404] * self.obj_ring.max_more_nodes
        codes = [200] * self.policy.ec_ndata
        with set_http_connect(*codes):
            resp = req.get_response(self.app)
        self.assertEquals(resp.status_int, 200)
@ -976,7 +975,8 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):

        node_fragments = zip(*fragment_payloads)
        self.assertEqual(len(node_fragments), self.replicas())  # sanity
        responses = [(200, ''.join(node_fragments[i]), {})
        headers = {'X-Object-Sysmeta-Ec-Content-Length': str(len(real_body))}
        responses = [(200, ''.join(node_fragments[i]), headers)
                     for i in range(POLICIES.default.ec_ndata)]
        status_codes, body_iter, headers = zip(*responses)
        with set_http_connect(*status_codes, body_iter=body_iter,
@ -1260,8 +1260,7 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
                                     'X-Copy-From': 'c2/o'})

        # c2 get
        codes = [200] * self.replicas()
        codes += [404] * self.obj_ring.max_more_nodes
        codes = [404, 200] * self.policy.ec_ndata
        headers = {
            'X-Object-Sysmeta-Ec-Content-Length': 0,
        }
@ -1318,9 +1317,11 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
        ec_archive_bodies1 = self._make_ec_archive_bodies(test_data1)
        ec_archive_bodies2 = self._make_ec_archive_bodies(test_data2)

        headers1 = {'X-Object-Sysmeta-Ec-Etag': etag1}
        headers1 = {'X-Object-Sysmeta-Ec-Etag': etag1,
                    'X-Object-Sysmeta-Ec-Content-Length': '333'}
        # here we're going to *lie* and say the etag here matches
        headers2 = {'X-Object-Sysmeta-Ec-Etag': etag1}
        headers2 = {'X-Object-Sysmeta-Ec-Etag': etag1,
                    'X-Object-Sysmeta-Ec-Content-Length': '333'}

        responses1 = [(200, body, headers1)
                      for body in ec_archive_bodies1]

@ -14,10 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import email.parser
import logging
import math
import os
import pickle
import rfc822
import sys
import unittest
from contextlib import closing, contextmanager, nested
@ -40,7 +42,8 @@ import random

import mock
from eventlet import sleep, spawn, wsgi, listen, Timeout
from swift.common.utils import hash_path, json, storage_directory, public
from swift.common.utils import hash_path, json, storage_directory, \
    parse_content_type, iter_multipart_mime_documents, public

from test.unit import (
    connect_tcp, readuntil2crlfs, FakeLogger, fake_http_connect, FakeRing,
@ -1378,6 +1381,331 @@ class TestObjectController(unittest.TestCase):
        self.assertEqual(res.status_int, 200)
        self.assertEqual(res.body, obj)

    @unpatch_policies
    def test_GET_ranges(self):
        prolis = _test_sockets[0]
        prosrv = _test_servers[0]
        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        obj = (''.join(
            ('beans lots of beans lots of beans lots of beans yeah %04d ' % i)
            for i in range(100)))

        path = '/v1/a/c/o.beans'
        fd.write('PUT %s HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'X-Storage-Token: t\r\n'
                 'Content-Length: %s\r\n'
                 'Content-Type: application/octet-stream\r\n'
                 '\r\n%s' % (path, str(len(obj)), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 201'
        self.assertEqual(headers[:len(exp)], exp)

        # one byte range
        req = Request.blank(
            path,
            environ={'REQUEST_METHOD': 'GET'},
            headers={'Content-Type': 'application/octet-stream',
                     'Range': 'bytes=10-200'})
        res = req.get_response(prosrv)
        self.assertEqual(res.status_int, 206)
        self.assertEqual(res.body, obj[10:201])

        # multiple byte ranges
        req = Request.blank(
            path,
            environ={'REQUEST_METHOD': 'GET'},
            headers={'Content-Type': 'application/octet-stream',
                     'Range': 'bytes=10-200,1000-1099,4123-4523'})
        res = req.get_response(prosrv)
        self.assertEqual(res.status_int, 206)
        ct, params = parse_content_type(res.headers['Content-Type'])
        self.assertEqual(ct, 'multipart/byteranges')

        boundary = dict(params).get('boundary')
        self.assertTrue(boundary is not None)

        got_mime_docs = []
        for mime_doc_fh in iter_multipart_mime_documents(StringIO(res.body),
                                                         boundary):
            headers = HeaderKeyDict(rfc822.Message(mime_doc_fh, 0).items())
            body = mime_doc_fh.read()
            got_mime_docs.append((headers, body))
        self.assertEqual(len(got_mime_docs), 3)

        first_range_headers = got_mime_docs[0][0]
        first_range_body = got_mime_docs[0][1]
        self.assertEqual(first_range_headers['Content-Range'],
                         'bytes 10-200/5800')
        self.assertEqual(first_range_body, obj[10:201])

        second_range_headers = got_mime_docs[1][0]
        second_range_body = got_mime_docs[1][1]
        self.assertEqual(second_range_headers['Content-Range'],
                         'bytes 1000-1099/5800')
        self.assertEqual(second_range_body, obj[1000:1100])

        third_range_headers = got_mime_docs[2][0]
        third_range_body = got_mime_docs[2][1]
        self.assertEqual(third_range_headers['Content-Range'],
                         'bytes 4123-4523/5800')
        self.assertEqual(third_range_body, obj[4123:4524])

    @unpatch_policies
    def test_GET_ranges_resuming(self):
        prolis = _test_sockets[0]
        prosrv = _test_servers[0]
        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
        fd = sock.makefile()
        obj = (''.join(
            ('Smurf! The smurfing smurf is completely smurfed. %03d ' % i)
            for i in range(1000)))

        path = '/v1/a/c/o.smurfs'
        fd.write('PUT %s HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'X-Storage-Token: t\r\n'
                 'Content-Length: %s\r\n'
                 'Content-Type: application/smurftet-stream\r\n'
                 '\r\n%s' % (path, str(len(obj)), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
        exp = 'HTTP/1.1 201'
        self.assertEqual(headers[:len(exp)], exp)

        kaboomed = [0]
        bytes_before_timeout = [None]

        class FileLikeKaboom(object):
            def __init__(self, inner_file_like):
                self.inner_file_like = inner_file_like

            # close(), etc.
            def __getattr__(self, attr):
                return getattr(self.inner_file_like, attr)

            def readline(self, *a, **kw):
                if bytes_before_timeout[0] <= 0:
                    kaboomed[0] += 1
                    raise ChunkReadTimeout(None)
                result = self.inner_file_like.readline(*a, **kw)
                if len(result) > bytes_before_timeout[0]:
                    result = result[:bytes_before_timeout[0]]
                bytes_before_timeout[0] -= len(result)
                return result

            def read(self, length=None):
                result = self.inner_file_like.read(length)
                if bytes_before_timeout[0] <= 0:
                    kaboomed[0] += 1
                    raise ChunkReadTimeout(None)
                if len(result) > bytes_before_timeout[0]:
                    result = result[:bytes_before_timeout[0]]
                bytes_before_timeout[0] -= len(result)
                return result

        orig_hrtdi = proxy_base.http_response_to_document_iters

        # Use this to mock out http_response_to_document_iters. On the first
        # call, the result will be sabotaged to blow up with
        # ChunkReadTimeout after some number of bytes are read. On
        # subsequent calls, no sabotage will be added.

        def sabotaged_hrtdi(*a, **kw):
            resp_parts = orig_hrtdi(*a, **kw)
            for sb, eb, l, h, range_file in resp_parts:
                if bytes_before_timeout[0] <= 0:
                    # simulate being unable to read MIME part of
                    # multipart/byteranges response
                    kaboomed[0] += 1
                    raise ChunkReadTimeout(None)
                boomer = FileLikeKaboom(range_file)
                yield sb, eb, l, h, boomer

        sabotaged = [False]

        def single_sabotage_hrtdi(*a, **kw):
            if not sabotaged[0]:
                sabotaged[0] = True
                return sabotaged_hrtdi(*a, **kw)
            else:
                return orig_hrtdi(*a, **kw)

        # We want sort of an end-to-end test of object resuming, so what we
        # do is mock out stuff so the proxy thinks it only read a certain
        # number of bytes before it got a timeout.
        bytes_before_timeout[0] = 300
        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
                               single_sabotage_hrtdi):
            req = Request.blank(
                path,
                environ={'REQUEST_METHOD': 'GET'},
                headers={'Content-Type': 'application/octet-stream',
                         'Range': 'bytes=0-500'})
            res = req.get_response(prosrv)
            body = res.body  # read the whole thing
        self.assertEqual(kaboomed[0], 1)  # sanity check
        self.assertEqual(res.status_int, 206)
        self.assertEqual(len(body), 501)
        self.assertEqual(body, obj[:501])

        # Sanity-check for multi-range resume: make sure we actually break
        # in the middle of the second byterange. This test is partially
        # about what happens when all the object servers break at once, and
        # partially about validating all these mocks we do. After all, the
        # point of resuming is that the client can't tell anything went
        # wrong, so we need a test where we can't resume and something
        # *does* go wrong so we can observe it.
        bytes_before_timeout[0] = 700
        kaboomed[0] = 0
        sabotaged[0] = False
        prosrv._error_limiting = {}  # clear out errors
        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
                               sabotaged_hrtdi):  # perma-broken
            req = Request.blank(
                path,
                environ={'REQUEST_METHOD': 'GET'},
                headers={'Range': 'bytes=0-500,1000-1500,2000-2500'})
            res = req.get_response(prosrv)
            body = ''
            try:
                for chunk in res.app_iter:
                    body += chunk
            except ChunkReadTimeout:
                pass

        self.assertEqual(res.status_int, 206)
        self.assertTrue(kaboomed[0] > 0)  # sanity check

        ct, params = parse_content_type(res.headers['Content-Type'])
        self.assertEqual(ct, 'multipart/byteranges')  # sanity check
        boundary = dict(params).get('boundary')
        self.assertTrue(boundary is not None)  # sanity check
        got_byteranges = []
        for mime_doc_fh in iter_multipart_mime_documents(StringIO(body),
                                                         boundary):
            rfc822.Message(mime_doc_fh, 0)
            body = mime_doc_fh.read()
            got_byteranges.append(body)

        self.assertEqual(len(got_byteranges), 2)
        self.assertEqual(len(got_byteranges[0]), 501)
        self.assertEqual(len(got_byteranges[1]), 199)  # partial

        # Multi-range resume, resuming in the middle of the first byterange
        bytes_before_timeout[0] = 300
        kaboomed[0] = 0
        sabotaged[0] = False
        prosrv._error_limiting = {}  # clear out errors
        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
                               single_sabotage_hrtdi):
            req = Request.blank(
                path,
                environ={'REQUEST_METHOD': 'GET'},
                headers={'Range': 'bytes=0-500,1000-1500,2000-2500'})
            res = req.get_response(prosrv)
            body = ''.join(res.app_iter)

        self.assertEqual(res.status_int, 206)
        self.assertEqual(kaboomed[0], 1)  # sanity check

        ct, params = parse_content_type(res.headers['Content-Type'])
        self.assertEqual(ct, 'multipart/byteranges')  # sanity check
        boundary = dict(params).get('boundary')
        self.assertTrue(boundary is not None)  # sanity check
        got_byteranges = []
        for mime_doc_fh in iter_multipart_mime_documents(StringIO(body),
                                                         boundary):
            rfc822.Message(mime_doc_fh, 0)
            body = mime_doc_fh.read()
            got_byteranges.append(body)

        self.assertEqual(len(got_byteranges), 3)
        self.assertEqual(len(got_byteranges[0]), 501)
        self.assertEqual(got_byteranges[0], obj[:501])
        self.assertEqual(len(got_byteranges[1]), 501)
        self.assertEqual(got_byteranges[1], obj[1000:1501])
        self.assertEqual(len(got_byteranges[2]), 501)
        self.assertEqual(got_byteranges[2], obj[2000:2501])

        # Multi-range resume, first GET dies in the middle of the second set
        # of MIME headers
        bytes_before_timeout[0] = 501
        kaboomed[0] = 0
        sabotaged[0] = False
        prosrv._error_limiting = {}  # clear out errors
        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
                               single_sabotage_hrtdi):
            req = Request.blank(
                path,
                environ={'REQUEST_METHOD': 'GET'},
                headers={'Range': 'bytes=0-500,1000-1500,2000-2500'})
            res = req.get_response(prosrv)
            body = ''.join(res.app_iter)

        self.assertEqual(res.status_int, 206)
        self.assertTrue(kaboomed[0] >= 1)  # sanity check

        ct, params = parse_content_type(res.headers['Content-Type'])
        self.assertEqual(ct, 'multipart/byteranges')  # sanity check
        boundary = dict(params).get('boundary')
        self.assertTrue(boundary is not None)  # sanity check
        got_byteranges = []
        for mime_doc_fh in iter_multipart_mime_documents(StringIO(body),
                                                         boundary):
            rfc822.Message(mime_doc_fh, 0)
            body = mime_doc_fh.read()
            got_byteranges.append(body)

        self.assertEqual(len(got_byteranges), 3)
        self.assertEqual(len(got_byteranges[0]), 501)
        self.assertEqual(got_byteranges[0], obj[:501])
        self.assertEqual(len(got_byteranges[1]), 501)
        self.assertEqual(got_byteranges[1], obj[1000:1501])
        self.assertEqual(len(got_byteranges[2]), 501)
        self.assertEqual(got_byteranges[2], obj[2000:2501])

        # Multi-range resume, first GET dies in the middle of the second
        # byterange
        bytes_before_timeout[0] = 750
        kaboomed[0] = 0
        sabotaged[0] = False
        prosrv._error_limiting = {}  # clear out errors
        with mock.patch.object(proxy_base, 'http_response_to_document_iters',
                               single_sabotage_hrtdi):
            req = Request.blank(
                path,
                environ={'REQUEST_METHOD': 'GET'},
                headers={'Range': 'bytes=0-500,1000-1500,2000-2500'})
            res = req.get_response(prosrv)
            body = ''.join(res.app_iter)

        self.assertEqual(res.status_int, 206)
        self.assertTrue(kaboomed[0] >= 1)  # sanity check

        ct, params = parse_content_type(res.headers['Content-Type'])
        self.assertEqual(ct, 'multipart/byteranges')  # sanity check
        boundary = dict(params).get('boundary')
        self.assertTrue(boundary is not None)  # sanity check
        got_byteranges = []
        for mime_doc_fh in iter_multipart_mime_documents(StringIO(body),
                                                         boundary):
            rfc822.Message(mime_doc_fh, 0)
            body = mime_doc_fh.read()
            got_byteranges.append(body)

        self.assertEqual(len(got_byteranges), 3)
        self.assertEqual(len(got_byteranges[0]), 501)
        self.assertEqual(got_byteranges[0], obj[:501])
        self.assertEqual(len(got_byteranges[1]), 501)
        self.assertEqual(got_byteranges[1], obj[1000:1501])
        self.assertEqual(len(got_byteranges[2]), 501)
        self.assertEqual(got_byteranges[2], obj[2000:2501])

    @unpatch_policies
    def test_PUT_ec(self):
        policy = POLICIES[3]
@ -1872,6 +2200,12 @@ class TestObjectController(unittest.TestCase):
                yield next(inner_iter)
            raise Exception("doom ba doom")

        def explodey_doc_parts_iter(inner_iter_iter):
            for item in inner_iter_iter:
                item = item.copy()  # paranoia about mutable data
                item['part_iter'] = explodey_iter(item['part_iter'])
                yield item

        real_ec_app_iter = swift.proxy.controllers.obj.ECAppIter

        def explodey_ec_app_iter(path, policy, iterators, *a, **kw):
@ -1882,7 +2216,7 @@ class TestObjectController(unittest.TestCase):
            # the client when things go wrong.
            return real_ec_app_iter(
                path, policy,
                [explodey_iter(i) for i in iterators],
                [explodey_doc_parts_iter(i) for i in iterators],
                *a, **kw)

        with mock.patch("swift.proxy.controllers.obj.ECAppIter",
@ -6325,7 +6659,7 @@ class TestObjectECRangedGET(unittest.TestCase):
                 'Connection: close\r\n'
                 'Content-Length: %d\r\n'
                 'X-Storage-Token: t\r\n'
                 'Content-Type: application/octet-stream\r\n'
                 'Content-Type: donuts\r\n'
                 '\r\n%s' % (obj_name, len(obj), obj))
        fd.flush()
        headers = readuntil2crlfs(fd)
@ -6359,8 +6693,44 @@ class TestObjectECRangedGET(unittest.TestCase):
                break
            gotten_obj += buf

        # if we get this wrong, clients will either get truncated data or
        # they'll hang waiting for bytes that aren't coming, so it warrants
        # being asserted for every test case
        if 'Content-Length' in headers:
            self.assertEqual(int(headers['Content-Length']), len(gotten_obj))

        # likewise, if we say MIME and don't send MIME or vice versa,
        # clients will be horribly confused
        if headers.get('Content-Type', '').startswith('multipart/byteranges'):
            self.assertEqual(gotten_obj[:2], "--")
        else:
            # In general, this isn't true, as you can start an object with
            # "--". However, in this test, we don't start any objects with
            # "--", or even include "--" in their contents anywhere.
            self.assertNotEqual(gotten_obj[:2], "--")

        return (status_code, headers, gotten_obj)

    def _parse_multipart(self, content_type, body):
        parser = email.parser.FeedParser()
        parser.feed("Content-Type: %s\r\n\r\n" % content_type)
        parser.feed(body)
        root_message = parser.close()
        self.assertTrue(root_message.is_multipart())
        byteranges = root_message.get_payload()
        self.assertFalse(root_message.defects)
        for i, message in enumerate(byteranges):
            self.assertFalse(message.defects, "Part %d had defects" % i)
            self.assertFalse(message.is_multipart(),
                             "Nested multipart at %d" % i)
        return byteranges

    def test_bogus(self):
        status, headers, gotten_obj = self._get_obj("tacos=3-5")
        self.assertEqual(status, 200)
        self.assertEqual(len(gotten_obj), len(self.obj))
        self.assertEqual(gotten_obj, self.obj)

    def test_unaligned(self):
        # One segment's worth of data, but straddling two segment boundaries
        # (so it has data from three segments)
@ -6372,7 +6742,7 @@ class TestObjectECRangedGET(unittest.TestCase):
        self.assertEqual(gotten_obj, self.obj[3783:7879])

    def test_aligned_left(self):
        # First byte is aligned to a segment boundary, last byte is not
        # Firts byte is aligned to a segment boundary, last byte is not
        status, headers, gotten_obj = self._get_obj("bytes=0-5500")
        self.assertEqual(status, 206)
        self.assertEqual(headers['Content-Length'], "5501")
@ -6540,6 +6910,168 @@ class TestObjectECRangedGET(unittest.TestCase):
        self.assertEqual(len(gotten_obj), len(self.tiny_obj))
        self.assertEqual(gotten_obj, self.tiny_obj)

    def test_multiple_ranges(self):
        status, headers, gotten_obj = self._get_obj(
            "bytes=0-100,4490-5010", self.obj_name)
        self.assertEqual(status, 206)
        self.assertEqual(headers["Content-Length"], str(len(gotten_obj)))

        content_type, content_type_params = parse_content_type(
            headers['Content-Type'])
        content_type_params = dict(content_type_params)

        self.assertEqual(content_type, 'multipart/byteranges')
        boundary = content_type_params.get('boundary')
        self.assertTrue(boundary is not None)

        got_byteranges = self._parse_multipart(headers['Content-Type'],
                                               gotten_obj)
        self.assertEqual(len(got_byteranges), 2)
        first_byterange, second_byterange = got_byteranges

        self.assertEqual(first_byterange['Content-Range'],
                         'bytes 0-100/14513')
        self.assertEqual(first_byterange.get_payload(), self.obj[:101])

        self.assertEqual(second_byterange['Content-Range'],
                         'bytes 4490-5010/14513')
        self.assertEqual(second_byterange.get_payload(), self.obj[4490:5011])

    def test_multiple_ranges_overlapping_in_segment(self):
        status, headers, gotten_obj = self._get_obj(
            "bytes=0-9,20-29,40-49,60-69,80-89")
        self.assertEqual(status, 206)
        got_byteranges = self._parse_multipart(headers['Content-Type'],
                                               gotten_obj)
        self.assertEqual(len(got_byteranges), 5)

    def test_multiple_ranges_off_end(self):
        status, headers, gotten_obj = self._get_obj(
            "bytes=0-10,14500-14513")  # there is no byte 14513, only 0-14512
        self.assertEqual(status, 206)
        got_byteranges = self._parse_multipart(headers['Content-Type'],
                                               gotten_obj)
        self.assertEqual(len(got_byteranges), 2)
        self.assertEqual(got_byteranges[0]['Content-Range'],
                         "bytes 0-10/14513")
        self.assertEqual(got_byteranges[1]['Content-Range'],
                         "bytes 14500-14512/14513")

    def test_multiple_ranges_suffix_off_end(self):
        status, headers, gotten_obj = self._get_obj(
            "bytes=0-10,-13")
        self.assertEqual(status, 206)
        got_byteranges = self._parse_multipart(headers['Content-Type'],
                                               gotten_obj)
        self.assertEqual(len(got_byteranges), 2)
        self.assertEqual(got_byteranges[0]['Content-Range'],
                         "bytes 0-10/14513")
        self.assertEqual(got_byteranges[1]['Content-Range'],
                         "bytes 14500-14512/14513")

    def test_multiple_ranges_one_barely_unsatisfiable(self):
        # The thing about 14515-14520 is that it comes from the last segment
        # in the object. When we turn this range into a fragment range,
        # it'll be for the last fragment, so the object servers see
        # something satisfiable.
        #
        # Basically, we'll get 3 byteranges from the object server, but we
        # have to filter out the unsatisfiable one on our own.
        status, headers, gotten_obj = self._get_obj(
            "bytes=0-10,14515-14520,40-50")
        self.assertEqual(status, 206)
        got_byteranges = self._parse_multipart(headers['Content-Type'],
                                               gotten_obj)
        self.assertEqual(len(got_byteranges), 2)
        self.assertEqual(got_byteranges[0]['Content-Range'],
                         "bytes 0-10/14513")
        self.assertEqual(got_byteranges[0].get_payload(), self.obj[0:11])
        self.assertEqual(got_byteranges[1]['Content-Range'],
                         "bytes 40-50/14513")
        self.assertEqual(got_byteranges[1].get_payload(), self.obj[40:51])
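
The arithmetic behind that filtering, assuming the 4096-byte EC segment size these tests appear to use (the object is 14513 bytes, so its last valid byte is 14512):

segment_size = 4096  # assumed; see the test policy's ec_segment_size
client_start, client_end = 14515, 14520
# Widening rounds the start down to a segment boundary:
segment_start = (client_start // segment_size) * segment_size  # 12288
# 12288 < 14513, so the widened range still overlaps the object. Each
# object server can satisfy the corresponding fragment range and happily
# returns that byterange; only the proxy, which knows the true object
# length, can see that 14515-14520 itself is unsatisfiable and drop it.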

    def test_multiple_ranges_some_unsatisfiable(self):
        status, headers, gotten_obj = self._get_obj(
            "bytes=0-100,4090-5010,999999-9999999", self.obj_name)
        self.assertEqual(status, 206)

        content_type, content_type_params = parse_content_type(
            headers['Content-Type'])
        content_type_params = dict(content_type_params)

        self.assertEqual(content_type, 'multipart/byteranges')
        boundary = content_type_params.get('boundary')
        self.assertTrue(boundary is not None)

        got_byteranges = self._parse_multipart(headers['Content-Type'],
                                               gotten_obj)
        self.assertEqual(len(got_byteranges), 2)
        first_byterange, second_byterange = got_byteranges

        self.assertEqual(first_byterange['Content-Range'],
                         'bytes 0-100/14513')
        self.assertEqual(first_byterange.get_payload(), self.obj[:101])

        self.assertEqual(second_byterange['Content-Range'],
                         'bytes 4090-5010/14513')
        self.assertEqual(second_byterange.get_payload(), self.obj[4090:5011])

    def test_two_ranges_one_unsatisfiable(self):
        status, headers, gotten_obj = self._get_obj(
            "bytes=0-100,999999-9999999", self.obj_name)
        self.assertEqual(status, 206)

        content_type, content_type_params = parse_content_type(
            headers['Content-Type'])

        # According to RFC 7233, this could be either a multipart/byteranges
        # response with one part or it could be a single-part response (just
        # the bytes, no MIME). We're locking it down here: single-part
        # response. That's what replicated objects do, and we don't want any
        # client-visible differences between EC objects and replicated ones.
        self.assertEqual(content_type, 'donuts')
        self.assertEqual(gotten_obj, self.obj[:101])

    def test_two_ranges_one_unsatisfiable_same_segment(self):
        # Like test_two_ranges_one_unsatisfiable(), but where both ranges
        # fall within the same EC segment.
        status, headers, gotten_obj = self._get_obj(
            "bytes=14500-14510,14520-14530")

        self.assertEqual(status, 206)

        content_type, content_type_params = parse_content_type(
            headers['Content-Type'])

        self.assertEqual(content_type, 'donuts')
        self.assertEqual(gotten_obj, self.obj[14500:14511])

    def test_multiple_ranges_some_unsatisfiable_out_of_order(self):
        status, headers, gotten_obj = self._get_obj(
            "bytes=0-100,99999998-99999999,4090-5010", self.obj_name)
        self.assertEqual(status, 206)

        content_type, content_type_params = parse_content_type(
            headers['Content-Type'])
        content_type_params = dict(content_type_params)

        self.assertEqual(content_type, 'multipart/byteranges')
        boundary = content_type_params.get('boundary')
        self.assertTrue(boundary is not None)

        got_byteranges = self._parse_multipart(headers['Content-Type'],
                                               gotten_obj)
        self.assertEqual(len(got_byteranges), 2)
        first_byterange, second_byterange = got_byteranges

        self.assertEqual(first_byterange['Content-Range'],
                         'bytes 0-100/14513')
        self.assertEqual(first_byterange.get_payload(), self.obj[:101])

        self.assertEqual(second_byterange['Content-Range'],
                         'bytes 4090-5010/14513')
        self.assertEqual(second_byterange.get_payload(), self.obj[4090:5011])


@patch_policies([
    StoragePolicy(0, 'zero', True, object_ring=FakeRing(base_port=3000)),