diff --git a/doc/requirements.txt b/doc/requirements.txt index 5a2e67b0..2edc23fb 100644 --- a/doc/requirements.txt +++ b/doc/requirements.txt @@ -20,6 +20,7 @@ python-memcached>=1.56 # PSF WebTest>=2.0.27 # MIT oslo.messaging>=5.29.0 # Apache-2.0 pycadf!=2.0.0,>=1.1.0 # Apache-2.0 +PyJWT>=2.4.0 # MIT keystoneauth1>=3.12.0 # Apache-2.0 oslo.cache>=1.26.0 # Apache-2.0 python-keystoneclient>=3.20.0 # Apache-2.0 diff --git a/keystonemiddleware/external_oauth2_token.py b/keystonemiddleware/external_oauth2_token.py new file mode 100644 index 00000000..dd5bc245 --- /dev/null +++ b/keystonemiddleware/external_oauth2_token.py @@ -0,0 +1,846 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import copy +import hashlib +import os +import ssl +import time +import uuid + +import jwt.utils +import oslo_cache +from oslo_config import cfg +from oslo_log import log as logging +from oslo_serialization import jsonutils +import requests.auth +import webob.dec +import webob.exc + +from keystoneauth1 import exceptions as ksa_exceptions +from keystoneauth1 import loading +from keystoneauth1.loading import session as session_loading + +from keystonemiddleware._common import config +from keystonemiddleware.exceptions import ConfigurationError +from keystonemiddleware.exceptions import KeystoneMiddlewareException +from keystonemiddleware.i18n import _ + +oslo_cache.configure(cfg.CONF) +_EXT_AUTH_CONFIG_GROUP_NAME = 'ext_oauth2_auth' +_EXTERNAL_AUTH2_OPTS = [ + cfg.StrOpt('certfile', + help='Required if identity server requires client ' + 'certificate.'), + cfg.StrOpt('keyfile', + help='Required if identity server requires client ' + 'private key.'), + cfg.StrOpt('cafile', + help='A PEM encoded Certificate Authority to use when ' + 'verifying HTTPs connections. Defaults to system CAs.'), + cfg.BoolOpt('insecure', default=False, help='Verify HTTPS connections.'), + cfg.IntOpt('http_connect_timeout', + help='Request timeout value for communicating with Identity ' + 'API server.'), + cfg.StrOpt('introspect_endpoint', + help='The endpoint for introspect API, it is used to verify ' + 'that the OAuth 2.0 access token is valid.'), + cfg.StrOpt('audience', + help='The Audience should be the URL of the Authorization ' + 'Server\'s Token Endpoint. The Authorization Server will ' + 'verify that it is an intended audience for the token.'), + cfg.StrOpt('auth_method', + default='client_secret_basic', + choices=('client_secret_basic', 'client_secret_post', + 'tls_client_auth', 'private_key_jwt', + 'client_secret_jwt'), + help='The auth_method must use the authentication method ' + 'specified by the Authorization Server. The system ' + 'supports 5 authentication methods such as ' + 'tls_client_auth, client_secret_basic, ' + 'client_secret_post, client_secret_jwt, private_key_jwt.'), + cfg.StrOpt('client_id', + help='The OAuth 2.0 Client Identifier valid at the ' + 'Authorization Server.'), + cfg.StrOpt('client_secret', + help='The OAuth 2.0 client secret. When the auth_method is ' + 'client_secret_basic, client_secret_post, or ' + 'client_secret_jwt, the value is used, and otherwise the ' + 'value is ignored.'), + cfg.BoolOpt('thumbprint_verify', default=False, + help='If the access token generated by the Authorization ' + 'Server is bound to the OAuth 2.0 certificate ' + 'thumbprint, the value can be set to true, and then the ' + 'keystone middleware will verify the thumbprint.'), + cfg.StrOpt('jwt_key_file', + help='The jwt_key_file must use the certificate key file which ' + 'has been registered with the Authorization Server. ' + 'When the auth_method is private_key_jwt, the value is ' + 'used, and otherwise the value is ignored.'), + cfg.StrOpt('jwt_algorithm', + help='The jwt_algorithm must use the algorithm specified by ' + 'the Authorization Server. When the auth_method is ' + 'client_secret_jwt, this value is often set to HS256, ' + 'when the auth_method is private_key_jwt, the value is ' + 'often set to RS256, and otherwise the value is ignored.'), + cfg.IntOpt('jwt_bearer_time_out', default=3600, + help='This value is used to calculate the expiration time. If ' + 'after the expiration time, the access token can not be ' + 'accepted. When the auth_method is client_secret_jwt or ' + 'private_key_jwt, the value is used, and otherwise the ' + 'value is ignored.'), + cfg.StrOpt('mapping_project_id', + help='Specifies the method for obtaining the project ID that ' + 'currently needs to be accessed. '), + cfg.StrOpt('mapping_project_name', + help='Specifies the method for obtaining the project name that ' + 'currently needs to be accessed.'), + cfg.StrOpt('mapping_project_domain_id', + help='Specifies the method for obtaining the project domain ID ' + 'that currently needs to be accessed.'), + cfg.StrOpt('mapping_project_domain_name', + help='Specifies the method for obtaining the project domain ' + 'name that currently needs to be accessed.'), + cfg.StrOpt('mapping_user_id', default='client_id', + help='Specifies the method for obtaining the user ID.'), + cfg.StrOpt('mapping_user_name', default='username', + help='Specifies the method for obtaining the user name.'), + cfg.StrOpt('mapping_user_domain_id', + help='Specifies the method for obtaining the domain ID which ' + 'the user belongs.'), + cfg.StrOpt('mapping_user_domain_name', + help='Specifies the method for obtaining the domain name which ' + 'the user belongs.'), + cfg.StrOpt('mapping_roles', + help='Specifies the method for obtaining the list of roles in ' + 'a project or domain owned by the user.'), +] + +cfg.CONF.register_opts(_EXTERNAL_AUTH2_OPTS, + group=_EXT_AUTH_CONFIG_GROUP_NAME) + + +class InvalidToken(KeystoneMiddlewareException): + """Raise an InvalidToken Error. + + When can not get necessary information from the token, + this error will be thrown. + """ + + +class ForbiddenToken(KeystoneMiddlewareException): + """Raise a ForbiddenToken Error. + + When can not get necessary information from the token, + this error will be thrown. + """ + + +class ServiceError(KeystoneMiddlewareException): + """Raise a ServiceError. + + When can not verify any tokens, this error will be thrown. + """ + + +class AbstractAuthClient(object, metaclass=abc.ABCMeta): + """Abstract http client using to access the OAuth2.0 Server.""" + + def __init__(self, session, introspect_endpoint, audience, client_id, + func_get_config_option, logger): + self.session = session + self.introspect_endpoint = introspect_endpoint + self.audience = audience + self.client_id = client_id + self.get_config_option = func_get_config_option + self.logger = logger + + @abc.abstractmethod + def introspect(self, access_token): + """Access the introspect API.""" + pass + + +class ClientSecretBasicAuthClient(AbstractAuthClient): + """Http client with the auth method 'client_secret_basic'.""" + + def __init__(self, session, introspect_endpoint, audience, client_id, + func_get_config_option, logger): + super(ClientSecretBasicAuthClient, self).__init__( + session, introspect_endpoint, audience, client_id, + func_get_config_option, logger) + self.client_secret = self.get_config_option( + 'client_secret', is_required=True) + + def introspect(self, access_token): + """Access the introspect API. + + Access the Introspect API to verify the access token by + the auth method 'client_secret_basic'. + """ + req_data = {'token': access_token, + 'token_type_hint': 'access_token'} + auth = requests.auth.HTTPBasicAuth(self.client_id, + self.client_secret) + http_response = self.session.request( + self.introspect_endpoint, + 'POST', + authenticated=False, + data=req_data, + requests_auth=auth) + return http_response + + +class ClientSecretPostAuthClient(AbstractAuthClient): + """Http client with the auth method 'client_secret_post'.""" + + def __init__(self, session, introspect_endpoint, audience, client_id, + func_get_config_option, logger): + super(ClientSecretPostAuthClient, self).__init__( + session, introspect_endpoint, audience, client_id, + func_get_config_option, logger) + self.client_secret = self.get_config_option( + 'client_secret', is_required=True) + + def introspect(self, access_token): + """Access the introspect API. + + Access the Introspect API to verify the access token by + the auth method 'client_secret_post'. + """ + req_data = { + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'token': access_token, + 'token_type_hint': 'access_token' + } + http_response = self.session.request( + self.introspect_endpoint, + 'POST', + authenticated=False, + data=req_data) + return http_response + + +class TlsClientAuthClient(AbstractAuthClient): + """Http client with the auth method 'tls_client_auth'.""" + + def introspect(self, access_token): + """Access the introspect API. + + Access the Introspect API to verify the access token by + the auth method 'tls_client_auth'. + """ + req_data = { + 'client_id': self.client_id, + 'token': access_token, + 'token_type_hint': 'access_token' + } + http_response = self.session.request( + self.introspect_endpoint, + 'POST', + authenticated=False, + data=req_data) + return http_response + + +class PrivateKeyJwtAuthClient(AbstractAuthClient): + """Http client with the auth method 'private_key_jwt'.""" + + def __init__(self, session, introspect_endpoint, audience, client_id, + func_get_config_option, logger): + super(PrivateKeyJwtAuthClient, self).__init__( + session, introspect_endpoint, audience, client_id, + func_get_config_option, logger) + self.jwt_key_file = self.get_config_option( + 'jwt_key_file', is_required=True) + self.jwt_bearer_time_out = self.get_config_option( + 'jwt_bearer_time_out', is_required=True) + self.jwt_algorithm = self.get_config_option( + 'jwt_algorithm', is_required=True) + self.logger = logger + + def introspect(self, access_token): + """Access the introspect API. + + Access the Introspect API to verify the access token by + the auth method 'private_key_jwt'. + """ + if not os.path.isfile(self.jwt_key_file): + self.logger.critical('Configuration error. JWT key file is ' + 'not a file. path: %s' % self.jwt_key_file) + raise ConfigurationError(_('Configuration error. ' + 'JWT key file is not a file.')) + try: + with open(self.jwt_key_file, 'r') as jwt_file: + jwt_key = jwt_file.read() + except Exception as e: + self.logger.critical('Configuration error. Failed to read ' + 'the JWT key file. %s', e) + raise ConfigurationError(_('Configuration error. ' + 'Failed to read the JWT key file.')) + if not jwt_key: + self.logger.critical('Configuration error. The JWT key file ' + 'content is empty. path: %s' + % self.jwt_key_file) + raise ConfigurationError(_('Configuration error. The JWT key file ' + 'content is empty.')) + + ita = round(time.time()) + try: + client_assertion = jwt.encode( + payload={ + 'jti': str(uuid.uuid4()), + 'iat': str(ita), + 'exp': str(ita + self.jwt_bearer_time_out), + 'iss': self.client_id, + 'sub': self.client_id, + 'aud': self.audience}, + headers={ + 'typ': 'JWT', + 'alg': self.jwt_algorithm}, + key=jwt_key, + algorithm=self.jwt_algorithm) + except Exception as e: + self.logger.critical('Configuration error. JWT encoding with ' + 'the specified JWT key file and algorithm ' + 'failed. path: %s, algorithm: %s, error: %s' % + (self.jwt_key_file, self.jwt_algorithm, e)) + raise ConfigurationError(_('Configuration error. JWT encoding ' + 'with the specified JWT key file ' + 'and algorithm failed.')) + req_data = { + 'client_id': self.client_id, + 'client_assertion_type': + 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + 'client_assertion': client_assertion, + 'token': access_token, + 'token_type_hint': 'access_token' + } + http_response = self.session.request( + self.introspect_endpoint, + 'POST', + authenticated=False, + data=req_data) + return http_response + + +class ClientSecretJwtAuthClient(AbstractAuthClient): + """Http client with the auth method 'client_secret_jwt'.""" + + def __init__(self, session, introspect_endpoint, audience, client_id, + func_get_config_option, logger): + super(ClientSecretJwtAuthClient, self).__init__( + session, introspect_endpoint, audience, client_id, + func_get_config_option, logger) + self.client_secret = self.get_config_option( + 'client_secret', is_required=True) + self.jwt_bearer_time_out = self.get_config_option( + 'jwt_bearer_time_out', is_required=True) + self.jwt_algorithm = self.get_config_option( + 'jwt_algorithm', is_required=True) + + def introspect(self, access_token): + """Access the introspect API. + + Access the Introspect API to verify the access token by + the auth method 'client_secret_jwt'. + """ + ita = round(time.time()) + try: + client_assertion = jwt.encode( + payload={ + 'jti': str(uuid.uuid4()), + 'iat': str(ita), + 'exp': str(ita + self.jwt_bearer_time_out), + 'iss': self.client_id, + 'sub': self.client_id, + 'aud': self.audience}, + headers={ + 'typ': 'JWT', + 'alg': self.jwt_algorithm}, + key=self.client_secret, + algorithm=self.jwt_algorithm) + except Exception as e: + self.logger.critical('Configuration error. JWT encoding with ' + 'the specified client_secret and algorithm ' + 'failed. algorithm: %s, error: %s' + % (self.jwt_algorithm, e)) + raise ConfigurationError(_('Configuration error. JWT encoding ' + 'with the specified client_secret ' + 'and algorithm failed.')) + req_data = { + 'client_id': self.client_id, + 'client_assertion_type': + 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + 'client_assertion': client_assertion, + 'token': access_token, + 'token_type_hint': 'access_token' + } + http_response = self.session.request( + self.introspect_endpoint, + 'POST', + authenticated=False, + data=req_data) + return http_response + + +_ALL_AUTH_CLIENTS = { + 'client_secret_basic': ClientSecretBasicAuthClient, + 'client_secret_post': ClientSecretPostAuthClient, + 'tls_client_auth': TlsClientAuthClient, + 'private_key_jwt': PrivateKeyJwtAuthClient, + 'client_secret_jwt': ClientSecretJwtAuthClient +} + + +def _get_http_client(auth_method, session, introspect_endpoint, audience, + client_id, func_get_config_option, logger): + """Get an auth HTTP Client to access the OAuth2.0 Server.""" + if auth_method in _ALL_AUTH_CLIENTS: + return _ALL_AUTH_CLIENTS.get(auth_method)( + session, introspect_endpoint, audience, + client_id, func_get_config_option, logger) + logger.critical('The value is incorrect for option ' + 'auth_method in group [%s]' % + _EXT_AUTH_CONFIG_GROUP_NAME) + raise ConfigurationError(_('The configuration parameter for ' + 'key "auth_method" in group [%s] ' + 'is incorrect.') % + _EXT_AUTH_CONFIG_GROUP_NAME) + + +class ExternalAuth2Protocol(object): + """Middleware that handles External Server OAuth2.0 authentication.""" + + def __init__(self, application, conf): + super(ExternalAuth2Protocol, self).__init__() + self._application = application + self._log = logging.getLogger(conf.get('log_name', __name__)) + self._log.info('Starting Keystone external_oauth2_token middleware') + + config_opts = [ + (_EXT_AUTH_CONFIG_GROUP_NAME, _EXTERNAL_AUTH2_OPTS + + loading.get_auth_common_conf_options()) + ] + all_opts = [(g, copy.deepcopy(o)) for g, o in config_opts] + self._conf = config.Config('external_oauth2_token', + _EXT_AUTH_CONFIG_GROUP_NAME, + all_opts, + conf) + + self._session = self._create_session() + self._audience = self._get_config_option('audience', is_required=True) + self._introspect_endpoint = self._get_config_option( + 'introspect_endpoint', is_required=True) + self._auth_method = self._get_config_option( + 'auth_method', is_required=True) + self._client_id = self._get_config_option( + 'client_id', is_required=True) + self._http_client = _get_http_client( + self._auth_method, self._session, self._introspect_endpoint, + self._audience, self._client_id, + self._get_config_option, self._log) + + @webob.dec.wsgify() + def __call__(self, req): + """Handle incoming request.""" + self.process_request(req) + response = req.get_response(self._application) + return self.process_response(response) + + def process_request(self, request): + """Process request. + + :param request: Incoming request + :type request: _request.AuthTokenRequest + """ + access_token = None + if (request.authorization and + request.authorization.authtype == 'Bearer'): + access_token = request.authorization.params + + try: + if not access_token: + self._log.info('Unable to obtain the access token.') + raise InvalidToken(_('Unable to obtain the access token.')) + + token_data = self._fetch_token(access_token) + + if (self._get_config_option('thumbprint_verify', + is_required=False)): + self._confirm_certificate_thumbprint( + request, token_data.get('origin_token_metadata')) + + self._set_request_env(request, token_data) + + except InvalidToken as error: + self._log.info('Rejecting request. ' + 'Need a valid OAuth 2.0 access token. ' + 'error: %s', error) + message = _('The request you have made is denied, ' + 'because the token is invalid.') + body = {'error': { + 'code': 401, + 'title': 'Unauthorized', + 'message': message, + }} + raise webob.exc.HTTPUnauthorized( + body=jsonutils.dumps(body), + headers=self._reject_headers, + charset='UTF-8', + content_type='application/json') + except ForbiddenToken as error: + self._log.warning('Rejecting request. ' + 'The necessary information is required.' + 'error: %s', error) + message = _('The request you have made is denied, ' + 'because the necessary information ' + 'could not be parsed.') + body = {'error': { + 'code': 403, + 'title': 'Forbidden', + 'message': message, + }} + raise webob.exc.HTTPForbidden( + body=jsonutils.dumps(body), + charset='UTF-8', + content_type='application/json') + except ConfigurationError as error: + self._log.critical('Rejecting request. ' + 'The configuration parameters are incorrect. ' + 'error: %s', error) + message = _('The request you have made is denied, ' + 'because the configuration parameters are incorrect ' + 'and the token can not be verified.') + body = {'error': { + 'code': 500, + 'title': 'Internal Server Error', + 'message': message, + }} + raise webob.exc.HTTPServerError( + body=jsonutils.dumps(body), + charset='UTF-8', + content_type='application/json') + except ServiceError as error: + self._log.warning('Rejecting request. An exception occurred and ' + 'the OAuth 2.0 access token can not be ' + 'verified. error: %s', error) + message = _('The request you have made is denied, ' + 'because an exception occurred while accessing ' + 'the external authentication server ' + 'for token validation.') + body = {'error': { + 'code': 500, + 'title': 'Internal Server Error', + 'message': message, + }} + raise webob.exc.HTTPServerError( + body=jsonutils.dumps(body), + charset='UTF-8', + content_type='application/json') + + def process_response(self, response): + """Process Response. + + Add ``WWW-Authenticate`` headers to requests that failed with + ``401 Unauthenticated`` so users know where to authenticate for future + requests. + """ + if response.status_int == 401: + response.headers.extend(self._reject_headers) + return response + + def _create_session(self, **kwargs): + """Create session for HTTP access.""" + kwargs.setdefault('cert', self._get_config_option( + 'certfile', is_required=False)) + kwargs.setdefault('key', self._get_config_option( + 'keyfile', is_required=False)) + kwargs.setdefault('cacert', self._get_config_option( + 'cafile', is_required=False)) + kwargs.setdefault('insecure', self._get_config_option( + 'insecure', is_required=False)) + kwargs.setdefault('timeout', self._get_config_option( + 'http_connect_timeout', is_required=False)) + kwargs.setdefault('user_agent', self._conf.user_agent) + return session_loading.Session().load_from_options(**kwargs) + + def _get_config_option(self, key, is_required): + """Read the value from config file by the config key.""" + value = self._conf.get(key) + if not value: + if is_required: + self._log.critical('The value is required for option %s ' + 'in group [%s]' % ( + key, _EXT_AUTH_CONFIG_GROUP_NAME)) + raise ConfigurationError( + _('Configuration error. The parameter ' + 'is not set for "%s" in group [%s].') % ( + key, _EXT_AUTH_CONFIG_GROUP_NAME)) + else: + return None + else: + return value + + @property + def _reject_headers(self): + """Generate WWW-Authenticate Header. + + When response status is 401, this method will be called to add + the 'WWW-Authenticate' header to the response. + """ + header_val = 'Authorization OAuth 2.0 uri="%s"' % self._audience + return [('WWW-Authenticate', header_val)] + + def _fetch_token(self, access_token): + """Use access_token to get the valid token meta_data. + + Verify the access token through accessing the external + authorization server. + """ + try: + http_response = self._http_client.introspect(access_token) + if http_response.status_code != 200: + self._log.critical('The introspect API returns an ' + 'incorrect response. ' + 'response_status: %s, response_text: %s' % + (http_response.status_code, + http_response.text)) + raise ServiceError(_('The token cannot be verified ' + 'for validity.')) + + origin_token_metadata = http_response.json() + self._log.debug('The introspect API response: %s' % + origin_token_metadata) + if not origin_token_metadata.get('active'): + self._log.info('The token is invalid. response: %s' % + origin_token_metadata) + raise InvalidToken(_('The token is invalid.')) + + return self._parse_necessary_info(origin_token_metadata) + except (ConfigurationError, ForbiddenToken, + ServiceError, InvalidToken): + raise + except (ksa_exceptions.ConnectFailure, + ksa_exceptions.DiscoveryFailure, + ksa_exceptions.RequestTimeout) as error: + self._log.critical('Unable to validate token: %s', error) + raise ServiceError( + _('The Introspect API service is temporarily unavailable.')) + except Exception as error: + self._log.critical('Unable to validate token: %s', error) + raise ServiceError(_('An exception occurred during the token ' + 'verification process.')) + + def _read_data_from_token(self, token_metadata, config_key, + is_required=False, value_type=str): + """Read value from token metadata. + + Read the necessary information from the token metadata with the + config key. + """ + meta_key = self._get_config_option(config_key, is_required=is_required) + if not meta_key: + return None + + if meta_key.find('.') >= 0: + meta_value = None + for temp_key in meta_key.split('.'): + if not temp_key: + self._log.critical('Configuration error. ' + 'config_key: %s , meta_key: %s ' % + (config_key, meta_key)) + raise ConfigurationError( + _('Failed to parse the necessary information ' + 'for the field "%s".') % meta_key) + if not meta_value: + meta_value = token_metadata.get(temp_key) + else: + if not isinstance(meta_value, dict): + self._log.warning( + 'Failed to parse the necessary information. ' + 'The meta_value is not of type dict.' + 'config_key: %s , meta_key: %s, value: %s' % + (config_key, meta_key, meta_value)) + raise ForbiddenToken( + _('Failed to parse the necessary information ' + 'for the field "%s".') % meta_key) + meta_value = meta_value.get(temp_key) + else: + meta_value = token_metadata.get(meta_key) + + if not meta_value: + if is_required: + self._log.warning( + 'Failed to parse the necessary information. ' + 'The meta value is required.' + 'config_key: %s , meta_key: %s, value: %s, need_type: %s' % + (config_key, meta_key, meta_value, value_type)) + raise ForbiddenToken(_('Failed to parse the necessary ' + 'information for the field "%s".') % + meta_key) + else: + meta_value = None + else: + if not isinstance(meta_value, value_type): + self._log.warning( + 'Failed to parse the necessary information. ' + 'The meta value is of an incorrect type.' + 'config_key: %s , meta_key: %s, value: %s, need_type: %s' + % (config_key, meta_key, meta_value, value_type)) + raise ForbiddenToken(_('Failed to parse the necessary ' + 'information for the field "%s".') % + meta_key) + return meta_value + + def _parse_necessary_info(self, token_metadata): + """Parse the necessary information from the token metadata.""" + token_data = dict() + token_data['origin_token_metadata'] = token_metadata + + roles = self._read_data_from_token(token_metadata, + 'mapping_roles', + is_required=True) + is_admin = 'false' + if 'admin' in roles.lower().split(','): + is_admin = 'true' + token_data['roles'] = roles + token_data['is_admin'] = is_admin + + project_id = self._read_data_from_token( + token_metadata, 'mapping_project_id', is_required=False) + if project_id: + token_data['project_id'] = project_id + token_data['project_name'] = self._read_data_from_token( + token_metadata, 'mapping_project_name', is_required=True) + token_data['project_domain_id'] = self._read_data_from_token( + token_metadata, 'mapping_project_domain_id', is_required=True) + token_data['project_domain_name'] = self._read_data_from_token( + token_metadata, 'mapping_project_domain_name', + is_required=True) + else: + token_data['domain_id'] = self._read_data_from_token( + token_metadata, 'mapping_project_domain_id', is_required=True) + token_data['domain_name'] = self._read_data_from_token( + token_metadata, 'mapping_project_domain_name', + is_required=True) + + token_data['user_id'] = self._read_data_from_token( + token_metadata, 'mapping_user_id', is_required=True) + token_data['user_name'] = self._read_data_from_token( + token_metadata, 'mapping_user_name', is_required=True) + token_data['user_domain_id'] = self._read_data_from_token( + token_metadata, 'mapping_user_domain_id', is_required=True) + token_data['user_domain_name'] = self._read_data_from_token( + token_metadata, 'mapping_user_domain_name', is_required=True) + + return token_data + + def _get_client_certificate(self, request): + """Get the client certificate from request environ or socket.""" + try: + pem_client_cert = request.environ.get('SSL_CLIENT_CERT') + if pem_client_cert: + peer_cert = ssl.PEM_cert_to_DER_cert(pem_client_cert) + else: + wsgi_input = request.environ.get('wsgi.input') + if not wsgi_input: + self._log.warn('Unable to obtain the client certificate. ' + 'The object for wsgi_input is none.') + raise InvalidToken(_('Unable to obtain the client ' + 'certificate.')) + socket = wsgi_input.get_socket() + if not socket: + self._log.warn('Unable to obtain the client certificate. ' + 'The object for socket is none.') + raise InvalidToken(_('Unable to obtain the client ' + 'certificate.')) + peer_cert = socket.getpeercert(binary_form=True) + if not peer_cert: + self._log.warn('Unable to obtain the client certificate. ' + 'The object for peer_cert is none.') + raise InvalidToken(_('Unable to obtain the client ' + 'certificate.')) + return peer_cert + except InvalidToken: + raise + except Exception as error: + self._log.warn('Unable to obtain the client certificate. %s' % + error) + raise InvalidToken(_('Unable to obtain the client certificate.')) + + def _confirm_certificate_thumbprint(self, request, origin_token_metadata): + """Check if the thumbprint in the token is valid.""" + peer_cert = self._get_client_certificate(request) + try: + thumb_sha256 = hashlib.sha256(peer_cert).digest() + cert_thumb = jwt.utils.base64url_encode(thumb_sha256).decode( + 'ascii') + except Exception as error: + self._log.warn('An Exception occurred. %s' % error) + raise InvalidToken(_('Can not generate the thumbprint.')) + + token_thumb = origin_token_metadata.get('cnf', {}).get('x5t#S256') + if cert_thumb != token_thumb: + self._log.warn('The two thumbprints do not match. ' + 'token_thumbprint: %s, certificate_thumbprint %s' % + (token_thumb, cert_thumb)) + raise InvalidToken(_('The two thumbprints do not match.')) + + def _set_request_env(self, request, token_data): + """Set request.environ with the necessary information.""" + request.environ['external.token_info'] = token_data + request.environ['HTTP_X_IDENTITY_STATUS'] = 'Confirmed' + request.environ['HTTP_X_ROLES'] = token_data.get('roles') + request.environ['HTTP_X_ROLE'] = token_data.get('roles') + request.environ['HTTP_X_USER_ID'] = token_data.get('user_id') + request.environ['HTTP_X_USER_NAME'] = token_data.get('user_name') + request.environ['HTTP_X_USER_DOMAIN_ID'] = token_data.get( + 'user_domain_id') + request.environ['HTTP_X_USER_DOMAIN_NAME'] = token_data.get( + 'user_domain_name') + if token_data.get('is_admin') == 'true': + request.environ['HTTP_X_IS_ADMIN_PROJECT'] = token_data.get( + 'is_admin') + request.environ['HTTP_X_USER'] = token_data.get('user_name') + + if token_data.get('project_id'): + request.environ['HTTP_X_PROJECT_ID'] = token_data.get('project_id') + request.environ['HTTP_X_PROJECT_NAME'] = token_data.get( + 'project_name') + request.environ['HTTP_X_PROJECT_DOMAIN_ID'] = token_data.get( + 'project_domain_id') + request.environ['HTTP_X_PROJECT_DOMAIN_NAME'] = token_data.get( + 'project_domain_name') + request.environ['HTTP_X_TENANT_ID'] = token_data.get('project_id') + request.environ['HTTP_X_TENANT_NAME'] = token_data.get( + 'project_name') + request.environ['HTTP_X_TENANT'] = token_data.get('project_id') + else: + request.environ['HTTP_X_DOMAIN_ID'] = token_data.get('domain_id') + request.environ['HTTP_X_DOMAIN_NAME'] = token_data.get( + 'domain_name') + self._log.debug('The access token data is %s.' % jsonutils.dumps( + token_data)) + + +def filter_factory(global_conf, **local_conf): + """Return a WSGI filter app for use with paste.deploy.""" + conf = global_conf.copy() + conf.update(local_conf) + + def auth_filter(app): + return ExternalAuth2Protocol(app, conf) + + return auth_filter diff --git a/keystonemiddleware/tests/unit/test_external_oauth2_token_middleware.py b/keystonemiddleware/tests/unit/test_external_oauth2_token_middleware.py new file mode 100644 index 00000000..3a3b625b --- /dev/null +++ b/keystonemiddleware/tests/unit/test_external_oauth2_token_middleware.py @@ -0,0 +1,1834 @@ +# Copyright 2023 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 +import copy +import hashlib +import jwt.utils +import logging +import ssl +import time +from unittest import mock +import uuid +import webob.dec + +import fixtures +from oslo_config import cfg +import six +from six.moves import http_client +import testresources + +from keystoneauth1 import exceptions as ksa_exceptions +from keystoneauth1 import session + +from keystonemiddleware import external_oauth2_token +from keystonemiddleware.tests.unit.auth_token import base +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import FakeApp +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import v3FakeApp +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import VERSION_LIST_v3 +from keystonemiddleware.tests.unit import client_fixtures +from keystonemiddleware.tests.unit import utils + +JWT_KEY_CONTENT = ( + '-----BEGIN PRIVATE KEY-----\n' + 'MIIJQwIBADANBgkqhkiG9w0BAQEFAASCCS0wggkpAgEAAoICAQDegNuQgmQL7n10\n' + '+Z3itXtpiNHlvZwCYOS66+3PakAw1OoRB6SiHeNYnuVRHlraTDKnnfgHhX/1AVs7\n' + 'P36QU5PVYznGip2PXZlCh8MeQhpXgKKt25LPnpQOnUssHyq+OqTHZB6eS2C7xMHf\n' + 'wzPrYRwxhbVgUUVe85cdiXaL5ZRqXNotM00wH1hck4s+1fsnKv7UeGbwM1WwMn6/\n' + '0E1eKwYzlKm4Vmkcivy8WBI7Ijp/MPOUyRXN/mPh8L2VOq0D1E3pufYoYmpBkiQi\n' + 'Ii8nz5CXrhDpM0tGKD+RZ+howE2i+frI2gNDfU5xMx+k+qjD0jftDrQ+OZUujUtq\n' + '6JfdrvtPBT01XZw8GV5Rm9vEwMRduWUDGdRB3chOTeTUdsIG765+Ot7GE7nYrAs0\n' + 's/ryAm1FnNJocTzje7k07IzdBpWzrTrx087Kfcsn6evEABOxim0i+AHUR94QR9/V\n' + 'EP3/+SkJ7zl9P1KzOZZCWtUTnfQxrLhEnCwwjtl35vWlzst+TR7HDwIzQRQVLFH9\n' + 'zMTz8tw6coPifkbVzdwCLGoKge4llDPcVx/TmIGFD3saT0E68yxXe6k3cdIg6lZf\n' + 'dB0yutVBzECrx+LiIpxwQWRKHNiR58KsHHmgXDb8ORBCjpmctD+JsdBhf8hDRMXP\n' + '9sV/fbMUwgrRceyj9AV2x59tE9+UHwIDAQABAoICABb6V7JkxNA2oN4jqRpwg34y\n' + 'kvqWyjW0q+ph0v1Ii7h/RGzdzTKww3mzbxshd2Bz3gdRWPvt3Xj/2twTgo6FEw9G\n' + 'YAEQ75SOpfUo8A1/5hiDQEmUE2U9iyy3Mbwsu81JYRr2S/Ms9aBugVcKYaI9NRwo\n' + 'IsL/oZpcrY5vU76+xsT1MdLZKW9+zTFCS28Byh4RYp+uj3Le2kqH7G8Co/rFlq5c\n' + '++n9gn1gHRmWPsu8jS31cDI9UfMkAkyi//EZTiTHGAS7H6CsCS0cWn7r6NLDrLr9\n' + 'TuHGWk+0eFwbzvSCZ4IdLrjvSsb9ecxW6z2uZR9T5lKk4hhK+g0EqnUv7/8Eww8E\n' + 'wA2J1zhuQ0UzoAowjj5338whBQROKSO4u3ppxhNUSP7fUgYdEKUQEg7rlfEzI+pG\n' + 'dV1LtG0GZBzdZXpE/PTpASjefCkC6olmZpUvajHJGqP0a/ygA9SEBm+B/Q4ii7+0\n' + 'luk6Lj6z+vSWatU7LrLnQeprN82NWxtkH+u2gjMOq1N8r4FOFvbZYBp1NMvtH4iP\n' + 'R6jLdJWYx/KOr4lCkbgTszlVhPop8dktOPQSPL4u6RxdmsGBf028oWKXLrj1D1Ua\n' + 'dBWR1L1CCnI8X6jxL6eT52qF+NY2JxanX6NnzxE/KqedWXmKDxn0M3ETfizz9UG4\n' + '8UmsMgJ8UUALRbWHjlEBAoIBAQDvQmYWhYtUQjcaeLkhjd9qPXjLGIL6NYnrYEUK\n' + 'Yenn0mh7SKZTTL8yz/QVOecD/QiBetmQJ5FsmUmqorGWYfbWs6C+p2yHQ0U9X7rE\n' + '3ynFm0MDluuNZMWYRu0Yb6gvCYjitlt/efGKDalP1Al1hX2w9fUGdj32K6fulEX6\n' + 'dcl4r2bq4i+rOwe9YDD9yvkvh1+aCwA56JCTBoEBsbmOdKTC7431rT8BTLbBaXwy\n' + 'hf35P9wzU079QwwqDKdUlMQjUz9gWZkYFHkPfce2MCm+T0aHNnjQtLXRGOcIj15P\n' + 'B64+GB9b86XNZlqpuY2rceF+LDwaw4rgQkXDr+TdAsjrtcdHAoIBAQDuElNxRc9t\n' + 'hKwZGBVIWaHI3Y3oaTymi277DwhDzvmJgwBAddfEaC1rCt/giXxtmhhnAXpDD4sk\n' + '3m8iWw1jODRxOv2UDkUvSRV5tfY+QTG0nVVmMpX3lPWpIYxEVg34WYSq0xnXKrpW\n' + 'zxUOqD1fW2i2lXZtFAb6ZNt/hHts7KUPzk9/ZbAomVHO6JO4Ac3n0LTDSCmQHhRO\n' + '5gV0ea4Sh6AVOiFD20rMAnTFNnxnI+wLMt0SNAzouhRMulDqOcAmoH2DKG8PCcEt\n' + 'dQpUDwITxXuomsjhIHIli760MwSlwWZbrh5h7NAj1VmnQBtMkLnBtnE7cFSVdcPt\n' + 'BAFnq72txGhpAoIBAQDIWYKhM1zTxsrbyOHF3kaKcUVYVINBQFnolunZYtp6vG+v\n' + 'ZMuaj3a/9vE+YQk5Bsb7ncLXerrFBKtyTuCEvC315d8iJ5KyxbsSRLpiJzmUdoos\n' + 'VFGVSiBIfoQF5WIhWUueBPQjkBqZ7wyrgzQUjB8PczamHZePL0lleBYNQFrgS4jU\n' + 'AWnHahv2EbmUnEYD7ck5diLPWxbNdzHKGGf4iWZ6shze8B8FWJbk6Q8OQ7PD5xze\n' + 'gdFwNJfYElaAdj60Ef7NENopFuO0/C+jOTuLWFkH2q5anihuGvtD6MIhTZ4z8wE3\n' + 'f5SEpkQfQfkG6srXW/VMuBfv6K8AyabNB4r2Dnb7AoIBADHy2lrroKeDrG/fY6e4\n' + 'Vn9ELJ/UZIs0ueYmsz82z5gQSh88Gjb0/IJ2153OerKsH+6MmtAzFKh5mquEmvx0\n' + 'MFyJWeaUT+Op272bdbx+BSW11NMKTfiR4jDH/xvfSjMO5QzKGaPRLSNFc0+N8MJu\n' + '9TtJhH1CNGyYeIz6iMLDq6XzTS6XcSwzbryQg12Z00+NtD88hqvcA7rB++cCGIl+\n' + 'txF9Drmj6r9+zG0MD3G8UavP0h4dmY/CarvmY0+hKjVweqTn+NUY4NTet3oHZBIt\n' + '3tHzF65UFl7WQP6hrZnxR754e5tkCg9aleLHSnL38mE4G+2ylax99stlib3shHFO\n' + 'wfECggEBAJrW8BmZXbD8ss3c7kHPzleAf1q/6bPnxRXB0luCPz7tkMfdkOQ2cG1t\n' + 'rcnsKcyR2woEbtdRK938KxZgTgzKYVhR8spKFSh01/d9OZAP6f+iCoR2zzOlSFo4\n' + 'pejnQY0LHEwGZmnzghLoqJSUgROAR49CvLO1mI48CaEUuLmqzPYWNXMHDDU2N5XO\n' + 'uF0/ph68fnI+f+0ZUgdpVPFRnfSrAqzEhzEMh1vnZ4ZxEVpgUcn/hRfNZ3hN0LEr\n' + 'fjm2bWxg2j0rxjS0mUDQpaMj0253jVYRiC3M3cCh0NSZtwaXVJYCVxetpjBTPfJr\n' + 'jIgmPTKGR0FedjAeCBByH9vkw8iRg7w=\n' + '-----END PRIVATE KEY-----\n') + + +def get_authorization_header(token): + return {'Authorization': f'Bearer {token}'} + + +def get_config( + introspect_endpoint=None, + audience=None, + auth_method=None, + client_id=None, + client_secret=None, + thumbprint_verify=None, + jwt_key_file=None, + jwt_algorithm=None, + jwt_bearer_time_out=None, + mapping_project_id=None, + mapping_project_name=None, + mapping_project_domain_id=None, + mapping_project_domain_name=None, + mapping_user_id=None, + mapping_user_name=None, + mapping_user_domain_id=None, + mapping_user_domain_name=None, + mapping_roles=None): + conf = {} + if introspect_endpoint is not None: + conf['introspect_endpoint'] = introspect_endpoint + if audience is not None: + conf['audience'] = audience + if auth_method is not None: + conf['auth_method'] = auth_method + if client_id is not None: + conf['client_id'] = client_id + if client_secret is not None: + conf['client_secret'] = client_secret + if jwt_key_file is not None: + conf['jwt_key_file'] = jwt_key_file + if jwt_algorithm is not None: + conf['jwt_algorithm'] = jwt_algorithm + if jwt_bearer_time_out is not None: + conf['jwt_bearer_time_out'] = jwt_bearer_time_out + if thumbprint_verify is not None: + conf['thumbprint_verify'] = thumbprint_verify + if mapping_project_id is not None: + conf['mapping_project_id'] = mapping_project_id + if mapping_project_name is not None: + conf['mapping_project_name'] = mapping_project_name + if mapping_project_id is not None: + conf['mapping_project_domain_id'] = mapping_project_domain_id + if mapping_project_domain_name is not None: + conf['mapping_project_domain_name'] = mapping_project_domain_name + if mapping_user_id is not None: + conf['mapping_user_id'] = mapping_user_id + if mapping_project_id is not None: + conf['mapping_project_id'] = mapping_project_id + if mapping_user_name is not None: + conf['mapping_user_name'] = mapping_user_name + if mapping_user_domain_id is not None: + conf['mapping_user_domain_id'] = mapping_user_domain_id + if mapping_project_id is not None: + conf['mapping_user_domain_name'] = mapping_user_domain_name + if mapping_roles is not None: + conf['mapping_roles'] = mapping_roles + + return conf + + +class FakeOauth2TokenV3App(v3FakeApp): + + def __init__(self, + expected_env=None, + need_service_token=False, + app_response_status_code=200): + super(FakeOauth2TokenV3App, self).__init__(expected_env, + need_service_token) + + self._status_code = app_response_status_code + + @webob.dec.wsgify + def __call__(self, req): + resp = webob.Response() + if self._status_code == 200: + resp.status_code = 200 + resp.body = FakeApp.SUCCESS + else: + resp.status_code = self._status_code + resp.body = b'Error' + + return resp + + +class FakeSocket(object): + + def __init__(self, binary_peer_cert): + self.binary_peer_cert = binary_peer_cert + + def getpeercert(self, binary_form=True): + if binary_form: + return self.binary_peer_cert + else: + return None + + +class FakeWsgiInput(object): + + def __init__(self, fake_socket): + self.fake_socket = fake_socket + + def get_socket(self): + return self.fake_socket + + +class BaseExternalOauth2TokenMiddlewareTest(base.BaseAuthTokenTestCase, + testresources.ResourcedTestCase): + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def setUp(self): + cfg.CONF.clear() + super(BaseExternalOauth2TokenMiddlewareTest, self).setUp() + + self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + self.expected_env = dict() + self.fake_app = FakeOauth2TokenV3App + self.middleware = None + self.conf = {} + self.auth_version = 'v3.0' + self._auth_url = 'http://localhost/identity' + self._introspect_endpoint = ( + 'https://localhost:8443/realms/x509/' + 'protocol/openid-connect/token/introspect') + self._audience = 'https://localhost:8443/realms/x509' + + def set_middleware(self, expected_env=None, conf=None): + """Configure the class ready to call the oauth2_token middleware. + + Set up the various fake items needed to run the middleware. + Individual tests that need to further refine these can call this + function to override the class defaults. + """ + if conf: + self.conf.update(conf) + + if expected_env: + self.expected_env.update(expected_env) + self.middleware = external_oauth2_token.ExternalAuth2Protocol( + self.fake_app(expected_env=self.expected_env), self.conf) + + def call(self, middleware, method='GET', path='/', headers=None, + expected_status=http_client.OK, + expected_body_string=None, **kwargs): + req = webob.Request.blank(path, **kwargs) + req.method = method + + for k, v in (headers or {}).items(): + req.headers[k] = v + + resp = req.get_response(middleware) + self.assertEqual(expected_status, resp.status_int) + if expected_body_string: + self.assertIn(expected_body_string, six.text_type(resp.body)) + resp.request = req + return resp + + def call_middleware(self, + pem_client_cert=None, der_client_cert=None, **kwargs): + if pem_client_cert: + # apache + kwargs.update({'environ': {'SSL_CLIENT_CERT': pem_client_cert}}) + elif der_client_cert: + # socket + fake_socket = FakeSocket(der_client_cert) + fake_wsgi_input = FakeWsgiInput(fake_socket) + kwargs.update({'environ': {'wsgi.input': fake_wsgi_input}}) + return self.call(self.middleware, **kwargs) + + def _introspect_response(self, request, context, + auth_method=None, + introspect_client_id=None, + introspect_client_secret=None, + access_token=None, + active=True, + exp_time=None, + cert_thumb=None, + metadata=None, + status_code=200 + ): + if auth_method == 'tls_client_auth': + body = 'client_id=%s&token=%s&token_type_hint=access_token' % ( + introspect_client_id, access_token + ) + self.assertEqual(request.text, body) + elif auth_method == 'client_secret_post': + body = ('client_id=%s&client_secret=%s' + '&token=%s&token_type_hint=access_token') % ( + introspect_client_id, introspect_client_secret, + access_token) + self.assertEqual(request.text, body) + elif auth_method == 'client_secret_basic': + body = 'token=%s&token_type_hint=access_token' % access_token + self.assertEqual(request.text, body) + auth_basic = request._request.headers.get('Authorization') + self.assertIsNotNone(auth_basic) + + auth = 'Basic ' + base64.standard_b64encode( + ("%s:%s" % (introspect_client_id, + introspect_client_secret)).encode('ascii') + ).decode('ascii') + self.assertEqual(auth_basic, auth) + elif auth_method == 'private_key_jwt': + self.assertIn('client_id=%s' % introspect_client_id, request.text) + self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A' + 'oauth%3Aclient-assertion-type%3Ajwt-bearer'), + request.text) + self.assertIn('client_assertion=', request.text) + self.assertIn('token=%s' % access_token, request.text) + self.assertIn('token_type_hint=access_token', request.text) + elif auth_method == 'client_secret_jwt': + self.assertIn('client_id=%s' % introspect_client_id, request.text) + self.assertIn(('client_assertion_type=urn%3Aietf%3Aparams%3A' + 'oauth%3Aclient-assertion-type%3Ajwt-bearer'), + request.text) + self.assertIn('client_assertion=', request.text) + self.assertIn('token=%s' % access_token, request.text) + self.assertIn('token_type_hint=access_token', request.text) + + resp = { + 'iat': 1670311634, + 'jti': str(uuid.uuid4()), + 'iss': str(uuid.uuid4()), + 'aud': str(uuid.uuid4()), + 'sub': str(uuid.uuid4()), + 'typ': 'Bearer', + 'azp': str(uuid.uuid4()), + 'acr': '1', + 'scope': 'default' + } + if exp_time is not None: + resp['exp'] = exp_time + else: + resp['exp'] = time.time() + 3600 + if cert_thumb is not None: + resp['cnf'] = { + 'x5t#S256': cert_thumb + } + if metadata: + for key in metadata: + resp[key] = metadata[key] + + if active is not None: + resp['active'] = active + + context.status_code = status_code + return resp + + def _check_env_value_project_scope(self, request_environ, + user_id, user_name, + user_domain_id, user_domain_name, + project_id, project_name, + project_domain_id, project_domain_name, + roles, is_admin=True): + self.assertEqual('Confirmed', + request_environ['HTTP_X_IDENTITY_STATUS']) + self.assertEqual(roles, request_environ['HTTP_X_ROLES']) + self.assertEqual(roles, request_environ['HTTP_X_ROLE']) + self.assertEqual(user_id, request_environ['HTTP_X_USER_ID']) + self.assertEqual(user_name, request_environ['HTTP_X_USER_NAME']) + self.assertEqual(user_domain_id, + request_environ['HTTP_X_USER_DOMAIN_ID'], ) + self.assertEqual(user_domain_name, + request_environ['HTTP_X_USER_DOMAIN_NAME']) + if is_admin: + self.assertEqual('true', + request_environ['HTTP_X_IS_ADMIN_PROJECT']) + else: + self.assertNotIn('HTTP_X_IS_ADMIN_PROJECT', request_environ) + self.assertEqual(user_name, request_environ['HTTP_X_USER']) + + self.assertEqual(project_id, request_environ['HTTP_X_PROJECT_ID']) + self.assertEqual(project_name, request_environ['HTTP_X_PROJECT_NAME']) + self.assertEqual(project_domain_id, + request_environ['HTTP_X_PROJECT_DOMAIN_ID']) + self.assertEqual(project_domain_name, + request_environ['HTTP_X_PROJECT_DOMAIN_NAME']) + self.assertEqual(project_id, request_environ['HTTP_X_TENANT_ID']) + self.assertEqual(project_name, request_environ['HTTP_X_TENANT_NAME']) + self.assertEqual(project_id, request_environ['HTTP_X_TENANT']) + + self.assertNotIn('HTTP_X_DOMAIN_ID', request_environ) + self.assertNotIn('HTTP_X_DOMAIN_NAME', request_environ) + + def _check_env_value_domain_scope(self, request_environ, + user_id, user_name, + user_domain_id, user_domain_name, + domain_id, domain_name, + roles, is_admin=True): + self.assertEqual('Confirmed', + request_environ['HTTP_X_IDENTITY_STATUS']) + self.assertEqual(roles, request_environ['HTTP_X_ROLES']) + self.assertEqual(roles, request_environ['HTTP_X_ROLE']) + self.assertEqual(user_id, request_environ['HTTP_X_USER_ID']) + self.assertEqual(user_name, request_environ['HTTP_X_USER_NAME']) + self.assertEqual(user_domain_id, + request_environ['HTTP_X_USER_DOMAIN_ID'], ) + self.assertEqual(user_domain_name, + request_environ['HTTP_X_USER_DOMAIN_NAME']) + if is_admin: + self.assertEqual('true', + request_environ['HTTP_X_IS_ADMIN_PROJECT']) + else: + self.assertNotIn('HTTP_X_IS_ADMIN_PROJECT', request_environ) + self.assertEqual(user_name, request_environ['HTTP_X_USER']) + + self.assertEqual(domain_id, request_environ['HTTP_X_DOMAIN_ID']) + self.assertEqual(domain_name, request_environ['HTTP_X_DOMAIN_NAME']) + + self.assertNotIn('HTTP_X_PROJECT_ID', request_environ) + self.assertNotIn('HTTP_X_PROJECT_NAME', request_environ) + self.assertNotIn('HTTP_X_PROJECT_DOMAIN_ID', request_environ) + self.assertNotIn('HTTP_X_PROJECT_DOMAIN_NAME', request_environ) + self.assertNotIn('HTTP_X_TENANT_ID', request_environ) + self.assertNotIn('HTTP_X_TENANT_NAME', request_environ) + self.assertNotIn('HTTP_X_TENANT', request_environ) + + +class ExternalOauth2TokenMiddlewareTlsClientAuthTest( + BaseExternalOauth2TokenMiddlewareTest): + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def setUp(self): + super(ExternalOauth2TokenMiddlewareTlsClientAuthTest, self).setUp() + + self._test_client_id = str(uuid.uuid4()) + self._auth_method = 'tls_client_auth' + self._test_conf = get_config( + introspect_endpoint=self._introspect_endpoint, + audience=self._audience, + auth_method=self._auth_method, + client_id=self._test_client_id, + thumbprint_verify=True, + mapping_project_id='access_project.id', + mapping_project_name='access_project.name', + mapping_project_domain_id='access_project.domain.id', + mapping_project_domain_name='access_project.domain.name', + mapping_user_id='client_id', + mapping_user_name='username', + mapping_user_domain_id='user_domain.id', + mapping_user_domain_name='user_domain.name', + mapping_roles='roles', + ) + self._token = str(uuid.uuid4()) + '_user_token' + self._user_id = str(uuid.uuid4()) + '_user_id' + self._user_name = str(uuid.uuid4()) + '_user_name' + self._user_domain_id = str(uuid.uuid4()) + '_user_domain_id' + self._user_domain_name = str(uuid.uuid4()) + '_user_domain_name' + self._project_id = str(uuid.uuid4()) + '_project_id' + self._project_name = str(uuid.uuid4()) + '_project_name' + self._project_domain_id = str(uuid.uuid4()) + 'project_domain_id' + self._project_domain_name = str(uuid.uuid4()) + 'project_domain_name' + self._roles = 'admin,member,reader' + + self._default_metadata = { + 'access_project': { + 'id': self._project_id, + 'name': self._project_name, + 'domain': { + 'id': self._project_domain_id, + 'name': self._project_domain_name + } + }, + 'user_domain': { + 'id': self._user_domain_id, + 'name': self._user_domain_name + }, + 'roles': self._roles, + 'client_id': self._user_id, + 'username': self._user_name, + } + cert = self.examples.V3_OAUTH2_MTLS_CERTIFICATE + self._pem_client_cert = cert.decode('ascii') + self._der_client_cert = ssl.PEM_cert_to_DER_cert(self._pem_client_cert) + thumb_sha256 = hashlib.sha256(self._der_client_cert).digest() + self._cert_thumb = jwt.utils.base64url_encode(thumb_sha256).decode( + 'ascii') + + def test_basic_200(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb=self._cert_thumb, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=200, + method='GET', path='/vnfpkgm/v1/vnf_packages', + der_client_cert=self._der_client_cert, + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self.assertEqual(resp.request.environ['HTTP_X_IDENTITY_STATUS'], + 'Confirmed') + self._check_env_value_project_scope( + resp.request.environ, self._user_id, self._user_name, + self._user_domain_id, self._user_domain_name, + self._project_id, self._project_name, self._project_domain_id, + self._project_domain_name, self._roles) + + def test_thumbprint_verify_is_false_200(self): + conf = copy.deepcopy(self._test_conf) + conf['thumbprint_verify'] = False + self.set_middleware(conf=conf) + + metadata = copy.deepcopy(self._default_metadata) + metadata['access_project'].pop('id') + roles = 'reader' + metadata['roles'] = roles + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb='this is an incorrectly thumbprint.', + metadata=metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=200, + method='GET', path='/vnfpkgm/v1/vnf_packages', + der_client_cert=self._der_client_cert, + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self.assertEqual(resp.request.environ['HTTP_X_IDENTITY_STATUS'], + 'Confirmed') + self._check_env_value_domain_scope( + resp.request.environ, self._user_id, self._user_name, + self._user_domain_id, self._user_domain_name, + self._project_domain_id, self._project_domain_name, roles, + is_admin=False) + + def test_confirm_certificate_thumbprint_get_socket_except_401(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb=self._cert_thumb, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': {'test': 'test'}} + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % self._audience) + + def test_confirm_certificate_thumbprint_socket_is_none_401(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb=self._cert_thumb, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(None)} + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % self._audience) + + def test_confirm_certificate_thumbprint_peercert_is_none_401(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb=self._cert_thumb, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % self._audience) + + def test_confirm_certificate_thumbprint_peercert_error_format_401(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb=self._cert_thumb, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket('Error Format'))} + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % self._audience) + + def test_confirm_certificate_thumbprint_wsgi_input_is_none_401(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb=self._cert_thumb, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': None} + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % self._audience) + + def test_confirm_certificate_thumbprint_is_not_match_401(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb='NotMatchThumbprint', + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + der_client_cert=self._der_client_cert, + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % self._audience) + + def test_confirm_certificate_thumbprint_apache_default_200(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb=self._cert_thumb, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=200, + method='GET', path='/vnfpkgm/v1/vnf_packages', + pem_client_cert=self._pem_client_cert + ) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self.assertEqual(resp.request.environ['HTTP_X_IDENTITY_STATUS'], + 'Confirmed') + self._check_env_value_project_scope( + resp.request.environ, self._user_id, self._user_name, + self._user_domain_id, self._user_domain_name, + self._project_id, self._project_name, self._project_domain_id, + self._project_domain_name, self._roles) + + def test_confirm_certificate_thumbprint_pem_der_none_401(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + access_token=self._token, + active=True, + cert_thumb=self._cert_thumb, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + pem_client_cert=None, + der_client_cert=None + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % self._audience) + + +class ExternalOauth2TokenMiddlewarePrivateJWTKeyTest( + BaseExternalOauth2TokenMiddlewareTest): + + def setUp(self): + super(ExternalOauth2TokenMiddlewarePrivateJWTKeyTest, self).setUp() + + self._test_client_id = str(uuid.uuid4()) + self._test_client_secret = str(uuid.uuid4()) + self._jwt_key_file = '/root/key.pem' + self._auth_method = 'private_key_jwt' + self._test_conf = get_config( + introspect_endpoint=self._introspect_endpoint, + audience=self._audience, + auth_method=self._auth_method, + client_id=self._test_client_id, + client_secret=self._test_client_secret, + jwt_key_file=self._jwt_key_file, + jwt_algorithm='RS256', + jwt_bearer_time_out='2800', + mapping_project_id='access_project.id', + mapping_project_name='access_project.name', + mapping_project_domain_id='access_project.domain.id', + mapping_project_domain_name='access_project.domain.name', + mapping_user_id='client_id', + mapping_user_name='username', + mapping_user_domain_id='user_domain.id', + mapping_user_domain_name='user_domain.name', + mapping_roles='roles', + ) + self._token = str(uuid.uuid4()) + '_user_token' + self._user_id = str(uuid.uuid4()) + '_user_id' + self._user_name = str(uuid.uuid4()) + '_user_name' + self._user_domain_id = str(uuid.uuid4()) + '_user_domain_id' + self._user_domain_name = str(uuid.uuid4()) + '_user_domain_name' + self._project_id = str(uuid.uuid4()) + '_project_id' + self._project_name = str(uuid.uuid4()) + '_project_name' + self._project_domain_id = str(uuid.uuid4()) + 'project_domain_id' + self._project_domain_name = str(uuid.uuid4()) + 'project_domain_name' + self._roles = 'admin,member,reader' + + self._default_metadata = { + 'access_project': { + 'id': self._project_id, + 'name': self._project_name, + 'domain': { + 'id': self._project_domain_id, + 'name': self._project_domain_name + } + }, + 'user_domain': { + 'id': self._user_domain_id, + 'name': self._user_domain_name + }, + 'roles': self._roles, + 'client_id': self._user_id, + 'username': self._user_name, + } + + @mock.patch('os.path.isfile') + @mock.patch('builtins.open', mock.mock_open(read_data=JWT_KEY_CONTENT)) + def test_basic_200(self, mocker_path_isfile): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + def mocker_isfile_side_effect(filename): + if filename == self._jwt_key_file: + return True + else: + return False + + mocker_path_isfile.side_effect = mocker_isfile_side_effect + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=200, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertTrue(mocker_path_isfile.called) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self._check_env_value_project_scope( + resp.request.environ, self._user_id, self._user_name, + self._user_domain_id, self._user_domain_name, + self._project_id, self._project_name, self._project_domain_id, + self._project_domain_name, self._roles) + + @mock.patch('os.path.isfile') + @mock.patch('builtins.open', mock.mock_open(read_data=JWT_KEY_CONTENT)) + def test_introspect_by_private_key_jwt_error_alg_500( + self, mocker_path_isfile): + conf = copy.deepcopy(self._test_conf) + conf['jwt_algorithm'] = 'HS256' + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + def mocker_isfile_side_effect(filename): + if filename == self._jwt_key_file: + return True + else: + return False + + mocker_path_isfile.side_effect = mocker_isfile_side_effect + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + @mock.patch('os.path.isfile') + @mock.patch('builtins.open', mock.mock_open(read_data='')) + def test_introspect_by_private_key_jwt_error_file_no_content_500( + self, mocker_path_isfile): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + def mocker_isfile_side_effect(filename): + if filename == self._jwt_key_file: + return True + else: + return False + + mocker_path_isfile.side_effect = mocker_isfile_side_effect + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + @mock.patch('os.path.isfile') + def test_introspect_by_private_key_jwt_error_file_can_not_read_500( + self, mocker_path_isfile): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + def mocker_isfile_side_effect(filename): + if filename == self._jwt_key_file: + return True + else: + return False + + mocker_path_isfile.side_effect = mocker_isfile_side_effect + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + def test_introspect_by_private_key_jwt_error_file_not_exist_500( + self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + +class ExternalOauth2TokenMiddlewareClientSecretJWTTest( + BaseExternalOauth2TokenMiddlewareTest): + + def setUp(self): + super(ExternalOauth2TokenMiddlewareClientSecretJWTTest, self).setUp() + + self._test_client_id = str(uuid.uuid4()) + self._test_client_secret = str(uuid.uuid4()) + self._auth_method = 'client_secret_jwt' + self._test_conf = get_config( + introspect_endpoint=self._introspect_endpoint, + audience=self._audience, + auth_method=self._auth_method, + client_id=self._test_client_id, + client_secret=self._test_client_secret, + jwt_key_file='test', + jwt_algorithm='HS256', + jwt_bearer_time_out='2800', + mapping_project_id='access_project.id', + mapping_project_name='access_project.name', + mapping_project_domain_id='access_project.domain.id', + mapping_project_domain_name='access_project.domain.name', + mapping_user_id='client_id', + mapping_user_name='username', + mapping_user_domain_id='user_domain.id', + mapping_user_domain_name='user_domain.name', + mapping_roles='roles', + ) + self._token = str(uuid.uuid4()) + '_user_token' + self._user_id = str(uuid.uuid4()) + '_user_id' + self._user_name = str(uuid.uuid4()) + '_user_name' + self._user_domain_id = str(uuid.uuid4()) + '_user_domain_id' + self._user_domain_name = str(uuid.uuid4()) + '_user_domain_name' + self._project_id = str(uuid.uuid4()) + '_project_id' + self._project_name = str(uuid.uuid4()) + '_project_name' + self._project_domain_id = str(uuid.uuid4()) + 'project_domain_id' + self._project_domain_name = str(uuid.uuid4()) + 'project_domain_name' + self._roles = 'admin,member,reader' + + self._default_metadata = { + 'access_project': { + 'id': self._project_id, + 'name': self._project_name, + 'domain': { + 'id': self._project_domain_id, + 'name': self._project_domain_name + } + }, + 'user_domain': { + 'id': self._user_domain_id, + 'name': self._user_domain_name + }, + 'roles': self._roles, + 'client_id': self._user_id, + 'username': self._user_name, + } + + def test_basic_200(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=200, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self._check_env_value_project_scope( + resp.request.environ, self._user_id, self._user_name, + self._user_domain_id, self._user_domain_name, + self._project_id, self._project_name, self._project_domain_id, + self._project_domain_name, self._roles) + + def test_introspect_by_client_secret_jwt_error_alg_500(self): + conf = copy.deepcopy(self._test_conf) + conf['jwt_algorithm'] = 'RS256' + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + def test_fetch_token_introspect_response_201_500(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata, + status_code=201 + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + def test_fetch_token_introspect_response_active_is_false_401(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=False, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp, + status_code=500) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % self._audience) + + def test_fetch_token_introspect_response_500(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata, + status_code=500 + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + @mock.patch.object(session.Session, 'request') + def test_fetch_token_introspect_timeout_500(self, mock_session_request): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + mock_session_request.side_effect = ksa_exceptions.RequestTimeout( + 'time out') + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + +class ExternalOauth2TokenMiddlewareClientSecretPostTest( + BaseExternalOauth2TokenMiddlewareTest): + + def setUp(self): + super(ExternalOauth2TokenMiddlewareClientSecretPostTest, self).setUp() + + self._test_client_id = str(uuid.uuid4()) + self._test_client_secret = str(uuid.uuid4()) + self._auth_method = 'client_secret_post' + self._test_conf = get_config( + introspect_endpoint=self._introspect_endpoint, + audience=self._audience, + auth_method=self._auth_method, + client_id=self._test_client_id, + client_secret=self._test_client_secret, + thumbprint_verify=False, + mapping_project_id='project_id', + mapping_project_name='project_name', + mapping_project_domain_id='domain_id', + mapping_project_domain_name='domain_name', + mapping_user_id='user', + mapping_user_name='username', + mapping_user_domain_id='user_domain.id', + mapping_user_domain_name='user_domain.name', + mapping_roles='roles', + ) + self._token = str(uuid.uuid4()) + '_user_token' + self._user_id = str(uuid.uuid4()) + '_user_id' + self._user_name = str(uuid.uuid4()) + '_user_name' + self._user_domain_id = str(uuid.uuid4()) + '_user_domain_id' + self._user_domain_name = str(uuid.uuid4()) + '_user_domain_name' + self._project_id = str(uuid.uuid4()) + '_project_id' + self._project_name = str(uuid.uuid4()) + '_project_name' + self._project_domain_id = str(uuid.uuid4()) + 'project_domain_id' + self._project_domain_name = str(uuid.uuid4()) + 'project_domain_name' + self._roles = 'admin,member,reader' + + self._default_metadata = { + 'project_id': self._project_id, + 'project_name': self._project_name, + 'domain_id': self._project_domain_id, + 'domain_name': self._project_domain_name, + 'user_domain': { + 'id': self._user_domain_id, + 'name': self._user_domain_name + }, + 'roles': self._roles, + 'user': self._user_id, + 'username': self._user_name, + } + + def test_basic_200(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=200, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self._check_env_value_project_scope( + resp.request.environ, self._user_id, self._user_name, + self._user_domain_id, self._user_domain_name, + self._project_id, self._project_name, self._project_domain_id, + self._project_domain_name, self._roles) + + def test_process_request_no_access_token_in_header_401(self): + conf = copy.deepcopy(self._test_conf) + test_audience = 'https://test_audience' + conf['audience'] = test_audience + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers={}, + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % test_audience) + + def test_read_data_from_token_key_type_not_dict_403(self): + conf = copy.deepcopy(self._test_conf) + conf['mapping_user_id'] = 'user.id' + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=403, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + def test_read_data_from_token_key_not_fount_in_metadata_403(self): + conf = copy.deepcopy(self._test_conf) + conf['mapping_user_id'] = 'user_id' + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=403, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + def test_read_data_from_token_key_value_type_is_not_match_403(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + metadata = copy.deepcopy(self._default_metadata) + metadata['user'] = { + 'id': str(uuid.uuid4()), + 'name': 'testName' + } + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=403, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + def test_read_data_from_token_key_config_error_is_not_dict_500(self): + conf = copy.deepcopy(self._test_conf) + conf['mapping_project_id'] = '..project_id' + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + def test_read_data_from_token_key_config_error_is_not_set_500(self): + conf = copy.deepcopy(self._test_conf) + conf.pop('mapping_roles') + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=500, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + + +class ExternalOauth2TokenMiddlewareClientSecretBasicTest( + BaseExternalOauth2TokenMiddlewareTest): + + def setUp(self): + super(ExternalOauth2TokenMiddlewareClientSecretBasicTest, self).setUp() + + self._test_client_id = str(uuid.uuid4()) + self._test_client_secret = str(uuid.uuid4()) + self._auth_method = 'client_secret_basic' + self._test_conf = get_config( + introspect_endpoint=self._introspect_endpoint, + audience=self._audience, + auth_method=self._auth_method, + client_id=self._test_client_id, + client_secret=self._test_client_secret, + thumbprint_verify=False, + mapping_project_id='access_project.id', + mapping_project_name='access_project.name', + mapping_project_domain_id='access_project.domain.id', + mapping_project_domain_name='access_project.domain.name', + mapping_user_id='client_id', + mapping_user_name='username', + mapping_user_domain_id='user_domain.id', + mapping_user_domain_name='user_domain.name', + mapping_roles='roles', + ) + self._token = str(uuid.uuid4()) + '_user_token' + self._user_id = str(uuid.uuid4()) + '_user_id' + self._user_name = str(uuid.uuid4()) + '_user_name' + self._user_domain_id = str(uuid.uuid4()) + '_user_domain_id' + self._user_domain_name = str(uuid.uuid4()) + '_user_domain_name' + self._project_id = str(uuid.uuid4()) + '_project_id' + self._project_name = str(uuid.uuid4()) + '_project_name' + self._project_domain_id = str(uuid.uuid4()) + 'project_domain_id' + self._project_domain_name = str(uuid.uuid4()) + 'project_domain_name' + self._roles = 'admin,member,reader' + + self._default_metadata = { + 'access_project': { + 'id': self._project_id, + 'name': self._project_name, + 'domain': { + 'id': self._project_domain_id, + 'name': self._project_domain_name + } + }, + 'user_domain': { + 'id': self._user_domain_id, + 'name': self._user_domain_name + }, + 'roles': self._roles, + 'client_id': self._user_id, + 'username': self._user_name, + } + self._clear_call_count = 0 + + def test_basic_200(self): + conf = copy.deepcopy(self._test_conf) + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=200, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self._check_env_value_project_scope( + resp.request.environ, self._user_id, self._user_name, + self._user_domain_id, self._user_domain_name, + self._project_id, self._project_name, self._project_domain_id, + self._project_domain_name, self._roles) + + def test_domain_scope_200(self): + conf = copy.deepcopy(self._test_conf) + conf.pop('mapping_project_id') + self.set_middleware(conf=conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=200, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self._check_env_value_domain_scope( + resp.request.environ, self._user_id, self._user_name, + self._user_domain_id, self._user_domain_name, + self._project_domain_id, self._project_domain_name, self._roles) + + def test_process_response_401(self): + conf = copy.deepcopy(self._test_conf) + conf.pop('mapping_project_id') + self.set_middleware(conf=conf) + self.middleware = external_oauth2_token.ExternalAuth2Protocol( + FakeOauth2TokenV3App(expected_env=self.expected_env, + app_response_status_code=401), self.conf) + + def mock_resp(request, context): + return self._introspect_response( + request, context, + auth_method=self._auth_method, + introspect_client_id=self._test_client_id, + introspect_client_secret=self._test_client_secret, + access_token=self._token, + active=True, + metadata=self._default_metadata + ) + + self.requests_mock.post(self._introspect_endpoint, + json=mock_resp) + self.requests_mock.get(self._auth_url, + json=VERSION_LIST_v3, + status_code=300) + + resp = self.call_middleware( + headers=get_authorization_header(self._token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertEqual(resp.headers.get('WWW-Authenticate'), + 'Authorization OAuth 2.0 uri="%s"' % self._audience) + + +class FilterFactoryTest(utils.BaseTestCase): + + def test_filter_factory(self): + certfile = '/certfile_01' + keyfile = '/keyfile_01' + cafile = '/cafile_01' + insecure = True + http_connect_timeout = 1000 + introspect_endpoint = 'http://introspect_endpoint_01' + audience = 'http://audience_01' + auth_method = 'private_key_jwt' + client_id = 'client_id_01' + client_secret = 'client_secret_01' + thumbprint_verify = True + jwt_key_file = '/jwt_key_file_01' + jwt_algorithm = 'HS512' + jwt_bearer_time_out = 1000 + mapping_project_id = 'test_project.id' + mapping_project_name = 'test_project.name' + mapping_project_domain_id = 'test_project.domain.id' + mapping_project_domain_name = 'test_project.domain.name' + mapping_user_id = 'test_client_id' + mapping_user_name = 'test_username' + mapping_user_domain_id = 'test_user_domain.id' + mapping_user_domain_name = 'test_user_domain.name' + mapping_roles = 'test_roles' + + conf = { + 'certfile': certfile, + 'keyfile': keyfile, + 'cafile': cafile, + 'insecure': insecure, + 'http_connect_timeout': http_connect_timeout, + 'introspect_endpoint': introspect_endpoint, + 'audience': audience, + 'auth_method': auth_method, + 'client_id': client_id, + 'client_secret': client_secret, + 'thumbprint_verify': thumbprint_verify, + 'jwt_key_file': jwt_key_file, + 'jwt_algorithm': jwt_algorithm, + 'jwt_bearer_time_out': jwt_bearer_time_out, + 'mapping_project_id': mapping_project_id, + 'mapping_project_name': mapping_project_name, + 'mapping_project_domain_id': mapping_project_domain_id, + 'mapping_project_domain_name': mapping_project_domain_name, + 'mapping_user_id': mapping_user_id, + 'mapping_user_name': mapping_user_name, + 'mapping_user_domain_id': mapping_user_domain_id, + 'mapping_user_domain_name': mapping_user_domain_name, + 'mapping_roles': mapping_roles + } + auth_filter = external_oauth2_token.filter_factory(conf) + app = FakeApp() + m = auth_filter(app) + self.assertIsInstance(m, external_oauth2_token.ExternalAuth2Protocol) + + self.assertEqual(certfile, + m._get_config_option('certfile', is_required=False)) + self.assertEqual(keyfile, + m._get_config_option('keyfile', is_required=False)) + self.assertEqual(cafile, + m._get_config_option('cafile', is_required=False)) + self.assertEqual(insecure, + m._get_config_option('insecure', is_required=False)) + self.assertEqual(http_connect_timeout, + m._get_config_option('http_connect_timeout', + is_required=False)) + self.assertEqual(introspect_endpoint, + m._get_config_option('introspect_endpoint', + is_required=False)) + self.assertEqual(audience, + m._get_config_option('audience', is_required=False)) + self.assertEqual(auth_method, m._get_config_option('auth_method', + is_required=False)) + self.assertEqual(client_id, + m._get_config_option('client_id', is_required=False)) + self.assertEqual(client_secret, + m._get_config_option('client_secret', + is_required=False)) + self.assertEqual(thumbprint_verify, + m._get_config_option('thumbprint_verify', + is_required=False)) + self.assertEqual(jwt_key_file, m._get_config_option('jwt_key_file', + is_required=False)) + self.assertEqual(jwt_algorithm, + m._get_config_option('jwt_algorithm', + is_required=False)) + self.assertEqual(jwt_bearer_time_out, + m._get_config_option('jwt_bearer_time_out', + is_required=False)) + self.assertEqual(mapping_project_id, + m._get_config_option('mapping_project_id', + is_required=False)) + self.assertEqual(mapping_project_name, + m._get_config_option('mapping_project_name', + is_required=False)) + self.assertEqual(mapping_project_domain_id, + m._get_config_option('mapping_project_domain_id', + is_required=False)) + self.assertEqual(mapping_project_domain_name, + m._get_config_option('mapping_project_domain_name', + is_required=False)) + self.assertEqual(mapping_user_id, + m._get_config_option('mapping_user_id', + is_required=False)) + self.assertEqual(mapping_user_name, + m._get_config_option('mapping_user_name', + is_required=False)) + self.assertEqual(mapping_user_domain_id, + m._get_config_option('mapping_user_domain_id', + is_required=False)) + self.assertEqual(mapping_user_domain_name, + m._get_config_option('mapping_user_domain_name', + is_required=False)) + self.assertEqual(mapping_roles, + m._get_config_option('mapping_roles', + is_required=False)) diff --git a/releasenotes/notes/bp-enhance-oauth2-interoperability-dd998d4e0eafed3c.yaml b/releasenotes/notes/bp-enhance-oauth2-interoperability-dd998d4e0eafed3c.yaml new file mode 100644 index 00000000..bbbeaa12 --- /dev/null +++ b/releasenotes/notes/bp-enhance-oauth2-interoperability-dd998d4e0eafed3c.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + [`blueprint enhance-oauth2-interoperability `_] + The external_oauth2_token filter has been added for accepting or denying + incoming requests containing OAuth 2.0 access tokens that are obtained + from an external authorization server by users through their OAuth 2.0 + credentials. diff --git a/requirements.txt b/requirements.txt index c1398f94..7c81a385 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,7 @@ oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0 oslo.utils>=3.33.0 # Apache-2.0 pbr!=2.1.0,>=2.0.0 # Apache-2.0 pycadf!=2.0.0,>=1.1.0 # Apache-2.0 +PyJWT>=2.4.0 # MIT python-keystoneclient>=3.20.0 # Apache-2.0 requests>=2.14.2 # Apache-2.0 six>=1.10.0 # MIT diff --git a/setup.cfg b/setup.cfg index a47683ad..c92db9d2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -44,3 +44,4 @@ paste.filter_factory = s3_token = keystonemiddleware.s3_token:filter_factory oauth2_token = keystonemiddleware.oauth2_token:filter_factory oauth2_mtls_token = keystonemiddleware.oauth2_mtls_token:filter_factory + external_oauth2_token = keystonemiddleware.external_oauth2_token:filter_factory diff --git a/test-requirements.txt b/test-requirements.txt index b539b017..d6f3930a 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -19,6 +19,7 @@ testtools>=2.2.0 # MIT python-memcached>=1.59 # PSF WebTest>=2.0.27 # MIT oslo.messaging>=5.29.0 # Apache-2.0 +PyJWT>=2.4.0 # MIT # Bandit security code scanner bandit!=1.6.0,>=1.1.0 # Apache-2.0