Improve Keystone v3 token support

Now keystoneauth uses HTTP_X_TENANT_NAME/ID only even if v3 token uses for
backward compatibility. There is no problem with current behavior because
keystonemiddleware set same value on the headers but the headers are
specified as deprecated so this patch allows to support HTTP_X_PROJECT_NAME/ID
in addition to HTTP_X_TENANT_NAME/ID.

Change-Id: Ie5e02067a59e18f1ac215f51429863bdd42f729f
This commit is contained in:
Hisashi Osanai 2015-07-14 15:13:06 +09:00
parent 1ac9ceb548
commit 5cd57dc357
2 changed files with 95 additions and 16 deletions

View File

@ -243,8 +243,10 @@ class KeystoneAuth(object):
service_roles = list_from_csv(environ.get('HTTP_X_SERVICE_ROLES', ''))
identity = {'user': (environ.get('HTTP_X_USER_ID'),
environ.get('HTTP_X_USER_NAME')),
'tenant': (environ.get('HTTP_X_TENANT_ID'),
environ.get('HTTP_X_TENANT_NAME')),
'tenant': (environ.get('HTTP_X_PROJECT_ID',
environ.get('HTTP_X_TENANT_ID')),
environ.get('HTTP_X_PROJECT_NAME',
environ.get('HTTP_X_TENANT_NAME'))),
'roles': roles,
'service_roles': service_roles}
token_info = environ.get('keystone.token_info', {})

View File

@ -550,7 +550,8 @@ class BaseTestAuthorize(unittest.TestCase):
if not identity:
identity = self._get_identity()
return get_account_for_tenant(self.test_auth,
identity['HTTP_X_TENANT_ID'])
identity.get('HTTP_X_PROJECT_ID') or
identity.get('HTTP_X_TENANT_ID'))
def _get_identity(self, tenant_id='tenant_id', tenant_name='tenant_name',
user_id='user_id', user_name='user_name', roles=None,
@ -564,13 +565,20 @@ class BaseTestAuthorize(unittest.TestCase):
'HTTP_X_USER_NAME': user_name,
'HTTP_X_USER_DOMAIN_NAME': user_domain_name,
'HTTP_X_USER_DOMAIN_ID': user_domain_id,
'HTTP_X_TENANT_ID': tenant_id,
'HTTP_X_TENANT_NAME': tenant_name,
'HTTP_X_PROJECT_ID': tenant_id,
'HTTP_X_PROJECT_NAME': tenant_name,
'HTTP_X_PROJECT_DOMAIN_ID': project_domain_id,
'HTTP_X_PROJECT_DOMAIN_NAME': project_domain_name,
'HTTP_X_ROLES': roles,
'HTTP_X_IDENTITY_STATUS': 'Confirmed'}
def _get_identity_for_v2(self, **kwargs):
identity = self._get_identity(**kwargs)
for suffix in ['ID', 'NAME']:
identity['HTTP_X_TENANT_{0}'.format(suffix)] = identity.pop(
'HTTP_X_PROJECT_{0}'.format(suffix))
return identity
def _get_env_id(self, tenant_id='tenant_id', tenant_name='tenant_name',
user_id='user_id', user_name='user_name', roles=[],
project_domain_name='domA', project_domain_id='99',
@ -597,7 +605,8 @@ class TestAuthorize(BaseTestAuthorize):
# fake cached account info
info_key = get_cache_key(account)
default_env = {
'REMOTE_USER': identity['HTTP_X_TENANT_ID'],
'REMOTE_USER': (identity.get('HTTP_X_PROJECT_ID') or
identity.get('HTTP_X_TENANT_ID')),
'swift.infocache': {info_key: {'status': 200, 'sysmeta': {}}}}
default_env.update(identity)
if env:
@ -689,7 +698,7 @@ class TestAuthorize(BaseTestAuthorize):
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_tenant_name_user_in_roles(self):
identity = self._get_identity()
identity = self._get_identity_for_v2()
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
tenant_name = identity['HTTP_X_TENANT_NAME']
@ -697,15 +706,33 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (tenant_name, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_tenant_id_user_in_roles(self):
def test_authorize_succeeds_for_project_name_user_in_roles(self):
identity = self._get_identity()
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
project_name = identity['HTTP_X_PROJECT_NAME']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (project_name, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_tenant_id_user_in_roles(self):
identity = self._get_identity_for_v2()
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
tenant_id = identity['HTTP_X_TENANT_ID']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (tenant_id, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_project_id_user_in_roles(self):
identity = self._get_identity()
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
project_id = identity['HTTP_X_PROJECT_ID']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (project_id, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_wildcard_tenant_user_in_roles(self):
identity = self._get_identity()
user_name = identity['HTTP_X_USER_NAME']
@ -844,8 +871,8 @@ class TestAuthorize(BaseTestAuthorize):
self.assertEqual(authorize_resp, None)
def test_names_disallowed_in_acls_outside_default_domain(self):
id = self._get_identity(user_domain_id='non-default',
project_domain_id='non-default')
id = self._get_identity_for_v2(user_domain_id='non-default',
project_domain_id='non-default')
env = {'keystone.token_info': _fake_token_info(version='3')}
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
@ -859,9 +886,23 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
id = self._get_identity(user_domain_id='non-default',
project_domain_id='non-default')
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
def test_names_allowed_in_acls_inside_default_domain(self):
id = self._get_identity(user_domain_id='default',
project_domain_id='default')
id = self._get_identity_for_v2(user_domain_id='default',
project_domain_id='default')
env = {'keystone.token_info': _fake_token_info(version='3')}
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
@ -872,12 +913,23 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
id = self._get_identity(user_domain_id='default',
project_domain_id='default')
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
def test_names_allowed_in_acls_inside_default_domain_with_config(self):
conf = {'allow_names_in_acls': 'yes'}
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.test_auth.logger = FakeLogger()
id = self._get_identity(user_domain_id='default',
project_domain_id='default')
id = self._get_identity_for_v2(user_domain_id='default',
project_domain_id='default')
env = {'keystone.token_info': _fake_token_info(version='3')}
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
@ -888,12 +940,23 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
id = self._get_identity(user_domain_id='default',
project_domain_id='default')
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
def test_names_disallowed_in_acls_inside_default_domain(self):
conf = {'allow_names_in_acls': 'false'}
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.test_auth.logger = FakeLogger()
id = self._get_identity(user_domain_id='default',
project_domain_id='default')
id = self._get_identity_for_v2(user_domain_id='default',
project_domain_id='default')
env = {'keystone.token_info': _fake_token_info(version='3')}
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
@ -907,6 +970,20 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
id = self._get_identity(user_domain_id='default',
project_domain_id='default')
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
def test_keystone_identity(self):
user = ('U_ID', 'U_NAME')
roles = ('ROLE1', 'ROLE2')