Server-side implementation for segmented objects

This commit is contained in:
gholt 2010-11-16 15:35:39 -08:00
parent aea6903e4a
commit fa3c871f0b
7 changed files with 915 additions and 27 deletions

View File

@ -113,6 +113,17 @@ def check_object_creation(req, object_name):
if not check_utf8(req.headers['Content-Type']):
return HTTPBadRequest(request=req, body='Invalid Content-Type',
content_type='text/plain')
if 'x-object-manifest' in req.headers:
value = req.headers['x-object-manifest']
container = prefix = None
try:
container, prefix = value.split('/', 1)
except ValueError:
pass
if not container or not prefix or '?' in value or '&' in value or \
prefix[0] == '/':
return HTTPBadRequest(request=req,
body='X-Object-Manifest must in the format container/prefix')
return check_metadata(req, 'object')

View File

@ -391,6 +391,9 @@ class ObjectController(object):
'ETag': etag,
'Content-Length': str(os.fstat(fd).st_size),
}
if 'x-object-manifest' in request.headers:
metadata['X-Object-Manifest'] = \
request.headers['x-object-manifest']
metadata.update(val for val in request.headers.iteritems()
if val[0].lower().startswith('x-object-meta-') and
len(val[0]) > 14)
@ -460,7 +463,8 @@ class ObjectController(object):
'application/octet-stream'), app_iter=file,
request=request, conditional_response=True)
for key, value in file.metadata.iteritems():
if key.lower().startswith('x-object-meta-'):
if key == 'X-Object-Manifest' or \
key.lower().startswith('x-object-meta-'):
response.headers[key] = value
response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp'])
@ -488,7 +492,8 @@ class ObjectController(object):
response = Response(content_type=file.metadata['Content-Type'],
request=request, conditional_response=True)
for key, value in file.metadata.iteritems():
if key.lower().startswith('x-object-meta-'):
if key == 'X-Object-Manifest' or \
key.lower().startswith('x-object-meta-'):
response.headers[key] = value
response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp'])

View File

@ -14,6 +14,10 @@
# limitations under the License.
from __future__ import with_statement
try:
import simplejson as json
except ImportError:
import json
import mimetypes
import os
import time
@ -22,6 +26,7 @@ from ConfigParser import ConfigParser
from urllib import unquote, quote
import uuid
import functools
from hashlib import md5
from eventlet.timeout import Timeout
from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \
@ -94,6 +99,138 @@ def get_container_memcache_key(account, container):
return 'container%s' % path
class SegmentedIterable(object):
"""
Iterable that returns the object contents for a segmented object in Swift.
In addition to these params, you can also set the `response` attr just
after creating the SegmentedIterable and it will update the response's
`bytes_transferred` value (used to log the size of the request).
:param controller: The ObjectController instance to work with.
:param container: The container the object segments are within.
:param listing: The listing of object segments to iterate over; this is a
standard JSON decoded container listing.
"""
def __init__(self, controller, container, listing):
self.controller = controller
self.container = container
self.listing = listing
self.segment = -1
self.seek = 0
self.segment_iter = None
self.position = 0
self.response = None
def _load_next_segment(self):
"""
Loads the self.segment_iter with the next object segment's contents.
:raises: StopIteration when there are no more object segments.
"""
try:
self.segment += 1
if self.segment >= len(self.listing):
raise StopIteration()
obj = self.listing[self.segment]
partition, nodes = self.controller.app.object_ring.get_nodes(
self.controller.account_name, self.container, obj['name'])
path = '/%s/%s/%s' % (self.controller.account_name, self.container,
obj['name'])
req = Request.blank(path)
if self.seek:
req.range = 'bytes=%s-' % self.seek
self.seek = 0
resp = self.controller.GETorHEAD_base(req, 'Object', partition,
self.controller.iter_nodes(partition, nodes,
self.controller.app.object_ring), path,
self.controller.app.object_ring.replica_count)
if resp.status_int // 100 != 2:
raise Exception('Could not load object segment %s: %s' % (path,
resp.status_int))
self.segment_iter = resp.app_iter
except Exception, err:
if not isinstance(err, StopIteration):
self.controller.app.logger.exception('ERROR: While processing '
'manifest /%s/%s/%s %s' % (self.controller.account_name,
self.controller.container_name,
self.controller.object_name, self.controller.trans_id))
raise
def __iter__(self):
""" Standard iterator function that returns the object's contents. """
try:
while True:
if not self.segment_iter:
self._load_next_segment()
while True:
with ChunkReadTimeout(self.controller.app.node_timeout):
try:
chunk = self.segment_iter.next()
break
except StopIteration:
self._load_next_segment()
self.position += len(chunk)
if self.response:
self.response.bytes_transferred = getattr(self.response,
'bytes_transferred', 0) + len(chunk)
yield chunk
except Exception, err:
if not isinstance(err, StopIteration):
self.controller.app.logger.exception('ERROR: While processing '
'manifest /%s/%s/%s %s' % (self.controller.account_name,
self.controller.container_name,
self.controller.object_name, self.controller.trans_id))
raise
def app_iter_range(self, start, stop):
"""
Non-standard iterator function for use with Webob in serving Range
requests more quickly. This will skip over segments and do a range
request on the first segment to return data from, if needed.
:param start: The first byte (zero-based) to return. None for 0.
:param stop: The last byte (zero-based) to return. None for end.
"""
try:
if start:
if len(self.listing) <= self.segment + 1:
return
while start >= self.position + \
self.listing[self.segment + 1]['bytes']:
self.segment += 1
if len(self.listing) <= self.segment + 1:
return
self.position += self.listing[self.segment]['bytes']
self.seek = start - self.position
else:
start = 0
if stop is not None:
length = stop - start
else:
length = None
for chunk in self:
if length is not None:
length -= len(chunk)
if length < 0:
# Chop off the extra:
if self.response:
self.response.bytes_transferred = \
getattr(self.response, 'bytes_transferred', 0) \
+ length
yield chunk[:length]
break
yield chunk
except Exception, err:
if not isinstance(err, StopIteration):
self.controller.app.logger.exception('ERROR: While processing '
'manifest /%s/%s/%s %s' % (self.controller.account_name,
self.controller.container_name,
self.controller.object_name, self.controller.trans_id))
raise
class Controller(object):
"""Base WSGI controller class for the proxy"""
@ -526,9 +663,47 @@ class ObjectController(Controller):
return aresp
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
return self.GETorHEAD_base(req, 'Object', partition,
resp = self.GETorHEAD_base(req, 'Object', partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, self.app.object_ring.replica_count)
# If we get a 416 Requested Range Not Satisfiable we have to check if
# we were actually requesting a manifest object and then redo the range
# request on the whole object.
if resp.status_int == 416:
req_range = req.range
req.range = None
resp2 = self.GETorHEAD_base(req, 'Object', partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, self.app.object_ring.replica_count)
if 'x-object-manifest' not in resp2.headers:
return resp
resp = resp2
req.range = req_range
if 'x-object-manifest' in resp.headers:
lcontainer, lprefix = \
resp.headers['x-object-manifest'].split('/', 1)
lpartition, lnodes = self.app.container_ring.get_nodes(
self.account_name, lcontainer)
lreq = Request.blank('/%s/%s?prefix=%s&format=json' %
(self.account_name, lcontainer, lprefix))
lresp = self.GETorHEAD_base(lreq, 'Container', lpartition, lnodes,
lreq.path_info, self.app.container_ring.replica_count)
if 'swift.authorize' in req.environ:
req.acl = lresp.headers.get('x-container-read')
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
listing = json.loads(lresp.body)
content_length = sum(o['bytes'] for o in listing)
etag = md5('"'.join(o['hash'] for o in listing)).hexdigest()
headers = {'X-Object-Manifest': resp.headers['x-object-manifest'],
'Content-Type': resp.content_type, 'Content-Length':
content_length, 'ETag': etag}
resp = Response(app_iter=SegmentedIterable(self, lcontainer,
listing), headers=headers, request=req,
conditional_response=True)
resp.app_iter.response = resp
return resp
@public
@delay_denial

View File

@ -16,6 +16,7 @@ class TestObject(unittest.TestCase):
if skip:
raise SkipTest
self.container = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token})
@ -24,6 +25,7 @@ class TestObject(unittest.TestCase):
resp.read()
self.assertEquals(resp.status, 201)
self.obj = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
self.obj), 'test', {'X-Auth-Token': token})
@ -35,6 +37,7 @@ class TestObject(unittest.TestCase):
def tearDown(self):
if skip:
raise SkipTest
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/%s' % (parsed.path, self.container,
self.obj), '', {'X-Auth-Token': token})
@ -42,6 +45,7 @@ class TestObject(unittest.TestCase):
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
def delete(url, token, parsed, conn):
conn.request('DELETE', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token})
@ -53,6 +57,7 @@ class TestObject(unittest.TestCase):
def test_public_object(self):
if skip:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET',
'%s/%s/%s' % (parsed.path, self.container, self.obj))
@ -62,6 +67,7 @@ class TestObject(unittest.TestCase):
raise Exception('Should not have been able to GET')
except Exception, err:
self.assert_(str(err).startswith('No result after '))
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token,
@ -73,6 +79,7 @@ class TestObject(unittest.TestCase):
resp = retry(get)
resp.read()
self.assertEquals(resp.status, 200)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token, 'X-Container-Read': ''})
@ -89,6 +96,7 @@ class TestObject(unittest.TestCase):
def test_private_object(self):
if skip or skip3:
raise SkipTest
# Ensure we can't access the object with the third account
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/%s' % (parsed.path, self.container,
@ -98,8 +106,10 @@ class TestObject(unittest.TestCase):
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# create a shared container writable by account3
shared_container = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s' % (parsed.path,
shared_container), '',
@ -110,6 +120,7 @@ class TestObject(unittest.TestCase):
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# verify third account can not copy from private container
def copy(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (parsed.path,
@ -123,6 +134,7 @@ class TestObject(unittest.TestCase):
resp = retry(copy, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# verify third account can write "obj1" to shared container
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (parsed.path, shared_container,
@ -131,6 +143,7 @@ class TestObject(unittest.TestCase):
resp = retry(put, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
# verify third account can copy "obj1" to shared container
def copy2(url, token, parsed, conn):
conn.request('COPY', '%s/%s/%s' % (parsed.path,
@ -143,6 +156,7 @@ class TestObject(unittest.TestCase):
resp = retry(copy2, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
# verify third account STILL can not copy from private container
def copy3(url, token, parsed, conn):
conn.request('COPY', '%s/%s/%s' % (parsed.path,
@ -155,6 +169,7 @@ class TestObject(unittest.TestCase):
resp = retry(copy3, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# clean up "obj1"
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/%s' % (parsed.path, shared_container,
@ -163,6 +178,7 @@ class TestObject(unittest.TestCase):
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
# clean up shared_container
def delete(url, token, parsed, conn):
conn.request('DELETE',
@ -173,6 +189,269 @@ class TestObject(unittest.TestCase):
resp.read()
self.assertEquals(resp.status, 204)
def test_manifest(self):
if skip:
raise SkipTest
# Data for the object segments
segments1 = ['one', 'two', 'three', 'four', 'five']
segments2 = ['six', 'seven', 'eight']
segments3 = ['nine', 'ten', 'eleven']
# Upload the first set of segments
def put(url, token, parsed, conn, objnum):
conn.request('PUT', '%s/%s/segments1/%s' % (parsed.path,
self.container, str(objnum)), segments1[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments1)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
# Upload the manifest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token,
'X-Object-Manifest': '%s/segments1/' % self.container,
'Content-Type': 'text/jibberish', 'Content-Length': '0'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest (should get all the segments as the body)
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1))
self.assertEquals(resp.status, 200)
self.assertEquals(resp.getheader('content-type'), 'text/jibberish')
# Get with a range at the start of the second segment
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token, 'Range':
'bytes=3-'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1[1:]))
self.assertEquals(resp.status, 206)
# Get with a range in the middle of the second segment
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token, 'Range':
'bytes=5-'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1)[5:])
self.assertEquals(resp.status, 206)
# Get with a full start and stop range
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token, 'Range':
'bytes=5-10'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1)[5:11])
self.assertEquals(resp.status, 206)
# Upload the second set of segments
def put(url, token, parsed, conn, objnum):
conn.request('PUT', '%s/%s/segments2/%s' % (parsed.path,
self.container, str(objnum)), segments2[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments2)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest (should still be the first segments of course)
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1))
self.assertEquals(resp.status, 200)
# Update the manifest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token,
'X-Object-Manifest': '%s/segments2/' % self.container,
'Content-Length': '0'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest (should be the second set of segments now)
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments2))
self.assertEquals(resp.status, 200)
if not skip3:
# Ensure we can't access the manifest with the third account
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, self.container),
'', {'X-Auth-Token': token, 'X-Container-Read':
swift_test_user[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
self.assertEquals(resp.read(), ''.join(segments2))
self.assertEquals(resp.status, 200)
# Create another container for the third set of segments
acontainer = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', parsed.path + '/' + acontainer, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Upload the third set of segments in the other container
def put(url, token, parsed, conn, objnum):
conn.request('PUT', '%s/%s/segments3/%s' % (parsed.path,
acontainer, str(objnum)), segments3[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments3)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
# Update the manifest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token,
'X-Object-Manifest': '%s/segments3/' % acontainer,
'Content-Length': '0'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest to ensure it's the third set of segments
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments3))
self.assertEquals(resp.status, 200)
if not skip3:
# Ensure we can't access the manifest with the third account
# (because the segments are in a protected container even if the
# manifest itself is not).
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, acontainer),
'', {'X-Auth-Token': token, 'X-Container-Read':
swift_test_user[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
self.assertEquals(resp.read(), ''.join(segments3))
self.assertEquals(resp.status, 200)
# Delete the manifest
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the third set of segments
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/segments3/%s' % (parsed.path,
acontainer, str(objnum)), '', {'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments3)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the second set of segments
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/segments2/%s' % (parsed.path,
self.container, str(objnum)), '', {'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments2)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the first set of segments
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/segments1/%s' % (parsed.path,
self.container, str(objnum)), '', {'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments1)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the extra container
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s' % (parsed.path, acontainer), '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
if __name__ == '__main__':
unittest.main()

View File

@ -22,6 +22,7 @@ from webob.exc import HTTPBadRequest, HTTPLengthRequired, \
from swift.common import constraints
class TestConstraints(unittest.TestCase):
def test_check_metadata_empty(self):
@ -137,6 +138,32 @@ class TestConstraints(unittest.TestCase):
self.assert_(isinstance(resp, HTTPBadRequest))
self.assert_('Content-Type' in resp.body)
def test_check_object_manifest_header(self):
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'container/prefix', 'Content-Length':
'0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(not resp)
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'container', 'Content-Length': '0',
'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': '/container/prefix',
'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'container/prefix?query=param',
'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'container/prefix&query=param',
'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'http://host/container/prefix',
'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
def test_check_mount(self):
self.assertFalse(constraints.check_mount('', ''))
constraints.os = MockTrue() # mock os module

View File

@ -42,7 +42,7 @@ class TestObjectController(unittest.TestCase):
self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
if not self.path_to_test_xfs or \
not os.path.exists(self.path_to_test_xfs):
print >>sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
'pointing to a valid directory.\n' \
'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \
'system for testing.'
@ -77,7 +77,8 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Object-Meta-3': 'Three',
'X-Object-Meta-4': 'Four',
@ -95,7 +96,8 @@ class TestObjectController(unittest.TestCase):
if not self.path_to_test_xfs:
raise SkipTest
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/fail', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/fail',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Object-Meta-1': 'One',
'X-Object-Meta-2': 'Two',
@ -116,29 +118,37 @@ class TestObjectController(unittest.TestCase):
def test_POST_container_connection(self):
if not self.path_to_test_xfs:
raise SkipTest
def mock_http_connect(response, with_exc=False):
class FakeConn(object):
def __init__(self, status, with_exc):
self.status = status
self.reason = 'Fake'
self.host = '1.2.3.4'
self.port = '1234'
self.with_exc = with_exc
def getresponse(self):
if self.with_exc:
raise Exception('test')
return self
def read(self, amt=None):
return ''
return lambda *args, **kwargs: FakeConn(response, with_exc)
old_http_connect = object_server.http_connect
try:
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp, 'Content-Type': 'text/plain',
'Content-Length': '0'})
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD':
'POST'}, headers={'X-Timestamp': timestamp, 'Content-Type':
'text/plain', 'Content-Length': '0'})
resp = self.object_controller.PUT(req)
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Container-Host': '1.2.3.4:0',
'X-Container-Partition': '3',
@ -148,7 +158,8 @@ class TestObjectController(unittest.TestCase):
object_server.http_connect = mock_http_connect(202)
resp = self.object_controller.POST(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Container-Host': '1.2.3.4:0',
'X-Container-Partition': '3',
@ -158,7 +169,8 @@ class TestObjectController(unittest.TestCase):
object_server.http_connect = mock_http_connect(202, with_exc=True)
resp = self.object_controller.POST(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Container-Host': '1.2.3.4:0',
'X-Container-Partition': '3',
@ -226,7 +238,8 @@ class TestObjectController(unittest.TestCase):
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY')
self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)),
{'X-Timestamp': timestamp,
'Content-Length': '6',
'ETag': '0b4c12d7e0a73840c1c4f148fda3b037',
@ -258,7 +271,8 @@ class TestObjectController(unittest.TestCase):
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY TWO')
self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)),
{'X-Timestamp': timestamp,
'Content-Length': '10',
'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039',
@ -270,17 +284,17 @@ class TestObjectController(unittest.TestCase):
if not self.path_to_test_xfs:
raise SkipTest
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(time()),
'Content-Type': 'text/plain'})
headers={'X-Timestamp': normalize_timestamp(time()),
'Content-Type': 'text/plain'})
req.body = 'test'
resp = self.object_controller.PUT(req)
self.assertEquals(resp.status_int, 201)
def test_PUT_invalid_etag(self):
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(time()),
'Content-Type': 'text/plain',
'ETag': 'invalid'})
headers={'X-Timestamp': normalize_timestamp(time()),
'Content-Type': 'text/plain',
'ETag': 'invalid'})
req.body = 'test'
resp = self.object_controller.PUT(req)
self.assertEquals(resp.status_int, 422)
@ -304,7 +318,8 @@ class TestObjectController(unittest.TestCase):
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY THREE')
self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)),
{'X-Timestamp': timestamp,
'Content-Length': '12',
'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568',
@ -316,25 +331,33 @@ class TestObjectController(unittest.TestCase):
def test_PUT_container_connection(self):
if not self.path_to_test_xfs:
raise SkipTest
def mock_http_connect(response, with_exc=False):
class FakeConn(object):
def __init__(self, status, with_exc):
self.status = status
self.reason = 'Fake'
self.host = '1.2.3.4'
self.port = '1234'
self.with_exc = with_exc
def getresponse(self):
if self.with_exc:
raise Exception('test')
return self
def read(self, amt=None):
return ''
return lambda *args, **kwargs: FakeConn(response, with_exc)
old_http_connect = object_server.http_connect
try:
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Container-Host': '1.2.3.4:0',
'X-Container-Partition': '3',
@ -555,7 +578,8 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.etag, etag)
req = Request.blank('/sda1/p/a/c/o2', environ={'REQUEST_METHOD': 'GET'},
req = Request.blank('/sda1/p/a/c/o2',
environ={'REQUEST_METHOD': 'GET'},
headers={'If-Match': '*'})
resp = self.object_controller.GET(req)
self.assertEquals(resp.status_int, 412)
@ -715,7 +739,8 @@ class TestObjectController(unittest.TestCase):
""" Test swift.object_server.ObjectController.DELETE """
if not self.path_to_test_xfs:
raise SkipTest
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'DELETE'})
req = Request.blank('/sda1/p/a/c',
environ={'REQUEST_METHOD': 'DELETE'})
resp = self.object_controller.DELETE(req)
self.assertEquals(resp.status_int, 400)
@ -916,21 +941,26 @@ class TestObjectController(unittest.TestCase):
def test_disk_file_mkstemp_creates_dir(self):
tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
os.rmdir(tmpdir)
with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp():
with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
'o').mkstemp():
self.assert_(os.path.exists(tmpdir))
def test_max_upload_time(self):
if not self.path_to_test_xfs:
raise SkipTest
class SlowBody():
def __init__(self):
self.sent = 0
def read(self, size=-1):
if self.sent < 4:
sleep(0.1)
self.sent += 1
return ' '
return ''
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()},
headers={'X-Timestamp': normalize_timestamp(time()),
@ -946,14 +976,18 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 408)
def test_short_body(self):
class ShortBody():
def __init__(self):
self.sent = False
def read(self, size=-1):
if not self.sent:
self.sent = True
return ' '
return ''
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': ShortBody()},
headers={'X-Timestamp': normalize_timestamp(time()),
@ -1001,11 +1035,37 @@ class TestObjectController(unittest.TestCase):
resp = self.object_controller.GET(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.headers['content-encoding'], 'gzip')
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'HEAD'})
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD':
'HEAD'})
resp = self.object_controller.HEAD(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.headers['content-encoding'], 'gzip')
def test_manifest_header(self):
if not self.path_to_test_xfs:
raise SkipTest
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'Content-Length': '0',
'X-Object-Manifest': 'c/o/'})
resp = self.object_controller.PUT(req)
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(self.testdir, 'sda1',
storage_directory(object_server.DATADIR, 'p', hash_path('a', 'c',
'o')), timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)), {'X-Timestamp': timestamp,
'Content-Length': '0', 'Content-Type': 'text/plain', 'name':
'/a/c/o', 'X-Object-Manifest': 'c/o/', 'ETag':
'd41d8cd98f00b204e9800998ecf8427e'})
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'})
resp = self.object_controller.GET(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.headers.get('x-object-manifest'), 'c/o/')
if __name__ == '__main__':
unittest.main()

View File

@ -34,8 +34,8 @@ import eventlet
from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen
from eventlet.timeout import Timeout
import simplejson
from webob import Request
from webob.exc import HTTPUnauthorized
from webob import Request, Response
from webob.exc import HTTPNotFound, HTTPUnauthorized
from test.unit import connect_tcp, readuntil2crlfs
from swift.proxy import server as proxy_server
@ -53,7 +53,9 @@ logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
def fake_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status, etag=None, body=''):
self.status = status
self.reason = 'Fake'
@ -158,6 +160,7 @@ class FakeRing(object):
class FakeMemcache(object):
def __init__(self):
self.store = {}
@ -212,9 +215,12 @@ def save_globals():
class TestProxyServer(unittest.TestCase):
def test_unhandled_exception(self):
class MyApp(proxy_server.Application):
def get_controller(self, path):
raise Exception('this shouldnt be caught')
app = MyApp(None, FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'})
@ -323,8 +329,11 @@ class TestObjectController(unittest.TestCase):
test_status_map((200, 200, 204, 500, 404), 503)
def test_PUT_connect_exceptions(self):
def mock_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status):
self.status = status
self.reason = 'Fake'
@ -344,6 +353,7 @@ class TestObjectController(unittest.TestCase):
if self.status == -3:
return FakeConn(507)
return FakeConn(100)
code_iter = iter(code_iter)
def connect(*args, **ckwargs):
@ -351,7 +361,9 @@ class TestObjectController(unittest.TestCase):
if status == -1:
raise HTTPException()
return FakeConn(status)
return connect
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
@ -372,8 +384,11 @@ class TestObjectController(unittest.TestCase):
test_status_map((200, 200, 503, 503, -1), 503)
def test_PUT_send_exceptions(self):
def mock_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status):
self.status = status
self.reason = 'Fake'
@ -437,8 +452,11 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(res.status_int, 413)
def test_PUT_getresponse_exceptions(self):
def mock_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status):
self.status = status
self.reason = 'Fake'
@ -633,6 +651,7 @@ class TestObjectController(unittest.TestCase):
dev['port'] = 1
class SlowBody():
def __init__(self):
self.sent = 0
@ -642,6 +661,7 @@ class TestObjectController(unittest.TestCase):
self.sent += 1
return ' '
return ''
req = Request.blank('/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()},
headers={'Content-Length': '4', 'Content-Type': 'text/plain'})
@ -680,11 +700,13 @@ class TestObjectController(unittest.TestCase):
dev['port'] = 1
class SlowBody():
def __init__(self):
self.sent = 0
def read(self, size=-1):
raise Exception('Disconnected')
req = Request.blank('/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()},
headers={'Content-Length': '4', 'Content-Type': 'text/plain'})
@ -1334,7 +1356,9 @@ class TestObjectController(unittest.TestCase):
def test_chunked_put(self):
# quick test of chunked put w/o PATH_TO_TEST_XFS
class ChunkedFile():
def __init__(self, bytes):
self.bytes = bytes
self.read_bytes = 0
@ -1495,8 +1519,10 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(headers[:len(exp)], exp)
# Check unhandled exception
orig_update_request = prosrv.update_request
def broken_update_request(env, req):
raise Exception('fake')
prosrv.update_request = broken_update_request
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
@ -1545,8 +1571,10 @@ class TestObjectController(unittest.TestCase):
# in a test for logging x-forwarded-for (first entry only).
class Logger(object):
def info(self, msg):
self.msg = msg
orig_logger = prosrv.logger
prosrv.logger = Logger()
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
@ -1568,8 +1596,10 @@ class TestObjectController(unittest.TestCase):
# Turn on header logging.
class Logger(object):
def info(self, msg):
self.msg = msg
orig_logger = prosrv.logger
prosrv.logger = Logger()
prosrv.log_headers = True
@ -1726,6 +1756,52 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(headers[:len(exp)], exp)
body = fd.read()
self.assertEquals(body, 'oh hai123456789abcdef')
# Create a container for our segmented/manifest object testing
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/segmented HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Storage-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
self.assertEquals(headers[:len(exp)], exp)
# Create the object segments
for segment in xrange(5):
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/segmented/name/%s HTTP/1.1\r\nHost: '
'localhost\r\nConnection: close\r\nX-Storage-Token: '
't\r\nContent-Length: 5\r\n\r\n1234 ' % str(segment))
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
self.assertEquals(headers[:len(exp)], exp)
# Create the object manifest file
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/segmented/name HTTP/1.1\r\nHost: '
'localhost\r\nConnection: close\r\nX-Storage-Token: '
't\r\nContent-Length: 0\r\nX-Object-Manifest: '
'segmented/name/\r\nContent-Type: text/jibberish\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
self.assertEquals(headers[:len(exp)], exp)
# Ensure retrieving the manifest file gets the whole object
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v1/a/segmented/name HTTP/1.1\r\nHost: '
'localhost\r\nConnection: close\r\nX-Auth-Token: '
't\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 200'
self.assertEquals(headers[:len(exp)], exp)
self.assert_('X-Object-Manifest: segmented/name/' in headers)
self.assert_('Content-Type: text/jibberish' in headers)
body = fd.read()
self.assertEquals(body, '1234 1234 1234 1234 1234 ')
finally:
prospa.kill()
acc1spa.kill()
@ -1937,6 +2013,7 @@ class TestObjectController(unittest.TestCase):
res = controller.COPY(req)
self.assert_(called[0])
class TestContainerController(unittest.TestCase):
"Test swift.proxy_server.ContainerController"
@ -2080,7 +2157,9 @@ class TestContainerController(unittest.TestCase):
self.assertEquals(resp.status_int, 404)
def test_put_locking(self):
class MockMemcache(FakeMemcache):
def __init__(self, allow_lock=None):
self.allow_lock = allow_lock
super(MockMemcache, self).__init__()
@ -2091,6 +2170,7 @@ class TestContainerController(unittest.TestCase):
yield True
else:
raise MemcacheLockError()
with save_globals():
controller = proxy_server.ContainerController(self.app, 'account',
'container')
@ -2669,5 +2749,256 @@ class TestAccountController(unittest.TestCase):
self.assertEquals(resp.status_int, 400)
class FakeObjectController(object):
def __init__(self):
self.app = self
self.logger = self
self.account_name = 'a'
self.container_name = 'c'
self.object_name = 'o'
self.trans_id = 'tx1'
self.object_ring = FakeRing()
self.node_timeout = 1
def exception(self, *args):
self.exception_args = args
self.exception_info = sys.exc_info()
def GETorHEAD_base(self, *args):
self.GETorHEAD_base_args = args
req = args[0]
path = args[4]
body = data = path[-1] * int(path[-1])
if req.range and req.range.ranges:
body = ''
for start, stop in req.range.ranges:
body += data[start:stop]
resp = Response(app_iter=iter(body))
return resp
def iter_nodes(self, partition, nodes, ring):
for node in nodes:
yield node
for node in ring.get_more_nodes(partition):
yield node
class Stub(object):
pass
class TestSegmentedIterable(unittest.TestCase):
def setUp(self):
self.controller = FakeObjectController()
def test_load_next_segment_unexpected_error(self):
self.assertRaises(Exception,
proxy_server.SegmentedIterable(self.controller, None,
None)._load_next_segment)
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
def test_load_next_segment_with_no_segments(self):
self.assertRaises(StopIteration,
proxy_server.SegmentedIterable(self.controller, 'lc',
[])._load_next_segment)
def test_load_next_segment_with_one_segment(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}])
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '1')
def test_load_next_segment_with_two_segments(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '1')
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '22')
def test_load_next_segment_with_two_segments_skip_first(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit.segment = 0
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '22')
def test_load_next_segment_with_seek(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit.segment = 0
segit.seek = 1
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2')
self.assertEquals(str(self.controller.GETorHEAD_base_args[0].range),
'bytes=1-')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '2')
def test_load_next_segment_with_get_error(self):
def local_GETorHEAD_base(*args):
return HTTPNotFound()
self.controller.GETorHEAD_base = local_GETorHEAD_base
self.assertRaises(Exception,
proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}])._load_next_segment)
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
self.assertEquals(str(self.controller.exception_info[1]),
'Could not load object segment /a/lc/o1: 404')
def test_iter_unexpected_error(self):
self.assertRaises(Exception, ''.join,
proxy_server.SegmentedIterable(self.controller, None, None))
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
def test_iter_with_no_segments(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [])
self.assertEquals(''.join(segit), '')
def test_iter_with_one_segment(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}])
segit.response = Stub()
self.assertEquals(''.join(segit), '1')
self.assertEquals(segit.response.bytes_transferred, 1)
def test_iter_with_two_segments(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit.response = Stub()
self.assertEquals(''.join(segit), '122')
self.assertEquals(segit.response.bytes_transferred, 3)
def test_iter_with_get_error(self):
def local_GETorHEAD_base(*args):
return HTTPNotFound()
self.controller.GETorHEAD_base = local_GETorHEAD_base
self.assertRaises(Exception, ''.join,
proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}]))
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
self.assertEquals(str(self.controller.exception_info[1]),
'Could not load object segment /a/lc/o1: 404')
def test_app_iter_range_unexpected_error(self):
self.assertRaises(Exception,
proxy_server.SegmentedIterable(self.controller, None,
None).app_iter_range(None, None).next)
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
def test_app_iter_range_with_no_segments(self):
self.assertEquals(''.join(proxy_server.SegmentedIterable(
self.controller, 'lc', []).app_iter_range(None, None)), '')
self.assertEquals(''.join(proxy_server.SegmentedIterable(
self.controller, 'lc', []).app_iter_range(3, None)), '')
self.assertEquals(''.join(proxy_server.SegmentedIterable(
self.controller, 'lc', []).app_iter_range(3, 5)), '')
self.assertEquals(''.join(proxy_server.SegmentedIterable(
self.controller, 'lc', []).app_iter_range(None, 5)), '')
def test_app_iter_range_with_one_segment(self):
listing = [{'name': 'o1', 'bytes': 1}]
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, None)), '1')
self.assertEquals(segit.response.bytes_transferred, 1)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
self.assertEquals(''.join(segit.app_iter_range(3, None)), '')
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
self.assertEquals(''.join(segit.app_iter_range(3, 5)), '')
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 5)), '1')
self.assertEquals(segit.response.bytes_transferred, 1)
def test_app_iter_range_with_two_segments(self):
listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 'bytes': 2}]
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, None)), '122')
self.assertEquals(segit.response.bytes_transferred, 3)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(1, None)), '22')
self.assertEquals(segit.response.bytes_transferred, 2)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(1, 5)), '22')
self.assertEquals(segit.response.bytes_transferred, 2)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 2)), '12')
self.assertEquals(segit.response.bytes_transferred, 2)
def test_app_iter_range_with_many_segments(self):
listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 'bytes': 2},
{'name': 'o3', 'bytes': 3}, {'name': 'o4', 'bytes': 4}, {'name':
'o5', 'bytes': 5}]
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, None)),
'122333444455555')
self.assertEquals(segit.response.bytes_transferred, 15)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(3, None)),
'333444455555')
self.assertEquals(segit.response.bytes_transferred, 12)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(5, None)), '3444455555')
self.assertEquals(segit.response.bytes_transferred, 10)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 6)), '122333')
self.assertEquals(segit.response.bytes_transferred, 6)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 7)), '1223334')
self.assertEquals(segit.response.bytes_transferred, 7)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(3, 7)), '3334')
self.assertEquals(segit.response.bytes_transferred, 4)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(5, 7)), '34')
self.assertEquals(segit.response.bytes_transferred, 2)
if __name__ == '__main__':
unittest.main()