Merge "Retrieve encryption root secret from Barbican"
This commit is contained in:
commit
b0142d0cd2
@ -86,6 +86,11 @@ The keymaster config option ``encryption_root_secret`` MUST be set to a value
|
||||
of at least 44 valid base-64 characters before the middleware is used and
|
||||
should be consistent across all proxy servers. The minimum length of 44 has
|
||||
been chosen because it is the length of a base-64 encoded 32 byte value.
|
||||
Alternatives to specifying the encryption root secret directly in the
|
||||
`proxy-server.conf` file are storing it in a separate file, or storing it in
|
||||
an :ref:`external key management system
|
||||
<encryption_root_secret_in_external_kms>` such as `Barbican
|
||||
<https://docs.openstack.org/barbican>`_.
|
||||
|
||||
.. note::
|
||||
|
||||
@ -119,6 +124,135 @@ into GET and PUT requests by the :ref:`copy` middleware before reaching the
|
||||
encryption middleware and as a result object data and metadata is decrypted and
|
||||
re-encrypted when copied.
|
||||
|
||||
.. _encryption_root_secret_in_external_kms:
|
||||
|
||||
Encryption Root Secret in External Key Management System
|
||||
--------------------------------------------------------
|
||||
|
||||
The benefits of using
|
||||
a dedicated system for storing the encryption root secret include the
|
||||
auditing and access control infrastructure that are already in place in such a
|
||||
system, and the fact that an encryption root secret stored in a key management
|
||||
system (KMS) may be backed by a hardware security module (HSM) for additional
|
||||
security. Another significant benefit of storing the root encryption secret in
|
||||
an external KMS is that it is in this case never stored on a disk in the Swift
|
||||
cluster.
|
||||
|
||||
Make sure the required dependencies are installed for retrieving an encryption
|
||||
root secret from an external KMS. This can be done when installing Swift (add
|
||||
the ``-e`` flag to install as a development version) by changing to the Swift
|
||||
directory and running the following command to install Swift together with
|
||||
the ``kms_keymaster`` extra dependencies::
|
||||
|
||||
sudo pip install .[kms_keymaster]
|
||||
|
||||
Another way to install the dependencies is by making sure the
|
||||
following lines exist in the requirements.txt file, and installing them using
|
||||
``pip install -r requirements.txt``::
|
||||
|
||||
cryptography>=1.6 # BSD/Apache-2.0
|
||||
castellan>=0.6.0
|
||||
|
||||
.. note::
|
||||
|
||||
If any of the required packages is already installed, the ``--upgrade``
|
||||
flag may be required for the ``pip`` commands in order for the required
|
||||
minimum version to be installed.
|
||||
|
||||
To make use of an encryption root secret stored in an external KMS,
|
||||
replace the keymaster middleware with the kms_keymaster middleware in the
|
||||
proxy server WSGI pipeline in `proxy-server.conf`, in the order shown in this
|
||||
example::
|
||||
|
||||
<other middleware> kms_keymaster encryption proxy-logging proxy-server
|
||||
|
||||
and add a section to the same file::
|
||||
|
||||
[filter:kms_keymaster]
|
||||
use = egg:swift#kms_keymaster
|
||||
keymaster_config_path = file_with_kms_keymaster_config
|
||||
|
||||
Create or edit the file `file_with_kms_keymaster_config` referenced above.
|
||||
For further details on the middleware configuration options, see the
|
||||
`keymaster.conf-sample` file. An example of the content of this file, with
|
||||
optional parameters omitted, is below::
|
||||
|
||||
[kms_keymaster]
|
||||
key_id = changeme
|
||||
username = swift
|
||||
password = password
|
||||
project_name = swift
|
||||
auth_endpoint = http://keystonehost:5000/v3
|
||||
|
||||
The encryption root secret shall be created and stored in the external key
|
||||
management system before it can be used by the keymaster. It shall be stored
|
||||
as a symmetric key, with content type ``application/octet-stream``,
|
||||
``base64`` content encoding, ``AES`` algorithm, bit length ``256``, and secret
|
||||
type ``symmetric``. The mode ``ctr`` may also be stored for informational
|
||||
purposes - it is not currently checked by the keymaster.
|
||||
|
||||
The following command can be used to store the currently configured
|
||||
``encryption_root_secret`` value from the `proxy-server.conf` file
|
||||
in Barbican::
|
||||
|
||||
openstack secret store --name swift_root_secret \
|
||||
--payload-content-type="application/octet-stream" \
|
||||
--payload-content-encoding="base64" --algorithm aes --bit-length 256 \
|
||||
--mode ctr --secret-type symmetric --payload <base64_encoded_root_secret>
|
||||
|
||||
Alternatively, the existing root secret can also be stored in Barbican using
|
||||
`curl <http://developer.openstack.org/api-guide/key-manager/secrets.html>`__.
|
||||
|
||||
.. note::
|
||||
|
||||
The credentials used to store the secret in Barbican shall be the same
|
||||
ones that the proxy server uses to retrieve the secret, i.e., the ones
|
||||
configured in the `keymaster.conf` file. For clarity reasons the commands
|
||||
shown here omit the credentials - they may be specified explicitly, or in
|
||||
environment variables.
|
||||
|
||||
Instead of using an existing root secret, Barbican can also be asked to
|
||||
generate a new 256-bit root secret, with content type
|
||||
``application/octet-stream`` and algorithm ``AES`` (the ``mode`` parameter is
|
||||
currently optional)::
|
||||
|
||||
openstack secret order create --name swift_root_secret \
|
||||
--payload-content-type="application/octet-stream" --algorithm aes \
|
||||
--bit-length 256 --mode ctr key
|
||||
|
||||
The ``order create`` creates an asynchronous request to create the actual
|
||||
secret.
|
||||
The order can be retrieved using ``openstack secret order get``, and once the
|
||||
order completes successfully, the output will show the key id of the generated
|
||||
root secret.
|
||||
Keys currently stored in Barbican can be listed using the
|
||||
``openstack secret list`` command.
|
||||
|
||||
.. note::
|
||||
|
||||
Both the order (the asynchronous request for creating or storing a secret),
|
||||
and the actual secret itself, have similar unique identifiers. Once the
|
||||
order has been completed, the key id is shown in the output of the ``order
|
||||
get`` command.
|
||||
|
||||
The keymaster uses the explicitly configured username and password (and
|
||||
project name etc.) from the `keymaster.conf` file for retrieving the encryption
|
||||
root secret from an external key management system. The `Castellan library
|
||||
<http://docs.openstack.org/developer/castellan/>`_ is used to communicate with
|
||||
Barbican.
|
||||
|
||||
For the proxy server, reading the encryption root secret directly from the
|
||||
`proxy-server.conf` file, from the `keymaster.conf` file pointed to
|
||||
from the `proxy-server.conf` file, or from an external key management system
|
||||
such as Barbican, are all functionally equivalent. In case reading the
|
||||
encryption root secret from the external key management system fails, the
|
||||
proxy server will not start up. If the encryption root secret is retrieved
|
||||
successfully, it is cached in memory in the proxy server.
|
||||
|
||||
For further details on the configuration options, see the
|
||||
`[filter:kms_keymaster]` section in the `proxy-server.conf-sample` file, and
|
||||
the `keymaster.conf-sample` file.
|
||||
|
||||
Upgrade Considerations
|
||||
----------------------
|
||||
|
||||
|
77
etc/keymaster.conf-sample
Normal file
77
etc/keymaster.conf-sample
Normal file
@ -0,0 +1,77 @@
|
||||
[keymaster]
|
||||
# Sets the root secret from which encryption keys are derived. This must be set
|
||||
# before first use to a value that is a base64 encoding of at least 32 bytes.
|
||||
# The security of all encrypted data critically depends on this key, therefore
|
||||
# it should be set to a high-entropy value. For example, a suitable value may
|
||||
# be obtained by base-64 encoding a 32 byte (or longer) value generated by a
|
||||
# cryptographically secure random number generator. Changing the root secret is
|
||||
# likely to result in data loss. If this option is set, the root secret MUST
|
||||
# NOT be set in proxy-server.conf.
|
||||
# encryption_root_secret = changeme
|
||||
|
||||
[kms_keymaster]
|
||||
# The kms_keymaster section is used for configuring a keymaster that retrieves
|
||||
# the encryption root secret from an external key management system (kms),
|
||||
# using the Castellan abstraction layer. Castellan can support various kms
|
||||
# backends that use Keystone for authentication. Currently, the only
|
||||
# implemented backend is for Barbican.
|
||||
|
||||
# The api_class tells Castellan which key manager to use to access the external
|
||||
# key management system. The default value that accesses Barbican is
|
||||
# castellan.key_manager.barbican_key_manager.BarbicanKeyManager.
|
||||
# api_class = castellan.key_manager.barbican_key_manager.BarbicanKeyManager
|
||||
|
||||
# The configuration options below apply to a Barbican KMS being accessed using
|
||||
# Castellan. If another KMS type is used (by specifying another value for
|
||||
# api_class), then other configuration options may be required.
|
||||
|
||||
# The key_id is the identifier of the root secret stored in the KMS. For
|
||||
# details of how to store an existing root secret in Barbican, or how to
|
||||
# generate a new root secret in Barbican, see the 'overview_encryption'
|
||||
# documentation.
|
||||
# The key_id is the final part of the secret href returned in the
|
||||
# output of an 'openstack secret order get' command after an order to store or
|
||||
# create a key has been successfully completed. See the 'overview_encryption'
|
||||
# documentation for more information on this command.
|
||||
# key_id = changeme
|
||||
|
||||
# The Keystone username of the user used to access the key from the KMS. The
|
||||
# username shall be set to match an existing user.
|
||||
# username = changeme
|
||||
|
||||
# The password to go with the Keystone username above.
|
||||
# password = changeme
|
||||
|
||||
# The Keystone project name. For security reasons, it is recommended to set
|
||||
# the project_name to a project separate from the service project used by
|
||||
# other OpenStack services. Thereby, if another service is compromised, it will
|
||||
# not have access to the Swift root encryption secret. It is recommended that
|
||||
# the swift user is the only one that has a role in this project.
|
||||
# project_name = changeme
|
||||
# Instead of the project name, the project id may also be used.
|
||||
# project_id = changeme
|
||||
|
||||
# The Keystone URL to authenticate to. The value of auth_url may be
|
||||
# set according to the value of auth_uri in [filter:authtoken] in
|
||||
# proxy-server.conf. Currently, the only supported version of the Identity API
|
||||
# is v3, which requires that the url end in "/v3".
|
||||
# auth_endpoint = http://keystonehost:5000/v3
|
||||
|
||||
# The project and user domain names may optionally be specified. If they are
|
||||
# not specified, the default values of 'Default' (for *_domain_name) and
|
||||
# 'default' (for *_domain_id) are used (note the capitalization).
|
||||
# project_domain_name = Default
|
||||
# user_domain_name = Default
|
||||
# Instead of the project domain name and user domain name, the project domain
|
||||
# id and user domain id may also be specified.
|
||||
# project_domain_id = default
|
||||
# user_domain_id = default
|
||||
|
||||
# The following configuration options may also be used in addition to/instead
|
||||
# of the above options. Refer to the Keystone documentation for more details
|
||||
# on the usage of the options: https://docs.openstack.org/keystone/
|
||||
# user_id = changeme
|
||||
# trust_id = changeme
|
||||
# reauthenticate = changeme
|
||||
# domain_id = changeme
|
||||
# domain_name = changeme
|
@ -889,6 +889,24 @@ encryption_root_secret = changeme
|
||||
# MUST NOT be set in proxy-server.conf.
|
||||
# keymaster_config_path =
|
||||
|
||||
# To store the encryption root secret in a remote key management system (KMS)
|
||||
# such as Barbican, replace the keymaster middleware with the kms_keymaster
|
||||
# middleware in the proxy-server pipeline. They should be to the right of all
|
||||
# other middleware apart from the final proxy-logging middleware, and in the
|
||||
# order shown in this example:
|
||||
# <other middleware> kms_keymaster encryption proxy-logging proxy-server
|
||||
[filter:kms_keymaster]
|
||||
use = egg:swift#kms_keymaster
|
||||
|
||||
# Sets the path from which the keymaster config options should be read. This
|
||||
# allows multiple processes which need to be encryption-aware (for example,
|
||||
# proxy-server and container-sync) to share the same config file, ensuring
|
||||
# that the encryption keys used are the same. The format expected is similar
|
||||
# to other config files, with a single [kms_keymaster] section. See the
|
||||
# keymaster.conf-sample file for details on the kms_keymaster configuration
|
||||
# options.
|
||||
# keymaster_config_path =
|
||||
|
||||
[filter:encryption]
|
||||
use = egg:swift#encryption
|
||||
|
||||
|
@ -10,4 +10,4 @@ pastedeploy>=1.3.3
|
||||
six>=1.9.0
|
||||
xattr>=0.4
|
||||
PyECLib>=1.3.1 # BSD
|
||||
cryptography>=1.0,!=1.3.0 # BSD/Apache-2.0
|
||||
cryptography!=2.0,>=1.6 # BSD/Apache-2.0
|
||||
|
@ -63,6 +63,11 @@ scripts =
|
||||
bin/swift-ring-builder-analyzer
|
||||
bin/swift-temp-url
|
||||
|
||||
[extras]
|
||||
kms_keymaster =
|
||||
oslo.config>=4.0.0,!=4.3.0,!=4.4.0 # Apache-2.0
|
||||
castellan>=0.7.0 # Apache-2.0
|
||||
|
||||
[entry_points]
|
||||
paste.app_factory =
|
||||
proxy = swift.proxy.server:app_factory
|
||||
@ -100,6 +105,7 @@ paste.filter_factory =
|
||||
copy = swift.common.middleware.copy:filter_factory
|
||||
keymaster = swift.common.middleware.crypto.keymaster:filter_factory
|
||||
encryption = swift.common.middleware.crypto:filter_factory
|
||||
kms_keymaster = swift.common.middleware.crypto.kms_keymaster:filter_factory
|
||||
|
||||
[build_sphinx]
|
||||
all_files = 1
|
||||
|
@ -101,42 +101,61 @@ class KeyMasterContext(WSGIContext):
|
||||
class KeyMaster(object):
|
||||
"""Middleware for providing encryption keys.
|
||||
|
||||
The middleware requires its ``encryption_root_secret`` option to be set.
|
||||
This is the root secret from which encryption keys are derived. This must
|
||||
be set before first use to a value that is a base64 encoding of at least 32
|
||||
bytes. The security of all encrypted data critically depends on this key,
|
||||
therefore it should be set to a high-entropy value. For example, a suitable
|
||||
value may be obtained by base-64 encoding a 32 byte (or longer) value
|
||||
generated by a cryptographically secure random number generator. Changing
|
||||
the root secret is likely to result in data loss.
|
||||
The middleware requires its encryption root secret to be set. This is the
|
||||
root secret from which encryption keys are derived. This must be set before
|
||||
first use to a value that is at least 256 bits. The security of all
|
||||
encrypted data critically depends on this key, therefore it should be set
|
||||
to a high-entropy value. For example, a suitable value may be obtained by
|
||||
generating a 32 byte (or longer) value using a cryptographically secure
|
||||
random number generator. Changing the root secret is likely to result in
|
||||
data loss.
|
||||
"""
|
||||
|
||||
def __init__(self, app, conf):
|
||||
self.app = app
|
||||
self.keymaster_config_path = conf.get('keymaster_config_path')
|
||||
# The _get_root_secret() function is overridden by other keymasters
|
||||
self.root_secret = self._get_root_secret(conf)
|
||||
|
||||
keymaster_config_path = conf.get('keymaster_config_path')
|
||||
if keymaster_config_path:
|
||||
if any(opt in conf for opt in ('encryption_root_secret',)):
|
||||
def _get_root_secret(self, conf):
|
||||
"""
|
||||
This keymaster requires its ``encryption_root_secret`` option to be
|
||||
set. This must be set before first use to a value that is a base64
|
||||
encoding of at least 32 bytes. The encryption root secret is stored
|
||||
in either proxy-server.conf, or in an external file referenced from
|
||||
proxy-server.conf using ``keymaster_config_path``.
|
||||
|
||||
:param conf: the keymaster config section from proxy-server.conf
|
||||
:type conf: dict
|
||||
|
||||
:return: the encryption root secret binary bytes
|
||||
:rtype: bytearray
|
||||
"""
|
||||
if self.keymaster_config_path:
|
||||
keymaster_opts = ['encryption_root_secret']
|
||||
if any(opt in conf for opt in keymaster_opts):
|
||||
raise ValueError('keymaster_config_path is set, but there '
|
||||
'are other config options specified!')
|
||||
conf = readconf(keymaster_config_path, 'keymaster')
|
||||
|
||||
self.root_secret = conf.get('encryption_root_secret')
|
||||
'are other config options specified: %s' %
|
||||
", ".join(list(
|
||||
set(keymaster_opts).intersection(conf))))
|
||||
conf = readconf(self.keymaster_config_path, 'keymaster')
|
||||
b64_root_secret = conf.get('encryption_root_secret')
|
||||
try:
|
||||
# b64decode will silently discard bad characters, but we should
|
||||
# treat them as an error
|
||||
if not isinstance(self.root_secret, six.string_types) or any(
|
||||
if not isinstance(b64_root_secret, six.string_types) or any(
|
||||
c not in string.digits + string.ascii_letters + '/+\r\n'
|
||||
for c in self.root_secret.strip('\r\n=')):
|
||||
for c in b64_root_secret.strip('\r\n=')):
|
||||
raise ValueError
|
||||
self.root_secret = base64.b64decode(self.root_secret)
|
||||
if len(self.root_secret) < 32:
|
||||
binary_root_secret = base64.b64decode(b64_root_secret)
|
||||
if len(binary_root_secret) < 32:
|
||||
raise ValueError
|
||||
return binary_root_secret
|
||||
except (TypeError, ValueError):
|
||||
raise ValueError(
|
||||
'encryption_root_secret option in %s must be a base64 '
|
||||
'encoding of at least 32 raw bytes' % (
|
||||
keymaster_config_path or 'proxy-server.conf'))
|
||||
self.keymaster_config_path or 'proxy-server.conf'))
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
req = Request(env)
|
||||
|
114
swift/common/middleware/crypto/kms_keymaster.py
Normal file
114
swift/common/middleware/crypto/kms_keymaster.py
Normal file
@ -0,0 +1,114 @@
|
||||
# Copyright (c) 2016 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from castellan import key_manager, options
|
||||
from castellan.common.credentials import keystone_password
|
||||
from oslo_config import cfg
|
||||
from swift.common.middleware.crypto.keymaster import KeyMaster
|
||||
from swift.common.utils import readconf
|
||||
|
||||
|
||||
class KmsKeyMaster(KeyMaster):
|
||||
"""Middleware for retrieving a encryption root secret from an external KMS.
|
||||
|
||||
The middleware accesses the encryption root secret from an external key
|
||||
management system (KMS), e.g., a Barbican service, using Castellan. To be
|
||||
able to do so, the appropriate configuration options shall be set in the
|
||||
proxy-server.conf file, or in the configuration pointed to using the
|
||||
keymaster_config_path configuration value in the proxy-server.conf file.
|
||||
"""
|
||||
|
||||
def __init__(self, app, conf):
|
||||
# Call the superclass __init__() method, which calls the overridden
|
||||
# self._get_root_secret() below.
|
||||
super(KmsKeyMaster, self).__init__(app, conf)
|
||||
|
||||
def _get_root_secret(self, conf):
|
||||
"""
|
||||
Retrieve the root encryption secret from an external key management
|
||||
system using Castellan.
|
||||
|
||||
:param conf: the keymaster config section from proxy-server.conf
|
||||
:type conf: dict
|
||||
|
||||
:return: the encryption root secret binary bytes
|
||||
:rtype: bytearray
|
||||
"""
|
||||
if self.keymaster_config_path is not None:
|
||||
keymaster_opts = ['username', 'password', 'project_name',
|
||||
'user_domain_name', 'project_domain_name',
|
||||
'user_id', 'user_domain_id', 'trust_id',
|
||||
'domain_id', 'domain_name', 'project_id',
|
||||
'project_domain_id', 'reauthenticate',
|
||||
'auth_endpoint', 'api_class', 'key_id']
|
||||
if any(opt in conf for opt in keymaster_opts):
|
||||
raise ValueError('keymaster_config_path is set, but there '
|
||||
'are other config options specified: %s' %
|
||||
", ".join(list(
|
||||
set(keymaster_opts).intersection(conf))))
|
||||
conf = readconf(self.keymaster_config_path, 'kms_keymaster')
|
||||
ctxt = keystone_password.KeystonePassword(
|
||||
username=conf.get('username'),
|
||||
password=conf.get('password'),
|
||||
project_name=conf.get('project_name'),
|
||||
user_domain_name=conf.get('user_domain_name'),
|
||||
project_domain_name=conf.get(
|
||||
'project_domain_name'),
|
||||
user_id=conf.get('user_id'),
|
||||
user_domain_id=conf.get('user_domain_id'),
|
||||
trust_id=conf.get('trust_id'),
|
||||
domain_id=conf.get('domain_id'),
|
||||
domain_name=conf.get('domain_name'),
|
||||
project_id=conf.get('project_id'),
|
||||
project_domain_id=conf.get('project_domain_id'),
|
||||
reauthenticate=conf.get('reauthenticate'))
|
||||
oslo_conf = cfg.ConfigOpts()
|
||||
options.set_defaults(
|
||||
oslo_conf, auth_endpoint=conf.get('auth_endpoint'),
|
||||
api_class=conf.get('api_class')
|
||||
)
|
||||
options.enable_logging()
|
||||
manager = key_manager.API(oslo_conf)
|
||||
key = manager.get(ctxt, conf.get('key_id'))
|
||||
if key is None:
|
||||
raise ValueError("Retrieval of encryption root secret with key_id "
|
||||
"'%s' returned None." % conf.get('key_id'))
|
||||
try:
|
||||
if (key.bit_length < 256) or (key.algorithm.lower() != "aes"):
|
||||
raise ValueError('encryption root secret stored in the '
|
||||
'external KMS must be an AES key of at least '
|
||||
'256 bits (provided key length: %d, provided '
|
||||
'key algorithm: %s)'
|
||||
% (key.bit_length, key.algorithm))
|
||||
if (key.format != 'RAW'):
|
||||
raise ValueError('encryption root secret stored in the '
|
||||
'external KMS must be in RAW format and not '
|
||||
'e.g., as a base64 encoded string (format of '
|
||||
'key with uuid %s: %s)' %
|
||||
(conf.get('key_id'), key.format))
|
||||
except Exception:
|
||||
raise ValueError("Secret with key_id '%s' is not a symmetric key "
|
||||
"(type: %s)" % (conf.get('key_id'),
|
||||
str(type(key))))
|
||||
return key.get_encoded()
|
||||
|
||||
|
||||
def filter_factory(global_conf, **local_conf):
|
||||
conf = global_conf.copy()
|
||||
conf.update(local_conf)
|
||||
|
||||
def kms_keymaster_filter(app):
|
||||
return KmsKeyMaster(app, conf)
|
||||
|
||||
return kms_keymaster_filter
|
59
test/functional/mock_swift_key_manager.py
Normal file
59
test/functional/mock_swift_key_manager.py
Normal file
@ -0,0 +1,59 @@
|
||||
# Copyright (c) 2017 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from castellan.tests.unit.key_manager.mock_key_manager import MockKeyManager
|
||||
|
||||
|
||||
class MockSwiftKeyManager(MockKeyManager):
|
||||
"""Mocking key manager for Swift functional tests.
|
||||
|
||||
This mock key manager implementation extends the Castellan mock key
|
||||
manager with support for a pre-existing key that the Swift proxy server
|
||||
can use as the root encryption secret. The actual key material bytes
|
||||
for the root encryption secret changes each time this mock key manager is
|
||||
instantiated, meaning that data written earlier is no longer accessible
|
||||
once the proxy server is restarted.
|
||||
|
||||
To use this mock key manager instead of the default Barbican key manager,
|
||||
set the following property in the [kms_keymaster] section in the
|
||||
keymaster.conf configuration file pointed to using the
|
||||
keymaster_config_path property in the [filter:kms_keymaster] section in the
|
||||
proxy-server.conf file:
|
||||
|
||||
api_class = test.functional.mock_swift_key_manager.MockSwiftKeyManager
|
||||
|
||||
In case of a Python import error, make sure that the swift directory under
|
||||
which this mock key manager resides is early in the sys.path, e.g., by
|
||||
setting it in the PYTHONPATH environment variable before starting the
|
||||
proxy server.
|
||||
|
||||
This key manager is not suitable for use in production deployments.
|
||||
"""
|
||||
|
||||
def __init__(self, configuration=None):
|
||||
super(MockSwiftKeyManager, self).__init__(configuration)
|
||||
'''
|
||||
Create a new, random symmetric key for use as the encryption root
|
||||
secret.
|
||||
'''
|
||||
existing_key = self._generate_key(algorithm='AES', length=256)
|
||||
'''
|
||||
Store the key under the UUID 'mock_key_manager_existing_key', from
|
||||
where it can be retrieved by the proxy server. In the kms_keymaster
|
||||
configuration, set the following property to use this key:
|
||||
|
||||
key_id = mock_key_manager_existing_key
|
||||
'''
|
||||
self.keys['mock_key_manager_existing_key'] = existing_key
|
@ -213,9 +213,11 @@ class TestKeymaster(unittest.TestCase):
|
||||
'keymaster_config_path': '/ets/swift/keymaster.conf'}
|
||||
with self.assertRaises(ValueError) as err:
|
||||
keymaster.KeyMaster(self.swift, conf)
|
||||
self.assertEqual('keymaster_config_path is set, but there are '
|
||||
'other config options specified!',
|
||||
err.exception.message)
|
||||
expected_message = ('keymaster_config_path is set, but there are '
|
||||
'other config options specified:')
|
||||
self.assertTrue(err.exception.message.startswith(expected_message),
|
||||
"Error message does not start with '%s'" %
|
||||
expected_message)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
783
test/unit/common/middleware/crypto/test_kms_keymaster.py
Normal file
783
test/unit/common/middleware/crypto/test_kms_keymaster.py
Normal file
@ -0,0 +1,783 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2015 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import base64
|
||||
import mock
|
||||
import unittest
|
||||
import sys
|
||||
sys.modules['castellan'] = mock.Mock()
|
||||
sys.modules['castellan.common'] = mock.Mock()
|
||||
sys.modules['castellan.common.credentials'] = mock.Mock()
|
||||
|
||||
from keystoneauth1.exceptions.connection import ConnectFailure
|
||||
from keystoneauth1.exceptions.http import Unauthorized
|
||||
from keystoneclient.exceptions import DiscoveryFailure
|
||||
from swift.common.middleware.crypto import kms_keymaster
|
||||
from swift.common.swob import Request
|
||||
from test.unit.common.middleware.helpers import FakeSwift, FakeAppThatExcepts
|
||||
|
||||
TEST_KMS_INVALID_KEY_ID = 'invalid-kms-key-id'
|
||||
TEST_KMS_NONEXISTENT_KEY_ID = '11111111-1111-1111-1111-ffffffffffff'
|
||||
TEST_KMS_OPAQUE_KEY_ID = '22222222-2222-2222-2222-aaaaaaaaaaaa'
|
||||
TEST_KMS_SHORT_KEY_ID = '22222222-2222-2222-2222-bbbbbbbbbbbb'
|
||||
TEST_KMS_DES_KEY_ID = '22222222-2222-2222-2222-cccccccccccc'
|
||||
TEST_KMS_NONE_KEY_ID = '22222222-2222-2222-2222-dddddddddddd'
|
||||
TEST_KMS_INVALID_API_VERSION = 'vBadVersion'
|
||||
TEST_KMS_INVALID_USER_DOMAIN_NAME = "baduserdomainname"
|
||||
TEST_KMS_CONNECT_FAILURE_URL = 'http://endpoint_url_connect_error:45621'
|
||||
TEST_KMS_NON_BARBICAN_URL = 'http://endpoint_url_nonbarbican:45621'
|
||||
TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF = {
|
||||
'keymaster_config_path': 'PATH_TO_KEYMASTER_CONFIG_FILE',
|
||||
}
|
||||
TEST_KMS_KEYMASTER_CONF = {
|
||||
'auth_endpoint': 'kmsauthurlv3',
|
||||
'password': 'kmspass',
|
||||
'username': 'kmsuser',
|
||||
'user_domain_id': None,
|
||||
'user_domain_name': 'default',
|
||||
'project_id': None,
|
||||
'project_name': 'kmsproject',
|
||||
'project_domain_id': None,
|
||||
'project_domain_name': 'default',
|
||||
'key_id': 'valid_kms_key_id-abcdefg-123456'
|
||||
}
|
||||
|
||||
|
||||
def capture_start_response():
|
||||
calls = []
|
||||
|
||||
def start_response(*args):
|
||||
calls.append(args)
|
||||
return start_response, calls
|
||||
|
||||
|
||||
def mock_castellan_api_side_effect(*args, **kwargs):
|
||||
return MockBarbicanKeyManager(args[0])
|
||||
|
||||
|
||||
def mock_options_set_defaults_side_effect(*args, **kwargs):
|
||||
'''
|
||||
Add options from kwargs into args dict.
|
||||
'''
|
||||
args[0].update(kwargs)
|
||||
|
||||
|
||||
def mock_config_opts_side_effect(*args, **kwargs):
|
||||
return dict()
|
||||
|
||||
|
||||
def mock_keystone_password_side_effect(username, password, project_name,
|
||||
user_domain_name, project_domain_name,
|
||||
user_id, user_domain_id, trust_id,
|
||||
domain_id, domain_name, project_id,
|
||||
project_domain_id, reauthenticate):
|
||||
return MockPassword(username, password, project_name, user_domain_name,
|
||||
project_domain_name, user_id, user_domain_id, trust_id,
|
||||
domain_id, domain_name, project_id, project_domain_id,
|
||||
reauthenticate)
|
||||
|
||||
ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED = 'Secret incorrectly specified.'
|
||||
ERR_MESSAGE_KEY_UUID_NOT_FOUND = 'Key not found, uuid: '
|
||||
|
||||
|
||||
class MockBarbicanKeyManager(object):
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
|
||||
def get(self, ctxt, key_id):
|
||||
# If authentication fails, raise an exception here.
|
||||
if (TEST_KMS_KEYMASTER_CONF['username'] !=
|
||||
ctxt.username
|
||||
or TEST_KMS_KEYMASTER_CONF['password'] !=
|
||||
ctxt.password or
|
||||
TEST_KMS_KEYMASTER_CONF['user_domain_name'] !=
|
||||
ctxt.user_domain_name):
|
||||
raise Unauthorized(
|
||||
message='The request you have made requires authentication.',
|
||||
http_status=401)
|
||||
elif self.conf['auth_endpoint'] == TEST_KMS_CONNECT_FAILURE_URL:
|
||||
raise ConnectFailure('Unable to establish connection')
|
||||
elif self.conf['auth_endpoint'] == TEST_KMS_NON_BARBICAN_URL:
|
||||
raise DiscoveryFailure(
|
||||
'Could not determine a suitable URL for the plugin')
|
||||
elif (self.conf['auth_endpoint'] !=
|
||||
TEST_KMS_KEYMASTER_CONF['auth_endpoint']):
|
||||
raise Unauthorized(
|
||||
message='Cannot authorize API client.')
|
||||
elif (key_id == TEST_KMS_NONEXISTENT_KEY_ID):
|
||||
message = ERR_MESSAGE_KEY_UUID_NOT_FOUND + key_id
|
||||
'''
|
||||
Raising a ManagedObjectNotFoundError would require importing it
|
||||
from castellan.common.exception. To avoid this import, raising a
|
||||
general Exception.
|
||||
'''
|
||||
raise Exception(message)
|
||||
elif key_id == TEST_KMS_INVALID_KEY_ID:
|
||||
raise ValueError(ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED)
|
||||
elif key_id == TEST_KMS_NONE_KEY_ID:
|
||||
return None
|
||||
return MockBarbicanKey(b'x' * 32, key_id)
|
||||
|
||||
|
||||
class MockBarbicanKey(object):
|
||||
def __init__(self, key_material, key_id):
|
||||
self.key_material = key_material
|
||||
self.bit_length = len(key_material) * 8
|
||||
if key_id == TEST_KMS_OPAQUE_KEY_ID:
|
||||
self.format = 'Opaque'
|
||||
else:
|
||||
self.format = 'RAW'
|
||||
self.algorithm = "aes"
|
||||
if key_id == TEST_KMS_DES_KEY_ID:
|
||||
self.format = 'des'
|
||||
if key_id == TEST_KMS_SHORT_KEY_ID:
|
||||
self.bit_length = 128
|
||||
self.key_material[:128]
|
||||
|
||||
def get_encoded(self):
|
||||
return self.key_material
|
||||
|
||||
def format(self):
|
||||
return self.format
|
||||
|
||||
|
||||
class MockPassword(object):
|
||||
def __init__(self, username, password, project_name, user_domain_name,
|
||||
project_domain_name, user_id, user_domain_id, trust_id,
|
||||
domain_id, domain_name, project_id, project_domain_id,
|
||||
reauthenticate):
|
||||
self.password = password
|
||||
self.username = username
|
||||
self.user_domain_name = user_domain_name
|
||||
self.project_name = project_name
|
||||
self.project_domain_name = project_domain_name
|
||||
self.user_id = user_id,
|
||||
self.user_domain_id = user_domain_id,
|
||||
self.trust_id = trust_id,
|
||||
self.domain_id = domain_id,
|
||||
self.domain_name = domain_name,
|
||||
self.project_id = project_id,
|
||||
self.project_domain_id = project_domain_id,
|
||||
self.reauthenticate = reauthenticate
|
||||
|
||||
|
||||
class TestKmsKeymaster(unittest.TestCase):
|
||||
"""
|
||||
Unit tests for storing the encryption root secret in a Barbican external
|
||||
key management system accessed using Castellan.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestKmsKeymaster, self).setUp()
|
||||
self.swift = FakeSwift()
|
||||
|
||||
"""
|
||||
Tests using the v3 Identity API, where all calls to Barbican are mocked.
|
||||
"""
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch.object(kms_keymaster.KmsKeyMaster,
|
||||
'_get_root_secret')
|
||||
def test_filter_v3(self, mock_get_root_secret_from_kms,
|
||||
mock_readconf):
|
||||
mock_get_root_secret_from_kms.return_value = (
|
||||
base64.b64encode(b'x' * 32))
|
||||
mock_readconf.return_value = TEST_KMS_KEYMASTER_CONF
|
||||
factory = kms_keymaster.filter_factory(TEST_KMS_KEYMASTER_CONF)
|
||||
self.assertTrue(callable(factory))
|
||||
self.assertTrue(callable(factory(self.swift)))
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch.object(kms_keymaster.KmsKeyMaster,
|
||||
'_get_root_secret')
|
||||
def test_app_exception_v3(self, mock_get_root_secret_from_kms,
|
||||
mock_readconf):
|
||||
mock_get_root_secret_from_kms.return_value = (
|
||||
base64.b64encode(b'x' * 32))
|
||||
mock_readconf.return_value = TEST_KMS_KEYMASTER_CONF
|
||||
app = kms_keymaster.KmsKeyMaster(
|
||||
FakeAppThatExcepts(), TEST_KMS_KEYMASTER_CONF)
|
||||
req = Request.blank('/', environ={'REQUEST_METHOD': 'PUT'})
|
||||
start_response, _ = capture_start_response()
|
||||
self.assertRaises(Exception, app, req.environ, start_response)
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch.object(kms_keymaster.KmsKeyMaster, '_get_root_secret')
|
||||
def test_get_root_secret(
|
||||
self, mock_get_root_secret_from_kms, mock_readconf):
|
||||
# Successful call with coarse _get_root_secret_from_kms() mock.
|
||||
mock_get_root_secret_from_kms.return_value = (
|
||||
base64.b64encode(b'x' * 32))
|
||||
'''
|
||||
Return valid Barbican configuration parameters.
|
||||
'''
|
||||
mock_readconf.return_value = TEST_KMS_KEYMASTER_CONF
|
||||
'''
|
||||
Verify that keys are derived correctly by the keymaster.
|
||||
'''
|
||||
self.app = kms_keymaster.KmsKeyMaster(self.swift,
|
||||
TEST_KMS_KEYMASTER_CONF)
|
||||
'''
|
||||
Verify that _get_root_secret_from_kms() was called with the
|
||||
correct parameters.
|
||||
'''
|
||||
mock_get_root_secret_from_kms.assert_called_with(
|
||||
TEST_KMS_KEYMASTER_CONF
|
||||
)
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config, mock_keystone_password):
|
||||
# Successful call with finer grained mocks.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return valid Barbican configuration parameters.
|
||||
'''
|
||||
mock_readconf.return_value = TEST_KMS_KEYMASTER_CONF
|
||||
|
||||
'''
|
||||
Verify that no exceptions are raised by the mocked functions.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(self.swift,
|
||||
TEST_KMS_KEYMASTER_CONF)
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_invalid_key_id(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Invalid key ID.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['key_id'] = TEST_KMS_INVALID_KEY_ID
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though key id invalid')
|
||||
except ValueError as e:
|
||||
self.assertEqual(e.message,
|
||||
ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED)
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_nonexistent_key_id(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Nonexistent key.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['key_id'] = TEST_KMS_NONEXISTENT_KEY_ID
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though key id invalid')
|
||||
except Exception as e:
|
||||
expected_message = ('Key not found, uuid: ' +
|
||||
TEST_KMS_NONEXISTENT_KEY_ID)
|
||||
self.assertEqual(e.message, expected_message)
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_invalid_key_format(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Nonexistent key.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['key_id'] = TEST_KMS_OPAQUE_KEY_ID
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though key format invalid')
|
||||
except ValueError:
|
||||
pass
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_config_file_and_params(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Both external config file and config parameters specified.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['keymaster_config_path'] = (
|
||||
'PATH_TO_KEYMASTER_CONFIG_FILE'
|
||||
)
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(self.swift, kms_conf)
|
||||
raise Exception('Success even though config invalid')
|
||||
except Exception as e:
|
||||
expected_message = ('keymaster_config_path is set, but there are '
|
||||
'other config options specified:')
|
||||
self.assertTrue(e.message.startswith(expected_message),
|
||||
"Error message does not start with '%s'" %
|
||||
expected_message)
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_invalid_username(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Invalid username.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['username'] = 'invaliduser'
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though username invalid')
|
||||
except Unauthorized as e:
|
||||
self.assertEqual(e.http_status, 401)
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_invalid_password(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Invalid password.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['password'] = 'invalidpassword'
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though password invalid')
|
||||
except Unauthorized as e:
|
||||
self.assertEqual(e.http_status, 401)
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_connect_failure_auth_url(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config, mock_keystone_password):
|
||||
# Connect failure kms auth_url.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['auth_endpoint'] = TEST_KMS_CONNECT_FAILURE_URL
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though auth_url invalid')
|
||||
except ConnectFailure:
|
||||
pass
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_bad_auth_url(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Bad kms auth_url.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['auth_endpoint'] = TEST_KMS_NON_BARBICAN_URL
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though auth_url invalid')
|
||||
except DiscoveryFailure:
|
||||
pass
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_bad_user_domain_name(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config, mock_keystone_password):
|
||||
# Bad user domain name with mocks.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['user_domain_name'] = (
|
||||
TEST_KMS_INVALID_USER_DOMAIN_NAME)
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though api_version invalid')
|
||||
except Unauthorized as e:
|
||||
self.assertEqual(e.http_status, 401)
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_invalid_key_algorithm(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Nonexistent key.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['key_id'] = TEST_KMS_DES_KEY_ID
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though key format invalid')
|
||||
except ValueError:
|
||||
pass
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_invalid_key_length(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Nonexistent key.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['key_id'] = TEST_KMS_SHORT_KEY_ID
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though key format invalid')
|
||||
except ValueError:
|
||||
pass
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
|
||||
'keystone_password.KeystonePassword')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.cfg')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.options')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.readconf')
|
||||
@mock.patch('swift.common.middleware.crypto.kms_keymaster.key_manager')
|
||||
def test_mocked_castellan_keymanager_none_key(
|
||||
self, mock_castellan_key_manager, mock_readconf,
|
||||
mock_castellan_options, mock_oslo_config,
|
||||
mock_keystone_password):
|
||||
# Nonexistent key.
|
||||
mock_keystone_password.side_effect = (
|
||||
mock_keystone_password_side_effect)
|
||||
'''
|
||||
Set side_effect functions.
|
||||
'''
|
||||
mock_castellan_key_manager.API.side_effect = (
|
||||
mock_castellan_api_side_effect)
|
||||
mock_castellan_options.set_defaults.side_effect = (
|
||||
mock_options_set_defaults_side_effect)
|
||||
mock_oslo_config.ConfigOpts.side_effect = (
|
||||
mock_config_opts_side_effect)
|
||||
'''
|
||||
Return invalid Barbican configuration parameters.
|
||||
'''
|
||||
kms_conf = dict(TEST_KMS_KEYMASTER_CONF)
|
||||
kms_conf['key_id'] = TEST_KMS_NONE_KEY_ID
|
||||
mock_readconf.return_value = kms_conf
|
||||
|
||||
'''
|
||||
Verify that an exception is raised by the mocked function.
|
||||
'''
|
||||
try:
|
||||
self.app = kms_keymaster.KmsKeyMaster(
|
||||
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
|
||||
raise Exception('Success even though None key returned')
|
||||
except ValueError:
|
||||
pass
|
||||
except Exception:
|
||||
print("Unexpected error: %s" % sys.exc_info()[0])
|
||||
raise
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user