From a59020fdab670314ac1ab3d0b77e89b352d7cf27 Mon Sep 17 00:00:00 2001 From: sunyonggen Date: Fri, 7 Oct 2022 11:19:59 +0900 Subject: [PATCH] OAuth 2.0 Mutual-TLS Support The oauth2_mtls_token filter has been added for accepting or denying incoming requests containing OAuth 2.0 certificate-bound access tokens that are obtained from keystone identity server by users through their OAuth 2.0 credentials and Mutual-TLS certificates. Co-Authored-By: Hiromu Asahina Depends-On: https://review.opendev.org/c/openstack/keystoneauth/+/860614 Change-Id: I49127d845954ad6eab39e6e6305948ef0e4ed7b5 Implements: blueprint support-oauth2-mtls --- keystonemiddleware/oauth2_mtls_token.py | 156 ++++++++ .../tests/unit/client_fixtures.py | 109 +++++ .../unit/test_oauth2_mtls_token_middleware.py | 373 ++++++++++++++++++ ...-support-oauth2-mtls-2d2686c9d5b1fe1f.yaml | 8 + setup.cfg | 1 + 5 files changed, 647 insertions(+) create mode 100644 keystonemiddleware/oauth2_mtls_token.py create mode 100644 keystonemiddleware/tests/unit/test_oauth2_mtls_token_middleware.py create mode 100644 releasenotes/notes/bp-support-oauth2-mtls-2d2686c9d5b1fe1f.yaml diff --git a/keystonemiddleware/oauth2_mtls_token.py b/keystonemiddleware/oauth2_mtls_token.py new file mode 100644 index 00000000..5e721e96 --- /dev/null +++ b/keystonemiddleware/oauth2_mtls_token.py @@ -0,0 +1,156 @@ +# Copyright 2022 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import base64 +import hashlib +import ssl +import webob + +from oslo_log import log as logging +from oslo_serialization import jsonutils + +from keystonemiddleware.auth_token import _user_plugin +from keystonemiddleware.auth_token import AuthProtocol +from keystonemiddleware import exceptions +from keystonemiddleware.i18n import _ + + +class OAuth2mTlsProtocol(AuthProtocol): + """Middleware that handles OAuth2.0 mutual-TLS client authentication.""" + + def __init__(self, app, conf): + log = logging.getLogger(conf.get('log_name', __name__)) + log.info('Starting Keystone oauth2_mls_token middleware') + super(OAuth2mTlsProtocol, self).__init__(app, conf) + + def _confirm_certificate_thumbprint(self, token_thumb, peer_cert): + """Check if the thumbprint in the token is valid. + + :rtype: if the thumbprint is valid + """ + try: + cert_pem = ssl.DER_cert_to_PEM_cert(peer_cert) + thumb_sha256 = hashlib.sha256(cert_pem.encode('ascii')).digest() + cert_thumb = base64.urlsafe_b64encode(thumb_sha256).decode('ascii') + if cert_thumb == token_thumb: + is_valid = True + else: + self.log.info('The two thumbprints do not match.') + is_valid = False + except Exception as error: + self.log.exception(error) + is_valid = False + return is_valid + + def _is_valid_access_token(self, request): + """Check the OAuth2.0 certificate-bound access token. + + :param request: Incoming request + :rtype: if the access token is valid + """ + try: + wsgi_input = request.environ.get("wsgi.input") + if not wsgi_input: + self.log.warn('Unable to obtain the client certificate.') + return False + sock = wsgi_input.get_socket() + if not sock: + self.log.warn('Unable to obtain the client certificate.') + return False + peer_cert = sock.getpeercert(binary_form=True) + if not peer_cert: + self.log.warn('Unable to obtain the client certificate.') + return False + except Exception as error: + self.log.warn('Unable to obtain the client certificate. %s' % + str(error)) + return False + + access_token = None + if (request.authorization and + request.authorization.authtype == 'Bearer'): + access_token = request.authorization.params + + if not access_token: + self.log.info('Unable to obtain the token.') + return False + + try: + token_data, user_auth_ref = self._do_fetch_token( + access_token, allow_expired=False) + self._validate_token(user_auth_ref, allow_expired=False) + token = token_data.get('token') + oauth2_cred = token.get('oauth2_credential') + if not oauth2_cred: + self.log.info( + 'Invalid OAuth2.0 certificate-bound access token: ' + 'The token is not an OAuth2.0 credential access token.') + return False + + token_thumb = oauth2_cred.get("x5t#S256") + if self._confirm_certificate_thumbprint(token_thumb, peer_cert): + self._confirm_token_bind(user_auth_ref, request) + request.token_info = token_data + request.token_auth = _user_plugin.UserAuthPlugin( + user_auth_ref, None) + return True + else: + self.log.info( + 'Invalid OAuth2.0 certificate-bound access token: ' + 'the access token dose not match the client certificate.') + return False + except exceptions.KeystoneMiddlewareException as err: + self.log.info('Invalid OAuth2.0 certificate-bound access token: %s' + % str(err)) + return False + + def process_request(self, request): + """Process request. + + :param request: Incoming request + :type request: _request.AuthTokenRequest + """ + request.remove_auth_headers() + self._token_cache.initialize(request.environ) + if (not self._is_valid_access_token(request) + or "keystone.token_info" not in request.environ + or "token" not in request.environ["keystone.token_info"]): + self.log.info('Rejecting request') + message = _('The request you have made requires authentication.') + body = {'error': { + 'code': 401, + 'title': 'Unauthorized', + 'message': message, + }} + raise webob.exc.HTTPUnauthorized( + body=jsonutils.dumps(body), + headers=self._reject_auth_headers, + charset='UTF-8', + content_type='application/json') + + request.set_user_headers(request.token_auth.user) + request.set_service_catalog_headers(request.token_auth.user) + request.token_auth._auth = self._auth + request.token_auth._session = self._session + self.log.debug('Accepting request and inited all env fields.') + + +def filter_factory(global_conf, **local_conf): + """Return a WSGI filter app for use with paste.deploy.""" + conf = global_conf.copy() + conf.update(local_conf) + + def auth_filter(app): + return OAuth2mTlsProtocol(app, conf) + + return auth_filter diff --git a/keystonemiddleware/tests/unit/client_fixtures.py b/keystonemiddleware/tests/unit/client_fixtures.py index 27ba4826..53ecff54 100644 --- a/keystonemiddleware/tests/unit/client_fixtures.py +++ b/keystonemiddleware/tests/unit/client_fixtures.py @@ -12,9 +12,17 @@ # License for the specific language governing permissions and limitations # under the License. +import base64 +import datetime +import hashlib import os +import ssl import uuid +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography import x509 import fixtures from keystoneauth1 import fixture from oslo_serialization import jsonutils @@ -69,6 +77,28 @@ class Examples(fixtures.Fixture): self.v3_APP_CRED_EMPTY_ACCESS_RULES = 'c75905c307f04fdd9979126582d7aae' self.v3_APP_CRED_MATCHING_RULES = 'ad49decc7106489d95ca9ed874b6cb66' + self.v3_OAUTH2_CREDENTIAL = uuid.uuid4().hex + self.V3_OAUTH2_MTLS_CERTIFICATE = self._create_pem_certificate( + self._create_dn( + country_name='jp', + state_or_province_name='kanagawa', + locality_name='kawasaki', + organization_name='fujitsu', + organizational_unit_name='test', + common_name='root' + ) + ) + self.V3_OAUTH2_MTLS_CERTIFICATE_DIFF = self._create_pem_certificate( + self._create_dn( + country_name='jp', + state_or_province_name='kanagawa', + locality_name='kawasaki', + organization_name='fujitsu', + organizational_unit_name='test', + common_name='diff' + ) + ) + # JSON responses keyed by token ID self.TOKEN_RESPONSES = {} @@ -417,8 +447,87 @@ class Examples(fixtures.Fixture): svc.add_endpoint('public', 'https://swift.openstack.example.org') self.TOKEN_RESPONSES[self.v3_APP_CRED_MATCHING_RULES] = token + # oauth2 credential token + cert_pem = ssl.DER_cert_to_PEM_cert(self.V3_OAUTH2_MTLS_CERTIFICATE) + thumb_sha256 = hashlib.sha256(cert_pem.encode('ascii')).digest() + cert_thumb = base64.urlsafe_b64encode(thumb_sha256).decode('ascii') + + token = fixture.V3Token( + methods=['oauth2_credential'], + user_id=USER_ID, + user_name=USER_NAME, + project_id=PROJECT_ID, + oauth2_thumbprint=cert_thumb, + ) + self.TOKEN_RESPONSES[self.v3_OAUTH2_CREDENTIAL] = token + self.JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in self.TOKEN_RESPONSES.items()]) + def _create_dn( + self, + common_name=None, + locality_name=None, + state_or_province_name=None, + organization_name=None, + organizational_unit_name=None, + country_name=None, + street_address=None, + domain_component=None, + user_id=None, + email_address=None, + ): + oid = x509.NameOID + attr = x509.NameAttribute + dn = [] + if common_name: + dn.append(attr(oid.COMMON_NAME, common_name)) + if locality_name: + dn.append(attr(oid.LOCALITY_NAME, locality_name)) + if state_or_province_name: + dn.append(attr(oid.STATE_OR_PROVINCE_NAME, state_or_province_name)) + if organization_name: + dn.append(attr(oid.ORGANIZATION_NAME, organization_name)) + if organizational_unit_name: + dn.append( + attr( + oid.ORGANIZATIONAL_UNIT_NAME, + organizational_unit_name)) + if country_name: + dn.append(attr(oid.COUNTRY_NAME, country_name)) + if street_address: + dn.append(attr(oid.STREET_ADDRESS, street_address)) + if domain_component: + dn.append(attr(oid.DOMAIN_COMPONENT, domain_component)) + if user_id: + dn.append(attr(oid.USER_ID, user_id)) + if email_address: + dn.append(attr(oid.EMAIL_ADDRESS, email_address)) + return x509.Name(dn) + + def _create_certificate(self, subject_dn, ca=None, ca_key=None): + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + issuer = ca.subject if ca else subject_dn + if not ca_key: + ca_key = private_key + today = datetime.datetime.today() + cert = x509.CertificateBuilder( + issuer_name=issuer, + subject_name=subject_dn, + public_key=private_key.public_key(), + serial_number=x509.random_serial_number(), + not_valid_before=today, + not_valid_after=today + datetime.timedelta(365, 0, 0), + ).sign(ca_key, hashes.SHA256()) + + return cert, private_key + + def _create_pem_certificate(self, subject_dn, ca=None, ca_key=None): + cert, _ = self._create_certificate(subject_dn, ca=ca, ca_key=ca_key) + return cert.public_bytes(Encoding.PEM) + EXAMPLES_RESOURCE = testresources.FixtureResource(Examples()) diff --git a/keystonemiddleware/tests/unit/test_oauth2_mtls_token_middleware.py b/keystonemiddleware/tests/unit/test_oauth2_mtls_token_middleware.py new file mode 100644 index 00000000..29d6b6ec --- /dev/null +++ b/keystonemiddleware/tests/unit/test_oauth2_mtls_token_middleware.py @@ -0,0 +1,373 @@ +# Copyright 2022 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import logging +import ssl +from unittest import mock +import uuid +import webob.dec + +import fixtures +from oslo_config import cfg +import six +from six.moves import http_client +import testresources + +from keystoneauth1 import access +from keystoneauth1 import exceptions as ksa_exceptions + +from keystonemiddleware import oauth2_mtls_token +from keystonemiddleware.tests.unit.auth_token import base +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import BASE_URI +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import ENDPOINT_NOT_FOUND_TOKEN +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import ERROR_TOKEN +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import FAKE_ADMIN_TOKEN +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import FAKE_ADMIN_TOKEN_ID +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import FakeApp +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import FakeOsloCache +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import request_timeout_response +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import TIMEOUT_TOKEN +from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \ + import VERSION_LIST_v3 +from keystonemiddleware.tests.unit import client_fixtures +from keystonemiddleware.tests.unit.test_oauth2_token_middleware \ + import FakeOauth2TokenV3App +from keystonemiddleware.tests.unit.test_oauth2_token_middleware \ + import get_authorization_header +from keystonemiddleware.tests.unit import utils + +_no_value = object() + + +class FakeSocket(object): + + def __init__(self, binary_peer_cert): + self.binary_peer_cert = binary_peer_cert + + def getpeercert(self, binary_form=True): + return self.binary_peer_cert + + +class FakeWsgiInput(object): + + def __init__(self, fake_socket): + self.fake_socket = fake_socket + + def get_socket(self): + return self.fake_socket + + +class BaseOauth2mTlsTokenMiddlewareTest(base.BaseAuthTokenTestCase): + + def setUp(self, expected_env=None, auth_version=None, fake_app=None): + cfg.CONF.clear() + super(BaseOauth2mTlsTokenMiddlewareTest, self).setUp() + + self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + self.useFixture( + fixtures.MockPatchObject(oauth2_mtls_token.OAuth2mTlsProtocol, + '_create_oslo_cache', + return_value=FakeOsloCache)) + self.expected_env = expected_env or dict() + self.fake_app = fake_app or FakeApp + self.middleware = None + + self.conf = { + 'identity_uri': 'https://keystone.example.com:1234/testadmin/', + 'auth_version': auth_version, + 'www_authenticate_uri': 'https://keystone.example.com:1234', + 'admin_user': uuid.uuid4().hex, + } + self.auth_version = auth_version + + def call_middleware(self, **kwargs): + return self.call(self.middleware, **kwargs) + + def set_middleware(self, expected_env=None, conf=None): + """Configure the class ready to call the oauth2_token middleware. + + Set up the various fake items needed to run the middleware. + Individual tests that need to further refine these can call this + function to override the class defaults. + + """ + if conf: + self.conf.update(conf) + + if expected_env: + self.expected_env.update(expected_env) + self.middleware = oauth2_mtls_token.OAuth2mTlsProtocol( + self.fake_app(self.expected_env), self.conf) + + def call(self, middleware, method='GET', path='/', headers=None, + expected_status=http_client.OK, + expected_body_string=None, **kwargs): + req = webob.Request.blank(path, **kwargs) + req.method = method + + for k, v in (headers or {}).items(): + req.headers[k] = v + + resp = req.get_response(middleware) + self.assertEqual(expected_status, resp.status_int) + if expected_body_string: + self.assertIn(expected_body_string, six.text_type(resp.body)) + resp.request = req + return resp + + def assertUnauthorizedResp(self, resp): + error = json.loads(resp.body) + + self.assertEqual('Keystone uri="https://keystone.example.com:1234"', + resp.headers['WWW-Authenticate']) + self.assertEqual( + 'Keystone uri="%s"' % self.conf.get('www_authenticate_uri'), + resp.headers['WWW-Authenticate']) + self.assertEqual( + 'Unauthorized', + error.get('error').get('title')) + self.assertEqual( + 'The request you have made requires authentication.', + error.get('error').get('message')) + + +class Oauth2mTlsTokenMiddlewareTest(BaseOauth2mTlsTokenMiddlewareTest, + testresources.ResourcedTestCase): + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def setUp(self): + super(Oauth2mTlsTokenMiddlewareTest, self).setUp( + auth_version='v3.0', + fake_app=FakeOauth2TokenV3App) + self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) + self.requests_mock.get(BASE_URI, + json=VERSION_LIST_v3, + status_code=300) + self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI, + text=self.token_response, + headers={'X-Subject-Token': uuid.uuid4().hex}) + self.set_middleware(conf={'service_type': 'tacker'}) + + def token_response(self, request, context): + auth_id = request.headers.get('X-Auth-Token') + token_id = request.headers.get('X-Subject-Token') + self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID) + + if token_id == ERROR_TOKEN: + msg = "Network connection refused." + raise ksa_exceptions.ConnectFailure(msg) + if token_id == ENDPOINT_NOT_FOUND_TOKEN: + raise ksa_exceptions.EndpointNotFound() + if token_id == TIMEOUT_TOKEN: + request_timeout_response(request, context) + + try: + response = self.examples.JSON_TOKEN_RESPONSES[token_id] + except KeyError: + response = "" + context.status_code = 404 + + return response + + def call_middleware(self, client_cert=_no_value, **kwargs): + if client_cert is _no_value: + client_cert = self.examples.V3_OAUTH2_MTLS_CERTIFICATE + + if client_cert: + fake_socket = FakeSocket(client_cert) + fake_wsgi_input = FakeWsgiInput(fake_socket) + kwargs.update({'environ': {'wsgi.input': fake_wsgi_input}}) + + return self.call(self.middleware, **kwargs) + + def test_basic(self): + token = self.examples.v3_OAUTH2_CREDENTIAL + token_data = self.examples.TOKEN_RESPONSES[token] + + resp = self.call_middleware( + headers=get_authorization_header(token), + expected_status=200, + method='GET', path='/vnfpkgm/v1/vnf_packages', + ) + self.assertEqual(FakeApp.SUCCESS, resp.body) + token_auth = resp.request.environ['keystone.token_auth'] + self.assertTrue(token_auth.has_user_token) + self.assertEqual(token_data.user_id, token_auth.user.user_id) + self.assertEqual(token_data.project_id, token_auth.user.project_id) + self.assertEqual(token_data.user_domain_id, + token_auth.user.user_domain_id) + self.assertEqual(token_data.project_domain_id, + token_auth.user.project_domain_id) + self.assertEqual(token_data.oauth2_thumbprint, + token_auth.user.oauth2_credential_thumbprint) + + def test_not_oauth2_credential_token(self): + token = self.examples.v3_APP_CRED_TOKEN + resp = self.call_middleware( + headers=get_authorization_header(token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + ) + self.assertUnauthorizedResp(resp) + self.assertIn( + 'Invalid OAuth2.0 certificate-bound access token: ' + 'The token is not an OAuth2.0 credential access token.', + self.logger.output) + + def test_thumbprint_not_match(self): + diff_cert = self.examples.V3_OAUTH2_MTLS_CERTIFICATE_DIFF + token = self.examples.v3_OAUTH2_CREDENTIAL + resp = self.call_middleware( + headers=get_authorization_header(token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + client_cert=diff_cert + ) + self.assertUnauthorizedResp(resp) + self.assertIn('The two thumbprints do not match.', + self.logger.output) + + @mock.patch.object(ssl, 'DER_cert_to_PEM_cert') + def test_gen_thumbprint_exception(self, mock_DER_cert_to_PEM_cert): + except_msg = 'Boom!' + mock_DER_cert_to_PEM_cert.side_effect = Exception(except_msg) + token = self.examples.v3_OAUTH2_CREDENTIAL + resp = self.call_middleware( + headers=get_authorization_header(token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages' + ) + self.assertUnauthorizedResp(resp) + self.assertIn(except_msg, self.logger.output) + + def test_without_cert(self): + token = self.examples.v3_OAUTH2_CREDENTIAL + resp = self.call_middleware( + headers=get_authorization_header(token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + client_cert=None + ) + self.assertUnauthorizedResp(resp) + self.assertIn('Unable to obtain the client certificate.', + self.logger.output) + + def test_not_wsgi_input(self): + token = self.examples.v3_OAUTH2_CREDENTIAL + resp = super(Oauth2mTlsTokenMiddlewareTest, self).call_middleware( + headers=get_authorization_header(token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': None} + ) + self.assertUnauthorizedResp(resp) + self.assertIn('Unable to obtain the client certificate.', + self.logger.output) + + def test_not_socket(self): + token = self.examples.v3_OAUTH2_CREDENTIAL + resp = super(Oauth2mTlsTokenMiddlewareTest, self).call_middleware( + headers=get_authorization_header(token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(None)} + ) + self.assertUnauthorizedResp(resp) + self.assertIn('Unable to obtain the client certificate.', + self.logger.output) + + def test_not_peer_cert(self): + token = self.examples.v3_OAUTH2_CREDENTIAL + resp = super(Oauth2mTlsTokenMiddlewareTest, self).call_middleware( + headers=get_authorization_header(token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))} + ) + self.assertUnauthorizedResp(resp) + self.assertIn('Unable to obtain the client certificate.', + self.logger.output) + + @mock.patch.object(access, 'create') + def test_keystonemiddleware_exceptiton(self, mock_create): + except_msg = 'Unrecognized auth response' + mock_create.side_effect = Exception(except_msg) + token = self.examples.v3_OAUTH2_CREDENTIAL + resp = self.call_middleware( + headers=get_authorization_header(token), + expected_status=401, + method='GET', path='/vnfpkgm/v1/vnf_packages', + ) + self.assertUnauthorizedResp(resp) + self.assertIn( + 'Invalid token contents.', + self.logger.output) + self.assertIn( + 'Invalid OAuth2.0 certificate-bound access token: %s' + % 'Token authorization failed', + self.logger.output) + + def test_request_no_token(self): + resp = self.call_middleware(expected_status=401) + self.assertUnauthorizedResp(resp) + + def test_request_blank_token(self): + resp = self.call_middleware(headers=get_authorization_header(''), + expected_status=401) + self.assertUnauthorizedResp(resp) + + def _get_cached_token(self, token): + return self.middleware._token_cache.get(token) + + def assert_valid_last_url(self, token_id): + self.assertLastPath('/v3/auth/tokens') + + def assertLastPath(self, path): + if path: + self.assertEqual(BASE_URI + path, + self.requests_mock.last_request.url) + else: + self.assertIsNone(self.requests_mock.last_request) + + def test_http_error_not_cached_token(self): + """Test to don't cache token as invalid on network errors. + + We use UUID tokens since they are the easiest one to reach + get_http_connection. + """ + self.set_middleware(conf={'http_request_max_retries': '0'}) + self.call_middleware(headers=get_authorization_header(ERROR_TOKEN), + expected_status=503) + self.assertIsNone(self._get_cached_token(ERROR_TOKEN)) + self.assert_valid_last_url(ERROR_TOKEN) + + +class FilterFactoryTest(utils.BaseTestCase): + + def test_filter_factory(self): + conf = {} + auth_filter = oauth2_mtls_token.filter_factory(conf) + m = auth_filter(FakeOauth2TokenV3App()) + self.assertIsInstance(m, oauth2_mtls_token.OAuth2mTlsProtocol) diff --git a/releasenotes/notes/bp-support-oauth2-mtls-2d2686c9d5b1fe1f.yaml b/releasenotes/notes/bp-support-oauth2-mtls-2d2686c9d5b1fe1f.yaml new file mode 100644 index 00000000..906404e1 --- /dev/null +++ b/releasenotes/notes/bp-support-oauth2-mtls-2d2686c9d5b1fe1f.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + [`blueprint support-oauth2-mtls `_] + The oauth2_mtls_token filter has been added for accepting or denying + incoming requests containing OAuth 2.0 certificate-bound access tokens + that are obtained from keystone identity server by users through their + OAuth 2.0 credentials and Mutual-TLS certificates. diff --git a/setup.cfg b/setup.cfg index c6eaeae3..a47683ad 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,3 +43,4 @@ paste.filter_factory = ec2_token = keystonemiddleware.ec2_token:filter_factory s3_token = keystonemiddleware.s3_token:filter_factory oauth2_token = keystonemiddleware.oauth2_token:filter_factory + oauth2_mtls_token = keystonemiddleware.oauth2_mtls_token:filter_factory