From 20d1ee6757be06728941b6672371a3501163cf53 Mon Sep 17 00:00:00 2001 From: gholt Date: Mon, 13 Dec 2010 14:14:26 -0800 Subject: [PATCH] Now supports infinite objects! --- swift/proxy/server.py | 151 +++++++++++++++++++++++---------- test/unit/proxy/test_server.py | 31 ++++++- 2 files changed, 135 insertions(+), 47 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 32b0543e1e..ff6e5e310f 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -41,8 +41,8 @@ from swift.common.utils import get_logger, normalize_timestamp, split_path, \ cache_from_env from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ - check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \ - MAX_FILE_SIZE + check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \ + MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE from swift.common.exceptions import ChunkReadTimeout, \ ChunkWriteTimeout, ConnectionTimeout @@ -112,8 +112,9 @@ class SegmentedIterable(object): :param controller: The ObjectController instance to work with. :param container: The container the object segments are within. - :param listing: The listing of object segments to iterate over; this is a - standard JSON decoded container listing. + :param listing: The listing of object segments to iterate over; this may + be an iterator or list that returns dicts with 'name' and + 'bytes' keys. 
:param response: The webob.Response this iterable is associated with, if any (default: None) """ @@ -121,8 +122,10 @@ class SegmentedIterable(object): def __init__(self, controller, container, listing, response=None): self.controller = controller self.container = container - self.listing = listing + self.listing = iter(listing) self.segment = -1 + self.segment_dict = None + self.segment_peek = None self.seek = 0 self.segment_iter = None self.position = 0 @@ -138,13 +141,13 @@ class SegmentedIterable(object): """ try: self.segment += 1 - if self.segment >= len(self.listing): - raise StopIteration() - obj = self.listing[self.segment] + self.segment_dict = self.segment_peek or self.listing.next() + self.segment_peek = None partition, nodes = self.controller.app.object_ring.get_nodes( - self.controller.account_name, self.container, obj['name']) + self.controller.account_name, self.container, + self.segment_dict['name']) path = '/%s/%s/%s' % (self.controller.account_name, self.container, - obj['name']) + self.segment_dict['name']) req = Request.blank(path) if self.seek: req.range = 'bytes=%s-' % self.seek @@ -209,14 +212,11 @@ class SegmentedIterable(object): """ try: if start: - if len(self.listing) <= self.segment + 1: - return - while start >= self.position + \ - self.listing[self.segment + 1]['bytes']: + self.segment_peek = self.listing.next() + while start >= self.position + self.segment_peek['bytes']: self.segment += 1 - if len(self.listing) <= self.segment + 1: - return - self.position += self.listing[self.segment]['bytes'] + self.position += self.segment_peek['bytes'] + self.segment_peek = self.listing.next() self.seek = start - self.position else: start = 0 @@ -707,38 +707,101 @@ class ObjectController(Controller): return resp resp = resp2 req.range = req_range + if 'x-object-manifest' in resp.headers: lcontainer, lprefix = \ resp.headers['x-object-manifest'].split('/', 1) lpartition, lnodes = self.app.container_ring.get_nodes( self.account_name, lcontainer) - 
lreq = Request.blank('/%s/%s?prefix=%s&format=json' % - (self.account_name, lcontainer, lprefix)) - lresp = self.GETorHEAD_base(lreq, 'Container', lpartition, lnodes, - lreq.path_info, self.app.container_ring.replica_count) - if lresp.status_int // 100 != 2: - lresp = HTTPNotFound(request=req) - lresp.headers['X-Object-Manifest'] = \ - resp.headers['x-object-manifest'] - return lresp - if 'swift.authorize' in req.environ: - req.acl = lresp.headers.get('x-container-read') - aresp = req.environ['swift.authorize'](req) - if aresp: - return aresp - listing = json.loads(lresp.body) - content_length = sum(o['bytes'] for o in listing) - etag = md5('"'.join(o['hash'] for o in listing)).hexdigest() - headers = {'X-Object-Manifest': resp.headers['x-object-manifest'], - 'Content-Type': resp.content_type, 'Content-Length': - content_length, 'ETag': etag} - for key, value in resp.headers.iteritems(): - if key.lower().startswith('x-object-meta-'): - headers[key] = value - resp = Response(headers=headers, request=req, - conditional_response=True) - resp.app_iter = SegmentedIterable(self, lcontainer, listing, resp) - resp.content_length = content_length + marker = '' + listing = [] + while True: + lreq = Request.blank('/%s/%s?prefix=%s&format=json&marker=%s' % + (quote(self.account_name), quote(lcontainer), + quote(lprefix), quote(marker))) + lresp = self.GETorHEAD_base(lreq, 'Container', lpartition, + lnodes, lreq.path_info, + self.app.container_ring.replica_count) + if lresp.status_int // 100 != 2: + lresp = HTTPNotFound(request=req) + lresp.headers['X-Object-Manifest'] = \ + resp.headers['x-object-manifest'] + return lresp + if 'swift.authorize' in req.environ: + req.acl = lresp.headers.get('x-container-read') + aresp = req.environ['swift.authorize'](req) + if aresp: + return aresp + sublisting = json.loads(lresp.body) + if not sublisting: + break + listing.extend(sublisting) + if len(listing) > CONTAINER_LISTING_LIMIT: + break + marker = sublisting[-1]['name'] + + if 
len(listing) > CONTAINER_LISTING_LIMIT: + # We will serve large objects with a ton of segments with + # chunked transfer encoding. + + def listing_iter(): + marker = '' + while True: + lreq = Request.blank( + '/%s/%s?prefix=%s&format=json&marker=%s' % + (quote(self.account_name), quote(lcontainer), + quote(lprefix), quote(marker))) + lresp = self.GETorHEAD_base(lreq, 'Container', + lpartition, lnodes, lreq.path_info, + self.app.container_ring.replica_count) + if lresp.status_int // 100 != 2: + raise Exception('Object manifest GET could not ' + 'continue listing: %s %s' % + (req.path, lreq.path)) + if 'swift.authorize' in req.environ: + req.acl = lresp.headers.get('x-container-read') + aresp = req.environ['swift.authorize'](req) + if aresp: + raise Exception('Object manifest GET could ' + 'not continue listing: %s %s' % + (req.path, aresp)) + sublisting = json.loads(lresp.body) + if not sublisting: + break + for obj in sublisting: + yield obj + marker = sublisting[-1]['name'] + + headers = { + 'X-Object-Manifest': resp.headers['x-object-manifest'], + 'Content-Type': resp.content_type} + for key, value in resp.headers.iteritems(): + if key.lower().startswith('x-object-meta-'): + headers[key] = value + resp = Response(headers=headers, request=req, + conditional_response=True) + resp.app_iter = SegmentedIterable(self, lcontainer, + listing_iter(), resp) + + else: + # For objects with a reasonable number of segments, we'll serve + # them with a set content-length and computed etag. 
+ content_length = sum(o['bytes'] for o in listing) + etag = md5('"'.join(o['hash'] for o in listing)).hexdigest() + headers = { + 'X-Object-Manifest': resp.headers['x-object-manifest'], + 'Content-Type': resp.content_type, + 'Content-Length': content_length, + 'ETag': etag} + for key, value in resp.headers.iteritems(): + if key.lower().startswith('x-object-meta-'): + headers[key] = value + resp = Response(headers=headers, request=req, + conditional_response=True) + resp.app_iter = SegmentedIterable(self, lcontainer, listing, + resp) + resp.content_length = content_length + return resp @public diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index d60f4c8590..b7d43c0fb2 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -1600,6 +1600,7 @@ class TestObjectController(unittest.TestCase): mkdirs(os.path.join(testdir, 'sdb1')) mkdirs(os.path.join(testdir, 'sdb1', 'tmp')) try: + orig_container_listing_limit = proxy_server.CONTAINER_LISTING_LIMIT conf = {'devices': testdir, 'swift_dir': testdir, 'mount_check': 'false'} prolis = listen(('localhost', 0)) @@ -1976,6 +1977,24 @@ class TestObjectController(unittest.TestCase): self.assert_('Content-Type: text/jibberish' in headers) body = fd.read() self.assertEquals(body, '1234 1234 1234 1234 1234 ') + # Do it again but exceeding the container listing limit + proxy_server.CONTAINER_LISTING_LIMIT = 2 + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/segmented/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: ' + 't\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + self.assert_('X-Object-Manifest: segmented/name/' in headers) + self.assert_('Content-Type: text/jibberish' in headers) + body = fd.read() + # This test is a bit fragile, as it assumes that the whole body + # will be sent in a single chunk. 
+ self.assertEquals(body, + '19\r\n1234 1234 1234 1234 1234 \r\n0\r\n\r\n') finally: prospa.kill() acc1spa.kill() @@ -1985,6 +2004,7 @@ class TestObjectController(unittest.TestCase): obj1spa.kill() obj2spa.kill() finally: + proxy_server.CONTAINER_LISTING_LIMIT = orig_container_listing_limit rmtree(testdir) def test_mismatched_etags(self): @@ -2995,9 +3015,10 @@ class TestSegmentedIterable(unittest.TestCase): self.controller = FakeObjectController() def test_load_next_segment_unexpected_error(self): + # Iterator value isn't a dict self.assertRaises(Exception, proxy_server.SegmentedIterable(self.controller, None, - None)._load_next_segment) + [None])._load_next_segment) self.assertEquals(self.controller.exception_args[0], 'ERROR: While processing manifest /a/c/o tx1') @@ -3030,6 +3051,7 @@ class TestSegmentedIterable(unittest.TestCase): segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}, {'name': 'o2'}]) segit.segment = 0 + segit.listing.next() segit._load_next_segment() self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') data = ''.join(segit.segment_iter) @@ -3039,6 +3061,7 @@ class TestSegmentedIterable(unittest.TestCase): segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}, {'name': 'o2'}]) segit.segment = 0 + segit.listing.next() segit.seek = 1 segit._load_next_segment() self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') @@ -3062,8 +3085,9 @@ class TestSegmentedIterable(unittest.TestCase): 'Could not load object segment /a/lc/o1: 404') def test_iter_unexpected_error(self): + # Iterator value isn't a dict self.assertRaises(Exception, ''.join, - proxy_server.SegmentedIterable(self.controller, None, None)) + proxy_server.SegmentedIterable(self.controller, None, [None])) self.assertEquals(self.controller.exception_args[0], 'ERROR: While processing manifest /a/c/o tx1') @@ -3100,9 +3124,10 @@ class TestSegmentedIterable(unittest.TestCase): 'Could not load object segment /a/lc/o1: 
404') def test_app_iter_range_unexpected_error(self): + # Iterator value isn't a dict self.assertRaises(Exception, proxy_server.SegmentedIterable(self.controller, None, - None).app_iter_range(None, None).next) + [None]).app_iter_range(None, None).next) self.assertEquals(self.controller.exception_args[0], 'ERROR: While processing manifest /a/c/o tx1')