Merge "Add --os-cloud support"

This commit is contained in:
Jenkins 2015-04-19 06:17:31 +00:00 committed by Gerrit Code Review
commit fd8b284164
9 changed files with 411 additions and 321 deletions

View File

@ -84,9 +84,17 @@ def base_parser(parser):
""" """
# Global arguments # Global arguments
parser.add_argument(
'--os-cloud',
metavar='<cloud-config-name>',
dest='cloud',
default=env('OS_CLOUD'),
help='Cloud name in clouds.yaml (Env: OS_CLOUD)',
)
parser.add_argument( parser.add_argument(
'--os-region-name', '--os-region-name',
metavar='<auth-region-name>', metavar='<auth-region-name>',
dest='region_name',
default=env('OS_REGION_NAME'), default=env('OS_REGION_NAME'),
help='Authentication region name (Env: OS_REGION_NAME)', help='Authentication region name (Env: OS_REGION_NAME)',
) )

View File

@ -30,6 +30,8 @@ import common
from openstackclient.api import object_store_v1 as object_store from openstackclient.api import object_store_v1 as object_store
from openstackclient.identity import client as identity_client from openstackclient.identity import client as identity_client
from os_client_config import config as cloud_config
LOG = logging.getLogger('') LOG = logging.getLogger('')
@ -37,6 +39,15 @@ LOG = logging.getLogger('')
def run(opts): def run(opts):
"""Run the examples""" """Run the examples"""
# Look for configuration file
# To support token-flow we have no required values
# print "options: %s" % self.options
cloud = cloud_config.OpenStackConfig().get_one_cloud(
cloud=opts.cloud,
argparse=opts,
)
LOG.debug("cloud cfg: %s", cloud.config)
# Set up certificate verification and CA bundle # Set up certificate verification and CA bundle
# NOTE(dtroyer): This converts from the usual OpenStack way to the single # NOTE(dtroyer): This converts from the usual OpenStack way to the single
# requests argument and is an app-specific thing because # requests argument and is an app-specific thing because
@ -52,13 +63,13 @@ def run(opts):
# The returned session will have a configured auth object # The returned session will have a configured auth object
# based on the selected plugin's available options. # based on the selected plugin's available options.
# So to do...oh, just go to api.auth.py and look at what it does. # So to do...oh, just go to api.auth.py and look at what it does.
session = common.make_session(opts, verify=verify) session = common.make_session(cloud, verify=verify)
# Extract an endpoint # Extract an endpoint
auth_ref = session.auth.get_auth_ref(session) auth_ref = session.auth.get_auth_ref(session)
if opts.os_url: if opts.url:
endpoint = opts.os_url endpoint = opts.url
else: else:
endpoint = auth_ref.service_catalog.url_for( endpoint = auth_ref.service_catalog.url_for(
service_type='object-store', service_type='object-store',

View File

@ -29,6 +29,8 @@ import common
from openstackclient.common import clientmanager from openstackclient.common import clientmanager
from os_client_config import config as cloud_config
LOG = logging.getLogger('') LOG = logging.getLogger('')
@ -36,6 +38,16 @@ LOG = logging.getLogger('')
def run(opts): def run(opts):
"""Run the examples""" """Run the examples"""
# Do configuration file handling
cc = cloud_config.OpenStackConfig()
LOG.debug("defaults: %s", cc.defaults)
cloud = cc.get_one_cloud(
cloud=opts.cloud,
argparse=opts,
)
LOG.debug("cloud cfg: %s", cloud.config)
# Loop through extensions to get API versions # Loop through extensions to get API versions
# Currently API versions are statically selected. Once discovery # Currently API versions are statically selected. Once discovery
# is working this can go away... # is working this can go away...
@ -59,7 +71,7 @@ def run(opts):
# Collect the auth and config options together and give them to # Collect the auth and config options together and give them to
# ClientManager and it will wrangle all of the goons into place. # ClientManager and it will wrangle all of the goons into place.
client_manager = clientmanager.ClientManager( client_manager = clientmanager.ClientManager(
cli_options=opts, cli_options=cloud,
verify=verify, verify=verify,
api_version=api_version, api_version=api_version,
) )

View File

@ -77,25 +77,27 @@ def select_auth_plugin(options):
auth_plugin_name = None auth_plugin_name = None
if options.os_auth_type in [plugin.name for plugin in get_plugin_list()]: # Do the token/url check first as this must override the default
# A direct plugin name was given, use it # 'password' set by os-client-config
return options.os_auth_type # Also, url and token are not copied into o-c-c's auth dict (yet?)
if options.auth.get('url', None) and options.auth.get('token', None):
if options.os_url and options.os_token:
# service token authentication # service token authentication
auth_plugin_name = 'token_endpoint' auth_plugin_name = 'token_endpoint'
elif options.os_username: elif options.auth_type in [plugin.name for plugin in PLUGIN_LIST]:
if options.os_identity_api_version == '3': # A direct plugin name was given, use it
auth_plugin_name = options.auth_type
elif options.auth.get('username', None):
if options.identity_api_version == '3':
auth_plugin_name = 'v3password' auth_plugin_name = 'v3password'
elif options.os_identity_api_version == '2.0': elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2password' auth_plugin_name = 'v2password'
else: else:
# let keystoneclient figure it out itself # let keystoneclient figure it out itself
auth_plugin_name = 'osc_password' auth_plugin_name = 'osc_password'
elif options.os_token: elif options.auth.get('token', None):
if options.os_identity_api_version == '3': if options.identity_api_version == '3':
auth_plugin_name = 'v3token' auth_plugin_name = 'v3token'
elif options.os_identity_api_version == '2.0': elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2token' auth_plugin_name = 'v2token'
else: else:
# let keystoneclient figure it out itself # let keystoneclient figure it out itself
@ -109,35 +111,27 @@ def select_auth_plugin(options):
def build_auth_params(auth_plugin_name, cmd_options): def build_auth_params(auth_plugin_name, cmd_options):
auth_params = {}
auth_params = dict(cmd_options.auth)
if auth_plugin_name: if auth_plugin_name:
LOG.debug('auth_type: %s', auth_plugin_name) LOG.debug('auth_type: %s', auth_plugin_name)
auth_plugin_class = base.get_plugin_class(auth_plugin_name) auth_plugin_class = base.get_plugin_class(auth_plugin_name)
plugin_options = auth_plugin_class.get_options()
for option in plugin_options:
option_name = 'os_' + option.dest
LOG.debug('fetching option %s' % option_name)
auth_params[option.dest] = getattr(cmd_options, option_name, None)
# grab tenant from project for v2.0 API compatibility # grab tenant from project for v2.0 API compatibility
if auth_plugin_name.startswith("v2"): if auth_plugin_name.startswith("v2"):
auth_params['tenant_id'] = getattr( if 'project_id' in auth_params:
cmd_options, auth_params['tenant_id'] = auth_params['project_id']
'os_project_id', del auth_params['project_id']
None, if 'project_name' in auth_params:
) auth_params['tenant_name'] = auth_params['project_name']
auth_params['tenant_name'] = getattr( del auth_params['project_name']
cmd_options,
'os_project_name',
None,
)
else: else:
LOG.debug('no auth_type') LOG.debug('no auth_type')
# delay the plugin choice, grab every option # delay the plugin choice, grab every option
auth_plugin_class = None
plugin_options = set([o.replace('-', '_') for o in get_options_list()]) plugin_options = set([o.replace('-', '_') for o in get_options_list()])
for option in plugin_options: for option in plugin_options:
option_name = 'os_' + option LOG.debug('fetching option %s' % option)
LOG.debug('fetching option %s' % option_name) auth_params[option] = getattr(cmd_options.auth, option, None)
auth_params[option] = getattr(cmd_options, option_name, None)
return (auth_plugin_class, auth_params) return (auth_plugin_class, auth_params)
@ -146,15 +140,29 @@ def check_valid_auth_options(options, auth_plugin_name):
msg = '' msg = ''
if auth_plugin_name.endswith('password'): if auth_plugin_name.endswith('password'):
if not options.os_username: if not options.auth.get('username', None):
msg += _('Set a username with --os-username or OS_USERNAME\n') msg += _('Set a username with --os-username, OS_USERNAME,'
if not options.os_auth_url: ' or auth.username\n')
msg += _('Set an authentication URL, with --os-auth-url or' if not options.auth.get('auth_url', None):
' OS_AUTH_URL\n') msg += _('Set an authentication URL, with --os-auth-url,'
if (not options.os_project_id and not options.os_domain_id and not ' OS_AUTH_URL or auth.auth_url\n')
options.os_domain_name and not options.os_project_name): if (not options.auth.get('project_id', None) and not
options.auth.get('domain_id', None) and not
options.auth.get('domain_name', None) and not
options.auth.get('project_name', None)):
msg += _('Set a scope, such as a project or domain, with ' msg += _('Set a scope, such as a project or domain, with '
'--os-project-name or OS_PROJECT_NAME') '--os-project-name, OS_PROJECT_NAME or auth.project_name')
elif auth_plugin_name.endswith('token'):
if not options.auth.get('token', None):
msg += _('Set a token with --os-token, OS_TOKEN or auth.token\n')
if not options.auth.get('auth_url', None):
msg += _('Set a service AUTH_URL, with --os-auth-url, '
'OS_AUTH_URL or auth.auth_url\n')
elif auth_plugin_name == 'token_endpoint':
if not options.auth.get('token', None):
msg += _('Set a token with --os-token, OS_TOKEN or auth.token\n')
if not options.auth.get('url', None):
msg += _('Set a service URL, with --os-url, OS_URL or auth.url\n')
if msg: if msg:
raise exc.CommandError('Missing parameter(s): \n%s' % msg) raise exc.CommandError('Missing parameter(s): \n%s' % msg)
@ -171,6 +179,7 @@ def build_auth_plugins_option_parser(parser):
parser.add_argument( parser.add_argument(
'--os-auth-type', '--os-auth-type',
metavar='<auth-type>', metavar='<auth-type>',
dest='auth_type',
default=utils.env('OS_AUTH_TYPE'), default=utils.env('OS_AUTH_TYPE'),
help='Select an auhentication type. Available types: ' + help='Select an auhentication type. Available types: ' +
', '.join(available_plugins) + ', '.join(available_plugins) +
@ -178,7 +187,7 @@ def build_auth_plugins_option_parser(parser):
' (Env: OS_AUTH_TYPE)', ' (Env: OS_AUTH_TYPE)',
choices=available_plugins choices=available_plugins
) )
# make sure we catch old v2.0 env values # Maintain compatibility with old tenant env vars
envs = { envs = {
'OS_PROJECT_NAME': utils.env( 'OS_PROJECT_NAME': utils.env(
'OS_PROJECT_NAME', 'OS_PROJECT_NAME',
@ -190,15 +199,20 @@ def build_auth_plugins_option_parser(parser):
), ),
} }
for o in get_options_list(): for o in get_options_list():
# remove allusion to tenants from v2.0 API # Remove tenant options from KSC plugins and replace them below
if 'tenant' not in o: if 'tenant' not in o:
parser.add_argument( parser.add_argument(
'--os-' + o, '--os-' + o,
metavar='<auth-%s>' % o, metavar='<auth-%s>' % o,
default=envs.get(OPTIONS_LIST[o]['env'], dest=o.replace('-', '_'),
utils.env(OPTIONS_LIST[o]['env'])), default=envs.get(
help='%s\n(Env: %s)' % (OPTIONS_LIST[o]['help'], OPTIONS_LIST[o]['env'],
OPTIONS_LIST[o]['env']), utils.env(OPTIONS_LIST[o]['env']),
),
help='%s\n(Env: %s)' % (
OPTIONS_LIST[o]['help'],
OPTIONS_LIST[o]['env'],
),
) )
# add tenant-related options for compatibility # add tenant-related options for compatibility
# this is deprecated but still used in some tempest tests... # this is deprecated but still used in some tempest tests...
@ -206,14 +220,12 @@ def build_auth_plugins_option_parser(parser):
'--os-tenant-name', '--os-tenant-name',
metavar='<auth-tenant-name>', metavar='<auth-tenant-name>',
dest='os_project_name', dest='os_project_name',
default=utils.env('OS_TENANT_NAME'),
help=argparse.SUPPRESS, help=argparse.SUPPRESS,
) )
parser.add_argument( parser.add_argument(
'--os-tenant-id', '--os-tenant-id',
metavar='<auth-tenant-id>', metavar='<auth-tenant-id>',
dest='os_project_id', dest='os_project_id',
default=utils.env('OS_TENANT_ID'),
help=argparse.SUPPRESS, help=argparse.SUPPRESS,
) )
return parser return parser

View File

@ -58,7 +58,7 @@ class ClientManager(object):
def __init__( def __init__(
self, self,
cli_options, cli_options=None,
api_version=None, api_version=None,
verify=True, verify=True,
pw_func=None, pw_func=None,
@ -82,8 +82,8 @@ class ClientManager(object):
self._cli_options = cli_options self._cli_options = cli_options
self._api_version = api_version self._api_version = api_version
self._pw_callback = pw_func self._pw_callback = pw_func
self._url = self._cli_options.os_url self._url = self._cli_options.auth.get('url', None)
self._region_name = self._cli_options.os_region_name self._region_name = self._cli_options.region_name
self.timing = self._cli_options.timing self.timing = self._cli_options.timing
@ -121,7 +121,7 @@ class ClientManager(object):
# Horrible hack alert...must handle prompt for null password if # Horrible hack alert...must handle prompt for null password if
# password auth is requested. # password auth is requested.
if (self.auth_plugin_name.endswith('password') and if (self.auth_plugin_name.endswith('password') and
not self._cli_options.os_password): not self._cli_options.auth.get('password', None)):
self._cli_options.os_password = self._pw_callback() self._cli_options.os_password = self._pw_callback()
(auth_plugin, self._auth_params) = auth.build_auth_params( (auth_plugin, self._auth_params) = auth.build_auth_params(
@ -129,13 +129,15 @@ class ClientManager(object):
self._cli_options, self._cli_options,
) )
default_domain = self._cli_options.os_default_domain # TODO(mordred): This is a usability improvement that's broadly useful
# We should port it back up into os-client-config.
default_domain = self._cli_options.default_domain
# NOTE(stevemar): If PROJECT_DOMAIN_ID or PROJECT_DOMAIN_NAME is # NOTE(stevemar): If PROJECT_DOMAIN_ID or PROJECT_DOMAIN_NAME is
# present, then do not change the behaviour. Otherwise, set the # present, then do not change the behaviour. Otherwise, set the
# PROJECT_DOMAIN_ID to 'OS_DEFAULT_DOMAIN' for better usability. # PROJECT_DOMAIN_ID to 'OS_DEFAULT_DOMAIN' for better usability.
if (self._api_version.get('identity') == '3' and if (self._api_version.get('identity') == '3' and
not self._auth_params.get('project_domain_id') and not self._auth_params.get('project_domain_id', None) and
not self._auth_params.get('project_domain_name')): not self._auth_params.get('project_domain_name', None)):
self._auth_params['project_domain_id'] = default_domain self._auth_params['project_domain_id'] = default_domain
# NOTE(stevemar): If USER_DOMAIN_ID or USER_DOMAIN_NAME is present, # NOTE(stevemar): If USER_DOMAIN_ID or USER_DOMAIN_NAME is present,
@ -143,8 +145,8 @@ class ClientManager(object):
# to 'OS_DEFAULT_DOMAIN' for better usability. # to 'OS_DEFAULT_DOMAIN' for better usability.
if (self._api_version.get('identity') == '3' and if (self._api_version.get('identity') == '3' and
self.auth_plugin_name.endswith('password') and self.auth_plugin_name.endswith('password') and
not self._auth_params.get('user_domain_id') and not self._auth_params.get('user_domain_id', None) and
not self._auth_params.get('user_domain_name')): not self._auth_params.get('user_domain_name', None)):
self._auth_params['user_domain_id'] = default_domain self._auth_params['user_domain_id'] = default_domain
# For compatibility until all clients can be updated # For compatibility until all clients can be updated

View File

@ -33,6 +33,8 @@ from openstackclient.common import exceptions as exc
from openstackclient.common import timing from openstackclient.common import timing
from openstackclient.common import utils from openstackclient.common import utils
from os_client_config import config as cloud_config
DEFAULT_DOMAIN = 'default' DEFAULT_DOMAIN = 'default'
@ -86,10 +88,6 @@ class OpenStackShell(app.App):
# Until we have command line arguments parsed, dump any stack traces # Until we have command line arguments parsed, dump any stack traces
self.dump_stack_trace = True self.dump_stack_trace = True
# This is instantiated in initialize_app() only when using
# password flow auth
self.auth_client = None
# Assume TLS host certificate verification is enabled # Assume TLS host certificate verification is enabled
self.verify = True self.verify = True
@ -165,10 +163,19 @@ class OpenStackShell(app.App):
description, description,
version) version)
# service token auth argument
parser.add_argument(
'--os-cloud',
metavar='<cloud-config-name>',
dest='cloud',
default=utils.env('OS_CLOUD'),
help='Cloud name in clouds.yaml (Env: OS_CLOUD)',
)
# Global arguments # Global arguments
parser.add_argument( parser.add_argument(
'--os-region-name', '--os-region-name',
metavar='<auth-region-name>', metavar='<auth-region-name>',
dest='region_name',
default=utils.env('OS_REGION_NAME'), default=utils.env('OS_REGION_NAME'),
help='Authentication region name (Env: OS_REGION_NAME)') help='Authentication region name (Env: OS_REGION_NAME)')
parser.add_argument( parser.add_argument(
@ -213,8 +220,43 @@ class OpenStackShell(app.App):
* authenticate against Identity if requested * authenticate against Identity if requested
""" """
# Parent __init__ parses argv into self.options
super(OpenStackShell, self).initialize_app(argv) super(OpenStackShell, self).initialize_app(argv)
# Resolve the verify/insecure exclusive pair here as cloud_config
# doesn't know about verify
self.options.insecure = (
self.options.insecure and not self.options.verify
)
# Set the default plugin to token_endpoint if rl and token are given
if (self.options.url and self.options.token):
# Use service token authentication
cloud_config.set_default('auth_type', 'token_endpoint')
else:
cloud_config.set_default('auth_type', 'osc_password')
self.log.debug("options: %s", self.options)
# Do configuration file handling
cc = cloud_config.OpenStackConfig()
self.log.debug("defaults: %s", cc.defaults)
self.cloud = cc.get_one_cloud(
cloud=self.options.cloud,
argparse=self.options,
)
self.log.debug("cloud cfg: %s", self.cloud.config)
# Set up client TLS
cacert = self.cloud.cacert
if cacert:
self.verify = cacert
else:
self.verify = not getattr(self.cloud.config, 'insecure', False)
# Neutralize verify option
self.options.verify = None
# Save default domain # Save default domain
self.default_domain = self.options.os_default_domain self.default_domain = self.options.os_default_domain
@ -260,14 +302,8 @@ class OpenStackShell(app.App):
# Handle deferred help and exit # Handle deferred help and exit
self.print_help_if_requested() self.print_help_if_requested()
# Set up common client session
if self.options.os_cacert:
self.verify = self.options.os_cacert
else:
self.verify = not self.options.insecure
self.client_manager = clientmanager.ClientManager( self.client_manager = clientmanager.ClientManager(
cli_options=self.options, cli_options=self.cloud,
verify=self.verify, verify=self.verify,
api_version=self.api_version, api_version=self.api_version,
pw_func=prompt_for_password, pw_func=prompt_for_password,

View File

@ -48,13 +48,14 @@ class Container(object):
class FakeOptions(object): class FakeOptions(object):
def __init__(self, **kwargs): def __init__(self, **kwargs):
for option in auth.OPTIONS_LIST: for option in auth.OPTIONS_LIST:
setattr(self, 'os_' + option.replace('-', '_'), None) setattr(self, option.replace('-', '_'), None)
self.os_auth_type = None self.auth_type = None
self.os_identity_api_version = '2.0' self.identity_api_version = '2.0'
self.timing = None self.timing = None
self.os_region_name = None self.region_name = None
self.os_url = None self.url = None
self.os_default_domain = 'default' self.auth = {}
self.default_domain = 'default'
self.__dict__.update(kwargs) self.__dict__.update(kwargs)
@ -86,9 +87,11 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager( client_manager = clientmanager.ClientManager(
cli_options=FakeOptions( cli_options=FakeOptions(
os_token=fakes.AUTH_TOKEN, auth_type='token_endpoint',
os_url=fakes.AUTH_URL, auth=dict(
os_auth_type='token_endpoint', token=fakes.AUTH_TOKEN,
url=fakes.AUTH_URL,
),
), ),
api_version=API_VERSION, api_version=API_VERSION,
verify=True verify=True
@ -114,9 +117,11 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager( client_manager = clientmanager.ClientManager(
cli_options=FakeOptions( cli_options=FakeOptions(
os_token=fakes.AUTH_TOKEN, auth=dict(
os_auth_url=fakes.AUTH_URL, token=fakes.AUTH_TOKEN,
os_auth_type='v2token', auth_url=fakes.AUTH_URL,
),
auth_type='v2token',
), ),
api_version=API_VERSION, api_version=API_VERSION,
verify=True verify=True
@ -138,10 +143,12 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager( client_manager = clientmanager.ClientManager(
cli_options=FakeOptions( cli_options=FakeOptions(
os_auth_url=fakes.AUTH_URL, auth=dict(
os_username=fakes.USERNAME, auth_url=fakes.AUTH_URL,
os_password=fakes.PASSWORD, username=fakes.USERNAME,
os_project_name=fakes.PROJECT_NAME, password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
), ),
api_version=API_VERSION, api_version=API_VERSION,
verify=False, verify=False,
@ -198,11 +205,13 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager( client_manager = clientmanager.ClientManager(
cli_options=FakeOptions( cli_options=FakeOptions(
os_auth_url=fakes.AUTH_URL, auth=dict(
os_username=fakes.USERNAME, auth_url=fakes.AUTH_URL,
os_password=fakes.PASSWORD, username=fakes.USERNAME,
os_project_name=fakes.PROJECT_NAME, password=fakes.PASSWORD,
os_auth_type='v2password', project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
), ),
api_version=API_VERSION, api_version=API_VERSION,
verify='cafile', verify='cafile',
@ -214,8 +223,8 @@ class TestClientManager(utils.TestCase):
self.assertEqual('cafile', client_manager._cacert) self.assertEqual('cafile', client_manager._cacert)
def _select_auth_plugin(self, auth_params, api_version, auth_plugin_name): def _select_auth_plugin(self, auth_params, api_version, auth_plugin_name):
auth_params['os_auth_type'] = auth_plugin_name auth_params['auth_type'] = auth_plugin_name
auth_params['os_identity_api_version'] = api_version auth_params['identity_api_version'] = api_version
client_manager = clientmanager.ClientManager( client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(**auth_params), cli_options=FakeOptions(**auth_params),
api_version=API_VERSION, api_version=API_VERSION,
@ -230,19 +239,33 @@ class TestClientManager(utils.TestCase):
def test_client_manager_select_auth_plugin(self): def test_client_manager_select_auth_plugin(self):
# test token auth # test token auth
params = dict(os_token=fakes.AUTH_TOKEN, params = dict(
os_auth_url=fakes.AUTH_URL) auth=dict(
auth_url=fakes.AUTH_URL,
token=fakes.AUTH_TOKEN,
),
)
self._select_auth_plugin(params, '2.0', 'v2token') self._select_auth_plugin(params, '2.0', 'v2token')
self._select_auth_plugin(params, '3', 'v3token') self._select_auth_plugin(params, '3', 'v3token')
self._select_auth_plugin(params, 'XXX', 'token') self._select_auth_plugin(params, 'XXX', 'token')
# test token/endpoint auth # test token/endpoint auth
params = dict(os_token=fakes.AUTH_TOKEN, os_url='test') params = dict(
auth_plugin='token_endpoint',
auth=dict(
url='test',
token=fakes.AUTH_TOKEN,
),
)
self._select_auth_plugin(params, 'XXX', 'token_endpoint') self._select_auth_plugin(params, 'XXX', 'token_endpoint')
# test password auth # test password auth
params = dict(os_auth_url=fakes.AUTH_URL, params = dict(
os_username=fakes.USERNAME, auth=dict(
os_password=fakes.PASSWORD, auth_url=fakes.AUTH_URL,
os_project_name=fakes.PROJECT_NAME) username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
)
self._select_auth_plugin(params, '2.0', 'v2password') self._select_auth_plugin(params, '2.0', 'v2password')
self._select_auth_plugin(params, '3', 'v3password') self._select_auth_plugin(params, '3', 'v3password')
self._select_auth_plugin(params, 'XXX', 'password') self._select_auth_plugin(params, 'XXX', 'password')

View File

@ -82,34 +82,62 @@ class TestShell(utils.TestCase):
fake_execute(_shell, _cmd) fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "project"]) self.app.assert_called_with(["list", "project"])
self.assertEqual(default_args["auth_url"], self.assertEqual(
_shell.options.os_auth_url) default_args.get("auth_url", ''),
self.assertEqual(default_args["project_id"], _shell.options.auth_url,
_shell.options.os_project_id) )
self.assertEqual(default_args["project_name"], self.assertEqual(
_shell.options.os_project_name) default_args.get("project_id", ''),
self.assertEqual(default_args["domain_id"], _shell.options.project_id,
_shell.options.os_domain_id) )
self.assertEqual(default_args["domain_name"], self.assertEqual(
_shell.options.os_domain_name) default_args.get("project_name", ''),
self.assertEqual(default_args["user_domain_id"], _shell.options.project_name,
_shell.options.os_user_domain_id) )
self.assertEqual(default_args["user_domain_name"], self.assertEqual(
_shell.options.os_user_domain_name) default_args.get("domain_id", ''),
self.assertEqual(default_args["project_domain_id"], _shell.options.domain_id,
_shell.options.os_project_domain_id) )
self.assertEqual(default_args["project_domain_name"], self.assertEqual(
_shell.options.os_project_domain_name) default_args.get("domain_name", ''),
self.assertEqual(default_args["username"], _shell.options.domain_name,
_shell.options.os_username) )
self.assertEqual(default_args["password"], self.assertEqual(
_shell.options.os_password) default_args.get("user_domain_id", ''),
self.assertEqual(default_args["region_name"], _shell.options.user_domain_id,
_shell.options.os_region_name) )
self.assertEqual(default_args["trust_id"], self.assertEqual(
_shell.options.os_trust_id) default_args.get("user_domain_name", ''),
self.assertEqual(default_args['auth_type'], _shell.options.user_domain_name,
_shell.options.os_auth_type) )
self.assertEqual(
default_args.get("project_domain_id", ''),
_shell.options.project_domain_id,
)
self.assertEqual(
default_args.get("project_domain_name", ''),
_shell.options.project_domain_name,
)
self.assertEqual(
default_args.get("username", ''),
_shell.options.username,
)
self.assertEqual(
default_args.get("password", ''),
_shell.options.password,
)
self.assertEqual(
default_args.get("region_name", ''),
_shell.options.region_name,
)
self.assertEqual(
default_args.get("trust_id", ''),
_shell.options.trust_id,
)
self.assertEqual(
default_args.get('auth_type', ''),
_shell.options.auth_type,
)
def _assert_token_auth(self, cmd_options, default_args): def _assert_token_auth(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app", with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
@ -118,9 +146,34 @@ class TestShell(utils.TestCase):
fake_execute(_shell, _cmd) fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "role"]) self.app.assert_called_with(["list", "role"])
self.assertEqual(default_args["os_token"], _shell.options.os_token) self.assertEqual(
self.assertEqual(default_args["os_auth_url"], default_args.get("token", ''),
_shell.options.os_auth_url) _shell.options.token,
"token"
)
self.assertEqual(
default_args.get("auth_url", ''),
_shell.options.auth_url,
"auth_url"
)
def _assert_token_endpoint_auth(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
self.app):
_shell, _cmd = make_shell(), cmd_options + " list role"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "role"])
self.assertEqual(
default_args.get("token", ''),
_shell.options.token,
"token",
)
self.assertEqual(
default_args.get("url", ''),
_shell.options.url,
"url",
)
def _assert_cli(self, cmd_options, default_args): def _assert_cli(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app", with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
@ -178,301 +231,159 @@ class TestShellPasswordAuth(TestShell):
flag = "--os-auth-url " + DEFAULT_AUTH_URL flag = "--os-auth-url " + DEFAULT_AUTH_URL
kwargs = { kwargs = {
"auth_url": DEFAULT_AUTH_URL, "auth_url": DEFAULT_AUTH_URL,
"project_id": "",
"project_name": "",
"user_domain_id": "",
"domain_id": "",
"domain_name": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_project_id_flow(self): def test_only_project_id_flow(self):
flag = "--os-project-id " + DEFAULT_PROJECT_ID flag = "--os-project-id " + DEFAULT_PROJECT_ID
kwargs = { kwargs = {
"auth_url": "",
"project_id": DEFAULT_PROJECT_ID, "project_id": DEFAULT_PROJECT_ID,
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_project_name_flow(self): def test_only_project_name_flow(self):
flag = "--os-project-name " + DEFAULT_PROJECT_NAME flag = "--os-project-name " + DEFAULT_PROJECT_NAME
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": DEFAULT_PROJECT_NAME, "project_name": DEFAULT_PROJECT_NAME,
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_domain_id_flow(self): def test_only_domain_id_flow(self):
flag = "--os-domain-id " + DEFAULT_DOMAIN_ID flag = "--os-domain-id " + DEFAULT_DOMAIN_ID
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": DEFAULT_DOMAIN_ID, "domain_id": DEFAULT_DOMAIN_ID,
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_domain_name_flow(self): def test_only_domain_name_flow(self):
flag = "--os-domain-name " + DEFAULT_DOMAIN_NAME flag = "--os-domain-name " + DEFAULT_DOMAIN_NAME
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": DEFAULT_DOMAIN_NAME, "domain_name": DEFAULT_DOMAIN_NAME,
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_user_domain_id_flow(self): def test_only_user_domain_id_flow(self):
flag = "--os-user-domain-id " + DEFAULT_USER_DOMAIN_ID flag = "--os-user-domain-id " + DEFAULT_USER_DOMAIN_ID
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": DEFAULT_USER_DOMAIN_ID, "user_domain_id": DEFAULT_USER_DOMAIN_ID,
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_user_domain_name_flow(self): def test_only_user_domain_name_flow(self):
flag = "--os-user-domain-name " + DEFAULT_USER_DOMAIN_NAME flag = "--os-user-domain-name " + DEFAULT_USER_DOMAIN_NAME
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": DEFAULT_USER_DOMAIN_NAME, "user_domain_name": DEFAULT_USER_DOMAIN_NAME,
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_project_domain_id_flow(self): def test_only_project_domain_id_flow(self):
flag = "--os-project-domain-id " + DEFAULT_PROJECT_DOMAIN_ID flag = "--os-project-domain-id " + DEFAULT_PROJECT_DOMAIN_ID
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": DEFAULT_PROJECT_DOMAIN_ID, "project_domain_id": DEFAULT_PROJECT_DOMAIN_ID,
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_project_domain_name_flow(self): def test_only_project_domain_name_flow(self):
flag = "--os-project-domain-name " + DEFAULT_PROJECT_DOMAIN_NAME flag = "--os-project-domain-name " + DEFAULT_PROJECT_DOMAIN_NAME
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": DEFAULT_PROJECT_DOMAIN_NAME, "project_domain_name": DEFAULT_PROJECT_DOMAIN_NAME,
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_username_flow(self): def test_only_username_flow(self):
flag = "--os-username " + DEFAULT_USERNAME flag = "--os-username " + DEFAULT_USERNAME
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": DEFAULT_USERNAME, "username": DEFAULT_USERNAME,
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_password_flow(self): def test_only_password_flow(self):
flag = "--os-password " + DEFAULT_PASSWORD flag = "--os-password " + DEFAULT_PASSWORD
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": DEFAULT_PASSWORD, "password": DEFAULT_PASSWORD,
"region_name": "",
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_region_name_flow(self): def test_only_region_name_flow(self):
flag = "--os-region-name " + DEFAULT_REGION_NAME flag = "--os-region-name " + DEFAULT_REGION_NAME
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": DEFAULT_REGION_NAME, "region_name": DEFAULT_REGION_NAME,
"trust_id": "",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_trust_id_flow(self): def test_only_trust_id_flow(self):
flag = "--os-trust-id " + "1234" flag = "--os-trust-id " + "1234"
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "1234", "trust_id": "1234",
"auth_type": "",
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
def test_only_auth_type_flow(self): def test_only_auth_type_flow(self):
flag = "--os-auth-type " + "v2password" flag = "--os-auth-type " + "v2password"
kwargs = { kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": DEFAULT_AUTH_PLUGIN "auth_type": DEFAULT_AUTH_PLUGIN
} }
self._assert_password_auth(flag, kwargs) self._assert_password_auth(flag, kwargs)
class TestShellTokenAuth(TestShell): class TestShellTokenAuth(TestShell):
def test_only_token(self):
flag = "--os-token " + DEFAULT_TOKEN
kwargs = {
"token": DEFAULT_TOKEN,
"auth_url": '',
}
self._assert_token_auth(flag, kwargs)
def test_only_auth_url(self):
flag = "--os-auth-url " + DEFAULT_AUTH_URL
kwargs = {
"token": '',
"auth_url": DEFAULT_AUTH_URL,
}
self._assert_token_auth(flag, kwargs)
def test_empty_auth(self):
os.environ = {}
flag = ""
kwargs = {}
self._assert_token_auth(flag, kwargs)
class TestShellTokenAuthEnv(TestShell):
def setUp(self): def setUp(self):
super(TestShellTokenAuth, self).setUp() super(TestShellTokenAuthEnv, self).setUp()
env = { env = {
"OS_TOKEN": DEFAULT_TOKEN, "OS_TOKEN": DEFAULT_TOKEN,
"OS_AUTH_URL": DEFAULT_SERVICE_URL, "OS_AUTH_URL": DEFAULT_AUTH_URL,
} }
self.orig_env, os.environ = os.environ, env.copy() self.orig_env, os.environ = os.environ, env.copy()
def tearDown(self): def tearDown(self):
super(TestShellTokenAuth, self).tearDown() super(TestShellTokenAuthEnv, self).tearDown()
os.environ = self.orig_env os.environ = self.orig_env
def test_default_auth(self): def test_env(self):
flag = "" flag = ""
kwargs = { kwargs = {
"os_token": DEFAULT_TOKEN, "token": DEFAULT_TOKEN,
"os_auth_url": DEFAULT_SERVICE_URL "auth_url": DEFAULT_AUTH_URL,
}
self._assert_token_auth(flag, kwargs)
def test_only_token(self):
flag = "--os-token xyzpdq"
kwargs = {
"token": "xyzpdq",
"auth_url": DEFAULT_AUTH_URL,
}
self._assert_token_auth(flag, kwargs)
def test_only_auth_url(self):
flag = "--os-auth-url http://cloud.local:555"
kwargs = {
"token": DEFAULT_TOKEN,
"auth_url": "http://cloud.local:555",
} }
self._assert_token_auth(flag, kwargs) self._assert_token_auth(flag, kwargs)
@ -480,8 +391,82 @@ class TestShellTokenAuth(TestShell):
os.environ = {} os.environ = {}
flag = "" flag = ""
kwargs = { kwargs = {
"os_token": "", "token": '',
"os_auth_url": "" "auth_url": '',
}
self._assert_token_auth(flag, kwargs)
class TestShellTokenEndpointAuth(TestShell):
def test_only_token(self):
flag = "--os-token " + DEFAULT_TOKEN
kwargs = {
"token": DEFAULT_TOKEN,
"url": '',
}
self._assert_token_endpoint_auth(flag, kwargs)
def test_only_url(self):
flag = "--os-url " + DEFAULT_SERVICE_URL
kwargs = {
"token": '',
"url": DEFAULT_SERVICE_URL,
}
self._assert_token_endpoint_auth(flag, kwargs)
def test_empty_auth(self):
os.environ = {}
flag = ""
kwargs = {
"token": '',
"auth_url": '',
}
self._assert_token_endpoint_auth(flag, kwargs)
class TestShellTokenEndpointAuthEnv(TestShell):
def setUp(self):
super(TestShellTokenEndpointAuthEnv, self).setUp()
env = {
"OS_TOKEN": DEFAULT_TOKEN,
"OS_URL": DEFAULT_SERVICE_URL,
}
self.orig_env, os.environ = os.environ, env.copy()
def tearDown(self):
super(TestShellTokenEndpointAuthEnv, self).tearDown()
os.environ = self.orig_env
def test_env(self):
flag = ""
kwargs = {
"token": DEFAULT_TOKEN,
"url": DEFAULT_SERVICE_URL,
}
self._assert_token_auth(flag, kwargs)
def test_only_token(self):
flag = "--os-token xyzpdq"
kwargs = {
"token": "xyzpdq",
"url": DEFAULT_SERVICE_URL,
}
self._assert_token_auth(flag, kwargs)
def test_only_url(self):
flag = "--os-url http://cloud.local:555"
kwargs = {
"token": DEFAULT_TOKEN,
"url": "http://cloud.local:555",
}
self._assert_token_auth(flag, kwargs)
def test_empty_auth(self):
os.environ = {}
flag = ""
kwargs = {
"token": '',
"url": '',
} }
self._assert_token_auth(flag, kwargs) self._assert_token_auth(flag, kwargs)

View File

@ -7,6 +7,7 @@ six>=1.9.0
Babel>=1.3 Babel>=1.3
cliff>=1.10.0 # Apache-2.0 cliff>=1.10.0 # Apache-2.0
cliff-tablib>=1.0 cliff-tablib>=1.0
os-client-config
oslo.config>=1.9.3 # Apache-2.0 oslo.config>=1.9.3 # Apache-2.0
oslo.i18n>=1.5.0 # Apache-2.0 oslo.i18n>=1.5.0 # Apache-2.0
oslo.utils>=1.4.0 # Apache-2.0 oslo.utils>=1.4.0 # Apache-2.0