diff --git a/test/functional/swift_testing.py b/test/functional/swift_testing.py index d1b913c321..4ee1601c7e 100644 --- a/test/functional/swift_testing.py +++ b/test/functional/swift_testing.py @@ -19,10 +19,13 @@ import socket import sys from time import sleep from urlparse import urlparse +import functools +from nose import SkipTest from test import get_config from swiftclient import get_auth, http_connection +from test.functional.swift_test_client import Connection conf = get_config('func_test') web_front_end = conf.get('web_front_end', 'integral') @@ -184,3 +187,45 @@ def check_response(conn): resp.read() raise InternalServerError() return resp + +cluster_info = {} + + +def get_cluster_info(): + conn = Connection(conf) + conn.authenticate() + global cluster_info + cluster_info = conn.cluster_info() + + +def reset_acl(): + def post(url, token, parsed, conn): + conn.request('POST', parsed.path, '', { + 'X-Auth-Token': token, + 'X-Account-Access-Control': '{}' + }) + return check_response(conn) + resp = retry(post, use_account=1) + resp.read() + + +def requires_acls(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + if skip: + raise SkipTest + if not cluster_info: + get_cluster_info() + # Determine whether this cluster has account ACLs; if not, skip test + if not cluster_info.get('tempauth', {}).get('account_acls'): + raise SkipTest + if 'keystoneauth' in cluster_info: + # remove when keystoneauth supports account acls + raise SkipTest + reset_acl() + try: + rv = f(*args, **kwargs) + finally: + reset_acl() + return rv + return wrapper diff --git a/test/functional/test_account.py b/test/functional/test_account.py index acd7c8aa27..56accde1a0 100755 --- a/test/functional/test_account.py +++ b/test/functional/test_account.py @@ -17,15 +17,17 @@ import unittest import json +from uuid import uuid4 from nose import SkipTest +from string import letters from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \ MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH from swift.common.middleware.acl import format_acl -from test.functional.swift_test_client import Connection -from test import get_config -from swift_testing import check_response, retry, skip, web_front_end +from swift_testing import (check_response, retry, skip, skip2, skip3, + web_front_end, requires_acls) import swift_testing +from test.functional.tests import load_constraint class TestAccount(unittest.TestCase): @@ -70,21 +72,316 @@ class TestAccount(unittest.TestCase): self.assert_(resp.status in (200, 204), resp.status) self.assertEquals(resp.getheader('x-account-meta-test'), 'Value') - def test_tempauth_account_acls(self): - if skip: + def test_invalid_acls(self): + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + # needs to be an acceptable header size + num_keys = 8 + max_key_size = load_constraint('max_header_size') / num_keys + acl = {'admin': [c * max_key_size for c in letters[:num_keys]]} + headers = {'x-account-access-control': format_acl( + version=2, acl_dict=acl)} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 400) + + # and again a touch smaller + acl = {'admin': [c * max_key_size for c in letters[:num_keys - 1]]} + headers = {'x-account-access-control': format_acl( + version=2, acl_dict=acl)} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + @requires_acls + def test_invalid_acl_keys(self): + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + # needs to be json + resp = retry(post, headers={'X-Account-Access-Control': 'invalid'}, + use_account=1) + resp.read() + self.assertEqual(resp.status, 400) + + acl_user = swift_testing.swift_test_user[1] + acl = {'admin': [acl_user], 'invalid_key': 'invalid_value'} + headers = {'x-account-access-control': format_acl( + version=2, acl_dict=acl)} + + resp = retry(post, headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 400) + self.assertEqual(resp.getheader('X-Account-Access-Control'), None) + + @requires_acls + def test_invalid_acl_values(self): + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + acl = {'admin': 'invalid_value'} + headers = {'x-account-access-control': format_acl( + version=2, acl_dict=acl)} + + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 400) + self.assertEqual(resp.getheader('X-Account-Access-Control'), None) + + @requires_acls + def test_read_only_acl(self): + if skip3: raise SkipTest - # Determine whether this cluster has account ACLs; if not, skip test - conn = Connection(get_config('func_test')) - conn.authenticate() - cluster_info = conn.cluster_info() - if not cluster_info.get('tempauth', {}).get('account_acls'): + def get(url, token, parsed, conn): + conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + # cannot read account + resp = retry(get, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant read access + acl_user = swift_testing.swift_test_user[2] + acl = {'read-only': [acl_user]} + headers = {'x-account-access-control': format_acl( + version=2, acl_dict=acl)} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # read-only can read account headers + resp = retry(get, use_account=3) + resp.read() + self.assert_(resp.status in (200, 204)) + # but not acls + self.assertEqual(resp.getheader('X-Account-Access-Control'), None) + + # read-only can not write metadata + headers = {'x-account-meta-test': 'value'} + resp = retry(post, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 403) + + # but they can read it + headers = {'x-account-meta-test': 'value'} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, use_account=3) + resp.read() + self.assert_(resp.status in (200, 204)) + self.assertEqual(resp.getheader('X-Account-Meta-Test'), 'value') + + @requires_acls + def test_read_write_acl(self): + if skip3: raise SkipTest - if 'keystoneauth' in cluster_info: - # Unfortunate hack -- tempauth (with account ACLs) is expected - # to play nice with Keystone (without account ACLs), but Zuul - # functest framework doesn't give us an easy way to get a - # tempauth user. + + def get(url, token, parsed, conn): + conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + # cannot read account + resp = retry(get, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant read-write access + acl_user = swift_testing.swift_test_user[2] + acl = {'read-write': [acl_user]} + headers = {'x-account-access-control': format_acl( + version=2, acl_dict=acl)} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # read-write can read account headers + resp = retry(get, use_account=3) + resp.read() + self.assert_(resp.status in (200, 204)) + # but not acls + self.assertEqual(resp.getheader('X-Account-Access-Control'), None) + + # read-write can not write account metadata + headers = {'x-account-meta-test': 'value'} + resp = retry(post, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 403) + + @requires_acls + def test_admin_acl(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn): + conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + # cannot read account + resp = retry(get, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant admin access + acl_user = swift_testing.swift_test_user[2] + acl = {'admin': [acl_user]} + acl_json_str = format_acl(version=2, acl_dict=acl) + headers = {'x-account-access-control': acl_json_str} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # admin can read account headers + resp = retry(get, use_account=3) + resp.read() + self.assert_(resp.status in (200, 204)) + # including acls + self.assertEqual(resp.getheader('X-Account-Access-Control'), + acl_json_str) + + # admin can write account metadata + value = str(uuid4()) + headers = {'x-account-meta-test': value} + resp = retry(post, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, use_account=3) + resp.read() + self.assert_(resp.status in (200, 204)) + self.assertEqual(resp.getheader('X-Account-Meta-Test'), value) + + # admin can even revoke their own access + headers = {'x-account-access-control': '{}'} + resp = retry(post, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 204) + + # and again, cannot read account + resp = retry(get, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + @requires_acls + def test_protected_tempurl(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn): + conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + # add a account metadata, and temp-url-key to account + value = str(uuid4()) + headers = { + 'x-account-meta-temp-url-key': 'secret', + 'x-account-meta-test': value, + } + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # grant read-only access to tester3 + acl_user = swift_testing.swift_test_user[2] + acl = {'read-only': [acl_user]} + acl_json_str = format_acl(version=2, acl_dict=acl) + headers = {'x-account-access-control': acl_json_str} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # read-only tester3 can read account metadata + resp = retry(get, use_account=3) + resp.read() + self.assert_(resp.status in (200, 204), + 'Expected status in (200, 204), got %s' % resp.status) + self.assertEqual(resp.getheader('X-Account-Meta-Test'), value) + # but not temp-url-key + self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None) + + # grant read-write access to tester3 + acl_user = swift_testing.swift_test_user[2] + acl = {'read-write': [acl_user]} + acl_json_str = format_acl(version=2, acl_dict=acl) + headers = {'x-account-access-control': acl_json_str} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # read-write tester3 can read account metadata + resp = retry(get, use_account=3) + resp.read() + self.assert_(resp.status in (200, 204), + 'Expected status in (200, 204), got %s' % resp.status) + self.assertEqual(resp.getheader('X-Account-Meta-Test'), value) + # but not temp-url-key + self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), None) + + # grant admin access to tester3 + acl_user = swift_testing.swift_test_user[2] + acl = {'admin': [acl_user]} + acl_json_str = format_acl(version=2, acl_dict=acl) + headers = {'x-account-access-control': acl_json_str} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # admin tester3 can read account metadata + resp = retry(get, use_account=3) + resp.read() + self.assert_(resp.status in (200, 204), + 'Expected status in (200, 204), got %s' % resp.status) + self.assertEqual(resp.getheader('X-Account-Meta-Test'), value) + # including temp-url-key + self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), + 'secret') + + # admin tester3 can even change temp-url-key + secret = str(uuid4()) + headers = { + 'x-account-meta-temp-url-key': secret, + } + resp = retry(post, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, use_account=3) + resp.read() + self.assert_(resp.status in (200, 204), + 'Expected status in (200, 204), got %s' % resp.status) + self.assertEqual(resp.getheader('X-Account-Meta-Temp-Url-Key'), + secret) + + @requires_acls + def test_account_acls(self): + if skip2: raise SkipTest def post(url, token, parsed, conn, headers): @@ -206,6 +503,137 @@ class TestAccount(unittest.TestCase): use_account=1) resp.read() + @requires_acls + def test_swift_account_acls(self): + if skip: + raise SkipTest + + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def head(url, token, parsed, conn): + conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + def get(url, token, parsed, conn): + conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + try: + # User1 can POST to their own account + resp = retry(post, headers={'X-Account-Access-Control': '{}'}) + resp.read() + self.assertEqual(resp.status, 204) + self.assertEqual(resp.getheader('X-Account-Access-Control'), None) + + # User1 can GET their own empty account + resp = retry(get) + resp.read() + self.assertEqual(resp.status // 100, 2) + self.assertEqual(resp.getheader('X-Account-Access-Control'), None) + + # User1 can POST non-empty data + acl_json = '{"admin":["bob"]}' + resp = retry(post, headers={'X-Account-Access-Control': acl_json}) + resp.read() + self.assertEqual(resp.status, 204) + + # User1 can GET the non-empty data + resp = retry(get) + resp.read() + self.assertEqual(resp.status // 100, 2) + self.assertEqual(resp.getheader('X-Account-Access-Control'), + acl_json) + + # POST non-JSON ACL should fail + resp = retry(post, headers={'X-Account-Access-Control': 'yuck'}) + resp.read() + # resp.status will be 400 if tempauth or some other ACL-aware + # auth middleware rejects it, or 200 (but silently swallowed by + # core Swift) if ACL-unaware auth middleware approves it. + + # A subsequent GET should show the old, valid data, not the garbage + resp = retry(get) + resp.read() + self.assertEqual(resp.status // 100, 2) + self.assertEqual(resp.getheader('X-Account-Access-Control'), + acl_json) + + finally: + # Make sure to clean up even if tests fail -- User2 should not + # have access to User1's account in other functional tests! + resp = retry(post, headers={'X-Account-Access-Control': '{}'}) + resp.read() + + def test_swift_prohibits_garbage_account_acls(self): + if skip: + raise SkipTest + + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def get(url, token, parsed, conn): + conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + try: + # User1 can POST to their own account + resp = retry(post, headers={'X-Account-Access-Control': '{}'}) + resp.read() + self.assertEqual(resp.status, 204) + self.assertEqual(resp.getheader('X-Account-Access-Control'), None) + + # User1 can GET their own empty account + resp = retry(get) + resp.read() + self.assertEqual(resp.status // 100, 2) + self.assertEqual(resp.getheader('X-Account-Access-Control'), None) + + # User1 can POST non-empty data + acl_json = '{"admin":["bob"]}' + resp = retry(post, headers={'X-Account-Access-Control': acl_json}) + resp.read() + self.assertEqual(resp.status, 204) + # If this request is handled by ACL-aware auth middleware, then the + # ACL will be persisted. If it is handled by ACL-unaware auth + # middleware, then the header will be thrown out. But the request + # should return successfully in any case. + + # User1 can GET the non-empty data + resp = retry(get) + resp.read() + self.assertEqual(resp.status // 100, 2) + # ACL will be set if some ACL-aware auth middleware (e.g. tempauth) + # propagates it to sysmeta; if no ACL-aware auth middleware does, + # then X-Account-Access-Control will still be empty. + + # POST non-JSON ACL should fail + resp = retry(post, headers={'X-Account-Access-Control': 'yuck'}) + resp.read() + # resp.status will be 400 if tempauth or some other ACL-aware + # auth middleware rejects it, or 200 (but silently swallowed by + # core Swift) if ACL-unaware auth middleware approves it. + + # A subsequent GET should either show the old, valid data (if + # ACL-aware auth middleware is propagating it) or show nothing + # (if no auth middleware in the pipeline is ACL-aware), but should + # never return the garbage ACL. + resp = retry(get) + resp.read() + self.assertEqual(resp.status // 100, 2) + self.assertNotEqual(resp.getheader('X-Account-Access-Control'), + 'yuck') + + finally: + # Make sure to clean up even if tests fail -- User2 should not + # have access to User1's account in other functional tests! + resp = retry(post, headers={'X-Account-Access-Control': '{}'}) + resp.read() + def test_unicode_metadata(self): if skip: raise SkipTest diff --git a/test/functional/test_container.py b/test/functional/test_container.py index 15f7fc14ee..bf808696f2 100755 --- a/test/functional/test_container.py +++ b/test/functional/test_container.py @@ -24,7 +24,7 @@ from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \ MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH from swift_testing import check_response, retry, skip, skip2, skip3, \ - swift_test_perm, web_front_end + swift_test_perm, web_front_end, requires_acls, swift_test_user class TestContainer(unittest.TestCase): @@ -650,6 +650,657 @@ class TestContainer(unittest.TestCase): resp.read() self.assertEquals(resp.status, 201) + @requires_acls + def test_read_only_acl_listings(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn): + conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + def post_account(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def put(url, token, parsed, conn, name): + conn.request('PUT', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + # cannot list containers + resp = retry(get, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant read-only access + acl_user = swift_test_user[2] + acl = {'read-only': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # read-only can list containers + resp = retry(get, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(self.name in listing) + + # read-only can not create containers + new_container_name = str(uuid4()) + resp = retry(put, new_container_name, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # but it can see newly created ones + resp = retry(put, new_container_name, use_account=1) + resp.read() + self.assertEquals(resp.status, 201) + resp = retry(get, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(new_container_name in listing) + + @requires_acls + def test_read_only_acl_metadata(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn, name): + conn.request('GET', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + def post_account(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def post(url, token, parsed, conn, name, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path + '/%s' % name, '', new_headers) + return check_response(conn) + + # add some metadata + value = str(uuid4()) + headers = {'x-container-meta-test': value} + resp = retry(post, self.name, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, self.name, use_account=1) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + + # cannot see metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant read-only access + acl_user = swift_test_user[2] + acl = {'read-only': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # read-only can NOT write container metadata + new_value = str(uuid4()) + headers = {'x-container-meta-test': new_value} + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 403) + + # read-only can read container metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + + @requires_acls + def test_read_write_acl_listings(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn): + conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def put(url, token, parsed, conn, name): + conn.request('PUT', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + def delete(url, token, parsed, conn, name): + conn.request('DELETE', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + # cannot list containers + resp = retry(get, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant read-write access + acl_user = swift_test_user[2] + acl = {'read-write': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can list containers + resp = retry(get, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(self.name in listing) + + # can create new containers + new_container_name = str(uuid4()) + resp = retry(put, new_container_name, use_account=3) + resp.read() + self.assertEquals(resp.status, 201) + resp = retry(get, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(new_container_name in listing) + + # can also delete them + resp = retry(delete, new_container_name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + resp = retry(get, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(new_container_name not in listing) + + # even if they didn't create them + empty_container_name = str(uuid4()) + resp = retry(put, empty_container_name, use_account=1) + resp.read() + self.assertEquals(resp.status, 201) + resp = retry(delete, empty_container_name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + + @requires_acls + def test_read_write_acl_metadata(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn, name): + conn.request('GET', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + def post_account(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def post(url, token, parsed, conn, name, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path + '/%s' % name, '', new_headers) + return check_response(conn) + + # add some metadata + value = str(uuid4()) + headers = {'x-container-meta-test': value} + resp = retry(post, self.name, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, self.name, use_account=1) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + + # cannot see metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant read-write access + acl_user = swift_test_user[2] + acl = {'read-write': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # read-write can read container metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + + # read-write can also write container metadata + new_value = str(uuid4()) + headers = {'x-container-meta-test': new_value} + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) + + # and remove it + headers = {'x-remove-container-meta-test': 'true'} + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), None) + + @requires_acls + def test_admin_acl_listing(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn): + conn.request('GET', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + + def post(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def put(url, token, parsed, conn, name): + conn.request('PUT', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + def delete(url, token, parsed, conn, name): + conn.request('DELETE', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + # cannot list containers + resp = retry(get, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant admin access + acl_user = swift_test_user[2] + acl = {'admin': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can list containers + resp = retry(get, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(self.name in listing) + + # can create new containers + new_container_name = str(uuid4()) + resp = retry(put, new_container_name, use_account=3) + resp.read() + self.assertEquals(resp.status, 201) + resp = retry(get, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(new_container_name in listing) + + # can also delete them + resp = retry(delete, new_container_name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + resp = retry(get, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(new_container_name not in listing) + + # even if they didn't create them + empty_container_name = str(uuid4()) + resp = retry(put, empty_container_name, use_account=1) + resp.read() + self.assertEquals(resp.status, 201) + resp = retry(delete, empty_container_name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + + @requires_acls + def test_admin_acl_metadata(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn, name): + conn.request('GET', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + def post_account(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def post(url, token, parsed, conn, name, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path + '/%s' % name, '', new_headers) + return check_response(conn) + + # add some metadata + value = str(uuid4()) + headers = {'x-container-meta-test': value} + resp = retry(post, self.name, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, self.name, use_account=1) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + + # cannot see metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant access + acl_user = swift_test_user[2] + acl = {'admin': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can read container metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + + # can also write container metadata + new_value = str(uuid4()) + headers = {'x-container-meta-test': new_value} + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) + + # and remove it + headers = {'x-remove-container-meta-test': 'true'} + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), None) + + @requires_acls + def test_protected_container_sync(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn, name): + conn.request('GET', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + def post_account(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def post(url, token, parsed, conn, name, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path + '/%s' % name, '', new_headers) + return check_response(conn) + + # add some metadata + value = str(uuid4()) + headers = { + 'x-container-sync-key': 'secret', + 'x-container-meta-test': value, + } + resp = retry(post, self.name, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, self.name, use_account=1) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret') + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + + # grant read-only access + acl_user = swift_test_user[2] + acl = {'read-only': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can read container metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + # but not sync-key + self.assertEqual(resp.getheader('X-Container-Sync-Key'), None) + + # and can not write + headers = {'x-container-sync-key': str(uuid4())} + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 403) + + # grant read-write access + acl_user = swift_test_user[2] + acl = {'read-write': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can read container metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + # but not sync-key + self.assertEqual(resp.getheader('X-Container-Sync-Key'), None) + + # sanity check sync-key w/ account1 + resp = retry(get, self.name, use_account=1) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret') + + # and can write + new_value = str(uuid4()) + headers = { + 'x-container-sync-key': str(uuid4()), + 'x-container-meta-test': new_value, + } + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, self.name, use_account=1) # validate w/ account1 + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) + # but can not write sync-key + self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret') + + # grant admin access + acl_user = swift_test_user[2] + acl = {'admin': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # admin can read container metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) + # and ALSO sync-key + self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret') + + # admin tester3 can even change sync-key + new_secret = str(uuid4()) + headers = {'x-container-sync-key': new_secret} + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Sync-Key'), new_secret) + + @requires_acls + def test_protected_container_acl(self): + if skip3: + raise SkipTest + + def get(url, token, parsed, conn, name): + conn.request('GET', parsed.path + '/%s' % name, '', + {'X-Auth-Token': token}) + return check_response(conn) + + def post_account(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def post(url, token, parsed, conn, name, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path + '/%s' % name, '', new_headers) + return check_response(conn) + + # add some container acls + value = str(uuid4()) + headers = { + 'x-container-read': 'jdoe', + 'x-container-write': 'jdoe', + 'x-container-meta-test': value, + } + resp = retry(post, self.name, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, self.name, use_account=1) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe') + self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe') + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + + # grant read-only access + acl_user = swift_test_user[2] + acl = {'read-only': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can read container metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + # but not container acl + self.assertEqual(resp.getheader('X-Container-Read'), None) + self.assertEqual(resp.getheader('X-Container-Write'), None) + + # and can not write + headers = { + 'x-container-read': 'frank', + 'x-container-write': 'frank', + } + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 403) + + # grant read-write access + acl_user = swift_test_user[2] + acl = {'read-write': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can read container metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), value) + # but not container acl + self.assertEqual(resp.getheader('X-Container-Read'), None) + self.assertEqual(resp.getheader('X-Container-Write'), None) + + # sanity check container acls with account1 + resp = retry(get, self.name, use_account=1) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe') + self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe') + + # and can write + new_value = str(uuid4()) + headers = { + 'x-container-read': 'frank', + 'x-container-write': 'frank', + 'x-container-meta-test': new_value, + } + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, self.name, use_account=1) # validate w/ account1 + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) + # but can not write container acls + self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe') + self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe') + + # grant admin access + acl_user = swift_test_user[2] + acl = {'admin': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # admin can read container metadata + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value) + # and ALSO container acls + self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe') + self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe') + + # admin tester3 can even change container acls + new_value = str(uuid4()) + headers = { + 'x-container-read': '.r:*', + } + resp = retry(post, self.name, headers=headers, use_account=3) + resp.read() + self.assertEqual(resp.status, 204) + resp = retry(get, self.name, use_account=3) + resp.read() + self.assertEquals(resp.status, 204) + self.assertEqual(resp.getheader('X-Container-Read'), '.r:*') + def test_long_name_content_type(self): if skip: raise SkipTest diff --git a/test/functional/test_object.py b/test/functional/test_object.py index 9b999b6b90..fed2e1507a 100755 --- a/test/functional/test_object.py +++ b/test/functional/test_object.py @@ -19,10 +19,11 @@ import unittest from nose import SkipTest from uuid import uuid4 -from swift_testing import check_response, retry, skip, skip3, \ - swift_test_perm, web_front_end from swift.common.utils import json +from swift_testing import check_response, retry, skip, skip3, \ + swift_test_perm, web_front_end, requires_acls, swift_test_user + class TestObject(unittest.TestCase): @@ -290,6 +291,249 @@ class TestObject(unittest.TestCase): resp.read() self.assertEquals(resp.status, 204) + @requires_acls + def test_read_only(self): + if skip3: + raise SkipTest + + def get_listing(url, token, parsed, conn): + conn.request('GET', '%s/%s' % (parsed.path, self.container), '', + {'X-Auth-Token': token}) + return check_response(conn) + + def post_account(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def get(url, token, parsed, conn, name): + conn.request('GET', '%s/%s/%s' % ( + parsed.path, self.container, name), '', + {'X-Auth-Token': token}) + return check_response(conn) + + def put(url, token, parsed, conn, name): + conn.request('PUT', '%s/%s/%s' % ( + parsed.path, self.container, name), 'test', + {'X-Auth-Token': token}) + return check_response(conn) + + def delete(url, token, parsed, conn, name): + conn.request('PUT', '%s/%s/%s' % ( + parsed.path, self.container, name), '', + {'X-Auth-Token': token}) + return check_response(conn) + + # cannot list objects + resp = retry(get_listing, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # cannot get object + resp = retry(get, self.obj, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant read-only access + acl_user = swift_test_user[2] + acl = {'read-only': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can list objects + resp = retry(get_listing, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(self.obj in listing) + + # can get object + resp = retry(get, self.obj, use_account=3) + body = resp.read() + self.assertEquals(resp.status, 200) + self.assertEquals(body, 'test') + + # can not put an object + obj_name = str(uuid4()) + resp = retry(put, obj_name, use_account=3) + body = resp.read() + self.assertEquals(resp.status, 403) + + # can not delete an object + resp = retry(delete, self.obj, use_account=3) + body = resp.read() + self.assertEquals(resp.status, 403) + + # sanity with account1 + resp = retry(get_listing, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(obj_name not in listing) + self.assert_(self.obj in listing) + + @requires_acls + def test_read_write(self): + if skip3: + raise SkipTest + + def get_listing(url, token, parsed, conn): + conn.request('GET', '%s/%s' % (parsed.path, self.container), '', + {'X-Auth-Token': token}) + return check_response(conn) + + def post_account(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def get(url, token, parsed, conn, name): + conn.request('GET', '%s/%s/%s' % ( + parsed.path, self.container, name), '', + {'X-Auth-Token': token}) + return check_response(conn) + + def put(url, token, parsed, conn, name): + conn.request('PUT', '%s/%s/%s' % ( + parsed.path, self.container, name), 'test', + {'X-Auth-Token': token}) + return check_response(conn) + + def delete(url, token, parsed, conn, name): + conn.request('DELETE', '%s/%s/%s' % ( + parsed.path, self.container, name), '', + {'X-Auth-Token': token}) + return check_response(conn) + + # cannot list objects + resp = retry(get_listing, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # cannot get object + resp = retry(get, self.obj, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant read-write access + acl_user = swift_test_user[2] + acl = {'read-write': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can list objects + resp = retry(get_listing, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(self.obj in listing) + + # can get object + resp = retry(get, self.obj, use_account=3) + body = resp.read() + self.assertEquals(resp.status, 200) + self.assertEquals(body, 'test') + + # can put an object + obj_name = str(uuid4()) + resp = retry(put, obj_name, use_account=3) + body = resp.read() + self.assertEquals(resp.status, 201) + + # can delete an object + resp = retry(delete, self.obj, use_account=3) + body = resp.read() + self.assertEquals(resp.status, 204) + + # sanity with account1 + resp = retry(get_listing, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(obj_name in listing) + self.assert_(self.obj not in listing) + + @requires_acls + def test_admin(self): + if skip3: + raise SkipTest + + def get_listing(url, token, parsed, conn): + conn.request('GET', '%s/%s' % (parsed.path, self.container), '', + {'X-Auth-Token': token}) + return check_response(conn) + + def post_account(url, token, parsed, conn, headers): + new_headers = dict({'X-Auth-Token': token}, **headers) + conn.request('POST', parsed.path, '', new_headers) + return check_response(conn) + + def get(url, token, parsed, conn, name): + conn.request('GET', '%s/%s/%s' % ( + parsed.path, self.container, name), '', + {'X-Auth-Token': token}) + return check_response(conn) + + def put(url, token, parsed, conn, name): + conn.request('PUT', '%s/%s/%s' % ( + parsed.path, self.container, name), 'test', + {'X-Auth-Token': token}) + return check_response(conn) + + def delete(url, token, parsed, conn, name): + conn.request('DELETE', '%s/%s/%s' % ( + parsed.path, self.container, name), '', + {'X-Auth-Token': token}) + return check_response(conn) + + # cannot list objects + resp = retry(get_listing, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # cannot get object + resp = retry(get, self.obj, use_account=3) + resp.read() + self.assertEquals(resp.status, 403) + + # grant admin access + acl_user = swift_test_user[2] + acl = {'admin': [acl_user]} + headers = {'x-account-access-control': json.dumps(acl)} + resp = retry(post_account, headers=headers, use_account=1) + resp.read() + self.assertEqual(resp.status, 204) + + # can list objects + resp = retry(get_listing, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(self.obj in listing) + + # can get object + resp = retry(get, self.obj, use_account=3) + body = resp.read() + self.assertEquals(resp.status, 200) + self.assertEquals(body, 'test') + + # can put an object + obj_name = str(uuid4()) + resp = retry(put, obj_name, use_account=3) + body = resp.read() + self.assertEquals(resp.status, 201) + + # can delete an object + resp = retry(delete, self.obj, use_account=3) + body = resp.read() + self.assertEquals(resp.status, 204) + + # sanity with account1 + resp = retry(get_listing, use_account=3) + listing = resp.read() + self.assertEquals(resp.status, 200) + self.assert_(obj_name in listing) + self.assert_(self.obj not in listing) + def test_manifest(self): if skip: raise SkipTest