#!/usr/bin/python # Copyright (c) 2010-2012 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import json import unittest from nose import SkipTest from uuid import uuid4 from test.functional import check_response, retry, requires_acls, \ load_constraint, requires_policies import test.functional as tf class TestContainer(unittest.TestCase): def setUp(self): if tf.skip: raise SkipTest self.name = uuid4().hex # this container isn't created by default, but will be cleaned up self.container = uuid4().hex def put(url, token, parsed, conn): conn.request('PUT', parsed.path + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) self.max_meta_count = load_constraint('max_meta_count') self.max_meta_name_length = load_constraint('max_meta_name_length') self.max_meta_overall_size = load_constraint('max_meta_overall_size') self.max_meta_value_length = load_constraint('max_meta_value_length') def tearDown(self): if tf.skip: raise SkipTest def get(url, token, parsed, conn, container): conn.request( 'GET', parsed.path + '/' + container + '?format=json', '', {'X-Auth-Token': token}) return check_response(conn) def delete(url, token, parsed, conn, container, obj): conn.request( 'DELETE', '/'.join([parsed.path, container, obj['name']]), '', {'X-Auth-Token': token}) return check_response(conn) for container in (self.name, self.container): while True: resp = retry(get, container) body = resp.read() if resp.status == 404: break self.assert_(resp.status // 100 == 2, resp.status) objs = json.loads(body) if not objs: break for obj in objs: resp = retry(delete, container, obj) resp.read() self.assertEqual(resp.status, 204) def delete(url, token, parsed, conn, container): conn.request('DELETE', parsed.path + '/' + container, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete, self.name) resp.read() self.assertEqual(resp.status, 204) # container may have not been created resp = retry(delete, self.container) resp.read() self.assert_(resp.status in (204, 404)) def test_multi_metadata(self): if tf.skip: raise SkipTest def post(url, token, parsed, conn, name, value): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, name: value}) return check_response(conn) def head(url, token, parsed, conn): conn.request('HEAD', parsed.path + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(post, 'X-Container-Meta-One', '1') resp.read() self.assertEqual(resp.status, 204) resp = retry(head) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-one'), '1') resp = retry(post, 'X-Container-Meta-Two', '2') resp.read() self.assertEqual(resp.status, 204) resp = retry(head) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-one'), '1') self.assertEqual(resp.getheader('x-container-meta-two'), '2') def test_unicode_metadata(self): if tf.skip: raise SkipTest def post(url, token, parsed, conn, name, value): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, name: value}) return check_response(conn) def head(url, token, parsed, conn): conn.request('HEAD', parsed.path + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) uni_key = u'X-Container-Meta-uni\u0E12' uni_value = u'uni\u0E12' if (tf.web_front_end == 'integral'): resp = retry(post, uni_key, '1') resp.read() self.assertEqual(resp.status, 204) resp = retry(head) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader(uni_key.encode('utf-8')), '1') resp = retry(post, 'X-Container-Meta-uni', uni_value) resp.read() self.assertEqual(resp.status, 204) resp = retry(head) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('X-Container-Meta-uni'), uni_value.encode('utf-8')) if (tf.web_front_end == 'integral'): resp = retry(post, uni_key, uni_value) resp.read() self.assertEqual(resp.status, 204) resp = retry(head) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader(uni_key.encode('utf-8')), uni_value.encode('utf-8')) def test_PUT_metadata(self): if tf.skip: raise SkipTest def put(url, token, parsed, conn, name, value): conn.request('PUT', parsed.path + '/' + name, '', {'X-Auth-Token': token, 'X-Container-Meta-Test': value}) return check_response(conn) def head(url, token, parsed, conn, name): conn.request('HEAD', parsed.path + '/' + name, '', {'X-Auth-Token': token}) return check_response(conn) def get(url, token, parsed, conn, name): conn.request('GET', parsed.path + '/' + name, '', {'X-Auth-Token': token}) return check_response(conn) def delete(url, token, parsed, conn, name): conn.request('DELETE', parsed.path + '/' + name, '', {'X-Auth-Token': token}) return check_response(conn) name = uuid4().hex resp = retry(put, name, 'Value') resp.read() self.assertEqual(resp.status, 201) resp = retry(head, name) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-test'), 'Value') resp = retry(get, name) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-test'), 'Value') resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 204) name = uuid4().hex resp = retry(put, name, '') resp.read() self.assertEqual(resp.status, 201) resp = retry(head, name) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-test'), None) resp = retry(get, name) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-test'), None) resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 204) def test_POST_metadata(self): if tf.skip: raise SkipTest def post(url, token, parsed, conn, value): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, 'X-Container-Meta-Test': value}) return check_response(conn) def head(url, token, parsed, conn): conn.request('HEAD', parsed.path + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) def get(url, token, parsed, conn): conn.request('GET', parsed.path + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(head) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-test'), None) resp = retry(get) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-test'), None) resp = retry(post, 'Value') resp.read() self.assertEqual(resp.status, 204) resp = retry(head) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-test'), 'Value') resp = retry(get) resp.read() self.assert_(resp.status in (200, 204), resp.status) self.assertEqual(resp.getheader('x-container-meta-test'), 'Value') def test_PUT_bad_metadata(self): if tf.skip: raise SkipTest def put(url, token, parsed, conn, name, extra_headers): headers = {'X-Auth-Token': token} headers.update(extra_headers) conn.request('PUT', parsed.path + '/' + name, '', headers) return check_response(conn) def delete(url, token, parsed, conn, name): conn.request('DELETE', parsed.path + '/' + name, '', {'X-Auth-Token': token}) return check_response(conn) name = uuid4().hex resp = retry( put, name, {'X-Container-Meta-' + ('k' * self.max_meta_name_length): 'v'}) resp.read() self.assertEqual(resp.status, 201) resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 204) name = uuid4().hex resp = retry( put, name, {'X-Container-Meta-' + ( 'k' * (self.max_meta_name_length + 1)): 'v'}) resp.read() self.assertEqual(resp.status, 400) resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 404) name = uuid4().hex resp = retry( put, name, {'X-Container-Meta-Too-Long': 'k' * self.max_meta_value_length}) resp.read() self.assertEqual(resp.status, 201) resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 204) name = uuid4().hex resp = retry( put, name, {'X-Container-Meta-Too-Long': 'k' * ( self.max_meta_value_length + 1)}) resp.read() self.assertEqual(resp.status, 400) resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 404) name = uuid4().hex headers = {} for x in xrange(self.max_meta_count): headers['X-Container-Meta-%d' % x] = 'v' resp = retry(put, name, headers) resp.read() self.assertEqual(resp.status, 201) resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 204) name = uuid4().hex headers = {} for x in xrange(self.max_meta_count + 1): headers['X-Container-Meta-%d' % x] = 'v' resp = retry(put, name, headers) resp.read() self.assertEqual(resp.status, 400) resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 404) name = uuid4().hex headers = {} header_value = 'k' * self.max_meta_value_length size = 0 x = 0 while size < (self.max_meta_overall_size - 4 - self.max_meta_value_length): size += 4 + self.max_meta_value_length headers['X-Container-Meta-%04d' % x] = header_value x += 1 if self.max_meta_overall_size - size > 1: headers['X-Container-Meta-k'] = \ 'v' * (self.max_meta_overall_size - size - 1) resp = retry(put, name, headers) resp.read() self.assertEqual(resp.status, 201) resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 204) name = uuid4().hex headers['X-Container-Meta-k'] = \ 'v' * (self.max_meta_overall_size - size) resp = retry(put, name, headers) resp.read() self.assertEqual(resp.status, 400) resp = retry(delete, name) resp.read() self.assertEqual(resp.status, 404) def test_POST_bad_metadata(self): if tf.skip: raise SkipTest def post(url, token, parsed, conn, extra_headers): headers = {'X-Auth-Token': token} headers.update(extra_headers) conn.request('POST', parsed.path + '/' + self.name, '', headers) return check_response(conn) resp = retry( post, {'X-Container-Meta-' + ('k' * self.max_meta_name_length): 'v'}) resp.read() self.assertEqual(resp.status, 204) resp = retry( post, {'X-Container-Meta-' + ( 'k' * (self.max_meta_name_length + 1)): 'v'}) resp.read() self.assertEqual(resp.status, 400) resp = retry( post, {'X-Container-Meta-Too-Long': 'k' * self.max_meta_value_length}) resp.read() self.assertEqual(resp.status, 204) resp = retry( post, {'X-Container-Meta-Too-Long': 'k' * ( self.max_meta_value_length + 1)}) resp.read() self.assertEqual(resp.status, 400) headers = {} for x in xrange(self.max_meta_count): headers['X-Container-Meta-%d' % x] = 'v' resp = retry(post, headers) resp.read() self.assertEqual(resp.status, 204) headers = {} for x in xrange(self.max_meta_count + 1): headers['X-Container-Meta-%d' % x] = 'v' resp = retry(post, headers) resp.read() self.assertEqual(resp.status, 400) headers = {} header_value = 'k' * self.max_meta_value_length size = 0 x = 0 while size < (self.max_meta_overall_size - 4 - self.max_meta_value_length): size += 4 + self.max_meta_value_length headers['X-Container-Meta-%04d' % x] = header_value x += 1 if self.max_meta_overall_size - size > 1: headers['X-Container-Meta-k'] = \ 'v' * (self.max_meta_overall_size - size - 1) resp = retry(post, headers) resp.read() self.assertEqual(resp.status, 204) headers['X-Container-Meta-k'] = \ 'v' * (self.max_meta_overall_size - size) resp = retry(post, headers) resp.read() self.assertEqual(resp.status, 400) def test_public_container(self): if tf.skip: raise SkipTest def get(url, token, parsed, conn): conn.request('GET', parsed.path + '/' + self.name) return check_response(conn) try: resp = retry(get) raise Exception('Should not have been able to GET') except Exception as err: self.assert_(str(err).startswith('No result after '), err) def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, 'X-Container-Read': '.r:*,.rlistings'}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) resp = retry(get) resp.read() self.assertEqual(resp.status, 204) def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, 'X-Container-Read': ''}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) try: resp = retry(get) raise Exception('Should not have been able to GET') except Exception as err: self.assert_(str(err).startswith('No result after '), err) def test_cross_account_container(self): if tf.skip or tf.skip2: raise SkipTest # Obtain the first account's string first_account = ['unknown'] def get1(url, token, parsed, conn): first_account[0] = parsed.path conn.request('HEAD', parsed.path + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get1) resp.read() # Ensure we can't access the container with the second account def get2(url, token, parsed, conn): conn.request('GET', first_account[0] + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get2, use_account=2) resp.read() self.assertEqual(resp.status, 403) # Make the container accessible by the second account def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, 'X-Container-Read': tf.swift_test_perm[1], 'X-Container-Write': tf.swift_test_perm[1]}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) # Ensure we can now use the container with the second account resp = retry(get2, use_account=2) resp.read() self.assertEqual(resp.status, 204) # Make the container private again def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, 'X-Container-Read': '', 'X-Container-Write': ''}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) # Ensure we can't access the container with the second account again resp = retry(get2, use_account=2) resp.read() self.assertEqual(resp.status, 403) def test_cross_account_public_container(self): if tf.skip or tf.skip2: raise SkipTest # Obtain the first account's string first_account = ['unknown'] def get1(url, token, parsed, conn): first_account[0] = parsed.path conn.request('HEAD', parsed.path + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get1) resp.read() # Ensure we can't access the container with the second account def get2(url, token, parsed, conn): conn.request('GET', first_account[0] + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get2, use_account=2) resp.read() self.assertEqual(resp.status, 403) # Make the container completely public def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, 'X-Container-Read': '.r:*,.rlistings'}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) # Ensure we can now read the container with the second account resp = retry(get2, use_account=2) resp.read() self.assertEqual(resp.status, 204) # But we shouldn't be able to write with the second account def put2(url, token, parsed, conn): conn.request('PUT', first_account[0] + '/' + self.name + '/object', 'test object', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put2, use_account=2) resp.read() self.assertEqual(resp.status, 403) # Now make the container also writeable by the second account def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, 'X-Container-Write': tf.swift_test_perm[1]}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) # Ensure we can still read the container with the second account resp = retry(get2, use_account=2) resp.read() self.assertEqual(resp.status, 204) # And that we can now write with the second account resp = retry(put2, use_account=2) resp.read() self.assertEqual(resp.status, 201) def test_nonadmin_user(self): if tf.skip or tf.skip3: raise SkipTest # Obtain the first account's string first_account = ['unknown'] def get1(url, token, parsed, conn): first_account[0] = parsed.path conn.request('HEAD', parsed.path + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get1) resp.read() # Ensure we can't access the container with the third account def get3(url, token, parsed, conn): conn.request('GET', first_account[0] + '/' + self.name, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get3, use_account=3) resp.read() self.assertEqual(resp.status, 403) # Make the container accessible by the third account def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, 'X-Container-Read': tf.swift_test_perm[2]}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) # Ensure we can now read the container with the third account resp = retry(get3, use_account=3) resp.read() self.assertEqual(resp.status, 204) # But we shouldn't be able to write with the third account def put3(url, token, parsed, conn): conn.request('PUT', first_account[0] + '/' + self.name + '/object', 'test object', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put3, use_account=3) resp.read() self.assertEqual(resp.status, 403) # Now make the container also writeable by the third account def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.name, '', {'X-Auth-Token': token, 'X-Container-Write': tf.swift_test_perm[2]}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) # Ensure we can still read the container with the third account resp = retry(get3, use_account=3) resp.read() self.assertEqual(resp.status, 204) # And that we can now write with the third account resp = retry(put3, use_account=3) resp.read() self.assertEqual(resp.status, 201) @requires_acls def test_read_only_acl_listings(self): if tf.skip3: raise SkipTest def get(url, token, parsed, conn): conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) return check_response(conn) def post_account(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def put(url, token, parsed, conn, name): conn.request('PUT', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) # cannot list containers resp = retry(get, use_account=3) resp.read() self.assertEquals(resp.status, 403) # grant read-only access acl_user = tf.swift_test_user[2] acl = {'read-only': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # read-only can list containers resp = retry(get, use_account=3) listing = resp.read() self.assertEquals(resp.status, 200) self.assert_(self.name in listing) # read-only can not create containers new_container_name = str(uuid4()) resp = retry(put, new_container_name, use_account=3) resp.read() self.assertEquals(resp.status, 403) # but it can see newly created ones resp = retry(put, new_container_name, use_account=1) resp.read() self.assertEquals(resp.status, 201) resp = retry(get, use_account=3) listing = resp.read() self.assertEquals(resp.status, 200) self.assert_(new_container_name in listing) @requires_acls def test_read_only_acl_metadata(self): if tf.skip3: raise SkipTest def get(url, token, parsed, conn, name): conn.request('GET', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) def post_account(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def post(url, token, parsed, conn, name, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path + '/%s' % name, '', new_headers) return check_response(conn) # add some metadata value = str(uuid4()) headers = {'x-container-meta-test': value} resp = retry(post, self.name, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) resp = retry(get, self.name, use_account=1) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # cannot see metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 403) # grant read-only access acl_user = tf.swift_test_user[2] acl = {'read-only': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # read-only can NOT write container metadata new_value = str(uuid4()) headers = {'x-container-meta-test': new_value} resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEqual(resp.status, 403) # read-only can read container metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) @requires_acls def test_read_write_acl_listings(self): if tf.skip3: raise SkipTest def get(url, token, parsed, conn): conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) return check_response(conn) def post(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def put(url, token, parsed, conn, name): conn.request('PUT', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) def delete(url, token, parsed, conn, name): conn.request('DELETE', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) # cannot list containers resp = retry(get, use_account=3) resp.read() self.assertEquals(resp.status, 403) # grant read-write access acl_user = tf.swift_test_user[2] acl = {'read-write': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can list containers resp = retry(get, use_account=3) listing = resp.read() self.assertEquals(resp.status, 200) self.assert_(self.name in listing) # can create new containers new_container_name = str(uuid4()) resp = retry(put, new_container_name, use_account=3) resp.read() self.assertEquals(resp.status, 201) resp = retry(get, use_account=3) listing = resp.read() self.assertEquals(resp.status, 200) self.assert_(new_container_name in listing) # can also delete them resp = retry(delete, new_container_name, use_account=3) resp.read() self.assertEquals(resp.status, 204) resp = retry(get, use_account=3) listing = resp.read() self.assertEquals(resp.status, 200) self.assert_(new_container_name not in listing) # even if they didn't create them empty_container_name = str(uuid4()) resp = retry(put, empty_container_name, use_account=1) resp.read() self.assertEquals(resp.status, 201) resp = retry(delete, empty_container_name, use_account=3) resp.read() self.assertEquals(resp.status, 204) @requires_acls def test_read_write_acl_metadata(self): if tf.skip3: raise SkipTest def get(url, token, parsed, conn, name): conn.request('GET', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) def post_account(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def post(url, token, parsed, conn, name, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path + '/%s' % name, '', new_headers) return check_response(conn) # add some metadata value = str(uuid4()) headers = {'x-container-meta-test': value} resp = retry(post, self.name, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) resp = retry(get, self.name, use_account=1) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # cannot see metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 403) # grant read-write access acl_user = tf.swift_test_user[2] acl = {'read-write': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # read-write can read container metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # read-write can also write container metadata new_value = str(uuid4()) headers = {'x-container-meta-test': new_value} resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEquals(resp.status, 204) resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) # and remove it headers = {'x-remove-container-meta-test': 'true'} resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEquals(resp.status, 204) resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), None) @requires_acls def test_admin_acl_listing(self): if tf.skip3: raise SkipTest def get(url, token, parsed, conn): conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) return check_response(conn) def post(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def put(url, token, parsed, conn, name): conn.request('PUT', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) def delete(url, token, parsed, conn, name): conn.request('DELETE', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) # cannot list containers resp = retry(get, use_account=3) resp.read() self.assertEquals(resp.status, 403) # grant admin access acl_user = tf.swift_test_user[2] acl = {'admin': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can list containers resp = retry(get, use_account=3) listing = resp.read() self.assertEquals(resp.status, 200) self.assert_(self.name in listing) # can create new containers new_container_name = str(uuid4()) resp = retry(put, new_container_name, use_account=3) resp.read() self.assertEquals(resp.status, 201) resp = retry(get, use_account=3) listing = resp.read() self.assertEquals(resp.status, 200) self.assert_(new_container_name in listing) # can also delete them resp = retry(delete, new_container_name, use_account=3) resp.read() self.assertEquals(resp.status, 204) resp = retry(get, use_account=3) listing = resp.read() self.assertEquals(resp.status, 200) self.assert_(new_container_name not in listing) # even if they didn't create them empty_container_name = str(uuid4()) resp = retry(put, empty_container_name, use_account=1) resp.read() self.assertEquals(resp.status, 201) resp = retry(delete, empty_container_name, use_account=3) resp.read() self.assertEquals(resp.status, 204) @requires_acls def test_admin_acl_metadata(self): if tf.skip3: raise SkipTest def get(url, token, parsed, conn, name): conn.request('GET', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) def post_account(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def post(url, token, parsed, conn, name, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path + '/%s' % name, '', new_headers) return check_response(conn) # add some metadata value = str(uuid4()) headers = {'x-container-meta-test': value} resp = retry(post, self.name, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) resp = retry(get, self.name, use_account=1) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # cannot see metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 403) # grant access acl_user = tf.swift_test_user[2] acl = {'admin': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can read container metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # can also write container metadata new_value = str(uuid4()) headers = {'x-container-meta-test': new_value} resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEquals(resp.status, 204) resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) # and remove it headers = {'x-remove-container-meta-test': 'true'} resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEquals(resp.status, 204) resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), None) @requires_acls def test_protected_container_sync(self): if tf.skip3: raise SkipTest def get(url, token, parsed, conn, name): conn.request('GET', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) def post_account(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def post(url, token, parsed, conn, name, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path + '/%s' % name, '', new_headers) return check_response(conn) # add some metadata value = str(uuid4()) headers = { 'x-container-sync-key': 'secret', 'x-container-meta-test': value, } resp = retry(post, self.name, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) resp = retry(get, self.name, use_account=1) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret') self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # grant read-only access acl_user = tf.swift_test_user[2] acl = {'read-only': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can read container metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # but not sync-key self.assertEqual(resp.getheader('X-Container-Sync-Key'), None) # and can not write headers = {'x-container-sync-key': str(uuid4())} resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEqual(resp.status, 403) # grant read-write access acl_user = tf.swift_test_user[2] acl = {'read-write': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can read container metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # but not sync-key self.assertEqual(resp.getheader('X-Container-Sync-Key'), None) # sanity check sync-key w/ account1 resp = retry(get, self.name, use_account=1) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret') # and can write new_value = str(uuid4()) headers = { 'x-container-sync-key': str(uuid4()), 'x-container-meta-test': new_value, } resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEqual(resp.status, 204) resp = retry(get, self.name, use_account=1) # validate w/ account1 resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) # but can not write sync-key self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret') # grant admin access acl_user = tf.swift_test_user[2] acl = {'admin': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # admin can read container metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) # and ALSO sync-key self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret') # admin tester3 can even change sync-key new_secret = str(uuid4()) headers = {'x-container-sync-key': new_secret} resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEqual(resp.status, 204) resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Sync-Key'), new_secret) @requires_acls def test_protected_container_acl(self): if tf.skip3: raise SkipTest def get(url, token, parsed, conn, name): conn.request('GET', parsed.path + '/%s' % name, '', {'X-Auth-Token': token}) return check_response(conn) def post_account(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def post(url, token, parsed, conn, name, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path + '/%s' % name, '', new_headers) return check_response(conn) # add some container acls value = str(uuid4()) headers = { 'x-container-read': 'jdoe', 'x-container-write': 'jdoe', 'x-container-meta-test': value, } resp = retry(post, self.name, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) resp = retry(get, self.name, use_account=1) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe') self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe') self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # grant read-only access acl_user = tf.swift_test_user[2] acl = {'read-only': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can read container metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # but not container acl self.assertEqual(resp.getheader('X-Container-Read'), None) self.assertEqual(resp.getheader('X-Container-Write'), None) # and can not write headers = { 'x-container-read': 'frank', 'x-container-write': 'frank', } resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEqual(resp.status, 403) # grant read-write access acl_user = tf.swift_test_user[2] acl = {'read-write': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can read container metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) # but not container acl self.assertEqual(resp.getheader('X-Container-Read'), None) self.assertEqual(resp.getheader('X-Container-Write'), None) # sanity check container acls with account1 resp = retry(get, self.name, use_account=1) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe') self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe') # and can write new_value = str(uuid4()) headers = { 'x-container-read': 'frank', 'x-container-write': 'frank', 'x-container-meta-test': new_value, } resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEqual(resp.status, 204) resp = retry(get, self.name, use_account=1) # validate w/ account1 resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) # but can not write container acls self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe') self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe') # grant admin access acl_user = tf.swift_test_user[2] acl = {'admin': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # admin can read container metadata resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) # and ALSO container acls self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe') self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe') # admin tester3 can even change container acls new_value = str(uuid4()) headers = { 'x-container-read': '.r:*', } resp = retry(post, self.name, headers=headers, use_account=3) resp.read() self.assertEqual(resp.status, 204) resp = retry(get, self.name, use_account=3) resp.read() self.assertEquals(resp.status, 204) self.assertEqual(resp.getheader('X-Container-Read'), '.r:*') def test_long_name_content_type(self): if tf.skip: raise SkipTest def put(url, token, parsed, conn): container_name = 'X' * 2048 conn.request('PUT', '%s/%s' % (parsed.path, container_name), 'there', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 400) self.assertEqual(resp.getheader('Content-Type'), 'text/html; charset=UTF-8') def test_null_name(self): if tf.skip: raise SkipTest def put(url, token, parsed, conn): conn.request('PUT', '%s/abc%%00def' % parsed.path, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put) if (tf.web_front_end == 'apache2'): self.assertEqual(resp.status, 404) else: self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL') self.assertEqual(resp.status, 412) def test_create_container_gets_default_policy_by_default(self): try: default_policy = \ tf.FunctionalStoragePolicyCollection.from_info().default except AssertionError: raise SkipTest() def put(url, token, parsed, conn): conn.request('PUT', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status // 100, 2) def head(url, token, parsed, conn): conn.request('HEAD', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(head) resp.read() headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEquals(headers.get('x-storage-policy'), default_policy['name']) def test_error_invalid_storage_policy_name(self): def put(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('PUT', parsed.path + '/' + self.container, '', new_headers) return check_response(conn) # create resp = retry(put, {'X-Storage-Policy': uuid4().hex}) resp.read() self.assertEqual(resp.status, 400) @requires_policies def test_create_non_default_storage_policy_container(self): policy = self.policies.exclude(default=True).select() def put(url, token, parsed, conn, headers=None): base_headers = {'X-Auth-Token': token} if headers: base_headers.update(headers) conn.request('PUT', parsed.path + '/' + self.container, '', base_headers) return check_response(conn) headers = {'X-Storage-Policy': policy['name']} resp = retry(put, headers=headers) resp.read() self.assertEqual(resp.status, 201) def head(url, token, parsed, conn): conn.request('HEAD', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(head) resp.read() headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEquals(headers.get('x-storage-policy'), policy['name']) # and test recreate with-out specifying Storage Policy resp = retry(put) resp.read() self.assertEqual(resp.status, 202) # should still be original storage policy resp = retry(head) resp.read() headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEquals(headers.get('x-storage-policy'), policy['name']) # delete it def delete(url, token, parsed, conn): conn.request('DELETE', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete) resp.read() self.assertEqual(resp.status, 204) # verify no policy header resp = retry(head) resp.read() headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEquals(headers.get('x-storage-policy'), None) @requires_policies def test_conflict_change_storage_policy_with_put(self): def put(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('PUT', parsed.path + '/' + self.container, '', new_headers) return check_response(conn) # create policy = self.policies.select() resp = retry(put, {'X-Storage-Policy': policy['name']}) resp.read() self.assertEqual(resp.status, 201) # can't change it other_policy = self.policies.exclude(name=policy['name']).select() resp = retry(put, {'X-Storage-Policy': other_policy['name']}) resp.read() self.assertEqual(resp.status, 409) def head(url, token, parsed, conn): conn.request('HEAD', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) return check_response(conn) # still original policy resp = retry(head) resp.read() headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEquals(headers.get('x-storage-policy'), policy['name']) @requires_policies def test_noop_change_storage_policy_with_post(self): def put(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('PUT', parsed.path + '/' + self.container, '', new_headers) return check_response(conn) # create policy = self.policies.select() resp = retry(put, {'X-Storage-Policy': policy['name']}) resp.read() self.assertEqual(resp.status, 201) def post(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path + '/' + self.container, '', new_headers) return check_response(conn) # attempt update for header in ('X-Storage-Policy', 'X-Storage-Policy-Index'): other_policy = self.policies.exclude(name=policy['name']).select() resp = retry(post, {header: other_policy['name']}) resp.read() self.assertEqual(resp.status, 204) def head(url, token, parsed, conn): conn.request('HEAD', parsed.path + '/' + self.container, '', {'X-Auth-Token': token}) return check_response(conn) # still original policy resp = retry(head) resp.read() headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEquals(headers.get('x-storage-policy'), policy['name']) if __name__ == '__main__': unittest.main()