sanity check copy tests

Change-Id: I4880e0a790dabb84e8c55fe11c36df41aeb0c739
This commit is contained in:
Clay Gerrard 2014-03-03 09:08:43 -08:00
parent b9434b5c3e
commit 20466d80fa
2 changed files with 271 additions and 252 deletions

View File

@ -568,4 +568,6 @@ def fake_http_connect(*code_iter, **kwargs):
return FakeConn(status, etag, body=body, timestamp=timestamp,
expect_status=expect_status, headers=headers)
connect.code_iter = code_iter
return connect

View File

@ -54,7 +54,8 @@ from swift.proxy.controllers.base import get_container_memcache_key, \
get_account_memcache_key, cors_validation
import swift.proxy.controllers
from swift.common.request_helpers import get_sys_meta_prefix
from swift.common.swob import Request, Response, HTTPUnauthorized
from swift.common.swob import Request, Response, HTTPUnauthorized, \
HTTPException
# mocks
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
@ -225,6 +226,7 @@ def set_http_connect(*args, **kwargs):
swift.proxy.controllers.obj.http_connect = new_connect
swift.proxy.controllers.account.http_connect = new_connect
swift.proxy.controllers.container.http_connect = new_connect
return new_connect
# tests
@ -2208,307 +2210,322 @@ class TestObjectController(unittest.TestCase):
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 400)
def test_copy_from(self):
@contextmanager
def controller_context(self, req, *args, **kwargs):
_v, account, container, obj = utils.split_path(req.path, 4, 4, True)
controller = proxy_server.ObjectController(self.app, account,
container, obj)
self.app.update_request(req)
self.app.memcache.store = {}
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
# initial source object PUT
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0'})
self.app.update_request(req)
set_http_connect(200, 200, 201, 201, 201)
# acct cont obj obj obj
new_connect = set_http_connect(*args, **kwargs)
yield controller
unused_status_list = []
while True:
try:
unused_status_list.append(new_connect.code_iter.next())
except StopIteration:
break
if unused_status_list:
raise self.fail('UN-USED STATUS CODES: %r' %
unused_status_list)
def test_basic_put_with_x_copy_from(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': 'c/o'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
# basic copy
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': 'c/o'})
self.app.update_request(req)
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_basic_put_with_x_copy_from_accross_container(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': 'c2/o'})
status_list = (200, 200, 200, 200, 200, 200, 201, 201, 201)
# acct cont conc objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c2/o')
# non-zero content length
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '5',
'X-Copy-From': 'c/o'})
self.app.update_request(req)
set_http_connect(200, 200, 200, 200, 200, 200, 200)
# acct cont acct cont objc objc objc
self.app.memcache.store = {}
def test_copy_non_zero_content_length(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '5',
'X-Copy-From': 'c/o'})
status_list = (200, 200)
# acct cont
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 400)
self.assertEquals(resp.status_int, 400)
# extra source path parsing
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': 'c/o/o2'})
req.account = 'a'
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_copy_with_slashes_in_x_copy_from(self):
# extra source path parsing
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': 'c/o/o2'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
# space in soure path
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': 'c/o%20o2'})
req.account = 'a'
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_copy_with_spaces_in_x_copy_from(self):
# space in soure path
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': 'c/o%20o2'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o%20o2')
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o%20o2')
# repeat tests with leading /
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
self.app.update_request(req)
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_copy_with_leading_slash_in_x_copy_from(self):
# repeat tests with leading /
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o/o2'})
req.account = 'a'
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_copy_with_leading_slash_and_slashes_in_x_copy_from(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o/o2'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
# negative tests
def test_copy_with_no_object_in_x_copy_from(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c'})
status_list = (200, 200)
# acct cont
with self.controller_context(req, *status_list) as controller:
try:
controller.PUT(req)
except HTTPException as resp:
self.assertEquals(resp.status_int // 100, 4) # client error
else:
raise self.fail('Invalid X-Copy-From did not raise '
'client error')
# invalid x-copy-from path
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c'})
self.app.update_request(req)
self.app.memcache.store = {}
def test_copy_server_error_reading_source(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
status_list = (200, 200, 503, 503, 503)
# acct cont objc objc objc
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int // 100, 4) # client error
self.assertEquals(resp.status_int, 503)
# server error
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
self.app.update_request(req)
set_http_connect(200, 200, 503, 503, 503)
# acct cont objc objc objc
self.app.memcache.store = {}
def test_copy_not_found_reading_source(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
# not found
status_list = (200, 200, 404, 404, 404)
# acct cont objc objc objc
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 503)
self.assertEquals(resp.status_int, 404)
# not found
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
self.app.update_request(req)
set_http_connect(200, 200, 404, 404, 404)
# acct cont objc objc objc
self.app.memcache.store = {}
def test_copy_with_some_missing_sources(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
status_list = (200, 200, 404, 404, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 404)
self.assertEquals(resp.status_int, 201)
# some missing containers
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
self.app.update_request(req)
set_http_connect(200, 200, 404, 404, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_copy_with_object_metadata(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o',
'X-Object-Meta-Ours': 'okay'})
# test object metadata
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers.get('x-object-meta-test'), 'testing')
self.assertEquals(resp.headers.get('x-object-meta-ours'), 'okay')
self.assertEquals(resp.headers.get('x-delete-at'), '9876543210')
# test object meta data
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o',
'X-Object-Meta-Ours': 'okay'})
self.app.update_request(req)
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers.get('x-object-meta-test'),
'testing')
self.assertEquals(resp.headers.get('x-object-meta-ours'), 'okay')
self.assertEquals(resp.headers.get('x-delete-at'), '9876543210')
def test_copy_source_larger_than_max_file_size(self):
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
# copy-from object is too large to fit in target object
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
# copy-from object is too large to fit in target object
class LargeResponseBody(object):
def __len__(self):
return MAX_FILE_SIZE + 1
def __getitem__(self, key):
return ''
copy_from_obj_body = LargeResponseBody()
status_list = (200, 200, 200, 200, 200)
# acct cont objc objc objc
kwargs = dict(body=copy_from_obj_body)
with self.controller_context(req, *status_list,
**kwargs) as controller:
self.app.update_request(req)
class LargeResponseBody(object):
def __len__(self):
return MAX_FILE_SIZE + 1
def __getitem__(self, key):
return ''
copy_from_obj_body = LargeResponseBody()
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201,
body=copy_from_obj_body)
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 413)
def test_COPY(self):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0'})
req.account = 'a'
set_http_connect(200, 200, 201, 201, 201)
# acct cont obj obj obj
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c/o'})
req.account = 'a'
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, 200, 200)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_basic_COPY(self):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c/o2'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
req = Request.blank('/v1/a/c/o/o2',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c/o'})
req.account = 'a'
controller.object_name = 'o/o2'
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, 200, 200)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_COPY_accross_containers(self):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c2/o'})
status_list = (200, 200, 200, 200, 200, 200, 201, 201, 201)
# acct cont c2 objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, 200, 200)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_COPY_source_with_slashes_in_name(self):
req = Request.blank('/v1/a/c/o/o2',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c/o'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
req = Request.blank('/v1/a/c/o/o2',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o/o2'
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, 200, 200)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_COPY_destination_leading_slash(self):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c_o'})
req.account = 'a'
controller.object_name = 'o'
set_http_connect(200, 200)
# acct cont
self.app.memcache.store = {}
def test_COPY_source_with_slashes_destination_leading_slash(self):
req = Request.blank('/v1/a/c/o/o2',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 412)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
set_http_connect(200, 200, 503, 503, 503)
# acct cont objc objc objc
self.app.memcache.store = {}
def test_COPY_no_object_in_destination(self):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c_o'})
status_list = [] # no requests needed
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 503)
self.assertEquals(resp.status_int, 412)
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
set_http_connect(200, 200, 404, 404, 404)
# acct cont objc objc objc
self.app.memcache.store = {}
def test_COPY_server_error_reading_source(self):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
status_list = (200, 200, 503, 503, 503)
# acct cont objc objc objc
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 404)
self.assertEquals(resp.status_int, 503)
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
set_http_connect(200, 200, 404, 404, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_COPY_not_found_reading_source(self):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
status_list = (200, 200, 404, 404, 404)
# acct cont objc objc objc
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.status_int, 404)
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o',
'X-Object-Meta-Ours': 'okay'})
req.account = 'a'
controller.object_name = 'o'
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
def test_COPY_with_some_missing_sources(self):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
status_list = (200, 200, 404, 404, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers.get('x-object-meta-test'),
'testing')
self.assertEquals(resp.headers.get('x-object-meta-ours'), 'okay')
self.assertEquals(resp.headers.get('x-delete-at'), '9876543210')
self.assertEquals(resp.status_int, 201)
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
self.app.update_request(req)
class LargeResponseBody(object):
def __len__(self):
return MAX_FILE_SIZE + 1
def __getitem__(self, key):
return ''
copy_from_obj_body = LargeResponseBody()
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201,
body=copy_from_obj_body)
self.app.memcache.store = {}
def test_COPY_with_metadata(self):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o',
'X-Object-Meta-Ours': 'okay'})
status_list = (200, 200, 200, 200, 200, 201, 201, 201)
# acct cont objc objc objc obj obj obj
with self.controller_context(req, *status_list) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 413)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers.get('x-object-meta-test'),
'testing')
self.assertEquals(resp.headers.get('x-object-meta-ours'), 'okay')
self.assertEquals(resp.headers.get('x-delete-at'), '9876543210')
def test_COPY_source_larger_than_max_file_size(self):
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
class LargeResponseBody(object):
def __len__(self):
return MAX_FILE_SIZE + 1
def __getitem__(self, key):
return ''
copy_from_obj_body = LargeResponseBody()
status_list = (200, 200, 200, 200, 200)
# acct cont objc objc objc
kwargs = dict(body=copy_from_obj_body)
with self.controller_context(req, *status_list,
**kwargs) as controller:
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 413)
def test_COPY_newest(self):
with save_globals():