Bulk Requests: auto extract archive and bulk delete middleware.

Fix small problem in ratelimiting middleware.

DocImpact

Change-Id: Ide3e0b9f4887626c30cae0b97eb7e2237b1df3ed
This commit is contained in:
David Goetz 2013-01-24 12:34:56 -08:00
parent 64270fab71
commit 2f663ff9a0
8 changed files with 1020 additions and 6 deletions

View File

@ -172,3 +172,10 @@ Proxy Logging
:members:
:show-inheritance:
Bulk Operations (Delete and Archive Auto Extraction)
====================================================
.. automodule:: swift.common.middleware.bulk
:members:
:show-inheritance:

View File

@ -334,3 +334,10 @@ use = egg:swift#proxy_logging
# What HTTP methods are allowed for StatsD logging (comma-sep); request methods
# not in this list will have "BAD_METHOD" for the <verb> portion of the metric.
# log_statsd_valid_http_methods = GET,HEAD,POST,PUT,DELETE,COPY,OPTIONS
# Note: Put before both ratelimit and auth in the pipeline.
[filter:bulk]
use = egg:swift#bulk
# max_containers_per_extraction = 10000
# max_failed_extractions = 1000
# max_deletes_per_request = 1000

View File

@ -99,8 +99,9 @@ setup(
'tempurl=swift.common.middleware.tempurl:filter_factory',
'formpost=swift.common.middleware.formpost:filter_factory',
'name_check=swift.common.middleware.name_check:filter_factory',
'proxy_logging=swift.common.middleware.proxy_logging:'
'filter_factory',
'proxy_logging='
'swift.common.middleware.proxy_logging:filter_factory',
'bulk=swift.common.middleware.bulk:filter_factory',
],
},
)

View File

@ -1061,7 +1061,7 @@ class ContainerBroker(DatabaseBroker):
:param marker: marker query
:param end_marker: end marker query
:param prefix: prefix query
:param delimeter: delimeter for query
:param delimiter: delimiter for query
:param path: if defined, will set the prefix and delimiter based on
the path

View File

@ -0,0 +1,392 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tarfile
from urllib import quote, unquote
from xml.sax import saxutils
from swift.common.swob import Request, HTTPBadGateway, \
HTTPCreated, HTTPBadRequest, HTTPNotFound, HTTPUnauthorized, HTTPOk, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPNotAcceptable, \
catch_http_exception
from swift.common.utils import split_path, json, TRUE_VALUES
from swift.common.constraints import check_utf8, MAX_FILE_SIZE
from swift.common.http import HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED, \
HTTP_NOT_FOUND
from swift.common.constraints import MAX_OBJECT_NAME_LENGTH, \
MAX_CONTAINER_NAME_LENGTH
# Longest "container/object" path accepted in one piece; used as the read
# chunk size for bulk-delete bodies and to truncate paths in error reports.
# NOTE(review): the "+ 2" presumably accounts for the two '/' separators in
# "/container/object" -- confirm.
MAX_PATH_LENGTH = MAX_OBJECT_NAME_LENGTH + MAX_CONTAINER_NAME_LENGTH + 2
class CreateContainerError(Exception):
    """
    Raised when the sub-request that creates a container does not succeed.

    :param msg: human readable description of the failure
    :param status_int: numeric HTTP status of the failed sub-request
    :param status: full HTTP status line of the failed sub-request
    """

    def __init__(self, msg, status_int, status):
        super(CreateContainerError, self).__init__(msg)
        self.status = status
        self.status_int = status_int
# Response body content types the middleware can produce; the request's
# Accept header is matched against this list to pick the output format.
ACCEPTABLE_FORMATS = ['text/plain', 'application/json', 'application/xml',
'text/xml']
class Bulk(object):
    """
    Middleware that will do many operations on a single request.

    Extract Archive:

    Expand tar files into a swift account. Request must be a PUT with the
    header X-Extract-Archive specifying the format of archive file. Accepted
    formats are tar, tar.gz, and tar.bz2.

    For a PUT to the following url:

    /v1/AUTH_Account/$UPLOAD_PATH

    UPLOAD_PATH is where the files will be expanded to. UPLOAD_PATH can be a
    container, a pseudo-directory within a container, or an empty string. The
    destination of a file in the archive will be built as follows:

    /v1/AUTH_Account/$UPLOAD_PATH/$FILE_PATH

    Where FILE_PATH is the file name from the listing in the tar file.

    If the UPLOAD_PATH is an empty string, containers will be auto created
    accordingly and files in the tar that would not map to any container (files
    in the base directory) will be ignored.

    Only regular files will be uploaded. Empty directories, symlinks, etc will
    not be uploaded.

    If all valid files were uploaded successfully will return an HTTPCreated
    response. If any files failed to be created will return an HTTPBadGateway
    response. In both cases the response body will specify the number of files
    successfully uploaded and a list of the files that failed. The return body
    will be formatted in the way specified in the request's Accept header.
    Acceptable formats are text/plain, application/json, application/xml, and
    text/xml.

    Bulk Delete:

    Will delete multiple objects from their account with a single request.
    Responds to DELETE requests with a header 'X-Bulk-Delete: true'.
    The Content-Type should be set to text/plain. The body of the DELETE
    request will be a newline separated list of url encoded objects to delete.
    You can only delete 1000 (configurable) objects per request. The objects
    specified in the DELETE request body must be URL encoded and in the form:

    /container_name/obj_name

    If all objects were successfully deleted (or did not exist), will return an
    HTTPOk. If any objects failed to delete, will return an HTTPBadGateway. In
    both cases the response body will specify the number of objects
    successfully deleted, not found, and a list of the objects that failed.
    The return body will be formatted in the way specified in the request's
    Accept header. Acceptable formats are text/plain, application/json,
    application/xml, and text/xml.
    """

    def __init__(self, app, conf):
        """
        :param app: the next WSGI application in the pipeline
        :param conf: dict of configuration values for this filter
        """
        self.app = app
        self.max_containers = int(
            conf.get('max_containers_per_extraction', 10000))
        # 'max_failed_files' is honored as a fallback because that is the
        # name this limit has been documented under in the sample config.
        self.max_failed_extractions = int(
            conf.get('max_failed_extractions',
                     conf.get('max_failed_files', 1000)))
        self.max_deletes_per_request = int(
            conf.get('max_deletes_per_request', 1000))

    def create_container(self, req, container_path):
        """
        Makes a subrequest to create a new container.

        The subrequest reuses a copy of the original request's environ with
        only PATH_INFO replaced.

        :params req: the swob Request whose environ is copied
        :params container_path: an unquoted path to a container to be created
        :returns: None on success
        :raises: CreateContainerError on creation error
        """
        new_env = req.environ.copy()
        new_env['PATH_INFO'] = container_path
        create_cont_req = Request.blank(container_path, environ=new_env)
        resp = create_cont_req.get_response(self.app)
        if resp.status_int // 100 != 2:
            raise CreateContainerError(
                "Create Container Failed: " + container_path,
                resp.status_int, resp.status)

    def get_objs_to_delete(self, req):
        """
        Will populate objs_to_delete with data from request input.

        The body is read in MAX_PATH_LENGTH sized chunks and split on '\\n'.
        Blank lines are returned as empty strings and count towards the
        per-request limit.

        :params req: a Swob request
        :returns: a list of the contents of req.body when separated by newline.
        :raises: HTTPException on failures
        """
        line = ''
        data_remaining = True
        objs_to_delete = []
        if req.content_length is None and \
                req.headers.get('transfer-encoding', '').lower() != 'chunked':
            raise HTTPBadRequest('Invalid request: no content sent.')

        while data_remaining:
            if '\n' in line:
                obj_to_delete, line = line.split('\n', 1)
                objs_to_delete.append(obj_to_delete)
            else:
                data = req.body_file.read(MAX_PATH_LENGTH)
                if data:
                    line += data
                else:
                    data_remaining = False
                    if line.strip():
                        objs_to_delete.append(line)
            # Enforce the limit only after this iteration's append, so a
            # final entry without a trailing newline cannot slip past it.
            if len(objs_to_delete) > self.max_deletes_per_request:
                raise HTTPRequestEntityTooLarge(
                    'Maximum Bulk Deletes: %d per request' %
                    self.max_deletes_per_request)
            # A pending partial line this long cannot be a valid path.
            if len(line) > MAX_PATH_LENGTH * 2:
                raise HTTPBadRequest('Invalid File Name')
        return objs_to_delete

    def get_response_body(self, data_format, data_dict, error_list):
        """
        Returns a properly formatted response body according to format.

        :params data_format: resulting format
        :params data_dict: generated data about results; it is not modified.
        :params error_list: list of quoted filenames that failed, as
                            [name, status] pairs
        :returns: the response body as a string
        :raises: HTTPNotAcceptable if data_format is not supported
        """
        if data_format == 'text/plain':
            parts = ['%s: %s\n' % (key, data_dict[key])
                     for key in sorted(data_dict)]
            parts.append('Errors:\n')
            parts.append('\n'.join(
                '%s, %s' % (name, status) for name, status in error_list))
            return ''.join(parts)

        if data_format == 'application/json':
            # Work on a copy so the caller's dict does not gain an 'Errors'
            # key as a side effect.
            body = dict(data_dict)
            body['Errors'] = error_list
            return json.dumps(body)

        if data_format.endswith('/xml'):
            parts = ['<?xml version="1.0" encoding="UTF-8"?>\n<delete>\n']
            for key in sorted(data_dict):
                xml_key = key.replace(' ', '_').lower()
                parts.append(
                    '<%s>%s</%s>\n' % (xml_key, data_dict[key], xml_key))
            parts.append('<errors>\n')
            parts.append('\n'.join(
                '<object><name>%s</name><status>%s</status></object>' %
                (saxutils.escape(name), status)
                for name, status in error_list))
            parts.append('</errors>\n</delete>\n')
            return ''.join(parts)

        raise HTTPNotAcceptable('Invalid output type')

    def handle_delete(self, req):
        """
        Deletes every object listed in the request body with one DELETE
        subrequest per object.

        :params req: a swob Request
        :raises HTTPException: on unhandled errors
        :returns: a swob Response
        """
        try:
            vrs, account, _junk = split_path(unquote(req.path), 2, 3, True)
        except ValueError:
            return HTTPNotFound(request=req)

        incoming_format = req.headers.get('Content-Type')
        if incoming_format and not incoming_format.startswith('text/plain'):
            # For now only accept newline separated object names
            return HTTPNotAcceptable(request=req)
        out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
        if not out_content_type:
            return HTTPNotAcceptable(request=req)

        objs_to_delete = self.get_objs_to_delete(req)
        failed_files = []
        success_count = not_found_count = 0
        # Upgraded to HTTPBadGateway as soon as any subrequest returns 5xx.
        failed_file_response_type = HTTPBadRequest
        for obj_to_delete in objs_to_delete:
            obj_to_delete = obj_to_delete.strip().lstrip('/')
            if not obj_to_delete:
                continue
            obj_to_delete = unquote(obj_to_delete)
            delete_path = '/'.join(['', vrs, account, obj_to_delete])
            if not check_utf8(delete_path):
                failed_files.append([quote(delete_path),
                                     HTTPPreconditionFailed().status])
                continue
            new_env = req.environ.copy()
            new_env['PATH_INFO'] = delete_path
            # The body was already consumed as the list of names; the
            # per-object DELETE is sent without one.
            del new_env['wsgi.input']
            new_env['CONTENT_LENGTH'] = 0
            new_env['HTTP_USER_AGENT'] = \
                '%s BulkDelete' % req.environ.get('HTTP_USER_AGENT')
            delete_obj_req = Request.blank(delete_path, new_env)
            resp = delete_obj_req.get_response(self.app)
            if resp.status_int // 100 == 2:
                success_count += 1
            elif resp.status_int == HTTP_NOT_FOUND:
                not_found_count += 1
            elif resp.status_int == HTTP_UNAUTHORIZED:
                # Abort the whole bulk request on the first 401.
                return HTTPUnauthorized(request=req)
            else:
                if resp.status_int // 100 == 5:
                    failed_file_response_type = HTTPBadGateway
                failed_files.append([quote(delete_path), resp.status])

        resp_body = self.get_response_body(
            out_content_type,
            {'Number Deleted': success_count,
             'Number Not Found': not_found_count},
            failed_files)
        if (success_count or not_found_count) and not failed_files:
            return HTTPOk(resp_body, content_type=out_content_type)
        if failed_files:
            return failed_file_response_type(
                resp_body, content_type=out_content_type)
        return HTTPBadRequest('Invalid bulk delete.')

    def handle_extract(self, req, compress_type):
        """
        Streams the tar archive in the request body and issues one
        subrequest per regular file, creating containers as needed.

        :params req: a swob Request
        :params compress_type: specifying the compression type of the tar.
                               Accepts '', 'gz', or 'bz2'
        :raises HTTPException: on unhandled errors
        :returns: a swob response to request
        """
        success_count = 0
        failed_files = []
        existing_containers = set()
        out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
        if not out_content_type:
            return HTTPNotAcceptable(request=req)
        if req.content_length is None and \
                req.headers.get('transfer-encoding', '').lower() != 'chunked':
            return HTTPBadRequest('Invalid request: no content sent.')
        try:
            vrs, account, extract_base = split_path(
                unquote(req.path), 2, 3, True)
        except ValueError:
            return HTTPNotFound(request=req)
        extract_base = extract_base or ''
        extract_base = extract_base.rstrip('/')
        try:
            # 'r|' opens the archive as a non-seekable stream, so the body
            # is processed member by member as it is read.
            tar = tarfile.open(mode='r|' + compress_type,
                               fileobj=req.body_file)
            while True:
                tar_info = tar.next()
                if tar_info is None or \
                        len(failed_files) >= self.max_failed_extractions:
                    break
                if not tar_info.isfile():
                    continue  # only regular files are uploaded

                obj_path = tar_info.name
                if obj_path.startswith('./'):
                    obj_path = obj_path[2:]
                obj_path = obj_path.lstrip('/')
                if extract_base:
                    obj_path = extract_base + '/' + obj_path
                if '/' not in obj_path:
                    continue  # ignore base level file

                destination = '/'.join(['', vrs, account, obj_path])
                container = obj_path.split('/', 1)[0]
                if not check_utf8(destination):
                    failed_files.append(
                        [quote(destination[:MAX_PATH_LENGTH]),
                         HTTPPreconditionFailed().status])
                    continue
                if tar_info.size > MAX_FILE_SIZE:
                    failed_files.append([
                        quote(destination[:MAX_PATH_LENGTH]),
                        HTTPRequestEntityTooLarge().status])
                    continue
                if container not in existing_containers:
                    try:
                        self.create_container(
                            req, '/'.join(['', vrs, account, container]))
                        existing_containers.add(container)
                    except CreateContainerError as err:
                        if err.status_int == HTTP_UNAUTHORIZED:
                            return HTTPUnauthorized(request=req)
                        failed_files.append([
                            quote(destination[:MAX_PATH_LENGTH]),
                            err.status])
                        continue
                    except ValueError:
                        # Record a status string like every other failure
                        # entry, not the bare integer code.
                        failed_files.append([
                            quote(destination[:MAX_PATH_LENGTH]),
                            HTTPBadRequest().status])
                        continue
                    if len(existing_containers) > self.max_containers:
                        return HTTPBadRequest(
                            'More than %d base level containers in tar.' %
                            self.max_containers)

                tar_file = tar.extractfile(tar_info)
                new_env = req.environ.copy()
                new_env['wsgi.input'] = tar_file
                new_env['PATH_INFO'] = destination
                new_env['CONTENT_LENGTH'] = tar_info.size
                new_env['HTTP_USER_AGENT'] = \
                    '%s BulkExpand' % req.environ.get('HTTP_USER_AGENT')
                create_obj_req = Request.blank(destination, new_env)
                resp = create_obj_req.get_response(self.app)
                if resp.status_int // 100 == 2:
                    success_count += 1
                else:
                    if resp.status_int == HTTP_UNAUTHORIZED:
                        return HTTPUnauthorized(request=req)
                    failed_files.append([
                        quote(destination[:MAX_PATH_LENGTH]), resp.status])

            resp_body = self.get_response_body(
                out_content_type,
                {'Number Files Created': success_count},
                failed_files)
            if success_count and not failed_files:
                return HTTPCreated(resp_body, content_type=out_content_type)
            if failed_files:
                return HTTPBadGateway(resp_body, content_type=out_content_type)
            return HTTPBadRequest('Invalid Tar File: No Valid Files')

        except tarfile.TarError as tar_error:
            return HTTPBadRequest('Invalid Tar File: %s' % tar_error)

    @catch_http_exception
    def __call__(self, env, start_response):
        """
        WSGI entry point.

        Handles PUT requests carrying X-Extract-Archive and DELETE requests
        carrying a true X-Bulk-Delete header; everything else is passed
        through to the wrapped app unchanged.
        """
        req = Request(env)
        extract_type = \
            req.headers.get('X-Extract-Archive', '').lower().strip('.')
        if extract_type and req.method == 'PUT':
            # Map the header value onto the tarfile compression suffix.
            archive_type = {'tar': '', 'tar.gz': 'gz',
                            'tar.bz2': 'bz2'}.get(extract_type)
            if archive_type is not None:
                resp = self.handle_extract(req, archive_type)
            else:
                resp = HTTPBadRequest("Unsupported archive format")
            return resp(env, start_response)
        if (req.headers.get('X-Bulk-Delete', '').lower() in TRUE_VALUES and
                req.method == 'DELETE'):
            return self.handle_delete(req)(env, start_response)

        return self.app(env, start_response)
def filter_factory(global_conf, **local_conf):
    """
    Paste filter factory for the bulk middleware.

    :param global_conf: dict of global configuration values
    :param local_conf: filter-specific values; these override global_conf
    :returns: a callable that wraps a WSGI app in a Bulk instance
    """
    merged_conf = dict(global_conf, **local_conf)

    def bulk_filter(app):
        return Bulk(app, merged_conf)

    return bulk_filter

View File

@ -168,12 +168,15 @@ class RateLimitMiddleware(object):
def handle_ratelimit(self, req, account_name, container_name, obj_name):
'''
Performs rate limiting and account white/black listing. Sleeps
if necessary.
if necessary. If self.memcache_client is not set, immediately returns
None.
:param account_name: account name from path
:param container_name: container name from path
:param obj_name: object name from path
'''
if not self.memcache_client:
return None
if account_name in self.ratelimit_blacklist:
self.logger.error(_('Returning 497 because of blacklisting: %s'),
account_name)

View File

@ -1024,13 +1024,34 @@ class Response(object):
return app_iter
class HTTPException(Response, Exception):
    """
    A Response that can also be raised as an exception.

    Both base classes are initialised explicitly: Response with the caller's
    arguments first, then Exception with the resulting status as its message.
    """

    def __init__(self, *args, **kwargs):
        Response.__init__(self, *args, **kwargs)
        Exception.__init__(self, self.status)
def catch_http_exception(func):
    """
    A decorator function to wrap a __call__ function. If an HTTPException
    is raised it will be appropriately returned as the response.

    :param func: a method with the signature (self, env, start_response)
    :returns: a wrapper with the same signature which carries over the
              name and docstring of func
    """
    # Imported locally so this block does not depend on the module's
    # existing functools import.
    from functools import wraps

    @wraps(func)
    def catch_exception_func(self, env, start_response):
        try:
            return func(self, env, start_response)
        except HTTPException as err_resp:
            # The raised exception is itself a WSGI response; invoke it.
            return err_resp(env, start_response)
    return catch_exception_func
class StatusMap(object):
"""
A dict-like object that returns Response subclasses/factory functions
A dict-like object that returns HTTPException subclasses/factory functions
where the given key is the status code.
"""
def __getitem__(self, key):
return partial(Response, status=key)
return partial(HTTPException, status=key)
status_map = StatusMap()
@ -1057,5 +1078,6 @@ HTTPUnprocessableEntity = status_map[422]
HTTPClientDisconnect = status_map[499]
HTTPServerError = status_map[500]
HTTPInternalServerError = status_map[500]
HTTPBadGateway = status_map[502]
HTTPServiceUnavailable = status_map[503]
HTTPInsufficientStorage = status_map[507]

View File

@ -0,0 +1,582 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
import tarfile
import urllib
from shutil import rmtree
from tempfile import mkdtemp
from StringIO import StringIO
from mock import patch
from swift.common.middleware import bulk
from swift.common.swob import Request, Response, HTTPException
from swift.common.utils import json
class FakeApp(object):
    """
    Stand-in WSGI app for the bulk middleware tests.

    The response is chosen purely from the prefix (and sometimes suffix or
    length) of PATH_INFO. Every call is counted in ``calls`` and paths under
    /delete_works/ are recorded in ``delete_paths``. A path matching no known
    prefix yields None.
    """

    def __init__(self):
        self.calls = 0
        self.delete_paths = []

    def __call__(self, env, start_response):
        self.calls += 1
        path = env['PATH_INFO']

        def reply(status):
            return Response(status=status)(env, start_response)

        if path.startswith('/unauth/'):
            return reply(401)
        if path.startswith('/create_cont/'):
            return reply('201 Created')
        if path.startswith('/create_cont_fail/'):
            return reply('404 Not Found')
        if path.startswith('/create_obj_unauth/'):
            # Container creation succeeds, every object PUT is rejected.
            if path.endswith('/cont'):
                return reply('201 Created')
            return reply(401)
        if path.startswith('/tar_works/'):
            if len(path) > 100:
                return reply('400 Bad Request')
            return reply('201 Created')
        if path.startswith('/delete_works/'):
            self.delete_paths.append(path)
            if len(path) > 100:
                return reply('400 Bad Request')
            if path.endswith('404'):
                return reply('404 Not Found')
            if path.endswith('badutf8'):
                return reply('412 Precondition Failed')
            return reply('204 No Content')
        if path.startswith('/delete_cont_fail/'):
            return reply('409 Conflict')
        if path.startswith('/broke/'):
            return reply('500 Internal Error')
def build_dir_tree(start_path, tree_obj):
    """
    Recursively create a directory tree on disk under start_path.

    :param start_path: existing directory to build in
    :param tree_obj: a list (each item is built in start_path), a dict
                     (each key becomes a sub-directory holding its value),
                     or a string (a file containing 'testing' is written).
                     Any other value, e.g. None, creates nothing.
    """
    if isinstance(tree_obj, list):
        for child in tree_obj:
            build_dir_tree(start_path, child)
        return
    if isinstance(tree_obj, dict):
        for sub_dir, child in tree_obj.iteritems():
            sub_path = os.path.join(start_path, sub_dir)
            os.mkdir(sub_path)
            build_dir_tree(sub_path, child)
        return
    if isinstance(tree_obj, unicode):
        tree_obj = tree_obj.encode('utf8')
    if not isinstance(tree_obj, str):
        return
    file_path = os.path.join(start_path, tree_obj)
    with open(file_path, 'w+') as new_file:
        new_file.write('testing')
def build_tar_tree(tar, start_path, tree_obj, base_path=''):
    """
    Recursively add members describing tree_obj to an open tar file without
    touching the filesystem.

    :param tar: a tarfile.TarFile opened for writing
    :param start_path: path the entries of tree_obj are relative to
    :param tree_obj: a list, a dict (keys become directory members) or a
                     string (becomes a file member whose name starts './')
    :param base_path: prefix removed from every path to form member names
    """
    prefix_len = len(base_path)
    if isinstance(tree_obj, list):
        for child in tree_obj:
            build_tar_tree(tar, start_path, child, base_path=base_path)
        return
    if isinstance(tree_obj, dict):
        for sub_dir, child in tree_obj.iteritems():
            sub_path = os.path.join(start_path, sub_dir)
            dir_member = tarfile.TarInfo(sub_path[prefix_len:])
            dir_member.type = tarfile.DIRTYPE
            tar.addfile(dir_member)
            build_tar_tree(tar, sub_path, child, base_path=base_path)
        return
    if isinstance(tree_obj, unicode):
        tree_obj = tree_obj.encode('utf8')
    if not isinstance(tree_obj, str):
        return
    file_path = os.path.join(start_path, tree_obj)
    file_member = tarfile.TarInfo('./' + file_path[prefix_len:])
    tar.addfile(file_member)
class TestUntar(unittest.TestCase):
    """Tests for the archive auto-extraction half of the bulk middleware."""

    def setUp(self):
        self.app = FakeApp()
        self.bulk = bulk.filter_factory({})(self.app)
        # Keep the mkdtemp() directory itself so tearDown can remove it;
        # the tests only ever write below self.testdir.
        self.tmp_root = mkdtemp()
        self.testdir = os.path.join(self.tmp_root, 'tmp_test_bulk')
        os.mkdir(self.testdir)

    def tearDown(self):
        self.app.calls = 0
        # Removes self.testdir as well as its temporary parent.
        rmtree(self.tmp_root)

    def _open_tar(self, tar_name):
        """Open a file from the test directory for binary reading."""
        return open(os.path.join(self.testdir, tar_name), 'rb')

    def _extract_req(self, path, tar_name, accept=None, chunked=True):
        """
        Build a request to path whose body is the named tar file.

        :param accept: value for the Accept header, if any
        :param chunked: whether to set 'transfer-encoding: chunked'
        """
        headers = {'Accept': accept} if accept else None
        req = Request.blank(path, headers=headers)
        req.environ['wsgi.input'] = self._open_tar(tar_name)
        if chunked:
            req.headers['transfer-encoding'] = 'chunked'
        return req

    def test_create_container_for_path(self):
        req = Request.blank('/')
        self.assertEquals(
            self.bulk.create_container(req, '/create_cont/acc/cont'),
            None)
        self.assertRaises(
            bulk.CreateContainerError,
            self.bulk.create_container,
            req, '/create_cont_fail/acc/cont')

    def test_extract_tar_works(self):
        for compress_format in ['', 'gz', 'bz2']:
            base_name = 'base_works_%s' % compress_format
            dir_tree = [
                {base_name: [{'sub_dir1': ['sub1_file1', 'sub1_file2']},
                             {'sub_dir2': ['sub2_file1', u'test obj \u2661']},
                             'sub_file1',
                             {'sub_dir3': [{'sub4_dir1': '../sub4 file1'}]},
                             {'sub_dir4': None},
                             ]}]
            build_dir_tree(self.testdir, dir_tree)
            mode = 'w'
            extension = ''
            if compress_format:
                mode += ':' + compress_format
                extension += '.' + compress_format
            tar_name = 'tar_works.tar' + extension
            tar = tarfile.open(name=os.path.join(self.testdir, tar_name),
                               mode=mode)
            tar.add(os.path.join(self.testdir, base_name))
            tar.close()

            req = self._extract_req('/tar_works/acc/cont/', tar_name,
                                    accept='application/json')
            resp = self.bulk.handle_extract(req, compress_format)
            resp_data = json.loads(resp.body)
            self.assertEquals(resp_data['Number Files Created'], 6)

            # test out xml
            req = self._extract_req('/tar_works/acc/cont/', tar_name,
                                    accept='application/xml')
            resp = self.bulk.handle_extract(req, compress_format)
            self.assertEquals(resp.status_int, 201)
            self.assert_('<number_files_created>6</number_files_created>' in
                         resp.body)

            # test out nonexistent format
            req = self._extract_req('/tar_works/acc/cont/', tar_name,
                                    accept='good_xml', chunked=False)
            resp = self.bulk.handle_extract(req, compress_format)
            self.assertEquals(resp.status_int, 406)

    def test_extract_call(self):
        base_name = 'base_works_gz'
        dir_tree = [
            {base_name: [{'sub_dir1': ['sub1_file1', 'sub1_file2']},
                         {'sub_dir2': ['sub2_file1', 'sub2_file2']},
                         'sub_file1',
                         {'sub_dir3': [{'sub4_dir1': 'sub4_file1'}]}]}]
        build_dir_tree(self.testdir, dir_tree)
        tar = tarfile.open(name=os.path.join(self.testdir,
                                             'tar_works.tar.gz'),
                           mode='w:gz')
        tar.add(os.path.join(self.testdir, base_name))
        tar.close()

        def fake_start_response(*args, **kwargs):
            pass

        # Without the extract header the request is simply passed through.
        req = Request.blank('/tar_works/acc/cont/')
        req.environ['wsgi.input'] = self._open_tar('tar_works.tar.gz')
        self.bulk(req.environ, fake_start_response)
        self.assertEquals(self.app.calls, 1)

        self.app.calls = 0
        req.environ['wsgi.input'] = self._open_tar('tar_works.tar.gz')
        req.headers['x-extract-archive'] = 'tar.gz'
        req.headers['transfer-encoding'] = 'Chunked'
        req.method = 'PUT'
        self.bulk(req.environ, fake_start_response)
        self.assertEquals(self.app.calls, 7)

        self.app.calls = 0
        req.headers['x-extract-archive'] = 'bad'
        t = self.bulk(req.environ, fake_start_response)
        self.assertEquals(t[0], "Unsupported archive format")

        tar = tarfile.open(name=os.path.join(self.testdir,
                                             'tar_works.tar'),
                           mode='w')
        tar.add(os.path.join(self.testdir, base_name))
        tar.close()
        self.app.calls = 0
        req.environ['wsgi.input'] = self._open_tar('tar_works.tar')
        req.headers['x-extract-archive'] = 'tar'
        self.bulk(req.environ, fake_start_response)
        self.assertEquals(self.app.calls, 7)

    def test_bad_container(self):
        req = Request.blank('/invalid/', body='')
        resp = self.bulk.handle_extract(req, '')
        self.assertEquals(resp.status_int, 404)

        req = Request.blank('/create_cont_fail/acc/cont')
        resp = self.bulk.handle_extract(req, '')
        self.assertEquals(resp.status_int, 400)

    def build_tar(self, dir_tree=None):
        """
        Write 'tar_fails.tar' into the test directory from dir_tree.

        :param dir_tree: tree description; a default tree containing one
                         101 character file name is used when omitted
        :returns: the (already closed) TarFile object
        """
        if not dir_tree:
            dir_tree = [
                {'base_fails1': [{'sub_dir1': ['sub1_file1']},
                                 {'sub_dir2': ['sub2_file1', 'sub2_file2']},
                                 'f' * 101,
                                 {'sub_dir3': [{'sub4_dir1': 'sub4_file1'}]}]}]
        tar = tarfile.open(name=os.path.join(self.testdir, 'tar_fails.tar'),
                           mode='w')
        build_tar_tree(tar, self.testdir, dir_tree,
                       base_path=self.testdir + '/')
        tar.close()
        return tar

    def test_extract_tar_with_basefile(self):
        dir_tree = [
            'base_lvl_file', 'another_base_file',
            {'base_fails1': [{'sub_dir1': ['sub1_file1']},
                             {'sub_dir2': ['sub2_file1', 'sub2_file2']},
                             {'sub_dir3': [{'sub4_dir1': 'sub4_file1'}]}]}]
        self.build_tar(dir_tree)
        req = self._extract_req('/tar_works/acc/', 'tar_fails.tar',
                                accept='application/json')
        resp = self.bulk.handle_extract(req, '')
        resp_data = json.loads(resp.body)
        self.assertEquals(resp_data['Number Files Created'], 4)

    def test_extract_tar_fail_cont_401(self):
        self.build_tar()
        req = self._extract_req('/unauth/acc/', 'tar_fails.tar')
        resp = self.bulk.handle_extract(req, '')
        self.assertEquals(resp.status_int, 401)

    def test_extract_tar_fail_obj_401(self):
        self.build_tar()
        req = self._extract_req('/create_obj_unauth/acc/cont/',
                                'tar_fails.tar')
        resp = self.bulk.handle_extract(req, '')
        self.assertEquals(resp.status_int, 401)

    def test_extract_tar_fail_obj_name_len(self):
        self.build_tar()
        req = self._extract_req('/tar_works/acc/cont/', 'tar_fails.tar',
                                accept='application/json')
        resp = self.bulk.handle_extract(req, '')
        resp_data = json.loads(resp.body)
        self.assertEquals(resp_data['Number Files Created'], 4)
        self.assertEquals(resp_data['Errors'][0][0],
                          '/tar_works/acc/cont/base_fails1/' + ('f' * 101))

    def test_extract_tar_fail_compress_type(self):
        self.build_tar()
        req = self._extract_req('/tar_works/acc/cont/', 'tar_fails.tar')
        resp = self.bulk.handle_extract(req, 'gz')
        self.assertEquals(resp.status_int, 400)
        self.assertEquals(self.app.calls, 0)

    def test_extract_tar_fail_max_file_name_length(self):
        self.build_tar()
        with patch.object(self.bulk, 'max_failed_extractions', 1):
            self.app.calls = 0
            req = self._extract_req('/tar_works/acc/cont/', 'tar_fails.tar',
                                    accept='application/json')
            resp = self.bulk.handle_extract(req, '')
            resp_data = json.loads(resp.body)
            self.assertEquals(self.app.calls, 5)
            self.assertEquals(resp_data['Errors'][0][0],
                              '/tar_works/acc/cont/base_fails1/' + ('f' * 101))

    @patch.object(bulk, 'MAX_FILE_SIZE', 4)
    def test_extract_tar_fail_max_file_size(self):
        self.build_tar()
        dir_tree = [{'test': [{'sub_dir1': ['sub1_file1']}]}]
        build_dir_tree(self.testdir, dir_tree)
        tar = tarfile.open(name=os.path.join(self.testdir,
                                             'tar_works.tar'),
                           mode='w')
        tar.add(os.path.join(self.testdir, 'test'))
        tar.close()
        self.app.calls = 0
        req = self._extract_req('/tar_works/acc/cont/', 'tar_works.tar',
                                accept='application/json')
        resp = self.bulk.handle_extract(req, '')
        resp_data = json.loads(resp.body)
        self.assert_(resp_data['Errors'][0][1].startswith('413'))

    def test_extract_tar_fail_max_cont(self):
        dir_tree = [{'sub_dir1': ['sub1_file1']},
                    {'sub_dir2': ['sub2_file1', 'sub2_file2']},
                    'f' * 101,
                    {'sub_dir3': [{'sub4_dir1': 'sub4_file1'}]}]
        self.build_tar(dir_tree)
        with patch.object(self.bulk, 'max_containers', 1):
            self.app.calls = 0
            tar_file = self._open_tar('tar_fails.tar')
            try:
                body = tar_file.read()
            finally:
                tar_file.close()
            req = Request.blank('/tar_works/acc/', body=body)
            req.headers['transfer-encoding'] = 'chunked'
            resp = self.bulk.handle_extract(req, '')
            self.assertEquals(self.app.calls, 3)
            self.assertEquals(resp.status_int, 400)

    def test_extract_tar_fail_create_cont(self):
        dir_tree = [{'base_fails1': [
            {'sub_dir1': ['sub1_file1']},
            {'sub_dir2': ['sub2_file1', 'sub2_file2']},
            'f\xde',
            {'./sub_dir3': [{'sub4_dir1': 'sub4_file1'}]}]}]
        self.build_tar(dir_tree)
        req = self._extract_req('/create_cont_fail/acc/cont/',
                                'tar_fails.tar', accept='application/json')
        resp = self.bulk.handle_extract(req, '')
        resp_data = json.loads(resp.body)
        self.assertEquals(self.app.calls, 4)
        self.assertEquals(len(resp_data['Errors']), 5)

    def test_extract_tar_fail_create_cont_value_err(self):
        self.build_tar()
        req = self._extract_req('/create_cont_fail/acc/cont/',
                                'tar_fails.tar', accept='application/json')

        def bad_create(req, path):
            raise ValueError('Test')

        with patch.object(self.bulk, 'create_container', bad_create):
            resp = self.bulk.handle_extract(req, '')
            resp_data = json.loads(resp.body)
            self.assertEquals(self.app.calls, 0)
            self.assertEquals(len(resp_data['Errors']), 5)

    def test_get_response_body(self):
        self.assertRaises(
            HTTPException, self.bulk.get_response_body, 'badformat', {}, [])
        xml_body = self.bulk.get_response_body(
            'text/xml', {'hey': 'there'}, [['json > xml', '202 Accepted']])
        self.assert_('&gt' in xml_body)
class TestDelete(unittest.TestCase):
def setUp(self):
    """Wrap a fresh FakeApp in the bulk middleware for every test."""
    fake_app = FakeApp()
    make_filter = bulk.filter_factory({})
    self.app = fake_app
    self.bulk = make_filter(fake_app)
def tearDown(self):
    """Reset the counters recorded by the fake app."""
    self.app.delete_paths = []
    self.app.calls = 0
def test_bulk_delete_works(self):
    """One existing and one missing object are each requested once."""
    expected_paths = ['/delete_works/AUTH_Acc/c/f',
                      '/delete_works/AUTH_Acc/c/f404']
    req = Request.blank('/delete_works/AUTH_Acc', body='/c/f\n/c/f404',
                        headers={'Accept': 'application/json'})
    req.method = 'DELETE'
    resp = self.bulk.handle_delete(req)
    self.assertEquals(self.app.delete_paths, expected_paths)
    self.assertEquals(self.app.calls, 2)
    counts = json.loads(resp.body)
    self.assertEquals(counts['Number Deleted'], 1)
    self.assertEquals(counts['Number Not Found'], 1)
def test_bulk_delete_bad_accept_and_content_type(self):
    """An unknown Accept or a non text/plain Content-Type yields 406."""
    bad_accept = {'Accept': 'badformat'}
    req = Request.blank('/delete_works/AUTH_Acc', headers=bad_accept)
    req.method = 'DELETE'
    req.environ['wsgi.input'] = StringIO('/c/f\n/c/f404')
    resp = self.bulk.handle_delete(req)
    self.assertEquals(resp.status_int, 406)

    bad_content_type = {'Accept': 'application/json',
                        'Content-Type': 'text/xml'}
    req = Request.blank('/delete_works/AUTH_Acc', headers=bad_content_type)
    req.method = 'DELETE'
    req.environ['wsgi.input'] = StringIO('/c/f\n/c/f404')
    resp = self.bulk.handle_delete(req)
    self.assertEquals(resp.status_int, 406)
def test_bulk_delete_call(self):
    """A DELETE with x-bulk-delete set is routed through the middleware."""
    def fake_start_response(*args, **kwargs):
        pass

    req = Request.blank('/delete_works/AUTH_Acc')
    req.method = 'DELETE'
    req.headers['x-bulk-delete'] = 't'
    req.headers['Transfer-Encoding'] = 'chunked'
    req.environ['wsgi.input'] = StringIO('/c/f')
    self.bulk(req.environ, fake_start_response)
    requested = self.app.delete_paths
    self.assertEquals(requested, ['/delete_works/AUTH_Acc/c/f'])
    self.assertEquals(self.app.calls, 1)
def test_bulk_delete_get_objs(self):
    """The body is split on newlines, including across small read chunks."""
    req = Request.blank('/delete_works/AUTH_Acc', body='1\r\n2\r\n')
    req.method = 'DELETE'
    with patch.object(self.bulk, 'max_deletes_per_request', 2):
        names = self.bulk.get_objs_to_delete(req)
        self.assertEquals(names, ['1\r', '2\r'])

    # A two byte read size forces lines to be assembled from several reads.
    with patch.object(bulk, 'MAX_PATH_LENGTH', 2):
        req.environ['wsgi.input'] = StringIO('1\n2\n3')
        names = self.bulk.get_objs_to_delete(req)
        self.assertEquals(names, ['1', '2', '3'])
def test_bulk_delete_works_extra_newlines(self):
    # Two entries separated and followed by runs of empty lines; exactly
    # two paths are expected to reach self.app, with one counted as deleted
    # and one as not found.
    request = Request.blank('/delete_works/AUTH_Acc',
                            body='/c/f\n\n\n/c/f404\n\n\n',
                            headers={'Accept': 'application/json'})
    request.method = 'DELETE'
    response = self.bulk.handle_delete(request)
    expected_paths = ['/delete_works/AUTH_Acc/c/f',
                      '/delete_works/AUTH_Acc/c/f404']
    self.assertEqual(self.app.delete_paths, expected_paths)
    self.assertEqual(self.app.calls, 2)
    summary = json.loads(response.body)
    self.assertEqual(summary['Number Deleted'], 1)
    self.assertEqual(summary['Number Not Found'], 1)
def test_bulk_delete_too_many_newlines(self):
    # The body is nothing but newlines, twice as many as
    # max_deletes_per_request; handle_delete is expected to raise an
    # HTTPException whose status is 413.
    req = Request.blank('/delete_works/AUTH_Acc')
    req.method = 'DELETE'
    data = '\n\n' * self.bulk.max_deletes_per_request
    req.environ['wsgi.input'] = StringIO(data)
    req.content_length = len(data)
    try:
        self.bulk.handle_delete(req)
    # "except X as name" is accepted from Python 2.6 onwards; the older
    # "except X, name" spelling is a syntax error on Python 3.
    except HTTPException as err:
        self.assertEqual(err.status_int, 413)
    else:
        self.fail('413 not raised')
def test_bulk_delete_raised_error(self):
    # Same oversized all-newline body as the direct handle_delete case, but
    # sent through the WSGI __call__ interface: the status handed to
    # start_response is expected to begin with '413'.
    statuses = []

    def fake_start_response(*args, **kwargs):
        # Record instead of asserting here. An assertion raised inside the
        # callback would have to propagate through the middleware, and a
        # callback that is never invoked would let the test pass vacuously.
        statuses.append(args[0])

    req = Request.blank('/delete_works/AUTH_Acc')
    req.method = 'DELETE'
    req.headers['x-bulk-delete'] = 't'
    data = '\n\n' * self.bulk.max_deletes_per_request
    req.environ['wsgi.input'] = StringIO(data)
    req.content_length = len(data)
    # Drain the response: WSGI lets an application defer start_response
    # until the returned iterable is first iterated.
    for _chunk in self.bulk(req.environ, fake_start_response):
        pass
    self.assertTrue(statuses, 'start_response was never called')
    for status in statuses:
        self.assertTrue(status.startswith('413'))
def test_bulk_delete_works_unicode(self):
    # Three entries: a UTF-8 encoded path containing U+2661, a plain ASCII
    # entry, and one containing a raw 0xDE byte. Two paths are expected to
    # reach self.app, one delete to succeed, and two entries to be reported
    # as '412 Precondition Failed' errors.
    heart_entry = u'/c/ obj \u2661\r\n'.encode('utf8')
    body = heart_entry + 'c/ objbadutf8\r\n' + '/c/f\xdebadutf8\n'
    request = Request.blank('/delete_works/AUTH_Acc', body=body,
                            headers={'Accept': 'application/json'})
    request.method = 'DELETE'
    response = self.bulk.handle_delete(request)

    self.assertEqual(
        self.app.delete_paths,
        ['/delete_works/AUTH_Acc/c/ obj \xe2\x99\xa1',
         '/delete_works/AUTH_Acc/c/ objbadutf8'])
    self.assertEqual(self.app.calls, 2)

    summary = json.loads(response.body)
    self.assertEqual(summary['Number Deleted'], 1)
    self.assertEqual(len(summary['Errors']), 2)
    # Error entries are [quoted path, status line] pairs.
    expected_errors = [
        [urllib.quote(failed_path), '412 Precondition Failed']
        for failed_path in ('/delete_works/AUTH_Acc/c/ objbadutf8',
                            '/delete_works/AUTH_Acc/c/f\xdebadutf8')]
    self.assertEqual(summary['Errors'], expected_errors)
def test_bulk_delete_no_body(self):
    # A request built without any body must make handle_delete raise an
    # HTTPException.
    bodiless = Request.blank('/unauth/AUTH_acc/')
    handler = self.bulk.handle_delete
    self.assertRaises(HTTPException, handler, bodiless)
def test_bulk_delete_no_files_in_body(self):
    # The body is a single space, so it lists nothing to delete; the
    # expected answer is 400.
    response = self.bulk.handle_delete(
        Request.blank('/unauth/AUTH_acc/', body=' '))
    self.assertEqual(response.status_int, 400)
def test_bulk_delete_unauth(self):
    # A one-entry delete under the '/unauth/AUTH_acc/' path is expected to
    # come back from handle_delete as 401.
    request = Request.blank('/unauth/AUTH_acc/', body='/c/f\n')
    request.method = 'DELETE'
    status = self.bulk.handle_delete(request).status_int
    self.assertEqual(status, 401)
def test_bulk_delete_500_resp(self):
    # A one-entry delete under the '/broke/AUTH_acc/' path is expected to
    # come back from handle_delete as 502.
    request = Request.blank('/broke/AUTH_acc/', body='/c/f\n')
    request.method = 'DELETE'
    status = self.bulk.handle_delete(request).status_int
    self.assertEqual(status, 502)
def test_bulk_delete_bad_path(self):
    # '/delete_cont_fail/' has only one path segment and the request has no
    # body; the expected answer is 404.
    response = self.bulk.handle_delete(Request.blank('/delete_cont_fail/'))
    self.assertEqual(response.status_int, 404)
def test_bulk_delete_container_delete(self):
    # The single entry 'c' has no object component (presumably a container
    # delete, going by the test name). The JSON summary is expected to
    # report nothing deleted and '409 Conflict' as the first error status.
    request = Request.blank('/delete_cont_fail/AUTH_Acc', body='c\n',
                            headers={'Accept': 'application/json'})
    request.method = 'DELETE'
    summary = json.loads(self.bulk.handle_delete(request).body)
    self.assertEqual(summary['Number Deleted'], 0)
    first_error = summary['Errors'][0]
    self.assertEqual(first_error[1], '409 Conflict')
def test_bulk_delete_bad_file_too_long(self):
    # Three entries, the middle one two characters longer than
    # MAX_PATH_LENGTH. The two '/c/f' entries are expected to be deleted
    # and the first reported error to be '400 Bad Request'.
    request = Request.blank('/delete_works/AUTH_Acc',
                            headers={'Accept': 'application/json'})
    request.method = 'DELETE'
    overlong_entry = 'c/' + '1' * bulk.MAX_PATH_LENGTH
    payload = '\n'.join(['/c/f', overlong_entry, '/c/f'])
    request.environ['wsgi.input'] = StringIO(payload)
    request.headers['Transfer-Encoding'] = 'chunked'
    summary = json.loads(self.bulk.handle_delete(request).body)
    self.assertEqual(summary['Number Deleted'], 2)
    self.assertEqual(summary['Errors'][0][1], '400 Bad Request')
def test_bulk_delete_bad_file_over_twice_max_length(self):
    # The second entry is roughly six times MAX_PATH_LENGTH long;
    # handle_delete is expected to raise an HTTPException with status 400
    # rather than return a per-entry error summary.
    body = '/c/f\nc/' + ('123456' * bulk.MAX_PATH_LENGTH) + '\n'
    req = Request.blank('/delete_works/AUTH_Acc', body=body)
    req.method = 'DELETE'
    try:
        self.bulk.handle_delete(req)
    # "except X as name" is accepted from Python 2.6 onwards; the older
    # "except X, name" spelling is a syntax error on Python 3.
    except HTTPException as err:
        self.assertEqual(err.status_int, 400)
    else:
        self.fail('400 not raised')
if __name__ == '__main__':
    # Executed as a script rather than imported: hand control to
    # unittest's command-line runner for the tests in this module.
    unittest.main()