Takashi Kajinami f3610ba6d2 Remove fallback to Identity v2 API
Identity v2 API was removed from keystone multiple cycles ago.

Change-Id: Ifcc75531995a6a22865580d895423073c159d77f
2024-10-24 23:28:35 +09:00

155 lines
5.5 KiB
Python

# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from keystoneauth1.identity import v3 as v3_auth
from keystoneauth1 import session
from zaqarclient.auth import base
from zaqarclient import errors
# NOTE(flaper87): Some of the code below
# was brought to you by the very unique
# work of ceilometerclient and glanceclient.
class KeystoneAuth(base.AuthBackend):
"""Keystone Auth backend
:params conf: A dictionary with Keystone's
custom parameters:
- os_username
- os_password
- os_project_id
- os_project_name
- os_auth_url
- os_auth_token
- os_region_name
- os_service_type
- os_endpoint_type
:type conf: `dict`
"""
def _get_keystone_session(self, **kwargs):
cacert = kwargs.pop('cacert', None)
cert = kwargs.pop('cert', None)
key = kwargs.pop('key', None)
insecure = kwargs.pop('insecure', False)
auth_url = kwargs.pop('auth_url', None)
project_id = kwargs.pop('project_id', None)
project_name = kwargs.pop('project_name', None)
token = kwargs.get('token')
if insecure:
verify = False
else:
verify = cacert or True
if cert and key:
# passing cert and key together is deprecated in favour of the
# requests lib form of having the cert and key as a tuple
cert = (cert, key)
# create the keystone client session
ks_session = session.Session(verify=verify, cert=cert)
username = kwargs.pop('username', None)
password = kwargs.pop('password', None)
user_id = kwargs.pop('user_id', None)
user_domain_name = kwargs.pop('user_domain_name', None)
user_domain_id = kwargs.pop('user_domain_id', None)
project_domain_name = kwargs.pop('project_domain_name', None)
project_domain_id = kwargs.pop('project_domain_id', None)
auth = None
if auth_url is None:
raise errors.ZaqarError(
'Unable to determine the Keystone endpoint because auth_url '
'is not given.')
if token:
auth = v3_auth.Token(
auth_url,
token=token,
project_name=project_name,
project_id=project_id,
project_domain_name=project_domain_name,
project_domain_id=project_domain_id)
else:
auth = v3_auth.Password(
auth_url,
username=username,
password=password,
user_id=user_id,
user_domain_name=user_domain_name,
user_domain_id=user_domain_id,
project_name=project_name,
project_id=project_id,
project_domain_name=project_domain_name,
project_domain_id=project_domain_id)
ks_session.auth = auth
return ks_session
def _get_endpoint(self, ks_session, **kwargs):
"""Get an endpoint using the provided keystone session."""
# Set service specific endpoint types
endpoint_type = kwargs.get('endpoint_type') or 'publicURL'
service_type = kwargs.get('service_type') or 'messaging'
region_name = kwargs.get('region_name')
endpoint = ks_session.get_endpoint(service_type=service_type,
interface=endpoint_type,
region_name=region_name)
return endpoint
def authenticate(self, api_version, request):
"""Get an authtenticated client using credentials in the keyword args.
:param api_version: the API version to use ('1' or '2')
:param request: The request spec instance to modify with
the auth information.
"""
def get_options(k):
return self.conf.get(k, self.conf.get("os_%s" % k))
token = get_options('auth_token')
if not token or not request.endpoint:
ks_kwargs = {}
keys = ("username", "password", "project_id",
"project_name", "auth_url", "insecure",
"cacert", "region_name", "user_domain_name",
"user_domain_id", "project_domain_name",
"project_domain_id")
for k in keys:
ks_kwargs.update({k: get_options(k)})
ks_session = (request.session or
self._get_keystone_session(**ks_kwargs))
if not token:
token = ks_session.get_token()
if not request.endpoint:
request.endpoint = self._get_endpoint(ks_session, **ks_kwargs)
# NOTE(flaper87): Update the request spec
# with the final token.
request.headers['X-Auth-Token'] = token
# NOTE(flwang): We also need to apply the insecure and cacert when
# talking with Zaqar server.
request.verify = not get_options('insecure')
request.cert = get_options('cacert')
return request