Update mock key manager
Adds functionality to the mock key manager for create_key_pair. Change-Id: Iff5c3f9f088da422c088aa9dcac3f39deedcd2d6
This commit is contained in:
parent
3d031cb5af
commit
cf2d3b497a
@ -30,7 +30,13 @@ import binascii
|
||||
import random
|
||||
import uuid
|
||||
|
||||
from cryptography.hazmat import backends
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from castellan.common import exception
|
||||
from castellan.common.objects import private_key as pri_key
|
||||
from castellan.common.objects import public_key as pub_key
|
||||
from castellan.common.objects import symmetric_key as sym_key
|
||||
from castellan.key_manager import key_manager
|
||||
|
||||
@ -68,8 +74,9 @@ class MockKeyManager(key_manager.KeyManager):
|
||||
def create_key(self, context, **kwargs):
|
||||
"""Creates a symmetric key.
|
||||
|
||||
This implementation returns a UUID for the created key. A
|
||||
Forbidden exception is raised if the specified context is None.
|
||||
This implementation returns a UUID for the created key. The algorithm
|
||||
for the key will always be AES. A Forbidden exception is raised if the
|
||||
specified context is None.
|
||||
"""
|
||||
if context is None:
|
||||
raise exception.Forbidden()
|
||||
@ -77,8 +84,62 @@ class MockKeyManager(key_manager.KeyManager):
|
||||
key = self._generate_key(**kwargs)
|
||||
return self.store(context, key)
|
||||
|
||||
def _generate_public_and_private_key(self, length):
|
||||
crypto_private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=length,
|
||||
backend=backends.default_backend())
|
||||
|
||||
private_der = crypto_private_key.private_bytes(
|
||||
encoding=serialization.Encoding.DER,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption())
|
||||
|
||||
crypto_public_key = crypto_private_key.public_key()
|
||||
|
||||
public_der = crypto_public_key.public_bytes(
|
||||
encoding=serialization.Encoding.DER,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo)
|
||||
|
||||
private_key = pri_key.PrivateKey(
|
||||
algorithm='RSA',
|
||||
bit_length=length,
|
||||
key=bytearray(private_der))
|
||||
|
||||
public_key = pub_key.PublicKey(
|
||||
algorithm='RSA',
|
||||
bit_length=length,
|
||||
key=bytearray(public_der))
|
||||
|
||||
return private_key, public_key
|
||||
|
||||
def create_key_pair(self, context, algorithm, length, expiration=None):
|
||||
raise NotImplementedError()
|
||||
"""Creates an asymmetric key pair.
|
||||
|
||||
This implementation returns UUIDs for the created keys in the order:
|
||||
(private, public)
|
||||
Forbidden is raised if the context is None.
|
||||
"""
|
||||
if context is None:
|
||||
raise exception.Forbidden()
|
||||
|
||||
if algorithm.lower() != 'rsa':
|
||||
msg = 'Invalid algorithm: {}, only RSA supported'.format(algorithm)
|
||||
raise ValueError(msg)
|
||||
|
||||
valid_lengths = [2048, 3072, 4096]
|
||||
|
||||
if length not in valid_lengths:
|
||||
msg = 'Invalid bit length: {}, only {} supported'.format(
|
||||
length, valid_lengths)
|
||||
raise ValueError(msg)
|
||||
|
||||
private_key, public_key = self._generate_public_and_private_key(length)
|
||||
|
||||
private_key_uuid = self.store(context, private_key)
|
||||
public_key_uuid = self.store(context, public_key)
|
||||
|
||||
return private_key_uuid, public_key_uuid
|
||||
|
||||
def _generate_key_id(self):
|
||||
key_id = str(uuid.uuid4())
|
||||
|
@ -17,6 +17,10 @@
|
||||
Test cases for the mock key manager.
|
||||
"""
|
||||
|
||||
from cryptography.hazmat import backends
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from oslo_context import context
|
||||
|
||||
from castellan.common import exception
|
||||
@ -25,6 +29,21 @@ from castellan.tests.unit.key_manager import mock_key_manager as mock_key_mgr
|
||||
from castellan.tests.unit.key_manager import test_key_manager as test_key_mgr
|
||||
|
||||
|
||||
def get_cryptography_private_key(private_key):
|
||||
crypto_private_key = serialization.load_der_private_key(
|
||||
bytes(private_key.get_encoded()),
|
||||
password=None,
|
||||
backend=backends.default_backend())
|
||||
return crypto_private_key
|
||||
|
||||
|
||||
def get_cryptography_public_key(public_key):
|
||||
crypto_public_key = serialization.load_der_public_key(
|
||||
bytes(public_key.get_encoded()),
|
||||
backend=backends.default_backend())
|
||||
return crypto_public_key
|
||||
|
||||
|
||||
class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
|
||||
|
||||
def _create_key_manager(self):
|
||||
@ -47,10 +66,64 @@ class MockKeyManagerTestCase(test_key_mgr.KeyManagerTestCase):
|
||||
key = self.key_mgr.get(self.context, key_id)
|
||||
self.assertEqual(length / 8, len(key.get_encoded()))
|
||||
|
||||
def test_create_null_context(self):
|
||||
def test_create_key_null_context(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.create_key, None)
|
||||
|
||||
def test_create_key_pair(self):
|
||||
for length in [2048, 3072, 4096]:
|
||||
private_key_uuid, public_key_uuid = self.key_mgr.create_key_pair(
|
||||
self.context, 'RSA', length)
|
||||
|
||||
private_key = self.key_mgr.get(self.context, private_key_uuid)
|
||||
public_key = self.key_mgr.get(self.context, public_key_uuid)
|
||||
|
||||
crypto_private_key = get_cryptography_private_key(private_key)
|
||||
crypto_public_key = get_cryptography_public_key(public_key)
|
||||
|
||||
self.assertEqual(length, crypto_private_key.key_size)
|
||||
self.assertEqual(length, crypto_public_key.key_size)
|
||||
|
||||
def test_create_key_pair_encryption(self):
|
||||
private_key_uuid, public_key_uuid = self.key_mgr.create_key_pair(
|
||||
self.context, 'RSA', 2048)
|
||||
|
||||
private_key = self.key_mgr.get(self.context, private_key_uuid)
|
||||
public_key = self.key_mgr.get(self.context, public_key_uuid)
|
||||
|
||||
crypto_private_key = get_cryptography_private_key(private_key)
|
||||
crypto_public_key = get_cryptography_public_key(public_key)
|
||||
|
||||
message = b'secret plaintext'
|
||||
ciphertext = crypto_public_key.encrypt(
|
||||
message,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA1()),
|
||||
algorithm=hashes.SHA1(),
|
||||
label=None))
|
||||
plaintext = crypto_private_key.decrypt(
|
||||
ciphertext,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA1()),
|
||||
algorithm=hashes.SHA1(),
|
||||
label=None))
|
||||
|
||||
self.assertEqual(message, plaintext)
|
||||
|
||||
def test_create_key_pair_null_context(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.key_mgr.create_key_pair, None, 'RSA', 2048)
|
||||
|
||||
def test_create_key_pair_invalid_algorithm(self):
|
||||
self.assertRaises(ValueError,
|
||||
self.key_mgr.create_key_pair,
|
||||
self.context, 'DSA', 2048)
|
||||
|
||||
def test_create_key_pair_invalid_length(self):
|
||||
self.assertRaises(ValueError,
|
||||
self.key_mgr.create_key_pair,
|
||||
self.context, 'RSA', 10)
|
||||
|
||||
def test_store_and_get_key(self):
|
||||
secret_key = bytes(b'0' * 64)
|
||||
_key = sym_key.SymmetricKey('AES', 64 * 8, secret_key)
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
pbr>=0.6,!=0.7,<1.0
|
||||
Babel>=1.3
|
||||
cryptography>=0.9.1 # Apache-2.0
|
||||
oslo.config>=1.9.3,<1.10.0 # Apache-2.0
|
||||
oslo.context>=0.2.0 # Apache-2.0
|
||||
oslo.log>=1.0.0,<1.1.0 # Apache-2.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user