barbican key manager: Add support for service user

This change adds support to the Barbican key manager for configuring a
service user. This can be used to provide additional security through
the combination of a user token and a service token, with appropriate
modifications to Barbican API policy.

Use of a service user is enabled via the [barbican]
send_service_user_token option, which defaults to False. When set to
True, the service user is configured via keystoneauth options in the
barbican_service_user group.

Change-Id: I143cb57c8534a8dc0a91e6e42917dd0c134170c0
This commit is contained in:
Mark Goddard 2021-01-06 15:40:33 +00:00
parent f8b79a2b66
commit 162039467a
3 changed files with 77 additions and 6 deletions

View File

@ -25,6 +25,7 @@ from cryptography.hazmat.primitives import serialization
from cryptography import x509 as cryptography_x509
from keystoneauth1 import identity
from keystoneauth1 import loading
from keystoneauth1 import service_token
from keystoneauth1 import session
from oslo_config import cfg
from oslo_log import log as logging
@ -80,10 +81,25 @@ _barbican_opts = [
cfg.StrOpt('barbican_region_name',
default=None,
help='Specifies the region of the chosen endpoint.'),
cfg.BoolOpt('send_service_user_token',
default=False,
help="""
When True, if sending a user token to a REST API, also send a service token.
Nova often reuses the user token provided to the nova-api to talk to other REST
APIs, such as Cinder, Glance and Neutron. It is possible that while the user
token was valid when the request was made to Nova, the token may expire before
it reaches the other service. To avoid any failures, and to make it clear it is
Nova calling the service on the user's behalf, we include a service token along
with the user token. Should the user's token have expired, a valid service
token ensures the REST API request will still be accepted by the keystone
middleware.
"""),
]
_BARBICAN_OPT_GROUP = 'barbican'
_BARBICAN_SERVICE_USER_OPT_GROUP = 'barbican_service_user'
LOG = logging.getLogger(__name__)
@ -98,6 +114,11 @@ class BarbicanKeyManager(key_manager.KeyManager):
self.conf.register_opts(_barbican_opts, group=_BARBICAN_OPT_GROUP)
loading.register_session_conf_options(self.conf, _BARBICAN_OPT_GROUP)
loading.register_session_conf_options(self.conf,
_BARBICAN_SERVICE_USER_OPT_GROUP)
loading.register_auth_conf_options(self.conf,
_BARBICAN_SERVICE_USER_OPT_GROUP)
def _get_barbican_client(self, context):
"""Creates a client to connect to the Barbican service.
@ -144,7 +165,7 @@ class BarbicanKeyManager(key_manager.KeyManager):
def _get_keystone_auth(self, context):
if context.__class__.__name__ == 'KeystonePassword':
return identity.Password(
auth = identity.Password(
auth_url=context.auth_url,
username=context.username,
password=context.password,
@ -160,7 +181,7 @@ class BarbicanKeyManager(key_manager.KeyManager):
project_domain_name=context.project_domain_name,
reauthenticate=context.reauthenticate)
elif context.__class__.__name__ == 'KeystoneToken':
return identity.Token(
auth = identity.Token(
auth_url=context.auth_url,
token=context.token,
trust_id=context.trust_id,
@ -175,9 +196,9 @@ class BarbicanKeyManager(key_manager.KeyManager):
# projects begin to use utils.credential_factory
elif context.__class__.__name__ == 'RequestContext':
if getattr(context, 'get_auth_plugin', None):
return context.get_auth_plugin()
auth = context.get_auth_plugin()
else:
return identity.Token(
auth = identity.Token(
auth_url=self.conf.barbican.auth_endpoint,
token=context.auth_token,
project_id=context.project_id,
@ -190,6 +211,16 @@ class BarbicanKeyManager(key_manager.KeyManager):
LOG.error(msg)
raise exception.Forbidden(reason=msg)
if self.conf.barbican.send_service_user_token:
service_auth = loading.load_auth_from_conf_options(
self.conf,
group=_BARBICAN_SERVICE_USER_OPT_GROUP)
auth = service_token.ServiceTokenAuthWrapper(
user_auth=auth,
service_auth=service_auth)
return auth
def _get_barbican_endpoint(self, auth, sess):
if self.conf.barbican.barbican_endpoint:
return self.conf.barbican.barbican_endpoint
@ -653,4 +684,10 @@ class BarbicanKeyManager(key_manager.KeyManager):
return objects
def list_options_for_discovery(self):
return [(_BARBICAN_OPT_GROUP, _barbican_opts)]
barbican_service_user_opts = loading.get_session_conf_options()
barbican_service_user_opts += loading.get_auth_common_conf_options()
return [
(_BARBICAN_OPT_GROUP, _barbican_opts),
(_BARBICAN_SERVICE_USER_OPT_GROUP, barbican_service_user_opts),
]

View File

@ -20,6 +20,9 @@ import calendar
from unittest import mock
from barbicanclient import exceptions as barbican_exceptions
from keystoneauth1 import identity
from keystoneauth1 import service_token
from oslo_context import context
from oslo_utils import timeutils
from castellan.common import exception
@ -37,8 +40,10 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
super(BarbicanKeyManagerTestCase, self).setUp()
# Create fake auth_token
self.ctxt = mock.Mock()
self.ctxt = mock.Mock(spec=context.RequestContext)
self.ctxt.auth_token = "fake_token"
self.ctxt.project_name = "foo"
self.ctxt.project_domain_name = "foo"
# Create mock barbican client
self._build_mock_barbican()
@ -163,6 +168,15 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
sess, service_type='key-manager', interface='public',
region_name='regionOne')
def test__get_keystone_auth(self):
auth = self.key_mgr._get_keystone_auth(self.ctxt)
self.assertIsInstance(auth, identity.Token)
def test__get_keystone_auth_service_user(self):
self.key_mgr.conf.barbican.send_service_user_token = True
auth = self.key_mgr._get_keystone_auth(self.ctxt)
self.assertIsInstance(auth, service_token.ServiceTokenAuthWrapper)
def test_base_url_old_version(self):
version = "v1"
self.key_mgr.conf.barbican.barbican_api_version = version
@ -607,3 +621,16 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
def test_list_with_invalid_object_type(self):
self.assertRaises(exception.KeyManagerError,
self.key_mgr.list, self.ctxt, "invalid_type")
def test_list_options_for_discovery(self):
opts = self.key_mgr.list_options_for_discovery()
expected_sections = ['barbican', 'barbican_service_user']
self.assertEqual(expected_sections, [section[0] for section in opts])
barbican_opts = [opt.name for opt in opts[0][1]]
# From Castellan opts.
self.assertIn('barbican_endpoint', barbican_opts)
barbican_service_user_opts = [opt.name for opt in opts[1][1]]
# From session opts.
self.assertIn('cafile', barbican_service_user_opts)
# From auth common opts.
self.assertIn('auth_section', barbican_service_user_opts)

View File

@ -0,0 +1,7 @@
---
features:
- |
Adds support for using a service user with the Barbican key manager.
This is enabled via ``[barbican] send_service_user_token``, with
credentials for the service user configured via keystoneauth options in the
``[barbican_service_user]`` group.