329 lines
13 KiB
Python
329 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (c) 2018 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import logging
|
|
from unittest import mock
|
|
import os
|
|
import unittest
|
|
from tempfile import mkdtemp
|
|
from textwrap import dedent
|
|
from shutil import rmtree
|
|
import sys
|
|
sys.modules['kmip'] = mock.Mock()
|
|
sys.modules['kmip.pie'] = mock.Mock()
|
|
sys.modules['kmip.pie.client'] = mock.Mock()
|
|
|
|
from swift.common.middleware.crypto import kmip_keymaster
|
|
|
|
|
|
KMIP_CLIENT_CLASS = \
|
|
'swift.common.middleware.crypto.kmip_keymaster.ProxyKmipClient'
|
|
|
|
|
|
class MockProxyKmipClient(object):
|
|
def __init__(self, secrets, calls, kwargs):
|
|
calls.append(('__init__', kwargs))
|
|
self.secrets = secrets
|
|
self.calls = calls
|
|
|
|
def get(self, uid):
|
|
self.calls.append(('get', uid))
|
|
return self.secrets[uid]
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
pass
|
|
|
|
|
|
def create_secret(algorithm_name, length, value):
|
|
algorithm = mock.MagicMock()
|
|
algorithm.name = algorithm_name
|
|
secret = mock.MagicMock(cryptographic_algorithm=algorithm,
|
|
cryptographic_length=length,
|
|
value=value)
|
|
return secret
|
|
|
|
|
|
def create_mock_client(secrets, calls):
|
|
def mock_client(*args, **kwargs):
|
|
if args:
|
|
raise Exception('unexpected args provided: %r' % (args,))
|
|
return MockProxyKmipClient(secrets, calls, kwargs)
|
|
return mock_client
|
|
|
|
|
|
class InMemoryHandler(logging.Handler):
|
|
def __init__(self):
|
|
self.messages = []
|
|
super(InMemoryHandler, self).__init__()
|
|
|
|
def handle(self, record):
|
|
self.messages.append(record.msg)
|
|
|
|
|
|
class TestKmipKeymaster(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.tempdir = mkdtemp()
|
|
|
|
def tearDown(self):
|
|
rmtree(self.tempdir)
|
|
|
|
def test_config_in_filter_section(self):
|
|
conf = {'__file__': '/etc/swift/proxy-server.conf',
|
|
'__name__': 'kmip_keymaster',
|
|
'key_id': '1234'}
|
|
secrets = {'1234': create_secret('AES', 256, b'x' * 32)}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
|
|
km = kmip_keymaster.filter_factory(conf)(None)
|
|
|
|
self.assertEqual({None: b'x' * 32}, km._root_secrets)
|
|
self.assertEqual(None, km.active_secret_id)
|
|
self.assertIsNone(km.keymaster_config_path)
|
|
self.assertEqual(calls, [
|
|
('__init__', {'config_file': '/etc/swift/proxy-server.conf',
|
|
'config': 'filter:kmip_keymaster'}),
|
|
('get', '1234'),
|
|
])
|
|
|
|
def test_multikey_config_in_filter_section(self):
|
|
conf = {'__file__': '/etc/swift/proxy-server.conf',
|
|
'__name__': 'kmip-keymaster',
|
|
'key_id': '1234',
|
|
'key_id_xyzzy': 'foobar',
|
|
'key_id_alt_secret_id': 'foobar',
|
|
'active_root_secret_id': 'xyzzy'}
|
|
secrets = {'1234': create_secret('AES', 256, b'x' * 32),
|
|
'foobar': create_secret('AES', 256, b'y' * 32)}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
|
|
km = kmip_keymaster.KmipKeyMaster(None, conf)
|
|
|
|
self.assertEqual({None: b'x' * 32, 'xyzzy': b'y' * 32,
|
|
'alt_secret_id': b'y' * 32},
|
|
km._root_secrets)
|
|
self.assertEqual('xyzzy', km.active_secret_id)
|
|
self.assertIsNone(km.keymaster_config_path)
|
|
self.assertEqual(calls, [
|
|
('__init__', {'config_file': '/etc/swift/proxy-server.conf',
|
|
'config': 'filter:kmip-keymaster'}),
|
|
('get', '1234'),
|
|
('get', 'foobar'),
|
|
])
|
|
|
|
def test_bad_active_key(self):
|
|
conf = {'__file__': '/etc/swift/proxy-server.conf',
|
|
'__name__': 'kmip_keymaster',
|
|
'key_id': '1234',
|
|
'key_id_xyzzy': 'foobar',
|
|
'active_root_secret_id': 'unknown'}
|
|
secrets = {'1234': create_secret('AES', 256, b'x' * 32),
|
|
'foobar': create_secret('AES', 256, b'y' * 32)}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS,
|
|
create_mock_client(secrets, calls)), \
|
|
self.assertRaises(ValueError) as raised:
|
|
kmip_keymaster.KmipKeyMaster(None, conf)
|
|
self.assertEqual('No secret loaded for active_root_secret_id unknown',
|
|
str(raised.exception))
|
|
|
|
def test_config_in_separate_file(self):
|
|
km_conf = """
|
|
[kmip_keymaster]
|
|
key_id = 4321
|
|
"""
|
|
km_config_file = os.path.join(self.tempdir, 'km.conf')
|
|
with open(km_config_file, 'wt') as fd:
|
|
fd.write(dedent(km_conf))
|
|
|
|
conf = {'__file__': '/etc/swift/proxy-server.conf',
|
|
'__name__': 'keymaster-kmip',
|
|
'keymaster_config_path': km_config_file}
|
|
secrets = {'4321': create_secret('AES', 256, b'x' * 32)}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
|
|
km = kmip_keymaster.KmipKeyMaster(None, conf)
|
|
self.assertEqual({None: b'x' * 32}, km._root_secrets)
|
|
self.assertEqual(None, km.active_secret_id)
|
|
self.assertEqual(km_config_file, km.keymaster_config_path)
|
|
self.assertEqual(calls, [
|
|
('__init__', {'config_file': km_config_file,
|
|
'config': 'kmip_keymaster'}),
|
|
('get', '4321')])
|
|
|
|
def test_multikey_config_in_separate_file(self):
|
|
km_conf = """
|
|
[kmip_keymaster]
|
|
key_id = 4321
|
|
key_id_secret_id = another id
|
|
active_root_secret_id = secret_id
|
|
"""
|
|
km_config_file = os.path.join(self.tempdir, 'km.conf')
|
|
with open(km_config_file, 'wt') as fd:
|
|
fd.write(dedent(km_conf))
|
|
|
|
conf = {'__file__': '/etc/swift/proxy-server.conf',
|
|
'__name__': 'kmip_keymaster',
|
|
'keymaster_config_path': km_config_file}
|
|
secrets = {'4321': create_secret('AES', 256, b'x' * 32),
|
|
'another id': create_secret('AES', 256, b'y' * 32)}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
|
|
km = kmip_keymaster.KmipKeyMaster(None, conf)
|
|
self.assertEqual({None: b'x' * 32, 'secret_id': b'y' * 32},
|
|
km._root_secrets)
|
|
self.assertEqual('secret_id', km.active_secret_id)
|
|
self.assertEqual(km_config_file, km.keymaster_config_path)
|
|
self.assertEqual(calls, [
|
|
('__init__', {'config_file': km_config_file,
|
|
'config': 'kmip_keymaster'}),
|
|
('get', '4321'),
|
|
('get', 'another id')])
|
|
|
|
def test_proxy_server_conf_dir(self):
|
|
proxy_server_conf_dir = os.path.join(self.tempdir, 'proxy_server.d')
|
|
os.mkdir(proxy_server_conf_dir)
|
|
|
|
# KmipClient can't read conf from a dir, so check that is caught early
|
|
conf = {'__file__': proxy_server_conf_dir,
|
|
'__name__': 'kmip_keymaster',
|
|
'key_id': '789'}
|
|
with self.assertRaises(ValueError) as cm:
|
|
kmip_keymaster.KmipKeyMaster(None, conf)
|
|
self.assertIn('config cannot be read from conf dir', str(cm.exception))
|
|
|
|
# ...but a conf file in a conf dir could point back to itself for the
|
|
# KmipClient config
|
|
km_config_file = os.path.join(proxy_server_conf_dir, '40.conf')
|
|
km_conf = """
|
|
[filter:kmip_keymaster]
|
|
keymaster_config_file = %s
|
|
|
|
[kmip_keymaster]
|
|
key_id = 789
|
|
""" % km_config_file
|
|
|
|
with open(km_config_file, 'wt') as fd:
|
|
fd.write(dedent(km_conf))
|
|
|
|
conf = {'__file__': proxy_server_conf_dir,
|
|
'__name__': 'kmip_keymaster',
|
|
'keymaster_config_path': km_config_file}
|
|
secrets = {'789': create_secret('AES', 256, b'x' * 32)}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
|
|
km = kmip_keymaster.KmipKeyMaster(None, conf)
|
|
self.assertEqual({None: b'x' * 32}, km._root_secrets)
|
|
self.assertEqual(None, km.active_secret_id)
|
|
self.assertEqual(km_config_file, km.keymaster_config_path)
|
|
self.assertEqual(calls, [
|
|
('__init__', {'config_file': km_config_file,
|
|
# NB: no "filter:"
|
|
'config': 'kmip_keymaster'}),
|
|
('get', '789')])
|
|
|
|
def test_bad_key_length(self):
|
|
conf = {'__file__': '/etc/swift/proxy-server.conf',
|
|
'__name__': 'kmip_keymaster',
|
|
'key_id': '1234'}
|
|
secrets = {'1234': create_secret('AES', 128, b'x' * 16)}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS,
|
|
create_mock_client(secrets, calls)), \
|
|
self.assertRaises(ValueError) as cm:
|
|
kmip_keymaster.KmipKeyMaster(None, conf)
|
|
self.assertIn('Expected key 1234 to be an AES-256 key',
|
|
str(cm.exception))
|
|
self.assertEqual(calls, [
|
|
('__init__', {'config_file': '/etc/swift/proxy-server.conf',
|
|
'config': 'filter:kmip_keymaster'}),
|
|
('get', '1234')])
|
|
|
|
def test_bad_key_algorithm(self):
|
|
conf = {'__file__': '/etc/swift/proxy-server.conf',
|
|
'__name__': 'kmip_keymaster',
|
|
'key_id': '1234'}
|
|
secrets = {'1234': create_secret('notAES', 256, b'x' * 32)}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS,
|
|
create_mock_client(secrets, calls)), \
|
|
self.assertRaises(ValueError) as cm:
|
|
kmip_keymaster.KmipKeyMaster(None, conf)
|
|
self.assertIn('Expected key 1234 to be an AES-256 key',
|
|
str(cm.exception))
|
|
self.assertEqual(calls, [
|
|
('__init__', {'config_file': '/etc/swift/proxy-server.conf',
|
|
'config': 'filter:kmip_keymaster'}),
|
|
('get', '1234')])
|
|
|
|
def test_missing_key_id(self):
|
|
conf = {'__file__': '/etc/swift/proxy-server.conf',
|
|
'__name__': 'kmip_keymaster'}
|
|
secrets = {}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS,
|
|
create_mock_client(secrets, calls)), \
|
|
self.assertRaises(ValueError) as cm:
|
|
kmip_keymaster.KmipKeyMaster(None, conf)
|
|
self.assertEqual('No secret loaded for active_root_secret_id None',
|
|
str(cm.exception))
|
|
# We make the client, but never use it
|
|
self.assertEqual(calls, [
|
|
('__init__', {'config_file': '/etc/swift/proxy-server.conf',
|
|
'config': 'filter:kmip_keymaster'})])
|
|
|
|
def test_logger_manipulations(self):
|
|
root_logger = logging.getLogger()
|
|
old_level = root_logger.getEffectiveLevel()
|
|
handler = InMemoryHandler()
|
|
try:
|
|
root_logger.setLevel(logging.DEBUG)
|
|
root_logger.addHandler(handler)
|
|
|
|
conf = {'__file__': '/etc/swift/proxy-server.conf',
|
|
'__name__': 'kmip_keymaster'}
|
|
secrets = {}
|
|
calls = []
|
|
with mock.patch(KMIP_CLIENT_CLASS,
|
|
create_mock_client(secrets, calls)), \
|
|
self.assertRaises(ValueError):
|
|
# missing key_id, as above, but that's not the interesting bit
|
|
kmip_keymaster.KmipKeyMaster(None, conf)
|
|
|
|
self.assertEqual(handler.messages, [])
|
|
|
|
logger = logging.getLogger('kmip.services.server.kmip_protocol')
|
|
logger.debug('Something secret!')
|
|
logger.info('Something useful')
|
|
self.assertNotIn('Something secret!', handler.messages)
|
|
self.assertIn('Something useful', handler.messages)
|
|
|
|
logger = logging.getLogger('kmip.core.config_helper')
|
|
logger.debug('Also secret')
|
|
logger.warning('Also useful')
|
|
self.assertNotIn('Also secret', handler.messages)
|
|
self.assertIn('Also useful', handler.messages)
|
|
|
|
logger = logging.getLogger('kmip')
|
|
logger.debug('Boring, but not secret')
|
|
self.assertIn('Boring, but not secret', handler.messages)
|
|
finally:
|
|
root_logger.setLevel(old_level)
|
|
root_logger.removeHandler(handler)
|