diff --git a/bin/st b/bin/st index 3bd8b4f5bf..b41aca67ec 100755 --- a/bin/st +++ b/bin/st @@ -581,7 +581,8 @@ def put_object(url, token, container, name, contents, content_length=None, :param container: container name that the object is in :param name: object name to put :param contents: a string or a file like object to read object data from - :param content_length: value to send as content-length header + :param content_length: value to send as content-length header; also limits + the amount read from contents :param etag: etag of contents :param chunk_size: chunk size of data to write :param content_type: value to send as content-type header @@ -611,18 +612,24 @@ def put_object(url, token, container, name, contents, content_length=None, conn.putrequest('PUT', path) for header, value in headers.iteritems(): conn.putheader(header, value) - if not content_length: + if content_length is None: conn.putheader('Transfer-Encoding', 'chunked') - conn.endheaders() - chunk = contents.read(chunk_size) - while chunk: - if not content_length: - conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) - else: - conn.send(chunk) + conn.endheaders() chunk = contents.read(chunk_size) - if not content_length: + while chunk: + conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) + chunk = contents.read(chunk_size) conn.send('0\r\n\r\n') + else: + conn.endheaders() + left = content_length + while left > 0: + size = chunk_size + if size > left: + size = left + chunk = contents.read(size) + conn.send(chunk) + left -= len(chunk) else: conn.request('PUT', path, contents, headers) resp = conn.getresponse() @@ -860,15 +867,20 @@ class QueueFunctionThread(Thread): st_delete_help = ''' -delete --all OR delete container [object] [object] ... +delete --all OR delete container [--leave-segments] [object] [object] ... 
Deletes everything in the account (with --all), or everything in a - container, or a list of objects depending on the args given.'''.strip('\n') + container, or a list of objects depending on the args given. Segments of + manifest objects will be deleted as well, unless you specify the + --leave-segments option.'''.strip('\n') def st_delete(parser, args, print_queue, error_queue): parser.add_option('-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicates that you really want to delete ' 'everything in the account') + parser.add_option('', '--leave-segments', action='store_true', + dest='leave_segments', default=False, help='Indicates that you want ' + 'the segments of manifest objects left alone') (options, args) = parse_args(parser, args) args = args[1:] if (not args and not options.yes_all) or (args and options.yes_all): @@ -876,11 +888,42 @@ def st_delete(parser, args, print_queue, error_queue): (basename(argv[0]), st_delete_help)) return + def _delete_segment((container, obj), conn): + conn.delete_object(container, obj) + if options.verbose: + print_queue.put('%s/%s' % (container, obj)) + object_queue = Queue(10000) def _delete_object((container, obj), conn): try: + old_manifest = None + if not options.leave_segments: + try: + old_manifest = conn.head_object(container, obj).get( + 'x-object-manifest') + except ClientException, err: + if err.http_status != 404: + raise conn.delete_object(container, obj) + if old_manifest: + segment_queue = Queue(10000) + scontainer, sprefix = old_manifest.split('/', 1) + for delobj in conn.get_container(scontainer, + prefix=sprefix)[1]: + segment_queue.put((scontainer, delobj['name'])) + if not segment_queue.empty(): + segment_threads = [QueueFunctionThread(segment_queue, + _delete_segment, create_connection()) for _ in + xrange(10)] + for thread in segment_threads: + thread.start() + while not segment_queue.empty(): + sleep(0.01) + for thread in segment_threads: + thread.abort = True + while 
thread.isAlive(): + thread.join(0.01) if options.verbose: path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): @@ -891,6 +934,7 @@ def st_delete(parser, args, print_queue, error_queue): raise error_queue.put('Object %s not found' % repr('%s/%s' % (container, obj))) + container_queue = Queue(10000) def _delete_container(container, conn): @@ -956,6 +1000,10 @@ def st_delete(parser, args, print_queue, error_queue): raise error_queue.put('Account not found') elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might have ' \ + 'meant %r instead of %r.' % \ + (args[0].replace('/', ' ', 1), args[0]) conn = create_connection() _delete_container(args[0], conn) else: @@ -976,7 +1024,7 @@ def st_delete(parser, args, print_queue, error_queue): st_download_help = ''' -download --all OR download container [object] [object] ... +download --all OR download container [options] [object] [object] ... Downloads everything in the account (with --all), or everything in a container, or a list of objects depending on the args given. 
For a single object download, you may use the -o [--output] option to @@ -1015,20 +1063,26 @@ def st_download(options, args, print_queue, error_queue): headers, body = \ conn.get_object(container, obj, resp_chunk_size=65536) content_type = headers.get('content-type') - content_length = int(headers.get('content-length')) + if 'content-length' in headers: + content_length = int(headers.get('content-length')) + else: + content_length = None etag = headers.get('etag') path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): path = path[1:] + md5sum = None make_dir = out_file != "-" if content_type.split(';', 1)[0] == 'text/directory': if make_dir and not isdir(path): mkdirs(path) read_length = 0 - md5sum = md5() + if 'x-object-manifest' not in headers: + md5sum = md5() for chunk in body: read_length += len(chunk) - md5sum.update(chunk) + if md5sum: + md5sum.update(chunk) else: dirpath = dirname(path) if make_dir and dirpath and not isdir(dirpath): @@ -1040,16 +1094,18 @@ def st_download(options, args, print_queue, error_queue): else: fp = open(path, 'wb') read_length = 0 - md5sum = md5() + if 'x-object-manifest' not in headers: + md5sum = md5() for chunk in body: fp.write(chunk) read_length += len(chunk) - md5sum.update(chunk) + if md5sum: + md5sum.update(chunk) fp.close() - if md5sum.hexdigest() != etag: + if md5sum and md5sum.hexdigest() != etag: error_queue.put('%s: md5sum != etag, %s != %s' % (path, md5sum.hexdigest(), etag)) - if read_length != content_length: + if content_length is not None and read_length != content_length: error_queue.put('%s: read_length != content_length, %d != %d' % (path, read_length, content_length)) if 'x-object-meta-mtime' in headers and not options.out_file: @@ -1110,6 +1166,10 @@ def st_download(options, args, print_queue, error_queue): raise error_queue.put('Account not found') elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might have ' \ + 'meant %r instead of 
%r.' % \ + (args[0].replace('/', ' ', 1), args[0]) _download_container(args[0], create_connection()) else: if len(args) == 2: @@ -1223,6 +1283,10 @@ Containers: %d raise error_queue.put('Account not found') elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might have ' \ + 'meant %r instead of %r.' % \ + (args[0].replace('/', ' ', 1), args[0]) try: headers = conn.head_container(args[0]) object_count = int(headers.get('x-container-object-count', 0)) @@ -1259,14 +1323,19 @@ Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], Account: %s Container: %s Object: %s - Content Type: %s -Content Length: %s - Last Modified: %s - ETag: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], - args[1], headers.get('content-type'), - headers.get('content-length'), - headers.get('last-modified'), - headers.get('etag'))) + Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], + args[1], headers.get('content-type'))) + if 'content-length' in headers: + print_queue.put('Content Length: %s' % + headers['content-length']) + if 'last-modified' in headers: + print_queue.put(' Last Modified: %s' % + headers['last-modified']) + if 'etag' in headers: + print_queue.put(' ETag: %s' % headers['etag']) + if 'x-object-manifest' in headers: + print_queue.put(' Manifest: %s' % + headers['x-object-manifest']) for key, value in headers.items(): if key.startswith('x-object-meta-'): print_queue.put('%14s: %s' % ('Meta %s' % @@ -1274,7 +1343,7 @@ Content Length: %s for key, value in headers.items(): if not key.startswith('x-object-meta-') and key not in ( 'content-type', 'content-length', 'last-modified', - 'etag', 'date'): + 'etag', 'date', 'x-object-manifest'): print_queue.put( '%14s: %s' % (key.title(), value)) except ClientException, err: @@ -1326,6 +1395,10 @@ def st_post(options, args, print_queue, error_queue): raise error_queue.put('Account not found') elif len(args) == 1: + if '/' in args[0]: + print >> 
stderr, 'WARNING: / in container name; you might have ' \ + 'meant %r instead of %r.' % \ + (args[0].replace('/', ' ', 1), args[0]) headers = {} for item in options.meta: split_item = item.split(':') @@ -1363,23 +1436,48 @@ st_upload_help = ''' upload [options] container file_or_directory [file_or_directory] [...] Uploads to the given container the files and directories specified by the remaining args. -c or --changed is an option that will only upload files - that have changed since the last upload.'''.strip('\n') + that have changed since the last upload. -S or --segment-size + and --leave-segments are options as well (see --help for more). +'''.strip('\n') def st_upload(options, args, print_queue, error_queue): parser.add_option('-c', '--changed', action='store_true', dest='changed', default=False, help='Will only upload files that have changed since ' 'the last upload') + parser.add_option('-S', '--segment-size', dest='segment_size', help='Will ' + 'upload files in segments no larger than and then create a ' + '"manifest" file that will download all the segments as if it were ' + 'the original file. 
The segments will be uploaded to a ' + '_segments container so as to not pollute the main ' + ' listings.') + parser.add_option('', '--leave-segments', action='store_true', + dest='leave_segments', default=False, help='Indicates that you want ' + 'the older segments of manifest objects left alone (in the case of ' + 'overwrites)') (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: error_queue.put('Usage: %s [options] %s' % (basename(argv[0]), st_upload_help)) return + object_queue = Queue(10000) - file_queue = Queue(10000) + def _segment_job(job, conn): + if job.get('delete', False): + conn.delete_object(job['container'], job['obj']) + else: + fp = open(job['path'], 'rb') + fp.seek(job['segment_start']) + conn.put_object(job.get('container', args[0] + '_segments'), + job['obj'], fp, content_length=job['segment_size']) + if options.verbose and 'log_line' in job: + print_queue.put(job['log_line']) - def _upload_file((path, dir_marker), conn): + def _object_job(job, conn): + path = job['path'] + container = job.get('container', args[0]) + dir_marker = job.get('dir_marker', False) try: obj = path if obj.startswith('./') or obj.startswith('.\\'): @@ -1388,7 +1486,7 @@ def st_upload(options, args, print_queue, error_queue): if dir_marker: if options.changed: try: - headers = conn.head_object(args[0], obj) + headers = conn.head_object(container, obj) ct = headers.get('content-type') cl = int(headers.get('content-length')) et = headers.get('etag') @@ -1401,24 +1499,86 @@ def st_upload(options, args, print_queue, error_queue): except ClientException, err: if err.http_status != 404: raise - conn.put_object(args[0], obj, '', content_length=0, + conn.put_object(container, obj, '', content_length=0, content_type='text/directory', headers=put_headers) else: - if options.changed: + # We need to HEAD all objects now in case we're overwriting a + # manifest object and need to delete the old segments + # ourselves. 
+ old_manifest = None + if options.changed or not options.leave_segments: try: - headers = conn.head_object(args[0], obj) + headers = conn.head_object(container, obj) cl = int(headers.get('content-length')) mt = headers.get('x-object-meta-mtime') - if cl == getsize(path) and \ + if options.changed and cl == getsize(path) and \ mt == put_headers['x-object-meta-mtime']: return + if not options.leave_segments: + old_manifest = headers.get('x-object-manifest') except ClientException, err: if err.http_status != 404: raise - conn.put_object(args[0], obj, open(path, 'rb'), - content_length=getsize(path), - headers=put_headers) + if options.segment_size and \ + getsize(path) < options.segment_size: + full_size = getsize(path) + segment_queue = Queue(10000) + segment_threads = [QueueFunctionThread(segment_queue, + _segment_job, create_connection()) for _ in xrange(10)] + for thread in segment_threads: + thread.start() + segment = 0 + segment_start = 0 + while segment_start < full_size: + segment_size = int(options.segment_size) + if segment_start + segment_size > full_size: + segment_size = full_size - segment_start + segment_queue.put({'path': path, + 'obj': '%s/%s/%s/%08d' % (obj, + put_headers['x-object-meta-mtime'], full_size, + segment), + 'segment_start': segment_start, + 'segment_size': segment_size, + 'log_line': '%s segment %s' % (obj, segment)}) + segment += 1 + segment_start += segment_size + while not segment_queue.empty(): + sleep(0.01) + for thread in segment_threads: + thread.abort = True + while thread.isAlive(): + thread.join(0.01) + new_object_manifest = '%s_segments/%s/%s/%s/' % ( + container, obj, put_headers['x-object-meta-mtime'], + full_size) + if old_manifest == new_object_manifest: + old_manifest = None + put_headers['x-object-manifest'] = new_object_manifest + conn.put_object(container, obj, '', content_length=0, + headers=put_headers) + else: + conn.put_object(container, obj, open(path, 'rb'), + content_length=getsize(path), headers=put_headers) + 
if old_manifest: + segment_queue = Queue(10000) + scontainer, sprefix = old_manifest.split('/', 1) + for delobj in conn.get_container(scontainer, + prefix=sprefix)[1]: + segment_queue.put({'delete': True, + 'container': scontainer, 'obj': delobj['name']}) + if not segment_queue.empty(): + segment_threads = [QueueFunctionThread(segment_queue, + _segment_job, create_connection()) for _ in + xrange(10)] + for thread in segment_threads: + thread.start() + while not segment_queue.empty(): + sleep(0.01) + for thread in segment_threads: + thread.abort = True + while thread.isAlive(): + thread.join(0.01) if options.verbose: print_queue.put(obj) except OSError, err: @@ -1429,22 +1589,22 @@ def st_upload(options, args, print_queue, error_queue): def _upload_dir(path): names = listdir(path) if not names: - file_queue.put((path, True)) # dir_marker = True + object_queue.put({'path': path, 'dir_marker': True}) else: for name in listdir(path): subpath = join(path, name) if isdir(subpath): _upload_dir(subpath) else: - file_queue.put((subpath, False)) # dir_marker = False + object_queue.put({'path': subpath}) url, token = get_auth(options.auth, options.user, options.key, snet=options.snet) create_connection = lambda: Connection(options.auth, options.user, options.key, preauthurl=url, preauthtoken=token, snet=options.snet) - file_threads = [QueueFunctionThread(file_queue, _upload_file, + object_threads = [QueueFunctionThread(object_queue, _object_job, create_connection()) for _ in xrange(10)] - for thread in file_threads: + for thread in object_threads: thread.start() conn = create_connection() # Try to create the container, just in case it doesn't exist. If this @@ -1453,6 +1613,8 @@ def st_upload(options, args, print_queue, error_queue): # it'll surface on the first object PUT. 
try: conn.put_container(args[0]) + if options.segment_size is not None: + conn.put_container(args[0] + '_segments') except: pass try: @@ -1460,10 +1622,10 @@ def st_upload(options, args, print_queue, error_queue): if isdir(arg): _upload_dir(arg) else: - file_queue.put((arg, False)) # dir_marker = False - while not file_queue.empty(): + object_queue.put({'path': arg}) + while not object_queue.empty(): sleep(0.01) - for thread in file_threads: + for thread in object_threads: thread.abort = True while thread.isAlive(): thread.join(0.01) diff --git a/doc/source/index.rst b/doc/source/index.rst index 9b20293921..de07c132ea 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -44,6 +44,7 @@ Overview and Concepts overview_replication overview_stats ratelimit + overview_large_objects Developer Documentation ======================= diff --git a/doc/source/overview_large_objects.rst b/doc/source/overview_large_objects.rst new file mode 100644 index 0000000000..01f4990732 --- /dev/null +++ b/doc/source/overview_large_objects.rst @@ -0,0 +1,177 @@ +==================== +Large Object Support +==================== + +-------- +Overview +-------- + +Swift has a limit on the size of a single uploaded object; by default this is +5GB. However, the download size of a single object is virtually unlimited with +the concept of segmentation. Segments of the larger object are uploaded and a +special manifest file is created that, when downloaded, sends all the segments +concatenated as a single object. This also offers much greater upload speed +with the possibility of parallel uploads of the segments. + +---------------------------------- +Using ``st`` for Segmented Objects +---------------------------------- + +The quickest way to try out this feature is use the included ``st`` Swift Tool. +You can use the ``-S`` option to specify the segment size to use when splitting +a large file. 
For example:: + + st upload test_container -S 1073741824 large_file + +This would split the large_file into 1G segments and begin uploading those +segments in parallel. Once all the segments have been uploaded, ``st`` will +then create the manifest file so the segments can be downloaded as one. + +So now, the following ``st`` command would download the entire large object:: + + st download test_container large_file + +``st`` uses a strict convention for its segmented object support. In the above +example it will upload all the segments into a second container named +test_container_segments. These segments will have names like +large_file/1290206778.25/21474836480/00000000, +large_file/1290206778.25/21474836480/00000001, etc. + +The main benefit for using a separate container is that the main container +listings will not be polluted with all the segment names. The reason for using +the segment name format of <name>/<timestamp>/<size>/<segment> is so that an +upload of a new file with the same name won't overwrite the contents of the +first until the last moment when the manifest file is updated. + +``st`` will manage these segment files for you, deleting old segments on +deletes and overwrites, etc. You can override this behavior with the +``--leave-segments`` option if desired; this is useful if you want to have +multiple versions of the same large object available. + +---------- +Direct API +---------- + +You can also work with the segments and manifests directly with HTTP requests +instead of having ``st`` do that for you. You can just upload the segments like +you would any other object and the manifest is just a zero-byte file with an +extra ``X-Object-Manifest`` header. + +All the object segments need to be in the same container, have a common object +name prefix, and their names sort in the order they should be concatenated. +They don't have to be in the same container as the manifest file will be, which +is useful to keep container listings clean as explained above with ``st``.
+ +The manifest file is simply a zero-byte file with the extra +``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is +the container the object segments are in and ``<prefix>`` is the common prefix +for all the segments. + +It is best to upload all the segments first and then create or update the +manifest. In this way, the full object won't be available for downloading until +the upload is complete. Also, you can upload a new set of segments to a second +location and then update the manifest to point to this new location. During the +upload of the new segments, the original manifest will still be available to +download the first set of segments. + +Here's an example using ``curl`` with tiny 1-byte segments:: + + # First, upload the segments + curl -X PUT -H 'X-Auth-Token: <token>' \ + http://<storage_url>/container/myobject/1 --data-binary '1' + curl -X PUT -H 'X-Auth-Token: <token>' \ + http://<storage_url>/container/myobject/2 --data-binary '2' + curl -X PUT -H 'X-Auth-Token: <token>' \ + http://<storage_url>/container/myobject/3 --data-binary '3' + + # Next, create the manifest file + curl -X PUT -H 'X-Auth-Token: <token>' \ + -H 'X-Object-Manifest: container/myobject/' \ + http://<storage_url>/container/myobject --data-binary '' + + # And now we can download the segments as a single object + curl -H 'X-Auth-Token: <token>' \ + http://<storage_url>/container/myobject + +---------------- +Additional Notes +---------------- + +* With a ``GET`` or ``HEAD`` of a manifest file, the ``X-Object-Manifest: + <container>/<prefix>`` header will be returned with the concatenated object + so you can tell where it's getting its segments from. + +* The response's ``Content-Length`` for a ``GET`` or ``HEAD`` on the manifest + file will be the sum of all the segments in the ``<container>/<prefix>`` + listing, dynamically. So, uploading additional segments after the manifest is + created will cause the concatenated object to be that much larger; there's no + need to recreate the manifest file.
+ +* The response's ``Content-Type`` for a ``GET`` or ``HEAD`` on the manifest + will be the same as the ``Content-Type`` set during the ``PUT`` request that + created the manifest. You can easily change the ``Content-Type`` by reissuing + the ``PUT``. + +* The response's ``ETag`` for a ``GET`` or ``HEAD`` on the manifest file will + be the MD5 sum of the concatenated string of ETags for each of the segments + in the ``<container>/<prefix>`` listing, dynamically. Usually in Swift the + ETag is the MD5 sum of the contents of the object, and that holds true for + each segment independently. But, it's not feasible to generate such an ETag + for the manifest itself, so this method was chosen to at least offer change + detection. + +------- +History +------- + +Large object support has gone through various iterations before settling on +this implementation. + +The primary factor driving the limitation of object size in swift is +maintaining balance among the partitions of the ring. To maintain an even +dispersion of disk usage throughout the cluster the obvious storage pattern +was to simply split larger objects into smaller segments, which could then be +glued together during a read. + +Before the introduction of large object support some applications were already +splitting their uploads into segments and re-assembling them on the client +side after retrieving the individual pieces. This design allowed the client +to support backup and archiving of large data sets, but was also frequently +employed to improve performance or reduce errors due to network interruption. +The major disadvantage of this method is that knowledge of the original +partitioning scheme is required to properly reassemble the object, which is +not practical for some use cases, such as CDN origination. + +In order to eliminate any barrier to entry for clients wanting to store +objects larger than 5GB, initially we also prototyped fully transparent +support for large object uploads.
A fully transparent implementation would +support a larger max size by automatically splitting objects into segments +during upload within the proxy without any changes to the client API. All +segments were completely hidden from the client API. + +This solution introduced a number of challenging failure conditions into the +cluster, wouldn't provide the client with any option to do parallel uploads, +and had no basis for a resume feature. The transparent implementation was +deemed just too complex for the benefit. + +The current "user manifest" design was chosen in order to provide a +transparent download of large objects to the client and still provide the +uploading client a clean API to support segmented uploads. + +Alternative "explicit" user manifest options were discussed which would have +required a pre-defined format for listing the segments to "finalize" the +segmented upload. While this may offer some potential advantages, it was +decided that pushing an added burden onto the client which could potentially +limit adoption should be avoided in favor of a simpler "API" (essentially just +the format of the 'X-Object-Manifest' header). + +During development it was noted that this "implicit" user manifest approach +which is based on the path prefix can be potentially affected by the eventual +consistency window of the container listings, which could theoretically cause +a GET on the manifest object to return an invalid whole object for that short +term. In reality you're unlikely to encounter this scenario unless you're +running very high concurrency uploads against a small testing environment +which isn't running the object-updaters or container-replicators. + +Like all of swift, Large Object Support is a living feature which will continue +to improve and may change over time.
diff --git a/etc/log-processing.conf-sample b/etc/log-processing.conf-sample index 11805add0b..7619d0599a 100644 --- a/etc/log-processing.conf-sample +++ b/etc/log-processing.conf-sample @@ -23,7 +23,7 @@ class_path = swift.stats.access_processor.AccessLogProcessor # load balancer private ips is for load balancer ip addresses that should be # counted as servicenet # lb_private_ips = -# server_name = proxy +# server_name = proxy-server # user = swift # warn_percent = 0.8 diff --git a/swift/common/client.py b/swift/common/client.py index b89d06aa66..1acba8cb37 100644 --- a/swift/common/client.py +++ b/swift/common/client.py @@ -569,7 +569,8 @@ def put_object(url, token, container, name, contents, content_length=None, :param container: container name that the object is in :param name: object name to put :param contents: a string or a file like object to read object data from - :param content_length: value to send as content-length header + :param content_length: value to send as content-length header; also limits + the amount read from contents :param etag: etag of contents :param chunk_size: chunk size of data to write :param content_type: value to send as content-type header @@ -599,18 +600,24 @@ def put_object(url, token, container, name, contents, content_length=None, conn.putrequest('PUT', path) for header, value in headers.iteritems(): conn.putheader(header, value) - if not content_length: + if content_length is None: conn.putheader('Transfer-Encoding', 'chunked') - conn.endheaders() - chunk = contents.read(chunk_size) - while chunk: - if not content_length: - conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) - else: - conn.send(chunk) + conn.endheaders() chunk = contents.read(chunk_size) - if not content_length: + while chunk: + conn.send('%x\r\n%s\r\n' % (len(chunk), chunk)) + chunk = contents.read(chunk_size) conn.send('0\r\n\r\n') + else: + conn.endheaders() + left = content_length + while left > 0: + size = chunk_size + if size > left: + size = left + chunk = 
contents.read(size) + conn.send(chunk) + left -= len(chunk) else: conn.request('PUT', path, contents, headers) resp = conn.getresponse() diff --git a/swift/common/constraints.py b/swift/common/constraints.py index c3f4f1621d..d91c136504 100644 --- a/swift/common/constraints.py +++ b/swift/common/constraints.py @@ -113,6 +113,17 @@ def check_object_creation(req, object_name): if not check_utf8(req.headers['Content-Type']): return HTTPBadRequest(request=req, body='Invalid Content-Type', content_type='text/plain') + if 'x-object-manifest' in req.headers: + value = req.headers['x-object-manifest'] + container = prefix = None + try: + container, prefix = value.split('/', 1) + except ValueError: + pass + if not container or not prefix or '?' in value or '&' in value or \ + prefix[0] == '/': + return HTTPBadRequest(request=req, + body='X-Object-Manifest must in the format container/prefix') return check_metadata(req, 'object') diff --git a/swift/obj/server.py b/swift/obj/server.py index 632a0c04cc..cf90bb7971 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -391,6 +391,9 @@ class ObjectController(object): 'ETag': etag, 'Content-Length': str(os.fstat(fd).st_size), } + if 'x-object-manifest' in request.headers: + metadata['X-Object-Manifest'] = \ + request.headers['x-object-manifest'] metadata.update(val for val in request.headers.iteritems() if val[0].lower().startswith('x-object-meta-') and len(val[0]) > 14) @@ -460,7 +463,8 @@ class ObjectController(object): 'application/octet-stream'), app_iter=file, request=request, conditional_response=True) for key, value in file.metadata.iteritems(): - if key.lower().startswith('x-object-meta-'): + if key == 'X-Object-Manifest' or \ + key.lower().startswith('x-object-meta-'): response.headers[key] = value response.etag = file.metadata['ETag'] response.last_modified = float(file.metadata['X-Timestamp']) @@ -488,7 +492,8 @@ class ObjectController(object): response = Response(content_type=file.metadata['Content-Type'], 
request=request, conditional_response=True) for key, value in file.metadata.iteritems(): - if key.lower().startswith('x-object-meta-'): + if key == 'X-Object-Manifest' or \ + key.lower().startswith('x-object-meta-'): response.headers[key] = value response.etag = file.metadata['ETag'] response.last_modified = float(file.metadata['X-Timestamp']) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 5cbc4083f9..e8a9e59b23 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -14,15 +14,23 @@ # limitations under the License. from __future__ import with_statement +try: + import simplejson as json +except ImportError: + import json import mimetypes import os +import re import time import traceback from ConfigParser import ConfigParser +from datetime import datetime from urllib import unquote, quote import uuid import functools +from hashlib import md5 +from eventlet import sleep from eventlet.timeout import Timeout from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \ HTTPNotFound, HTTPPreconditionFailed, \ @@ -36,8 +44,8 @@ from swift.common.utils import get_logger, normalize_timestamp, split_path, \ cache_from_env from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ - check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \ - MAX_FILE_SIZE + check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \ + MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE from swift.common.exceptions import ChunkReadTimeout, \ ChunkWriteTimeout, ConnectionTimeout @@ -95,6 +103,161 @@ def get_container_memcache_key(account, container): return 'container/%s/%s' % (account, container) +class SegmentedIterable(object): + """ + Iterable that returns the object contents for a segmented object in Swift. + + If set, the response's `bytes_transferred` value will be updated (used to + log the size of the request). 
Also, if there's a failure that cuts the + transfer short, the response's `status_int` will be updated (again, just + for logging since the original status would have already been sent to the + client). + + :param controller: The ObjectController instance to work with. + :param container: The container the object segments are within. + :param listing: The listing of object segments to iterate over; this may + be an iterator or list that returns dicts with 'name' and + 'bytes' keys. + :param response: The webob.Response this iterable is associated with, if + any (default: None) + """ + + def __init__(self, controller, container, listing, response=None): + self.controller = controller + self.container = container + self.listing = iter(listing) + self.segment = -1 + self.segment_dict = None + self.segment_peek = None + self.seek = 0 + self.segment_iter = None + self.position = 0 + self.response = response + if not self.response: + self.response = Response() + self.next_get_time = 0 + + def _load_next_segment(self): + """ + Loads the self.segment_iter with the next object segment's contents. + + :raises: StopIteration when there are no more object segments. 
+ """ + try: + self.segment += 1 + self.segment_dict = self.segment_peek or self.listing.next() + self.segment_peek = None + partition, nodes = self.controller.app.object_ring.get_nodes( + self.controller.account_name, self.container, + self.segment_dict['name']) + path = '/%s/%s/%s' % (self.controller.account_name, self.container, + self.segment_dict['name']) + req = Request.blank(path) + if self.seek: + req.range = 'bytes=%s-' % self.seek + self.seek = 0 + if self.segment > 10: + sleep(max(self.next_get_time - time.time(), 0)) + self.next_get_time = time.time() + 1 + resp = self.controller.GETorHEAD_base(req, 'Object', partition, + self.controller.iter_nodes(partition, nodes, + self.controller.app.object_ring), path, + self.controller.app.object_ring.replica_count) + if resp.status_int // 100 != 2: + raise Exception('Could not load object segment %s: %s' % (path, + resp.status_int)) + self.segment_iter = resp.app_iter + except StopIteration: + raise + except Exception, err: + if not getattr(err, 'swift_logged', False): + self.controller.app.logger.exception('ERROR: While processing ' + 'manifest /%s/%s/%s %s' % (self.controller.account_name, + self.controller.container_name, + self.controller.object_name, self.controller.trans_id)) + err.swift_logged = True + self.response.status_int = 503 + raise + + def next(self): + return iter(self).next() + + def __iter__(self): + """ Standard iterator function that returns the object's contents. 
""" + try: + while True: + if not self.segment_iter: + self._load_next_segment() + while True: + with ChunkReadTimeout(self.controller.app.node_timeout): + try: + chunk = self.segment_iter.next() + break + except StopIteration: + self._load_next_segment() + self.position += len(chunk) + self.response.bytes_transferred = getattr(self.response, + 'bytes_transferred', 0) + len(chunk) + yield chunk + except StopIteration: + raise + except Exception, err: + if not getattr(err, 'swift_logged', False): + self.controller.app.logger.exception('ERROR: While processing ' + 'manifest /%s/%s/%s %s' % (self.controller.account_name, + self.controller.container_name, + self.controller.object_name, self.controller.trans_id)) + err.swift_logged = True + self.response.status_int = 503 + raise + + def app_iter_range(self, start, stop): + """ + Non-standard iterator function for use with Webob in serving Range + requests more quickly. This will skip over segments and do a range + request on the first segment to return data from, if needed. + + :param start: The first byte (zero-based) to return. None for 0. + :param stop: The last byte (zero-based) to return. None for end. 
+ """ + try: + if start: + self.segment_peek = self.listing.next() + while start >= self.position + self.segment_peek['bytes']: + self.segment += 1 + self.position += self.segment_peek['bytes'] + self.segment_peek = self.listing.next() + self.seek = start - self.position + else: + start = 0 + if stop is not None: + length = stop - start + else: + length = None + for chunk in self: + if length is not None: + length -= len(chunk) + if length < 0: + # Chop off the extra: + self.response.bytes_transferred = \ + getattr(self.response, 'bytes_transferred', 0) \ + + length + yield chunk[:length] + break + yield chunk + except StopIteration: + raise + except Exception, err: + if not getattr(err, 'swift_logged', False): + self.controller.app.logger.exception('ERROR: While processing ' + 'manifest /%s/%s/%s %s' % (self.controller.account_name, + self.controller.container_name, + self.controller.object_name, self.controller.trans_id)) + err.swift_logged = True + self.response.status_int = 503 + raise + + class Controller(object): """Base WSGI controller class for the proxy""" @@ -538,9 +701,137 @@ class ObjectController(Controller): return aresp partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) - return self.GETorHEAD_base(req, 'Object', partition, + resp = self.GETorHEAD_base(req, 'Object', partition, self.iter_nodes(partition, nodes, self.app.object_ring), req.path_info, self.app.object_ring.replica_count) + # If we get a 416 Requested Range Not Satisfiable we have to check if + # we were actually requesting a manifest object and then redo the range + # request on the whole object. 
+ if resp.status_int == 416: + req_range = req.range + req.range = None + resp2 = self.GETorHEAD_base(req, 'Object', partition, + self.iter_nodes(partition, nodes, self.app.object_ring), + req.path_info, self.app.object_ring.replica_count) + if 'x-object-manifest' not in resp2.headers: + return resp + resp = resp2 + req.range = req_range + + if 'x-object-manifest' in resp.headers: + lcontainer, lprefix = \ + resp.headers['x-object-manifest'].split('/', 1) + lpartition, lnodes = self.app.container_ring.get_nodes( + self.account_name, lcontainer) + marker = '' + listing = [] + while True: + lreq = Request.blank('/%s/%s?prefix=%s&format=json&marker=%s' % + (quote(self.account_name), quote(lcontainer), + quote(lprefix), quote(marker))) + lresp = self.GETorHEAD_base(lreq, 'Container', lpartition, + lnodes, lreq.path_info, + self.app.container_ring.replica_count) + if lresp.status_int // 100 != 2: + lresp = HTTPNotFound(request=req) + lresp.headers['X-Object-Manifest'] = \ + resp.headers['x-object-manifest'] + return lresp + if 'swift.authorize' in req.environ: + req.acl = lresp.headers.get('x-container-read') + aresp = req.environ['swift.authorize'](req) + if aresp: + return aresp + sublisting = json.loads(lresp.body) + if not sublisting: + break + listing.extend(sublisting) + if len(listing) > CONTAINER_LISTING_LIMIT: + break + marker = sublisting[-1]['name'] + + if len(listing) > CONTAINER_LISTING_LIMIT: + # We will serve large objects with a ton of segments with + # chunked transfer encoding. 
+ + def listing_iter(): + marker = '' + while True: + lreq = Request.blank( + '/%s/%s?prefix=%s&format=json&marker=%s' % + (quote(self.account_name), quote(lcontainer), + quote(lprefix), quote(marker))) + lresp = self.GETorHEAD_base(lreq, 'Container', + lpartition, lnodes, lreq.path_info, + self.app.container_ring.replica_count) + if lresp.status_int // 100 != 2: + raise Exception('Object manifest GET could not ' + 'continue listing: %s %s' % + (req.path, lreq.path)) + if 'swift.authorize' in req.environ: + req.acl = lresp.headers.get('x-container-read') + aresp = req.environ['swift.authorize'](req) + if aresp: + raise Exception('Object manifest GET could ' + 'not continue listing: %s %s' % + (req.path, aresp)) + sublisting = json.loads(lresp.body) + if not sublisting: + break + for obj in sublisting: + yield obj + marker = sublisting[-1]['name'] + + headers = { + 'X-Object-Manifest': resp.headers['x-object-manifest'], + 'Content-Type': resp.content_type} + for key, value in resp.headers.iteritems(): + if key.lower().startswith('x-object-meta-'): + headers[key] = value + resp = Response(headers=headers, request=req, + conditional_response=True) + if req.method == 'HEAD': + # These shenanigans are because webob translates the HEAD + # request into a webob EmptyResponse for the body, which + # has a len, which eventlet translates as needing a + # content-length header added. So we call the original + # webob resp for the headers but return an empty iterator + # for the body. + + def head_response(environ, start_response): + resp(environ, start_response) + return iter([]) + + head_response.status_int = resp.status_int + return head_response + else: + resp.app_iter = SegmentedIterable(self, lcontainer, + listing_iter(), resp) + + else: + # For objects with a reasonable number of segments, we'll serve + # them with a set content-length and computed etag. 
+ content_length = sum(o['bytes'] for o in listing) + last_modified = max(o['last_modified'] for o in listing) + last_modified = \ + datetime(*map(int, re.split('[^\d]', last_modified)[:-1])) + etag = md5('"'.join(o['hash'] for o in listing)).hexdigest() + headers = { + 'X-Object-Manifest': resp.headers['x-object-manifest'], + 'Content-Type': resp.content_type, + 'Content-Length': content_length, + 'ETag': etag} + for key, value in resp.headers.iteritems(): + if key.lower().startswith('x-object-meta-'): + headers[key] = value + resp = Response(headers=headers, request=req, + conditional_response=True) + resp.app_iter = SegmentedIterable(self, lcontainer, listing, + resp) + resp.content_length = content_length + resp.last_modified = last_modified + + return resp @public @delay_denial @@ -616,10 +907,6 @@ class ObjectController(Controller): partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) req.headers['X-Timestamp'] = normalize_timestamp(time.time()) - # this is a temporary hook for migrations to set PUT timestamps - if '!Migration-Timestamp!' in req.headers: - req.headers['X-Timestamp'] = \ - normalize_timestamp(req.headers['!Migration-Timestamp!']) # Sometimes the 'content-type' header exists, but is set to None. 
if not req.headers.get('content-type'): guessed_type, _ = mimetypes.guess_type(req.path_info) @@ -658,11 +945,17 @@ class ObjectController(Controller): return source_resp self.object_name = orig_obj_name self.container_name = orig_container_name - data_source = source_resp.app_iter new_req = Request.blank(req.path_info, environ=req.environ, headers=req.headers) - new_req.content_length = source_resp.content_length - new_req.etag = source_resp.etag + if 'x-object-manifest' in source_resp.headers: + data_source = iter(['']) + new_req.content_length = 0 + new_req.headers['X-Object-Manifest'] = \ + source_resp.headers['x-object-manifest'] + else: + data_source = source_resp.app_iter + new_req.content_length = source_resp.content_length + new_req.etag = source_resp.etag # we no longer need the X-Copy-From header del new_req.headers['X-Copy-From'] for k, v in source_resp.headers.items(): diff --git a/swift/stats/access_processor.py b/swift/stats/access_processor.py index f0ae3a023a..0f5814076f 100644 --- a/swift/stats/access_processor.py +++ b/swift/stats/access_processor.py @@ -26,7 +26,7 @@ class AccessLogProcessor(object): """Transform proxy server access logs""" def __init__(self, conf): - self.server_name = conf.get('server_name', 'proxy') + self.server_name = conf.get('server_name', 'proxy-server') self.lb_private_ips = [x.strip() for x in \ conf.get('lb_private_ips', '').split(',')\ if x.strip()] diff --git a/test/functionalnosetests/test_object.py b/test/functionalnosetests/test_object.py index e4b2fc48c5..2e1668db0e 100644 --- a/test/functionalnosetests/test_object.py +++ b/test/functionalnosetests/test_object.py @@ -16,6 +16,7 @@ class TestObject(unittest.TestCase): if skip: raise SkipTest self.container = uuid4().hex + def put(url, token, parsed, conn): conn.request('PUT', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) @@ -24,6 +25,7 @@ class TestObject(unittest.TestCase): resp.read() self.assertEquals(resp.status, 201) self.obj = uuid4().hex 
+ def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, self.obj), 'test', {'X-Auth-Token': token}) @@ -35,6 +37,7 @@ class TestObject(unittest.TestCase): def tearDown(self): if skip: raise SkipTest + def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s/%s' % (parsed.path, self.container, self.obj), '', {'X-Auth-Token': token}) @@ -42,6 +45,7 @@ class TestObject(unittest.TestCase): resp = retry(delete) resp.read() self.assertEquals(resp.status, 204) + def delete(url, token, parsed, conn): conn.request('DELETE', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) @@ -53,6 +57,7 @@ class TestObject(unittest.TestCase): def test_public_object(self): if skip: raise SkipTest + def get(url, token, parsed, conn): conn.request('GET', '%s/%s/%s' % (parsed.path, self.container, self.obj)) @@ -62,6 +67,7 @@ class TestObject(unittest.TestCase): raise Exception('Should not have been able to GET') except Exception, err: self.assert_(str(err).startswith('No result after ')) + def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.container, '', {'X-Auth-Token': token, @@ -73,6 +79,7 @@ class TestObject(unittest.TestCase): resp = retry(get) resp.read() self.assertEquals(resp.status, 200) + def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.container, '', {'X-Auth-Token': token, 'X-Container-Read': ''}) @@ -89,6 +96,7 @@ class TestObject(unittest.TestCase): def test_private_object(self): if skip or skip3: raise SkipTest + # Ensure we can't access the object with the third account def get(url, token, parsed, conn): conn.request('GET', '%s/%s/%s' % (parsed.path, self.container, @@ -98,8 +106,10 @@ class TestObject(unittest.TestCase): resp = retry(get, use_account=3) resp.read() self.assertEquals(resp.status, 403) + # create a shared container writable by account3 shared_container = uuid4().hex + def put(url, token, parsed, conn): conn.request('PUT', '%s/%s' 
% (parsed.path, shared_container), '', @@ -110,6 +120,7 @@ class TestObject(unittest.TestCase): resp = retry(put) resp.read() self.assertEquals(resp.status, 201) + # verify third account can not copy from private container def copy(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, @@ -123,6 +134,7 @@ class TestObject(unittest.TestCase): resp = retry(copy, use_account=3) resp.read() self.assertEquals(resp.status, 403) + # verify third account can write "obj1" to shared container def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, shared_container, @@ -131,6 +143,7 @@ class TestObject(unittest.TestCase): resp = retry(put, use_account=3) resp.read() self.assertEquals(resp.status, 201) + # verify third account can copy "obj1" to shared container def copy2(url, token, parsed, conn): conn.request('COPY', '%s/%s/%s' % (parsed.path, @@ -143,6 +156,7 @@ class TestObject(unittest.TestCase): resp = retry(copy2, use_account=3) resp.read() self.assertEquals(resp.status, 201) + # verify third account STILL can not copy from private container def copy3(url, token, parsed, conn): conn.request('COPY', '%s/%s/%s' % (parsed.path, @@ -155,6 +169,7 @@ class TestObject(unittest.TestCase): resp = retry(copy3, use_account=3) resp.read() self.assertEquals(resp.status, 403) + # clean up "obj1" def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s/%s' % (parsed.path, shared_container, @@ -163,6 +178,7 @@ class TestObject(unittest.TestCase): resp = retry(delete) resp.read() self.assertEquals(resp.status, 204) + # clean up shared_container def delete(url, token, parsed, conn): conn.request('DELETE', @@ -173,6 +189,269 @@ class TestObject(unittest.TestCase): resp.read() self.assertEquals(resp.status, 204) + def test_manifest(self): + if skip: + raise SkipTest + # Data for the object segments + segments1 = ['one', 'two', 'three', 'four', 'five'] + segments2 = ['six', 'seven', 'eight'] + segments3 = ['nine', 'ten', 'eleven'] + + 
# Upload the first set of segments + def put(url, token, parsed, conn, objnum): + conn.request('PUT', '%s/%s/segments1/%s' % (parsed.path, + self.container, str(objnum)), segments1[objnum], + {'X-Auth-Token': token}) + return check_response(conn) + for objnum in xrange(len(segments1)): + resp = retry(put, objnum) + resp.read() + self.assertEquals(resp.status, 201) + + # Upload the manifest + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token, + 'X-Object-Manifest': '%s/segments1/' % self.container, + 'Content-Type': 'text/jibberish', 'Content-Length': '0'}) + return check_response(conn) + resp = retry(put) + resp.read() + self.assertEquals(resp.status, 201) + + # Get the manifest (should get all the segments as the body) + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(get) + self.assertEquals(resp.read(), ''.join(segments1)) + self.assertEquals(resp.status, 200) + self.assertEquals(resp.getheader('content-type'), 'text/jibberish') + + # Get with a range at the start of the second segment + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token, 'Range': + 'bytes=3-'}) + return check_response(conn) + resp = retry(get) + self.assertEquals(resp.read(), ''.join(segments1[1:])) + self.assertEquals(resp.status, 206) + + # Get with a range in the middle of the second segment + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token, 'Range': + 'bytes=5-'}) + return check_response(conn) + resp = retry(get) + self.assertEquals(resp.read(), ''.join(segments1)[5:]) + self.assertEquals(resp.status, 206) + + # Get with a full start and stop range + def get(url, token, parsed, conn): + 
conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token, 'Range': + 'bytes=5-10'}) + return check_response(conn) + resp = retry(get) + self.assertEquals(resp.read(), ''.join(segments1)[5:11]) + self.assertEquals(resp.status, 206) + + # Upload the second set of segments + def put(url, token, parsed, conn, objnum): + conn.request('PUT', '%s/%s/segments2/%s' % (parsed.path, + self.container, str(objnum)), segments2[objnum], + {'X-Auth-Token': token}) + return check_response(conn) + for objnum in xrange(len(segments2)): + resp = retry(put, objnum) + resp.read() + self.assertEquals(resp.status, 201) + + # Get the manifest (should still be the first segments of course) + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(get) + self.assertEquals(resp.read(), ''.join(segments1)) + self.assertEquals(resp.status, 200) + + # Update the manifest + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token, + 'X-Object-Manifest': '%s/segments2/' % self.container, + 'Content-Length': '0'}) + return check_response(conn) + resp = retry(put) + resp.read() + self.assertEquals(resp.status, 201) + + # Get the manifest (should be the second set of segments now) + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(get) + self.assertEquals(resp.read(), ''.join(segments2)) + self.assertEquals(resp.status, 200) + + if not skip3: + + # Ensure we can't access the manifest with the third account + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(get, use_account=3) + resp.read() + 
self.assertEquals(resp.status, 403) + + # Grant access to the third account + def post(url, token, parsed, conn): + conn.request('POST', '%s/%s' % (parsed.path, self.container), + '', {'X-Auth-Token': token, 'X-Container-Read': + swift_test_user[2]}) + return check_response(conn) + resp = retry(post) + resp.read() + self.assertEquals(resp.status, 204) + + # The third account should be able to get the manifest now + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(get, use_account=3) + self.assertEquals(resp.read(), ''.join(segments2)) + self.assertEquals(resp.status, 200) + + # Create another container for the third set of segments + acontainer = uuid4().hex + + def put(url, token, parsed, conn): + conn.request('PUT', parsed.path + '/' + acontainer, '', + {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(put) + resp.read() + self.assertEquals(resp.status, 201) + + # Upload the third set of segments in the other container + def put(url, token, parsed, conn, objnum): + conn.request('PUT', '%s/%s/segments3/%s' % (parsed.path, + acontainer, str(objnum)), segments3[objnum], + {'X-Auth-Token': token}) + return check_response(conn) + for objnum in xrange(len(segments3)): + resp = retry(put, objnum) + resp.read() + self.assertEquals(resp.status, 201) + + # Update the manifest + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token, + 'X-Object-Manifest': '%s/segments3/' % acontainer, + 'Content-Length': '0'}) + return check_response(conn) + resp = retry(put) + resp.read() + self.assertEquals(resp.status, 201) + + # Get the manifest to ensure it's the third set of segments + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token}) + return check_response(conn) + 
resp = retry(get) + self.assertEquals(resp.read(), ''.join(segments3)) + self.assertEquals(resp.status, 200) + + if not skip3: + + # Ensure we can't access the manifest with the third account + # (because the segments are in a protected container even if the + # manifest itself is not). + + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(get, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # Grant access to the third account + def post(url, token, parsed, conn): + conn.request('POST', '%s/%s' % (parsed.path, acontainer), + '', {'X-Auth-Token': token, 'X-Container-Read': + swift_test_user[2]}) + return check_response(conn) + resp = retry(post) + resp.read() + self.assertEquals(resp.status, 204) + + # The third account should be able to get the manifest now + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(get, use_account=3) + self.assertEquals(resp.read(), ''.join(segments3)) + self.assertEquals(resp.status, 200) + + # Delete the manifest + def delete(url, token, parsed, conn, objnum): + conn.request('DELETE', '%s/%s/manifest' % (parsed.path, + self.container), '', {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(delete, objnum) + resp.read() + self.assertEquals(resp.status, 204) + + # Delete the third set of segments + def delete(url, token, parsed, conn, objnum): + conn.request('DELETE', '%s/%s/segments3/%s' % (parsed.path, + acontainer, str(objnum)), '', {'X-Auth-Token': token}) + return check_response(conn) + for objnum in xrange(len(segments3)): + resp = retry(delete, objnum) + resp.read() + self.assertEquals(resp.status, 204) + + # Delete the second set of segments + def delete(url, token, parsed, conn, objnum): + conn.request('DELETE', 
'%s/%s/segments2/%s' % (parsed.path, + self.container, str(objnum)), '', {'X-Auth-Token': token}) + return check_response(conn) + for objnum in xrange(len(segments2)): + resp = retry(delete, objnum) + resp.read() + self.assertEquals(resp.status, 204) + + # Delete the first set of segments + def delete(url, token, parsed, conn, objnum): + conn.request('DELETE', '%s/%s/segments1/%s' % (parsed.path, + self.container, str(objnum)), '', {'X-Auth-Token': token}) + return check_response(conn) + for objnum in xrange(len(segments1)): + resp = retry(delete, objnum) + resp.read() + self.assertEquals(resp.status, 204) + + # Delete the extra container + def delete(url, token, parsed, conn): + conn.request('DELETE', '%s/%s' % (parsed.path, acontainer), '', + {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(delete) + resp.read() + self.assertEquals(resp.status, 204) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_constraints.py b/test/unit/common/test_constraints.py index 0950d1e50e..bcc590f1ee 100644 --- a/test/unit/common/test_constraints.py +++ b/test/unit/common/test_constraints.py @@ -22,6 +22,7 @@ from webob.exc import HTTPBadRequest, HTTPLengthRequired, \ from swift.common import constraints + class TestConstraints(unittest.TestCase): def test_check_metadata_empty(self): @@ -137,6 +138,32 @@ class TestConstraints(unittest.TestCase): self.assert_(isinstance(resp, HTTPBadRequest)) self.assert_('Content-Type' in resp.body) + def test_check_object_manifest_header(self): + resp = constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': 'container/prefix', 'Content-Length': + '0', 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(not resp) + resp = constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': 'container', 'Content-Length': '0', + 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(isinstance(resp, HTTPBadRequest)) + resp = 
constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': '/container/prefix', + 'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(isinstance(resp, HTTPBadRequest)) + resp = constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': 'container/prefix?query=param', + 'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(isinstance(resp, HTTPBadRequest)) + resp = constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': 'container/prefix&query=param', + 'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(isinstance(resp, HTTPBadRequest)) + resp = constraints.check_object_creation(Request.blank('/', + headers={'X-Object-Manifest': 'http://host/container/prefix', + 'Content-Length': '0', 'Content-Type': 'text/plain'}), 'manifest') + self.assert_(isinstance(resp, HTTPBadRequest)) + def test_check_mount(self): self.assertFalse(constraints.check_mount('', '')) constraints.os = MockTrue() # mock os module diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 94a3b28266..90eed52977 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -42,7 +42,7 @@ class TestObjectController(unittest.TestCase): self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS') if not self.path_to_test_xfs or \ not os.path.exists(self.path_to_test_xfs): - print >>sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \ + print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \ 'pointing to a valid directory.\n' \ 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \ 'system for testing.' 
@@ -77,7 +77,8 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 201) timestamp = normalize_timestamp(time()) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 'X-Object-Meta-3': 'Three', 'X-Object-Meta-4': 'Four', @@ -95,7 +96,8 @@ class TestObjectController(unittest.TestCase): if not self.path_to_test_xfs: raise SkipTest timestamp = normalize_timestamp(time()) - req = Request.blank('/sda1/p/a/c/fail', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/fail', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 'X-Object-Meta-1': 'One', 'X-Object-Meta-2': 'Two', @@ -116,29 +118,37 @@ class TestObjectController(unittest.TestCase): def test_POST_container_connection(self): if not self.path_to_test_xfs: raise SkipTest + def mock_http_connect(response, with_exc=False): + class FakeConn(object): + def __init__(self, status, with_exc): self.status = status self.reason = 'Fake' self.host = '1.2.3.4' self.port = '1234' self.with_exc = with_exc + def getresponse(self): if self.with_exc: raise Exception('test') return self + def read(self, amt=None): return '' + return lambda *args, **kwargs: FakeConn(response, with_exc) + old_http_connect = object_server.http_connect try: timestamp = normalize_timestamp(time()) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, - headers={'X-Timestamp': timestamp, 'Content-Type': 'text/plain', - 'Content-Length': '0'}) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': + 'POST'}, headers={'X-Timestamp': timestamp, 'Content-Type': + 'text/plain', 'Content-Length': '0'}) resp = self.object_controller.PUT(req) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 
'X-Container-Host': '1.2.3.4:0', 'X-Container-Partition': '3', @@ -148,7 +158,8 @@ class TestObjectController(unittest.TestCase): object_server.http_connect = mock_http_connect(202) resp = self.object_controller.POST(req) self.assertEquals(resp.status_int, 202) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 'X-Container-Host': '1.2.3.4:0', 'X-Container-Partition': '3', @@ -158,7 +169,8 @@ class TestObjectController(unittest.TestCase): object_server.http_connect = mock_http_connect(202, with_exc=True) resp = self.object_controller.POST(req) self.assertEquals(resp.status_int, 202) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': timestamp, 'X-Container-Host': '1.2.3.4:0', 'X-Container-Partition': '3', @@ -226,7 +238,8 @@ class TestObjectController(unittest.TestCase): timestamp + '.data') self.assert_(os.path.isfile(objfile)) self.assertEquals(open(objfile).read(), 'VERIFY') - self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)), + self.assertEquals(pickle.loads(getxattr(objfile, + object_server.METADATA_KEY)), {'X-Timestamp': timestamp, 'Content-Length': '6', 'ETag': '0b4c12d7e0a73840c1c4f148fda3b037', @@ -258,7 +271,8 @@ class TestObjectController(unittest.TestCase): timestamp + '.data') self.assert_(os.path.isfile(objfile)) self.assertEquals(open(objfile).read(), 'VERIFY TWO') - self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)), + self.assertEquals(pickle.loads(getxattr(objfile, + object_server.METADATA_KEY)), {'X-Timestamp': timestamp, 'Content-Length': '10', 'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039', @@ -270,17 +284,17 @@ class TestObjectController(unittest.TestCase): if not self.path_to_test_xfs: raise SkipTest req = Request.blank('/sda1/p/a/c/o', 
environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Timestamp': normalize_timestamp(time()), - 'Content-Type': 'text/plain'}) + headers={'X-Timestamp': normalize_timestamp(time()), + 'Content-Type': 'text/plain'}) req.body = 'test' resp = self.object_controller.PUT(req) self.assertEquals(resp.status_int, 201) def test_PUT_invalid_etag(self): req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, - headers={'X-Timestamp': normalize_timestamp(time()), - 'Content-Type': 'text/plain', - 'ETag': 'invalid'}) + headers={'X-Timestamp': normalize_timestamp(time()), + 'Content-Type': 'text/plain', + 'ETag': 'invalid'}) req.body = 'test' resp = self.object_controller.PUT(req) self.assertEquals(resp.status_int, 422) @@ -304,7 +318,8 @@ class TestObjectController(unittest.TestCase): timestamp + '.data') self.assert_(os.path.isfile(objfile)) self.assertEquals(open(objfile).read(), 'VERIFY THREE') - self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)), + self.assertEquals(pickle.loads(getxattr(objfile, + object_server.METADATA_KEY)), {'X-Timestamp': timestamp, 'Content-Length': '12', 'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568', @@ -316,25 +331,33 @@ class TestObjectController(unittest.TestCase): def test_PUT_container_connection(self): if not self.path_to_test_xfs: raise SkipTest + def mock_http_connect(response, with_exc=False): + class FakeConn(object): + def __init__(self, status, with_exc): self.status = status self.reason = 'Fake' self.host = '1.2.3.4' self.port = '1234' self.with_exc = with_exc + def getresponse(self): if self.with_exc: raise Exception('test') return self + def read(self, amt=None): return '' + return lambda *args, **kwargs: FakeConn(response, with_exc) + old_http_connect = object_server.http_connect try: timestamp = normalize_timestamp(time()) - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'POST'}, headers={'X-Timestamp': 
timestamp, 'X-Container-Host': '1.2.3.4:0', 'X-Container-Partition': '3', @@ -555,7 +578,8 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 200) self.assertEquals(resp.etag, etag) - req = Request.blank('/sda1/p/a/c/o2', environ={'REQUEST_METHOD': 'GET'}, + req = Request.blank('/sda1/p/a/c/o2', + environ={'REQUEST_METHOD': 'GET'}, headers={'If-Match': '*'}) resp = self.object_controller.GET(req) self.assertEquals(resp.status_int, 412) @@ -715,7 +739,8 @@ class TestObjectController(unittest.TestCase): """ Test swift.object_server.ObjectController.DELETE """ if not self.path_to_test_xfs: raise SkipTest - req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'DELETE'}) + req = Request.blank('/sda1/p/a/c', + environ={'REQUEST_METHOD': 'DELETE'}) resp = self.object_controller.DELETE(req) self.assertEquals(resp.status_int, 400) @@ -916,21 +941,26 @@ class TestObjectController(unittest.TestCase): def test_disk_file_mkstemp_creates_dir(self): tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') os.rmdir(tmpdir) - with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp(): + with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', + 'o').mkstemp(): self.assert_(os.path.exists(tmpdir)) def test_max_upload_time(self): if not self.path_to_test_xfs: raise SkipTest + class SlowBody(): + def __init__(self): self.sent = 0 + def read(self, size=-1): if self.sent < 4: sleep(0.1) self.sent += 1 return ' ' return '' + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()}, headers={'X-Timestamp': normalize_timestamp(time()), @@ -946,14 +976,18 @@ class TestObjectController(unittest.TestCase): self.assertEquals(resp.status_int, 408) def test_short_body(self): + class ShortBody(): + def __init__(self): self.sent = False + def read(self, size=-1): if not self.sent: self.sent = True return ' ' return '' + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT', 
'wsgi.input': ShortBody()}, headers={'X-Timestamp': normalize_timestamp(time()), @@ -1001,11 +1035,37 @@ class TestObjectController(unittest.TestCase): resp = self.object_controller.GET(req) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.headers['content-encoding'], 'gzip') - req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'HEAD'}) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': + 'HEAD'}) resp = self.object_controller.HEAD(req) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.headers['content-encoding'], 'gzip') + def test_manifest_header(self): + if not self.path_to_test_xfs: + raise SkipTest + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'text/plain', + 'Content-Length': '0', + 'X-Object-Manifest': 'c/o/'}) + resp = self.object_controller.PUT(req) + self.assertEquals(resp.status_int, 201) + objfile = os.path.join(self.testdir, 'sda1', + storage_directory(object_server.DATADIR, 'p', hash_path('a', 'c', + 'o')), timestamp + '.data') + self.assert_(os.path.isfile(objfile)) + self.assertEquals(pickle.loads(getxattr(objfile, + object_server.METADATA_KEY)), {'X-Timestamp': timestamp, + 'Content-Length': '0', 'Content-Type': 'text/plain', 'name': + '/a/c/o', 'X-Object-Manifest': 'c/o/', 'ETag': + 'd41d8cd98f00b204e9800998ecf8427e'}) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'}) + resp = self.object_controller.GET(req) + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.headers.get('x-object-manifest'), 'c/o/') + if __name__ == '__main__': unittest.main() diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 2c9cd83395..b7d43c0fb2 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -35,8 +35,8 @@ import eventlet from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen from 
eventlet.timeout import Timeout import simplejson -from webob import Request -from webob.exc import HTTPUnauthorized +from webob import Request, Response +from webob.exc import HTTPNotFound, HTTPUnauthorized from test.unit import connect_tcp, readuntil2crlfs from swift.proxy import server as proxy_server @@ -53,7 +53,9 @@ logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) def fake_http_connect(*code_iter, **kwargs): + class FakeConn(object): + def __init__(self, status, etag=None, body=''): self.status = status self.reason = 'Fake' @@ -160,6 +162,7 @@ class FakeRing(object): class FakeMemcache(object): + def __init__(self): self.store = {} @@ -372,9 +375,12 @@ class TestController(unittest.TestCase): class TestProxyServer(unittest.TestCase): def test_unhandled_exception(self): + class MyApp(proxy_server.Application): + def get_controller(self, path): raise Exception('this shouldnt be caught') + app = MyApp(None, FakeMemcache(), account_ring=FakeRing(), container_ring=FakeRing(), object_ring=FakeRing()) req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'}) @@ -497,8 +503,11 @@ class TestObjectController(unittest.TestCase): test_status_map((200, 200, 204, 500, 404), 503) def test_PUT_connect_exceptions(self): + def mock_http_connect(*code_iter, **kwargs): + class FakeConn(object): + def __init__(self, status): self.status = status self.reason = 'Fake' @@ -518,6 +527,7 @@ class TestObjectController(unittest.TestCase): if self.status == -3: return FakeConn(507) return FakeConn(100) + code_iter = iter(code_iter) def connect(*args, **ckwargs): @@ -525,7 +535,9 @@ class TestObjectController(unittest.TestCase): if status == -1: raise HTTPException() return FakeConn(status) + return connect + with save_globals(): controller = proxy_server.ObjectController(self.app, 'account', 'container', 'object') @@ -546,8 +558,11 @@ class TestObjectController(unittest.TestCase): test_status_map((200, 200, 503, 503, -1), 503) def 
test_PUT_send_exceptions(self): + def mock_http_connect(*code_iter, **kwargs): + class FakeConn(object): + def __init__(self, status): self.status = status self.reason = 'Fake' @@ -611,8 +626,11 @@ class TestObjectController(unittest.TestCase): self.assertEquals(res.status_int, 413) def test_PUT_getresponse_exceptions(self): + def mock_http_connect(*code_iter, **kwargs): + class FakeConn(object): + def __init__(self, status): self.status = status self.reason = 'Fake' @@ -807,6 +825,7 @@ class TestObjectController(unittest.TestCase): dev['port'] = 1 class SlowBody(): + def __init__(self): self.sent = 0 @@ -816,6 +835,7 @@ class TestObjectController(unittest.TestCase): self.sent += 1 return ' ' return '' + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()}, headers={'Content-Length': '4', 'Content-Type': 'text/plain'}) @@ -854,11 +874,13 @@ class TestObjectController(unittest.TestCase): dev['port'] = 1 class SlowBody(): + def __init__(self): self.sent = 0 def read(self, size=-1): raise Exception('Disconnected') + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()}, headers={'Content-Length': '4', 'Content-Type': 'text/plain'}) @@ -1508,7 +1530,9 @@ class TestObjectController(unittest.TestCase): def test_chunked_put(self): # quick test of chunked put w/o PATH_TO_TEST_XFS + class ChunkedFile(): + def __init__(self, bytes): self.bytes = bytes self.read_bytes = 0 @@ -1576,6 +1600,7 @@ class TestObjectController(unittest.TestCase): mkdirs(os.path.join(testdir, 'sdb1')) mkdirs(os.path.join(testdir, 'sdb1', 'tmp')) try: + orig_container_listing_limit = proxy_server.CONTAINER_LISTING_LIMIT conf = {'devices': testdir, 'swift_dir': testdir, 'mount_check': 'false'} prolis = listen(('localhost', 0)) @@ -1669,8 +1694,10 @@ class TestObjectController(unittest.TestCase): self.assertEquals(headers[:len(exp)], exp) # Check unhandled exception orig_update_request = prosrv.update_request + def 
broken_update_request(env, req): raise Exception('fake') + prosrv.update_request = broken_update_request sock = connect_tcp(('localhost', prolis.getsockname()[1])) fd = sock.makefile() @@ -1719,8 +1746,10 @@ class TestObjectController(unittest.TestCase): # in a test for logging x-forwarded-for (first entry only). class Logger(object): + def info(self, msg): self.msg = msg + orig_logger = prosrv.logger prosrv.logger = Logger() sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -1742,8 +1771,10 @@ class TestObjectController(unittest.TestCase): # Turn on header logging. class Logger(object): + def info(self, msg): self.msg = msg + orig_logger = prosrv.logger prosrv.logger = Logger() prosrv.log_headers = True @@ -1900,6 +1931,70 @@ class TestObjectController(unittest.TestCase): self.assertEquals(headers[:len(exp)], exp) body = fd.read() self.assertEquals(body, 'oh hai123456789abcdef') + # Create a container for our segmented/manifest object testing + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/segmented HTTP/1.1\r\nHost: localhost\r\n' + 'Connection: close\r\nX-Storage-Token: t\r\n' + 'Content-Length: 0\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Create the object segments + for segment in xrange(5): + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/segmented/name/%s HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 5\r\n\r\n1234 ' % str(segment)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Create the object manifest file + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/segmented/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Storage-Token: ' + 't\r\nContent-Length: 
0\r\nX-Object-Manifest: ' + 'segmented/name/\r\nContent-Type: text/jibberish\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEquals(headers[:len(exp)], exp) + # Ensure retrieving the manifest file gets the whole object + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/segmented/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: ' + 't\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + self.assert_('X-Object-Manifest: segmented/name/' in headers) + self.assert_('Content-Type: text/jibberish' in headers) + body = fd.read() + self.assertEquals(body, '1234 1234 1234 1234 1234 ') + # Do it again but exceeding the container listing limit + proxy_server.CONTAINER_LISTING_LIMIT = 2 + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/segmented/name HTTP/1.1\r\nHost: ' + 'localhost\r\nConnection: close\r\nX-Auth-Token: ' + 't\r\n\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + self.assert_('X-Object-Manifest: segmented/name/' in headers) + self.assert_('Content-Type: text/jibberish' in headers) + body = fd.read() + # A bit fragile of a test; as it makes the assumption that all + # will be sent in a single chunk. 
+ self.assertEquals(body, + '19\r\n1234 1234 1234 1234 1234 \r\n0\r\n\r\n') finally: prospa.kill() acc1spa.kill() @@ -1909,6 +2004,7 @@ class TestObjectController(unittest.TestCase): obj1spa.kill() obj2spa.kill() finally: + proxy_server.CONTAINER_LISTING_LIMIT = orig_container_listing_limit rmtree(testdir) def test_mismatched_etags(self): @@ -2111,6 +2207,7 @@ class TestObjectController(unittest.TestCase): res = controller.COPY(req) self.assert_(called[0]) + class TestContainerController(unittest.TestCase): "Test swift.proxy_server.ContainerController" @@ -2254,7 +2351,9 @@ class TestContainerController(unittest.TestCase): self.assertEquals(resp.status_int, 404) def test_put_locking(self): + class MockMemcache(FakeMemcache): + def __init__(self, allow_lock=None): self.allow_lock = allow_lock super(MockMemcache, self).__init__() @@ -2265,6 +2364,7 @@ class TestContainerController(unittest.TestCase): yield True else: raise MemcacheLockError() + with save_globals(): controller = proxy_server.ContainerController(self.app, 'account', 'container') @@ -2870,5 +2970,261 @@ class TestAccountController(unittest.TestCase): test_status_map((204, 500, 404), 503) +class FakeObjectController(object): + + def __init__(self): + self.app = self + self.logger = self + self.account_name = 'a' + self.container_name = 'c' + self.object_name = 'o' + self.trans_id = 'tx1' + self.object_ring = FakeRing() + self.node_timeout = 1 + + def exception(self, *args): + self.exception_args = args + self.exception_info = sys.exc_info() + + def GETorHEAD_base(self, *args): + self.GETorHEAD_base_args = args + req = args[0] + path = args[4] + body = data = path[-1] * int(path[-1]) + if req.range and req.range.ranges: + body = '' + for start, stop in req.range.ranges: + body += data[start:stop] + resp = Response(app_iter=iter(body)) + return resp + + def iter_nodes(self, partition, nodes, ring): + for node in nodes: + yield node + for node in ring.get_more_nodes(partition): + yield node + + +class 
Stub(object): + pass + + +class TestSegmentedIterable(unittest.TestCase): + + def setUp(self): + self.controller = FakeObjectController() + + def test_load_next_segment_unexpected_error(self): + # Iterator value isn't a dict + self.assertRaises(Exception, + proxy_server.SegmentedIterable(self.controller, None, + [None])._load_next_segment) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + + def test_load_next_segment_with_no_segments(self): + self.assertRaises(StopIteration, + proxy_server.SegmentedIterable(self.controller, 'lc', + [])._load_next_segment) + + def test_load_next_segment_with_one_segment(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}]) + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '1') + + def test_load_next_segment_with_two_segments(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}, {'name': 'o2'}]) + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o1') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '1') + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '22') + + def test_load_next_segment_with_two_segments_skip_first(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}, {'name': 'o2'}]) + segit.segment = 0 + segit.listing.next() + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '22') + + def test_load_next_segment_with_seek(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}, {'name': 'o2'}]) + segit.segment = 0 + segit.listing.next() + 
segit.seek = 1 + segit._load_next_segment() + self.assertEquals(self.controller.GETorHEAD_base_args[4], '/a/lc/o2') + self.assertEquals(str(self.controller.GETorHEAD_base_args[0].range), + 'bytes=1-') + data = ''.join(segit.segment_iter) + self.assertEquals(data, '2') + + def test_load_next_segment_with_get_error(self): + + def local_GETorHEAD_base(*args): + return HTTPNotFound() + + self.controller.GETorHEAD_base = local_GETorHEAD_base + self.assertRaises(Exception, + proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}])._load_next_segment) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + self.assertEquals(str(self.controller.exception_info[1]), + 'Could not load object segment /a/lc/o1: 404') + + def test_iter_unexpected_error(self): + # Iterator value isn't a dict + self.assertRaises(Exception, ''.join, + proxy_server.SegmentedIterable(self.controller, None, [None])) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + + def test_iter_with_no_segments(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', []) + self.assertEquals(''.join(segit), '') + + def test_iter_with_one_segment(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}]) + segit.response = Stub() + self.assertEquals(''.join(segit), '1') + self.assertEquals(segit.response.bytes_transferred, 1) + + def test_iter_with_two_segments(self): + segit = proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 'o1'}, {'name': 'o2'}]) + segit.response = Stub() + self.assertEquals(''.join(segit), '122') + self.assertEquals(segit.response.bytes_transferred, 3) + + def test_iter_with_get_error(self): + + def local_GETorHEAD_base(*args): + return HTTPNotFound() + + self.controller.GETorHEAD_base = local_GETorHEAD_base + self.assertRaises(Exception, ''.join, + proxy_server.SegmentedIterable(self.controller, 'lc', [{'name': + 
'o1'}])) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + self.assertEquals(str(self.controller.exception_info[1]), + 'Could not load object segment /a/lc/o1: 404') + + def test_app_iter_range_unexpected_error(self): + # Iterator value isn't a dict + self.assertRaises(Exception, + proxy_server.SegmentedIterable(self.controller, None, + [None]).app_iter_range(None, None).next) + self.assertEquals(self.controller.exception_args[0], + 'ERROR: While processing manifest /a/c/o tx1') + + def test_app_iter_range_with_no_segments(self): + self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.controller, 'lc', []).app_iter_range(None, None)), '') + self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.controller, 'lc', []).app_iter_range(3, None)), '') + self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.controller, 'lc', []).app_iter_range(3, 5)), '') + self.assertEquals(''.join(proxy_server.SegmentedIterable( + self.controller, 'lc', []).app_iter_range(None, 5)), '') + + def test_app_iter_range_with_one_segment(self): + listing = [{'name': 'o1', 'bytes': 1}] + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, None)), '1') + self.assertEquals(segit.response.bytes_transferred, 1) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + self.assertEquals(''.join(segit.app_iter_range(3, None)), '') + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + self.assertEquals(''.join(segit.app_iter_range(3, 5)), '') + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, 5)), '1') + self.assertEquals(segit.response.bytes_transferred, 1) + + def test_app_iter_range_with_two_segments(self): + listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 
'bytes': 2}] + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, None)), '122') + self.assertEquals(segit.response.bytes_transferred, 3) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(1, None)), '22') + self.assertEquals(segit.response.bytes_transferred, 2) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(1, 5)), '22') + self.assertEquals(segit.response.bytes_transferred, 2) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, 2)), '12') + self.assertEquals(segit.response.bytes_transferred, 2) + + def test_app_iter_range_with_many_segments(self): + listing = [{'name': 'o1', 'bytes': 1}, {'name': 'o2', 'bytes': 2}, + {'name': 'o3', 'bytes': 3}, {'name': 'o4', 'bytes': 4}, {'name': + 'o5', 'bytes': 5}] + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, None)), + '122333444455555') + self.assertEquals(segit.response.bytes_transferred, 15) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(3, None)), + '333444455555') + self.assertEquals(segit.response.bytes_transferred, 12) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(5, None)), '3444455555') + self.assertEquals(segit.response.bytes_transferred, 10) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, 6)), 
'122333') + self.assertEquals(segit.response.bytes_transferred, 6) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(None, 7)), '1223334') + self.assertEquals(segit.response.bytes_transferred, 7) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(3, 7)), '3334') + self.assertEquals(segit.response.bytes_transferred, 4) + + segit = proxy_server.SegmentedIterable(self.controller, 'lc', listing) + segit.response = Stub() + self.assertEquals(''.join(segit.app_iter_range(5, 7)), '34') + self.assertEquals(segit.response.bytes_transferred, 2) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py index 1221a05e9e..1a6450139f 100644 --- a/test/unit/stats/test_log_processor.py +++ b/test/unit/stats/test_log_processor.py @@ -65,7 +65,7 @@ class DumbInternalProxy(object): class TestLogProcessor(unittest.TestCase): - access_test_line = 'Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\ + access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ '09/Jul/2010/04/14/30 GET '\ '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\