Uses nacim's version instead

This commit is contained in:
Matthieu Huin 2014-05-27 15:43:38 +02:00
parent 93a8d3b9e8
commit b7fd668554

View File

@ -18,9 +18,10 @@ import time
import unittest
from collections import defaultdict
from swiftpolicy import swiftpolicy
from swift.common.middleware import keystoneauth
from swift.common.swob import Request, Response
from swift.common.http import HTTP_FORBIDDEN
from swiftpolicy.enforcer import AclCheck
class UnmockTimeModule(object):
@ -164,7 +165,7 @@ class FakeApp(object):
class SwiftAuth(unittest.TestCase):
def setUp(self):
self.test_auth = swiftpolicy.filter_factory({})(FakeApp())
self.test_auth = keystoneauth.filter_factory({})(FakeApp())
self.test_auth.logger = FakeLogger()
def _make_request(self, path=None, headers=None, **kwargs):
@ -182,7 +183,7 @@ class SwiftAuth(unittest.TestCase):
def _get_successful_middleware(self):
response_iter = iter([('200 OK', {}, '')])
return swiftpolicy.filter_factory({})(FakeApp(response_iter))
return keystoneauth.filter_factory({})(FakeApp(response_iter))
def test_invalid_request_authorized(self):
role = self.test_auth.reseller_admin_role
@ -231,28 +232,30 @@ class SwiftAuth(unittest.TestCase):
def test_anonymous_is_not_authorized_for_unknown_reseller_prefix(self):
req = self._make_request(path='/v1/BLAH_foo/c/o',
headers={'X_IDENTITY_STATUS': 'Invalid'})
# check user is not authorized even object is "public"
req.acl = '.r:*'
resp = req.get_response(self.test_auth)
self.assertEqual(resp.status_int, 401)
# def test_blank_reseller_prefix(self):
# conf = {'reseller_prefix': ''}
# test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
# account = tenant_id = 'foo'
# self.assertTrue(test_auth._reseller_check(account, tenant_id))
def test_blank_reseller_prefix(self):
conf = {'reseller_prefix': ''}
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
account = tenant_id = 'foo'
self.assertEqual(account, test_auth._get_account_for_tenant(tenant_id))
def test_reseller_prefix_added_underscore(self):
conf = {'reseller_prefix': 'AUTH'}
test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(test_auth.reseller_prefix, "AUTH_")
def test_reseller_prefix_not_added_double_underscores(self):
conf = {'reseller_prefix': 'AUTH_'}
test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.assertEqual(test_auth.reseller_prefix, "AUTH_")
def test_override_asked_for_but_not_allowed(self):
conf = {'allow_overrides': 'false'}
self.test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
req = self._make_request('/v1/AUTH_account',
environ={'swift.authorize_override': True})
resp = req.get_response(self.test_auth)
@ -260,7 +263,7 @@ class SwiftAuth(unittest.TestCase):
def test_override_asked_for_and_allowed(self):
conf = {'allow_overrides': 'true'}
self.test_auth = swiftpolicy.filter_factory(conf)(FakeApp())
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
req = self._make_request('/v1/AUTH_account',
environ={'swift.authorize_override': True})
resp = req.get_response(self.test_auth)
@ -297,7 +300,7 @@ class SwiftAuth(unittest.TestCase):
class TestAuthorize(unittest.TestCase):
def setUp(self):
self.test_auth = swiftpolicy.filter_factory({})(FakeApp())
self.test_auth = keystoneauth.filter_factory({})(FakeApp())
self.test_auth.logger = FakeLogger()
def _make_request(self, path, **kwargs):
@ -310,9 +313,7 @@ class TestAuthorize(unittest.TestCase):
identity['HTTP_X_TENANT_ID'])
def _get_identity(self, tenant_id='tenant_id', tenant_name='tenant_name',
user_id='user_id', user_name='user_name', roles=None):
if roles is None:
roles = []
user_id='user_id', user_name='user_name', roles=[]):
if isinstance(roles, list):
roles = ','.join(roles)
return {'HTTP_X_USER_ID': user_id,
@ -373,6 +374,7 @@ class TestAuthorize(unittest.TestCase):
self.assertTrue(req.environ.get('swift_owner'))
def test_authorize_succeeds_as_owner_for_insensitive_operator_role(self):
#import pdb; pdb.set_trace()
roles = [r.upper() for r in self.test_auth.operator_roles.split(',')]
identity = self._get_identity(roles=roles)
req = self._check_authenticate(identity=identity)
@ -449,57 +451,6 @@ class TestAuthorize(unittest.TestCase):
acl = '*:%s' % user
self._check_authenticate(identity=identity, acl=acl)
# def test_cross_tenant_authorization_success(self):
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME',
# ['tenantID:userA']),
# 'tenantID:userA')
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME',
# ['tenantNAME:userA']),
# 'tenantNAME:userA')
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userA']),
# '*:userA')
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME',
# ['tenantID:userID']),
# 'tenantID:userID')
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME',
# ['tenantNAME:userID']),
# 'tenantNAME:userID')
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userID']),
# '*:userID')
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantID:*']),
# 'tenantID:*')
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantNAME:*']),
# 'tenantNAME:*')
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME', ['*:*']),
# '*:*')
# def test_cross_tenant_authorization_failure(self):
# self.assertEqual(
# self.test_auth._authorize_cross_tenant(
# 'userID', 'userA', 'tenantID', 'tenantNAME',
# ['tenantXYZ:userA']),
# None)
def test_delete_own_account_not_allowed(self):
roles = self.test_auth.operator_roles.split(',')
identity = self._get_identity(roles=roles)
@ -520,5 +471,62 @@ class TestAuthorize(unittest.TestCase):
env={'REQUEST_METHOD': 'DELETE'})
self.assertEqual(bool(req.environ.get('swift_owner')), True)
class TestAclCheckCrossTenant(unittest.TestCase):
def setUp(self):
self.cross_tenant_check = AclCheck._authorize_cross_tenant
def test_cross_tenant_authorization_success(self):
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME',
['tenantID:userA']),
True)
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME',
['tenantNAME:userA']),
True)
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userA']),
True)
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME',
['tenantID:userID']),
True)
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME',
['tenantNAME:userID']),
True)
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME', ['*:userID']),
True)
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantID:*']),
True)
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME', ['tenantNAME:*']),
True)
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME', ['*:*']),
True)
def test_cross_tenant_authorization_failure(self):
self.assertEqual(
self.cross_tenant_check(
'userID', 'userA', 'tenantID', 'tenantNAME',
['tenantXYZ:userA']),
False)
if __name__ == '__main__':
unittest.main()