Allow domain_id for roles

Roles can be domain specific but there is no current way for shade to
allow us to set that value.

Additionally the role name can be updated so this add update_role()

Change-Id: I3279f17cb8871e91fcc3aa3bd18ae8457a0016bb
This commit is contained in:
Sam Yaple 2017-08-23 22:51:15 -04:00 committed by David Shrewsbury
parent 8cda430e8b
commit 835d6555c3
3 changed files with 76 additions and 10 deletions

View File

@ -378,6 +378,7 @@ def normalize_roles(roles):
"""Normalize Identity roles."""
ret = [
dict(
domain_id=role.get('domain_id'),
id=role.get('id'),
name=role.get('name'),
) for role in roles

View File

@ -1375,9 +1375,12 @@ class OperatorCloud(openstackcloud.OpenStackCloud):
self.list_groups.invalidate(self)
return True
def list_roles(self):
@_utils.valid_kwargs('domain_id')
def list_roles(self, **kwargs):
"""List Keystone roles.
:param domain_id: domain id for listing roles (v3)
:returns: a list of ``munch.Munch`` containing the role description.
:raises: ``OpenStackCloudException``: if something goes wrong during
@ -1386,14 +1389,16 @@ class OperatorCloud(openstackcloud.OpenStackCloud):
v2 = self._is_client_version('identity', 2)
url = '/OS-KSADM/roles' if v2 else '/roles'
data = self._identity_client.get(
url, error_message="Failed to list roles")
url, params=kwargs, error_message="Failed to list roles")
return _utils.normalize_roles(self._get_and_munchify('roles', data))
def search_roles(self, name_or_id=None, filters=None):
@_utils.valid_kwargs('domain_id')
def search_roles(self, name_or_id=None, filters=None, **kwargs):
"""Seach Keystone roles.
:param string name: role name or id.
:param dict filters: a dict containing additional filters to use.
:param domain_id: domain id (v3)
:returns: a list of ``munch.Munch`` containing the role description.
Each ``munch.Munch`` contains the following attributes::
@ -1405,14 +1410,16 @@ class OperatorCloud(openstackcloud.OpenStackCloud):
:raises: ``OpenStackCloudException``: if something goes wrong during
the openstack API call.
"""
roles = self.list_roles()
roles = self.list_roles(**kwargs)
return _utils._filter_list(roles, name_or_id, filters)
def get_role(self, name_or_id, filters=None):
@_utils.valid_kwargs('domain_id')
def get_role(self, name_or_id, filters=None, **kwargs):
"""Get exactly one Keystone role.
:param id: role name or id.
:param filters: a dict containing additional filters to use.
:param domain_id: domain id (v3)
:returns: a single ``munch.Munch`` containing the role description.
Each ``munch.Munch`` contains the following attributes::
@ -1424,7 +1431,7 @@ class OperatorCloud(openstackcloud.OpenStackCloud):
:raises: ``OpenStackCloudException``: if something goes wrong during
the openstack API call.
"""
return _utils._get_entity(self, 'role', name_or_id, filters)
return _utils._get_entity(self, 'role', name_or_id, filters, **kwargs)
def _keystone_v2_role_assignments(self, user, project=None,
role=None, **kwargs):
@ -1686,10 +1693,12 @@ class OperatorCloud(openstackcloud.OpenStackCloud):
return _utils.normalize_flavor_accesses(
self._get_and_munchify('flavor_access', data))
def create_role(self, name):
@_utils.valid_kwargs('domain_id')
def create_role(self, name, **kwargs):
"""Create a Keystone role.
:param string name: The name of the role.
:param domain_id: domain id (v3)
:returns: a ``munch.Munch`` containing the role description
@ -1697,23 +1706,55 @@ class OperatorCloud(openstackcloud.OpenStackCloud):
"""
v2 = self._is_client_version('identity', 2)
url = '/OS-KSADM/roles' if v2 else '/roles'
kwargs['name'] = name
msg = 'Failed to create role {name}'.format(name=name)
data = self._identity_client.post(
url, json={'role': {'name': name}}, error_message=msg)
url, json={'role': kwargs}, error_message=msg)
role = self._get_and_munchify('role', data)
return _utils.normalize_roles([role])[0]
def delete_role(self, name_or_id):
@_utils.valid_kwargs('domain_id')
def update_role(self, name_or_id, name, **kwargs):
"""Update a Keystone role.
:param name_or_id: Name or id of the role to update
:param string name: The new role name
:param domain_id: domain id
:returns: a ``munch.Munch`` containing the role description
:raise OpenStackCloudException: if the role cannot be created
"""
if self._is_client_version('identity', 2):
raise OpenStackCloudUnavailableFeature(
'Unavailable Feature: Role update requires Identity v3'
)
kwargs['name_or_id'] = name_or_id
role = self.get_role(**kwargs)
if role is None:
self.log.debug(
"Role %s not found for updating", name_or_id)
return False
msg = 'Failed to update role {name}'.format(name=name_or_id)
json_kwargs = {'role_id': role.id, 'role': {'name': name}}
data = self._identity_client.patch('/roles', error_message=msg,
json=json_kwargs)
role = self._get_and_munchify('role', data)
return _utils.normalize_roles([role])[0]
@_utils.valid_kwargs('domain_id')
def delete_role(self, name_or_id, **kwargs):
"""Delete a Keystone role.
:param string id: Name or id of the role to delete.
:param domain_id: domain id (v3)
:returns: True if delete succeeded, False otherwise.
:raises: ``OpenStackCloudException`` if something goes wrong during
the openstack API call.
"""
role = self.get_role(name_or_id)
role = self.get_role(name_or_id, **kwargs)
if role is None:
self.log.debug(
"Role %s not found for deleting", name_or_id)

View File

@ -101,6 +101,30 @@ class TestIdentityRoles(base.RequestsMockTestCase):
self.assertThat(role.id, matchers.Equals(role_data.role_id))
self.assert_calls()
def test_update_role(self):
role_data = self._get_role_data()
req = {'role_id': role_data.role_id,
'role': {'name': role_data.role_name}}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(),
status_code=200,
json={'roles': [role_data.json_response['role']]}),
dict(method='PATCH',
uri=self.get_mock_url(),
status_code=200,
json=role_data.json_response,
validate=dict(json=req))
])
role = self.op_cloud.update_role(role_data.role_id,
role_data.role_name)
self.assertIsNotNone(role)
self.assertThat(role.name, matchers.Equals(role_data.role_name))
self.assertThat(role.id, matchers.Equals(role_data.role_id))
self.assert_calls()
def test_delete_role_by_id(self):
role_data = self._get_role_data()
self.register_uris([