Add Barbican key manager
Adds the first usable key manager plugin to Castellan. While there is an implementation of a mock key manager in the test directories, it is used only for testing. This code is based on the barbican key manager code in Nova written by Brianna Poulos. See: https://review.openstack.org/#/c/104001/ The Barbican API version info will be read from a config option until the Barbican Version API is fixed. See fix-version-api blueprint. Implements: blueprint add-barbican-key-manager Co-authored-by: Brianna Poulos <brianna.poulos@jhuapl.edu> Change-Id: Ia27cd831f42c6b027778240b3396b1c4149dc689
This commit is contained in:
parent
5fea4ffb80
commit
4a0d606f74
@ -54,3 +54,7 @@ class CastellanException(Exception):
|
||||
|
||||
class Forbidden(CastellanException):
|
||||
message = u._("You are not authorized to complete this action.")
|
||||
|
||||
|
||||
class KeyManagerError(CastellanException):
|
||||
message = u._("Key manager error: %(reason)s")
|
||||
|
@ -19,6 +19,8 @@ from oslo_utils import importutils
|
||||
|
||||
key_manager_opts = [
|
||||
cfg.StrOpt('api_class',
|
||||
default='castellan.key_manager.barbican_key_manager'
|
||||
'.BarbicanKeyManager',
|
||||
help='The full class name of the key manager API class'),
|
||||
]
|
||||
|
||||
|
296
castellan/key_manager/barbican_key_manager.py
Normal file
296
castellan/key_manager/barbican_key_manager.py
Normal file
@ -0,0 +1,296 @@
|
||||
# Copyright (c) The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Key manager implementation for Barbican
|
||||
"""
|
||||
from barbicanclient import client as barbican_client
|
||||
from barbicanclient import exceptions as barbican_exceptions
|
||||
from keystoneclient.auth import token_endpoint
|
||||
from keystoneclient import session
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
|
||||
from castellan.common import exception
|
||||
from castellan.key_manager import key_manager
|
||||
from castellan.key_manager import symmetric_key as key_manager_key
|
||||
from castellan.openstack.common import _i18n as u
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
barbican_opts = [
|
||||
cfg.StrOpt('barbican_endpoint',
|
||||
default='http://localhost:9311/',
|
||||
help='Use this endpoint to connect to Barbican'),
|
||||
cfg.StrOpt('api_version',
|
||||
default='v1',
|
||||
help='Version of the Barbican API'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
BARBICAN_OPT_GROUP = 'barbican'
|
||||
|
||||
CONF.register_opts(barbican_opts, group=BARBICAN_OPT_GROUP)
|
||||
|
||||
session.Session.register_conf_options(CONF, BARBICAN_OPT_GROUP)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BarbicanKeyManager(key_manager.KeyManager):
|
||||
"""Key Manager Interface that wraps the Barbican client API."""
|
||||
|
||||
def __init__(self):
|
||||
self._barbican_client = None
|
||||
self._base_url = None
|
||||
|
||||
def _get_barbican_client(self, context):
|
||||
"""Creates a client to connect to the Barbican service.
|
||||
|
||||
:param context: the user context for authentication
|
||||
:return: a Barbican Client object
|
||||
:raises Forbidden: if the context is None
|
||||
"""
|
||||
|
||||
# Confirm context is provided, if not raise forbidden
|
||||
if not context:
|
||||
msg = u._("User is not authorized to use key manager.")
|
||||
LOG.error(msg)
|
||||
raise exception.Forbidden(msg)
|
||||
|
||||
if self._barbican_client and self._current_context == context:
|
||||
return self._barbican_client
|
||||
|
||||
try:
|
||||
self._current_context = context
|
||||
sess = self._get_keystone_session(context)
|
||||
|
||||
self._barbican_client = barbican_client.Client(
|
||||
session=sess,
|
||||
endpoint=self._barbican_endpoint)
|
||||
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(u._LE("Error creating Barbican client: %s"), e)
|
||||
|
||||
self._base_url = self._create_base_url()
|
||||
|
||||
return self._barbican_client
|
||||
|
||||
def _get_keystone_session(self, context):
|
||||
sess = session.Session.load_from_conf_options(
|
||||
CONF, BARBICAN_OPT_GROUP)
|
||||
|
||||
self._barbican_endpoint = CONF.barbican.barbican_endpoint
|
||||
|
||||
auth = token_endpoint.Token(self._barbican_endpoint,
|
||||
context.auth_token)
|
||||
sess.auth = auth
|
||||
return sess
|
||||
|
||||
def _create_base_url(self):
|
||||
base_url = urllib.parse.urljoin(self._barbican_endpoint,
|
||||
CONF.barbican.api_version)
|
||||
return base_url
|
||||
|
||||
def create_key(self, context, algorithm, length, expiration=None):
|
||||
"""Creates a key.
|
||||
|
||||
:param context: contains information of the user and the environment
|
||||
for the request (castellan/context.py)
|
||||
:param algorithm: the algorithm associated with the secret
|
||||
:param length: the bit length of the secret
|
||||
:param expiration: the date the key will expire
|
||||
:return: the UUID of the new key
|
||||
:raises HTTPAuthError: if key creation fails with 401
|
||||
:raises HTTPClientError: if key creation failes with 4xx
|
||||
:raises HTTPServerError: if key creation fails with 5xx
|
||||
"""
|
||||
barbican_client = self._get_barbican_client(context)
|
||||
|
||||
try:
|
||||
key_order = barbican_client.orders.create_key(
|
||||
algorithm=algorithm,
|
||||
bit_length=length,
|
||||
expiration=expiration)
|
||||
order_ref = key_order.submit()
|
||||
order = barbican_client.orders.get(order_ref)
|
||||
return self._retrieve_secret_uuid(order.secret_ref)
|
||||
except (barbican_exceptions.HTTPAuthError,
|
||||
barbican_exceptions.HTTPClientError,
|
||||
barbican_exceptions.HTTPServerError) as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(u._LE("Error creating key: %s"), e)
|
||||
|
||||
def store_key(self, context, key, expiration=None):
|
||||
"""Stores (i.e., registers) a key with the key manager.
|
||||
|
||||
:param context: contains information of the user and the environment
|
||||
for the request (castellan/context.py)
|
||||
:param key: the unencrypted secret data. Known as "payload" to the
|
||||
barbicanclient api
|
||||
:param expiration: the expiration time of the secret in ISO 8601
|
||||
format
|
||||
:returns: the UUID of the stored key
|
||||
:raises HTTPAuthError: if key creation fails with 401
|
||||
:raises HTTPClientError: if key creation failes with 4xx
|
||||
:raises HTTPServerError: if key creation fails with 5xx
|
||||
"""
|
||||
barbican_client = self._get_barbican_client(context)
|
||||
|
||||
try:
|
||||
if key.get_algorithm():
|
||||
algorithm = key.get_algorithm()
|
||||
encoded_key = key.get_encoded()
|
||||
# TODO(kfarr) add support for objects other than symmetric keys
|
||||
secret = barbican_client.secrets.create(payload=encoded_key,
|
||||
algorithm=algorithm,
|
||||
expiration=expiration)
|
||||
secret_ref = secret.store()
|
||||
return self._retrieve_secret_uuid(secret_ref)
|
||||
except (barbican_exceptions.HTTPAuthError,
|
||||
barbican_exceptions.HTTPClientError,
|
||||
barbican_exceptions.HTTPServerError) as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(u._LE("Error storing key: %s"), e)
|
||||
|
||||
def copy_key(self, context, key_id):
|
||||
"""Copies (i.e., clones) a key stored by barbican.
|
||||
|
||||
:param context: contains information of the user and the environment
|
||||
for the request (castellan/context.py)
|
||||
:param key_id: the UUID of the key to copy
|
||||
:return: the UUID of the key copy
|
||||
:raises HTTPAuthError: if key creation fails with 401
|
||||
:raises HTTPClientError: if key creation failes with 4xx
|
||||
:raises HTTPServerError: if key creation fails with 5xx
|
||||
"""
|
||||
|
||||
try:
|
||||
secret = self._get_secret(context, key_id)
|
||||
secret_data = self._get_secret_data(secret)
|
||||
# TODO(kfarr) modify to support other types of keys
|
||||
key = key_manager_key.SymmetricKey(secret.algorithm, secret_data)
|
||||
copy_uuid = self.store_key(context, key, secret.expiration)
|
||||
return copy_uuid
|
||||
except (barbican_exceptions.HTTPAuthError,
|
||||
barbican_exceptions.HTTPClientError,
|
||||
barbican_exceptions.HTTPServerError) as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(u._LE("Error copying key: %s"), e)
|
||||
|
||||
def _create_secret_ref(self, key_id):
|
||||
"""Creates the URL required for accessing a secret.
|
||||
|
||||
:param key_id: the UUID of the key to copy
|
||||
:return: the URL of the requested secret
|
||||
"""
|
||||
if not key_id:
|
||||
msg = "Key ID is None"
|
||||
raise exception.KeyManagerError(msg)
|
||||
base_url = self._base_url
|
||||
if base_url[-1] != '/':
|
||||
base_url += '/'
|
||||
return urllib.parse.urljoin(base_url, "secrets/" + key_id)
|
||||
|
||||
def _retrieve_secret_uuid(self, secret_ref):
|
||||
"""Retrieves the UUID of the secret from the secret_ref.
|
||||
|
||||
:param secret_ref: the href of the secret
|
||||
:return: the UUID of the secret
|
||||
"""
|
||||
|
||||
# The secret_ref is assumed to be of a form similar to
|
||||
# http://host:9311/v1/secrets/d152fa13-2b41-42ca-a934-6c21566c0f40
|
||||
# with the UUID at the end. This command retrieves everything
|
||||
# after the last '/', which is the UUID.
|
||||
return secret_ref.rpartition('/')[2]
|
||||
|
||||
def _get_secret_data(self, secret):
|
||||
"""Retrieves the secret data given a secret and content_type.
|
||||
|
||||
:param secret: the secret from barbican with the payload of data
|
||||
:returns: the secret data
|
||||
"""
|
||||
# TODO(kfarr) support other types of keys
|
||||
return secret.payload
|
||||
|
||||
def _get_secret(self, context, key_id):
|
||||
"""Returns the metadata of the secret.
|
||||
|
||||
:param context: contains information of the user and the environment
|
||||
for the request (castellan/context.py)
|
||||
:param key_id: UUID of the secret
|
||||
:return: the secret's metadata
|
||||
:raises HTTPAuthError: if key creation fails with 401
|
||||
:raises HTTPClientError: if key creation failes with 4xx
|
||||
:raises HTTPServerError: if key creation fails with 5xx
|
||||
"""
|
||||
|
||||
barbican_client = self._get_barbican_client(context)
|
||||
|
||||
try:
|
||||
secret_ref = self._create_secret_ref(key_id)
|
||||
return barbican_client.secrets.get(secret_ref)
|
||||
except (barbican_exceptions.HTTPAuthError,
|
||||
barbican_exceptions.HTTPClientError,
|
||||
barbican_exceptions.HTTPServerError) as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(u._LE("Error getting secret metadata: %s"), e)
|
||||
|
||||
def get_key(self, context, key_id):
|
||||
"""Retrieves the specified key.
|
||||
|
||||
:param context: contains information of the user and the environment
|
||||
for the request (castellan/context.py)
|
||||
:param key_id: the UUID of the key to retrieve
|
||||
:return: SymmetricKey representation of the key
|
||||
:raises HTTPAuthError: if key creation fails with 401
|
||||
:raises HTTPClientError: if key creation failes with 4xx
|
||||
:raises HTTPServerError: if key creation fails with 5xx
|
||||
"""
|
||||
try:
|
||||
secret = self._get_secret(context, key_id)
|
||||
secret_data = self._get_secret_data(secret)
|
||||
# TODO(kfarr) add support for other objects
|
||||
key = key_manager_key.SymmetricKey(secret.algorithm, secret_data)
|
||||
return key
|
||||
except (barbican_exceptions.HTTPAuthError,
|
||||
barbican_exceptions.HTTPClientError,
|
||||
barbican_exceptions.HTTPServerError) as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(u._LE("Error getting key: %s"), e)
|
||||
|
||||
def delete_key(self, context, key_id):
|
||||
"""Deletes the specified key.
|
||||
|
||||
:param context: contains information of the user and the environment
|
||||
for the request (castellan/context.py)
|
||||
:param key_id: the UUID of the key to delete
|
||||
:raises HTTPAuthError: if key creation fails with 401
|
||||
:raises HTTPClientError: if key creation failes with 4xx
|
||||
:raises HTTPServerError: if key creation fails with 5xx
|
||||
"""
|
||||
barbican_client = self._get_barbican_client(context)
|
||||
|
||||
try:
|
||||
secret_ref = self._create_secret_ref(key_id)
|
||||
barbican_client.secrets.delete(secret_ref)
|
||||
except (barbican_exceptions.HTTPAuthError,
|
||||
barbican_exceptions.HTTPClientError,
|
||||
barbican_exceptions.HTTPServerError) as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(u._LE("Error deleting key: %s"), e)
|
@ -31,8 +31,8 @@ class KeyManager(object):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_key(self, context, algorithm='AES', length=256,
|
||||
expiration=None, **kwargs):
|
||||
def create_key(self, context, algorithm, length,
|
||||
expiration=None):
|
||||
"""Creates a key.
|
||||
|
||||
This method creates a key and returns the key's UUID. If the specified
|
||||
@ -42,7 +42,7 @@ class KeyManager(object):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def store_key(self, context, key, expiration=None, **kwargs):
|
||||
def store_key(self, context, key, expiration=None):
|
||||
"""Stores (i.e., registers) a key with the key manager.
|
||||
|
||||
This method stores the specified key and returns its UUID that
|
||||
@ -53,7 +53,7 @@ class KeyManager(object):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def copy_key(self, context, key_id, **kwargs):
|
||||
def copy_key(self, context, key_id):
|
||||
"""Copies (i.e., clones) a key stored by the key manager.
|
||||
|
||||
This method copies the specified key and returns the copy's UUID. If
|
||||
@ -68,7 +68,7 @@ class KeyManager(object):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_key(self, context, key_id, **kwargs):
|
||||
def get_key(self, context, key_id):
|
||||
"""Retrieves the specified key.
|
||||
|
||||
Implementations should verify that the caller has permissions to
|
||||
@ -84,7 +84,7 @@ class KeyManager(object):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_key(self, context, key_id, **kwargs):
|
||||
def delete_key(self, context, key_id):
|
||||
"""Deletes the specified key.
|
||||
|
||||
Implementations should verify that the caller has permission to delete
|
||||
|
218
castellan/tests/key_manager/test_barbican_key_manager.py
Normal file
218
castellan/tests/key_manager/test_barbican_key_manager.py
Normal file
@ -0,0 +1,218 @@
|
||||
# Copyright (c) The Johns Hopkins University/Applied Physics Laboratory
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Test cases for the barbican key manager.
|
||||
"""
|
||||
|
||||
import array
|
||||
|
||||
import mock
|
||||
|
||||
from castellan.common import exception
|
||||
from castellan.key_manager import barbican_key_manager
|
||||
from castellan.key_manager import symmetric_key as key_manager_key
|
||||
from castellan.tests.key_manager import test_key_manager
|
||||
|
||||
|
||||
class BarbicanKeyManagerTestCase(test_key_manager.KeyManagerTestCase):
|
||||
|
||||
def _create_key_manager(self):
|
||||
return barbican_key_manager.BarbicanKeyManager()
|
||||
|
||||
def setUp(self):
|
||||
super(BarbicanKeyManagerTestCase, self).setUp()
|
||||
|
||||
# Create fake auth_token
|
||||
self.ctxt = mock.Mock()
|
||||
self.ctxt.auth_token = "fake_token"
|
||||
|
||||
# Create mock barbican client
|
||||
self._build_mock_barbican()
|
||||
|
||||
# Create a key_id, secret_ref, pre_hex, and hex to use
|
||||
self.key_id = "d152fa13-2b41-42ca-a934-6c21566c0f40"
|
||||
self.secret_ref = ("http://host:9311/v1/secrets/" + self.key_id)
|
||||
self.pre_hex = "AIDxQp2++uAbKaTVDMXFYIu8PIugJGqkK0JLqkU0rhY="
|
||||
self.hex = ("0080f1429dbefae01b29a4d50cc5c5608bbc3c8ba0246aa42b424baa4"
|
||||
"534ae16")
|
||||
self.key_mgr._base_url = "http://host:9311/v1/"
|
||||
self.addCleanup(self._restore)
|
||||
|
||||
def _restore(self):
|
||||
try:
|
||||
getattr(self, 'original_key')
|
||||
key_manager_key.SymmetricKey = self.original_key
|
||||
except AttributeError:
|
||||
return None
|
||||
|
||||
def _build_mock_barbican(self):
|
||||
self.mock_barbican = mock.MagicMock(name='mock_barbican')
|
||||
|
||||
# Set commonly used methods
|
||||
self.get = self.mock_barbican.secrets.get
|
||||
self.delete = self.mock_barbican.secrets.delete
|
||||
self.store = self.mock_barbican.secrets.store
|
||||
self.create = self.mock_barbican.secrets.create
|
||||
|
||||
self.key_mgr._barbican_client = self.mock_barbican
|
||||
self.key_mgr._current_context = self.ctxt
|
||||
|
||||
def _build_mock_symKey(self):
|
||||
self.mock_symKey = mock.Mock()
|
||||
|
||||
def fake_sym_key(alg, key):
|
||||
self.mock_symKey.get_encoded.return_value = key
|
||||
self.mock_symKey.get_algorithm.return_value = alg
|
||||
return self.mock_symKey
|
||||
self.original_key = key_manager_key.SymmetricKey
|
||||
key_manager_key.SymmetricKey = fake_sym_key
|
||||
|
||||
def test_copy_key(self):
|
||||
# Create metadata for original secret
|
||||
original_secret_metadata = mock.Mock()
|
||||
original_secret_metadata.algorithm = mock.sentinel.alg
|
||||
original_secret_metadata.bit_length = mock.sentinel.bit
|
||||
original_secret_metadata.name = mock.sentinel.name
|
||||
original_secret_metadata.expiration = mock.sentinel.expiration
|
||||
original_secret_metadata.mode = mock.sentinel.mode
|
||||
content_types = {'default': 'fake_type'}
|
||||
original_secret_metadata.content_types = content_types
|
||||
original_secret_data = mock.Mock()
|
||||
original_secret_metadata.payload = original_secret_data
|
||||
|
||||
# Create href for copied secret
|
||||
copied_secret = mock.Mock()
|
||||
copied_secret.store.return_value = (
|
||||
'http://http://host:9311/v1/secrets/uuid')
|
||||
|
||||
# Set get and create return values
|
||||
self.get.return_value = original_secret_metadata
|
||||
self.create.return_value = copied_secret
|
||||
|
||||
# Create the mock key
|
||||
self._build_mock_symKey()
|
||||
|
||||
# Copy the original
|
||||
self.key_mgr.copy_key(self.ctxt, self.key_id)
|
||||
|
||||
# Assert proper methods were called
|
||||
self.get.assert_called_once_with(self.secret_ref)
|
||||
self.create.assert_called_once_with(
|
||||
payload=self.mock_symKey.get_encoded(),
|
||||
algorithm=mock.sentinel.alg,
|
||||
expiration=mock.sentinel.expiration)
|
||||
copied_secret.store.assert_called_once_with()
|
||||
|
||||
def test_copy_null_context(self):
|
||||
self.key_mgr._barbican_client = None
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.copy_key, None, self.key_id)
|
||||
|
||||
def test_create_key(self):
|
||||
# Create order_ref_url and assign return value
|
||||
order_ref_url = ("http://localhost:9311/v1/orders/"
|
||||
"4fe939b7-72bc-49aa-bd1e-e979589858af")
|
||||
key_order = mock.Mock()
|
||||
self.mock_barbican.orders.create_key.return_value = key_order
|
||||
key_order.submit.return_value = order_ref_url
|
||||
|
||||
# Create order and assign return value
|
||||
order = mock.Mock()
|
||||
order.secret_ref = self.secret_ref
|
||||
self.mock_barbican.orders.get.return_value = order
|
||||
|
||||
# Create the key, get the UUID
|
||||
returned_uuid = self.key_mgr.create_key(self.ctxt,
|
||||
algorithm='AES',
|
||||
length=256)
|
||||
|
||||
self.mock_barbican.orders.get.assert_called_once_with(order_ref_url)
|
||||
self.assertEqual(self.key_id, returned_uuid)
|
||||
|
||||
def test_create_null_context(self):
|
||||
self.key_mgr._barbican_client = None
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.create_key, None, 'AES', 256)
|
||||
|
||||
def test_delete_null_context(self):
|
||||
self.key_mgr._barbican_client = None
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.delete_key, None, self.key_id)
|
||||
|
||||
def test_delete_key(self):
|
||||
self.key_mgr.delete_key(self.ctxt, self.key_id)
|
||||
self.delete.assert_called_once_with(self.secret_ref)
|
||||
|
||||
def test_delete_unknown_key(self):
|
||||
self.assertRaises(exception.KeyManagerError,
|
||||
self.key_mgr.delete_key, self.ctxt, None)
|
||||
|
||||
def test_get_key(self):
|
||||
original_secret_metadata = mock.Mock()
|
||||
original_secret_metadata.algorithm = mock.sentinel.alg
|
||||
original_secret_metadata.bit_length = mock.sentinel.bit
|
||||
original_secret_data = mock.Mock()
|
||||
original_secret_metadata.payload = original_secret_data
|
||||
|
||||
self.mock_barbican.secrets.get.return_value = original_secret_metadata
|
||||
key = self.key_mgr.get_key(self.ctxt, self.key_id)
|
||||
|
||||
self.get.assert_called_once_with(self.secret_ref)
|
||||
self.assertEqual(key.get_encoded(), original_secret_data)
|
||||
|
||||
def test_get_null_context(self):
|
||||
self.key_mgr._barbican_client = None
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.get_key, None, self.key_id)
|
||||
|
||||
def test_get_unknown_key(self):
|
||||
self.assertRaises(exception.KeyManagerError,
|
||||
self.key_mgr.get_key, self.ctxt, None)
|
||||
|
||||
def test_store_key_base64(self):
|
||||
# Create Key to store
|
||||
secret_key = array.array('B', [0x01, 0x02, 0xA0, 0xB3]).tolist()
|
||||
_key = key_manager_key.SymmetricKey('AES', secret_key)
|
||||
|
||||
# Define the return values
|
||||
secret = mock.Mock()
|
||||
self.create.return_value = secret
|
||||
secret.store.return_value = self.secret_ref
|
||||
|
||||
# Store the Key
|
||||
returned_uuid = self.key_mgr.store_key(self.ctxt, _key)
|
||||
|
||||
self.create.assert_called_once_with(algorithm='AES',
|
||||
payload=secret_key,
|
||||
expiration=None)
|
||||
self.assertEqual(self.key_id, returned_uuid)
|
||||
|
||||
def test_store_key_plaintext(self):
|
||||
# Create the plaintext key
|
||||
secret_key_text = "This is a test text key."
|
||||
_key = key_manager_key.SymmetricKey('AES', secret_key_text)
|
||||
|
||||
# Store the Key
|
||||
self.key_mgr.store_key(self.ctxt, _key)
|
||||
self.create.assert_called_once_with(algorithm='AES',
|
||||
payload=secret_key_text,
|
||||
expiration=None)
|
||||
self.assertEqual(0, self.store.call_count)
|
||||
|
||||
def test_store_null_context(self):
|
||||
self.key_mgr._barbican_client = None
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.store_key, None, None)
|
@ -6,6 +6,7 @@ hacking>=0.9.2,<0.10
|
||||
|
||||
coverage>=3.6
|
||||
discover
|
||||
python-barbicanclient>=3.0.1
|
||||
python-subunit
|
||||
sphinx>=1.1.2
|
||||
oslosphinx
|
||||
|
Loading…
Reference in New Issue
Block a user