Enable testing to dynamicly leverage ACL roles

The 'existing' tests will need users with credentials other than admin
and member, and the secure-rbac tests will need different credentials
again.

As this no longer uses a cache, and is dynamically generated and passed
the actual *token* vality is not checked, and all ACL testing uses
role values.

Change-Id: Ief00e16da32a5196c21920c317c309241f2ad1cb
This commit is contained in:
Steve Baker 2020-12-21 15:45:29 +13:00 committed by Julia Kreger
parent 07bdccea58
commit 04a1f17ef8
3 changed files with 89 additions and 25 deletions

View File

@ -17,30 +17,57 @@ are blocked or allowed to be processed.
"""
import abc
from unittest import mock
import uuid
import ddt
from keystoneauth1.fixture import v3 as v3_token
from keystonemiddleware import auth_token
from oslo_config import cfg
from oslo_context import context as oslo_context
from ironic.tests.unit.api import base
from ironic.tests.unit.api import utils
from ironic.tests.unit.db import utils as db_utils
cfg.CONF.import_opt('cache', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
ADMIN_TOKEN = uuid.uuid4().hex
MEMBER_TOKEN = uuid.uuid4().hex
admin_context = oslo_context.RequestContext(
user_id=ADMIN_TOKEN,
roles=['admin', 'member', 'reader'],
)
member_context = oslo_context.RequestContext(
user_id=MEMBER_TOKEN,
roles=['member', 'reader'],
)
USERS = {
ADMIN_TOKEN: admin_context.to_dict(),
MEMBER_TOKEN: member_context.to_dict(),
}
class TestACLBase(base.BaseApiTest):
def setUp(self):
super(TestACLBase, self).setUp()
self.environ = {'fake.cache': utils.FakeMemcache()}
self.environ = {}
self.format_data = {}
self._create_test_data()
self.fake_token = None
mock_auth = mock.patch.object(
auth_token.AuthProtocol, 'process_request',
autospec=True)
self.mock_auth = mock_auth.start()
self.addCleanup(mock_auth.stop)
def _make_app(self):
cfg.CONF.set_override('cache', 'fake.cache',
group='keystone_authtoken')
cfg.CONF.set_override('auth_strategy', 'keystone')
return super(TestACLBase, self)._make_app()
@ -48,9 +75,36 @@ class TestACLBase(base.BaseApiTest):
def _create_test_data(self):
pass
def _check_skip(self, **kwargs):
if kwargs.get('skip_reason'):
self.skipTest(kwargs.get('skip_reason'))
def _fake_process_request(self, request, meow):
if self.fake_token:
request.user_token_valid = True
request.user_token = True
# is this right?!?
request.token_info = self.fake_token
request.auth_token = v3_token.Token(
user_id=self.fake_token['user'])
else:
# Because of this, the user will always get a 403 in testing, even
# if the API would normally return a 401 if a token is valid
request.user_token_valid = False
def _test_request(self, path, params=None, headers=None, method='get',
assert_status=None, assert_dict_contains=None):
path = path.format(**self.format_data)
self.mock_auth.side_effect = self._fake_process_request
if headers:
auth_token = headers.get('X-Auth-Token')
if auth_token:
auth_token = self.format_data[auth_token]
headers['X-Auth-Token'] = auth_token
self.fake_token = USERS[auth_token]
headers['X_ROLES'] = ','.join(USERS[auth_token]['roles'])
self.mock_auth.side_effect = self._fake_process_request
expect_errors = bool(assert_status)
if method == 'get':
response = self.get_json(
@ -78,8 +132,11 @@ class TestACLBasic(TestACLBase):
def _create_test_data(self):
fake_db_node = db_utils.create_test_node(chassis_id=None)
self.format_data['node_uuid'] = fake_db_node['uuid']
self.format_data['admin_token'] = ADMIN_TOKEN
self.format_data['member_token'] = MEMBER_TOKEN
@ddt.file_data('test_acl_basic.yaml')
@ddt.unpack
def test_basic(self, **kwargs):
self._check_skip(**kwargs)
self._test_request(**kwargs)

View File

@ -1,25 +1,30 @@
values:
skip_reason: For value storage
admin_headers: &admin_headers
X-Auth-Token: 'admin_token'
member_headers: &member_headers
X-Auth-Token: 'member_token'
non_authenticated:
path: &node_path '/v1/nodes/{node_uuid}'
assert_status: 401
assert_status: 403
authenticated:
path: *node_path
headers:
X-Auth-Token: &admin_token '4562138218392831'
headers: *admin_headers
assert_dict_contains:
uuid: '{node_uuid}'
driver: 'fake-hardware'
non_admin:
path: *node_path
headers:
X-Auth-Token: &member_token '4562138218392832'
headers: *member_headers
assert_status: 403
non_admin_with_admin_header:
path: *node_path
headers:
X-Auth-Token: *member_token
X-Auth-Token: 'member_token'
X-Roles: admin
assert_status: 403
@ -41,4 +46,4 @@ public_api_v1_json:
public_api_v1_xml:
path: /v1.xml
assert_status: 404
assert_status: 404

View File

@ -33,9 +33,6 @@ from ironic.tests.unit.db import utils as db_utils
ADMIN_TOKEN = '4562138218392831'
MEMBER_TOKEN = '4562138218392832'
ADMIN_TOKEN_HASH = hashlib.sha256(ADMIN_TOKEN.encode()).hexdigest()
MEMBER_TOKEN_HASH = hashlib.sha256(MEMBER_TOKEN.encode()).hexdigest()
ADMIN_BODY = {
'access': {
'token': {'id': ADMIN_TOKEN,
@ -60,21 +57,26 @@ MEMBER_BODY = {
}
}
DEFAULT_CACHE_VALUES = {
ADMIN_TOKEN: ADMIN_BODY,
MEMBER_TOKEN: MEMBER_BODY
}
class FakeMemcache(object):
"""Fake cache that is used for keystone tokens lookup."""
# NOTE(lucasagomes): keystonemiddleware >= 2.0.0 the token cache
# keys are sha256 hashes of the token key. This was introduced in
# https://review.opendev.org/#/c/186971
_cache = {
'tokens/%s' % ADMIN_TOKEN: ADMIN_BODY,
'tokens/%s' % ADMIN_TOKEN_HASH: ADMIN_BODY,
'tokens/%s' % MEMBER_TOKEN: MEMBER_BODY,
'tokens/%s' % MEMBER_TOKEN_HASH: MEMBER_BODY,
}
def __init__(self):
def __init__(self, cache_values=None):
if not cache_values:
cache_values = DEFAULT_CACHE_VALUES
self._cache = {}
for k, v in cache_values.items():
self._cache['tokens/%s' % k] = v
# NOTE(lucasagomes): keystonemiddleware >= 2.0.0 the token cache
# keys are sha256 hashes of the token key. This was introduced in
# https://review.opendev.org/#/c/186971
self._cache['tokens/%s' %
hashlib.sha256(k.encode()).hexdigest()] = v
self.set_key = None
self.set_value = None
self.token_expiration = None