Move all SLO functionality to middleware

This way, with zero additional effort, SLO will support enhancements
to object storage and retrieval, such as:
 * automatic resume of GETs on broken connection (today)
 * storage policies (in the near future)
 * erasure-coded object segments (in the far future)

This also lets SLOs work with other sorts of hypothetical third-party
middleware, for example object compression or encryption.

Getting COPY to work here is sort of a hack; the proxy's object
controller now checks for "swift.copy_response_hook" in the request's
environment and feeds the GET response (the source of the new object's
data) through it. This lets a COPY of a SLO manifest actually combine
the segments instead of merely copying the manifest document.

Updated ObjectController to expect a response's app_iter to be an
iterable, not just an iterator. (PEP 333 says "When called by the
server, the application object must return an iterable yielding zero
or more strings." ObjectController was just being too strict.) This
way, SLO can re-use the same response-generation logic for GET and
COPY requests.

Added a (sort of hokey) mechanism to allow middlewares to close
incompletely-consumed app iterators without triggering a warning. SLO
does this when it realizes it's performed a ranged GET on a manifest;
it closes the iterable, removes the range, and retries the
request. Without this change, the proxy logs would get 'Client
disconnected on read' in them.

DocImpact

blueprint multi-ring-large-objects

Change-Id: Ic11662eb5c7176fbf422a6fc87a569928d6f85a1
This commit is contained in:
Samuel Merritt 2013-11-13 12:06:55 -08:00 committed by Darrell Bishop
parent 8b40f199eb
commit 1901719542
10 changed files with 1252 additions and 862 deletions

View File

@ -151,11 +151,6 @@ use = egg:swift#proxy
# the number of seconds configured by timing_expiry. # the number of seconds configured by timing_expiry.
# timing_expiry = 300 # timing_expiry = 300
# #
# If set to false will treat objects with X-Static-Large-Object header set
# as a regular object on GETs, i.e. will return that object's contents. Should
# be set to false if slo is not used in pipeline.
# allow_static_large_object = true
#
# The maximum time (seconds) that a large object connection is allowed to last. # The maximum time (seconds) that a large object connection is allowed to last.
# max_large_object_get_time = 86400 # max_large_object_get_time = 86400
# #
@ -514,6 +509,14 @@ use = egg:swift#slo
# max_manifest_segments = 1000 # max_manifest_segments = 1000
# max_manifest_size = 2097152 # max_manifest_size = 2097152
# min_segment_size = 1048576 # min_segment_size = 1048576
# Start rate-limiting SLO segment serving after the Nth segment of a
# segmented object.
# rate_limit_after_segment = 10
#
# Once segment rate-limiting kicks in for an object, limit segments served
# to N per second. 0 means no rate-limiting.
# rate_limit_segments_per_sec = 0
#
[filter:account-quotas] [filter:account-quotas]
use = egg:swift#account_quotas use = egg:swift#account_quotas

View File

@ -134,19 +134,24 @@ the manifest and the segments it's referring to) in the container and account
metadata which can be used for stats purposes. metadata which can be used for stats purposes.
""" """
from contextlib import contextmanager
from time import time
from urllib import quote from urllib import quote
from cStringIO import StringIO from cStringIO import StringIO
from datetime import datetime from datetime import datetime
from sys import exc_info
import mimetypes import mimetypes
from hashlib import md5 from hashlib import md5
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \ HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
HTTPOk, HTTPPreconditionFailed, HTTPException, HTTPNotFound, \ HTTPOk, HTTPPreconditionFailed, HTTPException, HTTPNotFound, \
HTTPUnauthorized HTTPUnauthorized, HTTPRequestedRangeNotSatisfiable, Response
from swift.common.utils import (json, get_logger, config_true_value, from swift.common.utils import json, get_logger, config_true_value, \
register_swift_info) get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
from swift.common.wsgi import WSGIContext from swift.common.wsgi import WSGIContext
from swift.common.middleware.bulk import get_response_body, \ from swift.common.middleware.bulk import get_response_body, \
ACCEPTABLE_FORMATS, Bulk ACCEPTABLE_FORMATS, Bulk
@ -175,10 +180,9 @@ def parse_input(raw_data):
return parsed_data return parsed_data
class SloContext(WSGIContext): class SloPutContext(WSGIContext):
def __init__(self, slo, slo_etag): def __init__(self, slo, slo_etag):
WSGIContext.__init__(self, slo.app) super(SloPutContext, self).__init__(slo.app)
self.slo_etag = '"' + slo_etag.hexdigest() + '"' self.slo_etag = '"' + slo_etag.hexdigest() + '"'
def handle_slo_put(self, req, start_response): def handle_slo_put(self, req, start_response):
@ -195,6 +199,369 @@ class SloContext(WSGIContext):
return app_resp return app_resp
def close_if_possible(maybe_closable):
close_method = getattr(maybe_closable, 'close', None)
if callable(close_method):
return close_method()
@contextmanager
def closing_if_possible(maybe_closable):
"""
Like contextlib.closing(), but doesn't crash if the object lacks a close()
method.
PEP 333 (WSGI) says: "If the iterable returned by the application has a
close() method, the server or gateway must call that method upon
completion of the current request[.]" This function makes that easier.
"""
yield maybe_closable
close_if_possible(maybe_closable)
class SloIterable(object):
"""
Iterable that returns the object contents for a large object.
:param req: original request object
:param app: WSGI application from which segments will come
:param listing_iter: iterable yielding the object segments to fetch,
along with the byte subranges to fetch, in the
form of a tuple (object-path, first-byte, last-byte)
or (object-path, None, None) to fetch the whole thing.
:param max_get_time: maximum permitted duration of a GET request (seconds)
:param logger: logger object
:param ua_suffix: string to append to user-agent.
:param name: name of manifest (used in logging only)
"""
def __init__(self, req, app, listing_iter, max_get_time,
logger, ua_suffix, name='<not specified>'):
self.req = req
self.app = app
self.listing_iter = listing_iter
self.max_get_time = max_get_time
self.logger = logger
self.ua_suffix = ua_suffix
self.name = name
def app_iter_range(self, *a, **kw):
"""
swob.Response will only respond with a 206 status in certain cases; one
of those is if the body iterator responds to .app_iter_range().
However, this object (or really, its listing iter) is smart enough to
handle the range stuff internally, so we just no-op this out to fool
swob.Response.
"""
return self
def __iter__(self):
start_time = time()
have_yielded_data = False
try:
for seg_path, seg_etag, seg_size, first_byte, last_byte \
in self.listing_iter:
if time() - start_time > self.max_get_time:
raise SegmentError(
'ERROR: While processing manifest %s, '
'max LO GET time of %ds exceeded' %
(self.name, self.max_get_time))
seg_req = self.req.copy_get()
seg_req.range = None
seg_req.environ['PATH_INFO'] = seg_path
seg_req.user_agent = "%s %s" % (seg_req.user_agent,
self.ua_suffix)
if first_byte is not None or last_byte is not None:
seg_req.headers['Range'] = "bytes=%s-%s" % (
# The 0 is to avoid having a range like "bytes=-10",
# which actually means the *last* 10 bytes.
'0' if first_byte is None else first_byte,
'' if last_byte is None else last_byte)
seg_resp = seg_req.get_response(self.app)
if not is_success(seg_resp.status_int):
close_if_possible(seg_resp.app_iter)
raise SegmentError(
'ERROR: While processing manifest %s, '
'got %d while retrieving %s' %
(self.name, seg_resp.status_int, seg_path))
elif ((seg_resp.etag != seg_etag) or
(seg_resp.content_length != seg_size and
not seg_req.range)):
# The content-length check is for security reasons. Seems
# possible that an attacker could upload a >1mb object and
# then replace it with a much smaller object with same
# etag. Then create a big nested SLO that calls that
# object many times which would hammer our obj servers. If
# this is a range request, don't check content-length
# because it won't match.
close_if_possible(seg_resp.app_iter)
raise SegmentError(
'Object segment no longer valid: '
'%(path)s etag: %(r_etag)s != %(s_etag)s or '
'%(r_size)s != %(s_size)s.' %
{'path': seg_req.path, 'r_etag': seg_resp.etag,
'r_size': seg_resp.content_length,
's_etag': seg_etag,
's_size': seg_size})
with closing_if_possible(seg_resp.app_iter):
for chunk in seg_resp.app_iter:
yield chunk
have_yielded_data = True
except ListingIterError as ex:
# I have to save this error because yielding the ' ' below clears
# the exception from the current stack frame.
err = exc_info()
self.logger.error('ERROR: While processing manifest %s, %s',
self.name, ex)
# Normally, exceptions before any data has been yielded will
# cause Eventlet to send a 5xx response. In this particular
# case of ListingIterError we don't want that and we'd rather
# just send the normal 2xx response and then hang up early
# since 5xx codes are often used to judge Service Level
# Agreements and this ListingIterError indicates the user has
# created an invalid condition.
if not have_yielded_data:
yield ' '
raise err
except SegmentError:
self.logger.exception("Error getting segment")
raise
class SloGetContext(WSGIContext):
max_slo_recursion_depth = 10
def __init__(self, slo):
self.slo = slo
super(SloGetContext, self).__init__(slo.app)
def _fetch_sub_slo_segments(self, req, version, acc, con, obj):
"""
Fetch the submanifest, parse it, and return it.
Raise exception on failures.
"""
sub_req = req.copy_get()
sub_req.range = None
sub_req.environ['PATH_INFO'] = '/'.join(['', version, acc, con, obj])
sub_req.user_agent = "%s SLO MultipartGET" % sub_req.user_agent
sub_resp = sub_req.get_response(self.slo.app)
if not is_success(sub_resp.status_int):
raise ListingIterError(
'ERROR: while fetching %s, GET of submanifest %s '
'failed with status %d' % (req.path, sub_req.path,
sub_resp.status_int))
try:
with closing_if_possible(sub_resp.app_iter):
return json.loads(''.join(sub_resp.app_iter))
except ValueError as err:
raise ListingIterError(
'ERROR: while fetching %s, JSON-decoding of submanifest %s '
'failed with %s' % (req.path, sub_req.path, err))
def _segment_listing_iterator(self, req, version, account, segments,
first_byte=None, last_byte=None,
recursion_depth=1):
for seg_dict in segments:
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(seg_dict,
logger=self.slo.logger)
# We handle the range stuff here so that we can be smart about
# skipping unused submanifests. For example, if our first segment is a
# submanifest referencing 50 MiB total, but first_byte falls in the
# 51st MiB, then we can avoid fetching the first submanifest.
#
# If we were to let SloIterable handle all the range calculations, we
# would be unable to make this optimization.
total_length = sum(int(seg['bytes']) for seg in segments)
if first_byte is None:
first_byte = 0
if last_byte is None:
last_byte = total_length - 1
for seg_dict in segments:
seg_length = int(seg_dict['bytes'])
if first_byte >= seg_length:
# don't need any bytes from this segment
first_byte = max(first_byte - seg_length, -1)
last_byte = max(last_byte - seg_length, -1)
continue
if last_byte < 0:
# no bytes are needed from this or any future segment
break
if config_true_value(seg_dict.get('sub_slo')):
# do this check here so that we can avoid fetching this last
# manifest before raising the exception
if recursion_depth >= self.max_slo_recursion_depth:
raise ListingIterError("Max recursion depth exceeded")
sub_path = get_valid_utf8_str(seg_dict['name'])
sub_cont, sub_obj = split_path(sub_path, 2, 2, True)
sub_segments = self._fetch_sub_slo_segments(
req, version, account, sub_cont, sub_obj)
for sub_seg_dict, sb, eb in self._segment_listing_iterator(
req, version, account, sub_segments,
first_byte=first_byte, last_byte=last_byte,
recursion_depth=recursion_depth + 1):
sub_seg_length = int(sub_seg_dict['bytes'])
first_byte = max(first_byte - sub_seg_length, -1)
last_byte = max(last_byte - sub_seg_length, -1)
yield sub_seg_dict, sb, eb
else:
if isinstance(seg_dict['name'], unicode):
seg_dict['name'] = seg_dict['name'].encode("utf-8")
seg_length = int(seg_dict['bytes'])
yield (seg_dict,
(None if first_byte <= 0 else first_byte),
(None if last_byte >= seg_length - 1 else last_byte))
first_byte = max(first_byte - seg_length, -1)
last_byte = max(last_byte - seg_length, -1)
def handle_slo_get_or_head(self, req, start_response):
"""
Takes a request and a start_response callable and does the normal WSGI
thing with them. Returns an iterator suitable for sending up the WSGI
chain.
:param req: swob.Request object; is a GET or HEAD request aimed at
what may be a static large object manifest (or may not).
:param start_response: WSGI start_response callable
"""
resp_iter = self._app_call(req.environ)
# make sure this response is for a static large object manifest
for header, value in self._response_headers:
if (header.lower() == 'x-static-large-object' and
config_true_value(value)):
break
else:
# Not a static large object manifest. Just pass it through.
start_response(self._response_status,
self._response_headers,
self._response_exc_info)
return resp_iter
# Handle pass-through request for the manifest itself
if req.params.get('multipart-manifest') == 'get':
new_headers = []
for header, value in self._response_headers:
if header.lower() == 'content-type':
new_headers.append(('Content-Type',
'application/json; charset=utf-8'))
else:
new_headers.append((header, value))
self._response_headers = new_headers
start_response(self._response_status,
self._response_headers,
self._response_exc_info)
return resp_iter
# Just because a response shows that an object is a SLO manifest does
# not mean that response's body contains the entire SLO manifest. If
# it doesn't, we need to make a second request to actually get the
# whole thing.
if req.method == 'HEAD' or req.range:
req.environ['swift.non_client_disconnect'] = True
close_if_possible(resp_iter)
del req.environ['swift.non_client_disconnect']
get_req = req.copy_get()
get_req.range = None
get_req.user_agent = "%s SLO MultipartGET" % get_req.user_agent
resp_iter = self._app_call(get_req.environ)
response = self.get_or_head_response(req, self._response_headers,
resp_iter)
return response(req.environ, start_response)
def get_or_head_response(self, req, resp_headers, resp_iter):
resp_body = ''.join(resp_iter)
try:
segments = json.loads(resp_body)
except ValueError:
segments = []
etag = md5()
content_length = 0
for seg_dict in segments:
etag.update(seg_dict['hash'])
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(
seg_dict, logger=self.slo.logger)
content_length += int(seg_dict['bytes'])
response_headers = [(h, v) for h, v in resp_headers
if h.lower() not in ('etag', 'content-length')]
response_headers.append(('Content-Length', str(content_length)))
response_headers.append(('Etag', '"%s"' % etag.hexdigest()))
if req.method == 'HEAD':
return self._manifest_head_response(req, response_headers)
else:
return self._manifest_get_response(
req, content_length, response_headers, segments)
def _manifest_head_response(self, req, response_headers):
return HTTPOk(request=req, headers=response_headers, body='')
def _manifest_get_response(self, req, content_length, response_headers,
segments):
first_byte, last_byte = None, None
if req.range:
byteranges = req.range.ranges_for_length(content_length)
if len(byteranges) == 0:
return HTTPRequestedRangeNotSatisfiable(request=req)
elif len(byteranges) == 1:
first_byte, last_byte = byteranges[0]
# For some reason, swob.Range.ranges_for_length adds 1 to the
# last byte's position.
last_byte -= 1
else:
req.range = None
ver, account, _junk = req.split_path(3, 3, rest_with_last=True)
plain_listing_iter = self._segment_listing_iterator(
req, ver, account, segments, first_byte, last_byte)
ratelimited_listing_iter = RateLimitedIterator(
plain_listing_iter,
self.slo.rate_limit_segments_per_sec,
limit_after=self.slo.rate_limit_after_segment)
# self._segment_listing_iterator gives us 3-tuples of (segment dict,
# start byte, end byte), but SloIterable wants (obj path, etag, size,
# start byte, end byte), so we clean that up here
segment_listing_iter = (
("/{ver}/{acc}/{conobj}".format(
ver=ver, acc=account, conobj=seg_dict['name'].lstrip('/')),
seg_dict['hash'], int(seg_dict['bytes']),
start_byte, end_byte)
for seg_dict, start_byte, end_byte in ratelimited_listing_iter)
response = Response(request=req, content_length=content_length,
headers=response_headers,
conditional_response=True,
app_iter=SloIterable(
req, self.slo.app, segment_listing_iter,
name=req.path, logger=self.slo.logger,
ua_suffix="SLO MultipartGET",
max_get_time=self.slo.max_get_time))
if req.range:
response.headers.pop('Etag')
return response
class StaticLargeObject(object): class StaticLargeObject(object):
""" """
StaticLargeObject Middleware StaticLargeObject Middleware
@ -218,14 +585,42 @@ class StaticLargeObject(object):
1024 * 1024 * 2)) 1024 * 1024 * 2))
self.min_segment_size = int(self.conf.get('min_segment_size', self.min_segment_size = int(self.conf.get('min_segment_size',
1024 * 1024)) 1024 * 1024))
self.max_get_time = int(self.conf.get('max_get_time', 86400))
self.rate_limit_after_segment = int(self.conf.get(
'rate_limit_after_segment', '10'))
self.rate_limit_segments_per_sec = int(self.conf.get(
'rate_limit_segments_per_sec', '0'))
self.bulk_deleter = Bulk(app, {}) self.bulk_deleter = Bulk(app, {})
def handle_multipart_get_or_head(self, req, start_response):
"""
Handles the GET or HEAD of a SLO manifest.
The response body (only on GET, of course) will consist of the
concatenation of the segments.
:params req: a swob.Request with a path referencing an object
:raises: HttpException on errors
"""
return SloGetContext(self).handle_slo_get_or_head(req, start_response)
def copy_response_hook(self, inner_hook):
def slo_hook(req, resp):
if (config_true_value(resp.headers.get('X-Static-Large-Object'))
and req.params.get('multipart-manifest') != 'get'):
resp = SloGetContext(self).get_or_head_response(
req, resp.headers.items(), resp.app_iter)
return inner_hook(req, resp)
return slo_hook
def handle_multipart_put(self, req, start_response): def handle_multipart_put(self, req, start_response):
""" """
Will handle the PUT of a SLO manifest. Will handle the PUT of a SLO manifest.
Heads every object in manifest to check if is valid and if so will Heads every object in manifest to check if is valid and if so will
save a manifest generated from the user input. Uses WSGIContext to save a manifest generated from the user input. Uses WSGIContext to
call self.app and start_response and returns a WSGI iterator. call self and start_response and returns a WSGI iterator.
:params req: a swob.Request with an obj in path :params req: a swob.Request with an obj in path
:raises: HttpException on errors :raises: HttpException on errors
@ -239,7 +634,7 @@ class StaticLargeObject(object):
"Manifest File > %d bytes" % self.max_manifest_size) "Manifest File > %d bytes" % self.max_manifest_size)
if req.headers.get('X-Copy-From'): if req.headers.get('X-Copy-From'):
raise HTTPMethodNotAllowed( raise HTTPMethodNotAllowed(
'Multipart Manifest PUTs cannot be Copy requests') 'Multipart Manifest PUTs cannot be COPY requests')
if req.content_length is None and \ if req.content_length is None and \
req.headers.get('transfer-encoding', '').lower() != 'chunked': req.headers.get('transfer-encoding', '').lower() != 'chunked':
raise HTTPLengthRequired(request=req) raise HTTPLengthRequired(request=req)
@ -248,7 +643,8 @@ class StaticLargeObject(object):
if len(parsed_data) > self.max_manifest_segments: if len(parsed_data) > self.max_manifest_segments:
raise HTTPRequestEntityTooLarge( raise HTTPRequestEntityTooLarge(
'Number segments must be <= %d' % self.max_manifest_segments) 'Number of segments must be <= %d' %
self.max_manifest_segments)
total_size = 0 total_size = 0
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS) out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
if not out_content_type: if not out_content_type:
@ -267,7 +663,7 @@ class StaticLargeObject(object):
if seg_size < self.min_segment_size and \ if seg_size < self.min_segment_size and \
(index == 0 or index < len(parsed_data) - 1): (index == 0 or index < len(parsed_data) - 1):
raise HTTPBadRequest( raise HTTPBadRequest(
'Each segment, except the last, must be larger than ' 'Each segment, except the last, must be at least '
'%d bytes.' % self.min_segment_size) '%d bytes.' % self.min_segment_size)
new_env = req.environ.copy() new_env = req.environ.copy()
@ -280,7 +676,7 @@ class StaticLargeObject(object):
new_env['HTTP_USER_AGENT'] = \ new_env['HTTP_USER_AGENT'] = \
'%s MultipartPUT' % req.environ.get('HTTP_USER_AGENT') '%s MultipartPUT' % req.environ.get('HTTP_USER_AGENT')
head_seg_resp = \ head_seg_resp = \
Request.blank(obj_path, new_env).get_response(self.app) Request.blank(obj_path, new_env).get_response(self)
if head_seg_resp.is_success: if head_seg_resp.is_success:
total_size += seg_size total_size += seg_size
if seg_size != head_seg_resp.content_length: if seg_size != head_seg_resp.content_length:
@ -319,15 +715,15 @@ class StaticLargeObject(object):
if not env.get('CONTENT_TYPE'): if not env.get('CONTENT_TYPE'):
guessed_type, _junk = mimetypes.guess_type(req.path_info) guessed_type, _junk = mimetypes.guess_type(req.path_info)
env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream' env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream'
env['swift.content_type_overriden'] = True env['swift.content_type_overridden'] = True
env['CONTENT_TYPE'] += ";swift_bytes=%d" % total_size env['CONTENT_TYPE'] += ";swift_bytes=%d" % total_size
env['HTTP_X_STATIC_LARGE_OBJECT'] = 'True' env['HTTP_X_STATIC_LARGE_OBJECT'] = 'True'
json_data = json.dumps(data_for_storage) json_data = json.dumps(data_for_storage)
env['CONTENT_LENGTH'] = str(len(json_data)) env['CONTENT_LENGTH'] = str(len(json_data))
env['wsgi.input'] = StringIO(json_data) env['wsgi.input'] = StringIO(json_data)
slo_context = SloContext(self, slo_etag) slo_put_context = SloPutContext(self, slo_etag)
return slo_context.handle_slo_put(req, start_response) return slo_put_context.handle_slo_put(req, start_response)
def get_segments_to_delete_iter(self, req): def get_segments_to_delete_iter(self, req):
""" """
@ -342,10 +738,7 @@ class StaticLargeObject(object):
if not check_utf8(req.path_info): if not check_utf8(req.path_info):
raise HTTPPreconditionFailed( raise HTTPPreconditionFailed(
request=req, body='Invalid UTF8 or contains NULL') request=req, body='Invalid UTF8 or contains NULL')
try: vrs, account, container, obj = req.split_path(4, 4, True)
vrs, account, container, obj = req.split_path(4, 4, True)
except ValueError:
raise HTTPBadRequest('Invalid SLO manifiest path')
segments = [{ segments = [{
'sub_slo': True, 'sub_slo': True,
@ -391,9 +784,8 @@ class StaticLargeObject(object):
'%s MultipartDELETE' % new_env.get('HTTP_USER_AGENT') '%s MultipartDELETE' % new_env.get('HTTP_USER_AGENT')
new_env['swift.source'] = 'SLO' new_env['swift.source'] = 'SLO'
new_env['PATH_INFO'] = ( new_env['PATH_INFO'] = (
'/%s/%s/%s' % ( '/%s/%s/%s' % (vrs, account, obj_name.lstrip('/'))
vrs, account, ).encode('utf-8')
obj_name.lstrip('/'))).encode('utf-8')
resp = Request.blank('', new_env).get_response(self.app) resp = Request.blank('', new_env).get_response(self.app)
if resp.is_success: if resp.is_success:
@ -435,24 +827,29 @@ class StaticLargeObject(object):
""" """
req = Request(env) req = Request(env)
try: try:
vrs, account, container, obj = req.split_path(1, 4, True) vrs, account, container, obj = req.split_path(4, 4, True)
except ValueError: except ValueError:
return self.app(env, start_response) return self.app(env, start_response)
# install our COPY-callback hook
env['swift.copy_response_hook'] = self.copy_response_hook(
env.get('swift.copy_response_hook', lambda req, resp: resp))
try: try:
if obj: if req.method == 'PUT' and \
if req.method == 'PUT' and \ req.params.get('multipart-manifest') == 'put':
req.params.get('multipart-manifest') == 'put': return self.handle_multipart_put(req, start_response)
return self.handle_multipart_put(req, start_response) if req.method == 'DELETE' and \
if req.method == 'DELETE' and \ req.params.get('multipart-manifest') == 'delete':
req.params.get('multipart-manifest') == 'delete': return self.handle_multipart_delete(req)(env, start_response)
return self.handle_multipart_delete(req)(env, if req.method == 'GET' or req.method == 'HEAD':
start_response) return self.handle_multipart_get_or_head(req, start_response)
if 'X-Static-Large-Object' in req.headers: if 'X-Static-Large-Object' in req.headers:
raise HTTPBadRequest( raise HTTPBadRequest(
request=req, request=req,
body='X-Static-Large-Object is a reserved header. ' body='X-Static-Large-Object is a reserved header. '
'To create a static large object add query param ' 'To create a static large object add query param '
'multipart-manifest=put.') 'multipart-manifest=put.')
except HTTPException as err_resp: except HTTPException as err_resp:
return err_resp(env, start_response) return err_resp(env, start_response)

View File

@ -635,6 +635,34 @@ def validate_device_partition(device, partition):
raise ValueError('Invalid partition: %s' % quote(partition or '')) raise ValueError('Invalid partition: %s' % quote(partition or ''))
class RateLimitedIterator(object):
"""
Wrap an iterator to only yield elements at a rate of N per second.
:param iterable: iterable to wrap
:param elements_per_second: the rate at which to yield elements
:param limit_after: rate limiting kicks in only after yielding
this many elements; default is 0 (rate limit
immediately)
"""
def __init__(self, iterable, elements_per_second, limit_after=0):
self.iterator = iter(iterable)
self.elements_per_second = elements_per_second
self.limit_after = limit_after
self.running_time = 0
def __iter__(self):
return self
def next(self):
if self.limit_after > 0:
self.limit_after -= 1
else:
self.running_time = ratelimit_sleep(self.running_time,
self.elements_per_second)
return self.iterator.next()
class GreenthreadSafeIterator(object): class GreenthreadSafeIterator(object):
""" """
Wrap an iterator to ensure that only one greenthread is inside its next() Wrap an iterator to ensure that only one greenthread is inside its next()

View File

@ -626,12 +626,13 @@ class GetOrHeadHandler(object):
return True return True
return is_success(src.status) or is_redirection(src.status) return is_success(src.status) or is_redirection(src.status)
def _make_app_iter(self, node, source): def _make_app_iter(self, req, node, source):
""" """
Returns an iterator over the contents of the source (via its read Returns an iterator over the contents of the source (via its read
func). There is also quite a bit of cleanup to ensure garbage func). There is also quite a bit of cleanup to ensure garbage
collection works and the underlying socket of the source is closed. collection works and the underlying socket of the source is closed.
:param req: incoming request object
:param source: The httplib.Response object this iterator should read :param source: The httplib.Response object this iterator should read
from. from.
:param node: The node the source is reading from, for logging purposes. :param node: The node the source is reading from, for logging purposes.
@ -698,7 +699,8 @@ class GetOrHeadHandler(object):
self.app.client_timeout) self.app.client_timeout)
self.app.logger.increment('client_timeouts') self.app.logger.increment('client_timeouts')
except GeneratorExit: except GeneratorExit:
self.app.logger.warn(_('Client disconnected on read')) if not req.environ.get('swift.non_client_disconnect'):
self.app.logger.warn(_('Client disconnected on read'))
except Exception: except Exception:
self.app.logger.exception(_('Trying to send to client')) self.app.logger.exception(_('Trying to send to client'))
raise raise
@ -801,7 +803,7 @@ class GetOrHeadHandler(object):
res = Response(request=req) res = Response(request=req)
if req.method == 'GET' and \ if req.method == 'GET' and \
source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
res.app_iter = self._make_app_iter(node, source) res.app_iter = self._make_app_iter(req, node, source)
# See NOTE: swift_conn at top of file about this. # See NOTE: swift_conn at top of file about this.
res.swift_conn = source.swift_conn res.swift_conn = source.swift_conn
res.status = source.status res.status = source.status

View File

@ -41,8 +41,7 @@ from eventlet.timeout import Timeout
from swift.common.utils import ContextPool, normalize_timestamp, \ from swift.common.utils import ContextPool, normalize_timestamp, \
config_true_value, public, json, csv_append, GreenthreadSafeIterator, \ config_true_value, public, json, csv_append, GreenthreadSafeIterator, \
quorum_size, split_path, override_bytes_from_content_type, \ quorum_size, GreenAsyncPile
get_valid_utf8_str, GreenAsyncPile
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \ from swift.common.constraints import check_metadata, check_object_creation, \
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE
@ -52,13 +51,13 @@ from swift.common.exceptions import ChunkReadTimeout, \
from swift.common.http import is_success, is_client_error, HTTP_CONTINUE, \ from swift.common.http import is_success, is_client_error, HTTP_CONTINUE, \
HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_CONFLICT, \ HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_CONFLICT, \
HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \ HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \
HTTP_INSUFFICIENT_STORAGE, HTTP_OK HTTP_INSUFFICIENT_STORAGE
from swift.proxy.controllers.base import Controller, delay_denial, \ from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation cors_validation
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, Request, Response, \ HTTPServerError, HTTPServiceUnavailable, Request, Response, \
HTTPClientDisconnect, HTTPNotImplemented, HTTPException HTTPClientDisconnect, HTTPNotImplemented
from swift.common.request_helpers import is_user_meta from swift.common.request_helpers import is_user_meta
@ -84,7 +83,7 @@ def copy_headers_into(from_r, to_r):
def check_content_type(req): def check_content_type(req):
if not req.environ.get('swift.content_type_overriden') and \ if not req.environ.get('swift.content_type_overridden') and \
';' in req.headers.get('content-type', ''): ';' in req.headers.get('content-type', ''):
for param in req.headers['content-type'].split(';')[1:]: for param in req.headers['content-type'].split(';')[1:]:
if param.lstrip().startswith('swift_'): if param.lstrip().startswith('swift_'):
@ -110,18 +109,15 @@ class SegmentedIterable(object):
'bytes' keys. 'bytes' keys.
:param response: The swob.Response this iterable is associated with, if :param response: The swob.Response this iterable is associated with, if
any (default: None) any (default: None)
:param is_slo: A boolean, defaults to False, as to whether this references
a SLO object.
:param max_lo_time: Defaults to 86400. The connection for the :param max_lo_time: Defaults to 86400. The connection for the
SegmentedIterable will drop after that many seconds. SegmentedIterable will drop after that many seconds.
""" """
def __init__(self, controller, container, listing, response=None, def __init__(self, controller, container, listing, response=None,
is_slo=False, max_lo_time=86400): max_lo_time=86400):
self.controller = controller self.controller = controller
self.container = container self.container = container
self.listing = segment_listing_iter(listing) self.listing = segment_listing_iter(listing)
self.is_slo = is_slo
self.max_lo_time = max_lo_time self.max_lo_time = max_lo_time
self.ratelimit_index = 0 self.ratelimit_index = 0
self.segment_dict = None self.segment_dict = None
@ -143,8 +139,7 @@ class SegmentedIterable(object):
""" """
Loads the self.segment_iter with the next object segment's contents. Loads the self.segment_iter with the next object segment's contents.
:raises: StopIteration when there are no more object segments or :raises: StopIteration when there are no more object segments
segment no longer matches SLO manifest specifications.
""" """
try: try:
self.ratelimit_index += 1 self.ratelimit_index += 1
@ -176,7 +171,7 @@ class SegmentedIterable(object):
if self.seek or range_tail: if self.seek or range_tail:
req.range = 'bytes=%s-%s' % (self.seek, range_tail) req.range = 'bytes=%s-%s' % (self.seek, range_tail)
self.seek = 0 self.seek = 0
if not self.is_slo and self.ratelimit_index > \ if self.ratelimit_index > \
self.controller.app.rate_limit_after_segment: self.controller.app.rate_limit_after_segment:
sleep(max(self.next_get_time - time.time(), 0)) sleep(max(self.next_get_time - time.time(), 0))
self.next_get_time = time.time() + \ self.next_get_time = time.time() + \
@ -184,33 +179,10 @@ class SegmentedIterable(object):
resp = self.controller.GETorHEAD_base( resp = self.controller.GETorHEAD_base(
req, _('Object'), self.controller.app.object_ring, partition, req, _('Object'), self.controller.app.object_ring, partition,
path) path)
if self.is_slo and resp.status_int == HTTP_NOT_FOUND:
raise SegmentError(_(
'Could not load object segment %(path)s:'
' %(status)s') % {'path': path, 'status': resp.status_int})
if not is_success(resp.status_int): if not is_success(resp.status_int):
raise Exception(_( raise Exception(_(
'Could not load object segment %(path)s:' 'Could not load object segment %(path)s:'
' %(status)s') % {'path': path, 'status': resp.status_int}) ' %(status)s') % {'path': path, 'status': resp.status_int})
if self.is_slo:
if (resp.etag != self.segment_dict['hash'] or
(resp.content_length != self.segment_dict['bytes'] and
not req.range)):
# The content-length check is for security reasons. Seems
# possible that an attacker could upload a >1mb object and
# then replace it with a much smaller object with same
# etag. Then create a big nested SLO that calls that
# object many times which would hammer our obj servers. If
# this is a range request, don't check content-length
# because it won't match.
raise SegmentError(_(
'Object segment no longer valid: '
'%(path)s etag: %(r_etag)s != %(s_etag)s or '
'%(r_size)s != %(s_size)s.') %
{'path': path, 'r_etag': resp.etag,
'r_size': resp.content_length,
's_etag': self.segment_dict['hash'],
's_size': self.segment_dict['bytes']})
self.segment_iter = resp.app_iter self.segment_iter = resp.app_iter
# See NOTE: swift_conn at top of file about this. # See NOTE: swift_conn at top of file about this.
self.segment_iter_swift_conn = getattr(resp, 'swift_conn', None) self.segment_iter_swift_conn = getattr(resp, 'swift_conn', None)
@ -355,7 +327,6 @@ class SegmentedIterable(object):
class ObjectController(Controller): class ObjectController(Controller):
"""WSGI controller for object requests.""" """WSGI controller for object requests."""
server_type = 'Object' server_type = 'Object'
max_slo_recusion_depth = 10
def __init__(self, app, account_name, container_name, object_name, def __init__(self, app, account_name, container_name, object_name,
**kwargs): **kwargs):
@ -363,7 +334,6 @@ class ObjectController(Controller):
self.account_name = unquote(account_name) self.account_name = unquote(account_name)
self.container_name = unquote(container_name) self.container_name = unquote(container_name)
self.object_name = unquote(object_name) self.object_name = unquote(object_name)
self.slo_recursion_depth = 0
def _listing_iter(self, lcontainer, lprefix, env): def _listing_iter(self, lcontainer, lprefix, env):
for page in self._listing_pages_iter(lcontainer, lprefix, env): for page in self._listing_pages_iter(lcontainer, lprefix, env):
@ -403,68 +373,6 @@ class ObjectController(Controller):
marker = sublisting[-1]['name'].encode('utf-8') marker = sublisting[-1]['name'].encode('utf-8')
yield sublisting yield sublisting
def _slo_listing_obj_iter(self, incoming_req, account, container, obj,
partition=None, initial_resp=None):
"""
The initial_resp indicated that this is a SLO manifest file. This will
create an iterable that will expand nested SLOs as it walks though the
listing.
:params incoming_req: The original GET request from client
:params initial_resp: the first resp from the above request
"""
if initial_resp and initial_resp.status_int == HTTP_OK and \
incoming_req.method == 'GET' and not incoming_req.range:
valid_resp = initial_resp
else:
new_req = incoming_req.copy_get()
new_req.method = 'GET'
new_req.range = None
new_req.path_info = '/'.join(['/v1', account, container, obj])
if partition is None:
try:
partition = self.app.object_ring.get_part(
account, container, obj)
except ValueError:
raise HTTPException(
"Invalid path to whole SLO manifest: %s" %
new_req.path)
valid_resp = self.GETorHEAD_base(
new_req, _('Object'), self.app.object_ring, partition,
new_req.swift_entity_path)
if 'swift.authorize' in incoming_req.environ:
incoming_req.acl = valid_resp.headers.get('x-container-read')
auth_resp = incoming_req.environ['swift.authorize'](incoming_req)
if auth_resp:
raise ListingIterNotAuthorized(auth_resp)
if valid_resp.status_int == HTTP_NOT_FOUND:
raise ListingIterNotFound()
elif not is_success(valid_resp.status_int):
raise ListingIterError()
try:
listing = json.loads(valid_resp.body)
except ValueError:
listing = []
for seg_dict in listing:
if config_true_value(seg_dict.get('sub_slo')):
if incoming_req.method == 'HEAD':
override_bytes_from_content_type(seg_dict,
logger=self.app.logger)
yield seg_dict
continue
sub_path = get_valid_utf8_str(seg_dict['name'])
sub_cont, sub_obj = split_path(sub_path, 2, 2, True)
self.slo_recursion_depth += 1
if self.slo_recursion_depth >= self.max_slo_recusion_depth:
raise ListingIterError("Max recursion depth exceeded")
for sub_seg_dict in self._slo_listing_obj_iter(
incoming_req, account, sub_cont, sub_obj):
yield sub_seg_dict
self.slo_recursion_depth -= 1
else:
yield seg_dict
def _remaining_items(self, listing_iter): def _remaining_items(self, listing_iter):
""" """
Returns an item-by-item iterator for a page-by-page iterator Returns an item-by-item iterator for a page-by-page iterator
@ -549,41 +457,6 @@ class ObjectController(Controller):
resp.content_type = content_type resp.content_type = content_type
large_object = None large_object = None
if config_true_value(resp.headers.get('x-static-large-object')) and \
req.params.get('multipart-manifest') == 'get' and \
'X-Copy-From' not in req.headers and \
self.app.allow_static_large_object:
resp.content_type = 'application/json'
resp.charset = 'utf-8'
if config_true_value(resp.headers.get('x-static-large-object')) and \
req.params.get('multipart-manifest') != 'get' and \
self.app.allow_static_large_object:
large_object = 'SLO'
lcontainer = None # container name is included in listing
try:
seg_iter = iter(self._slo_listing_obj_iter(
req, self.account_name, self.container_name,
self.object_name, partition=partition, initial_resp=resp))
listing_page1 = []
for seg in seg_iter:
listing_page1.append(seg)
if len(listing_page1) >= CONTAINER_LISTING_LIMIT:
break
listing = itertools.chain(listing_page1,
self._remaining_items(seg_iter))
except ListingIterNotFound:
return HTTPNotFound(request=req)
except ListingIterNotAuthorized, err:
return err.aresp
except ListingIterError:
return HTTPServerError(request=req)
except StopIteration:
listing_page1 = listing = ()
except HTTPException:
return HTTPServiceUnavailable(
"Unable to load SLO manifest", request=req)
if 'x-object-manifest' in resp.headers and \ if 'x-object-manifest' in resp.headers and \
req.params.get('multipart-manifest') != 'get': req.params.get('multipart-manifest') != 'get':
large_object = 'DLO' large_object = 'DLO'
@ -612,7 +485,6 @@ class ObjectController(Controller):
conditional_response=True) conditional_response=True)
resp.app_iter = SegmentedIterable( resp.app_iter = SegmentedIterable(
self, lcontainer, listing, resp, self, lcontainer, listing, resp,
is_slo=(large_object == 'SLO'),
max_lo_time=self.app.max_large_object_get_time) max_lo_time=self.app.max_large_object_get_time)
else: else:
# For objects with a reasonable number of segments, we'll serve # For objects with a reasonable number of segments, we'll serve
@ -639,7 +511,6 @@ class ObjectController(Controller):
conditional_response=True) conditional_response=True)
resp.app_iter = SegmentedIterable( resp.app_iter = SegmentedIterable(
self, lcontainer, listing, resp, self, lcontainer, listing, resp,
is_slo=(large_object == 'SLO'),
max_lo_time=self.app.max_large_object_get_time) max_lo_time=self.app.max_large_object_get_time)
resp.content_length = content_length resp.content_length = content_length
resp.last_modified = last_modified resp.last_modified = last_modified
@ -1014,14 +885,20 @@ class ObjectController(Controller):
orig_container_name = self.container_name orig_container_name = self.container_name
self.object_name = src_obj_name self.object_name = src_obj_name
self.container_name = src_container_name self.container_name = src_container_name
source_resp = self.GET(source_req) # This gives middlewares a way to change the source; for example,
# this lets you COPY a SLO manifest and have the new object be the
# concatenation of the segments (like what a GET request gives
# the client), not a copy of the manifest file.
source_resp = req.environ.get(
'swift.copy_response_hook',
lambda req, resp: resp)(source_req, self.GET(source_req))
if source_resp.status_int >= HTTP_MULTIPLE_CHOICES: if source_resp.status_int >= HTTP_MULTIPLE_CHOICES:
return source_resp return source_resp
self.object_name = orig_obj_name self.object_name = orig_obj_name
self.container_name = orig_container_name self.container_name = orig_container_name
new_req = Request.blank(req.path_info, new_req = Request.blank(req.path_info,
environ=req.environ, headers=req.headers) environ=req.environ, headers=req.headers)
data_source = source_resp.app_iter data_source = iter(source_resp.app_iter)
new_req.content_length = source_resp.content_length new_req.content_length = source_resp.content_length
if new_req.content_length is None: if new_req.content_length is None:
# This indicates a transfer-encoding: chunked source object, # This indicates a transfer-encoding: chunked source object,

View File

@ -133,8 +133,6 @@ class Application(object):
self.node_timings = {} self.node_timings = {}
self.timing_expiry = int(conf.get('timing_expiry', 300)) self.timing_expiry = int(conf.get('timing_expiry', 300))
self.sorting_method = conf.get('sorting_method', 'shuffle').lower() self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
self.allow_static_large_object = config_true_value(
conf.get('allow_static_large_object', 'true'))
self.max_large_object_get_time = float( self.max_large_object_get_time = float(
conf.get('max_large_object_get_time', '86400')) conf.get('max_large_object_get_time', '86400'))
value = conf.get('request_node_count', '2 * replicas').lower().split() value = conf.get('request_node_count', '2 * replicas').lower().split()

View File

@ -1968,16 +1968,34 @@ class TestSlo(Base):
def test_slo_copy_the_manifest(self): def test_slo_copy_the_manifest(self):
file_item = self.env.container.file("manifest-abcde") file_item = self.env.container.file("manifest-abcde")
file_item.copy(self.env.container.name, "copied-abcde", file_item.copy(self.env.container.name, "copied-abcde-manifest-only",
parms={'multipart-manifest': 'get'}) parms={'multipart-manifest': 'get'})
copied = self.env.container.file("copied-abcde") copied = self.env.container.file("copied-abcde-manifest-only")
copied_contents = copied.read(parms={'multipart-manifest': 'get'}) copied_contents = copied.read(parms={'multipart-manifest': 'get'})
try: try:
json.loads(copied_contents) json.loads(copied_contents)
except ValueError: except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)") self.fail("COPY didn't copy the manifest (invalid json on GET)")
def test_slo_get_the_manifest(self):
manifest = self.env.container.file("manifest-abcde")
got_body = manifest.read(parms={'multipart-manifest': 'get'})
self.assertEqual('application/json; charset=utf-8',
manifest.content_type)
try:
json.loads(got_body)
except ValueError:
self.fail("GET with multipart-manifest=get got invalid json")
def test_slo_head_the_manifest(self):
manifest = self.env.container.file("manifest-abcde")
got_info = manifest.info(parms={'multipart-manifest': 'get'})
self.assertEqual('application/json; charset=utf-8',
got_info['content_type'])
class TestSloUTF8(Base2, TestSlo): class TestSloUTF8(Base2, TestSlo):
set_up = False set_up = False

View File

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2013 OpenStack Foundation # Copyright (c) 2013 OpenStack Foundation
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
@ -13,14 +14,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import time
import unittest import unittest
from copy import deepcopy from copy import deepcopy
from mock import patch from mock import patch
from hashlib import md5 from hashlib import md5
from swift.common import swob from swift.common import swob
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.middleware import slo from swift.common.middleware import slo
from swift.common.utils import json, split_path from swift.common.utils import json, split_path
from swift.common.swob import Request, HTTPException from swift.common.swob import Request, Response, HTTPException
test_xml_data = '''<?xml version="1.0" encoding="UTF-8"?> test_xml_data = '''<?xml version="1.0" encoding="UTF-8"?>
@ -43,7 +46,7 @@ def fake_start_response(*args, **kwargs):
class FakeSwift(object): class FakeSwift(object):
def __init__(self): def __init__(self):
self.calls = [] self._calls = []
self.req_method_paths = [] self.req_method_paths = []
self.uploaded = {} self.uploaded = {}
# mapping of (method, path) --> (response class, headers, body) # mapping of (method, path) --> (response class, headers, body)
@ -55,13 +58,18 @@ class FakeSwift(object):
_, acc, cont, obj = split_path(env['PATH_INFO'], 0, 4, _, acc, cont, obj = split_path(env['PATH_INFO'], 0, 4,
rest_with_last=True) rest_with_last=True)
self.calls.append((method, path)) headers = swob.Request(env).headers
self._calls.append((method, path, headers))
try: try:
resp_class, raw_headers, body = self._responses[(method, path)] resp_class, raw_headers, body = self._responses[(method, path)]
headers = swob.HeaderKeyDict(raw_headers) headers = swob.HeaderKeyDict(raw_headers)
except KeyError: except KeyError:
if method == 'GET' and obj and path in self.uploaded: if method == 'HEAD' and ('GET', path) in self._responses:
resp_class, raw_headers, _ = self._responses[('GET', path)]
body = None
headers = swob.HeaderKeyDict(raw_headers)
elif method == 'GET' and obj and path in self.uploaded:
resp_class = swob.HTTPOk resp_class = swob.HTTPOk
headers, body = self.uploaded[path] headers, body = self.uploaded[path]
else: else:
@ -79,11 +87,23 @@ class FakeSwift(object):
if "CONTENT_TYPE" in env: if "CONTENT_TYPE" in env:
self.uploaded[path][0]['Content-Type'] = env["CONTENT_TYPE"] self.uploaded[path][0]['Content-Type'] = env["CONTENT_TYPE"]
return resp_class(headers=headers, body=body)(env, start_response) req = swob.Request(env)
# range requests ought to work, hence conditional_response=True
resp = resp_class(req=req, headers=headers, body=body,
conditional_response=True)
return resp(env, start_response)
@property
def calls(self):
return [(method, path) for method, path, headers in self._calls]
@property
def calls_with_headers(self):
return self._calls
@property @property
def call_count(self): def call_count(self):
return len(self.calls) return len(self._calls)
def register(self, method, path, response_class, headers, body): def register(self, method, path, response_class, headers, body):
self._responses[(method, path)] = (response_class, headers, body) self._responses[(method, path)] = (response_class, headers, body)
@ -95,10 +115,12 @@ class SloTestCase(unittest.TestCase):
self.slo = slo.filter_factory({})(self.app) self.slo = slo.filter_factory({})(self.app)
self.slo.min_segment_size = 1 self.slo.min_segment_size = 1
def call_app(self, req, app=None): def call_app(self, req, app=None, expect_exception=False):
if app is None: if app is None:
app = self.app app = self.app
req.headers.setdefault("User-Agent", "Mozzarella Foxfire")
status = [None] status = [None]
headers = [None] headers = [None]
@ -106,11 +128,25 @@ class SloTestCase(unittest.TestCase):
status[0] = s status[0] = s
headers[0] = h headers[0] = h
body = ''.join(app(req.environ, start_response)) body_iter = app(req.environ, start_response)
return status[0], headers[0], body body = ''
caught_exc = None
try:
for chunk in body_iter:
body += chunk
except Exception as exc:
if expect_exception:
caught_exc = exc
else:
raise
def call_slo(self, req): if expect_exception:
return self.call_app(req, app=self.slo) return status[0], headers[0], body, caught_exc
else:
return status[0], headers[0], body
def call_slo(self, req, **kwargs):
return self.call_app(req, app=self.slo, **kwargs)
class TestSloMiddleware(SloTestCase): class TestSloMiddleware(SloTestCase):
@ -131,10 +167,11 @@ class TestSloMiddleware(SloTestCase):
def test_slo_header_assigned(self): def test_slo_header_assigned(self):
req = Request.blank( req = Request.blank(
'/v1/a/c/o', headers={'x-static-large-object': "true"}) '/v1/a/c/o', headers={'x-static-large-object': "true"},
resp = self.slo(req.environ, fake_start_response) environ={'REQUEST_METHOD': 'PUT'})
resp = ''.join(self.slo(req.environ, fake_start_response))
self.assert_( self.assert_(
resp[0].startswith('X-Static-Large-Object is a reserved header')) resp.startswith('X-Static-Large-Object is a reserved header'))
def test_parse_input(self): def test_parse_input(self):
self.assertRaises(HTTPException, slo.parse_input, 'some non json') self.assertRaises(HTTPException, slo.parse_input, 'some non json')
@ -190,7 +227,7 @@ class TestSloPutManifest(SloTestCase):
'Last-Modified': 'Fri, 01 Feb 2012 20:38:36 GMT'}, 'Last-Modified': 'Fri, 01 Feb 2012 20:38:36 GMT'},
None) None)
self.app.register( self.app.register(
'HEAD', '/v1/AUTH_test/checktest/slob', 'GET', '/v1/AUTH_test/checktest/slob',
swob.HTTPOk, swob.HTTPOk,
{'X-Static-Large-Object': 'true', 'Etag': 'slob-etag'}, {'X-Static-Large-Object': 'true', 'Etag': 'slob-etag'},
None) None)
@ -361,7 +398,7 @@ class TestSloPutManifest(SloTestCase):
body=bad_data) body=bad_data)
status, headers, body = self.call_slo(req) status, headers, body = self.call_slo(req)
self.assertEquals(self.app.call_count, 4) self.assertEquals(self.app.call_count, 5)
errors = json.loads(body)['Errors'] errors = json.loads(body)['Errors']
self.assertEquals(len(errors), 5) self.assertEquals(len(errors), 5)
self.assertEquals(errors[0][0], '/checktest/a_1') self.assertEquals(errors[0][0], '/checktest/a_1')
@ -490,12 +527,23 @@ class TestSloDeleteManifest(SloTestCase):
self.slo(req.environ, fake_start_response) self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.call_count, 1) self.assertEquals(self.app.call_count, 1)
def test_handle_multipart_delete_bad_utf8(self):
req = Request.blank(
'/v1/AUTH_test/deltest/man\xff\xfe?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
status, headers, body = self.call_slo(req)
self.assertEquals(status, '200 OK')
resp_data = json.loads(body)
self.assertEquals(resp_data['Response Status'],
'412 Precondition Failed')
def test_handle_multipart_delete_whole_404(self): def test_handle_multipart_delete_whole_404(self):
req = Request.blank( req = Request.blank(
'/v1/AUTH_test/deltest/man_404?multipart-manifest=delete', '/v1/AUTH_test/deltest/man_404?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE', environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'}) 'HTTP_ACCEPT': 'application/json'})
status, response, body = self.call_slo(req) status, headers, body = self.call_slo(req)
resp_data = json.loads(body) resp_data = json.loads(body)
self.assertEquals(self.app.calls, self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/man_404')]) [('GET', '/v1/AUTH_test/deltest/man_404')])
@ -510,7 +558,7 @@ class TestSloDeleteManifest(SloTestCase):
'/v1/AUTH_test/deltest/man?multipart-manifest=delete', '/v1/AUTH_test/deltest/man?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE', environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'}) 'HTTP_ACCEPT': 'application/json'})
status, response, body = self.call_slo(req) status, headers, body = self.call_slo(req)
resp_data = json.loads(body) resp_data = json.loads(body)
self.assertEquals(self.app.calls, self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/man'), [('GET', '/v1/AUTH_test/deltest/man'),
@ -550,13 +598,27 @@ class TestSloDeleteManifest(SloTestCase):
('DELETE', '/v1/AUTH_test/deltest/' + ('DELETE', '/v1/AUTH_test/deltest/' +
'manifest-with-submanifest')])) 'manifest-with-submanifest')]))
def test_handle_multipart_delete_nested_too_many_segments(self):
req = Request.blank(
'/v1/AUTH_test/deltest/manifest-with-submanifest?' +
'multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
with patch.object(slo, 'MAX_BUFFERED_SLO_SEGMENTS', 1):
status, headers, body = self.call_slo(req)
self.assertEquals(status, '200 OK')
resp_data = json.loads(body)
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
self.assertEquals(resp_data['Response Body'],
'Too many buffered slo segments to delete.')
def test_handle_multipart_delete_nested_404(self): def test_handle_multipart_delete_nested_404(self):
req = Request.blank( req = Request.blank(
'/v1/AUTH_test/deltest/manifest-missing-submanifest' + '/v1/AUTH_test/deltest/manifest-missing-submanifest' +
'?multipart-manifest=delete', '?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE', environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'}) 'HTTP_ACCEPT': 'application/json'})
status, response, body = self.call_slo(req) status, headers, body = self.call_slo(req)
resp_data = json.loads(body) resp_data = json.loads(body)
self.assertEquals(self.app.calls, self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/' + [('GET', '/v1/AUTH_test/deltest/' +
@ -573,12 +635,47 @@ class TestSloDeleteManifest(SloTestCase):
self.assertEquals(resp_data['Number Not Found'], 1) self.assertEquals(resp_data['Number Not Found'], 1)
self.assertEquals(resp_data['Errors'], []) self.assertEquals(resp_data['Errors'], [])
def test_handle_multipart_delete_nested_401(self):
self.app.register(
'GET', '/v1/AUTH_test/deltest/submanifest',
swob.HTTPUnauthorized, {}, None)
req = Request.blank(
('/v1/AUTH_test/deltest/manifest-with-submanifest' +
'?multipart-manifest=delete'),
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
status, headers, body = self.call_slo(req)
self.assertEquals(status, '200 OK')
resp_data = json.loads(body)
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
self.assertEquals(resp_data['Errors'],
[['/deltest/submanifest', '401 Unauthorized']])
def test_handle_multipart_delete_nested_500(self):
self.app.register(
'GET', '/v1/AUTH_test/deltest/submanifest',
swob.HTTPServerError, {}, None)
req = Request.blank(
('/v1/AUTH_test/deltest/manifest-with-submanifest' +
'?multipart-manifest=delete'),
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
status, headers, body = self.call_slo(req)
self.assertEquals(status, '200 OK')
resp_data = json.loads(body)
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
self.assertEquals(resp_data['Errors'],
[['/deltest/submanifest',
'Unable to load SLO manifest or segment.']])
def test_handle_multipart_delete_not_a_manifest(self): def test_handle_multipart_delete_not_a_manifest(self):
req = Request.blank( req = Request.blank(
'/v1/AUTH_test/deltest/a_1?multipart-manifest=delete', '/v1/AUTH_test/deltest/a_1?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE', environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'}) 'HTTP_ACCEPT': 'application/json'})
status, response, body = self.call_slo(req) status, headers, body = self.call_slo(req)
resp_data = json.loads(body) resp_data = json.loads(body)
self.assertEquals(self.app.calls, self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/a_1')]) [('GET', '/v1/AUTH_test/deltest/a_1')])
@ -594,7 +691,7 @@ class TestSloDeleteManifest(SloTestCase):
'/v1/AUTH_test/deltest/manifest-badjson?multipart-manifest=delete', '/v1/AUTH_test/deltest/manifest-badjson?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE', environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'}) 'HTTP_ACCEPT': 'application/json'})
status, response, body = self.call_slo(req) status, headers, body = self.call_slo(req)
resp_data = json.loads(body) resp_data = json.loads(body)
self.assertEquals(self.app.calls, self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/manifest-badjson')]) [('GET', '/v1/AUTH_test/deltest/manifest-badjson')])
@ -612,7 +709,7 @@ class TestSloDeleteManifest(SloTestCase):
'?multipart-manifest=delete', '?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE', environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'}) 'HTTP_ACCEPT': 'application/json'})
status, response, body = self.call_slo(req) status, headers, body = self.call_slo(req)
resp_data = json.loads(body) resp_data = json.loads(body)
self.assertEquals(self.app.calls, self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/' + [('GET', '/v1/AUTH_test/deltest/' +
@ -629,5 +726,597 @@ class TestSloDeleteManifest(SloTestCase):
[['/deltest-unauth/q_17', '401 Unauthorized']]) [['/deltest-unauth/q_17', '401 Unauthorized']])
class TestSloHeadManifest(SloTestCase):
def setUp(self):
super(TestSloHeadManifest, self).setUp()
self._manifest_json = json.dumps([
{'name': '/gettest/seg01',
'bytes': '100',
'hash': 'seg01-hash',
'content_type': 'text/plain',
'last_modified': '2013-11-19T11:33:45.137446'},
{'name': '/gettest/seg02',
'bytes': '200',
'hash': 'seg02-hash',
'content_type': 'text/plain',
'last_modified': '2013-11-19T11:33:45.137447'}])
self.app.register(
'GET', '/v1/AUTH_test/headtest/man',
swob.HTTPOk, {'Content-Length': str(len(self._manifest_json)),
'X-Static-Large-Object': 'true',
'Etag': md5(self._manifest_json).hexdigest()},
self._manifest_json)
def test_etag_is_hash_of_segment_etags(self):
req = Request.blank(
'/v1/AUTH_test/headtest/man',
environ={'REQUEST_METHOD': 'HEAD'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
self.assertEqual(status, '200 OK')
self.assertEqual(headers.get('Etag', '').strip("'\""),
md5("seg01-hashseg02-hash").hexdigest())
self.assertEqual(body, '') # it's a HEAD request, after all
class TestSloGetManifest(SloTestCase):
def setUp(self):
super(TestSloGetManifest, self).setUp()
_bc_manifest_json = json.dumps(
[{'name': '/gettest/b_10', 'hash': 'b', 'bytes': '10',
'content_type': 'text/plain'},
{'name': '/gettest/c_15', 'hash': 'c', 'bytes': '15',
'content_type': 'text/plain'}])
# some plain old objects
self.app.register(
'GET', '/v1/AUTH_test/gettest/a_5',
swob.HTTPOk, {'Content-Length': '5',
'Etag': 'a'},
'a' * 5)
self.app.register(
'GET', '/v1/AUTH_test/gettest/b_10',
swob.HTTPOk, {'Content-Length': '10',
'Etag': 'b'},
'b' * 10)
self.app.register(
'GET', '/v1/AUTH_test/gettest/c_15',
swob.HTTPOk, {'Content-Length': '15',
'Etag': 'c'},
'c' * 15)
self.app.register(
'GET', '/v1/AUTH_test/gettest/d_20',
swob.HTTPOk, {'Content-Length': '20',
'Etag': 'd'},
'd' * 20)
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-bc',
swob.HTTPOk, {'Content-Type': 'application/json;swift_bytes=25',
'X-Static-Large-Object': 'true',
'X-Object-Meta-Plant': 'Ficus'},
_bc_manifest_json)
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-abcd',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/gettest/a_5', 'hash': 'a',
'content_type': 'text/plain', 'bytes': '5'},
{'name': '/gettest/manifest-bc', 'sub_slo': True,
'content_type': 'application/json;swift_bytes=25',
'hash': 'manifest-bc',
'bytes': len(_bc_manifest_json)},
{'name': '/gettest/d_20', 'hash': 'd',
'content_type': 'text/plain', 'bytes': '20'}]))
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-badjson',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true',
'X-Object-Meta-Fish': 'Bass'},
"[not {json (at ++++all")
def test_get_manifest_passthrough(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-bc?multipart-manifest=get',
environ={'REQUEST_METHOD': 'GET',
'HTTP_ACCEPT': 'application/json'})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK')
self.assertTrue(
('Content-Type', 'application/json; charset=utf-8') in headers,
headers)
try:
resp_data = json.loads(body)
except json.JSONDecodeError:
resp_data = None
self.assertEqual(
resp_data,
[{'hash': 'b', 'bytes': '10', 'name': '/gettest/b_10',
'content_type': 'text/plain'},
{'hash': 'c', 'bytes': '15', 'name': '/gettest/c_15',
'content_type': 'text/plain'}],
body)
def test_get_nonmanifest_passthrough(self):
req = Request.blank(
'/v1/AUTH_test/gettest/a_5',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK')
self.assertEqual(body, 'aaaaa')
def test_get_manifest(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-bc',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
manifest_etag = md5("bc").hexdigest()
self.assertEqual(status, '200 OK')
self.assertEqual(headers['Content-Length'], '25')
self.assertEqual(headers['Etag'], '"%s"' % manifest_etag)
self.assertEqual(headers['X-Object-Meta-Plant'], 'Ficus')
self.assertEqual(body, 'bbbbbbbbbbccccccccccccccc')
for _, _, hdrs in self.app.calls_with_headers[1:]:
ua = hdrs.get("User-Agent", "")
self.assertTrue("SLO MultipartGET" in ua)
self.assertFalse("SLO MultipartGET SLO MultipartGET" in ua)
# the first request goes through unaltered
self.assertFalse(
"SLO MultipartGET" in self.app.calls_with_headers[0][2])
def test_get_manifest_with_submanifest(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
manifest_etag = md5("a" + "manifest-bc" + "d").hexdigest()
self.assertEqual(status, '200 OK')
self.assertEqual(headers['Content-Length'], '50')
self.assertEqual(headers['Etag'], '"%s"' % manifest_etag)
self.assertEqual(
body, 'aaaaabbbbbbbbbbcccccccccccccccdddddddddddddddddddd')
def test_range_get_manifest(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=3-17'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
self.assertEqual(status, '206 Partial Content')
self.assertEqual(headers['Content-Length'], '15')
self.assertTrue('Etag' not in headers)
self.assertEqual(body, 'aabbbbbbbbbbccc')
self.assertEqual(
self.app.calls,
[('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/a_5'),
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/b_10'),
('GET', '/v1/AUTH_test/gettest/c_15')])
headers = [c[2] for c in self.app.calls_with_headers]
self.assertEqual(headers[0].get('Range'), 'bytes=3-17')
self.assertEqual(headers[1].get('Range'), None)
self.assertEqual(headers[2].get('Range'), 'bytes=3-')
self.assertEqual(headers[3].get('Range'), None)
self.assertEqual(headers[4].get('Range'), None)
self.assertEqual(headers[5].get('Range'), 'bytes=0-2')
def test_range_get_manifest_on_segment_boundaries(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=5-29'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
self.assertEqual(status, '206 Partial Content')
self.assertEqual(headers['Content-Length'], '25')
self.assertTrue('Etag' not in headers)
self.assertEqual(body, 'bbbbbbbbbbccccccccccccccc')
self.assertEqual(
self.app.calls,
[('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/b_10'),
('GET', '/v1/AUTH_test/gettest/c_15')])
headers = [c[2] for c in self.app.calls_with_headers]
self.assertEqual(headers[0].get('Range'), 'bytes=5-29')
self.assertEqual(headers[1].get('Range'), None)
self.assertEqual(headers[2].get('Range'), None)
self.assertEqual(headers[3].get('Range'), None)
self.assertEqual(headers[4].get('Range'), None)
def test_range_get_manifest_first_byte(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=0-0'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
self.assertEqual(status, '206 Partial Content')
self.assertEqual(headers['Content-Length'], '1')
self.assertEqual(body, 'a')
# Make sure we don't get any objects we don't need, including
# submanifests.
self.assertEqual(
self.app.calls,
[('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/a_5')])
def test_range_get_manifest_overlapping_end(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=45-55'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
self.assertEqual(status, '206 Partial Content')
self.assertEqual(headers['Content-Length'], '5')
self.assertEqual(body, 'ddddd')
def test_range_get_manifest_unsatisfiable(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=100-200'})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '416 Requested Range Not Satisfiable')
def test_multi_range_get_manifest(self):
# SLO doesn't support multi-range GETs. The way that you express
# "unsupported" in HTTP is to return a 200 and the whole entity.
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'},
headers={'Range': 'bytes=0-0,2-2'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
self.assertEqual(status, '200 OK')
self.assertEqual(headers['Content-Length'], '50')
self.assertEqual(
body, 'aaaaabbbbbbbbbbcccccccccccccccdddddddddddddddddddd')
def test_get_segment_with_non_ascii_name(self):
segment_body = u"a møøse once bit my sister".encode("utf-8")
self.app.register(
'GET', u'/v1/AUTH_test/ünicode/öbject-segment'.encode('utf-8'),
swob.HTTPOk, {'Content-Length': str(len(segment_body)),
'Etag': "moose"},
segment_body)
manifest_json = json.dumps([{'name': u'/ünicode/öbject-segment',
'hash': 'moose',
'content_type': 'text/plain',
'bytes': len(segment_body)}])
self.app.register(
'GET', u'/v1/AUTH_test/ünicode/manifest'.encode('utf-8'),
swob.HTTPOk, {'Content-Type': 'application/json',
'Content-Length': str(len(manifest_json)),
'X-Static-Large-Object': 'true'},
manifest_json)
req = Request.blank(
'/v1/AUTH_test/ünicode/manifest',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
self.assertEqual(status, '200 OK')
self.assertEqual(body, segment_body)
def test_get_bogus_manifest(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-badjson',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
self.assertEqual(status, '200 OK')
self.assertEqual(headers['Content-Length'], '0')
self.assertEqual(headers['X-Object-Meta-Fish'], 'Bass')
self.assertEqual(body, '')
def test_head_manifest_is_efficient(self):
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'HEAD'})
status, headers, body = self.call_slo(req)
headers = swob.HeaderKeyDict(headers)
manifest_etag = md5("a" + "manifest-bc" + "d").hexdigest()
self.assertEqual(status, '200 OK')
self.assertEqual(headers['Content-Length'], '50')
self.assertEqual(headers['Etag'], '"%s"' % manifest_etag)
self.assertEqual(body, '')
# Note the lack of recursive descent into manifest-bc. We know the
# content-length from the outer manifest, so there's no need for any
# submanifest fetching here, but a naïve implementation might do it
# anyway.
self.assertEqual(self.app.calls, [
('HEAD', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/manifest-abcd')])
def test_recursion_limit(self):
# man1 points to obj1 and man2, man2 points to obj2 and man3...
for i in xrange(20):
self.app.register('GET', '/v1/AUTH_test/gettest/obj%d' % i,
swob.HTTPOk, {'Content-Type': 'text/plain',
'Etag': 'hash%d' % i},
'body%02d' % i)
manifest_json = json.dumps([{'name': '/gettest/obj20',
'hash': 'hash20',
'content_type': 'text/plain',
'bytes': '6'}])
self.app.register(
'GET', '/v1/AUTH_test/gettest/man%d' % i,
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true',
'Etag': 'man%d' % i},
manifest_json)
for i in xrange(19, 0, -1):
manifest_data = [
{'name': '/gettest/obj%d' % i,
'hash': 'hash%d' % i,
'bytes': '6',
'content_type': 'text/plain'},
{'name': '/gettest/man%d' % (i + 1),
'hash': 'man%d' % (i + 1),
'sub_slo': True,
'bytes': len(manifest_json),
'content_type':
'application/json;swift_bytes=%d' % ((21 - i) * 6)}]
manifest_json = json.dumps(manifest_data)
self.app.register(
'GET', '/v1/AUTH_test/gettest/man%d' % i,
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true',
'Etag': 'man%d' % i},
manifest_json)
req = Request.blank(
'/v1/AUTH_test/gettest/man1',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body, exc = self.call_slo(req, expect_exception=True)
headers = swob.HeaderKeyDict(headers)
self.assertTrue(isinstance(exc, ListingIterError))
# we don't know at header-sending time that things are going to go
# wrong, so we end up with a 200 and a truncated body
self.assertEqual(status, '200 OK')
self.assertEqual(body, ('body01body02body03body04body05' +
'body06body07body08body09body10'))
# make sure we didn't keep asking for segments
self.assertEqual(self.app.call_count, 20)
def test_error_fetching_segment(self):
self.app.register('GET', '/v1/AUTH_test/gettest/c_15',
swob.HTTPUnauthorized, {}, None)
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body, exc = self.call_slo(req, expect_exception=True)
headers = swob.HeaderKeyDict(headers)
self.assertTrue(isinstance(exc, SegmentError))
self.assertEqual(status, '200 OK')
self.assertEqual(self.app.calls, [
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/a_5'),
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/b_10'),
# This one has the error, and so is the last one we fetch.
('GET', '/v1/AUTH_test/gettest/c_15')])
def test_error_fetching_submanifest(self):
self.app.register('GET', '/v1/AUTH_test/gettest/manifest-bc',
swob.HTTPUnauthorized, {}, None)
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body, exc = self.call_slo(req, expect_exception=True)
self.assertTrue(isinstance(exc, ListingIterError))
self.assertEqual("200 OK", status)
self.assertEqual("aaaaa", body)
self.assertEqual(self.app.calls, [
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/a_5'),
# This one has the error, and so is the last one we fetch.
('GET', '/v1/AUTH_test/gettest/manifest-bc')])
def test_error_fetching_first_segment_submanifest(self):
# This differs from the normal submanifest error because this one
# happens before we've actually sent any response body.
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-a',
swob.HTTPForbidden, {}, None)
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-manifest-a',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/gettest/manifest-a', 'sub_slo': True,
'content_type': 'application/json;swift_bytes=5',
'hash': 'manifest-a',
'bytes': '12345'}]))
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-manifest-a',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body, exc = self.call_slo(req, expect_exception=True)
self.assertTrue(isinstance(exc, ListingIterError))
self.assertEqual('200 OK', status)
self.assertEqual(body, ' ')
def test_invalid_json_submanifest(self):
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-bc',
swob.HTTPOk, {'Content-Type': 'application/json;swift_bytes=25',
'X-Static-Large-Object': 'true',
'X-Object-Meta-Plant': 'Ficus'},
"[this {isn't (JSON")
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body, exc = self.call_slo(req, expect_exception=True)
self.assertTrue(isinstance(exc, ListingIterError))
self.assertEqual('200 OK', status)
self.assertEqual(body, 'aaaaa')
def test_mismatched_etag(self):
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-a-b-badetag-c',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/gettest/a_5', 'hash': 'a',
'content_type': 'text/plain', 'bytes': '5'},
{'name': '/gettest/b_10', 'hash': 'wrong!',
'content_type': 'text/plain', 'bytes': '10'},
{'name': '/gettest/c_15', 'hash': 'c',
'content_type': 'text/plain', 'bytes': '15'}]))
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-a-b-badetag-c',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body, exc = self.call_slo(req, expect_exception=True)
self.assertTrue(isinstance(exc, SegmentError))
self.assertEqual('200 OK', status)
self.assertEqual(body, 'aaaaa')
def test_mismatched_size(self):
self.app.register(
'GET', '/v1/AUTH_test/gettest/manifest-a-b-badetag-c',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/gettest/a_5', 'hash': 'a',
'content_type': 'text/plain', 'bytes': '5'},
{'name': '/gettest/b_10', 'hash': 'b',
'content_type': 'text/plain', 'bytes': '999999'},
{'name': '/gettest/c_15', 'hash': 'c',
'content_type': 'text/plain', 'bytes': '15'}]))
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-a-b-badetag-c',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body, exc = self.call_slo(req, expect_exception=True)
self.assertTrue(isinstance(exc, SegmentError))
self.assertEqual('200 OK', status)
self.assertEqual(body, 'aaaaa')
def test_download_takes_too_long(self):
the_time = [time.time()]
def mock_time():
return the_time[0]
# this is just a convenient place to hang a time jump; there's nothing
# special about the choice of is_success().
def mock_is_success(status_int):
the_time[0] += 7 * 3600
return status_int // 100 == 2
req = Request.blank(
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'})
with patch.object(slo, 'time', mock_time):
with patch.object(slo, 'is_success', mock_is_success):
status, headers, body, exc = self.call_slo(
req, expect_exception=True)
self.assertTrue(isinstance(exc, SegmentError))
self.assertEqual(status, '200 OK')
self.assertEqual(self.app.calls, [
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
('GET', '/v1/AUTH_test/gettest/a_5'),
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/b_10'),
('GET', '/v1/AUTH_test/gettest/c_15')])
class TestSloCopyHook(SloTestCase):
def setUp(self):
super(TestSloCopyHook, self).setUp()
self.app.register(
'GET', '/v1/AUTH_test/c/o', swob.HTTPOk,
{'Content-Length': '3', 'Etag': 'obj-etag'}, "obj")
self.app.register(
'GET', '/v1/AUTH_test/c/man',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/c/o', 'hash': 'obj-etag', 'bytes': '3'}]))
copy_hook = [None]
# slip this guy in there to pull out the hook
def extract_copy_hook(env, sr):
copy_hook[0] = env['swift.copy_response_hook']
return self.app(env, sr)
self.slo = slo.filter_factory({})(extract_copy_hook)
req = Request.blank('/v1/AUTH_test/c/o',
environ={'REQUEST_METHOD': 'GET'})
self.slo(req.environ, fake_start_response)
self.copy_hook = copy_hook[0]
self.assertTrue(self.copy_hook is not None) # sanity check
def test_copy_hook_passthrough(self):
req = Request.blank('/v1/AUTH_test/c/o')
# no X-Static-Large-Object header, so do nothing
resp = Response(request=req, status=200)
modified_resp = self.copy_hook(req, resp)
self.assertTrue(modified_resp is resp)
def test_copy_hook_manifest(self):
req = Request.blank('/v1/AUTH_test/c/o')
resp = Response(request=req, status=200,
headers={"X-Static-Large-Object": "true"},
app_iter=[json.dumps([{'name': '/c/o',
'hash': 'obj-etag',
'bytes': '3'}])])
modified_resp = self.copy_hook(req, resp)
self.assertTrue(modified_resp is not resp)
self.assertEqual(modified_resp.etag, md5("obj-etag").hexdigest())
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -2199,6 +2199,36 @@ class TestAffinityLocalityPredicate(unittest.TestCase):
utils.affinity_locality_predicate, 'r1z1=1') utils.affinity_locality_predicate, 'r1z1=1')
class TestRateLimitedIterator(unittest.TestCase):
def test_rate_limiting(self):
limited_iterator = utils.RateLimitedIterator(xrange(9999), 100)
got = []
started_at = time.time()
try:
while time.time() - started_at < 0.1:
got.append(limited_iterator.next())
except StopIteration:
pass
# it's 11, not 10, because ratelimiting doesn't apply to the very
# first element.
#
# Ideally this'd be == 11, but that might fail on slow machines, and
# the last thing we need is another flaky test.
self.assertTrue(len(got) <= 11)
def test_limit_after(self):
limited_iterator = utils.RateLimitedIterator(xrange(9999), 100,
limit_after=5)
got = []
started_at = time.time()
try:
while time.time() - started_at < 0.1:
got.append(limited_iterator.next())
except StopIteration:
pass
self.assertTrue(len(got) <= 16)
class TestGreenthreadSafeIterator(unittest.TestCase): class TestGreenthreadSafeIterator(unittest.TestCase):
def increment(self, iterable): def increment(self, iterable):

View File

@ -18,7 +18,6 @@ import logging
import os import os
import sys import sys
import unittest import unittest
import urlparse
from contextlib import contextmanager, nested, closing from contextlib import contextmanager, nested, closing
from gzip import GzipFile from gzip import GzipFile
from shutil import rmtree from shutil import rmtree
@ -41,7 +40,7 @@ from swift.container import server as container_server
from swift.obj import server as object_server from swift.obj import server as object_server
from swift.common import ring from swift.common import ring
from swift.common.middleware import proxy_logging from swift.common.middleware import proxy_logging
from swift.common.exceptions import ChunkReadTimeout, SegmentError from swift.common.exceptions import ChunkReadTimeout
from swift.common.constraints import MAX_META_NAME_LENGTH, \ from swift.common.constraints import MAX_META_NAME_LENGTH, \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \ MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
MAX_FILE_SIZE, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \ MAX_FILE_SIZE, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
@ -1121,657 +1120,6 @@ class TestObjectController(unittest.TestCase):
controller.DELETE(req) controller.DELETE(req)
self.assertEquals(test_errors, []) self.assertEquals(test_errors, [])
def test_GET_manifest_no_segments(self):
for hdict in [{"X-Object-Manifest": "segments/seg"},
{"X-Static-Large-Object": "True"}]:
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps([]), # GET manifest
simplejson.dumps([])) # GET empty listing
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET manifest
200, # GET empty listing
headers=hdict,
body_iter=response_bodies)
req = Request.blank('/v1/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, '')
def test_GET_manifest_limited_listing(self):
listing1 = [{"hash": "454dfc73af632012ce3e6217dc464241",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "seg01",
"content_type": "application/octet-stream"},
{"hash": "474bab96c67528d42d5c0c52b35228eb",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "seg02",
"content_type": "application/octet-stream"}]
listing2 = [{"hash": "116baa5508693d1d1ca36abdd9f9478b",
"last_modified": "2012-11-08T04:05:37.849510",
"bytes": 2,
"name": "seg03",
"content_type": "application/octet-stream"},
{"hash": "7bd6aaa1ef6013353f0420459574ac9d",
"last_modified": "2012-11-08T04:05:37.855180",
"bytes": 2,
"name": "seg04",
"content_type": "application/octet-stream"
}]
listing3 = [{"hash": "6605f80e3cefaa24e9823544df4edbd6",
"last_modified": "2012-11-08T04:05:37.853710",
"bytes": 2,
"name": u'\N{SNOWMAN}seg05',
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
'', # GET manifest
simplejson.dumps(listing1), # GET listing1
'Aa', # GET seg01
'Bb', # GET seg02
simplejson.dumps(listing2), # GET listing2
'Cc', # GET seg03
'Dd', # GET seg04
simplejson.dumps(listing3), # GET listing3
'Ee', # GET seg05
simplejson.dumps([])) # GET final empty listing
with save_globals():
try:
swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = 2
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET manifest
200, # GET listing1
200, # GET seg01
200, # GET seg02
200, # GET listing2
200, # GET seg03
200, # GET seg04
200, # GET listing3
200, # GET seg05
200, # GET final empty listing
headers={"X-Object-Manifest": "segments/seg"},
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/v1/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, 'AaBbCcDdEe')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/segments',
{'format': 'json', 'prefix': 'seg'}],
['GET', '/a/segments/seg01', {}],
['GET', '/a/segments/seg02', {}],
['GET', '/a/segments',
{'format': 'json', 'prefix': 'seg', 'marker': 'seg02'}],
['GET', '/a/segments/seg03', {}],
['GET', '/a/segments/seg04', {}],
['GET', '/a/segments',
{'format': 'json', 'prefix': 'seg', 'marker': 'seg04'}],
['GET', '/a/segments/\xe2\x98\x83seg05', {}],
['GET', '/a/segments',
{'format': 'json', 'prefix': 'seg',
'marker': '\xe2\x98\x83seg05'}]])
finally:
# other tests in this file get very unhappy if this
# isn't set back, which leads to time-wasting
# debugging of other tests.
swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = \
_orig_container_listing_limit
def test_GET_manifest_slo(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "d526f1c8ef6c1e4e980e2b8471352d23",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing), # GET manifest
'Aa', # GET seg01
'Bb') # GET seg02
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET listing1
200, # GET seg01
200, # GET seg02
headers=[{}, {}, {"X-Static-Large-Object": "True",
'content-type': 'text/html; swift_bytes=4'}, {}, {}],
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/v1/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, 'AaBb')
self.assertEqual(resp.content_length, 4)
self.assertEqual(resp.content_type, 'text/html')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/d1/seg01', {}],
['GET', '/a/d2/seg02', {}]])
def test_GET_slo_multipart_manifest(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "d526f1c8ef6c1e4e980e2b8471352d23",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"}]
json_listing = simplejson.dumps(listing)
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
json_listing) # GET manifest
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET listing1
headers={"X-Static-Large-Object": "True",
'content-type': 'text/html; swift_bytes=4'},
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/v1/a/c/manifest?multipart-manifest=get')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, json_listing)
self.assertEqual(resp.content_type, 'application/json')
self.assertEqual(resp.charset.lower(), 'utf-8')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {'multipart-manifest': 'get'}]])
def test_GET_slo_multipart_manifest_from_copy(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "d526f1c8ef6c1e4e980e2b8471352d23",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"}]
json_listing = simplejson.dumps(listing)
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
json_listing) # GET manifest
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET listing1
headers={"X-Static-Large-Object": "True",
'content-type': 'text/html; swift_bytes=4'},
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/v1/a/c/manifest?multipart-manifest=get',
headers={'x-copy-from': '/a/c/manifest'})
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, json_listing)
self.assertEqual(resp.content_type, 'text/html')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {'multipart-manifest': 'get'}]])
def test_GET_bad_etag_manifest_slo(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "invalidhash",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing), # GET manifest
'Aa', # GET seg01
'Bb') # GET seg02
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET listing1
200, # GET seg01
200, # GET seg02
headers=[{}, {}, {"X-Static-Large-Object": "True",
'content-type': 'text/html; swift_bytes=4'}, {}, {}],
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/v1/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.content_length, 4) # content incomplete
self.assertEqual(resp.content_type, 'text/html')
self.assertRaises(SegmentError, lambda: resp.body)
# dropped connection, exception is caught by eventlet as it is
# iterating over response
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/d1/seg01', {}],
['GET', '/a/d2/seg02', {}]])
def test_GET_nested_slo(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "8681fb3ada2715c8754706ee5f23d4f8",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 4,
"name": u"/d2/sub_manifest \u2661", "sub_slo": True,
"content_type": "application/octet-stream"},
{"hash": "419af6d362a14b7a789ba1c7e772bbae",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg04",
"content_type": "application/octet-stream"}]
sub_listing = [{"hash": "d526f1c8ef6c1e4e980e2b8471352d23",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg02",
"content_type": "application/octet-stream"},
{"hash": "e4c8f1de1c0855c7c2be33196d3c3537",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg03",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing), # GET manifest
simplejson.dumps(sub_listing), # GET sub_manifest
'Aa', # GET seg01
'Bb', # GET seg02
'Cc', # GET seg03
'Dd') # GET seg04
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
slob_headers = {"X-Static-Large-Object": "True",
'content-type': 'text/html; swift_bytes=4'}
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET listing1
200, # GET sub listing1
200, # GET seg01
200, # GET seg02
200, # GET seg03
200, # GET seg04
headers=[{}, {}, slob_headers, slob_headers, {}, {}, {}, {}],
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/v1/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.content_length, 8)
self.assertEqual(resp.content_type, 'text/html')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/d2/sub_manifest \xe2\x99\xa1', {}]])
# iterating over body will retrieve manifest and sub manifest's
# objects
self.assertEqual(resp.body, 'AaBbCcDd')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/d2/sub_manifest \xe2\x99\xa1', {}],
['GET', '/a/d1/seg01', {}],
['GET', '/a/d1/seg02', {}],
['GET', '/a/d2/seg03', {}],
['GET', '/a/d1/seg04', {}]])
def test_GET_nested_manifest_slo_with_range(self):
"""
Original whole slo is Aa1234Bb where 1234 is a sub-manifests. I'm
pulling out 34Bb
"""
listing = [{"hash": "98568d540134639be4655198a36614a4", # Aa
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "7b4b0ffa275d404bdc2fc6384916714f", # SubManifest1
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 4, "sub_slo": True,
"name": "/d2/subManifest01",
"content_type": "application/octet-stream"},
{"hash": "d526f1c8ef6c1e4e980e2b8471352d23", # Bb
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg02",
"content_type": "application/octet-stream"}]
sublisting = [{"hash": "c20ad4d76fe97759aa27a0c99bff6710", # 12
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d2/subSeg01",
"content_type": "application/octet-stream"},
{"hash": "e369853df766fa44e1ed0ff613f563bd", # 34
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d2/subSeg02",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing)[1:1], # GET incomplete manifest
simplejson.dumps(listing), # GET complete manifest
simplejson.dumps(sublisting), # GET complete submanifest
'34', # GET subseg02
'Bb') # GET seg02
etag_iter = ['', '', '', '', '',
'e369853df766fa44e1ed0ff613f563bd', # subSeg02
'd526f1c8ef6c1e4e980e2b8471352d23'] # seg02
headers = [{}, {},
{'X-Static-Large-Object': 'True',
'content-type': 'text/html; swift_bytes=4'},
{'X-Static-Large-Object': 'True',
'content-type': 'text/html; swift_bytes=4'},
{'X-Static-Large-Object': 'True',
'content-type': 'text/html; swift_bytes=4'},
{}, {}]
self.assertTrue(len(response_bodies) == len(etag_iter) == len(headers))
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
206, # GET incomplete listing
200, # GET complete listing
200, # GET complete sublisting
200, # GET subSeg02
200, # GET seg02
headers=headers,
etags=etag_iter,
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/v1/a/c/manifest')
req.range = 'bytes=4-7'
resp = controller.GET(req)
got_called = [False, ]
def fake_start_response(*args, **kwargs):
got_called[0] = True
self.assertTrue(args[0].startswith('206'))
app_iter = resp(req.environ, fake_start_response)
resp_body = ''.join(app_iter) # read in entire resp
self.assertEqual(resp.status_int, 206)
self.assertEqual(resp_body, '34Bb')
self.assertTrue(got_called[0])
self.assertEqual(resp.content_length, 4)
self.assertEqual(resp.content_type, 'text/html')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}], # for incomplete manifest
['GET', '/a/c/manifest', {}],
['GET', '/a/d2/subManifest01', {}],
['GET', '/a/d2/subSeg02', {}],
['GET', '/a/d1/seg02', {}]])
def test_GET_bad_404_manifest_slo(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "d526f1c8ef6c1e4e980e2b8471352d23",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"},
{"hash": "invalidhash",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg03",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing), # GET manifest
'Aa', # GET seg01
'') # GET seg02
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET listing1
200, # GET seg01
404, # GET seg02
headers=[{}, {}, {"X-Static-Large-Object": "True",
'content-type': 'text/html; swift_bytes=4'}, {}, {}],
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/v1/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.content_length, 6) # content incomplete
self.assertEqual(resp.content_type, 'text/html')
self.assertRaises(SegmentError, lambda: resp.body)
# dropped connection, exception is caught by eventlet as it is
# iterating over response
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/d1/seg01', {}],
['GET', '/a/d2/seg02', {}],
['GET', '/a/d2/seg02', {}],
['GET', '/a/d2/seg02', {}]]) # 2nd segment not found
def test_HEAD_manifest_slo(self):
listing = [{"hash": "454dfc73af632012ce3e6217dc464241",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "474bab96c67528d42d5c0c52b35228eb",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
'', # HEAD manifest
simplejson.dumps(listing)) # GET manifest
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # HEAD listing1
200, # GET listing1
headers={"X-Static-Large-Object": "True"},
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/v1/a/c/manifest',
environ={'REQUEST_METHOD': 'HEAD'})
resp = controller.HEAD(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['HEAD', '/a/c/manifest', {}],
['GET', '/a/c/manifest', {}]])
def test_PUT_auto_content_type(self): def test_PUT_auto_content_type(self):
with save_globals(): with save_globals():
controller = proxy_server.ObjectController(self.app, 'account', controller = proxy_server.ObjectController(self.app, 'account',