Merge "Split ClientManager"

This commit is contained in:
Zuul 2018-03-27 11:47:47 +00:00 committed by Gerrit Code Review
commit 6a10dbb153
9 changed files with 583 additions and 399 deletions

231
config_tempest/clients.py Normal file
View File

@ -0,0 +1,231 @@
# Copyright 2018 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib import exceptions
from tempest.lib.services.compute import flavors_client
from tempest.lib.services.compute import networks_client as nova_net_client
from tempest.lib.services.compute import servers_client
from tempest.lib.services.identity.v2 import identity_client
from tempest.lib.services.identity.v2 import roles_client
from tempest.lib.services.identity.v2 import tenants_client
from tempest.lib.services.identity.v2 import users_client
from tempest.lib.services.identity.v3 \
import identity_client as identity_v3_client
from tempest.lib.services.identity.v3 import projects_client
from tempest.lib.services.identity.v3 import roles_client as roles_v3_client
from tempest.lib.services.identity.v3 import services_client as s_client
from tempest.lib.services.identity.v3 import users_client as users_v3_client
from tempest.lib.services.image.v2 import images_client
from tempest.lib.services.network import networks_client
from tempest.lib.services.volume.v2 import services_client
class ProjectsClient(object):
"""The class is a wrapper for managing projects/tenants.
It instantiates tempest projects_client and provides methods for creating
and finding projects (identity version v3)/tenants (identity version v2).
"""
def __init__(self, auth, catalog_type, identity_region, endpoint_type,
identity_version, **default_params):
self.identity_version = identity_version
self.project_class = tenants_client.TenantsClient if \
self.identity_version == "v2" else projects_client.ProjectsClient
self.client = self.project_class(auth, catalog_type, identity_region,
endpoint_type, **default_params)
def get_project_by_name(self, project_name):
if self.identity_version == "v2":
projects = self.client.list_tenants()['tenants']
else:
projects = self.client.list_projects()['projects']
for project in projects:
if project['name'] == project_name:
return project
raise exceptions.NotFound(
'No such tenant/project (%s) in %s' % (project_name, projects))
def create_project(self, name, description):
if self.identity_version == "v2":
self.client.create_tenant(name=name, description=description)
else:
self.client.create_project(name=name, description=description)
class ClientManager(object):
"""Manager of various OpenStack API clients.
Connections to clients are created on-demand, i.e. the client tries to
connect to the server only when it's being requested.
"""
def __init__(self, conf, creds):
"""Init method of ClientManager.
:param conf: TempestConf object
:param creds: Credentials object
"""
self.identity_region = conf.get_defaulted('identity', 'region')
self.auth_provider = creds.get_auth_provider()
default_params = {
'disable_ssl_certificate_validation':
conf.get_defaulted('identity',
'disable_ssl_certificate_validation'),
'ca_certs': conf.get_defaulted('identity', 'ca_certificates_file')
}
compute_params = {
'service': conf.get_defaulted('compute', 'catalog_type'),
'region': self.identity_region,
'endpoint_type': conf.get_defaulted('compute', 'endpoint_type')
}
compute_params.update(default_params)
self.identity = self.get_identity_client(conf, default_params)
self.tenants = ProjectsClient(
self.auth_provider,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region,
'publicURL',
creds.identity_version,
**default_params)
self.set_roles_client(
auth=self.auth_provider,
creds=creds,
conf=conf,
endpoint_type='publicURL',
default_params=default_params)
self.set_users_client(
auth=self.auth_provider,
creds=creds,
conf=conf,
endpoint_type='publicURL',
default_params=default_params)
self.images = images_client.ImagesClient(
self.auth_provider,
conf.get_defaulted('image', 'catalog_type'),
self.identity_region,
**default_params)
self.servers = servers_client.ServersClient(self.auth_provider,
**compute_params)
self.flavors = flavors_client.FlavorsClient(self.auth_provider,
**compute_params)
self.service_client = s_client.ServicesClient(
self.auth_provider,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region,
**default_params)
self.volume_service = services_client.ServicesClient(
self.auth_provider,
conf.get_defaulted('volume', 'catalog_type'),
self.identity_region,
**default_params)
self.networks = None
def create_nova_network_client():
if self.networks is None:
self.networks = nova_net_client.NetworksClient(
self.auth_provider, **compute_params)
return self.networks
def create_neutron_client():
if self.networks is None:
self.networks = networks_client.NetworksClient(
self.auth_provider,
conf.get_defaulted('network', 'catalog_type'),
self.identity_region,
endpoint_type=conf.get_defaulted('network',
'endpoint_type'),
**default_params)
return self.networks
self.get_nova_net_client = create_nova_network_client
self.get_neutron_client = create_neutron_client
# Set admin tenant id needed for keystone v3 tests.
if creds.admin:
tenant = self.tenants.get_project_by_name(creds.tenant_name)
conf.set('identity', 'admin_tenant_id', tenant['id'])
def get_identity_client(self, conf, default_params):
"""Obtain identity client.
:type conf: TempestConf object
:type default_params: dict
"""
if "v2.0" in conf.get("identity", "uri"):
return identity_client.IdentityClient(
self.auth_provider,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region, endpoint_type='publicURL',
**default_params)
else:
return identity_v3_client.IdentityClient(
self.auth_provider,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region, endpoint_type='publicURL',
**default_params)
def set_users_client(self, auth, creds, conf, endpoint_type,
default_params):
"""Sets users client.
:param auth: auth provider
:type auth: auth.KeystoneV2AuthProvider (or V3)
:type creds: Credentials object
:type conf: TempestConf object
:type endpoint_type: string
:type default_params: dict
"""
users_class = users_client.UsersClient
if "v3" in creds.identity_version:
users_class = users_v3_client.UsersClient
self.users = users_class(
auth,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region,
endpoint_type=endpoint_type,
**default_params)
def set_roles_client(self, auth, creds, conf, endpoint_type,
default_params):
"""Sets roles client.
:param auth: auth provider
:type auth: auth.KeystoneV2AuthProvider (or V3)
:type creds: Credentials object
:type conf: TempestConf object
:type endpoint_type: string
:type default_params: dict
"""
roles_class = roles_client.RolesClient
if "v3" in creds.identity_version:
roles_class = roles_v3_client.RolesClient
self.roles = roles_class(
auth,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region,
endpoint_type=endpoint_type,
**default_params)

View File

@ -0,0 +1,136 @@
# Copyright 2018 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib import auth
class Credentials(object):
"""Class contains all needed credentials.
Wrapps credentials obtained from TempestConf object and Tempest
credentialsfrom auth library.
"""
def __init__(self, conf, admin):
"""Init method of Credentials.
:type conf: TempestConf object
:param admin: True if the user is admin, False otherwise
:type admin: Boolean
"""
self.admin = admin
self._conf = conf
self.username = self.get_credential('username')
self.password = self.get_credential('password')
self.tenant_name = self.get_credential('tenant_name')
self.identity_version = self._get_identity_version()
self.disable_ssl_certificate_validation = self._conf.get_defaulted(
'identity',
'disable_ssl_certificate_validation'
)
self.ca_certs = self._conf.get_defaulted('identity',
'ca_certificates_file')
self.set_credentials()
def get_credential(self, key):
"""Helper for getting credential by its name.
:param key: credential name
:type key: string
:returns: credential
:rtype: string
"""
admin_prefix = 'admin_' if self.admin else ""
return self.get_identity_credential(admin_prefix + key)
def get_identity_credential(self, key):
"""Get credential requested by its name.
The function is providing the backwards compatibility for looking up
the credentials, because admin credentials were moved from identity
to auth section and admin_tenant_name was renamed to
admin_project_name.
:param key: name of the credential e.g. username, passsword ...
:type key: string
:returns: credential
:rtype: string
"""
if key == 'admin_tenant_name':
value = self._conf.get_defaulted('auth', 'admin_project_name')
else:
value = self._conf.get_defaulted('auth', key)
if value is None:
return self._conf.get_defaulted('identity', key)
return value
def _get_identity_version(self):
"""Looks for identity version in TempestConf object.
:returns: identity version
:rtype: string
"""
if "v3" in self._conf.get("identity", "uri"):
return "v3"
else:
return "v2"
def _get_creds_kwargs(self):
"""Creates kwargs.
Kwargs based on the identity version, for obtaining
Tempest credentials.
:returns: kwargs
:rtype: dict
"""
creds_kwargs = {'username': self.username,
'password': self.password}
if self.identity_version == 'v3':
creds_kwargs.update({'project_name': self.tenant_name,
'domain_name': 'Default',
'user_domain_name': 'Default'})
else:
creds_kwargs.update({'tenant_name': self.tenant_name})
return creds_kwargs
def set_credentials(self):
"""Gets and saves Tempest credentials from auth library."""
creds_kwargs = self._get_creds_kwargs()
disable_ssl = self.disable_ssl_certificate_validation
self.tempest_creds = auth.get_credentials(
auth_url=None,
fill_in=False,
identity_version=self.identity_version,
disable_ssl_certificate_validation=disable_ssl,
ca_certs=self.ca_certs,
**creds_kwargs)
def get_auth_provider(self):
"""Gets auth provider based on the type of Tempest credentials.
:returns: auth provider
:rtype: auth.KeystoneV2AuthProvider/auth.KeystoneV3AuthProvider
"""
if isinstance(self.tempest_creds, auth.KeystoneV3Credentials):
return auth.KeystoneV3AuthProvider(
self.tempest_creds,
self._conf.get_defaulted('identity', 'uri_v3'),
self.disable_ssl_certificate_validation,
self.ca_certs)
else:
return auth.KeystoneV2AuthProvider(
self.tempest_creds,
self._conf.get_defaulted('identity', 'uri'),
self.disable_ssl_certificate_validation,
self.ca_certs)

View File

@ -45,26 +45,11 @@ import shutil
import sys import sys
import urllib2 import urllib2
from clients import ClientManager
from credentials import Credentials
import os_client_config import os_client_config
from oslo_config import cfg from oslo_config import cfg
from tempest.lib import auth
from tempest.lib import exceptions from tempest.lib import exceptions
from tempest.lib.services.compute import flavors_client
from tempest.lib.services.compute import networks_client as nova_net_client
from tempest.lib.services.compute import servers_client
from tempest.lib.services.identity.v2 import identity_client
from tempest.lib.services.identity.v2 import roles_client
from tempest.lib.services.identity.v2 import tenants_client
from tempest.lib.services.identity.v2 import users_client
from tempest.lib.services.identity.v3 \
import identity_client as identity_v3_client
from tempest.lib.services.identity.v3 import projects_client
from tempest.lib.services.identity.v3 import roles_client as roles_v3_client
from tempest.lib.services.identity.v3 import services_client as s_client
from tempest.lib.services.identity.v3 import users_client as users_v3_client
from tempest.lib.services.image.v2 import images_client
from tempest.lib.services.network import networks_client
from tempest.lib.services.volume.v2 import services_client
import tempest_conf import tempest_conf
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -348,253 +333,6 @@ def set_cloud_config_values(non_admin, cloud_creds, conf):
'Could not load some identity options from cloud config file') 'Could not load some identity options from cloud config file')
class ProjectsClient(object):
def __init__(self, auth, catalog_type, identity_region, endpoint_type,
identity_version, **default_params):
self.identity_version = identity_version
self.project_class = tenants_client.TenantsClient if \
self.identity_version == "v2" else projects_client.ProjectsClient
self.client = self.project_class(auth, catalog_type, identity_region,
endpoint_type, **default_params)
def get_project_by_name(self, project_name):
if self.identity_version == "v2":
projects = self.client.list_tenants()['tenants']
else:
projects = self.client.list_projects()['projects']
for project in projects:
if project['name'] == project_name:
return project
raise exceptions.NotFound(
'No such tenant/project (%s) in %s' % (project_name, projects))
def create_project(self, name, description):
if self.identity_version == "v2":
self.client.create_tenant(name=name, description=description)
else:
self.client.create_project(name=name, description=description)
class ClientManager(object):
"""Manager of various OpenStack API clients.
Connections to clients are created on-demand, i.e. the client tries to
connect to the server only when it's being requested.
"""
def get_credentials(self, conf, username, tenant_name, password,
identity_version='v2'):
creds_kwargs = {'username': username,
'password': password}
if identity_version == 'v3':
creds_kwargs.update({'project_name': tenant_name,
'domain_name': 'Default',
'user_domain_name': 'Default'})
else:
creds_kwargs.update({'tenant_name': tenant_name})
return auth.get_credentials(
auth_url=None,
fill_in=False,
identity_version=identity_version,
disable_ssl_certificate_validation=conf.get_defaulted(
'identity',
'disable_ssl_certificate_validation'),
ca_certs=conf.get_defaulted(
'identity',
'ca_certificates_file'),
**creds_kwargs)
def get_auth_provider(self, conf, credentials):
disable_ssl_certificate_validation = conf.get_defaulted(
'identity',
'disable_ssl_certificate_validation')
ca_certs = conf.get_defaulted(
'identity',
'ca_certificates_file')
if isinstance(credentials, auth.KeystoneV3Credentials):
return auth.KeystoneV3AuthProvider(
credentials, conf.get_defaulted('identity', 'uri_v3'),
disable_ssl_certificate_validation,
ca_certs)
else:
return auth.KeystoneV2AuthProvider(
credentials, conf.get_defaulted('identity', 'uri'),
disable_ssl_certificate_validation,
ca_certs)
def get_identity_version(self, conf):
if "v3" in conf.get("identity", "uri"):
return "v3"
else:
return "v2"
def set_users_client(self, auth, conf, endpoint_type, default_params):
users_class = users_client.UsersClient
if "v3" in self.identity_version:
users_class = users_v3_client.UsersClient
self.users = users_class(
auth,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region,
endpoint_type=endpoint_type,
**default_params)
def set_roles_client(self, auth, conf, endpoint_type, default_params):
roles_class = roles_client.RolesClient
if "v3" in self.identity_version:
roles_class = roles_v3_client.RolesClient
self.roles = roles_class(
auth,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region,
endpoint_type=endpoint_type,
**default_params)
def __init__(self, conf, admin):
self.identity_version = self.get_identity_version(conf)
username = None
password = None
tenant_name = None
if admin:
try:
username = conf.get_defaulted('auth', 'admin_username')
if username is None:
username = conf.get_defaulted('identity', 'admin_username')
password = conf.get_defaulted('auth', 'admin_password')
if password is None:
password = conf.get_defaulted('identity', 'admin_password')
tenant_name = conf.get_defaulted('auth',
'admin_project_name')
if tenant_name is None:
tenant_name = conf.get_defaulted('identity',
'admin_tenant_name')
except cfg.NoSuchOptError:
LOG.warning(
'Could not load some identity admin options from %s',
DEFAULTS_FILE)
else:
try:
username = conf.get_defaulted('identity', 'username')
password = conf.get_defaulted('identity', 'password')
tenant_name = conf.get_defaulted('identity', 'tenant_name')
except cfg.NoSuchOptError:
LOG.warning(
'Could not load some identity options from %s',
DEFAULTS_FILE)
self.identity_region = conf.get_defaulted('identity', 'region')
default_params = {
'disable_ssl_certificate_validation':
conf.get_defaulted('identity',
'disable_ssl_certificate_validation'),
'ca_certs': conf.get_defaulted('identity', 'ca_certificates_file')
}
compute_params = {
'service': conf.get_defaulted('compute', 'catalog_type'),
'region': self.identity_region,
'endpoint_type': conf.get_defaulted('compute', 'endpoint_type')
}
compute_params.update(default_params)
if self.identity_version == "v2":
_creds = self.get_credentials(conf, username, tenant_name,
password)
else:
_creds = self.get_credentials(
conf, username, tenant_name, password,
identity_version=self.identity_version)
_auth = self.get_auth_provider(conf, _creds)
self.auth_provider = _auth
if "v2.0" in conf.get("identity", "uri"):
self.identity = identity_client.IdentityClient(
_auth, conf.get_defaulted('identity', 'catalog_type'),
self.identity_region, endpoint_type='publicURL',
**default_params)
else:
self.identity = identity_v3_client.IdentityClient(
_auth, conf.get_defaulted('identity', 'catalog_type'),
self.identity_region, endpoint_type='publicURL',
**default_params)
self.tenants = ProjectsClient(
_auth,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region,
'publicURL',
self.identity_version,
**default_params)
self.set_roles_client(
auth=_auth,
conf=conf,
endpoint_type='publicURL',
default_params=default_params)
self.set_users_client(
auth=_auth,
conf=conf,
endpoint_type='publicURL',
default_params=default_params)
self.images = images_client.ImagesClient(
_auth,
conf.get_defaulted('image', 'catalog_type'),
self.identity_region,
**default_params)
self.servers = servers_client.ServersClient(_auth,
**compute_params)
self.flavors = flavors_client.FlavorsClient(_auth,
**compute_params)
self.networks = None
self.service_client = s_client.ServicesClient(
_auth,
conf.get_defaulted('identity', 'catalog_type'),
self.identity_region,
**default_params)
self.volume_service = services_client.ServicesClient(
_auth,
conf.get_defaulted('volume', 'catalog_type'),
self.identity_region,
**default_params)
def create_nova_network_client():
if self.networks is None:
self.networks = nova_net_client.NetworksClient(
_auth, **compute_params)
return self.networks
def create_neutron_client():
if self.networks is None:
self.networks = networks_client.NetworksClient(
_auth,
conf.get_defaulted('network', 'catalog_type'),
self.identity_region,
endpoint_type=conf.get_defaulted('network',
'endpoint_type'),
**default_params)
return self.networks
self.get_nova_net_client = create_nova_network_client
self.get_neutron_client = create_neutron_client
# Set admin tenant id needed for keystone v3 tests.
if admin:
tenant_id = self.tenants.get_project_by_name(tenant_name)['id']
conf.set('identity', 'admin_tenant_id', tenant_id)
def create_tempest_users(tenants_client, roles_client, users_client, conf, def create_tempest_users(tenants_client, roles_client, users_client, conf,
services): services):
"""Create users necessary for Tempest if they don't exist already.""" """Create users necessary for Tempest if they don't exist already."""
@ -1046,8 +784,8 @@ def main():
else: else:
# TODO(arxcruz) make a check if v3 is enabled # TODO(arxcruz) make a check if v3 is enabled
conf.set("identity", "uri_v3", uri.replace("v2.0", "v3")) conf.set("identity", "uri_v3", uri.replace("v2.0", "v3"))
credentials = Credentials(conf, not args.non_admin)
clients = ClientManager(conf, not args.non_admin) clients = ClientManager(conf, credentials)
swift_discover = conf.get_defaulted('object-storage-feature-enabled', swift_discover = conf.get_defaulted('object-storage-feature-enabled',
'discoverability') 'discoverability')
services = api_discovery.discover( services = api_discovery.discover(

View File

@ -17,6 +17,7 @@ import ConfigParser
import logging import logging
import sys import sys
from oslo_config import cfg
import tempest.config import tempest.config
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -58,10 +59,13 @@ class TempestConf(ConfigParser.SafeConfigParser):
:returns: default value for the section.key pair :returns: default value for the section.key pair
:rtype: String :rtype: String
""" """
if self.has_option(section, key): try:
return self.get(section, key) if self.has_option(section, key):
else: return self.get(section, key)
return self.CONF.get(section).get(key) else:
return self.CONF.get(section).get(key)
except cfg.NoSuchOptError:
LOG.warning("Option %s is not defined in %s section", key, section)
def set(self, section, key, value, priority=False): def set(self, section, key, value, priority=False):
"""Set value in configuration, similar to `SafeConfigParser.set` """Set value in configuration, similar to `SafeConfigParser.set`

View File

@ -21,7 +21,8 @@ import mock
from oslotest import base from oslotest import base
from config_tempest import api_discovery as api from config_tempest import api_discovery as api
from config_tempest import main as tool from config_tempest.clients import ClientManager
from config_tempest.credentials import Credentials
from config_tempest import tempest_conf from config_tempest import tempest_conf
@ -63,13 +64,18 @@ class BaseConfigTempestTest(base.BaseTestCase):
conf.set("auth", "use_dynamic_credentials", "True") conf.set("auth", "use_dynamic_credentials", "True")
return conf return conf
def _get_creds(self, conf, admin=False):
return Credentials(conf, admin)
@mock.patch('os_client_config.cloud_config.CloudConfig') @mock.patch('os_client_config.cloud_config.CloudConfig')
def _get_clients(self, conf, mock_args, admin=False): def _get_clients(self, conf, mock_args, creds=None):
"""Returns ClientManager instance""" """Returns ClientManager instance"""
if creds is None:
creds = self._get_creds(conf)
mock_function = mock.Mock(return_value=False) mock_function = mock.Mock(return_value=False)
func2mock = 'os_client_config.cloud_config.CloudConfig.config.get' func2mock = 'os_client_config.cloud_config.CloudConfig.config.get'
self.useFixture(MonkeyPatch(func2mock, mock_function)) self.useFixture(MonkeyPatch(func2mock, mock_function))
return tool.ClientManager(conf, admin=admin) return ClientManager(conf, creds)
class BaseServiceTest(base.BaseTestCase): class BaseServiceTest(base.BaseTestCase):

View File

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# Copyright 2018 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fixtures import MonkeyPatch
import mock
from config_tempest.tests.base import BaseConfigTempestTest
class TestClientManager(BaseConfigTempestTest):
def setUp(self):
super(TestClientManager, self).setUp()
self.conf = self._get_conf("v2.0", "v3")
self.client = self._get_clients(self.conf)
def test_init_manager_as_admin(self):
mock_function = mock.Mock(return_value={"id": "my_fake_id"})
func2mock = ('config_tempest.clients.ProjectsClient.'
'get_project_by_name')
self.useFixture(MonkeyPatch(func2mock, mock_function))
self._get_clients(self.conf, self._get_creds(self.conf, admin=True))
# check if admin credentials were set
admin_tenant = self.conf.get("identity", "admin_tenant_name")
admin_password = self.conf.get("identity", "admin_password")
self.assertEqual(self.conf.get("identity", "admin_username"), "admin")
self.assertEqual(admin_tenant, "adminTenant")
self.assertEqual(admin_password, "adminPass")
# check if admin tenant id was set
admin_tenant_id = self.conf.get("identity", "admin_tenant_id")
self.assertEqual(admin_tenant_id, "my_fake_id")
def test_init_manager_as_admin_using_new_auth(self):
self.conf = self._get_alt_conf("v2.0", "v3")
self.client = self._get_clients(self.conf)
mock_function = mock.Mock(return_value={"id": "my_fake_id"})
func2mock = ('config_tempest.clients.ProjectsClient'
'.get_project_by_name')
self.useFixture(MonkeyPatch(func2mock, mock_function))
self._get_clients(self.conf, self._get_creds(self.conf, admin=True))
# check if admin credentials were set
admin_tenant = self.conf.get("auth", "admin_project_name")
admin_password = self.conf.get("auth", "admin_password")
self.assertEqual(self.conf.get("auth", "admin_username"), "admin")
self.assertEqual(admin_tenant, "adminTenant")
self.assertEqual(admin_password, "adminPass")
# check if admin tenant id was set
admin_tenant_id = self.conf.get("identity", "admin_tenant_id")
self.assertEqual(admin_tenant_id, "my_fake_id")
use_dynamic_creds_bool = self.conf.get("auth",
"use_dynamic_credentials")
self.assertEqual(use_dynamic_creds_bool, "True")

View File

@ -19,6 +19,7 @@ from fixtures import MonkeyPatch
import logging import logging
import mock import mock
from config_tempest.clients import ClientManager
from config_tempest import main as tool from config_tempest import main as tool
from config_tempest import tempest_conf from config_tempest import tempest_conf
from config_tempest.tests.base import BaseConfigTempestTest from config_tempest.tests.base import BaseConfigTempestTest
@ -27,116 +28,6 @@ from config_tempest.tests.base import BaseConfigTempestTest
logging.disable(logging.CRITICAL) logging.disable(logging.CRITICAL)
class TestClientManager(BaseConfigTempestTest):
def setUp(self):
super(TestClientManager, self).setUp()
self.conf = self._get_conf("v2.0", "v3")
self.client = self._get_clients(self.conf)
def test_get_credentials_v2(self):
mock_function = mock.Mock()
function2mock = 'config_tempest.main.auth.get_credentials'
self.useFixture(MonkeyPatch(function2mock, mock_function))
self.client.get_credentials(self.conf, "name", "Tname", "pass")
mock_function.assert_called_with(
auth_url=None, fill_in=False, identity_version='v2',
disable_ssl_certificate_validation='true',
ca_certs=None, password='pass', tenant_name='Tname',
username='name')
def test_get_credentials_v3(self):
mock_function = mock.Mock()
function2mock = 'config_tempest.main.auth.get_credentials'
self.useFixture(MonkeyPatch(function2mock, mock_function))
self.client.get_credentials(self.conf, "name", "project_name",
"pass", identity_version='v3')
mock_function.assert_called_with(
auth_url=None, fill_in=False, identity_version='v3',
disable_ssl_certificate_validation='true',
ca_certs=None, password='pass',
username='name',
project_name='project_name',
domain_name='Default',
user_domain_name='Default')
def test_get_auth_provider_keystone_v2(self):
# check if method returns correct method - KeystoneV2AuthProvider
mock_function = mock.Mock()
# mock V2Provider, if other provider is called, it fails
func2mock = 'config_tempest.main.auth.KeystoneV2AuthProvider'
self.useFixture(MonkeyPatch(func2mock, mock_function))
resp = self.client.get_auth_provider(self.conf, "")
self.assertEqual(resp, mock_function())
# check parameters of returned function
self.client.get_auth_provider(self.conf, "")
mock_function.assert_called_with('', 'http://172.16.52.151:5000/v2.0',
'true', None)
def test_get_auth_provider_keystone_v3(self):
# check if method returns KeystoneV3AuthProvider
# make isinstance return True
mockIsInstance = mock.Mock(return_value=True)
self.useFixture(MonkeyPatch('config_tempest.main.isinstance',
mockIsInstance))
mock_function = mock.Mock()
# mock V3Provider, if other provider is called, it fails
func2mock = 'config_tempest.main.auth.KeystoneV3AuthProvider'
self.useFixture(MonkeyPatch(func2mock, mock_function))
resp = self.client.get_auth_provider(self.conf, "")
self.assertEqual(resp, mock_function())
# check parameters of returned function
self.client.get_auth_provider(self.conf, "")
mock_function.assert_called_with('', 'http://172.16.52.151:5000/v3',
'true', None)
def test_get_identity_version_v2(self):
resp = self.client.get_identity_version(self.conf)
self.assertEqual(resp, 'v2')
def test_get_identity_version_v3(self):
conf = self._get_conf("v3", "v3") # uri has to be v3
resp = self.client.get_identity_version(conf)
self.assertEqual(resp, 'v3')
def test_init_manager_as_admin(self):
mock_function = mock.Mock(return_value={"id": "my_fake_id"})
func2mock = ('config_tempest.main.ProjectsClient.'
'get_project_by_name')
self.useFixture(MonkeyPatch(func2mock, mock_function))
self._get_clients(self.conf, admin=True)
# check if admin credentials were set
admin_tenant = self.conf.get("identity", "admin_tenant_name")
admin_password = self.conf.get("identity", "admin_password")
self.assertEqual(self.conf.get("identity", "admin_username"), "admin")
self.assertEqual(admin_tenant, "adminTenant")
self.assertEqual(admin_password, "adminPass")
# check if admin tenant id was set
admin_tenant_id = self.conf.get("identity", "admin_tenant_id")
self.assertEqual(admin_tenant_id, "my_fake_id")
def test_init_manager_as_admin_using_new_auth(self):
self.conf = self._get_alt_conf("v2.0", "v3")
self.client = self._get_clients(self.conf)
mock_function = mock.Mock(return_value={"id": "my_fake_id"})
func2mock = ('config_tempest.main.ProjectsClient'
'.get_project_by_name')
self.useFixture(MonkeyPatch(func2mock, mock_function))
self._get_clients(self.conf, admin=True)
# check if admin credentials were set
admin_tenant = self.conf.get("auth", "admin_project_name")
admin_password = self.conf.get("auth", "admin_password")
self.assertEqual(self.conf.get("auth", "admin_username"), "admin")
self.assertEqual(admin_tenant, "adminTenant")
self.assertEqual(admin_password, "adminPass")
# check if admin tenant id was set
admin_tenant_id = self.conf.get("identity", "admin_tenant_id")
self.assertEqual(admin_tenant_id, "my_fake_id")
use_dynamic_creds_bool = self.conf.get("auth",
"use_dynamic_credentials")
self.assertEqual(use_dynamic_creds_bool, "True")
class TestOsClientConfigSupport(BaseConfigTempestTest): class TestOsClientConfigSupport(BaseConfigTempestTest):
def setUp(self): def setUp(self):
@ -162,7 +53,7 @@ class TestOsClientConfigSupport(BaseConfigTempestTest):
func2mock = 'os_client_config.cloud_config.CloudConfig.config.get' func2mock = 'os_client_config.cloud_config.CloudConfig.config.get'
self.useFixture(MonkeyPatch(func2mock, mock_function)) self.useFixture(MonkeyPatch(func2mock, mock_function))
mock_function = mock.Mock(return_value={"id": "my_fake_id"}) mock_function = mock.Mock(return_value={"id": "my_fake_id"})
func2mock = ('config_tempest.main.ProjectsClient.' func2mock = ('config_tempest.clients.ProjectsClient.'
'get_project_by_name') 'get_project_by_name')
self.useFixture(MonkeyPatch(func2mock, mock_function)) self.useFixture(MonkeyPatch(func2mock, mock_function))
@ -204,7 +95,7 @@ class TestOsClientConfigSupport(BaseConfigTempestTest):
mock_function = mock.Mock(return_value={}) mock_function = mock.Mock(return_value={})
func2mock = 'os_client_config.cloud_config.CloudConfig.config.get' func2mock = 'os_client_config.cloud_config.CloudConfig.config.get'
self.useFixture(MonkeyPatch(func2mock, mock_function)) self.useFixture(MonkeyPatch(func2mock, mock_function))
manager = tool.ClientManager(self.conf, admin=False) manager = ClientManager(self.conf, self._get_creds(self.conf))
# cloud_args is empty => check if default credentials were used # cloud_args is empty => check if default credentials were used
self._check_credentials(manager, self._check_credentials(manager,
self.conf.get('identity', 'username'), self.conf.get('identity', 'username'),
@ -213,7 +104,7 @@ class TestOsClientConfigSupport(BaseConfigTempestTest):
def test_init_manager_client_config_override(self): def test_init_manager_client_config_override(self):
self._override_setup() self._override_setup()
manager = tool.ClientManager(self.conf, admin=False) manager = ClientManager(self.conf, self._get_creds(self.conf))
# check if cloud_args credentials were overrided by the ones set in CLI # check if cloud_args credentials were overrided by the ones set in CLI
self._check_credentials(manager, self._check_credentials(manager,
self.conf.get('identity', 'username'), self.conf.get('identity', 'username'),
@ -222,7 +113,8 @@ class TestOsClientConfigSupport(BaseConfigTempestTest):
def test_init_manager_client_config_admin_override(self): def test_init_manager_client_config_admin_override(self):
self._override_setup() self._override_setup()
manager = tool.ClientManager(self.conf, admin=True) creds = self._get_creds(self.conf, admin=True)
manager = ClientManager(self.conf, creds)
# check if cloud_args credentials were overrided by admin ones # check if cloud_args credentials were overrided by admin ones
self._check_credentials(manager, self._check_credentials(manager,
self.conf.get('identity', 'admin_username'), self.conf.get('identity', 'admin_username'),

View File

@ -16,10 +16,10 @@
# under the License. # under the License.
import mock import mock
from tempest.lib import exceptions
from config_tempest import main as tool from config_tempest import main as tool
from config_tempest.tests.base import BaseConfigTempestTest from config_tempest.tests.base import BaseConfigTempestTest
from tempest.lib import exceptions
class TestCreateTempestUser(BaseConfigTempestTest): class TestCreateTempestUser(BaseConfigTempestTest):
@ -110,8 +110,9 @@ class TestCreateUserWithTenant(BaseConfigTempestTest):
self.tenant_description = "Tenant for Tempest %s user" % self.username self.tenant_description = "Tenant for Tempest %s user" % self.username
self.email = "%s@test.com" % self.username self.email = "%s@test.com" % self.username
@mock.patch('config_tempest.main.ProjectsClient.get_project_by_name') @mock.patch('config_tempest.clients.ProjectsClient'
@mock.patch('config_tempest.main.ProjectsClient.create_project') '.get_project_by_name')
@mock.patch('config_tempest.clients.ProjectsClient.create_project')
@mock.patch('tempest.lib.services.identity.v2.users_client.' @mock.patch('tempest.lib.services.identity.v2.users_client.'
'UsersClient.create_user') 'UsersClient.create_user')
def test_create_user_with_tenant(self, def test_create_user_with_tenant(self,
@ -132,8 +133,9 @@ class TestCreateUserWithTenant(BaseConfigTempestTest):
tenantId="fake-id", tenantId="fake-id",
email=self.email) email=self.email)
@mock.patch('config_tempest.main.ProjectsClient.get_project_by_name') @mock.patch('config_tempest.clients.ProjectsClient'
@mock.patch('config_tempest.main.ProjectsClient.create_project') '.get_project_by_name')
@mock.patch('config_tempest.clients.ProjectsClient.create_project')
@mock.patch('tempest.lib.services.identity.v2' @mock.patch('tempest.lib.services.identity.v2'
'.users_client.UsersClient.create_user') '.users_client.UsersClient.create_user')
def test_create_user_with_tenant_tenant_exists( def test_create_user_with_tenant_tenant_exists(
@ -161,7 +163,8 @@ class TestCreateUserWithTenant(BaseConfigTempestTest):
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
'users_client.UsersClient.update_user_password') 'users_client.UsersClient.update_user_password')
@mock.patch('tempest.common.identity.get_user_by_username') @mock.patch('tempest.common.identity.get_user_by_username')
@mock.patch('config_tempest.main.ProjectsClient.get_project_by_name') @mock.patch('config_tempest.clients.ProjectsClient.'
'get_project_by_name')
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
'tenants_client.TenantsClient.create_tenant') 'tenants_client.TenantsClient.create_tenant')
@mock.patch('tempest.lib.services.identity.' @mock.patch('tempest.lib.services.identity.'
@ -191,8 +194,9 @@ class TestCreateUserWithTenant(BaseConfigTempestTest):
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
'users_client.UsersClient.update_user_password') 'users_client.UsersClient.update_user_password')
@mock.patch('tempest.common.identity.get_user_by_username') @mock.patch('tempest.common.identity.get_user_by_username')
@mock.patch('config_tempest.main.ProjectsClient.get_project_by_name') @mock.patch('config_tempest.clients.ProjectsClient.'
@mock.patch('config_tempest.main.ProjectsClient.create_project') 'get_project_by_name')
@mock.patch('config_tempest.clients.ProjectsClient.create_project')
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
'users_client.UsersClient.create_user') 'users_client.UsersClient.create_user')
def test_create_user_with_tenant_exists_user_exists( def test_create_user_with_tenant_exists_user_exists(
@ -241,7 +245,8 @@ class TestGiveRoleToUser(BaseConfigTempestTest):
{'name': "fake_role2", {'name': "fake_role2",
'id': "fake_role_id2"}]} 'id': "fake_role_id2"}]}
@mock.patch('config_tempest.main.ProjectsClient.get_project_by_name') @mock.patch('config_tempest.clients.ProjectsClient.'
'get_project_by_name')
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
'users_client.UsersClient.list_users') 'users_client.UsersClient.list_users')
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
@ -271,7 +276,8 @@ class TestGiveRoleToUser(BaseConfigTempestTest):
mock_create_user_role_on_project.assert_called_with( mock_create_user_role_on_project.assert_called_with(
"fake_tenant_id", "fake_user_id", "fake_role_id") "fake_tenant_id", "fake_user_id", "fake_role_id")
@mock.patch('config_tempest.main.ProjectsClient.get_project_by_name') @mock.patch('config_tempest.clients.ProjectsClient.'
'get_project_by_name')
@mock.patch('tempest.lib.services.identity.' @mock.patch('tempest.lib.services.identity.'
'v2.users_client.UsersClient.list_users') 'v2.users_client.UsersClient.list_users')
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
@ -302,7 +308,8 @@ class TestGiveRoleToUser(BaseConfigTempestTest):
tenant_name=self.tenant_name, tenant_name=self.tenant_name,
role_name=role_name) role_name=role_name)
@mock.patch('config_tempest.main.ProjectsClient.get_project_by_name') @mock.patch('config_tempest.clients.ProjectsClient.'
'get_project_by_name')
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
'users_client.UsersClient.list_users') 'users_client.UsersClient.list_users')
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
@ -332,7 +339,8 @@ class TestGiveRoleToUser(BaseConfigTempestTest):
role_name=self.role_name, role_name=self.role_name,
role_required=False) role_required=False)
@mock.patch('config_tempest.main.ProjectsClient.get_project_by_name') @mock.patch('config_tempest.clients.ProjectsClient'
'.get_project_by_name')
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'
'users_client.UsersClient.list_users') 'users_client.UsersClient.list_users')
@mock.patch('tempest.lib.services.identity.v2.' @mock.patch('tempest.lib.services.identity.v2.'

View File

@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
# Copyright 2018 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fixtures import MonkeyPatch
import mock
from config_tempest.tests.base import BaseConfigTempestTest
class TestCredentials(BaseConfigTempestTest):
def setUp(self):
super(TestCredentials, self).setUp()
self.conf = self._get_conf("v2.0", "v3")
self.creds = self._get_creds(self.conf)
def test_get_identity_version_v2(self):
resp = self.creds._get_identity_version()
self.assertEqual(resp, 'v2')
def test_get_identity_version_v3(self):
conf = self._get_conf("v3", "v3") # uri has to be v3
creds = self._get_creds(conf)
resp = creds._get_identity_version()
self.assertEqual(resp, 'v3')
def test_set_credentials_v2(self):
mock_function = mock.Mock()
function2mock = 'config_tempest.credentials.auth.get_credentials'
self.useFixture(MonkeyPatch(function2mock, mock_function))
self.creds.username = "name"
self.creds.password = "pass"
self.creds.tenant_name = "Tname"
self.creds.set_credentials()
mock_function.assert_called_with(
auth_url=None, fill_in=False, identity_version='v2',
disable_ssl_certificate_validation='true',
ca_certs=None, password='pass', tenant_name='Tname',
username='name')
def test_set_credentials_v3(self):
mock_function = mock.Mock()
function2mock = 'config_tempest.credentials.auth.get_credentials'
self.useFixture(MonkeyPatch(function2mock, mock_function))
self.creds.username = "name"
self.creds.password = "pass"
self.creds.tenant_name = "project_name"
self.creds.identity_version = "v3"
self.creds.set_credentials()
mock_function.assert_called_with(
auth_url=None, fill_in=False, identity_version='v3',
disable_ssl_certificate_validation='true',
ca_certs=None, password='pass',
username='name',
project_name='project_name',
domain_name='Default',
user_domain_name='Default')
def test_get_auth_provider_keystone_v2(self):
# check if method returns correct method - KeystoneV2AuthProvider
mock_function = mock.Mock()
# mock V2Provider, if other provider is called, it fails
func2mock = 'config_tempest.credentials.auth.KeystoneV2AuthProvider'
self.useFixture(MonkeyPatch(func2mock, mock_function))
resp = self.creds.get_auth_provider()
self.assertEqual(resp, mock_function())
# check parameters of returned function
self.creds.get_auth_provider()
mock_function.assert_called_with(self.creds.tempest_creds,
'http://172.16.52.151:5000/v2.0',
'true', None)
def test_get_auth_provider_keystone_v3(self):
# check if method returns KeystoneV3AuthProvider
# make isinstance return True
mockIsInstance = mock.Mock(return_value=True)
self.useFixture(MonkeyPatch('config_tempest.credentials.isinstance',
mockIsInstance))
mock_function = mock.Mock()
# mock V3Provider, if other provider is called, it fails
func2mock = 'config_tempest.credentials.auth.KeystoneV3AuthProvider'
self.useFixture(MonkeyPatch(func2mock, mock_function))
resp = self.creds.get_auth_provider()
self.assertEqual(resp, mock_function())
# check parameters of returned function
self.creds.get_auth_provider()
mock_function.assert_called_with(self.creds.tempest_creds,
'http://172.16.52.151:5000/v3',
'true', None)