Allow SLOs to be made up of other SLOs
We've gone back and forth about this. In the initial commit, it couldn't possibly work because you wouldn't be able to get the Etags to match. Then it was expressly disallowed with a custom error message, and now its allowed. The reason we're allowing it is that 1,000 segments isn't enough for some use cases and we decided its better than just upping the number of allowed segments. The code to make it work isn't all that complicated and it allows for virtually unlimited SLO object size. There is also a new configurable limit on the maximum connection time for both SLOs and DLOs defaulting to 1 day. This will hopefully alleviate worries about infinite requests. Think I'll leave the python-swift client support for nested SLOs to somebody else though :). DocImpact Change-Id: Id16187481b37e716d2bd09bdbab8cc87537e3ddd
This commit is contained in:
parent
66a0817e99
commit
9f942b1256
@ -133,6 +133,9 @@ use = egg:swift#proxy
|
|||||||
# be set to false if slo is not used in pipeline.
|
# be set to false if slo is not used in pipeline.
|
||||||
# allow_static_large_object = true
|
# allow_static_large_object = true
|
||||||
#
|
#
|
||||||
|
# The maximum time (seconds) that a large object connection is allowed to last.
|
||||||
|
# max_large_object_get_time = 86400
|
||||||
|
#
|
||||||
# Set to the number of nodes to contact for a normal request. You can use
|
# Set to the number of nodes to contact for a normal request. You can use
|
||||||
# '* replicas' at the end to have it use the number given times the number of
|
# '* replicas' at the end to have it use the number given times the number of
|
||||||
# replicas for the ring being used for the request.
|
# replicas for the ring being used for the request.
|
||||||
|
@ -56,6 +56,8 @@ MAX_ACCOUNT_NAME_LENGTH = constraints_conf_int('max_account_name_length', 256)
|
|||||||
#: Max container name length
|
#: Max container name length
|
||||||
MAX_CONTAINER_NAME_LENGTH = constraints_conf_int('max_container_name_length',
|
MAX_CONTAINER_NAME_LENGTH = constraints_conf_int('max_container_name_length',
|
||||||
256)
|
256)
|
||||||
|
# Maximum slo segments in buffer
|
||||||
|
MAX_BUFFERED_SLO_SEGMENTS = 10000
|
||||||
|
|
||||||
|
|
||||||
#: Query string format= values to their corresponding content-type values
|
#: Query string format= values to their corresponding content-type values
|
||||||
|
@ -112,5 +112,5 @@ class ListingIterNotAuthorized(ListingIterError):
|
|||||||
self.aresp = aresp
|
self.aresp = aresp
|
||||||
|
|
||||||
|
|
||||||
class SloSegmentError(SwiftException):
|
class SegmentError(SwiftException):
|
||||||
pass
|
pass
|
||||||
|
@ -61,9 +61,11 @@ appended to the existing Content-Type, where total_size is the sum of all
|
|||||||
the included segments' size_bytes. This extra parameter will be hidden from
|
the included segments' size_bytes. This extra parameter will be hidden from
|
||||||
the user.
|
the user.
|
||||||
|
|
||||||
Manifest files can reference objects in separate containers, which
|
Manifest files can reference objects in separate containers, which will improve
|
||||||
will improve concurrent upload speed. Objects can be referenced by
|
concurrent upload speed. Objects can be referenced by multiple manifests. The
|
||||||
multiple manifests.
|
segments of a SLO manifest can even be other SLO manifests. Treat them as any
|
||||||
|
other object i.e., use the Etag and Content-Length given on the PUT of the
|
||||||
|
sub-SLO in the manifest to the parent SLO.
|
||||||
|
|
||||||
-------------------------
|
-------------------------
|
||||||
Retrieving a Large Object
|
Retrieving a Large Object
|
||||||
@ -107,9 +109,8 @@ A DELETE with a query parameter::
|
|||||||
|
|
||||||
?multipart-manifest=delete
|
?multipart-manifest=delete
|
||||||
|
|
||||||
will delete all the segments referenced in the manifest and then, if
|
will delete all the segments referenced in the manifest and then the manifest
|
||||||
successful, the manifest itself. The failure response will be similar to
|
itself. The failure response will be similar to the bulk delete middleware.
|
||||||
the bulk delete middleware.
|
|
||||||
|
|
||||||
------------------------
|
------------------------
|
||||||
Modifying a Large Object
|
Modifying a Large Object
|
||||||
@ -141,7 +142,8 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
|
|||||||
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
|
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
|
||||||
HTTPOk, HTTPPreconditionFailed, wsgify
|
HTTPOk, HTTPPreconditionFailed, wsgify
|
||||||
from swift.common.utils import json, get_logger, config_true_value
|
from swift.common.utils import json, get_logger, config_true_value
|
||||||
from swift.common.constraints import check_utf8
|
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
|
||||||
|
from swift.common.http import HTTP_NOT_FOUND
|
||||||
from swift.common.middleware.bulk import get_response_body, \
|
from swift.common.middleware.bulk import get_response_body, \
|
||||||
ACCEPTABLE_FORMATS, Bulk
|
ACCEPTABLE_FORMATS, Bulk
|
||||||
|
|
||||||
@ -254,16 +256,12 @@ class StaticLargeObject(object):
|
|||||||
'%s MultipartPUT' % req.environ.get('HTTP_USER_AGENT')
|
'%s MultipartPUT' % req.environ.get('HTTP_USER_AGENT')
|
||||||
head_seg_resp = \
|
head_seg_resp = \
|
||||||
Request.blank(obj_path, new_env).get_response(self.app)
|
Request.blank(obj_path, new_env).get_response(self.app)
|
||||||
if head_seg_resp.status_int // 100 == 2:
|
if head_seg_resp.is_success:
|
||||||
total_size += seg_size
|
total_size += seg_size
|
||||||
if seg_size != head_seg_resp.content_length:
|
if seg_size != head_seg_resp.content_length:
|
||||||
problem_segments.append([quote(obj_path), 'Size Mismatch'])
|
problem_segments.append([quote(obj_path), 'Size Mismatch'])
|
||||||
if seg_dict['etag'] != head_seg_resp.etag:
|
if seg_dict['etag'] != head_seg_resp.etag:
|
||||||
problem_segments.append([quote(obj_path), 'Etag Mismatch'])
|
problem_segments.append([quote(obj_path), 'Etag Mismatch'])
|
||||||
if 'X-Static-Large-Object' in head_seg_resp.headers or \
|
|
||||||
'X-Object-Manifest' in head_seg_resp.headers:
|
|
||||||
problem_segments.append(
|
|
||||||
[quote(obj_path), 'Segments cannot be Large Objects'])
|
|
||||||
if head_seg_resp.last_modified:
|
if head_seg_resp.last_modified:
|
||||||
last_modified = head_seg_resp.last_modified
|
last_modified = head_seg_resp.last_modified
|
||||||
else:
|
else:
|
||||||
@ -272,12 +270,15 @@ class StaticLargeObject(object):
|
|||||||
|
|
||||||
last_modified_formatted = \
|
last_modified_formatted = \
|
||||||
last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f')
|
last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f')
|
||||||
data_for_storage.append(
|
seg_data = {'name': '/' + seg_dict['path'].lstrip('/'),
|
||||||
{'name': '/' + seg_dict['path'].lstrip('/'),
|
|
||||||
'bytes': seg_size,
|
'bytes': seg_size,
|
||||||
'hash': seg_dict['etag'],
|
'hash': seg_dict['etag'],
|
||||||
'content_type': head_seg_resp.content_type,
|
'content_type': head_seg_resp.content_type,
|
||||||
'last_modified': last_modified_formatted})
|
'last_modified': last_modified_formatted}
|
||||||
|
if config_true_value(
|
||||||
|
head_seg_resp.headers.get('X-Static-Large-Object')):
|
||||||
|
seg_data['sub_slo'] = True
|
||||||
|
data_for_storage.append(seg_data)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
problem_segments.append([quote(obj_path),
|
problem_segments.append([quote(obj_path),
|
||||||
@ -299,6 +300,60 @@ class StaticLargeObject(object):
|
|||||||
env['wsgi.input'] = StringIO(json_data)
|
env['wsgi.input'] = StringIO(json_data)
|
||||||
return self.app
|
return self.app
|
||||||
|
|
||||||
|
def get_segments_to_delete_iter(self, req):
|
||||||
|
"""
|
||||||
|
A generator function to be used to delete all the segments and
|
||||||
|
sub-segments referenced in a manifest.
|
||||||
|
|
||||||
|
:raises HTTPBadRequest: on sub manifest not manifest anymore or
|
||||||
|
on too many buffered sub segments
|
||||||
|
:raises HTTPServerError: on unable to load manifest
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
vrs, account, container, obj = req.split_path(4, 4, True)
|
||||||
|
except ValueError:
|
||||||
|
raise HTTPBadRequest('Not a SLO manifest')
|
||||||
|
sub_segments = [{
|
||||||
|
'sub_slo': True,
|
||||||
|
'name': ('/%s/%s' % (container, obj)).decode('utf-8')}]
|
||||||
|
while sub_segments:
|
||||||
|
if len(sub_segments) > MAX_BUFFERED_SLO_SEGMENTS:
|
||||||
|
raise HTTPBadRequest(
|
||||||
|
'Too many buffered slo segments to delete.')
|
||||||
|
if sub_segments:
|
||||||
|
seg_data = sub_segments.pop(0)
|
||||||
|
if seg_data.get('sub_slo'):
|
||||||
|
new_env = req.environ.copy()
|
||||||
|
new_env['REQUEST_METHOD'] = 'GET'
|
||||||
|
del(new_env['wsgi.input'])
|
||||||
|
new_env['QUERY_STRING'] = 'multipart-manifest=get'
|
||||||
|
new_env['CONTENT_LENGTH'] = 0
|
||||||
|
new_env['HTTP_USER_AGENT'] = \
|
||||||
|
'%s MultipartDELETE' % new_env.get('HTTP_USER_AGENT')
|
||||||
|
new_env['swift.source'] = 'SLO'
|
||||||
|
new_env['PATH_INFO'] = (
|
||||||
|
'/%s/%s/%s' % (
|
||||||
|
vrs, account,
|
||||||
|
seg_data['name'].lstrip('/'))).encode('utf-8')
|
||||||
|
sub_resp = Request.blank('', new_env).get_response(self.app)
|
||||||
|
if sub_resp.is_success:
|
||||||
|
try:
|
||||||
|
# if its still a SLO, load its segments
|
||||||
|
if config_true_value(
|
||||||
|
sub_resp.headers.get('X-Static-Large-Object')):
|
||||||
|
sub_segments.extend(json.loads(sub_resp.body))
|
||||||
|
except ValueError:
|
||||||
|
raise HTTPServerError('Unable to load SLO manifest')
|
||||||
|
# add sub-manifest back to be deleted after sub segments
|
||||||
|
# (even if obj is not a SLO)
|
||||||
|
seg_data['sub_slo'] = False
|
||||||
|
sub_segments.append(seg_data)
|
||||||
|
elif sub_resp.status_int != HTTP_NOT_FOUND:
|
||||||
|
# on deletes treat not found as success
|
||||||
|
raise HTTPServerError('Sub SLO unable to load.')
|
||||||
|
else:
|
||||||
|
yield seg_data['name'].encode('utf-8')
|
||||||
|
|
||||||
def handle_multipart_delete(self, req):
|
def handle_multipart_delete(self, req):
|
||||||
"""
|
"""
|
||||||
Will delete all the segments in the SLO manifest and then, if
|
Will delete all the segments in the SLO manifest and then, if
|
||||||
@ -310,38 +365,16 @@ class StaticLargeObject(object):
|
|||||||
if not check_utf8(req.path_info):
|
if not check_utf8(req.path_info):
|
||||||
raise HTTPPreconditionFailed(
|
raise HTTPPreconditionFailed(
|
||||||
request=req, body='Invalid UTF8 or contains NULL')
|
request=req, body='Invalid UTF8 or contains NULL')
|
||||||
try:
|
|
||||||
vrs, account, container, obj = req.split_path(4, 4, True)
|
|
||||||
except ValueError:
|
|
||||||
raise HTTPBadRequest('Not an SLO manifest')
|
|
||||||
new_env = req.environ.copy()
|
|
||||||
new_env['REQUEST_METHOD'] = 'GET'
|
|
||||||
del(new_env['wsgi.input'])
|
|
||||||
new_env['QUERY_STRING'] = 'multipart-manifest=get'
|
|
||||||
new_env['CONTENT_LENGTH'] = 0
|
|
||||||
new_env['HTTP_USER_AGENT'] = \
|
|
||||||
'%s MultipartDELETE' % req.environ.get('HTTP_USER_AGENT')
|
|
||||||
new_env['swift.source'] = 'SLO'
|
|
||||||
get_man_resp = \
|
|
||||||
Request.blank('', new_env).get_response(self.app)
|
|
||||||
if get_man_resp.status_int // 100 == 2:
|
|
||||||
if not config_true_value(
|
|
||||||
get_man_resp.headers.get('X-Static-Large-Object')):
|
|
||||||
raise HTTPBadRequest('Not an SLO manifest')
|
|
||||||
try:
|
|
||||||
manifest = json.loads(get_man_resp.body)
|
|
||||||
# append the manifest file for deletion at the end
|
|
||||||
manifest.append(
|
|
||||||
{'name': '/'.join(['', container, obj]).decode('utf-8')})
|
|
||||||
except ValueError:
|
|
||||||
raise HTTPServerError('Invalid manifest file')
|
|
||||||
resp = HTTPOk(request=req)
|
resp = HTTPOk(request=req)
|
||||||
|
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
|
||||||
|
if out_content_type:
|
||||||
|
resp.content_type = out_content_type
|
||||||
resp.app_iter = self.bulk_deleter.handle_delete_iter(
|
resp.app_iter = self.bulk_deleter.handle_delete_iter(
|
||||||
req,
|
req, objs_to_delete=self.get_segments_to_delete_iter(req),
|
||||||
objs_to_delete=[o['name'].encode('utf-8') for o in manifest],
|
user_agent='MultipartDELETE', swift_source='SLO',
|
||||||
user_agent='MultipartDELETE', swift_source='SLO')
|
out_content_type=out_content_type)
|
||||||
return resp
|
return resp
|
||||||
return get_man_resp
|
|
||||||
|
|
||||||
@wsgify
|
@wsgify
|
||||||
def __call__(self, req):
|
def __call__(self, req):
|
||||||
|
@ -1103,6 +1103,10 @@ class Response(object):
|
|||||||
return self.location
|
return self.location
|
||||||
return self.host_url + self.location
|
return self.host_url + self.location
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_success(self):
|
||||||
|
return self.status_int // 100 == 2
|
||||||
|
|
||||||
def __call__(self, env, start_response):
|
def __call__(self, env, start_response):
|
||||||
if not self.request:
|
if not self.request:
|
||||||
self.request = Request(env)
|
self.request = Request(env)
|
||||||
|
@ -40,10 +40,10 @@ from swift.common.utils import ContextPool, normalize_timestamp, \
|
|||||||
config_true_value, public, json, csv_append, GreenthreadSafeIterator
|
config_true_value, public, json, csv_append, GreenthreadSafeIterator
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.constraints import check_metadata, check_object_creation, \
|
from swift.common.constraints import check_metadata, check_object_creation, \
|
||||||
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE
|
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE, MAX_BUFFERED_SLO_SEGMENTS
|
||||||
from swift.common.exceptions import ChunkReadTimeout, \
|
from swift.common.exceptions import ChunkReadTimeout, \
|
||||||
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
|
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
|
||||||
ListingIterNotAuthorized, ListingIterError, SloSegmentError
|
ListingIterNotAuthorized, ListingIterError, SegmentError
|
||||||
from swift.common.http import is_success, is_client_error, HTTP_CONTINUE, \
|
from swift.common.http import is_success, is_client_error, HTTP_CONTINUE, \
|
||||||
HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
||||||
HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \
|
HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \
|
||||||
@ -56,10 +56,28 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
|
|||||||
HTTPClientDisconnect, HTTPNotImplemented
|
HTTPClientDisconnect, HTTPNotImplemented
|
||||||
|
|
||||||
|
|
||||||
def segment_listing_iter(listing):
|
class SegmentListing(object):
|
||||||
listing = iter(listing)
|
|
||||||
|
def __init__(self, listing):
|
||||||
|
self.listing = iter(listing)
|
||||||
|
self._prepended_segments = []
|
||||||
|
|
||||||
|
def prepend_segments(self, new_segs):
|
||||||
|
"""
|
||||||
|
Will prepend given segments to listing when iterating.
|
||||||
|
:raises SegmentError: when # segments > MAX_BUFFERED_SLO_SEGMENTS
|
||||||
|
"""
|
||||||
|
new_segs.extend(self._prepended_segments)
|
||||||
|
if len(new_segs) > MAX_BUFFERED_SLO_SEGMENTS:
|
||||||
|
raise SegmentError('Too many unread slo segments in buffer')
|
||||||
|
self._prepended_segments = new_segs
|
||||||
|
|
||||||
|
def listing_iter(self):
|
||||||
while True:
|
while True:
|
||||||
seg_dict = listing.next()
|
if self._prepended_segments:
|
||||||
|
seg_dict = self._prepended_segments.pop(0)
|
||||||
|
else:
|
||||||
|
seg_dict = self.listing.next()
|
||||||
if isinstance(seg_dict['name'], unicode):
|
if isinstance(seg_dict['name'], unicode):
|
||||||
seg_dict['name'] = seg_dict['name'].encode('utf-8')
|
seg_dict['name'] = seg_dict['name'].encode('utf-8')
|
||||||
yield seg_dict
|
yield seg_dict
|
||||||
@ -104,14 +122,20 @@ class SegmentedIterable(object):
|
|||||||
'bytes' keys.
|
'bytes' keys.
|
||||||
:param response: The swob.Response this iterable is associated with, if
|
:param response: The swob.Response this iterable is associated with, if
|
||||||
any (default: None)
|
any (default: None)
|
||||||
|
:param is_slo: A boolean, defaults to False, as to whether this references
|
||||||
|
a SLO object.
|
||||||
|
:param max_lo_time: Defaults to 86400. The connection for the
|
||||||
|
SegmentedIterable will drop after that many seconds.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, controller, container, listing, response=None,
|
def __init__(self, controller, container, listing, response=None,
|
||||||
is_slo=False):
|
is_slo=False, max_lo_time=86400):
|
||||||
self.controller = controller
|
self.controller = controller
|
||||||
self.container = container
|
self.container = container
|
||||||
self.listing = segment_listing_iter(listing)
|
self.segment_listing = SegmentListing(listing)
|
||||||
|
self.listing = self.segment_listing.listing_iter()
|
||||||
self.is_slo = is_slo
|
self.is_slo = is_slo
|
||||||
|
self.max_lo_time = max_lo_time
|
||||||
self.ratelimit_index = 0
|
self.ratelimit_index = 0
|
||||||
self.segment_dict = None
|
self.segment_dict = None
|
||||||
self.segment_peek = None
|
self.segment_peek = None
|
||||||
@ -121,10 +145,12 @@ class SegmentedIterable(object):
|
|||||||
# See NOTE: swift_conn at top of file about this.
|
# See NOTE: swift_conn at top of file about this.
|
||||||
self.segment_iter_swift_conn = None
|
self.segment_iter_swift_conn = None
|
||||||
self.position = 0
|
self.position = 0
|
||||||
|
self.have_yielded_data = False
|
||||||
self.response = response
|
self.response = response
|
||||||
if not self.response:
|
if not self.response:
|
||||||
self.response = Response()
|
self.response = Response()
|
||||||
self.next_get_time = 0
|
self.next_get_time = 0
|
||||||
|
self.start_time = time.time()
|
||||||
|
|
||||||
def _load_next_segment(self):
|
def _load_next_segment(self):
|
||||||
"""
|
"""
|
||||||
@ -135,6 +161,9 @@ class SegmentedIterable(object):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
self.ratelimit_index += 1
|
self.ratelimit_index += 1
|
||||||
|
if time.time() - self.start_time > self.max_lo_time:
|
||||||
|
raise SegmentError(
|
||||||
|
_('Max LO GET time of %s exceeded.') % self.max_lo_time)
|
||||||
self.segment_dict = self.segment_peek or self.listing.next()
|
self.segment_dict = self.segment_peek or self.listing.next()
|
||||||
self.segment_peek = None
|
self.segment_peek = None
|
||||||
if self.container is None:
|
if self.container is None:
|
||||||
@ -167,7 +196,7 @@ class SegmentedIterable(object):
|
|||||||
req, _('Object'), self.controller.app.object_ring, partition,
|
req, _('Object'), self.controller.app.object_ring, partition,
|
||||||
path)
|
path)
|
||||||
if self.is_slo and resp.status_int == HTTP_NOT_FOUND:
|
if self.is_slo and resp.status_int == HTTP_NOT_FOUND:
|
||||||
raise SloSegmentError(_(
|
raise SegmentError(_(
|
||||||
'Could not load object segment %(path)s:'
|
'Could not load object segment %(path)s:'
|
||||||
' %(status)s') % {'path': path, 'status': resp.status_int})
|
' %(status)s') % {'path': path, 'status': resp.status_int})
|
||||||
if not is_success(resp.status_int):
|
if not is_success(resp.status_int):
|
||||||
@ -175,21 +204,41 @@ class SegmentedIterable(object):
|
|||||||
'Could not load object segment %(path)s:'
|
'Could not load object segment %(path)s:'
|
||||||
' %(status)s') % {'path': path, 'status': resp.status_int})
|
' %(status)s') % {'path': path, 'status': resp.status_int})
|
||||||
if self.is_slo:
|
if self.is_slo:
|
||||||
if resp.etag != self.segment_dict['hash']:
|
|
||||||
raise SloSegmentError(_(
|
|
||||||
'Object segment no longer valid: '
|
|
||||||
'%(path)s etag: %(r_etag)s != %(s_etag)s.' %
|
|
||||||
{'path': path, 'r_etag': resp.etag,
|
|
||||||
's_etag': self.segment_dict['hash']}))
|
|
||||||
if 'X-Static-Large-Object' in resp.headers:
|
if 'X-Static-Large-Object' in resp.headers:
|
||||||
raise SloSegmentError(_(
|
# this segment is a nested slo object. read in the body
|
||||||
'SLO can not be made of other SLOs: %s' % path))
|
# and add its segments into this slo.
|
||||||
|
try:
|
||||||
|
sub_manifest = json.loads(resp.body)
|
||||||
|
self.segment_listing.prepend_segments(sub_manifest)
|
||||||
|
sub_etag = md5(''.join(
|
||||||
|
o['hash'] for o in sub_manifest)).hexdigest()
|
||||||
|
if sub_etag != self.segment_dict['hash']:
|
||||||
|
raise SegmentError(_(
|
||||||
|
'Object segment does not match sub-slo: '
|
||||||
|
'%(path)s etag: %(r_etag)s != %(s_etag)s.' %
|
||||||
|
{'path': path, 'r_etag': sub_etag,
|
||||||
|
's_etag': self.segment_dict['hash']}))
|
||||||
|
return self._load_next_segment()
|
||||||
|
except ValueError:
|
||||||
|
raise SegmentError(_(
|
||||||
|
'Sub SLO has invalid manifest: %s' % path))
|
||||||
|
|
||||||
|
elif resp.etag != self.segment_dict['hash'] or \
|
||||||
|
resp.content_length != self.segment_dict['bytes']:
|
||||||
|
raise SegmentError(_(
|
||||||
|
'Object segment no longer valid: '
|
||||||
|
'%(path)s etag: %(r_etag)s != %(s_etag)s or '
|
||||||
|
'%(r_size)s != %(s_size)s.' %
|
||||||
|
{'path': path, 'r_etag': resp.etag,
|
||||||
|
'r_size': resp.content_length,
|
||||||
|
's_etag': self.segment_dict['hash'],
|
||||||
|
's_size': self.segment_dict['bytes']}))
|
||||||
self.segment_iter = resp.app_iter
|
self.segment_iter = resp.app_iter
|
||||||
# See NOTE: swift_conn at top of file about this.
|
# See NOTE: swift_conn at top of file about this.
|
||||||
self.segment_iter_swift_conn = getattr(resp, 'swift_conn', None)
|
self.segment_iter_swift_conn = getattr(resp, 'swift_conn', None)
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
raise
|
raise
|
||||||
except SloSegmentError, err:
|
except SegmentError, err:
|
||||||
if not getattr(err, 'swift_logged', False):
|
if not getattr(err, 'swift_logged', False):
|
||||||
self.controller.app.logger.error(_(
|
self.controller.app.logger.error(_(
|
||||||
'ERROR: While processing manifest '
|
'ERROR: While processing manifest '
|
||||||
@ -232,9 +281,21 @@ class SegmentedIterable(object):
|
|||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
self.position += len(chunk)
|
self.position += len(chunk)
|
||||||
|
self.have_yielded_data = True
|
||||||
yield chunk
|
yield chunk
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
raise
|
raise
|
||||||
|
except SegmentError:
|
||||||
|
if not self.have_yielded_data:
|
||||||
|
# Normally, exceptions before any data has been yielded will
|
||||||
|
# cause Eventlet to send a 5xx response. In this particular
|
||||||
|
# case of SegmentError we don't want that and we'd rather
|
||||||
|
# just send the normal 2xx response and then hang up early
|
||||||
|
# since 5xx codes are often used to judge Service Level
|
||||||
|
# Agreements and this SegmentError indicates the user has
|
||||||
|
# created an invalid condition.
|
||||||
|
yield ' '
|
||||||
|
raise
|
||||||
except (Exception, Timeout), err:
|
except (Exception, Timeout), err:
|
||||||
if not getattr(err, 'swift_logged', False):
|
if not getattr(err, 'swift_logged', False):
|
||||||
self.controller.app.logger.exception(_(
|
self.controller.app.logger.exception(_(
|
||||||
@ -532,7 +593,8 @@ class ObjectController(Controller):
|
|||||||
else:
|
else:
|
||||||
resp.app_iter = SegmentedIterable(
|
resp.app_iter = SegmentedIterable(
|
||||||
self, lcontainer, listing, resp,
|
self, lcontainer, listing, resp,
|
||||||
is_slo=(large_object == 'SLO'))
|
is_slo=(large_object == 'SLO'),
|
||||||
|
max_lo_time=self.app.max_large_object_get_time)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# For objects with a reasonable number of segments, we'll serve
|
# For objects with a reasonable number of segments, we'll serve
|
||||||
@ -559,7 +621,8 @@ class ObjectController(Controller):
|
|||||||
conditional_response=True)
|
conditional_response=True)
|
||||||
resp.app_iter = SegmentedIterable(
|
resp.app_iter = SegmentedIterable(
|
||||||
self, lcontainer, listing, resp,
|
self, lcontainer, listing, resp,
|
||||||
is_slo=(large_object == 'SLO'))
|
is_slo=(large_object == 'SLO'),
|
||||||
|
max_lo_time=self.app.max_large_object_get_time)
|
||||||
resp.content_length = content_length
|
resp.content_length = content_length
|
||||||
resp.last_modified = last_modified
|
resp.last_modified = last_modified
|
||||||
resp.etag = etag
|
resp.etag = etag
|
||||||
|
@ -116,6 +116,8 @@ class Application(object):
|
|||||||
self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
|
self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
|
||||||
self.allow_static_large_object = config_true_value(
|
self.allow_static_large_object = config_true_value(
|
||||||
conf.get('allow_static_large_object', 'true'))
|
conf.get('allow_static_large_object', 'true'))
|
||||||
|
self.max_large_object_get_time = float(
|
||||||
|
conf.get('max_large_object_get_time', '86400'))
|
||||||
value = conf.get('request_node_count', '2 * replicas').lower().split()
|
value = conf.get('request_node_count', '2 * replicas').lower().split()
|
||||||
if len(value) == 1:
|
if len(value) == 1:
|
||||||
value = int(value[0])
|
value = int(value[0])
|
||||||
|
@ -94,6 +94,26 @@ class FakeApp(object):
|
|||||||
headers={'X-Static-Large-Object': 'True'},
|
headers={'X-Static-Large-Object': 'True'},
|
||||||
body=good_data)(env, start_response)
|
body=good_data)(env, start_response)
|
||||||
|
|
||||||
|
if env['PATH_INFO'].startswith('/test_delete_nested/'):
|
||||||
|
nested_data = json.dumps(
|
||||||
|
[{'name': '/b/b_2', 'hash': 'a', 'bytes': '1'},
|
||||||
|
{'name': '/c/c_3', 'hash': 'b', 'bytes': '2'}])
|
||||||
|
good_data = json.dumps(
|
||||||
|
[{'name': '/a/a_1', 'hash': 'a', 'bytes': '1'},
|
||||||
|
{'name': '/a/sub_nest', 'hash': 'a', 'sub_slo': True,
|
||||||
|
'bytes': len(nested_data)},
|
||||||
|
{'name': '/d/d_3', 'hash': 'b', 'bytes': '2'}])
|
||||||
|
self.req_method_paths.append((env['REQUEST_METHOD'],
|
||||||
|
env['PATH_INFO']))
|
||||||
|
if 'sub_nest' in env['PATH_INFO']:
|
||||||
|
return Response(status=200,
|
||||||
|
headers={'X-Static-Large-Object': 'True'},
|
||||||
|
body=nested_data)(env, start_response)
|
||||||
|
else:
|
||||||
|
return Response(status=200,
|
||||||
|
headers={'X-Static-Large-Object': 'True'},
|
||||||
|
body=good_data)(env, start_response)
|
||||||
|
|
||||||
if env['PATH_INFO'].startswith('/test_delete_bad_json/'):
|
if env['PATH_INFO'].startswith('/test_delete_bad_json/'):
|
||||||
self.req_method_paths.append((env['REQUEST_METHOD'],
|
self.req_method_paths.append((env['REQUEST_METHOD'],
|
||||||
env['PATH_INFO']))
|
env['PATH_INFO']))
|
||||||
@ -309,7 +329,7 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
[{'path': '/c/a_1', 'etag': 'a', 'size_bytes': '1'},
|
[{'path': '/c/a_1', 'etag': 'a', 'size_bytes': '1'},
|
||||||
{'path': '/c/a_2', 'etag': 'a', 'size_bytes': '1'},
|
{'path': '/c/a_2', 'etag': 'a', 'size_bytes': '1'},
|
||||||
{'path': '/d/b_2', 'etag': 'b', 'size_bytes': '2'},
|
{'path': '/d/b_2', 'etag': 'b', 'size_bytes': '2'},
|
||||||
{'path': '/d/slob', 'etag': 'b', 'size_bytes': '2'}])
|
{'path': '/d/slob', 'etag': 'a', 'size_bytes': '2'}])
|
||||||
req = Request.blank(
|
req = Request.blank(
|
||||||
'/test_good/A/c/man?multipart-manifest=put',
|
'/test_good/A/c/man?multipart-manifest=put',
|
||||||
environ={'REQUEST_METHOD': 'PUT'},
|
environ={'REQUEST_METHOD': 'PUT'},
|
||||||
@ -327,8 +347,7 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
self.assertEquals(errors[4][0], '/test_good/A/d/b_2')
|
self.assertEquals(errors[4][0], '/test_good/A/d/b_2')
|
||||||
self.assertEquals(errors[4][1], 'Etag Mismatch')
|
self.assertEquals(errors[4][1], 'Etag Mismatch')
|
||||||
self.assertEquals(errors[-1][0], '/test_good/A/d/slob')
|
self.assertEquals(errors[-1][0], '/test_good/A/d/slob')
|
||||||
self.assertEquals(errors[-1][1],
|
self.assertEquals(errors[-1][1], 'Etag Mismatch')
|
||||||
'Segments cannot be Large Objects')
|
|
||||||
else:
|
else:
|
||||||
self.assert_(False)
|
self.assert_(False)
|
||||||
|
|
||||||
@ -342,7 +361,8 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
req = Request.blank(
|
req = Request.blank(
|
||||||
'/test_delete_404/A/c/man?multipart-manifest=delete',
|
'/test_delete_404/A/c/man?multipart-manifest=delete',
|
||||||
environ={'REQUEST_METHOD': 'DELETE'})
|
environ={'REQUEST_METHOD': 'DELETE'})
|
||||||
self.slo(req.environ, fake_start_response)
|
app_iter = self.slo(req.environ, fake_start_response)
|
||||||
|
list(app_iter) # iterate through whole response
|
||||||
self.assertEquals(self.app.calls, 1)
|
self.assertEquals(self.app.calls, 1)
|
||||||
self.assertEquals(self.app.req_method_paths,
|
self.assertEquals(self.app.req_method_paths,
|
||||||
[('GET', '/test_delete_404/A/c/man')])
|
[('GET', '/test_delete_404/A/c/man')])
|
||||||
@ -360,25 +380,52 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
('DELETE', '/test_delete/A/d/b_2'),
|
('DELETE', '/test_delete/A/d/b_2'),
|
||||||
('DELETE', '/test_delete/A/c/man')])
|
('DELETE', '/test_delete/A/c/man')])
|
||||||
|
|
||||||
def test_handle_multipart_delete_bad_manifest(self):
|
def test_handle_multipart_delete_nested(self):
|
||||||
|
req = Request.blank(
|
||||||
|
'/test_delete_nested/A/c/man?multipart-manifest=delete',
|
||||||
|
environ={'REQUEST_METHOD': 'DELETE'})
|
||||||
|
app_iter = self.slo(req.environ, fake_start_response)
|
||||||
|
list(app_iter) # iterate through whole response
|
||||||
|
self.assertEquals(self.app.calls, 8)
|
||||||
|
self.assertEquals(
|
||||||
|
set(self.app.req_method_paths),
|
||||||
|
set([('GET', '/test_delete_nested/A/c/man'),
|
||||||
|
('GET', '/test_delete_nested/A/a/sub_nest'),
|
||||||
|
('DELETE', '/test_delete_nested/A/a/a_1'),
|
||||||
|
('DELETE', '/test_delete_nested/A/b/b_2'),
|
||||||
|
('DELETE', '/test_delete_nested/A/c/c_3'),
|
||||||
|
('DELETE', '/test_delete_nested/A/a/sub_nest'),
|
||||||
|
('DELETE', '/test_delete_nested/A/d/d_3'),
|
||||||
|
('DELETE', '/test_delete_nested/A/c/man')]))
|
||||||
|
|
||||||
|
def test_handle_multipart_delete_not_a_manifest(self):
|
||||||
|
# when trying to delete a SLO and its not an SLO, just go ahead
|
||||||
|
# and delete it
|
||||||
req = Request.blank(
|
req = Request.blank(
|
||||||
'/test_delete_bad_man/A/c/man?multipart-manifest=delete',
|
'/test_delete_bad_man/A/c/man?multipart-manifest=delete',
|
||||||
environ={'REQUEST_METHOD': 'DELETE'})
|
environ={'REQUEST_METHOD': 'DELETE',
|
||||||
resp = self.slo(req.environ, fake_start_response)
|
'HTTP_ACCEPT': 'application/json'})
|
||||||
self.assertEquals(self.app.calls, 1)
|
app_iter = self.slo(req.environ, fake_start_response)
|
||||||
|
app_iter = list(app_iter) # iterate through whole response
|
||||||
|
resp_data = json.loads(app_iter[0])
|
||||||
|
self.assertEquals(self.app.calls, 2)
|
||||||
self.assertEquals(self.app.req_method_paths,
|
self.assertEquals(self.app.req_method_paths,
|
||||||
[('GET', '/test_delete_bad_man/A/c/man')])
|
[('GET', '/test_delete_bad_man/A/c/man'),
|
||||||
self.assertEquals(resp, ['Not an SLO manifest'])
|
('DELETE', '/test_delete_bad_man/A/c/man')])
|
||||||
|
self.assertEquals(resp_data['Response Status'], '200 OK')
|
||||||
|
|
||||||
def test_handle_multipart_delete_bad_json(self):
|
def test_handle_multipart_delete_bad_json(self):
|
||||||
req = Request.blank(
|
req = Request.blank(
|
||||||
'/test_delete_bad_json/A/c/man?multipart-manifest=delete',
|
'/test_delete_bad_json/A/c/man?multipart-manifest=delete',
|
||||||
environ={'REQUEST_METHOD': 'DELETE'})
|
environ={'REQUEST_METHOD': 'DELETE',
|
||||||
resp = self.slo(req.environ, fake_start_response)
|
'HTTP_ACCEPT': 'application/json'})
|
||||||
|
app_iter = self.slo(req.environ, fake_start_response)
|
||||||
|
app_iter = list(app_iter) # iterate through whole response
|
||||||
|
resp_data = json.loads(app_iter[0])
|
||||||
self.assertEquals(self.app.calls, 1)
|
self.assertEquals(self.app.calls, 1)
|
||||||
self.assertEquals(self.app.req_method_paths,
|
self.assertEquals(self.app.req_method_paths,
|
||||||
[('GET', '/test_delete_bad_json/A/c/man')])
|
[('GET', '/test_delete_bad_json/A/c/man')])
|
||||||
self.assertEquals(resp, ['Invalid manifest file'])
|
self.assertEquals(resp_data["Response Status"], "500 Internal Error")
|
||||||
|
|
||||||
def test_handle_multipart_delete_whole_bad(self):
|
def test_handle_multipart_delete_whole_bad(self):
|
||||||
req = Request.blank(
|
req = Request.blank(
|
||||||
|
@ -41,7 +41,7 @@ from swift.account import server as account_server
|
|||||||
from swift.container import server as container_server
|
from swift.container import server as container_server
|
||||||
from swift.obj import server as object_server
|
from swift.obj import server as object_server
|
||||||
from swift.common import ring
|
from swift.common import ring
|
||||||
from swift.common.exceptions import ChunkReadTimeout, SloSegmentError
|
from swift.common.exceptions import ChunkReadTimeout, SegmentError
|
||||||
from swift.common.constraints import MAX_META_NAME_LENGTH, \
|
from swift.common.constraints import MAX_META_NAME_LENGTH, \
|
||||||
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
|
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
|
||||||
MAX_FILE_SIZE, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH
|
MAX_FILE_SIZE, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH
|
||||||
@ -1370,7 +1370,7 @@ class TestObjectController(unittest.TestCase):
|
|||||||
self.assertEqual(resp.status_int, 200)
|
self.assertEqual(resp.status_int, 200)
|
||||||
self.assertEqual(resp.content_length, 4) # content incomplete
|
self.assertEqual(resp.content_length, 4) # content incomplete
|
||||||
self.assertEqual(resp.content_type, 'text/html')
|
self.assertEqual(resp.content_type, 'text/html')
|
||||||
self.assertRaises(SloSegmentError, lambda: resp.body)
|
self.assertRaises(SegmentError, lambda: resp.body)
|
||||||
# dropped connection, exception is caught by eventlet as it is
|
# dropped connection, exception is caught by eventlet as it is
|
||||||
# iterating over response
|
# iterating over response
|
||||||
|
|
||||||
@ -1388,10 +1388,26 @@ class TestObjectController(unittest.TestCase):
|
|||||||
"bytes": 2,
|
"bytes": 2,
|
||||||
"name": "/d1/seg01",
|
"name": "/d1/seg01",
|
||||||
"content_type": "application/octet-stream"},
|
"content_type": "application/octet-stream"},
|
||||||
{"hash": "d526f1c8ef6c1e4e980e2b8471352d23",
|
{"hash": "8681fb3ada2715c8754706ee5f23d4f8",
|
||||||
|
"last_modified": "2012-11-08T04:05:37.846710",
|
||||||
|
"bytes": 4,
|
||||||
|
"name": "/d2/sub_manifest",
|
||||||
|
"content_type": "application/octet-stream"},
|
||||||
|
{"hash": "419af6d362a14b7a789ba1c7e772bbae",
|
||||||
|
"last_modified": "2012-11-08T04:05:37.866820",
|
||||||
|
"bytes": 2,
|
||||||
|
"name": "/d1/seg04",
|
||||||
|
"content_type": "application/octet-stream"}]
|
||||||
|
|
||||||
|
sub_listing = [{"hash": "d526f1c8ef6c1e4e980e2b8471352d23",
|
||||||
|
"last_modified": "2012-11-08T04:05:37.866820",
|
||||||
|
"bytes": 2,
|
||||||
|
"name": "/d1/seg02",
|
||||||
|
"content_type": "application/octet-stream"},
|
||||||
|
{"hash": "e4c8f1de1c0855c7c2be33196d3c3537",
|
||||||
"last_modified": "2012-11-08T04:05:37.846710",
|
"last_modified": "2012-11-08T04:05:37.846710",
|
||||||
"bytes": 2,
|
"bytes": 2,
|
||||||
"name": "/d2/seg02",
|
"name": "/d2/seg03",
|
||||||
"content_type": "application/octet-stream"}]
|
"content_type": "application/octet-stream"}]
|
||||||
|
|
||||||
response_bodies = (
|
response_bodies = (
|
||||||
@ -1399,7 +1415,10 @@ class TestObjectController(unittest.TestCase):
|
|||||||
'', # HEAD /a/c
|
'', # HEAD /a/c
|
||||||
simplejson.dumps(listing), # GET manifest
|
simplejson.dumps(listing), # GET manifest
|
||||||
'Aa', # GET seg01
|
'Aa', # GET seg01
|
||||||
'Bb') # GET seg02
|
simplejson.dumps(sub_listing), # GET sub_manifest
|
||||||
|
'Bb', # GET seg02
|
||||||
|
'Cc', # GET seg03
|
||||||
|
'Dd') # GET seg04
|
||||||
with save_globals():
|
with save_globals():
|
||||||
controller = proxy_server.ObjectController(
|
controller = proxy_server.ObjectController(
|
||||||
self.app, 'a', 'c', 'manifest')
|
self.app, 'a', 'c', 'manifest')
|
||||||
@ -1419,26 +1438,37 @@ class TestObjectController(unittest.TestCase):
|
|||||||
200, # HEAD /a/c
|
200, # HEAD /a/c
|
||||||
200, # GET listing1
|
200, # GET listing1
|
||||||
200, # GET seg01
|
200, # GET seg01
|
||||||
|
200, # GET sub listing1
|
||||||
200, # GET seg02
|
200, # GET seg02
|
||||||
headers=[{}, {}, slob_headers, {}, slob_headers],
|
200, # GET seg03
|
||||||
|
200, # GET seg04
|
||||||
|
headers=[{}, {}, slob_headers, {}, slob_headers, {}, {}, {}],
|
||||||
body_iter=response_bodies,
|
body_iter=response_bodies,
|
||||||
give_connect=capture_requested_paths)
|
give_connect=capture_requested_paths)
|
||||||
req = Request.blank('/a/c/manifest')
|
req = Request.blank('/a/c/manifest')
|
||||||
resp = controller.GET(req)
|
resp = controller.GET(req)
|
||||||
self.assertEqual(resp.status_int, 200)
|
self.assertEqual(resp.status_int, 200)
|
||||||
self.assertEqual(resp.content_length, 4) # content incomplete
|
self.assertEqual(resp.content_length, 8)
|
||||||
self.assertEqual(resp.content_type, 'text/html')
|
self.assertEqual(resp.content_type, 'text/html')
|
||||||
self.assertRaises(SloSegmentError, lambda: resp.body)
|
|
||||||
# dropped connection, exception is caught by eventlet as it is
|
|
||||||
# iterating over response
|
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
requested,
|
||||||
|
[['HEAD', '/a', {}],
|
||||||
|
['HEAD', '/a/c', {}],
|
||||||
|
['GET', '/a/c/manifest', {}]])
|
||||||
|
# iterating over body will retrieve manifest and sub manifest's
|
||||||
|
# objects
|
||||||
|
self.assertEqual(resp.body, 'AaBbCcDd')
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
requested,
|
requested,
|
||||||
[['HEAD', '/a', {}],
|
[['HEAD', '/a', {}],
|
||||||
['HEAD', '/a/c', {}],
|
['HEAD', '/a/c', {}],
|
||||||
['GET', '/a/c/manifest', {}],
|
['GET', '/a/c/manifest', {}],
|
||||||
['GET', '/a/d1/seg01', {}],
|
['GET', '/a/d1/seg01', {}],
|
||||||
['GET', '/a/d2/seg02', {}]])
|
['GET', '/a/d2/sub_manifest', {}],
|
||||||
|
['GET', '/a/d1/seg02', {}],
|
||||||
|
['GET', '/a/d2/seg03', {}],
|
||||||
|
['GET', '/a/d1/seg04', {}]])
|
||||||
|
|
||||||
def test_GET_bad_404_manifest_slo(self):
|
def test_GET_bad_404_manifest_slo(self):
|
||||||
listing = [{"hash": "98568d540134639be4655198a36614a4",
|
listing = [{"hash": "98568d540134639be4655198a36614a4",
|
||||||
@ -1490,7 +1520,7 @@ class TestObjectController(unittest.TestCase):
|
|||||||
self.assertEqual(resp.status_int, 200)
|
self.assertEqual(resp.status_int, 200)
|
||||||
self.assertEqual(resp.content_length, 6) # content incomplete
|
self.assertEqual(resp.content_length, 6) # content incomplete
|
||||||
self.assertEqual(resp.content_type, 'text/html')
|
self.assertEqual(resp.content_type, 'text/html')
|
||||||
self.assertRaises(SloSegmentError, lambda: resp.body)
|
self.assertRaises(SegmentError, lambda: resp.body)
|
||||||
# dropped connection, exception is caught by eventlet as it is
|
# dropped connection, exception is caught by eventlet as it is
|
||||||
# iterating over response
|
# iterating over response
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user