update to trunk

This commit is contained in:
Michael Barton 2010-12-20 21:59:42 +00:00
commit affc53bc1d
14 changed files with 1474 additions and 92 deletions

236
bin/st
View File

@ -581,7 +581,8 @@ def put_object(url, token, container, name, contents, content_length=None,
:param container: container name that the object is in
:param name: object name to put
:param contents: a string or a file like object to read object data from
:param content_length: value to send as content-length header
:param content_length: value to send as content-length header; also limits
the amount read from contents
:param etag: etag of contents
:param chunk_size: chunk size of data to write
:param content_type: value to send as content-type header
@ -611,18 +612,24 @@ def put_object(url, token, container, name, contents, content_length=None,
conn.putrequest('PUT', path)
for header, value in headers.iteritems():
conn.putheader(header, value)
if not content_length:
if content_length is None:
conn.putheader('Transfer-Encoding', 'chunked')
conn.endheaders()
chunk = contents.read(chunk_size)
while chunk:
if not content_length:
conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
else:
conn.send(chunk)
chunk = contents.read(chunk_size)
if not content_length:
conn.send('0\r\n\r\n')
else:
conn.endheaders()
left = content_length
while left > 0:
size = chunk_size
if size > left:
size = left
chunk = contents.read(size)
conn.send(chunk)
left -= len(chunk)
else:
conn.request('PUT', path, contents, headers)
resp = conn.getresponse()
@ -860,15 +867,20 @@ class QueueFunctionThread(Thread):
st_delete_help = '''
delete --all OR delete container [object] [object] ...
delete --all OR delete container [--leave-segments] [object] [object] ...
Deletes everything in the account (with --all), or everything in a
container, or a list of objects depending on the args given.'''.strip('\n')
container, or a list of objects depending on the args given. Segments of
manifest objects will be deleted as well, unless you specify the
--leave-segments option.'''.strip('\n')
def st_delete(parser, args, print_queue, error_queue):
parser.add_option('-a', '--all', action='store_true', dest='yes_all',
default=False, help='Indicates that you really want to delete '
'everything in the account')
parser.add_option('', '--leave-segments', action='store_true',
dest='leave_segments', default=False, help='Indicates that you want '
'the segments of manifest objects left alone')
(options, args) = parse_args(parser, args)
args = args[1:]
if (not args and not options.yes_all) or (args and options.yes_all):
@ -876,11 +888,42 @@ def st_delete(parser, args, print_queue, error_queue):
(basename(argv[0]), st_delete_help))
return
def _delete_segment((container, obj), conn):
conn.delete_object(container, obj)
if options.verbose:
print_queue.put('%s/%s' % (container, obj))
object_queue = Queue(10000)
def _delete_object((container, obj), conn):
try:
old_manifest = None
if not options.leave_segments:
try:
old_manifest = conn.head_object(container, obj).get(
'x-object-manifest')
except ClientException, err:
if err.http_status != 404:
raise
conn.delete_object(container, obj)
if old_manifest:
segment_queue = Queue(10000)
scontainer, sprefix = old_manifest.split('/', 1)
for delobj in conn.get_container(scontainer,
prefix=sprefix)[1]:
segment_queue.put((scontainer, delobj['name']))
if not segment_queue.empty():
segment_threads = [QueueFunctionThread(segment_queue,
_delete_segment, create_connection()) for _ in
xrange(10)]
for thread in segment_threads:
thread.start()
while not segment_queue.empty():
sleep(0.01)
for thread in segment_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
if options.verbose:
path = options.yes_all and join(container, obj) or obj
if path[:1] in ('/', '\\'):
@ -891,6 +934,7 @@ def st_delete(parser, args, print_queue, error_queue):
raise
error_queue.put('Object %s not found' %
repr('%s/%s' % (container, obj)))
container_queue = Queue(10000)
def _delete_container(container, conn):
@ -956,6 +1000,10 @@ def st_delete(parser, args, print_queue, error_queue):
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
conn = create_connection()
_delete_container(args[0], conn)
else:
@ -976,7 +1024,7 @@ def st_delete(parser, args, print_queue, error_queue):
st_download_help = '''
download --all OR download container [object] [object] ...
download --all OR download container [options] [object] [object] ...
Downloads everything in the account (with --all), or everything in a
container, or a list of objects depending on the args given. For a single
object download, you may use the -o [--output] <filename> option to
@ -1015,19 +1063,25 @@ def st_download(options, args, print_queue, error_queue):
headers, body = \
conn.get_object(container, obj, resp_chunk_size=65536)
content_type = headers.get('content-type')
if 'content-length' in headers:
content_length = int(headers.get('content-length'))
else:
content_length = None
etag = headers.get('etag')
path = options.yes_all and join(container, obj) or obj
if path[:1] in ('/', '\\'):
path = path[1:]
md5sum = None
make_dir = out_file != "-"
if content_type.split(';', 1)[0] == 'text/directory':
if make_dir and not isdir(path):
mkdirs(path)
read_length = 0
if 'x-object-manifest' not in headers:
md5sum = md5()
for chunk in body:
read_length += len(chunk)
if md5sum:
md5sum.update(chunk)
else:
dirpath = dirname(path)
@ -1040,16 +1094,18 @@ def st_download(options, args, print_queue, error_queue):
else:
fp = open(path, 'wb')
read_length = 0
if 'x-object-manifest' not in headers:
md5sum = md5()
for chunk in body:
fp.write(chunk)
read_length += len(chunk)
if md5sum:
md5sum.update(chunk)
fp.close()
if md5sum.hexdigest() != etag:
if md5sum and md5sum.hexdigest() != etag:
error_queue.put('%s: md5sum != etag, %s != %s' %
(path, md5sum.hexdigest(), etag))
if read_length != content_length:
if content_length is not None and read_length != content_length:
error_queue.put('%s: read_length != content_length, %d != %d' %
(path, read_length, content_length))
if 'x-object-meta-mtime' in headers and not options.out_file:
@ -1110,6 +1166,10 @@ def st_download(options, args, print_queue, error_queue):
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
_download_container(args[0], create_connection())
else:
if len(args) == 2:
@ -1223,6 +1283,10 @@ Containers: %d
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
try:
headers = conn.head_container(args[0])
object_count = int(headers.get('x-container-object-count', 0))
@ -1259,14 +1323,19 @@ Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
Account: %s
Container: %s
Object: %s
Content Type: %s
Content Length: %s
Last Modified: %s
ETag: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
args[1], headers.get('content-type'),
headers.get('content-length'),
headers.get('last-modified'),
headers.get('etag')))
Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
args[1], headers.get('content-type')))
if 'content-length' in headers:
print_queue.put('Content Length: %s' %
headers['content-length'])
if 'last-modified' in headers:
print_queue.put(' Last Modified: %s' %
headers['last-modified'])
if 'etag' in headers:
print_queue.put(' ETag: %s' % headers['etag'])
if 'x-object-manifest' in headers:
print_queue.put(' Manifest: %s' %
headers['x-object-manifest'])
for key, value in headers.items():
if key.startswith('x-object-meta-'):
print_queue.put('%14s: %s' % ('Meta %s' %
@ -1274,7 +1343,7 @@ Content Length: %s
for key, value in headers.items():
if not key.startswith('x-object-meta-') and key not in (
'content-type', 'content-length', 'last-modified',
'etag', 'date'):
'etag', 'date', 'x-object-manifest'):
print_queue.put(
'%14s: %s' % (key.title(), value))
except ClientException, err:
@ -1326,6 +1395,10 @@ def st_post(options, args, print_queue, error_queue):
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
headers = {}
for item in options.meta:
split_item = item.split(':')
@ -1363,23 +1436,48 @@ st_upload_help = '''
upload [options] container file_or_directory [file_or_directory] [...]
Uploads to the given container the files and directories specified by the
remaining args. -c or --changed is an option that will only upload files
that have changed since the last upload.'''.strip('\n')
that have changed since the last upload. -S <size> or --segment-size <size>
and --leave-segments are options as well (see --help for more).
'''.strip('\n')
def st_upload(options, args, print_queue, error_queue):
parser.add_option('-c', '--changed', action='store_true', dest='changed',
default=False, help='Will only upload files that have changed since '
'the last upload')
parser.add_option('-S', '--segment-size', dest='segment_size', help='Will '
'upload files in segments no larger than <size> and then create a '
'"manifest" file that will download all the segments as if it were '
'the original file. The segments will be uploaded to a '
'<container>_segments container so as to not pollute the main '
'<container> listings.')
parser.add_option('', '--leave-segments', action='store_true',
dest='leave_segments', default=False, help='Indicates that you want '
'the older segments of manifest objects left alone (in the case of '
'overwrites)')
(options, args) = parse_args(parser, args)
args = args[1:]
if len(args) < 2:
error_queue.put('Usage: %s [options] %s' %
(basename(argv[0]), st_upload_help))
return
object_queue = Queue(10000)
file_queue = Queue(10000)
def _segment_job(job, conn):
if job.get('delete', False):
conn.delete_object(job['container'], job['obj'])
else:
fp = open(job['path'], 'rb')
fp.seek(job['segment_start'])
conn.put_object(job.get('container', args[0] + '_segments'),
job['obj'], fp, content_length=job['segment_size'])
if options.verbose and 'log_line' in job:
print_queue.put(job['log_line'])
def _upload_file((path, dir_marker), conn):
def _object_job(job, conn):
path = job['path']
container = job.get('container', args[0])
dir_marker = job.get('dir_marker', False)
try:
obj = path
if obj.startswith('./') or obj.startswith('.\\'):
@ -1388,7 +1486,7 @@ def st_upload(options, args, print_queue, error_queue):
if dir_marker:
if options.changed:
try:
headers = conn.head_object(args[0], obj)
headers = conn.head_object(container, obj)
ct = headers.get('content-type')
cl = int(headers.get('content-length'))
et = headers.get('etag')
@ -1401,24 +1499,86 @@ def st_upload(options, args, print_queue, error_queue):
except ClientException, err:
if err.http_status != 404:
raise
conn.put_object(args[0], obj, '', content_length=0,
conn.put_object(container, obj, '', content_length=0,
content_type='text/directory',
headers=put_headers)
else:
if options.changed:
# We need to HEAD all objects now in case we're overwriting a
# manifest object and need to delete the old segments
# ourselves.
old_manifest = None
if options.changed or not options.leave_segments:
try:
headers = conn.head_object(args[0], obj)
headers = conn.head_object(container, obj)
cl = int(headers.get('content-length'))
mt = headers.get('x-object-meta-mtime')
if cl == getsize(path) and \
if options.changed and cl == getsize(path) and \
mt == put_headers['x-object-meta-mtime']:
return
if not options.leave_segments:
old_manifest = headers.get('x-object-manifest')
except ClientException, err:
if err.http_status != 404:
raise
conn.put_object(args[0], obj, open(path, 'rb'),
content_length=getsize(path),
if options.segment_size and \
getsize(path) < options.segment_size:
full_size = getsize(path)
segment_queue = Queue(10000)
segment_threads = [QueueFunctionThread(segment_queue,
_segment_job, create_connection()) for _ in xrange(10)]
for thread in segment_threads:
thread.start()
segment = 0
segment_start = 0
while segment_start < full_size:
segment_size = int(options.segment_size)
if segment_start + segment_size > full_size:
segment_size = full_size - segment_start
segment_queue.put({'path': path,
'obj': '%s/%s/%s/%08d' % (obj,
put_headers['x-object-meta-mtime'], full_size,
segment),
'segment_start': segment_start,
'segment_size': segment_size,
'log_line': '%s segment %s' % (obj, segment)})
segment += 1
segment_start += segment_size
while not segment_queue.empty():
sleep(0.01)
for thread in segment_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
new_object_manifest = '%s_segments/%s/%s/%s/' % (
container, obj, put_headers['x-object-meta-mtime'],
full_size)
if old_manifest == new_object_manifest:
old_manifest = None
put_headers['x-object-manifest'] = new_object_manifest
conn.put_object(container, obj, '', content_length=0,
headers=put_headers)
else:
conn.put_object(container, obj, open(path, 'rb'),
content_length=getsize(path), headers=put_headers)
if old_manifest:
segment_queue = Queue(10000)
scontainer, sprefix = old_manifest.split('/', 1)
for delobj in conn.get_container(scontainer,
prefix=sprefix)[1]:
segment_queue.put({'delete': True,
'container': scontainer, 'obj': delobj['name']})
if not segment_queue.empty():
segment_threads = [QueueFunctionThread(segment_queue,
_segment_job, create_connection()) for _ in
xrange(10)]
for thread in segment_threads:
thread.start()
while not segment_queue.empty():
sleep(0.01)
for thread in segment_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)
if options.verbose:
print_queue.put(obj)
except OSError, err:
@ -1429,22 +1589,22 @@ def st_upload(options, args, print_queue, error_queue):
def _upload_dir(path):
names = listdir(path)
if not names:
file_queue.put((path, True)) # dir_marker = True
object_queue.put({'path': path, 'dir_marker': True})
else:
for name in listdir(path):
subpath = join(path, name)
if isdir(subpath):
_upload_dir(subpath)
else:
file_queue.put((subpath, False)) # dir_marker = False
object_queue.put({'path': subpath})
url, token = get_auth(options.auth, options.user, options.key,
snet=options.snet)
create_connection = lambda: Connection(options.auth, options.user,
options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
file_threads = [QueueFunctionThread(file_queue, _upload_file,
object_threads = [QueueFunctionThread(object_queue, _object_job,
create_connection()) for _ in xrange(10)]
for thread in file_threads:
for thread in object_threads:
thread.start()
conn = create_connection()
# Try to create the container, just in case it doesn't exist. If this
@ -1453,6 +1613,8 @@ def st_upload(options, args, print_queue, error_queue):
# it'll surface on the first object PUT.
try:
conn.put_container(args[0])
if options.segment_size is not None:
conn.put_container(args[0] + '_segments')
except:
pass
try:
@ -1460,10 +1622,10 @@ def st_upload(options, args, print_queue, error_queue):
if isdir(arg):
_upload_dir(arg)
else:
file_queue.put((arg, False)) # dir_marker = False
while not file_queue.empty():
object_queue.put({'path': arg})
while not object_queue.empty():
sleep(0.01)
for thread in file_threads:
for thread in object_threads:
thread.abort = True
while thread.isAlive():
thread.join(0.01)

View File

@ -44,6 +44,7 @@ Overview and Concepts
overview_replication
overview_stats
ratelimit
overview_large_objects
Developer Documentation
=======================

View File

@ -0,0 +1,177 @@
====================
Large Object Support
====================
--------
Overview
--------
Swift has a limit on the size of a single uploaded object; by default this is
5GB. However, the download size of a single object is virtually unlimited with
the concept of segmentation. Segments of the larger object are uploaded and a
special manifest file is created that, when downloaded, sends all the segments
concatenated as a single object. This also offers much greater upload speed
with the possibility of parallel uploads of the segments.
----------------------------------
Using ``st`` for Segmented Objects
----------------------------------
The quickest way to try out this feature is use the included ``st`` Swift Tool.
You can use the ``-S`` option to specify the segment size to use when splitting
a large file. For example::
st upload test_container -S 1073741824 large_file
This would split the large_file into 1G segments and begin uploading those
segments in parallel. Once all the segments have been uploaded, ``st`` will
then create the manifest file so the segments can be downloaded as one.
So now, the following ``st`` command would download the entire large object::
st download test_container large_file
``st`` uses a strict convention for its segmented object support. In the above
example it will upload all the segments into a second container named
test_container_segments. These segments will have names like
large_file/1290206778.25/21474836480/00000000,
large_file/1290206778.25/21474836480/00000001, etc.
The main benefit for using a separate container is that the main container
listings will not be polluted with all the segment names. The reason for using
the segment name format of <name>/<timestamp>/<size>/<segment> is so that an
upload of a new file with the same name won't overwrite the contents of the
first until the last moment when the manifest file is updated.
``st`` will manage these segment files for you, deleting old segments on
deletes and overwrites, etc. You can override this behavior with the
``--leave-segments`` option if desired; this is useful if you want to have
multiple versions of the same large object available.
----------
Direct API
----------
You can also work with the segments and manifests directly with HTTP requests
instead of having ``st`` do that for you. You can just upload the segments like
you would any other object and the manifest is just a zero-byte file with an
extra ``X-Object-Manifest`` header.
All the object segments need to be in the same container, have a common object
name prefix, and their names sort in the order they should be concatenated.
They don't have to be in the same container as the manifest file will be, which
is useful to keep container listings clean as explained above with ``st``.
The manifest file is simply a zero-byte file with the extra
``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is
the container the object segments are in and ``<prefix>`` is the common prefix
for all the segments.
It is best to upload all the segments first and then create or update the
manifest. In this way, the full object won't be available for downloading until
the upload is complete. Also, you can upload a new set of segments to a second
location and then update the manifest to point to this new location. During the
upload of the new segments, the original manifest will still be available to
download the first set of segments.
Here's an example using ``curl`` with tiny 1-byte segments::
# First, upload the segments
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/1 --data-binary '1'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/2 --data-binary '2'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/3 --data-binary '3'
# Next, create the manifest file
curl -X PUT -H 'X-Auth-Token: <token>' \
-H 'X-Object-Manifest: container/myobject/' \
http://<storage_url>/container/myobject --data-binary ''
# And now we can download the segments as a single object
curl -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject
----------------
Additional Notes
----------------
* With a ``GET`` or ``HEAD`` of a manifest file, the ``X-Object-Manifest:
<container>/<prefix>`` header will be returned with the concatenated object
so you can tell where it's getting its segments from.
* The response's ``Content-Length`` for a ``GET`` or ``HEAD`` on the manifest
file will be the sum of all the segments in the ``<container>/<prefix>``
listing, dynamically. So, uploading additional segments after the manifest is
created will cause the concatenated object to be that much larger; there's no
need to recreate the manifest file.
* The response's ``Content-Type`` for a ``GET`` or ``HEAD`` on the manifest
will be the same as the ``Content-Type`` set during the ``PUT`` request that
created the manifest. You can easily change the ``Content-Type`` by reissuing
the ``PUT``.
* The response's ``ETag`` for a ``GET`` or ``HEAD`` on the manifest file will
be the MD5 sum of the concatenated string of ETags for each of the segments
in the ``<container>/<prefix>`` listing, dynamically. Usually in Swift the
ETag is the MD5 sum of the contents of the object, and that holds true for
each segment independently. But, it's not feasible to generate such an ETag
for the manifest itself, so this method was chosen to at least offer change
detection.
-------
History
-------
Large object support has gone through various iterations before settling on
this implementation.
The primary factor driving the limitation of object size in swift is
maintaining balance among the partitions of the ring. To maintain an even
dispersion of disk usage throughout the cluster the obvious storage pattern
was to simply split larger objects into smaller segments, which could then be
glued together during a read.
Before the introduction of large object support some applications were already
splitting their uploads into segments and re-assembling them on the client
side after retrieving the individual pieces. This design allowed the client
to support backup and archiving of large data sets, but was also frequently
employed to improve performance or reduce errors due to network interruption.
The major disadvantage of this method is that knowledge of the original
partitioning scheme is required to properly reassemble the object, which is
not practical for some use cases, such as CDN origination.
In order to eliminate any barrier to entry for clients wanting to store
objects larger than 5GB, initially we also prototyped fully transparent
support for large object uploads. A fully transparent implementation would
support a larger max size by automatically splitting objects into segments
during upload within the proxy without any changes to the client API. All
segments were completely hidden from the client API.
This solution introduced a number of challenging failure conditions into the
cluster, wouldn't provide the client with any option to do parallel uploads,
and had no basis for a resume feature. The transparent implementation was
deemed just too complex for the benefit.
The current "user manifest" design was chosen in order to provide a
transparent download of large objects to the client and still provide the
uploading client a clean API to support segmented uploads.
Alternative "explicit" user manifest options were discussed which would have
required a pre-defined format for listing the segments to "finalize" the
segmented upload. While this may offer some potential advantages, it was
decided that pushing an added burden onto the client which could potentially
limit adoption should be avoided in favor of a simpler "API" (essentially just
the format of the 'X-Object-Manifest' header).
During development it was noted that this "implicit" user manifest approach
which is based on the path prefix can be potentially affected by the eventual
consistency window of the container listings, which could theoretically cause
a GET on the manifest object to return an invalid whole object for that short
term. In reality you're unlikely to encounter this scenario unless you're
running very high concurrency uploads against a small testing environment
which isn't running the object-updaters or container-replicators.
Like all of swift, Large Object Support is living feature which will continue
to improve and may change over time.

View File

@ -23,7 +23,7 @@ class_path = swift.stats.access_processor.AccessLogProcessor
# load balancer private ips is for load balancer ip addresses that should be
# counted as servicenet
# lb_private_ips =
# server_name = proxy
# server_name = proxy-server
# user = swift
# warn_percent = 0.8

View File

@ -569,7 +569,8 @@ def put_object(url, token, container, name, contents, content_length=None,
:param container: container name that the object is in
:param name: object name to put
:param contents: a string or a file like object to read object data from
:param content_length: value to send as content-length header
:param content_length: value to send as content-length header; also limits
the amount read from contents
:param etag: etag of contents
:param chunk_size: chunk size of data to write
:param content_type: value to send as content-type header
@ -599,18 +600,24 @@ def put_object(url, token, container, name, contents, content_length=None,
conn.putrequest('PUT', path)
for header, value in headers.iteritems():
conn.putheader(header, value)
if not content_length:
if content_length is None:
conn.putheader('Transfer-Encoding', 'chunked')
conn.endheaders()
chunk = contents.read(chunk_size)
while chunk:
if not content_length:
conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
else:
conn.send(chunk)
chunk = contents.read(chunk_size)
if not content_length:
conn.send('0\r\n\r\n')
else:
conn.endheaders()
left = content_length
while left > 0:
size = chunk_size
if size > left:
size = left
chunk = contents.read(size)
conn.send(chunk)
left -= len(chunk)
else:
conn.request('PUT', path, contents, headers)
resp = conn.getresponse()

View File

@ -113,6 +113,17 @@ def check_object_creation(req, object_name):
if not check_utf8(req.headers['Content-Type']):
return HTTPBadRequest(request=req, body='Invalid Content-Type',
content_type='text/plain')
if 'x-object-manifest' in req.headers:
value = req.headers['x-object-manifest']
container = prefix = None
try:
container, prefix = value.split('/', 1)
except ValueError:
pass
if not container or not prefix or '?' in value or '&' in value or \
prefix[0] == '/':
return HTTPBadRequest(request=req,
body='X-Object-Manifest must in the format container/prefix')
return check_metadata(req, 'object')

View File

@ -394,6 +394,9 @@ class ObjectController(object):
'ETag': etag,
'Content-Length': str(os.fstat(fd).st_size),
}
if 'x-object-manifest' in request.headers:
metadata['X-Object-Manifest'] = \
request.headers['x-object-manifest']
metadata.update(val for val in request.headers.iteritems()
if val[0].lower().startswith('x-object-meta-') and
len(val[0]) > 14)
@ -463,7 +466,8 @@ class ObjectController(object):
'application/octet-stream'), app_iter=file,
request=request, conditional_response=True)
for key, value in file.metadata.iteritems():
if key.lower().startswith('x-object-meta-'):
if key == 'X-Object-Manifest' or \
key.lower().startswith('x-object-meta-'):
response.headers[key] = value
response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp'])
@ -491,7 +495,8 @@ class ObjectController(object):
response = Response(content_type=file.metadata['Content-Type'],
request=request, conditional_response=True)
for key, value in file.metadata.iteritems():
if key.lower().startswith('x-object-meta-'):
if key == 'X-Object-Manifest' or \
key.lower().startswith('x-object-meta-'):
response.headers[key] = value
response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp'])

View File

@ -14,16 +14,24 @@
# limitations under the License.
from __future__ import with_statement
try:
import simplejson as json
except ImportError:
import json
import mimetypes
import os
import re
import time
import traceback
from ConfigParser import ConfigParser
from datetime import datetime
from urllib import unquote, quote
import uuid
import functools
from gettext import gettext as _
from hashlib import md5
from eventlet import sleep
from eventlet.timeout import Timeout
from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \
HTTPNotFound, HTTPPreconditionFailed, \
@ -37,8 +45,8 @@ from swift.common.utils import get_logger, normalize_timestamp, split_path, \
cache_from_env
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
MAX_FILE_SIZE
check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
from swift.common.exceptions import ChunkReadTimeout, \
ChunkWriteTimeout, ConnectionTimeout
@ -96,6 +104,161 @@ def get_container_memcache_key(account, container):
return 'container/%s/%s' % (account, container)
class SegmentedIterable(object):
"""
Iterable that returns the object contents for a segmented object in Swift.
If set, the response's `bytes_transferred` value will be updated (used to
log the size of the request). Also, if there's a failure that cuts the
transfer short, the response's `status_int` will be updated (again, just
for logging since the original status would have already been sent to the
client).
:param controller: The ObjectController instance to work with.
:param container: The container the object segments are within.
:param listing: The listing of object segments to iterate over; this may
be an iterator or list that returns dicts with 'name' and
'bytes' keys.
:param response: The webob.Response this iterable is associated with, if
any (default: None)
"""
def __init__(self, controller, container, listing, response=None):
self.controller = controller
self.container = container
self.listing = iter(listing)
self.segment = -1
self.segment_dict = None
self.segment_peek = None
self.seek = 0
self.segment_iter = None
self.position = 0
self.response = response
if not self.response:
self.response = Response()
self.next_get_time = 0
def _load_next_segment(self):
"""
Loads the self.segment_iter with the next object segment's contents.
:raises: StopIteration when there are no more object segments.
"""
try:
self.segment += 1
self.segment_dict = self.segment_peek or self.listing.next()
self.segment_peek = None
partition, nodes = self.controller.app.object_ring.get_nodes(
self.controller.account_name, self.container,
self.segment_dict['name'])
path = '/%s/%s/%s' % (self.controller.account_name, self.container,
self.segment_dict['name'])
req = Request.blank(path)
if self.seek:
req.range = 'bytes=%s-' % self.seek
self.seek = 0
if self.segment > 10:
sleep(max(self.next_get_time - time.time(), 0))
self.next_get_time = time.time() + 1
resp = self.controller.GETorHEAD_base(req, 'Object', partition,
self.controller.iter_nodes(partition, nodes,
self.controller.app.object_ring), path,
self.controller.app.object_ring.replica_count)
if resp.status_int // 100 != 2:
raise Exception('Could not load object segment %s: %s' % (path,
resp.status_int))
self.segment_iter = resp.app_iter
except StopIteration:
raise
except Exception, err:
if not getattr(err, 'swift_logged', False):
self.controller.app.logger.exception('ERROR: While processing '
'manifest /%s/%s/%s %s' % (self.controller.account_name,
self.controller.container_name,
self.controller.object_name, self.controller.trans_id))
err.swift_logged = True
self.response.status_int = 503
raise
def next(self):
return iter(self).next()
def __iter__(self):
""" Standard iterator function that returns the object's contents. """
try:
while True:
if not self.segment_iter:
self._load_next_segment()
while True:
with ChunkReadTimeout(self.controller.app.node_timeout):
try:
chunk = self.segment_iter.next()
break
except StopIteration:
self._load_next_segment()
self.position += len(chunk)
self.response.bytes_transferred = getattr(self.response,
'bytes_transferred', 0) + len(chunk)
yield chunk
except StopIteration:
raise
except Exception, err:
if not getattr(err, 'swift_logged', False):
self.controller.app.logger.exception('ERROR: While processing '
'manifest /%s/%s/%s %s' % (self.controller.account_name,
self.controller.container_name,
self.controller.object_name, self.controller.trans_id))
err.swift_logged = True
self.response.status_int = 503
raise
def app_iter_range(self, start, stop):
"""
Non-standard iterator function for use with Webob in serving Range
requests more quickly. This will skip over segments and do a range
request on the first segment to return data from, if needed.
:param start: The first byte (zero-based) to return. None for 0.
:param stop: The last byte (zero-based) to return. None for end.
"""
try:
if start:
self.segment_peek = self.listing.next()
while start >= self.position + self.segment_peek['bytes']:
self.segment += 1
self.position += self.segment_peek['bytes']
self.segment_peek = self.listing.next()
self.seek = start - self.position
else:
start = 0
if stop is not None:
length = stop - start
else:
length = None
for chunk in self:
if length is not None:
length -= len(chunk)
if length < 0:
# Chop off the extra:
self.response.bytes_transferred = \
getattr(self.response, 'bytes_transferred', 0) \
+ length
yield chunk[:length]
break
yield chunk
except StopIteration:
raise
except Exception, err:
if not getattr(err, 'swift_logged', False):
self.controller.app.logger.exception('ERROR: While processing '
'manifest /%s/%s/%s %s' % (self.controller.account_name,
self.controller.container_name,
self.controller.object_name, self.controller.trans_id))
err.swift_logged = True
self.response.status_int = 503
raise
class Controller(object):
"""Base WSGI controller class for the proxy"""
@ -536,9 +699,137 @@ class ObjectController(Controller):
return aresp
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
return self.GETorHEAD_base(req, 'Object', partition,
resp = self.GETorHEAD_base(req, 'Object', partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, self.app.object_ring.replica_count)
# If we get a 416 Requested Range Not Satisfiable we have to check if
# we were actually requesting a manifest object and then redo the range
# request on the whole object.
if resp.status_int == 416:
req_range = req.range
req.range = None
resp2 = self.GETorHEAD_base(req, 'Object', partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, self.app.object_ring.replica_count)
if 'x-object-manifest' not in resp2.headers:
return resp
resp = resp2
req.range = req_range
if 'x-object-manifest' in resp.headers:
lcontainer, lprefix = \
resp.headers['x-object-manifest'].split('/', 1)
lpartition, lnodes = self.app.container_ring.get_nodes(
self.account_name, lcontainer)
marker = ''
listing = []
while True:
lreq = Request.blank('/%s/%s?prefix=%s&format=json&marker=%s' %
(quote(self.account_name), quote(lcontainer),
quote(lprefix), quote(marker)))
lresp = self.GETorHEAD_base(lreq, 'Container', lpartition,
lnodes, lreq.path_info,
self.app.container_ring.replica_count)
if lresp.status_int // 100 != 2:
lresp = HTTPNotFound(request=req)
lresp.headers['X-Object-Manifest'] = \
resp.headers['x-object-manifest']
return lresp
if 'swift.authorize' in req.environ:
req.acl = lresp.headers.get('x-container-read')
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
sublisting = json.loads(lresp.body)
if not sublisting:
break
listing.extend(sublisting)
if len(listing) > CONTAINER_LISTING_LIMIT:
break
marker = sublisting[-1]['name']
if len(listing) > CONTAINER_LISTING_LIMIT:
# We will serve large objects with a ton of segments with
# chunked transfer encoding.
def listing_iter():
marker = ''
while True:
lreq = Request.blank(
'/%s/%s?prefix=%s&format=json&marker=%s' %
(quote(self.account_name), quote(lcontainer),
quote(lprefix), quote(marker)))
lresp = self.GETorHEAD_base(lreq, 'Container',
lpartition, lnodes, lreq.path_info,
self.app.container_ring.replica_count)
if lresp.status_int // 100 != 2:
raise Exception('Object manifest GET could not '
'continue listing: %s %s' %
(req.path, lreq.path))
if 'swift.authorize' in req.environ:
req.acl = lresp.headers.get('x-container-read')
aresp = req.environ['swift.authorize'](req)
if aresp:
raise Exception('Object manifest GET could '
'not continue listing: %s %s' %
(req.path, aresp))
sublisting = json.loads(lresp.body)
if not sublisting:
break
for obj in sublisting:
yield obj
marker = sublisting[-1]['name']
headers = {
'X-Object-Manifest': resp.headers['x-object-manifest'],
'Content-Type': resp.content_type}
for key, value in resp.headers.iteritems():
if key.lower().startswith('x-object-meta-'):
headers[key] = value
resp = Response(headers=headers, request=req,
conditional_response=True)
if req.method == 'HEAD':
# These shenanigans are because webob translates the HEAD
# request into a webob EmptyResponse for the body, which
# has a len, which eventlet translates as needing a
# content-length header added. So we call the original
# webob resp for the headers but return an empty iterator
# for the body.
def head_response(environ, start_response):
resp(environ, start_response)
return iter([])
head_response.status_int = resp.status_int
return head_response
else:
resp.app_iter = SegmentedIterable(self, lcontainer,
listing_iter(), resp)
else:
# For objects with a reasonable number of segments, we'll serve
# them with a set content-length and computed etag.
content_length = sum(o['bytes'] for o in listing)
last_modified = max(o['last_modified'] for o in listing)
last_modified = \
datetime(*map(int, re.split('[^\d]', last_modified)[:-1]))
etag = md5('"'.join(o['hash'] for o in listing)).hexdigest()
headers = {
'X-Object-Manifest': resp.headers['x-object-manifest'],
'Content-Type': resp.content_type,
'Content-Length': content_length,
'ETag': etag}
for key, value in resp.headers.iteritems():
if key.lower().startswith('x-object-meta-'):
headers[key] = value
resp = Response(headers=headers, request=req,
conditional_response=True)
resp.app_iter = SegmentedIterable(self, lcontainer, listing,
resp)
resp.content_length = content_length
resp.last_modified = last_modified
return resp
@public
@delay_denial
@ -652,9 +943,15 @@ class ObjectController(Controller):
return source_resp
self.object_name = orig_obj_name
self.container_name = orig_container_name
data_source = source_resp.app_iter
new_req = Request.blank(req.path_info,
environ=req.environ, headers=req.headers)
if 'x-object-manifest' in source_resp.headers:
data_source = iter([''])
new_req.content_length = 0
new_req.headers['X-Object-Manifest'] = \
source_resp.headers['x-object-manifest']
else:
data_source = source_resp.app_iter
new_req.content_length = source_resp.content_length
new_req.etag = source_resp.etag
# we no longer need the X-Copy-From header

View File

@ -26,7 +26,7 @@ class AccessLogProcessor(object):
"""Transform proxy server access logs"""
def __init__(self, conf):
self.server_name = conf.get('server_name', 'proxy')
self.server_name = conf.get('server_name', 'proxy-server')
self.lb_private_ips = [x.strip() for x in \
conf.get('lb_private_ips', '').split(',')\
if x.strip()]

View File

@ -16,6 +16,7 @@ class TestObject(unittest.TestCase):
if skip:
raise SkipTest
self.container = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token})
@ -24,6 +25,7 @@ class TestObject(unittest.TestCase):
resp.read()
self.assertEquals(resp.status, 201)
self.obj = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
self.obj), 'test', {'X-Auth-Token': token})
@ -35,6 +37,7 @@ class TestObject(unittest.TestCase):
def tearDown(self):
if skip:
raise SkipTest
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/%s' % (parsed.path, self.container,
self.obj), '', {'X-Auth-Token': token})
@ -42,6 +45,7 @@ class TestObject(unittest.TestCase):
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
def delete(url, token, parsed, conn):
conn.request('DELETE', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token})
@ -53,6 +57,7 @@ class TestObject(unittest.TestCase):
def test_public_object(self):
if skip:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET',
'%s/%s/%s' % (parsed.path, self.container, self.obj))
@ -62,6 +67,7 @@ class TestObject(unittest.TestCase):
raise Exception('Should not have been able to GET')
except Exception, err:
self.assert_(str(err).startswith('No result after '))
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token,
@ -73,6 +79,7 @@ class TestObject(unittest.TestCase):
resp = retry(get)
resp.read()
self.assertEquals(resp.status, 200)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token, 'X-Container-Read': ''})
@ -89,6 +96,7 @@ class TestObject(unittest.TestCase):
def test_private_object(self):
if skip or skip3:
raise SkipTest
# Ensure we can't access the object with the third account
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/%s' % (parsed.path, self.container,
@ -98,8 +106,10 @@ class TestObject(unittest.TestCase):
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# create a shared container writable by account3
shared_container = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s' % (parsed.path,
shared_container), '',
@ -110,6 +120,7 @@ class TestObject(unittest.TestCase):
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# verify third account can not copy from private container
def copy(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (parsed.path,
@ -123,6 +134,7 @@ class TestObject(unittest.TestCase):
resp = retry(copy, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# verify third account can write "obj1" to shared container
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (parsed.path, shared_container,
@ -131,6 +143,7 @@ class TestObject(unittest.TestCase):
resp = retry(put, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
# verify third account can copy "obj1" to shared container
def copy2(url, token, parsed, conn):
conn.request('COPY', '%s/%s/%s' % (parsed.path,
@ -143,6 +156,7 @@ class TestObject(unittest.TestCase):
resp = retry(copy2, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
# verify third account STILL can not copy from private container
def copy3(url, token, parsed, conn):
conn.request('COPY', '%s/%s/%s' % (parsed.path,
@ -155,6 +169,7 @@ class TestObject(unittest.TestCase):
resp = retry(copy3, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# clean up "obj1"
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/%s' % (parsed.path, shared_container,
@ -163,6 +178,7 @@ class TestObject(unittest.TestCase):
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
# clean up shared_container
def delete(url, token, parsed, conn):
conn.request('DELETE',
@ -173,6 +189,269 @@ class TestObject(unittest.TestCase):
resp.read()
self.assertEquals(resp.status, 204)
def test_manifest(self):
if skip:
raise SkipTest
# Data for the object segments
segments1 = ['one', 'two', 'three', 'four', 'five']
segments2 = ['six', 'seven', 'eight']
segments3 = ['nine', 'ten', 'eleven']
# Upload the first set of segments
def put(url, token, parsed, conn, objnum):
conn.request('PUT', '%s/%s/segments1/%s' % (parsed.path,
self.container, str(objnum)), segments1[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments1)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
# Upload the manifest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token,
'X-Object-Manifest': '%s/segments1/' % self.container,
'Content-Type': 'text/jibberish', 'Content-Length': '0'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest (should get all the segments as the body)
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1))
self.assertEquals(resp.status, 200)
self.assertEquals(resp.getheader('content-type'), 'text/jibberish')
# Get with a range at the start of the second segment
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token, 'Range':
'bytes=3-'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1[1:]))
self.assertEquals(resp.status, 206)
# Get with a range in the middle of the second segment
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token, 'Range':
'bytes=5-'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1)[5:])
self.assertEquals(resp.status, 206)
# Get with a full start and stop range
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token, 'Range':
'bytes=5-10'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1)[5:11])
self.assertEquals(resp.status, 206)
# Upload the second set of segments
def put(url, token, parsed, conn, objnum):
conn.request('PUT', '%s/%s/segments2/%s' % (parsed.path,
self.container, str(objnum)), segments2[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments2)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest (should still be the first segments of course)
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1))
self.assertEquals(resp.status, 200)
# Update the manifest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token,
'X-Object-Manifest': '%s/segments2/' % self.container,
'Content-Length': '0'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest (should be the second set of segments now)
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments2))
self.assertEquals(resp.status, 200)
if not skip3:
# Ensure we can't access the manifest with the third account
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, self.container),
'', {'X-Auth-Token': token, 'X-Container-Read':
swift_test_user[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
self.assertEquals(resp.read(), ''.join(segments2))
self.assertEquals(resp.status, 200)
# Create another container for the third set of segments
acontainer = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', parsed.path + '/' + acontainer, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Upload the third set of segments in the other container
def put(url, token, parsed, conn, objnum):
conn.request('PUT', '%s/%s/segments3/%s' % (parsed.path,
acontainer, str(objnum)), segments3[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments3)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
# Update the manifest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token,
'X-Object-Manifest': '%s/segments3/' % acontainer,
'Content-Length': '0'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest to ensure it's the third set of segments
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments3))
self.assertEquals(resp.status, 200)
if not skip3:
# Ensure we can't access the manifest with the third account
# (because the segments are in a protected container even if the
# manifest itself is not).
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, acontainer),
'', {'X-Auth-Token': token, 'X-Container-Read':
swift_test_user[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
self.assertEquals(resp.read(), ''.join(segments3))
self.assertEquals(resp.status, 200)
# Delete the manifest
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/manifest' % (parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the third set of segments
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/segments3/%s' % (parsed.path,
acontainer, str(objnum)), '', {'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments3)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the second set of segments
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/segments2/%s' % (parsed.path,
self.container, str(objnum)), '', {'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments2)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the first set of segments
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/segments1/%s' % (parsed.path,
self.container, str(objnum)), '', {'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments1)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the extra container
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s' % (parsed.path, acontainer), '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
if __name__ == '__main__':
unittest.main()

View File

@ -22,6 +22,7 @@ from webob.exc import HTTPBadRequest, HTTPLengthRequired, \
from swift.common import constraints
class TestConstraints(unittest.TestCase):
def test_check_metadata_empty(self):
@ -137,6 +138,32 @@ class TestConstraints(unittest.TestCase):
self.assert_(isinstance(resp, HTTPBadRequest))
self.assert_('Content-Type' in resp.body)
def test_check_object_manifest_header(self):
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'container/prefix', 'Content-Length':
'0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(not resp)
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'container', 'Content-Length': '0',
'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': '/container/prefix',
'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'container/prefix?query=param',
'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'container/prefix&query=param',
'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
resp = constraints.check_object_creation(Request.blank('/',
headers={'X-Object-Manifest': 'http://host/container/prefix',
'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest')
self.assert_(isinstance(resp, HTTPBadRequest))
def test_check_mount(self):
self.assertFalse(constraints.check_mount('', ''))
constraints.os = MockTrue() # mock os module

View File

@ -77,7 +77,8 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Object-Meta-3': 'Three',
'X-Object-Meta-4': 'Four',
@ -95,7 +96,8 @@ class TestObjectController(unittest.TestCase):
if not self.path_to_test_xfs:
raise SkipTest
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/fail', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/fail',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Object-Meta-1': 'One',
'X-Object-Meta-2': 'Two',
@ -116,29 +118,37 @@ class TestObjectController(unittest.TestCase):
def test_POST_container_connection(self):
if not self.path_to_test_xfs:
raise SkipTest
def mock_http_connect(response, with_exc=False):
class FakeConn(object):
def __init__(self, status, with_exc):
self.status = status
self.reason = 'Fake'
self.host = '1.2.3.4'
self.port = '1234'
self.with_exc = with_exc
def getresponse(self):
if self.with_exc:
raise Exception('test')
return self
def read(self, amt=None):
return ''
return lambda *args, **kwargs: FakeConn(response, with_exc)
old_http_connect = object_server.http_connect
try:
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp, 'Content-Type': 'text/plain',
'Content-Length': '0'})
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD':
'POST'}, headers={'X-Timestamp': timestamp, 'Content-Type':
'text/plain', 'Content-Length': '0'})
resp = self.object_controller.PUT(req)
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Container-Host': '1.2.3.4:0',
'X-Container-Partition': '3',
@ -148,7 +158,8 @@ class TestObjectController(unittest.TestCase):
object_server.http_connect = mock_http_connect(202)
resp = self.object_controller.POST(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Container-Host': '1.2.3.4:0',
'X-Container-Partition': '3',
@ -158,7 +169,8 @@ class TestObjectController(unittest.TestCase):
object_server.http_connect = mock_http_connect(202, with_exc=True)
resp = self.object_controller.POST(req)
self.assertEquals(resp.status_int, 202)
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Container-Host': '1.2.3.4:0',
'X-Container-Partition': '3',
@ -226,7 +238,8 @@ class TestObjectController(unittest.TestCase):
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY')
self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)),
{'X-Timestamp': timestamp,
'Content-Length': '6',
'ETag': '0b4c12d7e0a73840c1c4f148fda3b037',
@ -258,7 +271,8 @@ class TestObjectController(unittest.TestCase):
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY TWO')
self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)),
{'X-Timestamp': timestamp,
'Content-Length': '10',
'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039',
@ -304,7 +318,8 @@ class TestObjectController(unittest.TestCase):
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY THREE')
self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)),
{'X-Timestamp': timestamp,
'Content-Length': '12',
'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568',
@ -316,25 +331,33 @@ class TestObjectController(unittest.TestCase):
def test_PUT_container_connection(self):
if not self.path_to_test_xfs:
raise SkipTest
def mock_http_connect(response, with_exc=False):
class FakeConn(object):
def __init__(self, status, with_exc):
self.status = status
self.reason = 'Fake'
self.host = '1.2.3.4'
self.port = '1234'
self.with_exc = with_exc
def getresponse(self):
if self.with_exc:
raise Exception('test')
return self
def read(self, amt=None):
return ''
return lambda *args, **kwargs: FakeConn(response, with_exc)
old_http_connect = object_server.http_connect
try:
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
'X-Container-Host': '1.2.3.4:0',
'X-Container-Partition': '3',
@ -555,7 +578,8 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.etag, etag)
req = Request.blank('/sda1/p/a/c/o2', environ={'REQUEST_METHOD': 'GET'},
req = Request.blank('/sda1/p/a/c/o2',
environ={'REQUEST_METHOD': 'GET'},
headers={'If-Match': '*'})
resp = self.object_controller.GET(req)
self.assertEquals(resp.status_int, 412)
@ -715,7 +739,8 @@ class TestObjectController(unittest.TestCase):
""" Test swift.object_server.ObjectController.DELETE """
if not self.path_to_test_xfs:
raise SkipTest
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'DELETE'})
req = Request.blank('/sda1/p/a/c',
environ={'REQUEST_METHOD': 'DELETE'})
resp = self.object_controller.DELETE(req)
self.assertEquals(resp.status_int, 400)
@ -916,21 +941,26 @@ class TestObjectController(unittest.TestCase):
def test_disk_file_mkstemp_creates_dir(self):
tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
os.rmdir(tmpdir)
with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp():
with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c',
'o').mkstemp():
self.assert_(os.path.exists(tmpdir))
def test_max_upload_time(self):
if not self.path_to_test_xfs:
raise SkipTest
class SlowBody():
def __init__(self):
self.sent = 0
def read(self, size=-1):
if self.sent < 4:
sleep(0.1)
self.sent += 1
return ' '
return ''
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()},
headers={'X-Timestamp': normalize_timestamp(time()),
@ -946,14 +976,18 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 408)
def test_short_body(self):
class ShortBody():
def __init__(self):
self.sent = False
def read(self, size=-1):
if not self.sent:
self.sent = True
return ' '
return ''
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': ShortBody()},
headers={'X-Timestamp': normalize_timestamp(time()),
@ -1001,11 +1035,37 @@ class TestObjectController(unittest.TestCase):
resp = self.object_controller.GET(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.headers['content-encoding'], 'gzip')
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'HEAD'})
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD':
'HEAD'})
resp = self.object_controller.HEAD(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.headers['content-encoding'], 'gzip')
def test_manifest_header(self):
if not self.path_to_test_xfs:
raise SkipTest
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'Content-Length': '0',
'X-Object-Manifest': 'c/o/'})
resp = self.object_controller.PUT(req)
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(self.testdir, 'sda1',
storage_directory(object_server.DATADIR, 'p', hash_path('a', 'c',
'o')), timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)), {'X-Timestamp': timestamp,
'Content-Length': '0', 'Content-Type': 'text/plain', 'name':
'/a/c/o', 'X-Object-Manifest': 'c/o/', 'ETag':
'd41d8cd98f00b204e9800998ecf8427e'})
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'})
resp = self.object_controller.GET(req)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.headers.get('x-object-manifest'), 'c/o/')
if __name__ == '__main__':
unittest.main()

View File

@ -35,8 +35,8 @@ import eventlet
from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen
from eventlet.timeout import Timeout
import simplejson
from webob import Request
from webob.exc import HTTPUnauthorized
from webob import Request, Response
from webob.exc import HTTPNotFound, HTTPUnauthorized
from test.unit import connect_tcp, readuntil2crlfs
from swift.proxy import server as proxy_server
@ -53,7 +53,9 @@ logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
def fake_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status, etag=None, body=''):
self.status = status
self.reason = 'Fake'
@ -160,6 +162,7 @@ class FakeRing(object):
class FakeMemcache(object):
def __init__(self):
self.store = {}
@ -372,9 +375,12 @@ class TestController(unittest.TestCase):
class TestProxyServer(unittest.TestCase):
def test_unhandled_exception(self):
class MyApp(proxy_server.Application):
def get_controller(self, path):
raise Exception('this shouldnt be caught')
app = MyApp(None, FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'})
@ -497,8 +503,11 @@ class TestObjectController(unittest.TestCase):
test_status_map((200, 200, 204, 500, 404), 503)
def test_PUT_connect_exceptions(self):
def mock_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status):
self.status = status
self.reason = 'Fake'
@ -518,6 +527,7 @@ class TestObjectController(unittest.TestCase):
if self.status == -3:
return FakeConn(507)
return FakeConn(100)
code_iter = iter(code_iter)
def connect(*args, **ckwargs):
@ -525,7 +535,9 @@ class TestObjectController(unittest.TestCase):
if status == -1:
raise HTTPException()
return FakeConn(status)
return connect
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
@ -546,8 +558,11 @@ class TestObjectController(unittest.TestCase):
test_status_map((200, 200, 503, 503, -1), 503)
def test_PUT_send_exceptions(self):
def mock_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status):
self.status = status
self.reason = 'Fake'
@ -611,8 +626,11 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(res.status_int, 413)
def test_PUT_getresponse_exceptions(self):
def mock_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status):
self.status = status
self.reason = 'Fake'
@ -807,6 +825,7 @@ class TestObjectController(unittest.TestCase):
dev['port'] = 1
class SlowBody():
def __init__(self):
self.sent = 0
@ -816,6 +835,7 @@ class TestObjectController(unittest.TestCase):
self.sent += 1
return ' '
return ''
req = Request.blank('/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()},
headers={'Content-Length': '4', 'Content-Type': 'text/plain'})
@ -854,11 +874,13 @@ class TestObjectController(unittest.TestCase):
dev['port'] = 1
class SlowBody():
def __init__(self):
self.sent = 0
def read(self, size=-1):
raise Exception('Disconnected')
req = Request.blank('/a/c/o',
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()},
headers={'Content-Length': '4', 'Content-Type': 'text/plain'})
@ -1508,7 +1530,9 @@ class TestObjectController(unittest.TestCase):
def test_chunked_put(self):
# quick test of chunked put w/o PATH_TO_TEST_XFS
class ChunkedFile():
def __init__(self, bytes):
self.bytes = bytes
self.read_bytes = 0
@ -1576,6 +1600,7 @@ class TestObjectController(unittest.TestCase):
mkdirs(os.path.join(testdir, 'sdb1'))
mkdirs(os.path.join(testdir, 'sdb1', 'tmp'))
try:
orig_container_listing_limit = proxy_server.CONTAINER_LISTING_LIMIT
conf = {'devices': testdir, 'swift_dir': testdir,
'mount_check': 'false'}
prolis = listen(('localhost', 0))
@ -1669,8 +1694,10 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(headers[:len(exp)], exp)
# Check unhandled exception
orig_update_request = prosrv.update_request
def broken_update_request(env, req):
raise Exception('fake')
prosrv.update_request = broken_update_request
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
@ -1719,8 +1746,10 @@ class TestObjectController(unittest.TestCase):
# in a test for logging x-forwarded-for (first entry only).
class Logger(object):
def info(self, msg):
self.msg = msg
orig_logger = prosrv.logger
prosrv.logger = Logger()
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
@ -1742,8 +1771,10 @@ class TestObjectController(unittest.TestCase):
# Turn on header logging.
class Logger(object):
def info(self, msg):
self.msg = msg
orig_logger = prosrv.logger
prosrv.logger = Logger()
prosrv.log_headers = True
@ -1900,6 +1931,70 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(headers[:len(exp)], exp)
body = fd.read()
self.assertEquals(body, 'oh hai123456789abcdef')
# Create a container for our segmented/manifest object testing
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/segmented HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Storage-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
self.assertEquals(headers[:len(exp)], exp)
# Create the object segments
for segment in xrange(5):
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/segmented/name/%s HTTP/1.1\r\nHost: '
'localhost\r\nConnection: close\r\nX-Storage-Token: '
't\r\nContent-Length: 5\r\n\r\n1234 ' % str(segment))
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
self.assertEquals(headers[:len(exp)], exp)
# Create the object manifest file
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/segmented/name HTTP/1.1\r\nHost: '
'localhost\r\nConnection: close\r\nX-Storage-Token: '
't\r\nContent-Length: 0\r\nX-Object-Manifest: '
'segmented/name/\r\nContent-Type: text/jibberish\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
self.assertEquals(headers[:len(exp)], exp)
# Ensure retrieving the manifest file gets the whole object
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v1/a/segmented/name HTTP/1.1\r\nHost: '
'localhost\r\nConnection: close\r\nX-Auth-Token: '
't\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 200'
self.assertEquals(headers[:len(exp)], exp)
self.assert_('X-Object-Manifest: segmented/name/' in headers)
self.assert_('Content-Type: text/jibberish' in headers)
body = fd.read()
self.assertEquals(body, '1234 1234 1234 1234 1234 ')
# Do it again but exceeding the container listing limit
proxy_server.CONTAINER_LISTING_LIMIT = 2
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v1/a/segmented/name HTTP/1.1\r\nHost: '
'localhost\r\nConnection: close\r\nX-Auth-Token: '
't\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 200'
self.assertEquals(headers[:len(exp)], exp)
self.assert_('X-Object-Manifest: segmented/name/' in headers)
self.assert_('Content-Type: text/jibberish' in headers)
body = fd.read()
# A bit fragile of a test; as it makes the assumption that all
# will be sent in a single chunk.
self.assertEquals(body,
'19\r\n1234 1234 1234 1234 1234 \r\n0\r\n\r\n')
finally:
prospa.kill()
acc1spa.kill()
@ -1909,6 +2004,7 @@ class TestObjectController(unittest.TestCase):
obj1spa.kill()
obj2spa.kill()
finally:
proxy_server.CONTAINER_LISTING_LIMIT = orig_container_listing_limit
rmtree(testdir)
def test_mismatched_etags(self):
@ -2111,6 +2207,7 @@ class TestObjectController(unittest.TestCase):
res = controller.COPY(req)
self.assert_(called[0])
class TestContainerController(unittest.TestCase):
"Test swift.proxy_server.ContainerController"
@ -2254,7 +2351,9 @@ class TestContainerController(unittest.TestCase):
self.assertEquals(resp.status_int, 404)
def test_put_locking(self):
class MockMemcache(FakeMemcache):
def __init__(self, allow_lock=None):
self.allow_lock = allow_lock
super(MockMemcache, self).__init__()
@ -2265,6 +2364,7 @@ class TestContainerController(unittest.TestCase):
yield True
else:
raise MemcacheLockError()
with save_globals():
controller = proxy_server.ContainerController(self.app, 'account',
'container')
@ -2870,5 +2970,261 @@ class TestAccountController(unittest.TestCase):
test_status_map((204, 500, 404), 503)
class FakeObjectController(object):
def __init__(self):
self.app = self
self.logger = self
self.account_name = 'a'
self.container_name = 'c'
self.object_name = 'o'
self.trans_id = 'tx1'
self.object_ring = FakeRing()
self.node_timeout = 1
def exception(self, *args):
self.exception_args = args
self.exception_info = sys.exc_info()
def GETorHEAD_base(self, *args):
self.GETorHEAD_base_args = args
req = args[0]
path = args[4]
body = data = path[-1] * int(path[-1])
if req.range and req.range.ranges:
body = ''
for start, stop in req.range.ranges:
body += data[start:stop]
resp = Response(app_iter=iter(body))
return resp
def iter_nodes(self, partition, nodes, ring):
for node in nodes:
yield node
for node in ring.get_more_nodes(partition):
yield node
class Stub(object):
pass
class TestSegmentedIterable(unittest.TestCase):
def setUp(self):
self.controller = FakeObjectController()
def test_load_next_segment_unexpected_error(self):
# Iterator value isn't a dict
self.assertRaises(Exception,
proxy_server.SegmentedIterable(self.controller, None,
[None])._load_next_segment)
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
def test_load_next_segment_with_no_segments(self):
self.assertRaises(StopIteration,
proxy_server.SegmentedIterable(self.controller, 'lc',
[])._load_next_segment)
def test_load_next_segment_with_one_segment(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}])
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '1')
def test_load_next_segment_with_two_segments(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '1')
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '22')
def test_load_next_segment_with_two_segments_skip_first(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit.segment = 0
segit.listing.next()
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '22')
def test_load_next_segment_with_seek(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit.segment = 0
segit.listing.next()
segit.seek = 1
segit._load_next_segment()
self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2')
self.assertEquals(str(self.controller.GETorHEAD_base_args[0].range),
'bytes=1-')
data = ''.join(segit.segment_iter)
self.assertEquals(data, '2')
def test_load_next_segment_with_get_error(self):
def local_GETorHEAD_base(*args):
return HTTPNotFound()
self.controller.GETorHEAD_base = local_GETorHEAD_base
self.assertRaises(Exception,
proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}])._load_next_segment)
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
self.assertEquals(str(self.controller.exception_info[1]),
'Could not load object segment /a/lc/o1: 404')
def test_iter_unexpected_error(self):
# Iterator value isn't a dict
self.assertRaises(Exception, ''.join,
proxy_server.SegmentedIterable(self.controller, None, [None]))
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
def test_iter_with_no_segments(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [])
self.assertEquals(''.join(segit), '')
def test_iter_with_one_segment(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}])
segit.response = Stub()
self.assertEquals(''.join(segit), '1')
self.assertEquals(segit.response.bytes_transferred, 1)
def test_iter_with_two_segments(self):
segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}, {'name': 'o2'}])
segit.response = Stub()
self.assertEquals(''.join(segit), '122')
self.assertEquals(segit.response.bytes_transferred, 3)
def test_iter_with_get_error(self):
def local_GETorHEAD_base(*args):
return HTTPNotFound()
self.controller.GETorHEAD_base = local_GETorHEAD_base
self.assertRaises(Exception, ''.join,
proxy_server.SegmentedIterable(self.controller, 'lc', [{'name':
'o1'}]))
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
self.assertEquals(str(self.controller.exception_info[1]),
'Could not load object segment /a/lc/o1: 404')
def test_app_iter_range_unexpected_error(self):
# Iterator value isn't a dict
self.assertRaises(Exception,
proxy_server.SegmentedIterable(self.controller, None,
[None]).app_iter_range(None, None).next)
self.assertEquals(self.controller.exception_args[0],
'ERROR: While processing manifest /a/c/o tx1')
def test_app_iter_range_with_no_segments(self):
self.assertEquals(''.join(proxy_server.SegmentedIterable(
self.controller, 'lc', []).app_iter_range(None, None)), '')
self.assertEquals(''.join(proxy_server.SegmentedIterable(
self.controller, 'lc', []).app_iter_range(3, None)), '')
self.assertEquals(''.join(proxy_server.SegmentedIterable(
self.controller, 'lc', []).app_iter_range(3, 5)), '')
self.assertEquals(''.join(proxy_server.SegmentedIterable(
self.controller, 'lc', []).app_iter_range(None, 5)), '')
def test_app_iter_range_with_one_segment(self):
listing = [{'name': 'o1', 'bytes': 1}]
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, None)), '1')
self.assertEquals(segit.response.bytes_transferred, 1)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
self.assertEquals(''.join(segit.app_iter_range(3, None)), '')
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
self.assertEquals(''.join(segit.app_iter_range(3, 5)), '')
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 5)), '1')
self.assertEquals(segit.response.bytes_transferred, 1)
def test_app_iter_range_with_two_segments(self):
listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 'bytes': 2}]
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, None)), '122')
self.assertEquals(segit.response.bytes_transferred, 3)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(1, None)), '22')
self.assertEquals(segit.response.bytes_transferred, 2)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(1, 5)), '22')
self.assertEquals(segit.response.bytes_transferred, 2)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 2)), '12')
self.assertEquals(segit.response.bytes_transferred, 2)
def test_app_iter_range_with_many_segments(self):
listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 'bytes': 2},
{'name': 'o3', 'bytes': 3}, {'name': 'o4', 'bytes': 4}, {'name':
'o5', 'bytes': 5}]
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, None)),
'122333444455555')
self.assertEquals(segit.response.bytes_transferred, 15)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(3, None)),
'333444455555')
self.assertEquals(segit.response.bytes_transferred, 12)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(5, None)), '3444455555')
self.assertEquals(segit.response.bytes_transferred, 10)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 6)), '122333')
self.assertEquals(segit.response.bytes_transferred, 6)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(None, 7)), '1223334')
self.assertEquals(segit.response.bytes_transferred, 7)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(3, 7)), '3334')
self.assertEquals(segit.response.bytes_transferred, 4)
segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing)
segit.response = Stub()
self.assertEquals(''.join(segit.app_iter_range(5, 7)), '34')
self.assertEquals(segit.response.bytes_transferred, 2)
if __name__ == '__main__':
unittest.main()

View File

@ -65,7 +65,7 @@ class DumbInternalProxy(object):
class TestLogProcessor(unittest.TestCase):
access_test_line = 'Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\
access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
'09/Jul/2010/04/14/30 GET '\
'/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\