Add CRUD methods for keystone groups

This replaces review Ia25b76c38c3463c963a5ac2ae1c81fcd33dc591b.

Change-Id: I9a6a28994d076f032fe27624cdb3b0fbe248acef
Co-Authored-By: Monty Taylor <mordred@inaugust.com>
This commit is contained in:
David Shrewsbury 2015-11-06 16:10:27 -05:00
parent a1aed4e9eb
commit 89dd35301c
6 changed files with 332 additions and 0 deletions

View File

@ -627,6 +627,26 @@ class DomainDelete(task_manager.Task):
return client.keystone_client.domains.delete(**self.args)
class GroupList(task_manager.Task):
def main(self, client):
return client.keystone_client.groups.list()
class GroupCreate(task_manager.Task):
def main(self, client):
return client.keystone_client.groups.create(**self.args)
class GroupDelete(task_manager.Task):
def main(self, client):
return client.keystone_client.groups.delete(**self.args)
class GroupUpdate(task_manager.Task):
def main(self, client):
return client.keystone_client.groups.update(**self.args)
class ZoneList(task_manager.Task):
def main(self, client):
return client.designate_client.domains.list()

View File

@ -319,6 +319,19 @@ def normalize_domains(domains):
return meta.obj_list_to_dict(ret)
def normalize_groups(domains):
"""Normalize Identity groups."""
ret = [
dict(
id=domain.get('id'),
name=domain.get('name'),
description=domain.get('description'),
domain_id=domain.get('domain_id'),
) for domain in domains
]
return meta.obj_list_to_dict(ret)
def valid_kwargs(*valid_args):
# This decorator checks if argument passed as **kwargs to a function are
# present in valid_args.

View File

@ -1117,6 +1117,128 @@ class OperatorCloud(openstackcloud.OpenStackCloud):
_tasks.DomainGet(domain=domain_id))
return _utils.normalize_domains([domain])[0]
@_utils.cache_on_arguments()
def list_groups(self):
"""List Keystone Groups.
:returns: A list of dicts containing the group description.
:raises: ``OpenStackCloudException``: if something goes wrong during
the openstack API call.
"""
with _utils.shade_exceptions("Failed to list groups"):
groups = self.manager.submitTask(_tasks.GroupList())
return _utils.normalize_groups(groups)
def search_groups(self, name_or_id=None, filters=None):
"""Search Keystone groups.
:param name: Group name or id.
:param filters: A dict containing additional filters to use.
:returns: A list of dict containing the group description.
:raises: ``OpenStackCloudException``: if something goes wrong during
the openstack API call.
"""
groups = self.list_groups()
return _utils._filter_list(groups, name_or_id, filters)
def get_group(self, name_or_id, filters=None):
"""Get exactly one Keystone group.
:param id: Group name or id.
:param filters: A dict containing additional filters to use.
:returns: A dict containing the group description.
:raises: ``OpenStackCloudException``: if something goes wrong during
the openstack API call.
"""
return _utils._get_entity(self.search_groups, name_or_id, filters)
def create_group(self, name, description, domain=None):
"""Create a group.
:param string name: Group name.
:param string description: Group description.
:param string domain: Domain name or ID for the group.
:returns: A dict containing the group description.
:raises: ``OpenStackCloudException``: if something goes wrong during
the openstack API call.
"""
with _utils.shade_exceptions(
"Error creating group {group}".format(group=name)
):
domain_id = None
if domain:
dom = self.get_domain(domain)
if not dom:
raise OpenStackCloudException(
"Creating group {group} failed: Invalid domain "
"{domain}".format(group=name, domain=domain)
)
domain_id = dom['id']
group = self.manager.submitTask(_tasks.GroupCreate(
name=name, description=description, domain=domain_id)
)
self.list_groups.invalidate(self)
return _utils.normalize_groups([group])[0]
def update_group(self, name_or_id, name=None, description=None):
"""Update an existing group
:param string name: New group name.
:param string description: New group description.
:returns: A dict containing the group description.
:raises: ``OpenStackCloudException``: if something goes wrong during
the openstack API call.
"""
self.list_groups.invalidate(self)
group = self.get_group(name_or_id)
if group is None:
raise OpenStackCloudException(
"Group {0} not found for updating".format(name_or_id)
)
with _utils.shade_exceptions(
"Unable to update group {name}".format(name=name_or_id)
):
group = self.manager.submitTask(_tasks.GroupUpdate(
group=group['id'], name=name, description=description))
self.list_groups.invalidate(self)
return _utils.normalize_groups([group])[0]
def delete_group(self, name_or_id):
"""Delete a group
:param name_or_id: ID or name of the group to delete.
:returns: True if delete succeeded, False otherwise.
:raises: ``OpenStackCloudException``: if something goes wrong during
the openstack API call.
"""
group = self.get_group(name_or_id)
if group is None:
self.log.debug(
"Group {0} not found for deleting".format(name_or_id))
return False
with _utils.shade_exceptions(
"Unable to delete group {name}".format(name=name_or_id)
):
self.manager.submitTask(_tasks.GroupDelete(group=group['id']))
self.list_groups.invalidate(self)
return True
def list_roles(self):
"""List Keystone roles.

View File

@ -170,3 +170,11 @@ class FakeRole(object):
def __init__(self, id, name):
self.id = id
self.name = name
class FakeGroup(object):
def __init__(self, id, name, description, domain=None):
self.id = id
self.name = name
self.description = description
self.domain = domain

View File

@ -0,0 +1,105 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
"""
test_groups
----------------------------------
Functional tests for `shade` keystone group resource.
"""
import shade
from shade.tests import base
class TestGroup(base.TestCase):
def setUp(self):
super(TestGroup, self).setUp()
self.cloud = shade.operator_cloud(cloud='devstack-admin')
if self.cloud.cloud_config.get_api_version('identity') in ('2', '2.0'):
self.skipTest('Identity service does not support groups')
self.group_prefix = self.getUniqueString('group')
self.addCleanup(self._cleanup_groups)
def _cleanup_groups(self):
exception_list = list()
for group in self.cloud.list_groups():
if group['name'].startswith(self.group_prefix):
try:
self.cloud.delete_group(group['id'])
except Exception as e:
exception_list.append(str(e))
continue
if exception_list:
# Raise an error: we must make users aware that something went
# wrong
raise shade.OpenStackCloudException('\n'.join(exception_list))
def test_create_group(self):
group_name = self.group_prefix + '_create'
group = self.cloud.create_group(group_name, 'test group')
for key in ('id', 'name', 'description', 'domain_id'):
self.assertIn(key, group)
self.assertEqual(group_name, group['name'])
self.assertEqual('test group', group['description'])
def test_delete_group(self):
group_name = self.group_prefix + '_delete'
group = self.cloud.create_group(group_name, 'test group')
self.assertIsNotNone(group)
self.assertTrue(self.cloud.delete_group(group_name))
results = self.cloud.search_groups(filters=dict(name=group_name))
self.assertEqual(0, len(results))
def test_delete_group_not_exists(self):
self.assertFalse(self.cloud.delete_group('xInvalidGroupx'))
def test_search_groups(self):
group_name = self.group_prefix + '_search'
# Shouldn't find any group with this name yet
results = self.cloud.search_groups(filters=dict(name=group_name))
self.assertEqual(0, len(results))
# Now create a new group
group = self.cloud.create_group(group_name, 'test group')
self.assertEqual(group_name, group['name'])
# Now we should find only the new group
results = self.cloud.search_groups(filters=dict(name=group_name))
self.assertEqual(1, len(results))
self.assertEqual(group_name, results[0]['name'])
def test_update_group(self):
group_name = self.group_prefix + '_update'
group_desc = 'test group'
group = self.cloud.create_group(group_name, group_desc)
self.assertEqual(group_name, group['name'])
self.assertEqual(group_desc, group['description'])
updated_group_name = group_name + '_xyz'
updated_group_desc = group_desc + ' updated'
updated_group = self.cloud.update_group(
group_name,
name=updated_group_name,
description=updated_group_desc)
self.assertEqual(updated_group_name, updated_group['name'])
self.assertEqual(updated_group_desc, updated_group['description'])

View File

@ -0,0 +1,64 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import shade
from shade.tests.unit import base
from shade.tests import fakes
class TestGroups(base.TestCase):
def setUp(self):
super(TestGroups, self).setUp()
self.cloud = shade.operator_cloud(validate=False)
@mock.patch.object(shade.OpenStackCloud, 'keystone_client')
def test_list_groups(self, mock_keystone):
self.cloud.list_groups()
mock_keystone.groups.list.assert_called_once_with()
@mock.patch.object(shade.OpenStackCloud, 'keystone_client')
def test_get_group(self, mock_keystone):
self.cloud.get_group('1234')
mock_keystone.groups.list.assert_called_once_with()
@mock.patch.object(shade.OpenStackCloud, 'keystone_client')
def test_delete_group(self, mock_keystone):
mock_keystone.groups.list.return_value = [
fakes.FakeGroup('1234', 'name', 'desc')
]
self.assertTrue(self.cloud.delete_group('1234'))
mock_keystone.groups.list.assert_called_once_with()
mock_keystone.groups.delete.assert_called_once_with(
group='1234'
)
@mock.patch.object(shade.OpenStackCloud, 'keystone_client')
def test_create_group(self, mock_keystone):
self.cloud.create_group('test-group', 'test desc')
mock_keystone.groups.create.assert_called_once_with(
name='test-group', description='test desc', domain=None
)
@mock.patch.object(shade.OpenStackCloud, 'keystone_client')
def test_update_group(self, mock_keystone):
mock_keystone.groups.list.return_value = [
fakes.FakeGroup('1234', 'name', 'desc')
]
self.cloud.update_group('1234', 'test-group', 'test desc')
mock_keystone.groups.list.assert_called_once_with()
mock_keystone.groups.update.assert_called_once_with(
group='1234', name='test-group', description='test desc'
)