diff --git a/releasenotes/notes/grant-revoke-assignments-231d3f9596a1ae75.yaml b/releasenotes/notes/grant-revoke-assignments-231d3f9596a1ae75.yaml new file mode 100644 index 000000000..9776030ca --- /dev/null +++ b/releasenotes/notes/grant-revoke-assignments-231d3f9596a1ae75.yaml @@ -0,0 +1,3 @@ +--- +features: + - add granting and revoking of roles from groups and users diff --git a/shade/_tasks.py b/shade/_tasks.py index 6e231bdbb..d4b23c4da 100644 --- a/shade/_tasks.py +++ b/shade/_tasks.py @@ -687,6 +687,26 @@ class RoleDelete(task_manager.Task): return client.keystone_client.roles.delete(**self.args) +class RoleAddUser(task_manager.Task): + def main(self, client): + return client.keystone_client.roles.add_user_role(**self.args) + + +class RoleGrantUser(task_manager.Task): + def main(self, client): + return client.keystone_client.roles.grant(**self.args) + + +class RoleRemoveUser(task_manager.Task): + def main(self, client): + return client.keystone_client.roles.remove_user_role(**self.args) + + +class RoleRevokeUser(task_manager.Task): + def main(self, client): + return client.keystone_client.roles.revoke(**self.args) + + class RoleAssignmentList(task_manager.Task): def main(self, client): return client.keystone_client.role_assignments.list(**self.args) diff --git a/shade/_utils.py b/shade/_utils.py index f846f9179..93d0bc938 100644 --- a/shade/_utils.py +++ b/shade/_utils.py @@ -450,6 +450,17 @@ def normalize_role_assignments(assignments): return new_assignments +def normalize_roles(roles): + """Normalize Identity roles.""" + ret = [ + dict( + id=role.get('id'), + name=role.get('name'), + ) for role in roles + ] + return meta.obj_list_to_dict(ret) + + def valid_kwargs(*valid_args): # This decorator checks if argument passed as **kwargs to a function are # present in valid_args. diff --git a/shade/operatorcloud.py b/shade/operatorcloud.py index 269bce942..53114a144 100644 --- a/shade/operatorcloud.py +++ b/shade/operatorcloud.py @@ -1545,6 +1545,161 @@ class OperatorCloud(openstackcloud.OpenStackCloud): return True + def _get_grant_revoke_params(self, role, user=None, group=None, + project=None, domain=None): + role = self.get_role(role) + if role is None: + return {} + data = {'role': role.id} + + # domain and group not available in keystone v2.0 + keystone_version = self.cloud_config.get_api_version('identity') + is_keystone_v2 = keystone_version.startswith('2') + + filters = {} + if not is_keystone_v2 and domain: + filters['domain_id'] = data['domain'] = \ + self.get_domain(domain)['id'] + + if user: + data['user'] = self.get_user(user, filters=filters) + + if project: + # drop domain in favor of project + data.pop('domain', None) + data['project'] = self.get_project(project, filters=filters) + + if not is_keystone_v2 and group: + data['group'] = self.get_group(group, filters=filters) + + return data + + def grant_role(self, name_or_id, user=None, group=None, + project=None, domain=None, wait=False, timeout=60): + """Grant a role to a user. + + :param string name_or_id: The name or id of the role. + :param string user: The name or id of the user. + :param string group: The name or id of the group. (v3) + :param string project: The name or id of the project. + :param string domain: The name or id of the domain. (v3) + :param bool wait: Wait for role to be granted + :param int timeout: Timeout to wait for role to be granted + + NOTE: for wait and timeout, sometimes granting roles is not + instantaneous for granting roles. + + NOTE: project is required for keystone v2 + + :returns: True if the role is assigned, otherwise False + + :raise OpenStackCloudException: if the role cannot be granted + """ + data = self._get_grant_revoke_params(name_or_id, user, group, + project, domain) + filters = data.copy() + if not data: + raise OpenStackCloudException( + 'Role {0} not found.'.format(name_or_id)) + + if data.get('user') is not None and data.get('group') is not None: + raise OpenStackCloudException( + 'Specify either a group or a user, not both') + if data.get('user') is None and data.get('group') is None: + raise OpenStackCloudException( + 'Must specify either a user or a group') + if self.cloud_config.get_api_version('identity').startswith('2') and \ + data.get('project') is None: + raise OpenStackCloudException( + 'Must specify project for keystone v2') + + if self.list_role_assignments(filters=filters): + self.log.debug('Assignment already exists') + return False + + with _utils.shade_exceptions( + "Error granting access to role: {0}".format( + data)): + if self.cloud_config.get_api_version('identity').startswith('2'): + data['tenant'] = data.pop('project') + self.manager.submitTask(_tasks.RoleAddUser(**data)) + else: + if data.get('project') is None and data.get('domain') is None: + raise OpenStackCloudException( + 'Must specify either a domain or project') + self.manager.submitTask(_tasks.RoleGrantUser(**data)) + if wait: + for count in _utils._iterate_timeout( + timeout, + "Timeout waiting for role to be granted"): + if self.list_role_assignments(filters=filters): + break + return True + + def revoke_role(self, name_or_id, user=None, group=None, + project=None, domain=None, wait=False, timeout=60): + """Revoke a role from a user. + + :param string name_or_id: The name or id of the role. + :param string user: The name or id of the user. + :param string group: The name or id of the group. (v3) + :param string project: The name or id of the project. + :param string domain: The id of the domain. (v3) + :param bool wait: Wait for role to be revoked + :param int timeout: Timeout to wait for role to be revoked + + NOTE: for wait and timeout, sometimes revoking roles is not + instantaneous for revoking roles. + + NOTE: project is required for keystone v2 + + :returns: True if the role is revoke, otherwise False + + :raise OpenStackCloudException: if the role cannot be removed + """ + data = self._get_grant_revoke_params(name_or_id, user, group, + project, domain) + filters = data.copy() + + if not data: + raise OpenStackCloudException( + 'Role {0} not found.'.format(name_or_id)) + + if data.get('user') is not None and data.get('group') is not None: + raise OpenStackCloudException( + 'Specify either a group or a user, not both') + if data.get('user') is None and data.get('group') is None: + raise OpenStackCloudException( + 'Must specify either a user or a group') + if self.cloud_config.get_api_version('identity').startswith('2') and \ + data.get('project') is None: + raise OpenStackCloudException( + 'Must specify project for keystone v2') + + if not self.list_role_assignments(filters=filters): + self.log.debug('Assignment does not exist') + return False + + with _utils.shade_exceptions( + "Error revoking access to role: {0}".format( + data)): + if self.cloud_config.get_api_version('identity').startswith('2'): + data['tenant'] = data.pop('project') + self.manager.submitTask(_tasks.RoleRemoveUser(**data)) + else: + if data.get('project') is None \ + and data.get('domain') is None: + raise OpenStackCloudException( + 'Must specify either a domain or project') + self.manager.submitTask(_tasks.RoleRevokeUser(**data)) + if wait: + for count in _utils._iterate_timeout( + timeout, + "Timeout waiting for role to be revoked"): + if not self.list_role_assignments(filters=filters): + break + return True + def list_hypervisors(self): """List all hypervisors diff --git a/shade/tests/fakes.py b/shade/tests/fakes.py index f3c07224c..f2bf77ba3 100644 --- a/shade/tests/fakes.py +++ b/shade/tests/fakes.py @@ -60,8 +60,9 @@ class FakeImage(object): class FakeProject(object): - def __init__(self, id): + def __init__(self, id, domain_id=None): self.id = id + self.domain_id = domain_id or 'default' class FakeServer(object): @@ -96,10 +97,12 @@ class FakeService(object): class FakeUser(object): - def __init__(self, id, email, name): + def __init__(self, id, email, name, domain_id=None): self.id = id self.email = email self.name = name + if domain_id is not None: + self.domain_id = domain_id class FakeVolume(object): @@ -196,11 +199,11 @@ class FakeRole(object): class FakeGroup(object): - def __init__(self, id, name, description, domain=None): + def __init__(self, id, name, description, domain_id=None): self.id = id self.name = name self.description = description - self.domain = domain + self.domain_id = domain_id or 'default' class FakeHypervisor(object): diff --git a/shade/tests/functional/test_identity.py b/shade/tests/functional/test_identity.py index 1ad582540..40dbd0b2b 100644 --- a/shade/tests/functional/test_identity.py +++ b/shade/tests/functional/test_identity.py @@ -33,8 +33,44 @@ class TestIdentity(base.TestCase): self.cloud = operator_cloud(cloud='devstack-admin') self.role_prefix = 'test_role' + ''.join( random.choice(string.ascii_lowercase) for _ in range(5)) + self.user_prefix = self.getUniqueString('user') + self.group_prefix = self.getUniqueString('group') + + self.addCleanup(self._cleanup_users) + self.identity_version = \ + self.cloud.cloud_config.get_api_version('identity') + if self.identity_version not in ('2', '2.0'): + self.addCleanup(self._cleanup_groups) self.addCleanup(self._cleanup_roles) + def _cleanup_groups(self): + exception_list = list() + for group in self.cloud.list_groups(): + if group['name'].startswith(self.group_prefix): + try: + self.cloud.delete_group(group['id']) + except Exception as e: + exception_list.append(str(e)) + continue + + if exception_list: + # Raise an error: we must make users aware that something went + # wrong + raise OpenStackCloudException('\n'.join(exception_list)) + + def _cleanup_users(self): + exception_list = list() + for user in self.cloud.list_users(): + if user['name'].startswith(self.user_prefix): + try: + self.cloud.delete_user(user['id']) + except Exception as e: + exception_list.append(str(e)) + continue + + if exception_list: + raise OpenStackCloudException('\n'.join(exception_list)) + def _cleanup_roles(self): exception_list = list() for role in self.cloud.list_roles(): @@ -48,6 +84,13 @@ class TestIdentity(base.TestCase): if exception_list: raise OpenStackCloudException('\n'.join(exception_list)) + def _create_user(self, **kwargs): + domain_id = None + if self.identity_version not in ('2', '2.0'): + domain = self.cloud.get_domain('default') + domain_id = domain['id'] + return self.cloud.create_user(domain_id=domain_id, **kwargs) + def test_list_roles(self): roles = self.cloud.list_roles() self.assertIsNotNone(roles) @@ -84,7 +127,7 @@ class TestIdentity(base.TestCase): # need to make this test a little more specific, and add more for testing # filtering functionality. def test_list_role_assignments(self): - if self.cloud.cloud_config.get_api_version('identity') in ('2', '2.0'): + if self.identity_version in ('2', '2.0'): self.skipTest("Identity service does not support role assignments") assignments = self.cloud.list_role_assignments() self.assertIsInstance(assignments, list) @@ -94,6 +137,118 @@ class TestIdentity(base.TestCase): user = self.cloud.get_user('demo') project = self.cloud.get_project('demo') assignments = self.cloud.list_role_assignments( - filters={'user': user.id, 'project': project.id}) + filters={'user': user['id'], 'project': project['id']}) self.assertIsInstance(assignments, list) self.assertTrue(len(assignments) > 0) + + def test_grant_revoke_role_user_project(self): + user_name = self.user_prefix + '_user_project' + user_email = 'nobody@nowhere.com' + role_name = self.role_prefix + '_grant_user_project' + role = self.cloud.create_role(role_name) + user = self._create_user(name=user_name, + email=user_email, + default_project='demo') + self.assertTrue(self.cloud.grant_role( + role_name, user=user['id'], project='demo', wait=True)) + assignments = self.cloud.list_role_assignments({ + 'role': role['id'], + 'user': user['id'], + 'project': self.cloud.get_project('demo')['id'] + }) + self.assertIsInstance(assignments, list) + self.assertTrue(len(assignments) == 1) + self.assertTrue(self.cloud.revoke_role( + role_name, user=user['id'], project='demo', wait=True)) + assignments = self.cloud.list_role_assignments({ + 'role': role['id'], + 'user': user['id'], + 'project': self.cloud.get_project('demo')['id'] + }) + self.assertIsInstance(assignments, list) + self.assertTrue(len(assignments) == 0) + + def test_grant_revoke_role_group_project(self): + if self.identity_version in ('2', '2.0'): + self.skipTest("Identity service does not support group") + role_name = self.role_prefix + '_grant_group_project' + role = self.cloud.create_role(role_name) + group_name = self.group_prefix + '_group_project' + group = self.cloud.create_group(name=group_name, + description='test group', + domain='default') + self.assertTrue(self.cloud.grant_role( + role_name, group=group['id'], project='demo')) + assignments = self.cloud.list_role_assignments({ + 'role': role['id'], + 'group': group['id'], + 'project': self.cloud.get_project('demo')['id'] + }) + self.assertIsInstance(assignments, list) + self.assertTrue(len(assignments) == 1) + self.assertTrue(self.cloud.revoke_role( + role_name, group=group['id'], project='demo')) + assignments = self.cloud.list_role_assignments({ + 'role': role['id'], + 'group': group['id'], + 'project': self.cloud.get_project('demo')['id'] + }) + self.assertIsInstance(assignments, list) + self.assertTrue(len(assignments) == 0) + + def test_grant_revoke_role_user_domain(self): + if self.identity_version in ('2', '2.0'): + self.skipTest("Identity service does not support domain") + role_name = self.role_prefix + '_grant_user_domain' + role = self.cloud.create_role(role_name) + user_name = self.user_prefix + '_user_domain' + user_email = 'nobody@nowhere.com' + user = self._create_user(name=user_name, + email=user_email, + default_project='demo') + self.assertTrue(self.cloud.grant_role( + role_name, user=user['id'], domain='default')) + assignments = self.cloud.list_role_assignments({ + 'role': role['id'], + 'user': user['id'], + 'domain': self.cloud.get_domain('default')['id'] + }) + self.assertIsInstance(assignments, list) + self.assertTrue(len(assignments) == 1) + self.assertTrue(self.cloud.revoke_role( + role_name, user=user['id'], domain='default')) + assignments = self.cloud.list_role_assignments({ + 'role': role['id'], + 'user': user['id'], + 'domain': self.cloud.get_domain('default')['id'] + }) + self.assertIsInstance(assignments, list) + self.assertTrue(len(assignments) == 0) + + def test_grant_revoke_role_group_domain(self): + if self.identity_version in ('2', '2.0'): + self.skipTest("Identity service does not support domain or group") + role_name = self.role_prefix + '_grant_group_domain' + role = self.cloud.create_role(role_name) + group_name = self.group_prefix + '_group_domain' + group = self.cloud.create_group(name=group_name, + description='test group', + domain='default') + self.assertTrue(self.cloud.grant_role( + role_name, group=group['id'], domain='default')) + assignments = self.cloud.list_role_assignments({ + 'role': role['id'], + 'group': group['id'], + 'domain': self.cloud.get_domain('default')['id'] + }) + self.assertIsInstance(assignments, list) + self.assertTrue(len(assignments) == 1) + self.assertTrue(self.cloud.revoke_role( + role_name, group=group['id'], domain='default')) + assignments = self.cloud.list_role_assignments({ + 'role': role['id'], + 'group': group['id'], + 'domain': self.cloud.get_domain('default')['id'] + }) + self.assertIsInstance(assignments, list) + self.assertTrue(len(assignments) == 0) diff --git a/shade/tests/unit/test_role_assignment.py b/shade/tests/unit/test_role_assignment.py new file mode 100644 index 000000000..5263633dc --- /dev/null +++ b/shade/tests/unit/test_role_assignment.py @@ -0,0 +1,859 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import patch +import os_client_config as occ +from shade import OperatorCloud, operator_cloud +from shade.exc import OpenStackCloudException, OpenStackCloudTimeout +from shade.meta import obj_to_dict +from shade.tests import base, fakes +import testtools + + +class TestRoleAssignment(base.TestCase): + + def setUp(self): + super(TestRoleAssignment, self).setUp() + self.cloud = operator_cloud(validate=False) + self.fake_role = obj_to_dict(fakes.FakeRole('12345', 'test')) + self.fake_user = obj_to_dict(fakes.FakeUser('12345', + 'test@nobody.org', + 'test', + domain_id='test-domain')) + self.fake_group = obj_to_dict(fakes.FakeGroup('12345', + 'test', + 'test group', + domain_id='test-domain')) + self.fake_project = obj_to_dict( + fakes.FakeProject('12345', domain_id='test-domain')) + self.fake_domain = obj_to_dict(fakes.FakeDomain('test-domain', + 'test', + 'test domain', + enabled=True)) + self.user_project_assignment = obj_to_dict({ + 'role': { + 'id': self.fake_role['id'] + }, + 'scope': { + 'project': { + 'id': self.fake_project['id'] + } + }, + 'user': { + 'id': self.fake_user['id'] + } + }) + self.group_project_assignment = obj_to_dict({ + 'role': { + 'id': self.fake_role['id'] + }, + 'scope': { + 'project': { + 'id': self.fake_project['id'] + } + }, + 'group': { + 'id': self.fake_group['id'] + } + }) + self.user_domain_assignment = obj_to_dict({ + 'role': { + 'id': self.fake_role['id'] + }, + 'scope': { + 'domain': { + 'id': self.fake_domain['id'] + } + }, + 'user': { + 'id': self.fake_user['id'] + } + }) + self.group_domain_assignment = obj_to_dict({ + 'role': { + 'id': self.fake_role['id'] + }, + 'scope': { + 'domain': { + 'id': self.fake_domain['id'] + } + }, + 'group': { + 'id': self.fake_group['id'] + } + }) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_user_v2(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.roles.roles_for_user.return_value = [] + mock_keystone.roles.add_user_role.return_value = self.fake_role + self.assertTrue(self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertTrue(self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_user_project_v2(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.roles_for_user.return_value = [] + mock_keystone.roles.add_user_role.return_value = self.fake_role + self.assertTrue(self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertTrue(self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + self.assertTrue(self.cloud.grant_role(self.fake_role['id'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertTrue(self.cloud.grant_role(self.fake_role['id'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_user_project_v2_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.roles_for_user.return_value = [self.fake_role] + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_user_project(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.role_assignments.list.return_value = [] + self.assertTrue(self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertTrue(self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_user_project_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.role_assignments.list.return_value = \ + [self.user_project_assignment] + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertFalse(self.cloud.grant_role( + self.fake_role['id'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_group_project(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.groups.list.return_value = [self.fake_group] + mock_keystone.role_assignments.list.return_value = [] + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['name'], + project=self.fake_project['id'])) + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_group_project_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.groups.list.return_value = [self.fake_group] + mock_keystone.role_assignments.list.return_value = \ + [self.group_project_assignment] + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['name'], + project=self.fake_project['id'])) + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_user_domain(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.domains.get.return_value = self.fake_domain + mock_keystone.role_assignments.list.return_value = [] + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['name'], + domain=self.fake_domain['id'])) + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['id'], + domain=self.fake_domain['id'])) + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['name'], + domain=self.fake_domain['name'])) + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['id'], + domain=self.fake_domain['name'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_user_domain_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.domains.get.return_value = self.fake_domain + mock_keystone.role_assignments.list.return_value = \ + [self.user_domain_assignment] + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['name'], + domain=self.fake_domain['name'])) + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['id'], + domain=self.fake_domain['name'])) + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['name'], + domain=self.fake_domain['id'])) + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + user=self.fake_user['id'], + domain=self.fake_domain['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_group_domain(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.groups.list.return_value = [self.fake_group] + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.domains.get.return_value = self.fake_domain + mock_keystone.role_assignments.list.return_value = [] + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['name'])) + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['id'], + domain=self.fake_domain['name'])) + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['id'])) + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['id'], + domain=self.fake_domain['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_group_domain_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.groups.list.return_value = [self.fake_group] + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.domains.get.return_value = self.fake_domain + mock_keystone.role_assignments.list.return_value = \ + [self.group_domain_assignment] + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['name'])) + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['id'], + domain=self.fake_domain['name'])) + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['id'])) + self.assertFalse(self.cloud.grant_role( + self.fake_role['name'], + group=self.fake_group['id'], + domain=self.fake_domain['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_user_v2(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.roles.roles_for_user.return_value = [self.fake_role] + mock_keystone.roles.remove_user_role.return_value = self.fake_role + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_user_project_v2(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.roles_for_user.return_value = [] + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['id'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['id'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_user_project_v2_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.roles_for_user.return_value = [self.fake_role] + mock_keystone.roles.remove_user_role.return_value = self.fake_role + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_user_project(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.role_assignments.list.return_value = [] + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_user_project_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.role_assignments.list.return_value = \ + [self.user_project_assignment] + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'])) + self.assertTrue(self.cloud.revoke_role( + self.fake_role['id'], + user=self.fake_user['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_group_project(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.groups.list.return_value = [self.fake_group] + mock_keystone.role_assignments.list.return_value = [] + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['name'], + project=self.fake_project['id'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_group_project_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.groups.list.return_value = [self.fake_group] + mock_keystone.role_assignments.list.return_value = \ + [self.group_project_assignment] + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['name'], + project=self.fake_project['id'])) + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['id'], + project=self.fake_project['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_user_domain(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.domains.get.return_value = self.fake_domain + mock_keystone.role_assignments.list.return_value = [] + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + domain=self.fake_domain['id'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['id'], + domain=self.fake_domain['id'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + domain=self.fake_domain['name'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['id'], + domain=self.fake_domain['name'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_user_domain_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.domains.get.return_value = self.fake_domain + mock_keystone.role_assignments.list.return_value = \ + [self.user_domain_assignment] + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + domain=self.fake_domain['name'])) + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['id'], + domain=self.fake_domain['name'])) + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + domain=self.fake_domain['id'])) + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['id'], + domain=self.fake_domain['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_group_domain(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.groups.list.return_value = [self.fake_group] + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.domains.get.return_value = self.fake_domain + mock_keystone.role_assignments.list.return_value = [] + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['name'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['id'], + domain=self.fake_domain['name'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['id'])) + self.assertFalse(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['id'], + domain=self.fake_domain['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_group_domain_exists(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.groups.list.return_value = [self.fake_group] + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.domains.get.return_value = self.fake_domain + mock_keystone.role_assignments.list.return_value = \ + [self.group_domain_assignment] + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['name'])) + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['id'], + domain=self.fake_domain['name'])) + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['id'])) + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + group=self.fake_group['id'], + domain=self.fake_domain['id'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_no_role(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [] + + with testtools.ExpectedException( + OpenStackCloudException, + 'Role {0} not found'.format(self.fake_role['name']) + ): + self.cloud.grant_role(self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_no_role(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [] + with testtools.ExpectedException( + OpenStackCloudException, + 'Role {0} not found'.format(self.fake_role['name']) + ): + self.cloud.revoke_role(self.fake_role['name'], + group=self.fake_group['name'], + domain=self.fake_domain['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_no_user_or_group_specified(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + with testtools.ExpectedException( + OpenStackCloudException, + 'Must specify either a user or a group' + ): + self.cloud.grant_role(self.fake_role['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_no_user_or_group_specified(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + with testtools.ExpectedException( + OpenStackCloudException, + 'Must specify either a user or a group' + ): + self.cloud.revoke_role(self.fake_role['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_no_user_or_group(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [] + with testtools.ExpectedException( + OpenStackCloudException, + 'Must specify either a user or a group' + ): + self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_no_user_or_group(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [] + with testtools.ExpectedException( + OpenStackCloudException, + 'Must specify either a user or a group' + ): + self.cloud.revoke_role(self.fake_role['name'], + user=self.fake_user['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_both_user_and_group(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.groups.list.return_value = [self.fake_group] + with testtools.ExpectedException( + OpenStackCloudException, + 'Specify either a group or a user, not both' + ): + self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['name'], + group=self.fake_group['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_both_user_and_group(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.groups.list.return_value = [self.fake_group] + with testtools.ExpectedException( + OpenStackCloudException, + 'Specify either a group or a user, not both' + ): + self.cloud.revoke_role(self.fake_role['name'], + user=self.fake_user['name'], + group=self.fake_group['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_both_project_and_domain(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + fake_user2 = fakes.FakeUser('12345', + 'test@nobody.org', + 'test', + domain_id='default') + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user, fake_user2] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.domains.get.return_value = self.fake_domain + self.assertTrue(self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'], + domain=self.fake_domain['name'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_both_project_and_domain(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + fake_user2 = fakes.FakeUser('12345', + 'test@nobody.org', + 'test', + domain_id='default') + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user, fake_user2] + mock_keystone.projects.list.return_value = [self.fake_project] + mock_keystone.domains.get.return_value = self.fake_domain + mock_keystone.role_assignments.list.return_value = \ + [self.user_project_assignment] + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'], + domain=self.fake_domain['name'])) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_no_project_or_domain(self, mock_keystone, mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.projects.list.return_value = [] + mock_keystone.domains.get.return_value = None + with testtools.ExpectedException( + OpenStackCloudException, + 'Must specify either a domain or project' + ): + self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_no_project_or_domain(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.projects.list.return_value = [] + mock_keystone.domains.get.return_value = None + mock_keystone.role_assignments.list.return_value = \ + [self.user_project_assignment] + with testtools.ExpectedException( + OpenStackCloudException, + 'Must specify either a domain or project' + ): + self.cloud.revoke_role(self.fake_role['name'], + user=self.fake_user['name']) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_bad_domain_exception(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.domains.get.side_effect = Exception('test') + with testtools.ExpectedException( + OpenStackCloudException, + 'Failed to get domain baddomain \(Inner Exception: test\)' + ): + self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['name'], + domain='baddomain') + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_bad_domain_exception(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '3' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.domains.get.side_effect = Exception('test') + with testtools.ExpectedException( + OpenStackCloudException, + 'Failed to get domain baddomain \(Inner Exception: test\)' + ): + self.cloud.revoke_role(self.fake_role['name'], + user=self.fake_user['name'], + domain='baddomain') + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_user_project_v2_wait(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.roles_for_user.side_effect = [ + [], [], [self.fake_role]] + mock_keystone.roles.add_user_role.return_value = self.fake_role + self.assertTrue(self.cloud.grant_role(self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'], + wait=True)) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_grant_role_user_project_v2_wait_exception(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.roles_for_user.side_effect = [ + [], [], [self.fake_role]] + mock_keystone.roles.add_user_role.return_value = self.fake_role + + with testtools.ExpectedException( + OpenStackCloudTimeout, + 'Timeout waiting for role to be granted' + ): + self.assertTrue(self.cloud.grant_role( + self.fake_role['name'], user=self.fake_user['name'], + project=self.fake_project['id'], wait=True, timeout=1)) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_user_project_v2_wait(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.roles_for_user.side_effect = [ + [self.fake_role], [self.fake_role], + []] + mock_keystone.roles.remove_user_role.return_value = self.fake_role + self.assertTrue(self.cloud.revoke_role(self.fake_role['name'], + user=self.fake_user['name'], + project=self.fake_project['id'], + wait=True)) + + @patch.object(occ.cloud_config.CloudConfig, 'get_api_version') + @patch.object(OperatorCloud, 'keystone_client') + def test_revoke_role_user_project_v2_wait_exception(self, + mock_keystone, + mock_api_version): + mock_api_version.return_value = '2.0' + mock_keystone.roles.list.return_value = [self.fake_role] + mock_keystone.tenants.list.return_value = [self.fake_project] + mock_keystone.users.list.return_value = [self.fake_user] + mock_keystone.roles.roles_for_user.side_effect = [ + [self.fake_role], [self.fake_role], + []] + mock_keystone.roles.remove_user_role.return_value = self.fake_role + with testtools.ExpectedException( + OpenStackCloudTimeout, + 'Timeout waiting for role to be revoked' + ): + self.assertTrue(self.cloud.revoke_role( + self.fake_role['name'], user=self.fake_user['name'], + project=self.fake_project['id'], wait=True, timeout=1))