Merge "Add test for copy using a Range header"

This commit is contained in:
Jenkins 2016-09-06 11:16:19 +00:00 committed by Gerrit Code Review
commit 7973bf8e9d
2 changed files with 124 additions and 7 deletions

View File

@ -1275,7 +1275,17 @@ class TestFile(Base):
self.assertTrue(file_item.copy( self.assertTrue(file_item.copy(
'%s%s' % (prefix, cont), dest_filename, hdrs=extra_hdrs)) '%s%s' % (prefix, cont), dest_filename, hdrs=extra_hdrs))
self.assertIn(dest_filename, cont.files()) # verify container listing for copy
listing = cont.files(parms={'format': 'json'})
for obj in listing:
if obj['name'] == dest_filename:
break
else:
self.fail('Failed to find %s in listing' % dest_filename)
self.assertEqual(file_item.size, obj['bytes'])
self.assertEqual(file_item.etag, obj['hash'])
self.assertEqual(file_item.content_type, obj['content_type'])
file_copy = cont.file(dest_filename) file_copy = cont.file(dest_filename)
@ -1315,6 +1325,19 @@ class TestFile(Base):
self.assertIn(k.lower(), resp_headers) self.assertIn(k.lower(), resp_headers)
self.assertEqual(v, resp_headers[k.lower()]) self.assertEqual(v, resp_headers[k.lower()])
# verify container listing for copy
listing = cont.files(parms={'format': 'json'})
for obj in listing:
if obj['name'] == dest_filename:
break
else:
self.fail('Failed to find %s in listing' % dest_filename)
self.assertEqual(file_item.size, obj['bytes'])
self.assertEqual(file_item.etag, obj['hash'])
self.assertEqual(
'application/test-changed', obj['content_type'])
# repeat copy with X-Fresh-Metadata header - existing user # repeat copy with X-Fresh-Metadata header - existing user
# metadata should not be copied, new completely replaces it. # metadata should not be copied, new completely replaces it.
extra_hdrs = {'Content-Type': 'application/test-updated', extra_hdrs = {'Content-Type': 'application/test-updated',
@ -1337,6 +1360,63 @@ class TestFile(Base):
for k in ('Content-Disposition', 'Content-Encoding'): for k in ('Content-Disposition', 'Content-Encoding'):
self.assertNotIn(k.lower(), resp_headers) self.assertNotIn(k.lower(), resp_headers)
# verify container listing for copy
listing = cont.files(parms={'format': 'json'})
for obj in listing:
if obj['name'] == dest_filename:
break
else:
self.fail('Failed to find %s in listing' % dest_filename)
self.assertEqual(file_item.size, obj['bytes'])
self.assertEqual(file_item.etag, obj['hash'])
self.assertEqual(
'application/test-updated', obj['content_type'])
def testCopyRange(self):
# makes sure to test encoded characters
source_filename = 'dealde%2Fl04 011e%204c8df/flash.png'
file_item = self.env.container.file(source_filename)
metadata = {Utils.create_ascii_name(): Utils.create_name()}
data = file_item.write_random(1024)
file_item.sync_metadata(metadata)
file_item.initialize()
dest_cont = self.env.account.container(Utils.create_name())
self.assertTrue(dest_cont.create())
expected_body = data[100:201]
expected_etag = hashlib.md5(expected_body)
# copy both from within and across containers
for cont in (self.env.container, dest_cont):
# copy both with and without initial slash
for prefix in ('', '/'):
dest_filename = Utils.create_name()
file_item.copy('%s%s' % (prefix, cont), dest_filename,
hdrs={'Range': 'bytes=100-200'})
self.assertEqual(201, file_item.conn.response.status)
# verify container listing for copy
listing = cont.files(parms={'format': 'json'})
for obj in listing:
if obj['name'] == dest_filename:
break
else:
self.fail('Failed to find %s in listing' % dest_filename)
self.assertEqual(101, obj['bytes'])
self.assertEqual(expected_etag.hexdigest(), obj['hash'])
self.assertEqual(file_item.content_type, obj['content_type'])
# verify copy object
copy_file_item = cont.file(dest_filename)
self.assertEqual(expected_body, copy_file_item.read())
self.assertTrue(copy_file_item.initialize())
self.assertEqual(metadata, copy_file_item.metadata)
def testCopyAccount(self): def testCopyAccount(self):
# makes sure to test encoded characters # makes sure to test encoded characters
source_filename = 'dealde%2Fl04 011e%204c8df/flash.png' source_filename = 'dealde%2Fl04 011e%204c8df/flash.png'
@ -1427,10 +1507,9 @@ class TestFile(Base):
# invalid destination container # invalid destination container
file_item = self.env.container.file(source_filename) file_item = self.env.container.file(source_filename)
self.assertTrue( self.assertFalse(file_item.copy(
not file_item.copy( '%s%s' % (prefix, Utils.create_name()),
'%s%s' % (prefix, Utils.create_name()), Utils.create_name()))
Utils.create_name()))
def testCopyAccount404s(self): def testCopyAccount404s(self):
acct = self.env.conn.account_name acct = self.env.conn.account_name

View File

@ -1410,6 +1410,30 @@ class TestServerSideCopyMiddleware(unittest.TestCase):
self.assertEqual('Not Bar', req_headers.get('X-Foo')) self.assertEqual('Not Bar', req_headers.get('X-Foo'))
self.assertIn('X-Fresh-Metadata', req_headers) self.assertIn('X-Fresh-Metadata', req_headers)
def test_COPY_with_single_range(self):
# verify that source etag is not copied when copying a range
self.app.register('GET', '/v1/a/c/o', swob.HTTPOk,
{'etag': 'bogus etag'}, "abcdefghijklmnop")
self.app.register('PUT', '/v1/a/c1/o', swob.HTTPCreated, {})
req = swob.Request.blank(
'/v1/a/c/o', method='COPY',
headers={'Destination': 'c1/o',
'Range': 'bytes=5-10'})
status, headers, body = self.call_ssc(req)
self.assertEqual(status, '201 Created')
calls = self.app.calls_with_headers
self.assertEqual(2, len(calls))
method, path, req_headers = calls[1]
self.assertEqual('PUT', method)
self.assertEqual('/v1/a/c1/o', path)
self.assertNotIn('etag', (h.lower() for h in req_headers))
self.assertEqual('6', req_headers['content-length'])
req = swob.Request.blank('/v1/a/c1/o', method='GET')
status, headers, body = self.call_ssc(req)
self.assertEqual('fghijk', body)
class TestServerSideCopyConfiguration(unittest.TestCase): class TestServerSideCopyConfiguration(unittest.TestCase):
@ -1498,7 +1522,7 @@ class TestServerSideCopyMiddlewareWithEC(unittest.TestCase):
self.policy = POLICIES.default self.policy = POLICIES.default
self.app.container_info = dict(self.container_info) self.app.container_info = dict(self.container_info)
def test_COPY_with_ranges(self): def test_COPY_with_single_range(self):
req = swob.Request.blank( req = swob.Request.blank(
'/v1/a/c/o', method='COPY', '/v1/a/c/o', method='COPY',
headers={'Destination': 'c1/o', headers={'Destination': 'c1/o',
@ -1528,10 +1552,24 @@ class TestServerSideCopyMiddlewareWithEC(unittest.TestCase):
'X-Obj-Metadata-Footer': 'yes', 'X-Obj-Metadata-Footer': 'yes',
'X-Obj-Multiphase-Commit': 'yes' 'X-Obj-Multiphase-Commit': 'yes'
} }
put_hdrs = []
def capture_conn(host, port, dev, part, method, path, *args, **kwargs):
if method == 'PUT':
put_hdrs.append(args[0])
with set_http_connect(*status_codes, body_iter=body_iter, with set_http_connect(*status_codes, body_iter=body_iter,
headers=headers, expect_headers=expect_headers): headers=headers, expect_headers=expect_headers,
give_connect=capture_conn):
resp = req.get_response(self.ssc) resp = req.get_response(self.ssc)
self.assertEqual(resp.status_int, 201) self.assertEqual(resp.status_int, 201)
expected_puts = POLICIES.default.ec_ndata + POLICIES.default.ec_nparity
self.assertEqual(expected_puts, len(put_hdrs))
for hdrs in put_hdrs:
# etag should not be copied from source
self.assertNotIn('etag', (h.lower() for h in hdrs))
def test_COPY_with_invalid_ranges(self): def test_COPY_with_invalid_ranges(self):
# real body size is segment_size - 10 (just 1 segment) # real body size is segment_size - 10 (just 1 segment)