diff --git a/openstackclient/identity/v3/service_provider.py b/openstackclient/identity/v3/service_provider.py new file mode 100644 index 0000000000..31e96a83a3 --- /dev/null +++ b/openstackclient/identity/v3/service_provider.py @@ -0,0 +1,216 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Service Provider action implementations""" + +import logging +import six +import sys + +from cliff import command +from cliff import lister +from cliff import show + +from openstackclient.common import utils + + +class CreateServiceProvider(show.ShowOne): + """Create new service provider""" + + log = logging.getLogger(__name__ + '.CreateServiceProvider') + + def get_parser(self, prog_name): + parser = super(CreateServiceProvider, self).get_parser(prog_name) + parser.add_argument( + 'service_provider_id', + metavar='', + help='New service provider ID (must be unique)' + ) + parser.add_argument( + '--auth-url', + metavar='', + required=True, + help='Authentication URL of remote federated service provider', + ) + parser.add_argument( + '--description', + metavar='', + help='New service provider description', + ) + parser.add_argument( + '--service-provider-url', + metavar='', + required=True, + help='A service URL where SAML assertions are being sent', + ) + + enable_service_provider = parser.add_mutually_exclusive_group() + enable_service_provider.add_argument( + '--enable', + dest='enabled', + action='store_true', + default=True, + help='Enable service provider (default)', + ) + enable_service_provider.add_argument( + '--disable', + dest='enabled', + action='store_false', + help='Disable service provider', + ) + + return parser + + def take_action(self, parsed_args): + self.log.debug('take_action(%s)', parsed_args) + service_client = self.app.client_manager.identity + sp = service_client.federation.service_providers.create( + id=parsed_args.service_provider_id, + auth_url=parsed_args.auth_url, + description=parsed_args.description, + enabled=parsed_args.enabled, + sp_url=parsed_args.service_provider_url) + + sp._info.pop('links', None) + return zip(*sorted(six.iteritems(sp._info))) + + +class DeleteServiceProvider(command.Command): + """Delete service provider""" + + log = logging.getLogger(__name__ + '.DeleteServiceProvider') + + def get_parser(self, prog_name): + parser = super(DeleteServiceProvider, self).get_parser(prog_name) + parser.add_argument( + 'service_provider', + metavar='', + help='Service provider ID to delete (ID)', + ) + return parser + + def take_action(self, parsed_args): + self.log.debug('take_action(%s)', parsed_args) + service_client = self.app.client_manager.identity + service_client.federation.service_providers.delete( + parsed_args.service_provider) + return + + +class ListServiceProvider(lister.Lister): + """List service providers""" + + log = logging.getLogger(__name__ + '.ListServiceProvider') + + def take_action(self, parsed_args): + self.log.debug('take_action(%s)', parsed_args) + service_client = self.app.client_manager.identity + data = service_client.federation.service_providers.list() + + column_headers = ('ID', 'Enabled', 'Description', 'Auth URL') + return (column_headers, + (utils.get_item_properties( + s, column_headers, + formatters={}, + ) for s in data)) + + +class SetServiceProvider(command.Command): + """Set service provider properties""" + + log = logging.getLogger(__name__ + '.SetServiceProvider') + + def get_parser(self, prog_name): + parser = super(SetServiceProvider, self).get_parser(prog_name) + parser.add_argument( + 'service_provider', + metavar='', + help='Service provider ID to change (ID)', + ) + parser.add_argument( + '--auth-url', + metavar='', + help='Authentication URL of remote federated Service Provider', + ) + + parser.add_argument( + '--description', + metavar='', + help='New service provider description', + ) + parser.add_argument( + '--service-provider-url', + metavar='', + help='A service URL where SAML assertions are being sent', + ) + enable_service_provider = parser.add_mutually_exclusive_group() + enable_service_provider.add_argument( + '--enable', + action='store_true', + help='Enable service provider', + ) + enable_service_provider.add_argument( + '--disable', + action='store_true', + help='Disable service provider', + ) + return parser + + def take_action(self, parsed_args): + self.log.debug('take_action(%s)', parsed_args) + federation_client = self.app.client_manager.identity.federation + + enabled = None + if parsed_args.enable is True: + enabled = True + elif parsed_args.disable is True: + enabled = False + + if not any((enabled is not None, parsed_args.description, + parsed_args.service_provider_url, + parsed_args.auth_url)): + sys.stdout.write("Service Provider not updated, no arguments " + "present") + return (None, None) + + service_provider = federation_client.service_providers.update( + parsed_args.service_provider, enabled=enabled, + description=parsed_args.description, + auth_url=parsed_args.auth_url, + sp_url=parsed_args.service_provider_url) + return zip(*sorted(six.iteritems(service_provider._info))) + + +class ShowServiceProvider(show.ShowOne): + """Display service provider details""" + + log = logging.getLogger(__name__ + '.ShowServiceProvider') + + def get_parser(self, prog_name): + parser = super(ShowServiceProvider, self).get_parser(prog_name) + parser.add_argument( + 'service_provider', + metavar='', + help='Service provider ID to display (ID)', + ) + return parser + + def take_action(self, parsed_args): + self.log.debug('take_action(%s)', parsed_args) + service_client = self.app.client_manager.identity + service_provider = utils.find_resource( + service_client.federation.service_providers, + parsed_args.service_provider) + + service_provider._info.pop('links', None) + return zip(*sorted(six.iteritems(service_provider._info))) diff --git a/openstackclient/tests/identity/v3/fakes.py b/openstackclient/tests/identity/v3/fakes.py index c868401aa1..eb8673ef42 100644 --- a/openstackclient/tests/identity/v3/fakes.py +++ b/openstackclient/tests/identity/v3/fakes.py @@ -243,6 +243,20 @@ protocol_id = 'protocol' mapping_id = 'test_mapping' mapping_id_updated = 'prod_mapping' +sp_id = 'BETA' +sp_description = 'Service Provider to burst into' +service_provider_url = 'https://beta.example.com/Shibboleth.sso/POST/SAML' +sp_auth_url = ('https://beta.example.com/v3/OS-FEDERATION/identity_providers/' + 'idp/protocol/saml2/auth') + +SERVICE_PROVIDER = { + 'id': sp_id, + 'enabled': True, + 'description': sp_description, + 'sp_url': service_provider_url, + 'auth_url': sp_auth_url +} + PROTOCOL_ID_MAPPING = { 'id': protocol_id, 'mapping': mapping_id @@ -380,6 +394,8 @@ class FakeFederationManager(object): self.projects.resource_class = fakes.FakeResource(None, {}) self.domains = mock.Mock() self.domains.resource_class = fakes.FakeResource(None, {}) + self.service_providers = mock.Mock() + self.service_providers.resource_class = fakes.FakeResource(None, {}) class FakeFederatedClient(FakeIdentityv3Client): diff --git a/openstackclient/tests/identity/v3/test_service_provider.py b/openstackclient/tests/identity/v3/test_service_provider.py new file mode 100644 index 0000000000..e77870d696 --- /dev/null +++ b/openstackclient/tests/identity/v3/test_service_provider.py @@ -0,0 +1,412 @@ +# Copyright 2014 CERN. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +from openstackclient.identity.v3 import service_provider +from openstackclient.tests import fakes +from openstackclient.tests.identity.v3 import fakes as service_fakes + + +class TestServiceProvider(service_fakes.TestFederatedIdentity): + + def setUp(self): + super(TestServiceProvider, self).setUp() + + federation_lib = self.app.client_manager.identity.federation + self.service_providers_mock = federation_lib.service_providers + self.service_providers_mock.reset_mock() + + +class TestServiceProviderCreate(TestServiceProvider): + + def setUp(self): + super(TestServiceProviderCreate, self).setUp() + + copied_sp = copy.deepcopy(service_fakes.SERVICE_PROVIDER) + resource = fakes.FakeResource(None, copied_sp, loaded=True) + self.service_providers_mock.create.return_value = resource + self.cmd = service_provider.CreateServiceProvider(self.app, None) + + def test_create_service_provider_required_options_only(self): + arglist = [ + '--auth-url', service_fakes.sp_auth_url, + '--service-provider-url', service_fakes.service_provider_url, + service_fakes.sp_id, + ] + verifylist = [ + ('auth_url', service_fakes.sp_auth_url), + ('service_provider_url', service_fakes.service_provider_url), + ('service_provider_id', service_fakes.sp_id), + + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + # Set expected values + kwargs = { + 'enabled': True, + 'description': None, + 'auth_url': service_fakes.sp_auth_url, + 'sp_url': service_fakes.service_provider_url + } + + self.service_providers_mock.create.assert_called_with( + id=service_fakes.sp_id, + **kwargs + ) + + collist = ('auth_url', 'description', 'enabled', 'id', 'sp_url') + self.assertEqual(collist, columns) + datalist = ( + service_fakes.sp_auth_url, + service_fakes.sp_description, + True, + service_fakes.sp_id, + service_fakes.service_provider_url + ) + self.assertEqual(data, datalist) + + def test_create_service_provider_description(self): + + arglist = [ + '--description', service_fakes.sp_description, + '--auth-url', service_fakes.sp_auth_url, + '--service-provider-url', service_fakes.service_provider_url, + service_fakes.sp_id, + ] + verifylist = [ + ('description', service_fakes.sp_description), + ('auth_url', service_fakes.sp_auth_url), + ('service_provider_url', service_fakes.service_provider_url), + ('service_provider_id', service_fakes.sp_id), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + + # Set expected values + kwargs = { + 'description': service_fakes.sp_description, + 'auth_url': service_fakes.sp_auth_url, + 'sp_url': service_fakes.service_provider_url, + 'enabled': True, + } + + self.service_providers_mock.create.assert_called_with( + id=service_fakes.sp_id, + **kwargs + ) + + collist = ('auth_url', 'description', 'enabled', 'id', 'sp_url') + self.assertEqual(columns, collist) + datalist = ( + service_fakes.sp_auth_url, + service_fakes.sp_description, + True, + service_fakes.sp_id, + service_fakes.service_provider_url + ) + self.assertEqual(datalist, data) + + def test_create_service_provider_disabled(self): + + # Prepare FakeResource object + service_provider = copy.deepcopy(service_fakes.SERVICE_PROVIDER) + service_provider['enabled'] = False + service_provider['description'] = None + + resource = fakes.FakeResource(None, service_provider, loaded=True) + self.service_providers_mock.create.return_value = resource + + arglist = [ + '--auth-url', service_fakes.sp_auth_url, + '--service-provider-url', service_fakes.service_provider_url, + '--disable', + service_fakes.sp_id, + ] + verifylist = [ + ('auth_url', service_fakes.sp_auth_url), + ('service_provider_url', service_fakes.service_provider_url), + ('service_provider_id', service_fakes.sp_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + # Set expected values + kwargs = { + 'auth_url': service_fakes.sp_auth_url, + 'sp_url': service_fakes.service_provider_url, + 'enabled': False, + 'description': None, + } + + self.service_providers_mock.create.assert_called_with( + id=service_fakes.sp_id, + **kwargs + ) + + collist = ('auth_url', 'description', 'enabled', 'id', 'sp_url') + self.assertEqual(collist, collist) + datalist = ( + service_fakes.sp_auth_url, + None, + False, + service_fakes.sp_id, + service_fakes.service_provider_url + ) + self.assertEqual(datalist, data) + + +class TestServiceProviderDelete(TestServiceProvider): + + def setUp(self): + super(TestServiceProviderDelete, self).setUp() + + # This is the return value for utils.find_resource() + self.service_providers_mock.get.return_value = fakes.FakeResource( + None, + copy.deepcopy(service_fakes.SERVICE_PROVIDER), + loaded=True, + ) + + self.service_providers_mock.delete.return_value = None + self.cmd = service_provider.DeleteServiceProvider(self.app, None) + + def test_delete_service_provider(self): + arglist = [ + service_fakes.sp_id, + ] + verifylist = [ + ('service_provider', service_fakes.sp_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.cmd.take_action(parsed_args) + self.service_providers_mock.delete.assert_called_with( + service_fakes.sp_id, + ) + + +class TestServiceProviderList(TestServiceProvider): + + def setUp(self): + super(TestServiceProviderList, self).setUp() + + self.service_providers_mock.get.return_value = fakes.FakeResource( + None, + copy.deepcopy(service_fakes.SERVICE_PROVIDER), + loaded=True, + ) + self.service_providers_mock.list.return_value = [ + fakes.FakeResource( + None, + copy.deepcopy(service_fakes.SERVICE_PROVIDER), + loaded=True, + ), + ] + + # Get the command object to test + self.cmd = service_provider.ListServiceProvider(self.app, None) + + def test_service_provider_list_no_options(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + # DisplayCommandBase.take_action() returns two tuples + columns, data = self.cmd.take_action(parsed_args) + + self.service_providers_mock.list.assert_called_with() + + collist = ('ID', 'Enabled', 'Description', 'Auth URL') + self.assertEqual(collist, columns) + datalist = (( + service_fakes.sp_id, + True, + service_fakes.sp_description, + service_fakes.sp_auth_url + ), ) + self.assertEqual(tuple(data), datalist) + + +class TestServiceProviderShow(TestServiceProvider): + + def setUp(self): + super(TestServiceProviderShow, self).setUp() + + ret = fakes.FakeResource( + None, + copy.deepcopy(service_fakes.SERVICE_PROVIDER), + loaded=True, + ) + self.service_providers_mock.get.return_value = ret + # Get the command object to test + self.cmd = service_provider.ShowServiceProvider(self.app, None) + + def test_service_provider_show(self): + arglist = [ + service_fakes.sp_id, + ] + verifylist = [ + ('service_provider', service_fakes.sp_id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.service_providers_mock.get.assert_called_with( + service_fakes.sp_id, + ) + + collist = ('auth_url', 'description', 'enabled', 'id', 'sp_url') + self.assertEqual(collist, columns) + datalist = ( + service_fakes.sp_auth_url, + service_fakes.sp_description, + True, + service_fakes.sp_id, + service_fakes.service_provider_url + ) + self.assertEqual(data, datalist) + + +class TestServiceProviderSet(TestServiceProvider): + + def setUp(self): + super(TestServiceProviderSet, self).setUp() + self.cmd = service_provider.SetServiceProvider(self.app, None) + + def test_service_provider_disable(self): + """Disable Service Provider + + Set Service Provider's ``enabled`` attribute to False. + """ + def prepare(self): + """Prepare fake return objects before the test is executed""" + updated_sp = copy.deepcopy(service_fakes.SERVICE_PROVIDER) + updated_sp['enabled'] = False + resources = fakes.FakeResource( + None, + updated_sp, + loaded=True + ) + self.service_providers_mock.update.return_value = resources + + prepare(self) + arglist = [ + '--disable', service_fakes.sp_id, + ] + verifylist = [ + ('service_provider', service_fakes.sp_id), + ('enable', False), + ('disable', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + self.service_providers_mock.update.assert_called_with( + service_fakes.sp_id, + enabled=False, + description=None, + auth_url=None, + sp_url=None + ) + + collist = ('auth_url', 'description', 'enabled', 'id', 'sp_url') + self.assertEqual(collist, columns) + datalist = ( + service_fakes.sp_auth_url, + service_fakes.sp_description, + False, + service_fakes.sp_id, + service_fakes.service_provider_url + ) + self.assertEqual(datalist, data) + + def test_service_provider_enable(self): + """Enable Service Provider. + + Set Service Provider's ``enabled`` attribute to True. + """ + def prepare(self): + """Prepare fake return objects before the test is executed""" + resources = fakes.FakeResource( + None, + copy.deepcopy(service_fakes.SERVICE_PROVIDER), + loaded=True + ) + self.service_providers_mock.update.return_value = resources + + prepare(self) + arglist = [ + '--enable', service_fakes.sp_id, + ] + verifylist = [ + ('service_provider', service_fakes.sp_id), + ('enable', True), + ('disable', False), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + self.service_providers_mock.update.assert_called_with( + service_fakes.sp_id, enabled=True, description=None, + auth_url=None, sp_url=None) + collist = ('auth_url', 'description', 'enabled', 'id', 'sp_url') + self.assertEqual(collist, columns) + datalist = ( + service_fakes.sp_auth_url, + service_fakes.sp_description, + True, + service_fakes.sp_id, + service_fakes.service_provider_url + ) + self.assertEqual(data, datalist) + + def test_service_provider_no_options(self): + def prepare(self): + """Prepare fake return objects before the test is executed""" + resources = fakes.FakeResource( + None, + copy.deepcopy(service_fakes.SERVICE_PROVIDER), + loaded=True + ) + self.service_providers_mock.get.return_value = resources + + resources = fakes.FakeResource( + None, + copy.deepcopy(service_fakes.SERVICE_PROVIDER), + loaded=True, + ) + self.service_providers_mock.update.return_value = resources + + prepare(self) + arglist = [ + service_fakes.sp_id, + ] + verifylist = [ + ('service_provider', service_fakes.sp_id), + ('description', None), + ('enable', False), + ('disable', False), + ('auth_url', None), + ('service_provider_url', None), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # expect take_action() to return (None, None) as none of --disabled, + # --enabled, --description, --service-provider-url, --auth_url option + # was set. + self.assertEqual(columns, None) + self.assertEqual(data, None) diff --git a/setup.cfg b/setup.cfg index f2c1a05754..e2d7288488 100644 --- a/setup.cfg +++ b/setup.cfg @@ -280,6 +280,12 @@ openstack.identity.v3 = service_show = openstackclient.identity.v3.service:ShowService service_set = openstackclient.identity.v3.service:SetService + service_provider_create = openstackclient.identity.v3.service_provider:CreateServiceProvider + service_provider_delete = openstackclient.identity.v3.service_provider:DeleteServiceProvider + service_provider_list = openstackclient.identity.v3.service_provider:ListServiceProvider + service_provider_set = openstackclient.identity.v3.service_provider:SetServiceProvider + service_provider_show = openstackclient.identity.v3.service_provider:ShowServiceProvider + token_issue = openstackclient.identity.v3.token:IssueToken trust_create = openstackclient.identity.v3.trust:CreateTrust