# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # """Identity v3 IdentityProvider action implementations""" import logging from osc_lib.cli import format_columns from osc_lib.command import command from osc_lib import exceptions from osc_lib import utils from openstackclient.i18n import _ from openstackclient.identity import common LOG = logging.getLogger(__name__) class CreateIdentityProvider(command.ShowOne): _description = _("Create new identity provider") def get_parser(self, prog_name): parser = super(CreateIdentityProvider, self).get_parser(prog_name) parser.add_argument( 'identity_provider_id', metavar='', help=_('New identity provider name (must be unique)'), ) identity_remote_id_provider = parser.add_mutually_exclusive_group() identity_remote_id_provider.add_argument( '--remote-id', metavar='', action='append', help=_( 'Remote IDs to associate with the Identity Provider ' '(repeat option to provide multiple values)' ), ) identity_remote_id_provider.add_argument( '--remote-id-file', metavar='', help=_( 'Name of a file that contains many remote IDs to associate ' 'with the identity provider, one per line' ), ) parser.add_argument( '--description', metavar='', help=_('New identity provider description'), ) parser.add_argument( '--domain', metavar='', help=_( 'Domain to associate with the identity provider. If not ' 'specified, a domain will be created automatically. ' '(Name or ID)' ), ) parser.add_argument( '--authorization-ttl', metavar='', type=int, help=_( 'Time to keep the role assignments for users ' 'authenticating via this identity provider. ' 'When not provided, global default configured in the ' 'Identity service will be used. ' 'Available since Identity API version 3.14 (Ussuri).' ), ) enable_identity_provider = parser.add_mutually_exclusive_group() enable_identity_provider.add_argument( '--enable', dest='enabled', action='store_true', default=True, help=_('Enable identity provider (default)'), ) enable_identity_provider.add_argument( '--disable', dest='enabled', action='store_false', help=_('Disable the identity provider'), ) return parser def take_action(self, parsed_args): identity_client = self.app.client_manager.identity if parsed_args.remote_id_file: file_content = utils.read_blob_file_contents( parsed_args.remote_id_file ) remote_ids = file_content.splitlines() remote_ids = list(map(str.strip, remote_ids)) else: remote_ids = ( parsed_args.remote_id if parsed_args.remote_id else None ) domain_id = None if parsed_args.domain: domain_id = common.find_domain( identity_client, parsed_args.domain ).id # TODO(pas-ha) actually check for 3.14 microversion kwargs = {} auth_ttl = parsed_args.authorization_ttl if auth_ttl is not None: if auth_ttl < 0: msg = _("%(param)s must be positive integer or zero.") % { "param": "authorization-ttl" } raise exceptions.CommandError(msg) kwargs['authorization_ttl'] = auth_ttl idp = identity_client.federation.identity_providers.create( id=parsed_args.identity_provider_id, remote_ids=remote_ids, description=parsed_args.description, domain_id=domain_id, enabled=parsed_args.enabled, **kwargs ) idp._info.pop('links', None) remote_ids = format_columns.ListColumn(idp._info.pop('remote_ids', [])) idp._info['remote_ids'] = remote_ids return zip(*sorted(idp._info.items())) class DeleteIdentityProvider(command.Command): _description = _("Delete identity provider(s)") def get_parser(self, prog_name): parser = super(DeleteIdentityProvider, self).get_parser(prog_name) parser.add_argument( 'identity_provider', metavar='', nargs='+', help=_('Identity provider(s) to delete'), ) return parser def take_action(self, parsed_args): identity_client = self.app.client_manager.identity result = 0 for i in parsed_args.identity_provider: try: identity_client.federation.identity_providers.delete(i) except Exception as e: result += 1 LOG.error( _( "Failed to delete identity providers with " "name or ID '%(provider)s': %(e)s" ), {'provider': i, 'e': e}, ) if result > 0: total = len(parsed_args.identity_provider) msg = _( "%(result)s of %(total)s identity providers failed" " to delete." ) % {'result': result, 'total': total} raise exceptions.CommandError(msg) class ListIdentityProvider(command.Lister): _description = _("List identity providers") def get_parser(self, prog_name): parser = super(ListIdentityProvider, self).get_parser(prog_name) parser.add_argument( '--id', metavar='', help=_('The Identity Providers’ ID attribute'), ) parser.add_argument( '--enabled', dest='enabled', action='store_true', help=_('The Identity Providers that are enabled will be returned'), ) return parser def take_action(self, parsed_args): columns = ('ID', 'Enabled', 'Domain ID', 'Description') identity_client = self.app.client_manager.identity kwargs = {} if parsed_args.id: kwargs['id'] = parsed_args.id if parsed_args.enabled: kwargs['enabled'] = True data = identity_client.federation.identity_providers.list(**kwargs) return ( columns, ( utils.get_item_properties( s, columns, formatters={}, ) for s in data ), ) class SetIdentityProvider(command.Command): _description = _("Set identity provider properties") def get_parser(self, prog_name): parser = super(SetIdentityProvider, self).get_parser(prog_name) parser.add_argument( 'identity_provider', metavar='', help=_('Identity provider to modify'), ) parser.add_argument( '--description', metavar='', help=_('Set identity provider description'), ) identity_remote_id_provider = parser.add_mutually_exclusive_group() identity_remote_id_provider.add_argument( '--remote-id', metavar='', action='append', help=_( 'Remote IDs to associate with the Identity Provider ' '(repeat option to provide multiple values)' ), ) identity_remote_id_provider.add_argument( '--remote-id-file', metavar='', help=_( 'Name of a file that contains many remote IDs to associate ' 'with the identity provider, one per line' ), ) parser.add_argument( '--authorization-ttl', metavar='', type=int, help=_( 'Time to keep the role assignments for users ' 'authenticating via this identity provider. ' 'Available since Identity API version 3.14 (Ussuri).' ), ) enable_identity_provider = parser.add_mutually_exclusive_group() enable_identity_provider.add_argument( '--enable', action='store_true', help=_('Enable the identity provider'), ) enable_identity_provider.add_argument( '--disable', action='store_true', help=_('Disable the identity provider'), ) return parser def take_action(self, parsed_args): federation_client = self.app.client_manager.identity.federation # Always set remote_ids if either is passed in if parsed_args.remote_id_file: file_content = utils.read_blob_file_contents( parsed_args.remote_id_file ) remote_ids = file_content.splitlines() remote_ids = list(map(str.strip, remote_ids)) elif parsed_args.remote_id: remote_ids = parsed_args.remote_id # Setup keyword args for the client kwargs = {} if parsed_args.description: kwargs['description'] = parsed_args.description if parsed_args.enable: kwargs['enabled'] = True if parsed_args.disable: kwargs['enabled'] = False if parsed_args.remote_id_file or parsed_args.remote_id: kwargs['remote_ids'] = remote_ids # TODO(pas-ha) actually check for 3.14 microversion # TODO(pas-ha) make it possible to reset authorization_ttl # back to None value. # Currently not possible as filter_kwargs decorator in # keystoneclient/base.py explicitly drops the None-valued keys # from kwargs, and 'update' method is wrapped in this decorator. auth_ttl = parsed_args.authorization_ttl if auth_ttl is not None: if auth_ttl < 0: msg = _("%(param)s must be positive integer or zero.") % { "param": "authorization-ttl" } raise exceptions.CommandError(msg) kwargs['authorization_ttl'] = auth_ttl federation_client.identity_providers.update( parsed_args.identity_provider, **kwargs ) class ShowIdentityProvider(command.ShowOne): _description = _("Display identity provider details") def get_parser(self, prog_name): parser = super(ShowIdentityProvider, self).get_parser(prog_name) parser.add_argument( 'identity_provider', metavar='', help=_('Identity provider to display'), ) return parser def take_action(self, parsed_args): identity_client = self.app.client_manager.identity idp = utils.find_resource( identity_client.federation.identity_providers, parsed_args.identity_provider, id=parsed_args.identity_provider, ) idp._info.pop('links', None) remote_ids = format_columns.ListColumn(idp._info.pop('remote_ids', [])) idp._info['remote_ids'] = remote_ids return zip(*sorted(idp._info.items()))