Merge "barbican key manager: Add support for service user"

This commit is contained in:
Zuul 2021-06-23 10:03:37 +00:00 committed by Gerrit Code Review
commit 17cd8335e1
3 changed files with 77 additions and 6 deletions

View File

@ -25,6 +25,7 @@ from cryptography.hazmat.primitives import serialization
from cryptography import x509 as cryptography_x509 from cryptography import x509 as cryptography_x509
from keystoneauth1 import identity from keystoneauth1 import identity
from keystoneauth1 import loading from keystoneauth1 import loading
from keystoneauth1 import service_token
from keystoneauth1 import session from keystoneauth1 import session
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
@ -80,10 +81,25 @@ _barbican_opts = [
cfg.StrOpt('barbican_region_name', cfg.StrOpt('barbican_region_name',
default=None, default=None,
help='Specifies the region of the chosen endpoint.'), help='Specifies the region of the chosen endpoint.'),
cfg.BoolOpt('send_service_user_token',
default=False,
help="""
When True, if sending a user token to a REST API, also send a service token.
Nova often reuses the user token provided to the nova-api to talk to other REST
APIs, such as Cinder, Glance and Neutron. It is possible that while the user
token was valid when the request was made to Nova, the token may expire before
it reaches the other service. To avoid any failures, and to make it clear it is
Nova calling the service on the user's behalf, we include a service token along
with the user token. Should the user's token have expired, a valid service
token ensures the REST API request will still be accepted by the keystone
middleware.
"""),
] ]
_BARBICAN_OPT_GROUP = 'barbican' _BARBICAN_OPT_GROUP = 'barbican'
_BARBICAN_SERVICE_USER_OPT_GROUP = 'barbican_service_user'
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -98,6 +114,11 @@ class BarbicanKeyManager(key_manager.KeyManager):
self.conf.register_opts(_barbican_opts, group=_BARBICAN_OPT_GROUP) self.conf.register_opts(_barbican_opts, group=_BARBICAN_OPT_GROUP)
loading.register_session_conf_options(self.conf, _BARBICAN_OPT_GROUP) loading.register_session_conf_options(self.conf, _BARBICAN_OPT_GROUP)
loading.register_session_conf_options(self.conf,
_BARBICAN_SERVICE_USER_OPT_GROUP)
loading.register_auth_conf_options(self.conf,
_BARBICAN_SERVICE_USER_OPT_GROUP)
def _get_barbican_client(self, context): def _get_barbican_client(self, context):
"""Creates a client to connect to the Barbican service. """Creates a client to connect to the Barbican service.
@ -144,7 +165,7 @@ class BarbicanKeyManager(key_manager.KeyManager):
def _get_keystone_auth(self, context): def _get_keystone_auth(self, context):
if context.__class__.__name__ == 'KeystonePassword': if context.__class__.__name__ == 'KeystonePassword':
return identity.Password( auth = identity.Password(
auth_url=context.auth_url, auth_url=context.auth_url,
username=context.username, username=context.username,
password=context.password, password=context.password,
@ -160,7 +181,7 @@ class BarbicanKeyManager(key_manager.KeyManager):
project_domain_name=context.project_domain_name, project_domain_name=context.project_domain_name,
reauthenticate=context.reauthenticate) reauthenticate=context.reauthenticate)
elif context.__class__.__name__ == 'KeystoneToken': elif context.__class__.__name__ == 'KeystoneToken':
return identity.Token( auth = identity.Token(
auth_url=context.auth_url, auth_url=context.auth_url,
token=context.token, token=context.token,
trust_id=context.trust_id, trust_id=context.trust_id,
@ -175,9 +196,9 @@ class BarbicanKeyManager(key_manager.KeyManager):
# projects begin to use utils.credential_factory # projects begin to use utils.credential_factory
elif context.__class__.__name__ == 'RequestContext': elif context.__class__.__name__ == 'RequestContext':
if getattr(context, 'get_auth_plugin', None): if getattr(context, 'get_auth_plugin', None):
return context.get_auth_plugin() auth = context.get_auth_plugin()
else: else:
return identity.Token( auth = identity.Token(
auth_url=self.conf.barbican.auth_endpoint, auth_url=self.conf.barbican.auth_endpoint,
token=context.auth_token, token=context.auth_token,
project_id=context.project_id, project_id=context.project_id,
@ -190,6 +211,16 @@ class BarbicanKeyManager(key_manager.KeyManager):
LOG.error(msg) LOG.error(msg)
raise exception.Forbidden(reason=msg) raise exception.Forbidden(reason=msg)
if self.conf.barbican.send_service_user_token:
service_auth = loading.load_auth_from_conf_options(
self.conf,
group=_BARBICAN_SERVICE_USER_OPT_GROUP)
auth = service_token.ServiceTokenAuthWrapper(
user_auth=auth,
service_auth=service_auth)
return auth
def _get_barbican_endpoint(self, auth, sess): def _get_barbican_endpoint(self, auth, sess):
if self.conf.barbican.barbican_endpoint: if self.conf.barbican.barbican_endpoint:
return self.conf.barbican.barbican_endpoint return self.conf.barbican.barbican_endpoint
@ -653,4 +684,10 @@ class BarbicanKeyManager(key_manager.KeyManager):
return objects return objects
def list_options_for_discovery(self): def list_options_for_discovery(self):
return [(_BARBICAN_OPT_GROUP, _barbican_opts)] barbican_service_user_opts = loading.get_session_conf_options()
barbican_service_user_opts += loading.get_auth_common_conf_options()
return [
(_BARBICAN_OPT_GROUP, _barbican_opts),
(_BARBICAN_SERVICE_USER_OPT_GROUP, barbican_service_user_opts),
]

View File

@ -20,6 +20,9 @@ import calendar
from unittest import mock from unittest import mock
from barbicanclient import exceptions as barbican_exceptions from barbicanclient import exceptions as barbican_exceptions
from keystoneauth1 import identity
from keystoneauth1 import service_token
from oslo_context import context
from oslo_utils import timeutils from oslo_utils import timeutils
from castellan.common import exception from castellan.common import exception
@ -37,8 +40,10 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
super(BarbicanKeyManagerTestCase, self).setUp() super(BarbicanKeyManagerTestCase, self).setUp()
# Create fake auth_token # Create fake auth_token
self.ctxt = mock.Mock() self.ctxt = mock.Mock(spec=context.RequestContext)
self.ctxt.auth_token = "fake_token" self.ctxt.auth_token = "fake_token"
self.ctxt.project_name = "foo"
self.ctxt.project_domain_name = "foo"
# Create mock barbican client # Create mock barbican client
self._build_mock_barbican() self._build_mock_barbican()
@ -163,6 +168,15 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
sess, service_type='key-manager', interface='public', sess, service_type='key-manager', interface='public',
region_name='regionOne') region_name='regionOne')
def test__get_keystone_auth(self):
auth = self.key_mgr._get_keystone_auth(self.ctxt)
self.assertIsInstance(auth, identity.Token)
def test__get_keystone_auth_service_user(self):
self.key_mgr.conf.barbican.send_service_user_token = True
auth = self.key_mgr._get_keystone_auth(self.ctxt)
self.assertIsInstance(auth, service_token.ServiceTokenAuthWrapper)
def test_base_url_old_version(self): def test_base_url_old_version(self):
version = "v1" version = "v1"
self.key_mgr.conf.barbican.barbican_api_version = version self.key_mgr.conf.barbican.barbican_api_version = version
@ -607,3 +621,16 @@ class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
def test_list_with_invalid_object_type(self): def test_list_with_invalid_object_type(self):
self.assertRaises(exception.KeyManagerError, self.assertRaises(exception.KeyManagerError,
self.key_mgr.list, self.ctxt, "invalid_type") self.key_mgr.list, self.ctxt, "invalid_type")
def test_list_options_for_discovery(self):
opts = self.key_mgr.list_options_for_discovery()
expected_sections = ['barbican', 'barbican_service_user']
self.assertEqual(expected_sections, [section[0] for section in opts])
barbican_opts = [opt.name for opt in opts[0][1]]
# From Castellan opts.
self.assertIn('barbican_endpoint', barbican_opts)
barbican_service_user_opts = [opt.name for opt in opts[1][1]]
# From session opts.
self.assertIn('cafile', barbican_service_user_opts)
# From auth common opts.
self.assertIn('auth_section', barbican_service_user_opts)

View File

@ -0,0 +1,7 @@
---
features:
- |
Adds support for using a service user with the Barbican key manager.
This is enabled via ``[barbican] send_service_user_token``, with
credentials for the service user configured via keystoneauth options in the
``[barbican_service_user]`` group.