Stop supporting revocation list

With keystone's move to eliminating pki, pkiz, and uuid tokens the
revocation list is no longer generated. Keystonemiddleware no longer
needs to attempt to retrieve it and reference it.

Change-Id: Ief3bf1941e62f9136dbed11877bca81c4102041b
closes-bug: #1361743
partial-bug: #1649735
partial-bug: #1736985
This commit is contained in:
Morgan Fainberg 2018-10-26 10:32:28 -07:00
parent fc51082ef4
commit 7e1b536259
10 changed files with 9 additions and 734 deletions

View File

@ -16,7 +16,6 @@ import json
import os
from keystoneclient.common import cms
from keystoneclient import utils
CURRENT_DIR = os.path.abspath(os.path.dirname(__file__))
@ -25,41 +24,6 @@ def make_filename(*args):
return os.path.join(CURRENT_DIR, *args)
def generate_revocation_list():
REVOKED_TOKENS = ['auth_token_revoked', 'auth_v3_token_revoked']
revoked_list = []
for token in REVOKED_TOKENS:
with open(make_filename('cms', '%s.pkiz' % name), 'r') as f:
token_data = f.read()
id = utils.hash_signed_token(token_data.encode('utf-8'))
revoked_list.append({
'id': id,
"expires": "2112-08-14T17:58:48Z"
})
with open(make_filename('cms', '%s.pem' % name), 'r') as f:
pem_data = f.read()
token_data = cms.cms_to_token(pem_data).encode('utf-8')
id = utils.hash_signed_token(token_data)
revoked_list.append({
'id': id,
"expires": "2112-08-14T17:58:48Z"
})
revoked_json = json.dumps({"revoked": revoked_list})
with open(make_filename('cms', 'revocation_list.json'), 'w') as f:
f.write(revoked_json)
encoded = cms.pkiz_sign(revoked_json,
SIGNING_CERT_FILE_NAME,
SIGNING_KEY_FILE_NAME)
with open(make_filename('cms', 'revocation_list.pkiz'), 'w') as f:
f.write(encoded)
encoded = cms.cms_sign_data(revoked_json,
SIGNING_CERT_FILE_NAME,
SIGNING_KEY_FILE_NAME)
with open(make_filename('cms', 'revocation_list.pem'), 'w') as f:
f.write(encoded)
CA_CERT_FILE_NAME = make_filename('certs', 'cacert.pem')
SIGNING_CERT_FILE_NAME = make_filename('certs', 'signing_cert.pem')
SIGNING_KEY_FILE_NAME = make_filename('private', 'signing_key.pem')
@ -113,5 +77,3 @@ for name in EXAMPLE_TOKENS:
with open(pkiz_file, 'w') as f:
f.write(encoded)
generate_revocation_list()

View File

@ -219,7 +219,6 @@ object is stored.
import binascii
import copy
import datetime
from keystoneauth1 import access
from keystoneauth1 import adapter
@ -243,7 +242,6 @@ from keystonemiddleware.auth_token import _exceptions as ksm_exceptions
from keystonemiddleware.auth_token import _identity
from keystonemiddleware.auth_token import _opts
from keystonemiddleware.auth_token import _request
from keystonemiddleware.auth_token import _revocations
from keystonemiddleware.auth_token import _signing_dir
from keystonemiddleware.auth_token import _user_plugin
from keystonemiddleware.i18n import _
@ -597,17 +595,6 @@ class AuthProtocol(BaseAuthProtocol):
self._token_cache = self._token_cache_factory()
revocation_cache_timeout = datetime.timedelta(
seconds=self._conf.get('revocation_cache_time'))
self._revocations = _revocations.Revocations(revocation_cache_timeout,
self._signing_directory,
self._identity_server,
self._cms_verify,
self.log)
self._check_revocations_for_cached = self._conf.get(
'check_revocations_for_cached')
def process_request(self, request):
"""Process request.
@ -690,9 +677,6 @@ class AuthProtocol(BaseAuthProtocol):
def _token_hashes(self, token):
"""Generate a list of hashes that the current token may be cached as.
With PKI tokens we have multiple hashing algorithms that we test with
revocations. This generates that whole list.
The first element of this list is the preferred algorithm and is what
new cache values should be saved as.
@ -740,11 +724,6 @@ class AuthProtocol(BaseAuthProtocol):
self.log.debug('Cached token is marked unauthorized')
raise ksm_exceptions.InvalidToken()
if self._check_revocations_for_cached:
# A token might have been revoked, regardless of initial
# mechanism used to validate it, and needs to be checked.
self._revocations.check(token_hashes)
# NOTE(jamielennox): Cached values used to be stored as a tuple
# of data and expiry time. They no longer are but we have to
# allow some time to transition the old format so if it's a
@ -765,7 +744,6 @@ class AuthProtocol(BaseAuthProtocol):
except (ksa_exceptions.ConnectFailure,
ksa_exceptions.DiscoveryFailure,
ksa_exceptions.RequestTimeout,
ksm_exceptions.RevocationListError,
ksm_exceptions.ServiceError) as e:
self.log.critical('Unable to validate token: %s', e)
if self._delay_auth_decision:
@ -797,14 +775,10 @@ class AuthProtocol(BaseAuthProtocol):
return
try:
self._revocations.check(token_hashes)
verified = self._cms_verify(token_data, inform)
except ksc_exceptions.CertificateConfigError:
self.log.warning('Fetch certificate config failed, '
'fallback to online validation.')
except ksm_exceptions.RevocationListError:
self.log.warning('Fetch revocation list failed, '
'fallback to online validation.')
else:
self.log.warning('auth_token middleware received a PKI/Z token. '
'This form of token is deprecated and has been '
@ -815,17 +789,6 @@ class AuthProtocol(BaseAuthProtocol):
data = jsonutils.loads(verified)
audit_ids = None
if 'access' in data:
# It's a v2 token.
audit_ids = data['access']['token'].get('audit_ids')
else:
# It's a v3 token
audit_ids = data['token'].get('audit_ids')
if audit_ids:
self._revocations.check_by_audit_id(audit_ids)
return data
def _validate_token(self, auth_ref, **kwargs):
@ -1005,4 +968,3 @@ def app_factory(global_conf, **local_conf):
InvalidToken = ksm_exceptions.InvalidToken
ServiceError = ksm_exceptions.ServiceError
ConfigurationError = ksm_exceptions.ConfigurationError
RevocationListError = ksm_exceptions.RevocationListError

View File

@ -61,9 +61,6 @@ class _RequestStrategy(object):
def _fetch_ca_cert(self):
pass
def fetch_revocation_list(self):
pass
class _V2RequestStrategy(_RequestStrategy):
@ -89,9 +86,6 @@ class _V2RequestStrategy(_RequestStrategy):
def _fetch_ca_cert(self):
return self._client.certificates.get_ca_certificate()
def fetch_revocation_list(self):
return self._client.tokens.get_revoked()
class _V3RequestStrategy(_RequestStrategy):
@ -119,9 +113,6 @@ class _V3RequestStrategy(_RequestStrategy):
def _fetch_ca_cert(self):
return self._client.simple_cert.get_ca_certificates()
def fetch_revocation_list(self):
return self._client.tokens.get_revoked()
_REQUEST_STRATEGIES = [_V3RequestStrategy, _V2RequestStrategy]
@ -130,7 +121,7 @@ class IdentityServer(object):
"""Base class for operations on the Identity API server.
The auth_token middleware needs to communicate with the Identity API server
to validate UUID tokens, fetch the revocation list, signing certificates,
to validate UUID tokens, signing certificates,
etc. This class encapsulates the data and methods to perform these
operations.
@ -243,17 +234,6 @@ class IdentityServer(object):
else:
return auth_ref
def fetch_revocation_list(self):
try:
data = self._request_strategy.fetch_revocation_list()
except ksa_exceptions.HttpError as e:
msg = _('Failed to fetch token revocation list: %d')
raise ksm_exceptions.RevocationListError(msg % e.http_status)
if 'signed' not in data:
msg = _('Revocation list improperly formatted.')
raise ksm_exceptions.RevocationListError(msg)
return data['signed']
def fetch_signing_cert(self):
return self._request_strategy.fetch_signing_cert()

View File

@ -113,17 +113,6 @@ _OPTS = [
' tokens, the middleware caches previously-seen tokens for a'
' configurable duration (in seconds). Set to -1 to disable'
' caching completely.'),
cfg.IntOpt('revocation_cache_time',
default=10,
deprecated_for_removal=True,
deprecated_reason='PKI token format is no longer supported.',
deprecated_since='Ocata',
help='Determines the frequency at which the list of revoked'
' tokens is retrieved from the Identity service (in seconds). A'
' high number of revocation events combined with a low cache'
' duration may significantly reduce performance. Only valid'
' for PKI tokens. This option has been deprecated in the Ocata'
' release and will be removed in the P release.'),
cfg.StrOpt('memcache_security_strategy',
default='None',
choices=('None', 'MAC', 'ENCRYPT'),
@ -179,13 +168,6 @@ _OPTS = [
' unknown the token will be rejected. "required" any form of'
' token binding is needed to be allowed. Finally the name of a'
' binding method that must be present in tokens.'),
cfg.BoolOpt('check_revocations_for_cached', default=False,
deprecated_for_removal=True,
deprecated_reason='PKI token format is no longer supported.',
deprecated_since='Ocata',
help='If true, the revocation list will be checked for cached'
' tokens. This requires that PKI tokens are configured on the'
' identity server.'),
cfg.ListOpt('hash_algorithms', default=['md5'],
deprecated_for_removal=True,
deprecated_reason='PKI token format is no longer supported.',

View File

@ -1,128 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import os
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.i18n import _
_LOG = logging.getLogger(__name__)
class Revocations(object):
_FILE_NAME = 'revoked.pem'
def __init__(self, timeout, signing_directory, identity_server,
cms_verify, log=_LOG):
self._cache_timeout = timeout
self._signing_directory = signing_directory
self._identity_server = identity_server
self._cms_verify = cms_verify
self._log = log
self._fetched_time_prop = None
self._list_prop = None
@property
def _fetched_time(self):
if not self._fetched_time_prop:
# If the fetched list has been written to disk, use its
# modification time.
file_path = self._signing_directory.calc_path(self._FILE_NAME)
if os.path.exists(file_path):
mtime = os.path.getmtime(file_path)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
# Otherwise the list will need to be fetched.
else:
fetched_time = datetime.datetime.min
self._fetched_time_prop = fetched_time
return self._fetched_time_prop
@_fetched_time.setter
def _fetched_time(self, value):
self._fetched_time_prop = value
def _fetch(self):
revocation_list_data = self._identity_server.fetch_revocation_list()
return self._cms_verify(revocation_list_data)
@property
def _list(self):
timeout = self._fetched_time + self._cache_timeout
list_is_current = timeutils.utcnow() < timeout
if list_is_current:
# Load the list from disk if required
if not self._list_prop:
self._list_prop = jsonutils.loads(
self._signing_directory.read_file(self._FILE_NAME))
else:
self._list = self._fetch()
return self._list_prop
@_list.setter
def _list(self, value):
"""Save a revocation list to memory and to disk.
:param value: A json-encoded revocation list
"""
self._list_prop = jsonutils.loads(value)
self._fetched_time = timeutils.utcnow()
self._signing_directory.write_file(self._FILE_NAME, value)
def _is_revoked(self, token_id):
"""Indicate whether the token_id appears in the revocation list."""
revoked_tokens = self._list.get('revoked', None)
if not revoked_tokens:
return False
revoked_ids = (x['id'] for x in revoked_tokens)
return token_id in revoked_ids
def _any_revoked(self, token_ids):
for token_id in token_ids:
if self._is_revoked(token_id):
return True
return False
def check(self, token_ids):
if self._any_revoked(token_ids):
self._log.debug('Token is marked as having been revoked')
raise exc.InvalidToken(_('Token has been revoked'))
def check_by_audit_id(self, audit_ids):
"""Check whether the audit_id appears in the revocation list.
:raises keystonemiddleware.auth_token._exceptions.InvalidToken:
if the audit ID(s) appear in the revocation list.
"""
revoked_tokens = self._list.get('revoked', None)
if not revoked_tokens:
# There's no revoked tokens, so nothing to do.
return
# The audit_id may not be present in the revocation events because
# earlier versions of the identity server didn't provide them.
revoked_ids = set(
x['audit_id'] for x in revoked_tokens if 'audit_id' in x)
for audit_id in audit_ids:
if audit_id in revoked_ids:
self._log.debug(
'Token is marked as having been revoked by audit id')
raise exc.InvalidToken(_('Token has been revoked'))

View File

@ -35,7 +35,6 @@ from oslo_utils import timeutils
import pbr.version
import six
import testresources
import testtools
from testtools import matchers
import webob
import webob.dec
@ -44,7 +43,6 @@ from keystonemiddleware import auth_token
from keystonemiddleware.auth_token import _base
from keystonemiddleware.auth_token import _cache
from keystonemiddleware.auth_token import _exceptions as ksm_exceptions
from keystonemiddleware.auth_token import _revocations
from keystonemiddleware.tests.unit.auth_token import base
from keystonemiddleware.tests.unit import client_fixtures
@ -101,13 +99,6 @@ ERROR_TOKEN = '7ae290c2a06244c4b41692eb4e9225f2'
TIMEOUT_TOKEN = '4ed1c5e53beee59458adcf8261a8cae2'
def cleanup_revoked_file(filename):
try:
os.remove(filename)
except OSError:
pass
def strtime(at=None):
at = at or timeutils.utcnow()
return at.strftime(timeutils.PERFECT_TIME_FORMAT)
@ -337,9 +328,6 @@ class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase):
self.middleware = auth_token.AuthProtocol(
self.fake_app(self.expected_env), self.conf)
self.middleware._revocations._list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
def update_expected_env(self, expected_env={}):
self.middleware._app.expected_env.update(expected_env)
@ -476,27 +464,14 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertThat(hashed_short_string_key,
matchers.HasLength(len(hashed_long_string_key)))
def test_config_revocation_cache_timeout(self):
conf = {
'revocation_cache_time': '24',
'www_authenticate_uri': 'https://keystone.example.com:1234',
'admin_user': uuid.uuid4().hex
}
middleware = auth_token.AuthProtocol(self.fake_app, conf)
self.assertEqual(middleware._revocations._cache_timeout,
datetime.timedelta(seconds=24))
def test_conf_values_type_convert(self):
conf = {
'revocation_cache_time': '24',
'identity_uri': 'https://keystone.example.com:1234',
'include_service_catalog': '0',
'nonexsit_option': '0',
}
middleware = auth_token.AuthProtocol(self.fake_app, conf)
self.assertEqual(datetime.timedelta(seconds=24),
middleware._revocations._cache_timeout)
self.assertFalse(middleware._include_service_catalog)
self.assertEqual('0', middleware._conf.get('nonexsit_option'))
@ -643,42 +618,6 @@ class CommonAuthTokenMiddlewareTest(object):
self.assert_valid_request_200(self.token_dict['uuid_token_default'])
self.assert_valid_last_url(self.token_dict['uuid_token_default'])
def _test_cache_revoked(self, token, revoked_form=None):
# When the token is cached and revoked, 401 is returned.
self.middleware._check_revocations_for_cached = True
# Token should be cached as ok after this.
self.call_middleware(headers={'X-Auth-Token': token})
# Put it in revocation list.
self.middleware._revocations._list = self.get_revocation_list_json(
token_ids=[revoked_form or token])
self.call_middleware(headers={'X-Auth-Token': token},
expected_status=401)
def test_cached_revoked_error(self):
# When the token is cached and revocation list retrieval fails,
# 503 is returned
token = self.token_dict['uuid_token_default']
self.middleware._check_revocations_for_cached = True
# Token should be cached as ok after this.
resp = self.call_middleware(headers={'X-Auth-Token': token})
self.assertEqual(200, resp.status_int)
# Cause the revocation list to be fetched again next time so we can
# test the case where that retrieval fails
self.middleware._revocations._fetched_time = datetime.datetime.min
with mock.patch.object(self.middleware._revocations, '_fetch',
side_effect=ksm_exceptions.RevocationListError):
self.call_middleware(headers={'X-Auth-Token': token},
expected_status=503)
def test_cached_revoked_uuid(self):
# When the UUID token is cached and revoked, 401 is returned.
self._test_cache_revoked(self.token_dict['uuid_token_default'])
def test_valid_signed_request(self):
for _ in range(2): # Do it twice because first result was cached.
self.assert_valid_request_200(
@ -692,176 +631,13 @@ class CommonAuthTokenMiddlewareTest(object):
# ensure that signed requests do not generate HTTP traffic
self.assertLastPath(None)
def test_revoked_token_receives_401(self):
self.middleware._revocations._list = (
self.get_revocation_list_json())
token = self.token_dict['revoked_token']
self.call_middleware(headers={'X-Auth-Token': token},
expected_status=401)
def test_revoked_token_receives_401_sha256(self):
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
self.middleware._revocations._list = (
self.get_revocation_list_json(mode='sha256'))
token = self.token_dict['revoked_token']
self.call_middleware(headers={'X-Auth-Token': token},
expected_status=401)
def test_cached_revoked_pki(self):
# When the PKI token is cached and revoked, 401 is returned.
token = self.token_dict['signed_token_scoped']
revoked_form = cms.cms_hash_token(token)
self._test_cache_revoked(token, revoked_form)
def test_cached_revoked_pkiz(self):
# When the PKIZ token is cached and revoked, 401 is returned.
token = self.token_dict['signed_token_scoped_pkiz']
revoked_form = cms.cms_hash_token(token)
self._test_cache_revoked(token, revoked_form)
def test_revoked_token_receives_401_md5_secondary(self):
# When hash_algorithms has 'md5' as the secondary hash and the
# revocation list contains the md5 hash for a token, that token is
# considered revoked so returns 401.
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
self.middleware._revocations._list = (
self.get_revocation_list_json())
token = self.token_dict['revoked_token']
self.call_middleware(headers={'X-Auth-Token': token},
expected_status=401)
def _test_revoked_hashed_token(self, token_name):
# If hash_algorithms is set as ['sha256', 'md5'],
# and check_revocations_for_cached is True,
# and a token is in the cache because it was successfully validated
# using the md5 hash, then
# if the token is in the revocation list by md5 hash, it'll be
# rejected and auth_token returns 401.
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.conf['check_revocations_for_cached'] = 'true'
self.set_middleware()
token = self.token_dict[token_name]
# Put the token in the revocation list.
token_hashed = cms.cms_hash_token(token)
self.middleware._revocations._list = self.get_revocation_list_json(
token_ids=[token_hashed])
# First, request is using the hashed token, is valid so goes in
# cache using the given hash.
self.call_middleware(headers={'X-Auth-Token': token_hashed})
# This time use the PKI(Z) token
# Should find the token in the cache and revocation list.
self.call_middleware(headers={'X-Auth-Token': token},
expected_status=401)
def test_revoked_hashed_pki_token(self):
self._test_revoked_hashed_token('signed_token_scoped')
def test_revoked_hashed_pkiz_token(self):
self._test_revoked_hashed_token('signed_token_scoped_pkiz')
def test_revoked_pki_token_by_audit_id(self):
# When the audit ID is in the revocation list, the token is invalid.
self.set_middleware()
token = self.token_dict['signed_token_scoped']
# Put the token audit ID in the revocation list,
# the entry will have a false token ID so the token ID doesn't match.
fake_token_id = uuid.uuid4().hex
# The audit_id value is in examples/pki/cms/auth_*_token_scoped.json.
audit_id = 'SLIXlXQUQZWUi9VJrqdXqA'
revocation_list_data = {
'revoked': [
{
'id': fake_token_id,
'audit_id': audit_id
},
]
}
self.middleware._revocations._list = jsonutils.dumps(
revocation_list_data)
self.call_middleware(headers={'X-Auth-Token': token},
expected_status=401)
def get_revocation_list_json(self, token_ids=None, mode=None):
if token_ids is None:
key = 'revoked_token_hash' + (('_' + mode) if mode else '')
token_ids = [self.token_dict[key]]
revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
for x in token_ids]}
return jsonutils.dumps(revocation_list)
def test_is_signed_token_revoked_returns_false(self):
# explicitly setting an empty revocation list here to document intent
self.middleware._revocations._list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
result = self.middleware._revocations._any_revoked(
[self.token_dict['revoked_token_hash']])
self.assertFalse(result)
def test_is_signed_token_revoked_returns_true(self):
self.middleware._revocations._list = (
self.get_revocation_list_json())
result = self.middleware._revocations._any_revoked(
[self.token_dict['revoked_token_hash']])
self.assertTrue(result)
def test_is_signed_token_revoked_returns_true_sha256(self):
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
self.middleware._revocations._list = (
self.get_revocation_list_json(mode='sha256'))
result = self.middleware._revocations._any_revoked(
[self.token_dict['revoked_token_hash_sha256']])
self.assertTrue(result)
def test_validate_offline_raises_exception_for_revoked_token(self):
self.middleware._revocations._list = (
self.get_revocation_list_json())
self.assertRaises(ksm_exceptions.InvalidToken,
self.middleware._validate_offline,
self.token_dict['revoked_token'],
[self.token_dict['revoked_token_hash']])
def test_validate_offline_raises_exception_for_revoked_token_s256(self):
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
self.middleware._revocations._list = (
self.get_revocation_list_json(mode='sha256'))
self.assertRaises(ksm_exceptions.InvalidToken,
self.middleware._validate_offline,
self.token_dict['revoked_token'],
[self.token_dict['revoked_token_hash_sha256'],
self.token_dict['revoked_token_hash']])
def test_validate_offline_raises_exception_for_revoked_pkiz_token(self):
self.middleware._revocations._list = (
self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON)
self.assertRaises(ksm_exceptions.InvalidToken,
self.middleware._validate_offline,
self.token_dict['revoked_token_pkiz'],
[self.token_dict['revoked_token_pkiz_hash']])
def test_validate_offline_succeeds_for_unrevoked_token(self):
self.middleware._revocations._list = (
self.get_revocation_list_json())
token = self.middleware._validate_offline(
self.token_dict['signed_token_scoped'],
[self.token_dict['signed_token_scoped_hash']])
self.assertIsInstance(token, dict)
def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self):
self.middleware._revocations._list = (
self.get_revocation_list_json())
token = self.middleware._validate_offline(
self.token_dict['signed_token_scoped_pkiz'],
[self.token_dict['signed_token_scoped_hash']])
@ -870,84 +646,12 @@ class CommonAuthTokenMiddlewareTest(object):
def test_validate_offline_token_succeeds_for_unrevoked_token_sha256(self):
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
self.middleware._revocations._list = (
self.get_revocation_list_json(mode='sha256'))
token = self.middleware._validate_offline(
self.token_dict['signed_token_scoped'],
[self.token_dict['signed_token_scoped_hash_sha256'],
self.token_dict['signed_token_scoped_hash']])
self.assertIsInstance(token, dict)
def test_get_token_revocation_list_fetched_time_returns_min(self):
self.middleware._revocations._fetched_time = None
# Get rid of the revoked file
revoked_path = self.middleware._signing_directory.calc_path(
_revocations.Revocations._FILE_NAME)
os.remove(revoked_path)
self.assertEqual(self.middleware._revocations._fetched_time,
datetime.datetime.min)
# FIXME(blk-u): move the unit tests into unit/test_auth_token.py
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
self.middleware._revocations._fetched_time = None
revoked_path = self.middleware._signing_directory.calc_path(
_revocations.Revocations._FILE_NAME)
mtime = os.path.getmtime(revoked_path)
fetched_time = datetime.datetime.utcfromtimestamp(mtime)
self.assertEqual(fetched_time,
self.middleware._revocations._fetched_time)
@testtools.skipUnless(TimezoneFixture.supported(),
'TimezoneFixture not supported')
def test_get_token_revocation_list_fetched_time_returns_utc(self):
with TimezoneFixture('UTC-1'):
self.middleware._revocations._list = jsonutils.dumps(
self.examples.REVOCATION_LIST)
self.middleware._revocations._fetched_time = None
fetched_time = self.middleware._revocations._fetched_time
self.assertTrue(timeutils.is_soon(fetched_time, 1))
def test_get_token_revocation_list_fetched_time_returns_value(self):
expected = self.middleware._revocations._fetched_time
self.assertEqual(self.middleware._revocations._fetched_time,
expected)
def test_get_revocation_list_returns_fetched_list(self):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
self.middleware._revocations._fetched_time = None
# Get rid of the revoked file
revoked_path = self.middleware._signing_directory.calc_path(
_revocations.Revocations._FILE_NAME)
os.remove(revoked_path)
self.assertEqual(self.middleware._revocations._list,
self.examples.REVOCATION_LIST)
def test_get_revocation_list_returns_current_list_from_memory(self):
self.assertEqual(self.middleware._revocations._list,
self.middleware._revocations._list_prop)
def test_get_revocation_list_returns_current_list_from_disk(self):
in_memory_list = self.middleware._revocations._list
self.middleware._revocations._list_prop = None
self.assertEqual(self.middleware._revocations._list,
in_memory_list)
def test_invalid_revocation_list_raises_error(self):
self.requests_mock.get(self.revocation_url, json={})
self.assertRaises(ksm_exceptions.RevocationListError,
self.middleware._revocations._fetch)
def test_fetch_revocation_list(self):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
fetched = jsonutils.loads(self.middleware._revocations._fetch())
self.assertEqual(fetched, self.examples.REVOCATION_LIST)
def test_request_invalid_uuid_token(self):
# remember because we are testing the middleware we stub the connection
# to the keystone server, but this is not what gets returned
@ -1604,13 +1308,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.SIGNED_TOKEN_SCOPED_HASH_SHA256,
'signed_token_scoped_expired':
self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token': self.examples.REVOKED_TOKEN,
'revoked_token_pkiz': self.examples.REVOKED_TOKEN_PKIZ,
'revoked_token_pkiz_hash':
self.examples.REVOKED_TOKEN_PKIZ_HASH,
'revoked_token_hash': self.examples.REVOKED_TOKEN_HASH,
'revoked_token_hash_sha256':
self.examples.REVOKED_TOKEN_HASH_SHA256,
'uuid_service_token_default':
self.examples.UUID_SERVICE_TOKEN_DEFAULT,
}
@ -1622,10 +1319,6 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.revocation_url = '%s/v2.0/tokens/revoked' % BASE_URI
self.requests_mock.get(self.revocation_url,
text=self.examples.SIGNED_REVOCATION_LIST)
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_TOKEN_UNSCOPED,
self.examples.UUID_TOKEN_BIND,
@ -1812,13 +1505,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256,
'signed_token_scoped_expired':
self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token': self.examples.REVOKED_v3_TOKEN,
'revoked_token_pkiz': self.examples.REVOKED_v3_TOKEN_PKIZ,
'revoked_token_hash': self.examples.REVOKED_v3_TOKEN_HASH,
'revoked_token_hash_sha256':
self.examples.REVOKED_v3_TOKEN_HASH_SHA256,
'revoked_token_pkiz_hash':
self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
'uuid_service_token_default':
self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT,
}
@ -1832,10 +1518,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.revocation_url = '%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI
self.requests_mock.get(self.revocation_url,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
text=self.token_response,
headers={'X-Subject-Token': uuid.uuid4().hex})
@ -1944,7 +1626,6 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.token_dict['signed_token_scoped_pkiz'])
def test_fallback_to_online_validation_with_revocation_list_error(self):
self.requests_mock.get(self.revocation_url, status_code=404)
self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
self.assert_valid_request_200(
self.token_dict['signed_token_scoped_pkiz'])
@ -2398,10 +2079,6 @@ class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest,
self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST,
status_code=200)
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_SERVICE_TOKEN_DEFAULT,
self.examples.UUID_TOKEN_BIND,
@ -2455,9 +2132,6 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
text=FAKE_ADMIN_TOKEN)
self.requests_mock.get('%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI,
text=self.examples.SIGNED_REVOCATION_LIST)
self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
text=self.token_response,
headers={'X-Subject-Token': uuid.uuid4().hex})

View File

@ -1,104 +0,0 @@
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import json
import shutil
import uuid
import mock
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.auth_token import _revocations
from keystonemiddleware.auth_token import _signing_dir
from keystonemiddleware.tests.unit import utils
class RevocationsTests(utils.BaseTestCase):
def _setup_revocations(self, revoked_list):
directory_name = '/tmp/%s' % uuid.uuid4().hex
signing_directory = _signing_dir.SigningDirectory(directory_name)
self.addCleanup(shutil.rmtree, directory_name)
identity_server = mock.Mock()
verify_result_obj = {'revoked': revoked_list}
cms_verify = mock.Mock(return_value=json.dumps(verify_result_obj))
revocations = _revocations.Revocations(
timeout=datetime.timedelta(1), signing_directory=signing_directory,
identity_server=identity_server, cms_verify=cms_verify)
return revocations
def _check_with_list(self, revoked_list, token_ids):
revoked_list = list({'id': r} for r in revoked_list)
revocations = self._setup_revocations(revoked_list)
revocations.check(token_ids)
def test_check_empty_list(self):
# When the identity server returns an empty list, a token isn't
# revoked.
revoked_tokens = []
token_ids = [uuid.uuid4().hex]
# No assert because this would raise
self._check_with_list(revoked_tokens, token_ids)
def test_check_revoked(self):
# When the identity server returns a list with a token in it, that
# token is revoked.
token_id = uuid.uuid4().hex
revoked_tokens = [token_id]
token_ids = [token_id]
self.assertRaises(exc.InvalidToken,
self._check_with_list, revoked_tokens, token_ids)
def test_check_by_audit_id_revoked(self):
# When the audit ID is in the revocation list, InvalidToken is raised.
audit_id = uuid.uuid4().hex
revoked_list = [{'id': uuid.uuid4().hex, 'audit_id': audit_id}]
revocations = self._setup_revocations(revoked_list)
self.assertRaises(exc.InvalidToken,
revocations.check_by_audit_id, [audit_id])
def test_check_by_audit_id_chain_revoked(self):
# When the token's audit chain ID is in the revocation list,
# InvalidToken is raised.
revoked_audit_id = uuid.uuid4().hex
revoked_list = [{'id': uuid.uuid4().hex, 'audit_id': revoked_audit_id}]
revocations = self._setup_revocations(revoked_list)
token_audit_ids = [uuid.uuid4().hex, revoked_audit_id]
self.assertRaises(exc.InvalidToken,
revocations.check_by_audit_id, token_audit_ids)
def test_check_by_audit_id_not_revoked(self):
# When the audit ID is not in the revocation list no exception.
revoked_list = [{'id': uuid.uuid4().hex, 'audit_id': uuid.uuid4().hex}]
revocations = self._setup_revocations(revoked_list)
audit_id = uuid.uuid4().hex
revocations.check_by_audit_id([audit_id])
def test_check_by_audit_id_no_audit_ids(self):
# Older identity servers don't send audit_ids in the revocation list.
# When this happens, check_by_audit_id still works, just doesn't
# verify anything.
revoked_list = [{'id': uuid.uuid4().hex}]
revocations = self._setup_revocations(revoked_list)
audit_id = uuid.uuid4().hex
revocations.check_by_audit_id([audit_id])

View File

@ -20,7 +20,6 @@ from keystoneauth1 import fixture
from keystoneclient.common import cms
from keystoneclient import utils
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import six
import testresources
@ -77,29 +76,17 @@ class Examples(fixtures.Fixture):
self.SIGNED_v3_TOKEN_SCOPED)
self.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256 = _hash_signed_token_safe(
self.SIGNED_v3_TOKEN_SCOPED, mode='sha256')
with open(os.path.join(CMSDIR, 'auth_token_revoked.pem')) as f:
self.REVOKED_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_scoped_expired.pem')) as f:
self.SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pem')) as f:
self.REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_scoped.pkiz')) as f:
self.SIGNED_TOKEN_SCOPED_PKIZ = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_unscoped.pkiz')) as f:
self.SIGNED_TOKEN_UNSCOPED_PKIZ = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pkiz')) as f:
self.SIGNED_v3_TOKEN_SCOPED_PKIZ = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_revoked.pkiz')) as f:
self.REVOKED_TOKEN_PKIZ = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR,
'auth_token_scoped_expired.pkiz')) as f:
self.SIGNED_TOKEN_SCOPED_EXPIRED_PKIZ = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pkiz')) as f:
self.REVOKED_v3_TOKEN_PKIZ = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.json')) as f:
self.REVOCATION_LIST = jsonutils.loads(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.pem')) as f:
self.SIGNED_REVOCATION_LIST = jsonutils.dumps({'signed': f.read()})
self.SIGNING_CERT_FILE = os.path.join(CERTDIR, 'signing_cert.pem')
with open(self.SIGNING_CERT_FILE) as f:
@ -134,50 +121,6 @@ class Examples(fixtures.Fixture):
self.v3_UUID_SERVICE_TOKEN_DEFAULT = 'g431071bbc2f492748596c1b53cb229'
self.v3_UUID_SERVICE_TOKEN_BIND = 'be705e4426d0449a89e35ae21c380a05'
self.v3_NOT_IS_ADMIN_PROJECT = uuid.uuid4().hex
revoked_token = self.REVOKED_TOKEN
if isinstance(revoked_token, six.text_type):
revoked_token = revoked_token.encode('utf-8')
self.REVOKED_TOKEN_HASH = utils.hash_signed_token(revoked_token)
self.REVOKED_TOKEN_HASH_SHA256 = utils.hash_signed_token(revoked_token,
mode='sha256')
self.REVOKED_TOKEN_LIST = (
{'revoked': [{'id': self.REVOKED_TOKEN_HASH,
'expires': timeutils.utcnow()}]})
self.REVOKED_TOKEN_LIST_JSON = jsonutils.dumps(self.REVOKED_TOKEN_LIST)
revoked_v3_token = self.REVOKED_v3_TOKEN
if isinstance(revoked_v3_token, six.text_type):
revoked_v3_token = revoked_v3_token.encode('utf-8')
self.REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(revoked_v3_token)
hash = utils.hash_signed_token(revoked_v3_token, mode='sha256')
self.REVOKED_v3_TOKEN_HASH_SHA256 = hash
self.REVOKED_v3_TOKEN_LIST = (
{'revoked': [{'id': self.REVOKED_v3_TOKEN_HASH,
'expires': timeutils.utcnow()}]})
self.REVOKED_v3_TOKEN_LIST_JSON = jsonutils.dumps(
self.REVOKED_v3_TOKEN_LIST)
revoked_token_pkiz = self.REVOKED_TOKEN_PKIZ
if isinstance(revoked_token_pkiz, six.text_type):
revoked_token_pkiz = revoked_token_pkiz.encode('utf-8')
self.REVOKED_TOKEN_PKIZ_HASH = utils.hash_signed_token(
revoked_token_pkiz)
revoked_v3_token_pkiz = self.REVOKED_v3_TOKEN_PKIZ
if isinstance(revoked_v3_token_pkiz, six.text_type):
revoked_v3_token_pkiz = revoked_v3_token_pkiz.encode('utf-8')
self.REVOKED_v3_PKIZ_TOKEN_HASH = utils.hash_signed_token(
revoked_v3_token_pkiz)
self.REVOKED_TOKEN_PKIZ_LIST = (
{'revoked': [{'id': self.REVOKED_TOKEN_PKIZ_HASH,
'expires': timeutils.utcnow()},
{'id': self.REVOKED_v3_PKIZ_TOKEN_HASH,
'expires': timeutils.utcnow()},
]})
self.REVOKED_TOKEN_PKIZ_LIST_JSON = jsonutils.dumps(
self.REVOKED_TOKEN_PKIZ_LIST)
self.SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(
self.SIGNED_TOKEN_SCOPED)
self.SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(

View File

@ -55,7 +55,6 @@ class OptsTestCase(utils.TestCase):
'signing_dir',
'memcached_servers',
'token_cache_time',
'revocation_cache_time',
'memcache_security_strategy',
'memcache_secret_key',
'memcache_use_advanced_pool',
@ -66,7 +65,6 @@ class OptsTestCase(utils.TestCase):
'memcache_pool_socket_timeout',
'include_service_catalog',
'enforce_token_bind',
'check_revocations_for_cached',
'hash_algorithms',
'auth_type',
'auth_section',
@ -102,7 +100,6 @@ class OptsTestCase(utils.TestCase):
'signing_dir',
'memcached_servers',
'token_cache_time',
'revocation_cache_time',
'memcache_security_strategy',
'memcache_secret_key',
'memcache_use_advanced_pool',
@ -113,7 +110,6 @@ class OptsTestCase(utils.TestCase):
'memcache_pool_socket_timeout',
'include_service_catalog',
'enforce_token_bind',
'check_revocations_for_cached',
'hash_algorithms',
'auth_type',
'auth_section',

View File

@ -0,0 +1,8 @@
---
fixes:
- >
[`bug 1649735 <https://bugs.launchpad.net/keystone/+bug/1649735>`_]
The auth_token middleware no longer attempts to retrieve the revocation
list from the Keystone server. The deprecated options
`check_revocations_for_cached` and `check_revocations_for_cached` have been
removed.