From fa3c871f0b1544f859bacf38497580afd69ced0b Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 16 Nov 2010 15:35:39 -0800 Subject: [PATCH 01/18] Server-side implementation for segmented objects --- swift/common/constraints.py | 11 + swift/obj/server.py | 9 +- swift/proxy/server.py | 177 ++++++++++++- test/functionalnosetests/test_object.py | 279 ++++++++++++++++++++ test/unit/common/test_constraints.py | 27 ++ test/unit/obj/test_server.py | 104 ++++++-- test/unit/proxy/test_server.py | 335 +++++++++++++++++++++++- 7 files changed, 915 insertions(+), 27 deletions(-) diff --git a/swift/common/constraints.py b/swift/common/constraints.py index c3f4f1621d..d91c136504 100644 --- a/swift/common/constraints.py +++ b/swift/common/constraints.py @@ -113,6 +113,17 @@ def check_object_creation(req, object_name): if not check_utf8(req.headers['Content-Type']): return HTTPBadRequest(request=req, body='Invalid Content-Type', content_type='text/plain') + if 'x-object-manifest' in req.headers: + value = req.headers['x-object-manifest'] + container = prefix = None + try: + container, prefix = value.split('/', 1) + except ValueError: + pass + if not container or not prefix or '?' 
in value or '&' in value or \ + prefix[0] == '/': + return HTTPBadRequest(request=req, + body='X-Object-Manifest must in the format container/prefix') return check_metadata(req, 'object') diff --git a/swift/obj/server.py b/swift/obj/server.py index 632a0c04cc..cf90bb7971 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -391,6 +391,9 @@ class ObjectController(object): 'ETag': etag, 'Content-Length': str(os.fstat(fd).st_size), } + if 'x-object-manifest' in request.headers: + metadata['X-Object-Manifest'] = \ + request.headers['x-object-manifest'] metadata.update(val for val in request.headers.iteritems() if val[0].lower().startswith('x-object-meta-') and len(val[0]) > 14) @@ -460,7 +463,8 @@ class ObjectController(object): 'application/octet-stream'), app_iter=file, request=request, conditional_response=True) for key, value in file.metadata.iteritems(): - if key.lower().startswith('x-object-meta-'): + if key == 'X-Object-Manifest' or \ + key.lower().startswith('x-object-meta-'): response.headers[key] = value response.etag = file.metadata['ETag'] response.last_modified = float(file.metadata['X-Timestamp']) @@ -488,7 +492,8 @@ class ObjectController(object): response = Response(content_type=file.metadata['Content-Type'], request=request, conditional_response=True) for key, value in file.metadata.iteritems(): - if key.lower().startswith('x-object-meta-'): + if key == 'X-Object-Manifest' or \ + key.lower().startswith('x-object-meta-'): response.headers[key] = value response.etag = file.metadata['ETag'] response.last_modified = float(file.metadata['X-Timestamp']) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index bacea4db9f..4265754390 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -14,6 +14,10 @@ # limitations under the License. 
class SegmentedIterable(object):
    """
    Iterable that returns the object contents for a segmented object in Swift.

    In addition to these params, you can also set the `response` attr just
    after creating the SegmentedIterable and it will update the response's
    `bytes_transferred` value (used to log the size of the request).

    :param controller: The ObjectController instance to work with.
    :param container: The container the object segments are within.
    :param listing: The listing of object segments to iterate over; this is a
                    standard JSON decoded container listing.
    """

    def __init__(self, controller, container, listing):
        self.controller = controller
        self.container = container
        self.listing = listing
        # Index into `listing` of the segment currently being read; -1 until
        # the first segment has been loaded.
        self.segment = -1
        # Byte offset to start at within the next segment that gets loaded;
        # set by app_iter_range and reset to 0 by _load_next_segment.
        self.seek = 0
        self.segment_iter = None
        # Advanced by the size of every skipped segment and of every chunk
        # yielded.
        self.position = 0
        self.response = None
        # Last exception that was written to the log; lets the nested
        # handlers below log each failure exactly once as it propagates.
        self._logged_error = None

    def _log_error_once(self, err):
        """
        Logs `err` with the manifest's path and transaction id, unless that
        very exception instance has already been logged.

        Must be called from inside an `except` block so that
        `logger.exception` can pick up the active traceback.

        :param err: The exception being handled.
        """
        if err is self._logged_error:
            return
        self._logged_error = err
        controller = self.controller
        controller.app.logger.exception('ERROR: While processing '
            'manifest /%s/%s/%s %s' % (controller.account_name,
            controller.container_name, controller.object_name,
            controller.trans_id))

    def _load_next_segment(self):
        """
        Loads the self.segment_iter with the next object segment's contents.

        If self.seek is set, the segment is requested with a Range header
        starting at that offset and self.seek is reset to 0.

        :raises: StopIteration when there are no more object segments.
        :raises: Exception when the segment request does not return a 2xx
                 status.
        """
        try:
            self.segment += 1
            if self.segment >= len(self.listing):
                raise StopIteration()
            obj = self.listing[self.segment]
            controller = self.controller
            object_ring = controller.app.object_ring
            partition, nodes = object_ring.get_nodes(
                controller.account_name, self.container, obj['name'])
            path = '/%s/%s/%s' % (controller.account_name, self.container,
                                  obj['name'])
            req = Request.blank(path)
            if self.seek:
                req.range = 'bytes=%s-' % self.seek
                self.seek = 0
            resp = controller.GETorHEAD_base(req, 'Object', partition,
                controller.iter_nodes(partition, nodes, object_ring), path,
                object_ring.replica_count)
            if resp.status_int // 100 != 2:
                raise Exception('Could not load object segment %s: %s' %
                                (path, resp.status_int))
            self.segment_iter = resp.app_iter
        except StopIteration:
            # Normal end-of-listing signal; not an error, so never logged.
            raise
        except Exception as err:
            self._log_error_once(err)
            raise

    def __iter__(self):
        """
        Standard iterator function that returns the object's contents,
        one chunk at a time, moving on to the next segment whenever the
        current one is exhausted.
        """
        try:
            while True:
                if not self.segment_iter:
                    self._load_next_segment()
                while True:
                    with ChunkReadTimeout(self.controller.app.node_timeout):
                        try:
                            chunk = next(self.segment_iter)
                            break
                        except StopIteration:
                            # Current segment is done; fetch the next one
                            # (raises StopIteration after the last segment).
                            self._load_next_segment()
                self.position += len(chunk)
                if self.response:
                    self.response.bytes_transferred = getattr(self.response,
                        'bytes_transferred', 0) + len(chunk)
                yield chunk
        except StopIteration:
            # No more segments. Finish the generator with a plain return
            # rather than letting StopIteration escape the generator body.
            return
        except Exception as err:
            self._log_error_once(err)
            raise

    def app_iter_range(self, start, stop):
        """
        Non-standard iterator function for use with Webob in serving Range
        requests more quickly. This will skip over segments and do a range
        request on the first segment to return data from, if needed.

        Exactly ``stop - start`` bytes are returned, i.e. `stop` is
        exclusive.

        :param start: The first byte (zero-based) to return. None for 0.
        :param stop: One past the last byte (zero-based) to return. None for
                     end.
        """
        try:
            if start:
                if len(self.listing) <= self.segment + 1:
                    return
                # Skip whole segments that lie entirely before `start`
                # using the sizes from the listing, without fetching them.
                while start >= self.position + \
                        self.listing[self.segment + 1]['bytes']:
                    self.segment += 1
                    if len(self.listing) <= self.segment + 1:
                        return
                    self.position += self.listing[self.segment]['bytes']
                self.seek = start - self.position
            else:
                start = 0
            if stop is not None:
                length = stop - start
                if length <= 0:
                    # Nothing to return; don't touch the backend at all.
                    return
            else:
                length = None
            for chunk in self:
                if length is not None:
                    length -= len(chunk)
                    if length < 0:
                        # Chop off the extra:
                        if self.response:
                            self.response.bytes_transferred = \
                               getattr(self.response, 'bytes_transferred', 0) \
                               + length
                        yield chunk[:length]
                        break
                yield chunk
                if length == 0:
                    # Range satisfied exactly on a chunk boundary; stop here
                    # instead of fetching another chunk (or segment) only to
                    # throw it away.
                    break
        except StopIteration:
            return
        except Exception as err:
            self._log_error_once(err)
            raise
+ if resp.status_int == 416: + req_range = req.range + req.range = None + resp2 = self.GETorHEAD_base(req, 'Object', partition, + self.iter_nodes(partition, nodes, self.app.object_ring), + req.path_info, self.app.object_ring.replica_count) + if 'x-object-manifest' not in resp2.headers: + return resp + resp = resp2 + req.range = req_range + if 'x-object-manifest' in resp.headers: + lcontainer, lprefix = \ + resp.headers['x-object-manifest'].split('/', 1) + lpartition, lnodes = self.app.container_ring.get_nodes( + self.account_name, lcontainer) + lreq = Request.blank('/%s/%s?prefix=%s&format=json' % + (self.account_name, lcontainer, lprefix)) + lresp = self.GETorHEAD_base(lreq, 'Container', lpartition, lnodes, + lreq.path_info, self.app.container_ring.replica_count) + if 'swift.authorize' in req.environ: + req.acl = lresp.headers.get('x-container-read') + aresp = req.environ['swift.authorize'](req) + if aresp: + return aresp + listing = json.loads(lresp.body) + content_length = sum(o['bytes'] for o in listing) + etag = md5('"'.join(o['hash'] for o in listing)).hexdigest() + headers = {'X-Object-Manifest': resp.headers['x-object-manifest'], + 'Content-Type': resp.content_type, 'Content-Length': + content_length, 'ETag': etag} + resp = Response(app_iter=SegmentedIterable(self, lcontainer, + listing), headers=headers, request=req, + conditional_response=True) + resp.app_iter.response = resp + return resp @public @delay_denial diff --git a/test/functionalnosetests/test_object.py b/test/functionalnosetests/test_object.py index e4b2fc48c5..2e1668db0e 100644 --- a/test/functionalnosetests/test_object.py +++ b/test/functionalnosetests/test_object.py @@ -16,6 +16,7 @@ class TestObject(unittest.TestCase): if skip: raise SkipTest self.container = uuid4().hex + def put(url, token, parsed, conn): conn.request('PUT', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) @@ -24,6 +25,7 @@ class TestObject(unittest.TestCase): resp.read() self.assertEquals(resp.status, 
201) self.obj = uuid4().hex + def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, self.obj), 'test', {'X-Auth-Token': token}) @@ -35,6 +37,7 @@ class TestObject(unittest.TestCase): def tearDown(self): if skip: raise SkipTest + def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s/%s' % (parsed.path, self.container, self.obj), '', {'X-Auth-Token': token}) @@ -42,6 +45,7 @@ class TestObject(unittest.TestCase): resp = retry(delete) resp.read() self.assertEquals(resp.status, 204) + def delete(url, token, parsed, conn): conn.request('DELETE', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) @@ -53,6 +57,7 @@ class TestObject(unittest.TestCase): def test_public_object(self): if skip: raise SkipTest + def get(url, token, parsed, conn): conn.request('GET', '%s/%s/%s' % (parsed.path, self.container, self.obj)) @@ -62,6 +67,7 @@ class TestObject(unittest.TestCase): raise Exception('Should not have been able to GET') except Exception, err: self.assert_(str(err).startswith('No result after ')) + def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.container, '', {'X-Auth-Token': token, @@ -73,6 +79,7 @@ class TestObject(unittest.TestCase): resp = retry(get) resp.read() self.assertEquals(resp.status, 200) + def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.container, '', {'X-Auth-Token': token, 'X-Container-Read': ''}) @@ -89,6 +96,7 @@ class TestObject(unittest.TestCase): def test_private_object(self): if skip or skip3: raise SkipTest + # Ensure we can't access the object with the third account def get(url, token, parsed, conn): conn.request('GET', '%s/%s/%s' % (parsed.path, self.container, @@ -98,8 +106,10 @@ class TestObject(unittest.TestCase): resp = retry(get, use_account=3) resp.read() self.assertEquals(resp.status, 403) + # create a shared container writable by account3 shared_container = uuid4().hex + def put(url, token, parsed, conn): 
def test_manifest(self):
    """
    Functional test of manifest (segmented) objects.

    Uploads three sets of segments, points a single manifest object at each
    set in turn, and checks that GETs of the manifest (whole and ranged)
    return the concatenated segment data. When a third account is
    configured, also checks that reading the manifest requires read access
    to the container holding the segments. Everything created here is
    deleted again at the end.
    """
    if skip:
        raise SkipTest
    # Data for the object segments
    segments1 = ['one', 'two', 'three', 'four', 'five']
    segments2 = ['six', 'seven', 'eight']
    segments3 = ['nine', 'ten', 'eleven']

    def put_segments(container, prefix, segments):
        # Upload each entry of `segments` as <container>/<prefix>/<index>.
        def put(url, token, parsed, conn, objnum):
            conn.request('PUT', '%s/%s/%s/%s' % (parsed.path, container,
                prefix, str(objnum)), segments[objnum],
                {'X-Auth-Token': token})
            return check_response(conn)
        for objnum in xrange(len(segments)):
            resp = retry(put, objnum)
            resp.read()
            self.assertEquals(resp.status, 201)

    def delete_segments(container, prefix, segments):
        # Delete the objects uploaded by put_segments with the same args.
        def delete(url, token, parsed, conn, objnum):
            conn.request('DELETE', '%s/%s/%s/%s' % (parsed.path, container,
                prefix, str(objnum)), '', {'X-Auth-Token': token})
            return check_response(conn)
        for objnum in xrange(len(segments)):
            resp = retry(delete, objnum)
            resp.read()
            self.assertEquals(resp.status, 204)

    def put_manifest(segment_container, prefix, content_type=None):
        # (Re)write the manifest object so it points at
        # <segment_container>/<prefix>/.
        def put(url, token, parsed, conn):
            headers = {'X-Auth-Token': token,
                       'X-Object-Manifest': '%s/%s/' % (segment_container,
                                                        prefix),
                       'Content-Length': '0'}
            if content_type:
                headers['Content-Type'] = content_type
            conn.request('PUT', '%s/%s/manifest' % (parsed.path,
                self.container), '', headers)
            return check_response(conn)
        resp = retry(put)
        resp.read()
        self.assertEquals(resp.status, 201)

    def get_manifest(byte_range=None, use_account=None):
        # GET the manifest object, optionally with a Range header and/or as
        # another test account; returns the unread response.
        def get(url, token, parsed, conn):
            headers = {'X-Auth-Token': token}
            if byte_range:
                headers['Range'] = byte_range
            conn.request('GET', '%s/%s/manifest' % (parsed.path,
                self.container), '', headers)
            return check_response(conn)
        if use_account is None:
            return retry(get)
        return retry(get, use_account=use_account)

    def grant_read_to_third_account(container):
        # Set X-Container-Read on `container` to the third test user.
        def post(url, token, parsed, conn):
            conn.request('POST', '%s/%s' % (parsed.path, container),
                '', {'X-Auth-Token': token, 'X-Container-Read':
                     swift_test_user[2]})
            return check_response(conn)
        resp = retry(post)
        resp.read()
        self.assertEquals(resp.status, 204)

    # Upload the first set of segments
    put_segments(self.container, 'segments1', segments1)

    # Upload the manifest
    put_manifest(self.container, 'segments1', content_type='text/jibberish')

    # Get the manifest (should get all the segments as the body)
    resp = get_manifest()
    self.assertEquals(resp.read(), ''.join(segments1))
    self.assertEquals(resp.status, 200)
    self.assertEquals(resp.getheader('content-type'), 'text/jibberish')

    # Get with a range at the start of the second segment
    resp = get_manifest(byte_range='bytes=3-')
    self.assertEquals(resp.read(), ''.join(segments1[1:]))
    self.assertEquals(resp.status, 206)

    # Get with a range in the middle of the second segment
    resp = get_manifest(byte_range='bytes=5-')
    self.assertEquals(resp.read(), ''.join(segments1)[5:])
    self.assertEquals(resp.status, 206)

    # Get with a full start and stop range
    resp = get_manifest(byte_range='bytes=5-10')
    self.assertEquals(resp.read(), ''.join(segments1)[5:11])
    self.assertEquals(resp.status, 206)

    # Upload the second set of segments
    put_segments(self.container, 'segments2', segments2)

    # Get the manifest (should still be the first segments of course)
    resp = get_manifest()
    self.assertEquals(resp.read(), ''.join(segments1))
    self.assertEquals(resp.status, 200)

    # Update the manifest
    put_manifest(self.container, 'segments2')

    # Get the manifest (should be the second set of segments now)
    resp = get_manifest()
    self.assertEquals(resp.read(), ''.join(segments2))
    self.assertEquals(resp.status, 200)

    if not skip3:

        # Ensure we can't access the manifest with the third account
        resp = get_manifest(use_account=3)
        resp.read()
        self.assertEquals(resp.status, 403)

        # Grant access to the third account
        grant_read_to_third_account(self.container)

        # The third account should be able to get the manifest now
        resp = get_manifest(use_account=3)
        self.assertEquals(resp.read(), ''.join(segments2))
        self.assertEquals(resp.status, 200)

    # Create another container for the third set of segments
    acontainer = uuid4().hex

    def put(url, token, parsed, conn):
        conn.request('PUT', parsed.path + '/' + acontainer, '',
                     {'X-Auth-Token': token})
        return check_response(conn)
    resp = retry(put)
    resp.read()
    self.assertEquals(resp.status, 201)

    # Upload the third set of segments in the other container
    put_segments(acontainer, 'segments3', segments3)

    # Update the manifest
    put_manifest(acontainer, 'segments3')

    # Get the manifest to ensure it's the third set of segments
    resp = get_manifest()
    self.assertEquals(resp.read(), ''.join(segments3))
    self.assertEquals(resp.status, 200)

    if not skip3:

        # Ensure we can't access the manifest with the third account
        # (because the segments are in a protected container even if the
        # manifest itself is not).
        resp = get_manifest(use_account=3)
        resp.read()
        self.assertEquals(resp.status, 403)

        # Grant access to the third account
        grant_read_to_third_account(acontainer)

        # The third account should be able to get the manifest now
        resp = get_manifest(use_account=3)
        self.assertEquals(resp.read(), ''.join(segments3))
        self.assertEquals(resp.status, 200)

    # Delete the manifest. (This request takes no per-object argument; it
    # must not depend on a loop variable left over from an earlier step.)
    def delete(url, token, parsed, conn):
        conn.request('DELETE', '%s/%s/manifest' % (parsed.path,
            self.container), '', {'X-Auth-Token': token})
        return check_response(conn)
    resp = retry(delete)
    resp.read()
    self.assertEquals(resp.status, 204)

    # Delete the third set of segments
    delete_segments(acontainer, 'segments3', segments3)

    # Delete the second set of segments
    delete_segments(self.container, 'segments2', segments2)

    # Delete the first set of segments
    delete_segments(self.container, 'segments1', segments1)

    # Delete the extra container
    def delete(url, token, parsed, conn):
        conn.request('DELETE', '%s/%s' % (parsed.path, acontainer), '',
                     {'X-Auth-Token': token})
        return check_response(conn)
    resp = retry(delete)
    resp.read()
    self.assertEquals(resp.status, 204)
= constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': '/container/prefix', + 'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(isinstance(resp, HTTPBadRequest)) + resp = constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': 'container/prefix?query=param', + 'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(isinstance(resp, HTTPBadRequest)) + resp = constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': 'container/prefix&query=param', + 'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(isinstance(resp, HTTPBadRequest)) + resp = constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': 'http://host/container/prefix', + 'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(isinstance(resp, HTTPBadRequest)) + def test_check_mount(self): self.assertFalse(constraints.check_mount('', '')) constraints.os = MockTrue() # mock os module diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 94a3b28266..90eed52977 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -42,7 +42,7 @@ class TestObjectController(unittest.TestCase): self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS') if not self.path_to_test_xfs or \ not os.path.exists(self.path_to_test_xfs): - print >>sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \ + print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \ 'pointing to a valid directory.\n' \ 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \ 'system for testing.' 
@@ -77,7 +77,8 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 201) timestamp = normalize_timestamp(time()) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 'X-Object-Meta-3': 'Three', 'X-Object-Meta-4': 'Four', @@ -95,7 +96,8 @@ class TestObjectController(unittest.TestCase): if not self.path_to_test_xfs: raise SkipTest timestamp = normalize_timestamp(time()) - req = Request.blank('/sda1/p/a/c/fail', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/fail', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 'X-Object-Meta-1': 'One', 'X-Object-Meta-2': 'Two', @@ -116,29 +118,37 @@ class TestObjectController(unittest.TestCase): def test_POST_container_connection(self): if not self.path_to_test_xfs: raise SkipTest + def mock_http_connect(response, with_exc=False): + class FakeConn(object): + def __init__(self, status, with_exc): self.status = status self.reason = 'Fake' self.host = '1.2.3.4' self.port = '1234' self.with_exc = with_exc + def getresponse(self): if self.with_exc: raise Exception('test') return self + def read(self, amt=None): return '' + return lambda *args, **kwargs: FakeConn(response, with_exc) + old_http_connect = object_server.http_connect try: timestamp = normalize_timestamp(time()) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Timestamp': timestamp, 'Content-Type': 'text/plain', - 'Content-Length': '0'}) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': + 'POST'}, headers={'X-Timestamp': timestamp, 'Content-Type': + 'text/plain', 'Content-Length': '0'}) resp = self.object_controller.PUT(req) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 
'X-Container-Host': '1.2.3.4:0', 'X-Container-Partition': '3', @@ -148,7 +158,8 @@ class TestObjectController(unittest.TestCase): object_server.http_connect = mock_http_connect(202) resp = self.object_controller.POST(req) self.assertEquals(resp.status_int, 202) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 'X-Container-Host': '1.2.3.4:0', 'X-Container-Partition': '3', @@ -158,7 +169,8 @@ class TestObjectController(unittest.TestCase): object_server.http_connect = mock_http_connect(202, with_exc=True) resp = self.object_controller.POST(req) self.assertEquals(resp.status_int, 202) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 'X-Container-Host': '1.2.3.4:0', 'X-Container-Partition': '3', @@ -226,7 +238,8 @@ class TestObjectController(unittest.TestCase): timestamp + '.data') self.assert_(os.path.isfile(objfile)) self.assertEquals(open(objfile).read(), 'VERIFY') - self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)), + self.assertEquals(pickle.loads(getxattr(objfile, + object_server.METADATA_KEY)), {'X-Timestamp': timestamp, 'Content-Length': '6', 'ETag': '0b4c12d7e0a73840c1c4f148fda3b037', @@ -258,7 +271,8 @@ class TestObjectController(unittest.TestCase): timestamp + '.data') self.assert_(os.path.isfile(objfile)) self.assertEquals(open(objfile).read(), 'VERIFY TWO') - self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)), + self.assertEquals(pickle.loads(getxattr(objfile, + object_server.METADATA_KEY)), {'X-Timestamp': timestamp, 'Content-Length': '10', 'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039', @@ -270,17 +284,17 @@ class TestObjectController(unittest.TestCase): if not self.path_to_test_xfs: raise SkipTest req = Request.blank('/sda1/p/a/c/o', 
environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Timestamp': normalize_timestamp(time()), - 'Content-Type': 'text/plain'}) + headers={'X-Timestamp': normalize_timestamp(time()), + 'Content-Type': 'text/plain'}) req.body = 'test' resp = self.object_controller.PUT(req) self.assertEquals(resp.status_int, 201) def test_PUT_invalid_etag(self): req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Timestamp': normalize_timestamp(time()), - 'Content-Type': 'text/plain', - 'ETag': 'invalid'}) + headers={'X-Timestamp': normalize_timestamp(time()), + 'Content-Type': 'text/plain', + 'ETag': 'invalid'}) req.body = 'test' resp = self.object_controller.PUT(req) self.assertEquals(resp.status_int, 422) @@ -304,7 +318,8 @@ class TestObjectController(unittest.TestCase): timestamp + '.data') self.assert_(os.path.isfile(objfile)) self.assertEquals(open(objfile).read(), 'VERIFY THREE') - self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)), + self.assertEquals(pickle.loads(getxattr(objfile, + object_server.METADATA_KEY)), {'X-Timestamp': timestamp, 'Content-Length': '12', 'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568', @@ -316,25 +331,33 @@ class TestObjectController(unittest.TestCase): def test_PUT_container_connection(self): if not self.path_to_test_xfs: raise SkipTest + def mock_http_connect(response, with_exc=False): + class FakeConn(object): + def __init__(self, status, with_exc): self.status = status self.reason = 'Fake' self.host = '1.2.3.4' self.port = '1234' self.with_exc = with_exc + def getresponse(self): if self.with_exc: raise Exception('test') return self + def read(self, amt=None): return '' + return lambda *args, **kwargs: FakeConn(response, with_exc) + old_http_connect = object_server.http_connect try: timestamp = normalize_timestamp(time()) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': 
timestamp, 'X-Container-Host': '1.2.3.4:0', 'X-Container-Partition': '3', @@ -555,7 +578,8 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 200) self.assertEquals(resp.etag, etag) - req = Request.blank('/sda1/p/a/c/o2', environ={'REQUEST_METHOD': 'GET'}, + req = Request.blank('/sda1/p/a/c/o2', + environ={'REQUEST_METHOD': 'GET'}, headers={'If-Match': '*'}) resp = self.object_controller.GET(req) self.assertEquals(resp.status_int, 412) @@ -715,7 +739,8 @@ class TestObjectController(unittest.TestCase): """ Test swift.object_server.ObjectController.DELETE """ if not self.path_to_test_xfs: raise SkipTest - req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'DELETE'}) + req = Request.blank('/sda1/p/a/c', + environ={'REQUEST_METHOD': 'DELETE'}) resp = self.object_controller.DELETE(req) self.assertEquals(resp.status_int, 400) @@ -916,21 +941,26 @@ class TestObjectController(unittest.TestCase): def test_disk_file_mkstemp_creates_dir(self): tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') os.rmdir(tmpdir) - with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp(): + with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', + 'o').mkstemp(): self.assert_(os.path.exists(tmpdir)) def test_max_upload_time(self): if not self.path_to_test_xfs: raise SkipTest + class SlowBody(): + def __init__(self): self.sent = 0 + def read(self, size=-1): if self.sent < 4: sleep(0.1) self.sent += 1 return ' ' return '' + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()}, headers={'X-Timestamp': normalize_timestamp(time()), @@ -946,14 +976,18 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 408) def test_short_body(self): + class ShortBody(): + def __init__(self): self.sent = False + def read(self, size=-1): if not self.sent: self.sent = True return ' ' return '' + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT', 
'wsgi.input': ShortBody()}, headers={'X-Timestamp': normalize_timestamp(time()), @@ -1001,11 +1035,37 @@ class TestObjectController(unittest.TestCase): resp = self.object_controller.GET(req) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.headers['content-encoding'], 'gzip') - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'HEAD'}) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': + 'HEAD'}) resp = self.object_controller.HEAD(req) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.headers['content-encoding'], 'gzip') + def test_manifest_header(self): + if not self.path_to_test_xfs: + raise SkipTest + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Content-Length': '0', + 'X-Object-Manifest': 'c/o/'}) + resp = self.object_controller.PUT(req) + self.assertEquals(resp.status_int, 201) + objfile = os.path.join(self.testdir, 'sda1', + storage_directory(object_server.DATADIR, 'p', hash_path('a', 'c', + 'o')), timestamp + '.data') + self.assert_(os.path.isfile(objfile)) + self.assertEquals(pickle.loads(getxattr(objfile, + object_server.METADATA_KEY)), {'X-Timestamp': timestamp, + 'Content-Length': '0', 'Content-Type': 'text/plain', 'name': + '/a/c/o', 'X-Object-Manifest': 'c/o/', 'ETag': + 'd41d8cd98f00b204e9800998ecf8427e'}) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'}) + resp = self.object_controller.GET(req) + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.headers.get('x-object-manifest'), 'c/o/') + if __name__ == '__main__': unittest.main() diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 2bf85c139c..9162455ab0 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -34,8 +34,8 @@ import eventlet from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen from 
eventlet.timeout import Timeout import simplejson -from webob import Request -from webob.exc import HTTPUnauthorized +from webob import Request, Response +from webob.exc import HTTPNotFound, HTTPUnauthorized from test.unit import connect_tcp, readuntil2crlfs from swift.proxy import server as proxy_server @@ -53,7 +53,9 @@ logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) def fake_http_connect(*code_iter, **kwargs): + class FakeConn(object): + def __init__(self, status, etag=None, body=''): self.status = status self.reason = 'Fake' @@ -158,6 +160,7 @@ class FakeRing(object): class FakeMemcache(object): + def __init__(self): self.store = {} @@ -212,9 +215,12 @@ def save_globals(): class TestProxyServer(unittest.TestCase): def test_unhandled_exception(self): + class MyApp(proxy_server.Application): + def get_controller(self, path): raise Exception('this shouldnt be caught') + app = MyApp(None, FakeMemcache(), account_ring=FakeRing(), container_ring=FakeRing(), object_ring=FakeRing()) req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'}) @@ -323,8 +329,11 @@ class TestObjectController(unittest.TestCase): test_status_map((200, 200, 204, 500, 404), 503) def test_PUT_connect_exceptions(self): + def mock_http_connect(*code_iter, **kwargs): + class FakeConn(object): + def __init__(self, status): self.status = status self.reason = 'Fake' @@ -344,6 +353,7 @@ class TestObjectController(unittest.TestCase): if self.status == -3: return FakeConn(507) return FakeConn(100) + code_iter = iter(code_iter) def connect(*args, **ckwargs): @@ -351,7 +361,9 @@ class TestObjectController(unittest.TestCase): if status == -1: raise HTTPException() return FakeConn(status) + return connect + with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') @@ -372,8 +384,11 @@ class TestObjectController(unittest.TestCase): test_status_map((200, 200, 503, 503, -1), 503) def test_PUT_send_exceptions(self): + def 
mock_http_connect(*code_iter, **kwargs): + class FakeConn(object): + def __init__(self, status): self.status = status self.reason = 'Fake' @@ -437,8 +452,11 @@ class TestObjectController(unittest.TestCase): self.assertEquals(res.status_int, 413) def test_PUT_getresponse_exceptions(self): + def mock_http_connect(*code_iter, **kwargs): + class FakeConn(object): + def __init__(self, status): self.status = status self.reason = 'Fake' @@ -633,6 +651,7 @@ class TestObjectController(unittest.TestCase): dev['port'] = 1 class SlowBody(): + def __init__(self): self.sent = 0 @@ -642,6 +661,7 @@ class TestObjectController(unittest.TestCase): self.sent += 1 return ' ' return '' + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()}, headers={'Content-Length': '4', 'Content-Type': 'text/plain'}) @@ -680,11 +700,13 @@ class TestObjectController(unittest.TestCase): dev['port'] = 1 class SlowBody(): + def __init__(self): self.sent = 0 def read(self, size=-1): raise Exception('Disconnected') + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()}, headers={'Content-Length': '4', 'Content-Type': 'text/plain'}) @@ -1334,7 +1356,9 @@ class TestObjectController(unittest.TestCase): def test_chunked_put(self): # quick test of chunked put w/o PATH_TO_TEST_XFS + class ChunkedFile(): + def __init__(self, bytes): self.bytes = bytes self.read_bytes = 0 @@ -1495,8 +1519,10 @@ class TestObjectController(unittest.TestCase): self.assertEquals(headers[:len(exp)], exp) # Check unhandled exception orig_update_request = prosrv.update_request + def broken_update_request(env, req): raise Exception('fake') + prosrv.update_request = broken_update_request sock = connect_tcp(('localhost', prolis.getsockname()[1])) fd = sock.makefile() @@ -1545,8 +1571,10 @@ class TestObjectController(unittest.TestCase): # in a test for logging x-forwarded-for (first entry only). 
class Logger(object): + def info(self, msg): self.msg = msg + orig_logger = prosrv.logger prosrv.logger = Logger() sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -1568,8 +1596,10 @@ class TestObjectController(unittest.TestCase): # Turn on header logging. class Logger(object): + def info(self, msg): self.msg = msg + orig_logger = prosrv.logger prosrv.logger = Logger() prosrv.log_headers = True @@ -1726,6 +1756,52 @@ class TestObjectController(unittest.TestCase): self.assertEquals(headers[:len(exp)], exp) body = fd.read() self.assertEquals(body, 'oh hai123456789abcdef') + # Create a container for our segmented/manifest object testing + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/segmented HTTP/1.1\r\nHost: localhost\r\n' + 'Connection: close\r\nX-Storage-Token: t\r\n' + 'Content-Length: 0\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Create the object segments + for segment in xrange(5): + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/segmented/name/%s HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 5\r\n\r\n1234 ' % str(segment)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Create the object manifest file + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/segmented/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 0\r\nX-Object-Manifest: ' + 'segmented/name/\r\nContent-Type: text/jibberish\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Ensure retrieving the manifest file gets the whole object + sock = connect_tcp(('localhost', 
prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/segmented/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: ' + 't\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + self.assert_('X-Object-Manifest: segmented/name/' in headers) + self.assert_('Content-Type: text/jibberish' in headers) + body = fd.read() + self.assertEquals(body, '1234 1234 1234 1234 1234 ') finally: prospa.kill() acc1spa.kill() @@ -1937,6 +2013,7 @@ class TestObjectController(unittest.TestCase): res = controller.COPY(req) self.assert_(called[0]) + class TestContainerController(unittest.TestCase): "Test swift.proxy_server.ContainerController" @@ -2080,7 +2157,9 @@ class TestContainerController(unittest.TestCase): self.assertEquals(resp.status_int, 404) def test_put_locking(self): + class MockMemcache(FakeMemcache): + def __init__(self, allow_lock=None): self.allow_lock = allow_lock super(MockMemcache, self).__init__() @@ -2091,6 +2170,7 @@ class TestContainerController(unittest.TestCase): yield True else: raise MemcacheLockError() + with save_globals(): controller = proxy_server.ContainerController(self.app, 'account', 'container') @@ -2669,5 +2749,256 @@ class TestAccountController(unittest.TestCase): self.assertEquals(resp.status_int, 400) +class FakeObjectController(object): + + def __init__(self): + self.app = self + self.logger = self + self.account_name = 'a' + self.container_name = 'c' + self.object_name = 'o' + self.trans_id = 'tx1' + self.object_ring = FakeRing() + self.node_timeout = 1 + + def exception(self, *args): + self.exception_args = args + self.exception_info = sys.exc_info() + + def GETorHEAD_base(self, *args): + self.GETorHEAD_base_args = args + req = args[0] + path = args[4] + body = data = path[-1] * int(path[-1]) + if req.range and req.range.ranges: + body = '' + for start, stop in req.range.ranges: + body += data[start:stop] + resp = 
Response(app_iter=iter(body)) + return resp + + def iter_nodes(self, partition, nodes, ring): + for node in nodes: + yield node + for node in ring.get_more_nodes(partition): + yield node + + +class Stub(object): + pass + + +class TestSegmentedIterable(unittest.TestCase): + + def setUp(self): + self.controller = FakeObjectController() + + def test_load_next_segment_unexpected_error(self): + self.assertRaises(Exception, + proxy_server.SegmentedIterable(self.controller, None, + None)._load_next_segment) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + + def test_load_next_segment_with_no_segments(self): + self.assertRaises(StopIteration, + proxy_server.SegmentedIterable(self.controller, 'lc', + [])._load_next_segment) + + def test_load_next_segment_with_one_segment(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}]) + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '1') + + def test_load_next_segment_with_two_segments(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}, {'name': 'o2'}]) + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '1') + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '22') + + def test_load_next_segment_with_two_segments_skip_first(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}, {'name': 'o2'}]) + segit.segment = 0 + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '22') + + def test_load_next_segment_with_seek(self): + segit = 
proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}, {'name': 'o2'}]) + segit.segment = 0 + segit.seek = 1 + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') + self.assertEquals(str(self.controller.GETorHEAD_base_args[0].range), + 'bytes=1-') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '2') + + def test_load_next_segment_with_get_error(self): + + def local_GETorHEAD_base(*args): + return HTTPNotFound() + + self.controller.GETorHEAD_base = local_GETorHEAD_base + self.assertRaises(Exception, + proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}])._load_next_segment) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + self.assertEquals(str(self.controller.exception_info[1]), + 'Could not load object segment /a/lc/o1: 404') + + def test_iter_unexpected_error(self): + self.assertRaises(Exception, ''.join, + proxy_server.SegmentedIterable(self.controller, None, None)) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + + def test_iter_with_no_segments(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', []) + self.assertEquals(''.join(segit), '') + + def test_iter_with_one_segment(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}]) + segit.response = Stub() + self.assertEquals(''.join(segit), '1') + self.assertEquals(segit.response.bytes_transferred, 1) + + def test_iter_with_two_segments(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}, {'name': 'o2'}]) + segit.response = Stub() + self.assertEquals(''.join(segit), '122') + self.assertEquals(segit.response.bytes_transferred, 3) + + def test_iter_with_get_error(self): + + def local_GETorHEAD_base(*args): + return HTTPNotFound() + + self.controller.GETorHEAD_base = local_GETorHEAD_base + 
self.assertRaises(Exception, ''.join, + proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}])) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + self.assertEquals(str(self.controller.exception_info[1]), + 'Could not load object segment /a/lc/o1: 404') + + def test_app_iter_range_unexpected_error(self): + self.assertRaises(Exception, + proxy_server.SegmentedIterable(self.controller, None, + None).app_iter_range(None, None).next) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + + def test_app_iter_range_with_no_segments(self): + self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.controller, 'lc', []).app_iter_range(None, None)), '') + self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.controller, 'lc', []).app_iter_range(3, None)), '') + self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.controller, 'lc', []).app_iter_range(3, 5)), '') + self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.controller, 'lc', []).app_iter_range(None, 5)), '') + + def test_app_iter_range_with_one_segment(self): + listing = [{'name': 'o1', 'bytes': 1}] + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, None)), '1') + self.assertEquals(segit.response.bytes_transferred, 1) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + self.assertEquals(''.join(segit.app_iter_range(3, None)), '') + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + self.assertEquals(''.join(segit.app_iter_range(3, 5)), '') + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, 5)), '1') + self.assertEquals(segit.response.bytes_transferred, 1) + + def 
test_app_iter_range_with_two_segments(self): + listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 'bytes': 2}] + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, None)), '122') + self.assertEquals(segit.response.bytes_transferred, 3) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(1, None)), '22') + self.assertEquals(segit.response.bytes_transferred, 2) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(1, 5)), '22') + self.assertEquals(segit.response.bytes_transferred, 2) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, 2)), '12') + self.assertEquals(segit.response.bytes_transferred, 2) + + def test_app_iter_range_with_many_segments(self): + listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 'bytes': 2}, + {'name': 'o3', 'bytes': 3}, {'name': 'o4', 'bytes': 4}, {'name': + 'o5', 'bytes': 5}] + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, None)), + '122333444455555') + self.assertEquals(segit.response.bytes_transferred, 15) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(3, None)), + '333444455555') + self.assertEquals(segit.response.bytes_transferred, 12) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(5, None)), '3444455555') + self.assertEquals(segit.response.bytes_transferred, 10) + + segit = proxy_server.SegmentedIterable(self.controller, 
'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, 6)), '122333') + self.assertEquals(segit.response.bytes_transferred, 6) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, 7)), '1223334') + self.assertEquals(segit.response.bytes_transferred, 7) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(3, 7)), '3334') + self.assertEquals(segit.response.bytes_transferred, 4) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(5, 7)), '34') + self.assertEquals(segit.response.bytes_transferred, 2) + + if __name__ == '__main__': unittest.main() From 4e0f0b01df33664f4be109ba030dee85bb98e5f1 Mon Sep 17 00:00:00 2001 From: gholt Date: Thu, 18 Nov 2010 18:29:03 -0800 Subject: [PATCH 02/18] Basic working segmented upload --- bin/st | 121 ++++++++++++++++++++++++++++++++--------- swift/common/client.py | 27 +++++---- swift/proxy/server.py | 8 +++ 3 files changed, 121 insertions(+), 35 deletions(-) diff --git a/bin/st b/bin/st index 6a8b02bb37..b853aa2508 100755 --- a/bin/st +++ b/bin/st @@ -581,7 +581,8 @@ def put_object(url, token, container, name, contents, content_length=None, :param container: container name that the object is in :param name: object name to put :param contents: a string or a file like object to read object data from - :param content_length: value to send as content-length header + :param content_length: value to send as content-length header; also limits + the amount read from contents :param etag: etag of contents :param chunk_size: chunk size of data to write :param content_type: value to send as content-type header @@ -611,18 +612,24 @@ def put_object(url, token, container, name, contents, content_length=None, 
conn.putrequest('PUT', path) for header, value in headers.iteritems(): conn.putheader(header, value) - if not content_length: + if content_length is None: conn.putheader('Transfer-Encoding', 'chunked') - conn.endheaders() - chunk = contents.read(chunk_size) - while chunk: - if not content_length: - conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) - else: - conn.send(chunk) + conn.endheaders() chunk = contents.read(chunk_size) - if not content_length: + while chunk: + conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) + chunk = contents.read(chunk_size) conn.send('0\r\n\r\n') + else: + conn.endheaders() + left = content_length + while left > 0: + size = chunk_size + if size > left: + size = left + chunk = contents.read(size) + conn.send(chunk) + left -= len(chunk) else: conn.request('PUT', path, contents, headers) resp = conn.getresponse() @@ -862,7 +869,10 @@ class QueueFunctionThread(Thread): st_delete_help = ''' delete --all OR delete container [object] [object] ... Deletes everything in the account (with --all), or everything in a - container, or a list of objects depending on the args given.'''.strip('\n') + container, or a list of objects depending on the args given. 
Note that + object segments won't be deleted unless you're deleting everything in the + account or specifically deleting the segments' container or object + names.'''.strip('\n') def st_delete(parser, args, print_queue, error_queue): @@ -1016,6 +1026,7 @@ def st_download(options, args, print_queue, error_queue): conn.get_object(container, obj, resp_chunk_size=65536) content_type = headers.get('content-type') content_length = int(headers.get('content-length')) + manifest = headers.get('x-object-manifest') etag = headers.get('etag') path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): @@ -1024,10 +1035,12 @@ def st_download(options, args, print_queue, error_queue): if not isdir(path): mkdirs(path) read_length = 0 - md5sum = md5() + if not manifest: + md5sum = md5() for chunk in body: read_length += len(chunk) - md5sum.update(chunk) + if not manifest: + md5sum.update(chunk) else: dirpath = dirname(path) if dirpath and not isdir(dirpath): @@ -1039,13 +1052,15 @@ def st_download(options, args, print_queue, error_queue): else: fp = open(path, 'wb') read_length = 0 - md5sum = md5() + if not manifest: + md5sum = md5() for chunk in body: fp.write(chunk) read_length += len(chunk) - md5sum.update(chunk) + if not manifest: + md5sum.update(chunk) fp.close() - if md5sum.hexdigest() != etag: + if not manifest and md5sum.hexdigest() != etag: error_queue.put('%s: md5sum != etag, %s != %s' % (path, md5sum.hexdigest(), etag)) if read_length != content_length: @@ -1266,6 +1281,9 @@ Content Length: %s headers.get('content-length'), headers.get('last-modified'), headers.get('etag'))) + if 'x-object-manifest' in headers: + print_queue.put(' Manifest: %s' % + headers['x-object-manifest']) for key, value in headers.items(): if key.startswith('x-object-meta-'): print_queue.put('%14s: %s' % ('Meta %s' % @@ -1273,7 +1291,7 @@ Content Length: %s for key, value in headers.items(): if not key.startswith('x-object-meta-') and key not in ( 'content-type', 
'content-length', 'last-modified', - 'etag', 'date'): + 'etag', 'date', 'x-object-manifest'): print_queue.put( '%14s: %s' % (key.title(), value)) except ClientException, err: @@ -1362,22 +1380,41 @@ st_upload_help = ''' upload [options] container file_or_directory [file_or_directory] [...] Uploads to the given container the files and directories specified by the remaining args. -c or --changed is an option that will only upload files - that have changed since the last upload.'''.strip('\n') + that have changed since the last upload. -S or --segment-size + is an option that will upload files in segments no larger than and + then create a "manifest" file that will download all the segments as if it + were the original file. The segments will be uploaded to a + _segments container so as to not pollute the main + listings.'''.strip('\n') def st_upload(options, args, print_queue, error_queue): parser.add_option('-c', '--changed', action='store_true', dest='changed', default=False, help='Will only upload files that have changed since ' 'the last upload') + parser.add_option('-S', '--segment-size', dest='segment_size', help='Will ' + 'upload files in segments no larger than and then create a ' + '"manifest" file that will download all the segments as if it were ' + 'the original file. 
The segments will be uploaded to a ' + '_segments container so as to not pollute the main ' + ' listings.') (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: error_queue.put('Usage: %s [options] %s' % (basename(argv[0]), st_upload_help)) return - file_queue = Queue(10000) + def _upload_segment((path, obj, segment_start, segment_size, log_line), + conn): + fp = open(path, 'rb') + fp.seek(segment_start) + conn.put_object(args[0] + '_segments', obj, fp, + content_length=segment_size) + if options.verbose: + print_queue.put(log_line) + def _upload_file((path, dir_marker), conn): try: obj = path @@ -1415,9 +1452,41 @@ def st_upload(options, args, print_queue, error_queue): except ClientException, err: if err.http_status != 404: raise - conn.put_object(args[0], obj, open(path, 'rb'), - content_length=getsize(path), - headers=put_headers) + if options.segment_size and \ + getsize(path) < options.segment_size: + full_size = getsize(path) + segment_queue = Queue(10000) + segment_threads = [QueueFunctionThread(segment_queue, + _upload_segment, create_connection()) for _ in + xrange(10)] + for thread in segment_threads: + thread.start() + segment = 0 + segment_start = 0 + while segment_start < full_size: + segment_size = int(options.segment_size) + if segment_start + segment_size > full_size: + segment_size = full_size - segment_start + segment_queue.put((path, '%s/%s/%s/%08d' % (obj, + put_headers['x-object-meta-mtime'], full_size, + segment), segment_start, segment_size, + '%s segment %s' % (obj, segment))) + segment += 1 + segment_start += segment_size + while not segment_queue.empty(): + sleep(0.01) + for thread in segment_threads: + thread.abort = True + while thread.isAlive(): + thread.join(0.01) + put_headers['x-object-manifest'] = \ + '%s_segments/%s/%s/%s/' % (args[0], obj, + put_headers['x-object-meta-mtime'], full_size) + conn.put_object(args[0], obj, '', content_length=0, + headers=put_headers) + else: + conn.put_object(args[0], obj, 
open(path, 'rb'), + content_length=getsize(path), headers=put_headers) if options.verbose: print_queue.put(obj) except OSError, err: @@ -1428,14 +1497,14 @@ def st_upload(options, args, print_queue, error_queue): def _upload_dir(path): names = listdir(path) if not names: - file_queue.put((path, True)) # dir_marker = True + file_queue.put((path, True)) # dir_marker = True else: for name in listdir(path): subpath = join(path, name) if isdir(subpath): _upload_dir(subpath) else: - file_queue.put((subpath, False)) # dir_marker = False + file_queue.put((subpath, False)) # dir_marker = False url, token = get_auth(options.auth, options.user, options.key, snet=options.snet) @@ -1452,6 +1521,8 @@ def st_upload(options, args, print_queue, error_queue): # it'll surface on the first object PUT. try: conn.put_container(args[0]) + if options.segment_size is not None: + conn.put_container(args[0] + '_segments') except: pass try: @@ -1459,7 +1530,7 @@ def st_upload(options, args, print_queue, error_queue): if isdir(arg): _upload_dir(arg) else: - file_queue.put((arg, False)) # dir_marker = False + file_queue.put((arg, False)) # dir_marker = False while not file_queue.empty(): sleep(0.01) for thread in file_threads: diff --git a/swift/common/client.py b/swift/common/client.py index b89d06aa66..1acba8cb37 100644 --- a/swift/common/client.py +++ b/swift/common/client.py @@ -569,7 +569,8 @@ def put_object(url, token, container, name, contents, content_length=None, :param container: container name that the object is in :param name: object name to put :param contents: a string or a file like object to read object data from - :param content_length: value to send as content-length header + :param content_length: value to send as content-length header; also limits + the amount read from contents :param etag: etag of contents :param chunk_size: chunk size of data to write :param content_type: value to send as content-type header @@ -599,18 +600,24 @@ def put_object(url, token, container, 
name, contents, content_length=None, conn.putrequest('PUT', path) for header, value in headers.iteritems(): conn.putheader(header, value) - if not content_length: + if content_length is None: conn.putheader('Transfer-Encoding', 'chunked') - conn.endheaders() - chunk = contents.read(chunk_size) - while chunk: - if not content_length: - conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) - else: - conn.send(chunk) + conn.endheaders() chunk = contents.read(chunk_size) - if not content_length: + while chunk: + conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) + chunk = contents.read(chunk_size) conn.send('0\r\n\r\n') + else: + conn.endheaders() + left = content_length + while left > 0: + size = chunk_size + if size > left: + size = left + chunk = contents.read(size) + conn.send(chunk) + left -= len(chunk) else: conn.request('PUT', path, contents, headers) resp = conn.getresponse() diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 4265754390..65ef0d055b 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -688,6 +688,11 @@ class ObjectController(Controller): (self.account_name, lcontainer, lprefix)) lresp = self.GETorHEAD_base(lreq, 'Container', lpartition, lnodes, lreq.path_info, self.app.container_ring.replica_count) + if lresp.status_int // 100 != 2: + lresp = HTTPNotFound(request=req) + lresp.headers['X-Object-Manifest'] = \ + resp.headers['x-object-manifest'] + return lresp if 'swift.authorize' in req.environ: req.acl = lresp.headers.get('x-container-read') aresp = req.environ['swift.authorize'](req) @@ -699,6 +704,9 @@ class ObjectController(Controller): headers = {'X-Object-Manifest': resp.headers['x-object-manifest'], 'Content-Type': resp.content_type, 'Content-Length': content_length, 'ETag': etag} + for key, value in resp.headers.iteritems(): + if key.lower().startswith('x-object-meta-'): + headers[key] = value resp = Response(app_iter=SegmentedIterable(self, lcontainer, listing), headers=headers, request=req, conditional_response=True) 
From df3762bd2c6ea809131c3ee7f5c37a3a6752295c Mon Sep 17 00:00:00 2001 From: gholt Date: Fri, 19 Nov 2010 13:01:28 -0800 Subject: [PATCH 03/18] st overwrites of manifests now clean up the old segments --- bin/st | 117 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 74 insertions(+), 43 deletions(-) diff --git a/bin/st b/bin/st index b853aa2508..d04d3bf54b 100755 --- a/bin/st +++ b/bin/st @@ -1026,7 +1026,6 @@ def st_download(options, args, print_queue, error_queue): conn.get_object(container, obj, resp_chunk_size=65536) content_type = headers.get('content-type') content_length = int(headers.get('content-length')) - manifest = headers.get('x-object-manifest') etag = headers.get('etag') path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): @@ -1035,11 +1034,13 @@ def st_download(options, args, print_queue, error_queue): if not isdir(path): mkdirs(path) read_length = 0 - if not manifest: + if 'x-object-manifest' not in headers: md5sum = md5() + else: + md5sum = None for chunk in body: read_length += len(chunk) - if not manifest: + if md5sum: md5sum.update(chunk) else: dirpath = dirname(path) @@ -1052,15 +1053,15 @@ def st_download(options, args, print_queue, error_queue): else: fp = open(path, 'wb') read_length = 0 - if not manifest: + if 'x-object-manifest' not in headers: md5sum = md5() for chunk in body: fp.write(chunk) read_length += len(chunk) - if not manifest: + if md5sum: md5sum.update(chunk) fp.close() - if not manifest and md5sum.hexdigest() != etag: + if md5sum and md5sum.hexdigest() != etag: error_queue.put('%s: md5sum != etag, %s != %s' % (path, md5sum.hexdigest(), etag)) if read_length != content_length: @@ -1404,18 +1405,23 @@ def st_upload(options, args, print_queue, error_queue): error_queue.put('Usage: %s [options] %s' % (basename(argv[0]), st_upload_help)) return - file_queue = Queue(10000) + object_queue = Queue(10000) - def _upload_segment((path, obj, segment_start, segment_size, log_line), - 
conn): - fp = open(path, 'rb') - fp.seek(segment_start) - conn.put_object(args[0] + '_segments', obj, fp, - content_length=segment_size) - if options.verbose: - print_queue.put(log_line) + def _segment_job(job, conn): + if job.get('delete', False): + conn.delete_object(job['container'], job['obj']) + else: + fp = open(job['path'], 'rb') + fp.seek(job['segment_start']) + conn.put_object(job.get('container', args[0] + '_segments'), + job['obj'], fp, content_length=job['segment_size']) + if options.verbose and 'log_line' in job: + print_queue.put(job['log_line']) - def _upload_file((path, dir_marker), conn): + def _object_job(job, conn): + path = job['path'] + container = job.get('container', args[0]) + dir_marker = job.get('dir_marker', False) try: obj = path if obj.startswith('./') or obj.startswith('.\\'): @@ -1424,7 +1430,7 @@ def st_upload(options, args, print_queue, error_queue): if dir_marker: if options.changed: try: - headers = conn.head_object(args[0], obj) + headers = conn.head_object(container, obj) ct = headers.get('content-type') cl = int(headers.get('content-length')) et = headers.get('etag') @@ -1437,28 +1443,31 @@ def st_upload(options, args, print_queue, error_queue): except ClientException, err: if err.http_status != 404: raise - conn.put_object(args[0], obj, '', content_length=0, + conn.put_object(container, obj, '', content_length=0, content_type='text/directory', headers=put_headers) else: - if options.changed: - try: - headers = conn.head_object(args[0], obj) - cl = int(headers.get('content-length')) - mt = headers.get('x-object-meta-mtime') - if cl == getsize(path) and \ - mt == put_headers['x-object-meta-mtime']: - return - except ClientException, err: - if err.http_status != 404: - raise + # We need to HEAD all objects now in case we're overwriting a + # manifest object and need to delete the old segments + # ourselves. 
+ old_manifest = None + try: + headers = conn.head_object(container, obj) + cl = int(headers.get('content-length')) + mt = headers.get('x-object-meta-mtime') + if options.changed and cl == getsize(path) and \ + mt == put_headers['x-object-meta-mtime']: + return + old_manifest = headers.get('x-object-manifest') + except ClientException, err: + if err.http_status != 404: + raise if options.segment_size and \ getsize(path) < options.segment_size: full_size = getsize(path) segment_queue = Queue(10000) segment_threads = [QueueFunctionThread(segment_queue, - _upload_segment, create_connection()) for _ in - xrange(10)] + _segment_job, create_connection()) for _ in xrange(10)] for thread in segment_threads: thread.start() segment = 0 @@ -1467,10 +1476,13 @@ def st_upload(options, args, print_queue, error_queue): segment_size = int(options.segment_size) if segment_start + segment_size > full_size: segment_size = full_size - segment_start - segment_queue.put((path, '%s/%s/%s/%08d' % (obj, - put_headers['x-object-meta-mtime'], full_size, - segment), segment_start, segment_size, - '%s segment %s' % (obj, segment))) + segment_queue.put({'path': path, + 'obj': '%s/%s/%s/%08d' % (obj, + put_headers['x-object-meta-mtime'], full_size, + segment), + 'segment_start': segment_start, + 'segment_size': segment_size, + 'log_line': '%s segment %s' % (obj, segment)}) segment += 1 segment_start += segment_size while not segment_queue.empty(): @@ -1480,13 +1492,32 @@ def st_upload(options, args, print_queue, error_queue): while thread.isAlive(): thread.join(0.01) put_headers['x-object-manifest'] = \ - '%s_segments/%s/%s/%s/' % (args[0], obj, + '%s_segments/%s/%s/%s/' % (container, obj, put_headers['x-object-meta-mtime'], full_size) - conn.put_object(args[0], obj, '', content_length=0, + conn.put_object(container, obj, '', content_length=0, headers=put_headers) else: - conn.put_object(args[0], obj, open(path, 'rb'), + conn.put_object(container, obj, open(path, 'rb'), 
content_length=getsize(path), headers=put_headers) + if old_manifest: + segment_queue = Queue(10000) + scontainer, sprefix = old_manifest.split('/', 1) + for delobj in conn.get_container(scontainer, + prefix=sprefix)[1]: + segment_queue.put({'delete': True, + 'container': scontainer, 'obj': delobj['name']}) + if not segment_queue.empty(): + segment_threads = [QueueFunctionThread(segment_queue, + _segment_job, create_connection()) for _ in + xrange(10)] + for thread in segment_threads: + thread.start() + while not segment_queue.empty(): + sleep(0.01) + for thread in segment_threads: + thread.abort = True + while thread.isAlive(): + thread.join(0.01) if options.verbose: print_queue.put(obj) except OSError, err: @@ -1497,20 +1528,20 @@ def st_upload(options, args, print_queue, error_queue): def _upload_dir(path): names = listdir(path) if not names: - file_queue.put((path, True)) # dir_marker = True + object_queue.put({'path': path, 'dir_marker': True}) else: for name in listdir(path): subpath = join(path, name) if isdir(subpath): _upload_dir(subpath) else: - file_queue.put((subpath, False)) # dir_marker = False + object_queue.put({'path': subpath}) url, token = get_auth(options.auth, options.user, options.key, snet=options.snet) create_connection = lambda: Connection(options.auth, options.user, options.key, preauthurl=url, preauthtoken=token, snet=options.snet) - file_threads = [QueueFunctionThread(file_queue, _upload_file, + file_threads = [QueueFunctionThread(object_queue, _object_job, create_connection()) for _ in xrange(10)] for thread in file_threads: thread.start() @@ -1530,8 +1561,8 @@ def st_upload(options, args, print_queue, error_queue): if isdir(arg): _upload_dir(arg) else: - file_queue.put((arg, False)) # dir_marker = False - while not file_queue.empty(): + object_queue.put({'path': arg}) + while not object_queue.empty(): sleep(0.01) for thread in file_threads: thread.abort = True From 598c544eddef7d382e88969a7b820e25a2c33e0b Mon Sep 17 00:00:00 2001 From: 
gholt Date: Fri, 19 Nov 2010 14:50:35 -0800 Subject: [PATCH 04/18] st delete will delete manifest segments as well; added --leave-segments option to override such behavior --- bin/st | 90 ++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 27 deletions(-) diff --git a/bin/st b/bin/st index d04d3bf54b..a0541f1d88 100755 --- a/bin/st +++ b/bin/st @@ -867,18 +867,20 @@ class QueueFunctionThread(Thread): st_delete_help = ''' -delete --all OR delete container [object] [object] ... +delete --all OR delete container [--leave-segments] [object] [object] ... Deletes everything in the account (with --all), or everything in a - container, or a list of objects depending on the args given. Note that - object segments won't be deleted unless you're deleting everything in the - account or specifically deleting the segments' container or object - names.'''.strip('\n') + container, or a list of objects depending on the args given. Segments of + manifest objects will be deleted as well, unless you specify the + --leave-segments option.'''.strip('\n') def st_delete(parser, args, print_queue, error_queue): parser.add_option('-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicates that you really want to delete ' 'everything in the account') + parser.add_option('', '--leave-segments', action='store_true', + dest='leave_segments', default=False, help='Indicates that you want ' + 'the segments of manifest objects left alone') (options, args) = parse_args(parser, args) args = args[1:] if (not args and not options.yes_all) or (args and options.yes_all): @@ -886,11 +888,42 @@ def st_delete(parser, args, print_queue, error_queue): (basename(argv[0]), st_delete_help)) return + def _delete_segment((container, obj), conn): + conn.delete_object(container, obj) + if options.verbose: + print_queue.put('%s/%s' % (container, obj)) + object_queue = Queue(10000) def _delete_object((container, obj), conn): try: + old_manifest = None + 
if not options.leave_segments: + try: + old_manifest = conn.head_object(container, obj).get( + 'x-object-manifest') + except ClientException, err: + if err.http_status != 404: + raise conn.delete_object(container, obj) + if old_manifest: + segment_queue = Queue(10000) + scontainer, sprefix = old_manifest.split('/', 1) + for delobj in conn.get_container(scontainer, + prefix=sprefix)[1]: + segment_queue.put((scontainer, delobj['name'])) + if not segment_queue.empty(): + segment_threads = [QueueFunctionThread(segment_queue, + _delete_segment, create_connection()) for _ in + xrange(10)] + for thread in segment_threads: + thread.start() + while not segment_queue.empty(): + sleep(0.01) + for thread in segment_threads: + thread.abort = True + while thread.isAlive(): + thread.join(0.01) if options.verbose: path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): @@ -901,6 +934,7 @@ def st_delete(parser, args, print_queue, error_queue): raise error_queue.put('Object %s not found' % repr('%s/%s' % (container, obj))) + container_queue = Queue(10000) def _delete_container(container, conn): @@ -986,7 +1020,7 @@ def st_delete(parser, args, print_queue, error_queue): st_download_help = ''' -download --all OR download container [object] [object] ... +download --all OR download container [options] [object] [object] ... Downloads everything in the account (with --all), or everything in a container, or a list of objects depending on the args given. 
For a single object download, you may use the -o [--output] option to @@ -1030,14 +1064,13 @@ def st_download(options, args, print_queue, error_queue): path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): path = path[1:] + md5sum = None if content_type.split(';', 1)[0] == 'text/directory': if not isdir(path): mkdirs(path) read_length = 0 if 'x-object-manifest' not in headers: md5sum = md5() - else: - md5sum = None for chunk in body: read_length += len(chunk) if md5sum: @@ -1382,11 +1415,8 @@ upload [options] container file_or_directory [file_or_directory] [...] Uploads to the given container the files and directories specified by the remaining args. -c or --changed is an option that will only upload files that have changed since the last upload. -S or --segment-size - is an option that will upload files in segments no larger than and - then create a "manifest" file that will download all the segments as if it - were the original file. The segments will be uploaded to a - _segments container so as to not pollute the main - listings.'''.strip('\n') + and --leave-segments are options as well (see --help for more). +'''.strip('\n') def st_upload(options, args, print_queue, error_queue): @@ -1399,6 +1429,10 @@ def st_upload(options, args, print_queue, error_queue): 'the original file. The segments will be uploaded to a ' '_segments container so as to not pollute the main ' ' listings.') + parser.add_option('', '--leave-segments', action='store_true', + dest='leave_segments', default=False, help='Indicates that you want ' + 'the older segments of manifest objects left alone (in the case of ' + 'overwrites)') (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: @@ -1451,17 +1485,19 @@ def st_upload(options, args, print_queue, error_queue): # manifest object and need to delete the old segments # ourselves. 
old_manifest = None - try: - headers = conn.head_object(container, obj) - cl = int(headers.get('content-length')) - mt = headers.get('x-object-meta-mtime') - if options.changed and cl == getsize(path) and \ - mt == put_headers['x-object-meta-mtime']: - return - old_manifest = headers.get('x-object-manifest') - except ClientException, err: - if err.http_status != 404: - raise + if options.changed or not options.leave_segments: + try: + headers = conn.head_object(container, obj) + cl = int(headers.get('content-length')) + mt = headers.get('x-object-meta-mtime') + if options.changed and cl == getsize(path) and \ + mt == put_headers['x-object-meta-mtime']: + return + if not options.leave_segments: + old_manifest = headers.get('x-object-manifest') + except ClientException, err: + if err.http_status != 404: + raise if options.segment_size and \ getsize(path) < options.segment_size: full_size = getsize(path) @@ -1541,9 +1577,9 @@ def st_upload(options, args, print_queue, error_queue): snet=options.snet) create_connection = lambda: Connection(options.auth, options.user, options.key, preauthurl=url, preauthtoken=token, snet=options.snet) - file_threads = [QueueFunctionThread(object_queue, _object_job, + object_threads = [QueueFunctionThread(object_queue, _object_job, create_connection()) for _ in xrange(10)] - for thread in file_threads: + for thread in object_threads: thread.start() conn = create_connection() # Try to create the container, just in case it doesn't exist. 
If this @@ -1564,7 +1600,7 @@ def st_upload(options, args, print_queue, error_queue): object_queue.put({'path': arg}) while not object_queue.empty(): sleep(0.01) - for thread in file_threads: + for thread in object_threads: thread.abort = True while thread.isAlive(): thread.join(0.01) From 1fa4ba38e554535ba0359b244d050e7795400c42 Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 23 Nov 2010 14:26:48 -0800 Subject: [PATCH 05/18] Documentation of the manifest/segments feature --- doc/source/index.rst | 1 + doc/source/overview_large_objects.rst | 121 ++++++++++++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 doc/source/overview_large_objects.rst diff --git a/doc/source/index.rst b/doc/source/index.rst index 9b20293921..de07c132ea 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -44,6 +44,7 @@ Overview and Concepts overview_replication overview_stats ratelimit + overview_large_objects Developer Documentation ======================= diff --git a/doc/source/overview_large_objects.rst b/doc/source/overview_large_objects.rst new file mode 100644 index 0000000000..660c741e75 --- /dev/null +++ b/doc/source/overview_large_objects.rst @@ -0,0 +1,121 @@ +==================== +Large Object Support +==================== + +-------- +Overview +-------- + +Swift has a limit on the size of a single uploaded object; by default this is +5GB. However, the download size of a single object is virtually unlimited with +the concept of segmentation. Segments of the larger object are uploaded and a +special manifest file is created that, when downloaded, sends all the segments +concatenated as a single object. This also offers much greater upload speed +with the possibility of parallel uploads of the segments. + +---------------------------------- +Using ``st`` for Segmented Objects +---------------------------------- + +The quickest way to try out this feature is use the included ``st`` Swift Tool. 
+You can use the ``-S`` option to specify the segment size to use when splitting +a large file. For example:: + + st upload test_container -S 1073741824 large_file + +This would split the large_file into 1G segments and begin uploading those +segments in parallel. Once all the segments have been uploaded, ``st`` will +then create the manifest file so the segments can be downloaded as one. + +So now, the following ``st`` command would download the entire large object:: + + st download test_container large_file + +``st`` uses a strict convention for its segmented object support. In the above +example it will upload all the segments into a second container named +test_container_segments. These segments will have names like +large_file/1290206778.25/21474836480/00000000, +large_file/1290206778.25/21474836480/00000001, etc. + +The main benefit for using a separate container is that the main container +listings will not be polluted with all the segment names. The reason for using +the segment name format of <name>/<timestamp>/<size>/<segment> is so that an +upload of a new file with the same name won't overwrite the contents of the +first until the last moment when the manifest file is updated. + +``st`` will manage these segment files for you, deleting old segments on +deletes and overwrites, etc. You can override this behavior with the +``--leave-segments`` option if desired; this is useful if you want to have +multiple versions of the same large object available. + +---------- +Direct API +---------- + +You can also work with the segments and manifests directly with HTTP requests +instead of having ``st`` do that for you. You can just upload the segments like +you would any other object and the manifest is just a zero-byte file with an +extra ``X-Object-Manifest`` header. + +All the object segments need to be in the same container, have a common object +name prefix, and their names sort in the order they should be concatenated.
+They don't have to be in the same container as the manifest file will be, which +is useful to keep container listings clean as explained above with ``st``. + +The manifest file is simply a zero-byte file with the extra +``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is +the container the object segments are in and ``<prefix>`` is the common prefix +for all the segments. + +It is best to upload all the segments first and then create or update the +manifest. In this way, the full object won't be available for downloading until +the upload is complete. Also, you can upload a new set of segments to a second +location and then update the manifest to point to this new location. During the +upload of the new segments, the original manifest will still be available to +download the first set of segments. + +Here's an example using ``curl`` with tiny 1-byte segments:: + + # First, upload the segments + curl -X PUT -H 'X-Auth-Token: <token>' \ + http://<storage_url>/container/myobject/1 --data-binary '1' + curl -X PUT -H 'X-Auth-Token: <token>' \ + http://<storage_url>/container/myobject/2 --data-binary '2' + curl -X PUT -H 'X-Auth-Token: <token>' \ + http://<storage_url>/container/myobject/3 --data-binary '3' + + # Next, create the manifest file + curl -X PUT -H 'X-Auth-Token: <token>' \ + -H 'X-Object-Manifest: container/myobject/' \ + http://<storage_url>/container/myobject --data-binary '' + + # And now we can download the segments as a single object + curl -H 'X-Auth-Token: <token>' \ + http://<storage_url>/container/myobject + +---------------- +Additional Notes +---------------- + +* With a ``GET`` or ``HEAD`` of a manifest file, the ``X-Object-Manifest: + <container>/<prefix>`` header will be returned with the concatenated object + so you can tell where it's getting its segments from. + +* The response's ``Content-Length`` for a ``GET`` or ``HEAD`` on the manifest + file will be the sum of all the segments in the ``<container>/<prefix>`` + listing, dynamically.
So, uploading additional segments after the manifest is + created will cause the concatenated object to be that much larger; there's no + need to recreate the manifest file. + +* The response's ``Content-Type`` for a ``GET`` or ``HEAD`` on the manifest + will be the same as the ``Content-Type`` set during the ``PUT`` request that + created the manifest. You can easily change the ``Content-Type`` by reissuing + the ``PUT``. + +* The response's ``ETag`` for a ``GET`` or ``HEAD`` on the manifest file will + be the MD5 sum of the concatenated string of ETags for each of the segments + in the ``<container>/<prefix>`` listing, dynamically. Usually in Swift the + ETag is the MD5 sum of the contents of the object, and that holds true for + each segment independently. But, it's not feasible to generate such an ETag + for the manifest itself, so this method was chosen to at least offer change + detection. From 512e5e2d79ac950381c86d38b6f3735a9083f5ae Mon Sep 17 00:00:00 2001 From: gholt Date: Mon, 29 Nov 2010 12:52:14 -0800 Subject: [PATCH 06/18] Added history section for large object support docs --- doc/source/overview_large_objects.rst | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/doc/source/overview_large_objects.rst b/doc/source/overview_large_objects.rst index 660c741e75..333b6cde55 100644 --- a/doc/source/overview_large_objects.rst +++ b/doc/source/overview_large_objects.rst @@ -119,3 +119,20 @@ Additional Notes each segment independently. But, it's not feasible to generate such an ETag for the manifest itself, so this method was chosen to at least offer change detection. + +------- +History +------- + +Large object support has gone through various iterations before settling on +this implementation. This approach has the drawback that the eventual +consistency window of the container listings can cause a GET on the manifest +object to return an invalid whole object for that short term.
+ +We also implemented fully transparent support within the server, but the +drawbacks there were added complexity within the cluster, no option to do +parallel uploads, and no basis for a resume feature. + +We considered implementing both the "user manifest" option we have now and the +"transparent server manifest" option, but the second was deemed just to complex +for the benefit. From 197f343ddb5851371d96ec552d52aff1174e9705 Mon Sep 17 00:00:00 2001 From: gholt Date: Mon, 29 Nov 2010 13:07:30 -0800 Subject: [PATCH 07/18] Fixed bug dfg found in st --- bin/st | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/bin/st b/bin/st index a0541f1d88..40dd7e079d 100755 --- a/bin/st +++ b/bin/st @@ -1527,9 +1527,12 @@ def st_upload(options, args, print_queue, error_queue): thread.abort = True while thread.isAlive(): thread.join(0.01) - put_headers['x-object-manifest'] = \ - '%s_segments/%s/%s/%s/' % (container, obj, - put_headers['x-object-meta-mtime'], full_size) + new_object_manifest = '%s_segments/%s/%s/%s/' % ( + container, obj, put_headers['x-object-meta-mtime'], + full_size) + if old_manifest == new_object_manifest: + old_manifest = None + put_headers['x-object-manifest'] = new_object_manifest conn.put_object(container, obj, '', content_length=0, headers=put_headers) else: From 7d8ff50f43b792f1576c8de1677e3b6be527b538 Mon Sep 17 00:00:00 2001 From: gholt Date: Thu, 2 Dec 2010 19:37:58 -0800 Subject: [PATCH 08/18] SegmentIterable: logs exceptions just once; 503s on exception; fix except syntax; make sure self.response is always *something* --- swift/proxy/server.py | 52 ++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index a0c71bdeb9..2d925fbc02 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -103,17 +103,21 @@ class SegmentedIterable(object): """ Iterable that returns the object contents for a segmented object in Swift. 
- In addition to these params, you can also set the `response` attr just - after creating the SegmentedIterable and it will update the response's - `bytes_transferred` value (used to log the size of the request). + If set, the response's `bytes_transferred` value will be updated (used to + log the size of the request). Also, if there's a failure that cuts the + transfer short, the response's `status_int` will be updated (again, just + for logging since the original status would have already been sent to the + client). :param controller: The ObjectController instance to work with. :param container: The container the object segments are within. :param listing: The listing of object segments to iterate over; this is a standard JSON decoded container listing. + :param response: The webob.Response this iterable is associated with, if + any (default: None) """ - def __init__(self, controller, container, listing): + def __init__(self, controller, container, listing, response=None): self.controller = controller self.container = container self.listing = listing @@ -121,7 +125,9 @@ class SegmentedIterable(object): self.seek = 0 self.segment_iter = None self.position = 0 - self.response = None + self.response = response + if not self.response: + self.response = Response() def _load_next_segment(self): """ @@ -150,12 +156,16 @@ class SegmentedIterable(object): raise Exception('Could not load object segment %s: %s' % (path, resp.status_int)) self.segment_iter = resp.app_iter + except StopIteration: + raise except Exception, err: - if not isinstance(err, StopIteration): + if not getattr(err, 'swift_logged', False): self.controller.app.logger.exception('ERROR: While processing ' 'manifest /%s/%s/%s %s' % (self.controller.account_name, self.controller.container_name, self.controller.object_name, self.controller.trans_id)) + err.swift_logged = True + self.response.status_int = 503 raise def __iter__(self): @@ -172,16 +182,19 @@ class SegmentedIterable(object): except StopIteration: 
self._load_next_segment() self.position += len(chunk) - if self.response: - self.response.bytes_transferred = getattr(self.response, - 'bytes_transferred', 0) + len(chunk) + self.response.bytes_transferred = getattr(self.response, + 'bytes_transferred', 0) + len(chunk) yield chunk + except StopIteration: + raise except Exception, err: - if not isinstance(err, StopIteration): + if not getattr(err, 'swift_logged', False): self.controller.app.logger.exception('ERROR: While processing ' 'manifest /%s/%s/%s %s' % (self.controller.account_name, self.controller.container_name, self.controller.object_name, self.controller.trans_id)) + err.swift_logged = True + self.response.status_int = 503 raise def app_iter_range(self, start, stop): @@ -215,19 +228,22 @@ class SegmentedIterable(object): length -= len(chunk) if length < 0: # Chop off the extra: - if self.response: - self.response.bytes_transferred = \ - getattr(self.response, 'bytes_transferred', 0) \ - + length + self.response.bytes_transferred = \ + getattr(self.response, 'bytes_transferred', 0) \ + + length yield chunk[:length] break yield chunk + except StopIteration: + raise except Exception, err: - if not isinstance(err, StopIteration): + if not getattr(err, 'swift_logged', False): self.controller.app.logger.exception('ERROR: While processing ' 'manifest /%s/%s/%s %s' % (self.controller.account_name, self.controller.container_name, self.controller.object_name, self.controller.trans_id)) + err.swift_logged = True + self.response.status_int = 503 raise @@ -713,10 +729,10 @@ class ObjectController(Controller): for key, value in resp.headers.iteritems(): if key.lower().startswith('x-object-meta-'): headers[key] = value - resp = Response(app_iter=SegmentedIterable(self, lcontainer, - listing), headers=headers, request=req, + resp = Response(headers=headers, request=req, conditional_response=True) - resp.app_iter.response = resp + resp.app_iter = SegmentedIterable(self, lcontainer, listing, resp) + resp.content_length = 
content_length return resp @public From 090f86e9a6726480cbd98d183a97dab1f32d6fa8 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Mon, 6 Dec 2010 14:01:19 -0600 Subject: [PATCH 09/18] updated large objects history --- doc/source/overview_large_objects.rst | 57 ++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/doc/source/overview_large_objects.rst b/doc/source/overview_large_objects.rst index 333b6cde55..01f4990732 100644 --- a/doc/source/overview_large_objects.rst +++ b/doc/source/overview_large_objects.rst @@ -125,14 +125,53 @@ History ------- Large object support has gone through various iterations before settling on -this implementation. This approach has the drawback that the eventual -consistency window of the container listings can cause a GET on the manifest -object to return an invalid whole object for that short term. +this implementation. -We also implemented fully transparent support within the server, but the -drawbacks there were added complexity within the cluster, no option to do -parallel uploads, and no basis for a resume feature. +The primary factor driving the limitation of object size in swift is +maintaining balance among the partitions of the ring. To maintain an even +dispersion of disk usage throughout the cluster the obvious storage pattern +was to simply split larger objects into smaller segments, which could then be +glued together during a read. -We considered implementing both the "user manifest" option we have now and the -"transparent server manifest" option, but the second was deemed just to complex -for the benefit. +Before the introduction of large object support some applications were already +splitting their uploads into segments and re-assembling them on the client +side after retrieving the individual pieces. 
This design allowed the client +to support backup and archiving of large data sets, but was also frequently +employed to improve performance or reduce errors due to network interruption. +The major disadvantage of this method is that knowledge of the original +partitioning scheme is required to properly reassemble the object, which is +not practical for some use cases, such as CDN origination. + +In order to eliminate any barrier to entry for clients wanting to store +objects larger than 5GB, initially we also prototyped fully transparent +support for large object uploads. A fully transparent implementation would +support a larger max size by automatically splitting objects into segments +during upload within the proxy without any changes to the client API. All +segments were completely hidden from the client API. + +This solution introduced a number of challenging failure conditions into the +cluster, wouldn't provide the client with any option to do parallel uploads, +and had no basis for a resume feature. The transparent implementation was +deemed just too complex for the benefit. + +The current "user manifest" design was chosen in order to provide a +transparent download of large objects to the client and still provide the +uploading client a clean API to support segmented uploads. + +Alternative "explicit" user manifest options were discussed which would have +required a pre-defined format for listing the segments to "finalize" the +segmented upload. While this may offer some potential advantages, it was +decided that pushing an added burden onto the client which could potentially +limit adoption should be avoided in favor of a simpler "API" (essentially just +the format of the 'X-Object-Manifest' header). 
+ +During development it was noted that this "implicit" user manifest approach +which is based on the path prefix can be potentially affected by the eventual +consistency window of the container listings, which could theoretically cause +a GET on the manifest object to return an invalid whole object for that short +term. In reality you're unlikely to encounter this scenario unless you're +running very high concurrency uploads against a small testing environment +which isn't running the object-updaters or container-replicators. + +Like all of swift, Large Object Support is a living feature which will continue +to improve and may change over time. From 20d1ee6757be06728941b6672371a3501163cf53 Mon Sep 17 00:00:00 2001 From: gholt Date: Mon, 13 Dec 2010 14:14:26 -0800 Subject: [PATCH 10/18] Now supports infinite objects! --- swift/proxy/server.py | 151 +++++++++++++++++++++++---------- test/unit/proxy/test_server.py | 31 ++++++- 2 files changed, 135 insertions(+), 47 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 32b0543e1e..ff6e5e310f 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -41,8 +41,8 @@ from swift.common.utils import get_logger, normalize_timestamp, split_path, \ cache_from_env from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ - check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \ - MAX_FILE_SIZE + check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \ + MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE from swift.common.exceptions import ChunkReadTimeout, \ ChunkWriteTimeout, ConnectionTimeout @@ -112,8 +112,9 @@ class SegmentedIterable(object): :param controller: The ObjectController instance to work with. :param container: The container the object segments are within. - :param listing: The listing of object segments to iterate over; this is a - standard JSON decoded container listing.
+ :param listing: The listing of object segments to iterate over; this may + be an iterator or list that returns dicts with 'name' and + 'bytes' keys. :param response: The webob.Response this iterable is associated with, if any (default: None) """ @@ -121,8 +122,10 @@ class SegmentedIterable(object): def __init__(self, controller, container, listing, response=None): self.controller = controller self.container = container - self.listing = listing + self.listing = iter(listing) self.segment = -1 + self.segment_dict = None + self.segment_peek = None self.seek = 0 self.segment_iter = None self.position = 0 @@ -138,13 +141,13 @@ class SegmentedIterable(object): """ try: self.segment += 1 - if self.segment >= len(self.listing): - raise StopIteration() - obj = self.listing[self.segment] + self.segment_dict = self.segment_peek or self.listing.next() + self.segment_peek = None partition, nodes = self.controller.app.object_ring.get_nodes( - self.controller.account_name, self.container, obj['name']) + self.controller.account_name, self.container, + self.segment_dict['name']) path = '/%s/%s/%s' % (self.controller.account_name, self.container, - obj['name']) + self.segment_dict['name']) req = Request.blank(path) if self.seek: req.range = 'bytes=%s-' % self.seek @@ -209,14 +212,11 @@ class SegmentedIterable(object): """ try: if start: - if len(self.listing) <= self.segment + 1: - return - while start >= self.position + \ - self.listing[self.segment + 1]['bytes']: + self.segment_peek = self.listing.next() + while start >= self.position + self.segment_peek['bytes']: self.segment += 1 - if len(self.listing) <= self.segment + 1: - return - self.position += self.listing[self.segment]['bytes'] + self.position += self.segment_peek['bytes'] + self.segment_peek = self.listing.next() self.seek = start - self.position else: start = 0 @@ -707,38 +707,101 @@ class ObjectController(Controller): return resp resp = resp2 req.range = req_range + if 'x-object-manifest' in resp.headers: 
lcontainer, lprefix = \ resp.headers['x-object-manifest'].split('/', 1) lpartition, lnodes = self.app.container_ring.get_nodes( self.account_name, lcontainer) - lreq = Request.blank('/%s/%s?prefix=%s&format=json' % - (self.account_name, lcontainer, lprefix)) - lresp = self.GETorHEAD_base(lreq, 'Container', lpartition, lnodes, - lreq.path_info, self.app.container_ring.replica_count) - if lresp.status_int // 100 != 2: - lresp = HTTPNotFound(request=req) - lresp.headers['X-Object-Manifest'] = \ - resp.headers['x-object-manifest'] - return lresp - if 'swift.authorize' in req.environ: - req.acl = lresp.headers.get('x-container-read') - aresp = req.environ['swift.authorize'](req) - if aresp: - return aresp - listing = json.loads(lresp.body) - content_length = sum(o['bytes'] for o in listing) - etag = md5('"'.join(o['hash'] for o in listing)).hexdigest() - headers = {'X-Object-Manifest': resp.headers['x-object-manifest'], - 'Content-Type': resp.content_type, 'Content-Length': - content_length, 'ETag': etag} - for key, value in resp.headers.iteritems(): - if key.lower().startswith('x-object-meta-'): - headers[key] = value - resp = Response(headers=headers, request=req, - conditional_response=True) - resp.app_iter = SegmentedIterable(self, lcontainer, listing, resp) - resp.content_length = content_length + marker = '' + listing = [] + while True: + lreq = Request.blank('/%s/%s?prefix=%s&format=json&marker=%s' % + (quote(self.account_name), quote(lcontainer), + quote(lprefix), quote(marker))) + lresp = self.GETorHEAD_base(lreq, 'Container', lpartition, + lnodes, lreq.path_info, + self.app.container_ring.replica_count) + if lresp.status_int // 100 != 2: + lresp = HTTPNotFound(request=req) + lresp.headers['X-Object-Manifest'] = \ + resp.headers['x-object-manifest'] + return lresp + if 'swift.authorize' in req.environ: + req.acl = lresp.headers.get('x-container-read') + aresp = req.environ['swift.authorize'](req) + if aresp: + return aresp + sublisting = json.loads(lresp.body) 
+ if not sublisting: + break + listing.extend(sublisting) + if len(listing) > CONTAINER_LISTING_LIMIT: + break + marker = sublisting[-1]['name'] + + if len(listing) > CONTAINER_LISTING_LIMIT: + # We will serve large objects with a ton of segments with + # chunked transfer encoding. + + def listing_iter(): + marker = '' + while True: + lreq = Request.blank( + '/%s/%s?prefix=%s&format=json&marker=%s' % + (quote(self.account_name), quote(lcontainer), + quote(lprefix), quote(marker))) + lresp = self.GETorHEAD_base(lreq, 'Container', + lpartition, lnodes, lreq.path_info, + self.app.container_ring.replica_count) + if lresp.status_int // 100 != 2: + raise Exception('Object manifest GET could not ' + 'continue listing: %s %s' % + (req.path, lreq.path)) + if 'swift.authorize' in req.environ: + req.acl = lresp.headers.get('x-container-read') + aresp = req.environ['swift.authorize'](req) + if aresp: + raise Exception('Object manifest GET could ' + 'not continue listing: %s %s' % + (req.path, aresp)) + sublisting = json.loads(lresp.body) + if not sublisting: + break + for obj in sublisting: + yield obj + marker = sublisting[-1]['name'] + + headers = { + 'X-Object-Manifest': resp.headers['x-object-manifest'], + 'Content-Type': resp.content_type} + for key, value in resp.headers.iteritems(): + if key.lower().startswith('x-object-meta-'): + headers[key] = value + resp = Response(headers=headers, request=req, + conditional_response=True) + resp.app_iter = SegmentedIterable(self, lcontainer, + listing_iter(), resp) + + else: + # For objects with a reasonable number of segments, we'll serve + # them with a set content-length and computed etag. 
+ content_length = sum(o['bytes'] for o in listing) + etag = md5('"'.join(o['hash'] for o in listing)).hexdigest() + headers = { + 'X-Object-Manifest': resp.headers['x-object-manifest'], + 'Content-Type': resp.content_type, + 'Content-Length': content_length, + 'ETag': etag} + for key, value in resp.headers.iteritems(): + if key.lower().startswith('x-object-meta-'): + headers[key] = value + resp = Response(headers=headers, request=req, + conditional_response=True) + resp.app_iter = SegmentedIterable(self, lcontainer, listing, + resp) + resp.content_length = content_length + return resp @public diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index d60f4c8590..b7d43c0fb2 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -1600,6 +1600,7 @@ class TestObjectController(unittest.TestCase): mkdirs(os.path.join(testdir, 'sdb1')) mkdirs(os.path.join(testdir, 'sdb1', 'tmp')) try: + orig_container_listing_limit = proxy_server.CONTAINER_LISTING_LIMIT conf = {'devices': testdir, 'swift_dir': testdir, 'mount_check': 'false'} prolis = listen(('localhost', 0)) @@ -1976,6 +1977,24 @@ class TestObjectController(unittest.TestCase): self.assert_('Content-Type: text/jibberish' in headers) body = fd.read() self.assertEquals(body, '1234 1234 1234 1234 1234 ') + # Do it again but exceeding the container listing limit + proxy_server.CONTAINER_LISTING_LIMIT = 2 + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/segmented/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: ' + 't\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + self.assert_('X-Object-Manifest: segmented/name/' in headers) + self.assert_('Content-Type: text/jibberish' in headers) + body = fd.read() + # A bit fragile of a test; as it makes the assumption that all + # will be sent in a single chunk. 
+ self.assertEquals(body, + '19\r\n1234 1234 1234 1234 1234 \r\n0\r\n\r\n') finally: prospa.kill() acc1spa.kill() @@ -1985,6 +2004,7 @@ class TestObjectController(unittest.TestCase): obj1spa.kill() obj2spa.kill() finally: + proxy_server.CONTAINER_LISTING_LIMIT = orig_container_listing_limit rmtree(testdir) def test_mismatched_etags(self): @@ -2995,9 +3015,10 @@ class TestSegmentedIterable(unittest.TestCase): self.controller = FakeObjectController() def test_load_next_segment_unexpected_error(self): + # Iterator value isn't a dict self.assertRaises(Exception, proxy_server.SegmentedIterable(self.controller, None, - None)._load_next_segment) + [None])._load_next_segment) self.assertEquals(self.controller.exception_args[0], 'ERROR: While processing manifest /a/c/o tx1') @@ -3030,6 +3051,7 @@ class TestSegmentedIterable(unittest.TestCase): segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}, {'name': 'o2'}]) segit.segment = 0 + segit.listing.next() segit._load_next_segment() self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') data = ''.join(segit.segment_iter) @@ -3039,6 +3061,7 @@ class TestSegmentedIterable(unittest.TestCase): segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': 'o1'}, {'name': 'o2'}]) segit.segment = 0 + segit.listing.next() segit.seek = 1 segit._load_next_segment() self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') @@ -3062,8 +3085,9 @@ class TestSegmentedIterable(unittest.TestCase): 'Could not load object segment /a/lc/o1: 404') def test_iter_unexpected_error(self): + # Iterator value isn't a dict self.assertRaises(Exception, ''.join, - proxy_server.SegmentedIterable(self.controller, None, None)) + proxy_server.SegmentedIterable(self.controller, None, [None])) self.assertEquals(self.controller.exception_args[0], 'ERROR: While processing manifest /a/c/o tx1') @@ -3100,9 +3124,10 @@ class TestSegmentedIterable(unittest.TestCase): 'Could not load object segment /a/lc/o1: 
404') def test_app_iter_range_unexpected_error(self): + # Iterator value isn't a dict self.assertRaises(Exception, proxy_server.SegmentedIterable(self.controller, None, - None).app_iter_range(None, None).next) + [None]).app_iter_range(None, None).next) self.assertEquals(self.controller.exception_args[0], 'ERROR: While processing manifest /a/c/o tx1') From bf0a8e934cb82bd7567bd7cbe866ded827a18c55 Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 14 Dec 2010 11:20:12 -0800 Subject: [PATCH 11/18] Fixed a bug where a HEAD on a really, really large object would give a content-length of 0 instead of transfer-encoding: chunked --- swift/proxy/server.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index ff6e5e310f..8250d181f3 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -780,8 +780,23 @@ class ObjectController(Controller): headers[key] = value resp = Response(headers=headers, request=req, conditional_response=True) - resp.app_iter = SegmentedIterable(self, lcontainer, - listing_iter(), resp) + if req.method == 'HEAD': + # These shenanigans are because webob translates the HEAD + # request into a webob EmptyResponse for the body, which + # has a len, which eventlet translates as needing a + # content-length header added. So we call the original + # webob resp for the headers but return an empty generator + # for the body. 
+ + def head_response(environ, start_response): + resp(environ, start_response) + return ('' for x in '') + + head_response.status_int = resp.status_int + return head_response + else: + resp.app_iter = SegmentedIterable(self, lcontainer, + listing_iter(), resp) else: # For objects with a reasonable number of segments, we'll serve From d13b34fdc1e2d66c31ab40d80213c6df475037ba Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 14 Dec 2010 13:51:24 -0800 Subject: [PATCH 12/18] x-copy-from now understands manifest sources and copies details rather than contents --- swift/proxy/server.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 8250d181f3..4ba651bc4f 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -172,6 +172,9 @@ class SegmentedIterable(object): self.response.status_int = 503 raise + def next(self): + return iter(self).next() + def __iter__(self): """ Standard iterator function that returns the object's contents. 
""" try: @@ -790,7 +793,7 @@ class ObjectController(Controller): def head_response(environ, start_response): resp(environ, start_response) - return ('' for x in '') + return iter([]) head_response.status_int = resp.status_int return head_response @@ -935,11 +938,17 @@ class ObjectController(Controller): return source_resp self.object_name = orig_obj_name self.container_name = orig_container_name - data_source = source_resp.app_iter new_req = Request.blank(req.path_info, environ=req.environ, headers=req.headers) - new_req.content_length = source_resp.content_length - new_req.etag = source_resp.etag + if 'x-object-manifest' in source_resp.headers: + data_source = iter(['']) + new_req.content_length = 0 + new_req.headers['X-Object-Manifest'] = \ + source_resp.headers['x-object-manifest'] + else: + data_source = source_resp.app_iter + new_req.content_length = source_resp.content_length + new_req.etag = source_resp.etag # we no longer need the X-Copy-From header del new_req.headers['X-Copy-From'] for k, v in source_resp.headers.items(): From 89ad6e727bc621e49d234162327f02e359d61b18 Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 14 Dec 2010 14:16:38 -0800 Subject: [PATCH 13/18] Limit manifest gets to one segment per second; prevents amplification attacks of tons of tiny segments --- swift/proxy/server.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 4ba651bc4f..7dea1ebcde 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -28,6 +28,7 @@ import uuid import functools from hashlib import md5 +from eventlet import sleep from eventlet.timeout import Timeout from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \ HTTPNotFound, HTTPPreconditionFailed, \ @@ -132,6 +133,7 @@ class SegmentedIterable(object): self.response = response if not self.response: self.response = Response() + self.next_get_time = 0 def _load_next_segment(self): """ @@ -152,6 +154,8 @@ class SegmentedIterable(object): if 
self.seek: req.range = 'bytes=%s-' % self.seek self.seek = 0 + sleep(max(self.next_get_time - time.time(), 0)) + self.next_get_time = time.time() + 1 resp = self.controller.GETorHEAD_base(req, 'Object', partition, self.controller.iter_nodes(partition, nodes, self.controller.app.object_ring), path, From 3e306e0f43cd70f81a74f11efe7d97890e7504a2 Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 14 Dec 2010 14:25:12 -0800 Subject: [PATCH 14/18] Changed to only limit manifest gets after first 10 segments. Makes tests run faster but does allow amplification 1:10. At least it's not 1:infinity like before. --- swift/proxy/server.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 7dea1ebcde..66d585c9e5 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -154,8 +154,9 @@ class SegmentedIterable(object): if self.seek: req.range = 'bytes=%s-' % self.seek self.seek = 0 - sleep(max(self.next_get_time - time.time(), 0)) - self.next_get_time = time.time() + 1 + if self.segment > 10: + sleep(max(self.next_get_time - time.time(), 0)) + self.next_get_time = time.time() + 1 resp = self.controller.GETorHEAD_base(req, 'Object', partition, self.controller.iter_nodes(partition, nodes, self.controller.app.object_ring), path, From 4400f0473a67989d1563262a3a6e544a161aef80 Mon Sep 17 00:00:00 2001 From: gholt Date: Tue, 14 Dec 2010 14:49:36 -0800 Subject: [PATCH 15/18] Even though isn't 100% related, made st emit a warning if there's a / in a container name --- bin/st | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/bin/st b/bin/st index 748dfa475c..d5e1d1fab3 100755 --- a/bin/st +++ b/bin/st @@ -1000,6 +1000,10 @@ def st_delete(parser, args, print_queue, error_queue): raise error_queue.put('Account not found') elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might have ' \ + 'meant %r instead of %r.' 
% \ + (args[0].replace('/', ' ', 1), args[0]) conn = create_connection() _delete_container(args[0], conn) else: @@ -1159,6 +1163,10 @@ def st_download(options, args, print_queue, error_queue): raise error_queue.put('Account not found') elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might have ' \ + 'meant %r instead of %r.' % \ + (args[0].replace('/', ' ', 1), args[0]) _download_container(args[0], create_connection()) else: if len(args) == 2: @@ -1272,6 +1280,10 @@ Containers: %d raise error_queue.put('Account not found') elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might have ' \ + 'meant %r instead of %r.' % \ + (args[0].replace('/', ' ', 1), args[0]) try: headers = conn.head_container(args[0]) object_count = int(headers.get('x-container-object-count', 0)) @@ -1378,6 +1390,10 @@ def st_post(options, args, print_queue, error_queue): raise error_queue.put('Account not found') elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might have ' \ + 'meant %r instead of %r.' 
% \ + (args[0].replace('/', ' ', 1), args[0]) headers = {} for item in options.meta: split_item = item.split(':') From 80bde91333bd54006d25a7c6d3baee02cd4587d8 Mon Sep 17 00:00:00 2001 From: gholt Date: Thu, 16 Dec 2010 09:03:59 -0800 Subject: [PATCH 16/18] st: Works with chunked transfer encoded downloads now --- bin/st | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bin/st b/bin/st index d5e1d1fab3..8a4a71a245 100755 --- a/bin/st +++ b/bin/st @@ -1063,7 +1063,10 @@ def st_download(options, args, print_queue, error_queue): headers, body = \ conn.get_object(container, obj, resp_chunk_size=65536) content_type = headers.get('content-type') - content_length = int(headers.get('content-length')) + if 'content-length' in headers: + content_length = int(headers.get('content-length')) + else: + content_length = None etag = headers.get('etag') path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): @@ -1102,7 +1105,7 @@ def st_download(options, args, print_queue, error_queue): if md5sum and md5sum.hexdigest() != etag: error_queue.put('%s: md5sum != etag, %s != %s' % (path, md5sum.hexdigest(), etag)) - if read_length != content_length: + if content_length is not None and read_length != content_length: error_queue.put('%s: read_length != content_length, %d != %d' % (path, read_length, content_length)) if 'x-object-meta-mtime' in headers and not options.out_file: From a8b239e5a05e5964189126d291f836f99930fe93 Mon Sep 17 00:00:00 2001 From: gholt Date: Thu, 16 Dec 2010 09:21:30 -0800 Subject: [PATCH 17/18] Made stat display of objects suppress content-length, last-modified, and etag if they aren't in the headers --- bin/st | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/bin/st b/bin/st index 8a4a71a245..b41aca67ec 100755 --- a/bin/st +++ b/bin/st @@ -1323,14 +1323,16 @@ Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], Account: %s Container: %s Object: %s - Content Type: 
%s -Content Length: %s - Last Modified: %s - ETag: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], - args[1], headers.get('content-type'), - headers.get('content-length'), - headers.get('last-modified'), - headers.get('etag'))) + Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], + args[1], headers.get('content-type'))) + if 'content-length' in headers: + print_queue.put('Content Length: %s' % + headers['content-length']) + if 'last-modified' in headers: + print_queue.put(' Last Modified: %s' % + headers['last-modified']) + if 'etag' in headers: + print_queue.put(' ETag: %s' % headers['etag']) if 'x-object-manifest' in headers: print_queue.put(' Manifest: %s' % headers['x-object-manifest']) From fa31d76eee4cd1024c1c99ca6a9d00e1b483ac6d Mon Sep 17 00:00:00 2001 From: gholt Date: Thu, 16 Dec 2010 10:46:11 -0800 Subject: [PATCH 18/18] lobjects: The Last-Modified header is now determined for reasonably segmented objects. --- swift/proxy/server.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 66d585c9e5..a0fefc7ab7 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -20,9 +20,11 @@ except ImportError: import json import mimetypes import os +import re import time import traceback from ConfigParser import ConfigParser +from datetime import datetime from urllib import unquote, quote import uuid import functools @@ -793,7 +795,7 @@ class ObjectController(Controller): # request into a webob EmptyResponse for the body, which # has a len, which eventlet translates as needing a # content-length header added. So we call the original - # webob resp for the headers but return an empty generator + # webob resp for the headers but return an empty iterator # for the body. 
def head_response(environ, start_response): @@ -810,6 +812,9 @@ class ObjectController(Controller): # For objects with a reasonable number of segments, we'll serve # them with a set content-length and computed etag. content_length = sum(o['bytes'] for o in listing) + last_modified = max(o['last_modified'] for o in listing) + last_modified = \ + datetime(*map(int, re.split('[^\d]', last_modified)[:-1])) etag = md5('"'.join(o['hash'] for o in listing)).hexdigest() headers = { 'X-Object-Manifest': resp.headers['x-object-manifest'], @@ -824,6 +829,7 @@ class ObjectController(Controller): resp.app_iter = SegmentedIterable(self, lcontainer, listing, resp) resp.content_length = content_length + resp.last_modified = last_modified return resp