Now supports infinite objects!
This commit is contained in:
parent
d464757d71
commit
20d1ee6757
@ -41,8 +41,8 @@ from swift.common.utils import get_logger, normalize_timestamp, split_path, \
|
||||
cache_from_env
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_metadata, check_object_creation, \
|
||||
check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
|
||||
MAX_FILE_SIZE
|
||||
check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
|
||||
MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
|
||||
from swift.common.exceptions import ChunkReadTimeout, \
|
||||
ChunkWriteTimeout, ConnectionTimeout
|
||||
|
||||
@ -112,8 +112,9 @@ class SegmentedIterable(object):
|
||||
|
||||
:param controller: The ObjectController instance to work with.
|
||||
:param container: The container the object segments are within.
|
||||
:param listing: The listing of object segments to iterate over; this is a
|
||||
standard JSON decoded container listing.
|
||||
:param listing: The listing of object segments to iterate over; this may
|
||||
be an iterator or list yielding dicts with 'name' and
|
||||
'bytes' keys.
|
||||
:param response: The webob.Response this iterable is associated with, if
|
||||
any (default: None)
|
||||
"""
|
||||
@ -121,8 +122,10 @@ class SegmentedIterable(object):
|
||||
def __init__(self, controller, container, listing, response=None):
|
||||
self.controller = controller
|
||||
self.container = container
|
||||
self.listing = listing
|
||||
self.listing = iter(listing)
|
||||
self.segment = -1
|
||||
self.segment_dict = None
|
||||
self.segment_peek = None
|
||||
self.seek = 0
|
||||
self.segment_iter = None
|
||||
self.position = 0
|
||||
@ -138,13 +141,13 @@ class SegmentedIterable(object):
|
||||
"""
|
||||
try:
|
||||
self.segment += 1
|
||||
if self.segment >= len(self.listing):
|
||||
raise StopIteration()
|
||||
obj = self.listing[self.segment]
|
||||
self.segment_dict = self.segment_peek or self.listing.next()
|
||||
self.segment_peek = None
|
||||
partition, nodes = self.controller.app.object_ring.get_nodes(
|
||||
self.controller.account_name, self.container, obj['name'])
|
||||
self.controller.account_name, self.container,
|
||||
self.segment_dict['name'])
|
||||
path = '/%s/%s/%s' % (self.controller.account_name, self.container,
|
||||
obj['name'])
|
||||
self.segment_dict['name'])
|
||||
req = Request.blank(path)
|
||||
if self.seek:
|
||||
req.range = 'bytes=%s-' % self.seek
|
||||
@ -209,14 +212,11 @@ class SegmentedIterable(object):
|
||||
"""
|
||||
try:
|
||||
if start:
|
||||
if len(self.listing) <= self.segment + 1:
|
||||
return
|
||||
while start >= self.position + \
|
||||
self.listing[self.segment + 1]['bytes']:
|
||||
self.segment_peek = self.listing.next()
|
||||
while start >= self.position + self.segment_peek['bytes']:
|
||||
self.segment += 1
|
||||
if len(self.listing) <= self.segment + 1:
|
||||
return
|
||||
self.position += self.listing[self.segment]['bytes']
|
||||
self.position += self.segment_peek['bytes']
|
||||
self.segment_peek = self.listing.next()
|
||||
self.seek = start - self.position
|
||||
else:
|
||||
start = 0
|
||||
@ -707,38 +707,101 @@ class ObjectController(Controller):
|
||||
return resp
|
||||
resp = resp2
|
||||
req.range = req_range
|
||||
|
||||
if 'x-object-manifest' in resp.headers:
|
||||
lcontainer, lprefix = \
|
||||
resp.headers['x-object-manifest'].split('/', 1)
|
||||
lpartition, lnodes = self.app.container_ring.get_nodes(
|
||||
self.account_name, lcontainer)
|
||||
lreq = Request.blank('/%s/%s?prefix=%s&format=json' %
|
||||
(self.account_name, lcontainer, lprefix))
|
||||
lresp = self.GETorHEAD_base(lreq, 'Container', lpartition, lnodes,
|
||||
lreq.path_info, self.app.container_ring.replica_count)
|
||||
if lresp.status_int // 100 != 2:
|
||||
lresp = HTTPNotFound(request=req)
|
||||
lresp.headers['X-Object-Manifest'] = \
|
||||
resp.headers['x-object-manifest']
|
||||
return lresp
|
||||
if 'swift.authorize' in req.environ:
|
||||
req.acl = lresp.headers.get('x-container-read')
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
return aresp
|
||||
listing = json.loads(lresp.body)
|
||||
content_length = sum(o['bytes'] for o in listing)
|
||||
etag = md5('"'.join(o['hash'] for o in listing)).hexdigest()
|
||||
headers = {'X-Object-Manifest': resp.headers['x-object-manifest'],
|
||||
'Content-Type': resp.content_type, 'Content-Length':
|
||||
content_length, 'ETag': etag}
|
||||
for key, value in resp.headers.iteritems():
|
||||
if key.lower().startswith('x-object-meta-'):
|
||||
headers[key] = value
|
||||
resp = Response(headers=headers, request=req,
|
||||
conditional_response=True)
|
||||
resp.app_iter = SegmentedIterable(self, lcontainer, listing, resp)
|
||||
resp.content_length = content_length
|
||||
marker = ''
|
||||
listing = []
|
||||
while True:
|
||||
lreq = Request.blank('/%s/%s?prefix=%s&format=json&marker=%s' %
|
||||
(quote(self.account_name), quote(lcontainer),
|
||||
quote(lprefix), quote(marker)))
|
||||
lresp = self.GETorHEAD_base(lreq, 'Container', lpartition,
|
||||
lnodes, lreq.path_info,
|
||||
self.app.container_ring.replica_count)
|
||||
if lresp.status_int // 100 != 2:
|
||||
lresp = HTTPNotFound(request=req)
|
||||
lresp.headers['X-Object-Manifest'] = \
|
||||
resp.headers['x-object-manifest']
|
||||
return lresp
|
||||
if 'swift.authorize' in req.environ:
|
||||
req.acl = lresp.headers.get('x-container-read')
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
return aresp
|
||||
sublisting = json.loads(lresp.body)
|
||||
if not sublisting:
|
||||
break
|
||||
listing.extend(sublisting)
|
||||
if len(listing) > CONTAINER_LISTING_LIMIT:
|
||||
break
|
||||
marker = sublisting[-1]['name']
|
||||
|
||||
if len(listing) > CONTAINER_LISTING_LIMIT:
|
||||
# We will serve large objects with a ton of segments with
|
||||
# chunked transfer encoding.
|
||||
|
||||
def listing_iter():
|
||||
marker = ''
|
||||
while True:
|
||||
lreq = Request.blank(
|
||||
'/%s/%s?prefix=%s&format=json&marker=%s' %
|
||||
(quote(self.account_name), quote(lcontainer),
|
||||
quote(lprefix), quote(marker)))
|
||||
lresp = self.GETorHEAD_base(lreq, 'Container',
|
||||
lpartition, lnodes, lreq.path_info,
|
||||
self.app.container_ring.replica_count)
|
||||
if lresp.status_int // 100 != 2:
|
||||
raise Exception('Object manifest GET could not '
|
||||
'continue listing: %s %s' %
|
||||
(req.path, lreq.path))
|
||||
if 'swift.authorize' in req.environ:
|
||||
req.acl = lresp.headers.get('x-container-read')
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
raise Exception('Object manifest GET could '
|
||||
'not continue listing: %s %s' %
|
||||
(req.path, aresp))
|
||||
sublisting = json.loads(lresp.body)
|
||||
if not sublisting:
|
||||
break
|
||||
for obj in sublisting:
|
||||
yield obj
|
||||
marker = sublisting[-1]['name']
|
||||
|
||||
headers = {
|
||||
'X-Object-Manifest': resp.headers['x-object-manifest'],
|
||||
'Content-Type': resp.content_type}
|
||||
for key, value in resp.headers.iteritems():
|
||||
if key.lower().startswith('x-object-meta-'):
|
||||
headers[key] = value
|
||||
resp = Response(headers=headers, request=req,
|
||||
conditional_response=True)
|
||||
resp.app_iter = SegmentedIterable(self, lcontainer,
|
||||
listing_iter(), resp)
|
||||
|
||||
else:
|
||||
# For objects with a reasonable number of segments, we'll serve
|
||||
# them with a set content-length and computed etag.
|
||||
content_length = sum(o['bytes'] for o in listing)
|
||||
etag = md5('"'.join(o['hash'] for o in listing)).hexdigest()
|
||||
headers = {
|
||||
'X-Object-Manifest': resp.headers['x-object-manifest'],
|
||||
'Content-Type': resp.content_type,
|
||||
'Content-Length': content_length,
|
||||
'ETag': etag}
|
||||
for key, value in resp.headers.iteritems():
|
||||
if key.lower().startswith('x-object-meta-'):
|
||||
headers[key] = value
|
||||
resp = Response(headers=headers, request=req,
|
||||
conditional_response=True)
|
||||
resp.app_iter = SegmentedIterable(self, lcontainer, listing,
|
||||
resp)
|
||||
resp.content_length = content_length
|
||||
|
||||
return resp
|
||||
|
||||
@public
|
||||
|
@ -1600,6 +1600,7 @@ class TestObjectController(unittest.TestCase):
|
||||
mkdirs(os.path.join(testdir, 'sdb1'))
|
||||
mkdirs(os.path.join(testdir, 'sdb1', 'tmp'))
|
||||
try:
|
||||
orig_container_listing_limit = proxy_server.CONTAINER_LISTING_LIMIT
|
||||
conf = {'devices': testdir, 'swift_dir': testdir,
|
||||
'mount_check': 'false'}
|
||||
prolis = listen(('localhost', 0))
|
||||
@ -1976,6 +1977,24 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assert_('Content-Type: text/jibberish' in headers)
|
||||
body = fd.read()
|
||||
self.assertEquals(body, '1234 1234 1234 1234 1234 ')
|
||||
# Do it again but exceeding the container listing limit
|
||||
proxy_server.CONTAINER_LISTING_LIMIT = 2
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
fd.write('GET /v1/a/segmented/name HTTP/1.1\r\nHost: '
|
||||
'localhost\r\nConnection: close\r\nX-Auth-Token: '
|
||||
't\r\n\r\n')
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 200'
|
||||
self.assertEquals(headers[:len(exp)], exp)
|
||||
self.assert_('X-Object-Manifest: segmented/name/' in headers)
|
||||
self.assert_('Content-Type: text/jibberish' in headers)
|
||||
body = fd.read()
|
||||
# This is a somewhat fragile test, as it assumes that the whole body
|
||||
# will be sent in a single chunk.
|
||||
self.assertEquals(body,
|
||||
'19\r\n1234 1234 1234 1234 1234 \r\n0\r\n\r\n')
|
||||
finally:
|
||||
prospa.kill()
|
||||
acc1spa.kill()
|
||||
@ -1985,6 +2004,7 @@ class TestObjectController(unittest.TestCase):
|
||||
obj1spa.kill()
|
||||
obj2spa.kill()
|
||||
finally:
|
||||
proxy_server.CONTAINER_LISTING_LIMIT = orig_container_listing_limit
|
||||
rmtree(testdir)
|
||||
|
||||
def test_mismatched_etags(self):
|
||||
@ -2995,9 +3015,10 @@ class TestSegmentedIterable(unittest.TestCase):
|
||||
self.controller = FakeObjectController()
|
||||
|
||||
def test_load_next_segment_unexpected_error(self):
|
||||
# Iterator value isn't a dict
|
||||
self.assertRaises(Exception,
|
||||
proxy_server.SegmentedIterable(self.controller, None,
|
||||
None)._load_next_segment)
|
||||
[None])._load_next_segment)
|
||||
self.assertEquals(self.controller.exception_args[0],
|
||||
'ERROR: While processing manifest /a/c/o tx1')
|
||||
|
||||
@ -3030,6 +3051,7 @@ class TestSegmentedIterable(unittest.TestCase):
|
||||
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
|
||||
'o1'}, {'name': 'o2'}])
|
||||
segit.segment = 0
|
||||
segit.listing.next()
|
||||
segit._load_next_segment()
|
||||
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2')
|
||||
data = ''.join(segit.segment_iter)
|
||||
@ -3039,6 +3061,7 @@ class TestSegmentedIterable(unittest.TestCase):
|
||||
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
|
||||
'o1'}, {'name': 'o2'}])
|
||||
segit.segment = 0
|
||||
segit.listing.next()
|
||||
segit.seek = 1
|
||||
segit._load_next_segment()
|
||||
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2')
|
||||
@ -3062,8 +3085,9 @@ class TestSegmentedIterable(unittest.TestCase):
|
||||
'Could not load object segment /a/lc/o1: 404')
|
||||
|
||||
def test_iter_unexpected_error(self):
|
||||
# Iterator value isn't a dict
|
||||
self.assertRaises(Exception, ''.join,
|
||||
proxy_server.SegmentedIterable(self.controller, None, None))
|
||||
proxy_server.SegmentedIterable(self.controller, None, [None]))
|
||||
self.assertEquals(self.controller.exception_args[0],
|
||||
'ERROR: While processing manifest /a/c/o tx1')
|
||||
|
||||
@ -3100,9 +3124,10 @@ class TestSegmentedIterable(unittest.TestCase):
|
||||
'Could not load object segment /a/lc/o1: 404')
|
||||
|
||||
def test_app_iter_range_unexpected_error(self):
|
||||
# Iterator value isn't a dict
|
||||
self.assertRaises(Exception,
|
||||
proxy_server.SegmentedIterable(self.controller, None,
|
||||
None).app_iter_range(None, None).next)
|
||||
[None]).app_iter_range(None, None).next)
|
||||
self.assertEquals(self.controller.exception_args[0],
|
||||
'ERROR: While processing manifest /a/c/o tx1')
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user