Add to_dict and from_dict conversions to managed objects
This patch centralizes the managed objects conversion in order to be used across multiple key_manager backends. Change-Id: Ia2e15d46eb2e504b815a7f51173aecaf82978402 Signed-off-by: Moisés Guimarães de Medeiros <moguimar@redhat.com>
This commit is contained in:
parent
3a3a738a0b
commit
883e9603fa
@ -64,6 +64,14 @@ class ManagedObjectNotFoundError(CastellanException):
|
||||
message = _("Key not found, uuid: %(uuid)s")
|
||||
|
||||
|
||||
class InvalidManagedObjectDictError(CastellanException):
|
||||
message = _("Dict has no field '%(field)s'.")
|
||||
|
||||
|
||||
class UnknownManagedObjectTypeError(CastellanException):
|
||||
message = _("Type not found, type: %(type)s")
|
||||
|
||||
|
||||
class AuthTypeInvalidError(CastellanException):
|
||||
message = _("Invalid auth_type was specified, auth_type: %(type)s")
|
||||
|
||||
|
@ -0,0 +1,49 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from castellan.common import exception
|
||||
from castellan.common.objects import opaque_data
|
||||
from castellan.common.objects import passphrase
|
||||
from castellan.common.objects import private_key
|
||||
from castellan.common.objects import public_key
|
||||
from castellan.common.objects import symmetric_key
|
||||
from castellan.common.objects import x_509
|
||||
|
||||
_managed_objects_by_type = {
|
||||
cls.managed_type(): cls for cls in [
|
||||
opaque_data.OpaqueData,
|
||||
passphrase.Passphrase,
|
||||
private_key.PrivateKey,
|
||||
public_key.PublicKey,
|
||||
symmetric_key.SymmetricKey,
|
||||
x_509.X509,
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def from_dict(obj, id=None):
|
||||
try:
|
||||
managed_object_type = obj["type"]
|
||||
except KeyError:
|
||||
raise exception.InvalidManagedObjectDictError(field="type")
|
||||
|
||||
try:
|
||||
cls = _managed_objects_by_type[managed_object_type]
|
||||
except KeyError:
|
||||
raise exception.UnknownManagedObjectTypeError(type=managed_object_type)
|
||||
|
||||
try:
|
||||
managed_object = cls.from_dict(obj, id)
|
||||
except KeyError as e:
|
||||
raise exception.InvalidManagedObjectDictError(field=str(e))
|
||||
|
||||
return managed_object
|
@ -22,14 +22,17 @@ from Java.
|
||||
"""
|
||||
|
||||
import abc
|
||||
import binascii
|
||||
|
||||
from castellan.common.objects import exception
|
||||
from castellan.common.objects import managed_object
|
||||
|
||||
|
||||
class Key(managed_object.ManagedObject):
|
||||
"""Base class to represent all keys."""
|
||||
|
||||
@abc.abstractproperty
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def algorithm(self):
|
||||
"""Returns the key's algorithm.
|
||||
|
||||
@ -38,7 +41,8 @@ class Key(managed_object.ManagedObject):
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractproperty
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def bit_length(self):
|
||||
"""Returns the key's bit length.
|
||||
|
||||
@ -47,3 +51,33 @@ class Key(managed_object.ManagedObject):
|
||||
the length of the modulus.
|
||||
"""
|
||||
pass
|
||||
|
||||
def to_dict(self):
|
||||
dict_fields = super().to_dict()
|
||||
|
||||
dict_fields["algorithm"] = self.algorithm
|
||||
dict_fields["bit_length"] = self.bit_length
|
||||
|
||||
return dict_fields
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, dict_fields, id=None, metadata_only=False):
|
||||
try:
|
||||
value = None
|
||||
|
||||
# NOTE(moguimar): the managed object's value is exported as
|
||||
# a hex string. For now, this is a compatibility thing with
|
||||
# the already existent vault_key_manager backend.
|
||||
if not metadata_only and dict_fields["value"] is not None:
|
||||
value = binascii.unhexlify(dict_fields["value"])
|
||||
|
||||
return cls(
|
||||
algorithm=dict_fields["algorithm"],
|
||||
bit_length=dict_fields["bit_length"],
|
||||
key=value,
|
||||
name=dict_fields["name"],
|
||||
created=dict_fields["created"],
|
||||
id=id,
|
||||
)
|
||||
except KeyError as e:
|
||||
raise exception.InvalidManagedObjectDictError(field=str(e))
|
||||
|
@ -19,7 +19,11 @@ Base ManagedObject Class
|
||||
This module defines the ManagedObject class. The ManagedObject class
|
||||
is the base class to represent all objects managed by the key manager.
|
||||
"""
|
||||
|
||||
import abc
|
||||
import binascii
|
||||
|
||||
from castellan.common import exception
|
||||
|
||||
|
||||
class ManagedObject(object, metaclass=abc.ABCMeta):
|
||||
@ -69,7 +73,8 @@ class ManagedObject(object, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
return self._created
|
||||
|
||||
@abc.abstractproperty
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def format(self):
|
||||
"""Returns the encoding format.
|
||||
|
||||
@ -78,6 +83,11 @@ class ManagedObject(object, metaclass=abc.ABCMeta):
|
||||
"""
|
||||
pass
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
"""Returns the managed object value."""
|
||||
return self.get_encoded()
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_encoded(self):
|
||||
"""Returns the encoded object.
|
||||
@ -90,3 +100,63 @@ class ManagedObject(object, metaclass=abc.ABCMeta):
|
||||
def is_metadata_only(self):
|
||||
"""Returns if the associated object is only metadata or not."""
|
||||
return self.get_encoded() is None
|
||||
|
||||
@classmethod
|
||||
@abc.abstractmethod
|
||||
def managed_type(cls):
|
||||
"""Returns the managed object type identifier.
|
||||
|
||||
Returns the object's type identifier for serialization purpose.
|
||||
"""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, dict_fields, id=None, metadata_only=False):
|
||||
"""Returns an instance of this class based on a dict object.
|
||||
|
||||
:param dict_fields: The dictionary containing all necessary params
|
||||
to create one instance.
|
||||
:param id: The optional param 'id' to be passed to the constructor.
|
||||
:param metadata_only: A switch to create an instance with metadata
|
||||
only, without the secret itself.
|
||||
"""
|
||||
try:
|
||||
value = None
|
||||
|
||||
# NOTE(moguimar): the managed object's value is exported as
|
||||
# a hex string. For now, this is a compatibility thing with
|
||||
# the already existent vault_key_manager backend.
|
||||
if not metadata_only and dict_fields["value"] is not None:
|
||||
value = binascii.unhexlify(dict_fields["value"])
|
||||
|
||||
return cls(
|
||||
value,
|
||||
name=dict_fields["name"],
|
||||
created=dict_fields["created"],
|
||||
id=id,
|
||||
)
|
||||
except KeyError as e:
|
||||
raise exception.InvalidManagedObjectDictError(field=str(e))
|
||||
|
||||
def to_dict(self, metadata_only=False):
|
||||
"""Returns a dict that can be used with the from_dict() method.
|
||||
|
||||
:param metadata_only: A switch to create an dictionary with metadata
|
||||
only, without the secret itself.
|
||||
|
||||
:rtype: dict
|
||||
"""
|
||||
value = None
|
||||
|
||||
# NOTE(moguimar): the managed object's value is exported as
|
||||
# a hex string. For now, this is a compatibility thing with
|
||||
# the already existent vault_key_manager backend.
|
||||
if not metadata_only and self.value is not None:
|
||||
value = binascii.hexlify(self.value).decode("utf-8")
|
||||
|
||||
return {
|
||||
"type": self.managed_type(),
|
||||
"name": self.name,
|
||||
"created": self.created,
|
||||
"value": value,
|
||||
}
|
||||
|
@ -31,15 +31,17 @@ class OpaqueData(managed_object.ManagedObject):
|
||||
Expected type for data is a bytestring.
|
||||
"""
|
||||
self._data = data
|
||||
super(OpaqueData, self).__init__(name=name, created=created, id=id)
|
||||
super().__init__(name=name, created=created, id=id)
|
||||
|
||||
@classmethod
|
||||
def managed_type(cls):
|
||||
return "opaque"
|
||||
|
||||
@property
|
||||
def format(self):
|
||||
"""This method returns 'Opaque'."""
|
||||
return "Opaque"
|
||||
|
||||
def get_encoded(self):
|
||||
"""Returns the data in its original format."""
|
||||
return self._data
|
||||
|
||||
def __eq__(self, other):
|
||||
|
@ -31,15 +31,17 @@ class Passphrase(managed_object.ManagedObject):
|
||||
The expected type for the passphrase is a bytestring.
|
||||
"""
|
||||
self._passphrase = passphrase
|
||||
super(Passphrase, self).__init__(name=name, created=created, id=id)
|
||||
super().__init__(name=name, created=created, id=id)
|
||||
|
||||
@classmethod
|
||||
def managed_type(cls):
|
||||
return "passphrase"
|
||||
|
||||
@property
|
||||
def format(self):
|
||||
"""This method returns 'RAW'."""
|
||||
return "RAW"
|
||||
|
||||
def get_encoded(self):
|
||||
"""Returns the data in a bytestring."""
|
||||
return self._passphrase
|
||||
|
||||
def __eq__(self, other):
|
||||
|
@ -35,25 +35,25 @@ class PrivateKey(key.Key):
|
||||
self._alg = algorithm
|
||||
self._bit_length = bit_length
|
||||
self._key = key
|
||||
super(PrivateKey, self).__init__(name=name, created=created, id=id)
|
||||
super().__init__(name=name, created=created, id=id)
|
||||
|
||||
@classmethod
|
||||
def managed_type(cls):
|
||||
return "private"
|
||||
|
||||
@property
|
||||
def algorithm(self):
|
||||
"""Returns the algorithm for asymmetric encryption."""
|
||||
return self._alg
|
||||
|
||||
@property
|
||||
def format(self):
|
||||
"""This method returns 'PKCS8'."""
|
||||
return "PKCS8"
|
||||
|
||||
@property
|
||||
def bit_length(self):
|
||||
"""Returns the key length."""
|
||||
return self._bit_length
|
||||
|
||||
def get_encoded(self):
|
||||
"""Returns the key in DER encoded format."""
|
||||
return self._key
|
||||
|
||||
def __eq__(self, other):
|
||||
|
@ -36,25 +36,25 @@ class PublicKey(key.Key):
|
||||
self._alg = algorithm
|
||||
self._bit_length = bit_length
|
||||
self._key = key
|
||||
super(PublicKey, self).__init__(name=name, created=created, id=id)
|
||||
super().__init__(name=name, created=created, id=id)
|
||||
|
||||
@classmethod
|
||||
def managed_type(cls):
|
||||
return "public"
|
||||
|
||||
@property
|
||||
def algorithm(self):
|
||||
"""Returns the algorithm for asymmetric encryption."""
|
||||
return self._alg
|
||||
|
||||
@property
|
||||
def format(self):
|
||||
"""This method returns 'SubjectPublicKeyInfo'."""
|
||||
return "SubjectPublicKeyInfo"
|
||||
|
||||
def get_encoded(self):
|
||||
"""Returns the key in its encoded format."""
|
||||
return self._key
|
||||
|
||||
@property
|
||||
def bit_length(self):
|
||||
"""Returns the key length."""
|
||||
return self._bit_length
|
||||
|
||||
def __eq__(self, other):
|
||||
|
@ -35,25 +35,25 @@ class SymmetricKey(key.Key):
|
||||
self._alg = algorithm
|
||||
self._bit_length = bit_length
|
||||
self._key = key
|
||||
super(SymmetricKey, self).__init__(name=name, created=created, id=id)
|
||||
super().__init__(name=name, created=created, id=id)
|
||||
|
||||
@classmethod
|
||||
def managed_type(cls):
|
||||
return "symmetric"
|
||||
|
||||
@property
|
||||
def algorithm(self):
|
||||
"""Returns the algorithm for symmetric encryption."""
|
||||
return self._alg
|
||||
|
||||
@property
|
||||
def format(self):
|
||||
"""This method returns 'RAW'."""
|
||||
return "RAW"
|
||||
|
||||
def get_encoded(self):
|
||||
"""Returns the key in its encoded format."""
|
||||
return self._key
|
||||
|
||||
@property
|
||||
def bit_length(self):
|
||||
"""Returns the key length."""
|
||||
return self._bit_length
|
||||
|
||||
def __eq__(self, other):
|
||||
|
@ -31,15 +31,17 @@ class X509(certificate.Certificate):
|
||||
The data should be in a bytestring.
|
||||
"""
|
||||
self._data = data
|
||||
super(X509, self).__init__(name=name, created=created, id=id)
|
||||
super().__init__(name=name, created=created, id=id)
|
||||
|
||||
@classmethod
|
||||
def managed_type(cls):
|
||||
return "certificate"
|
||||
|
||||
@property
|
||||
def format(self):
|
||||
"""This method returns 'X.509'."""
|
||||
return "X.509"
|
||||
|
||||
def get_encoded(self):
|
||||
"""Returns the data in its encoded format."""
|
||||
return self._data
|
||||
|
||||
def __eq__(self, other):
|
||||
|
@ -0,0 +1,37 @@
|
||||
# Copyright 2020 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Test cases for Managed Objects.
|
||||
"""
|
||||
from castellan.common import exception
|
||||
from castellan.common import objects
|
||||
from castellan.tests import base
|
||||
|
||||
|
||||
class ManagedObjectFromDictTestCase(base.TestCase):
|
||||
def test_invalid_dict(self):
|
||||
self.assertRaises(
|
||||
exception.InvalidManagedObjectDictError,
|
||||
objects.from_dict,
|
||||
{},
|
||||
)
|
||||
|
||||
def test_unknown_type(self):
|
||||
self.assertRaises(
|
||||
exception.UnknownManagedObjectTypeError,
|
||||
objects.from_dict,
|
||||
{"type": "non-existing-managed-object-type"},
|
||||
)
|
@ -16,6 +16,7 @@
|
||||
"""
|
||||
Test cases for the opaque data class.
|
||||
"""
|
||||
from castellan.common import objects
|
||||
from castellan.common.objects import opaque_data
|
||||
from castellan.tests import base
|
||||
|
||||
@ -80,3 +81,7 @@ class OpaqueDataTestCase(base.TestCase):
|
||||
def test___ne___data(self):
|
||||
other_opaque = opaque_data.OpaqueData(b'other data', self.name)
|
||||
self.assertTrue(self.opaque_data != other_opaque)
|
||||
|
||||
def test_to_and_from_dict(self):
|
||||
other = objects.from_dict(self.opaque_data.to_dict())
|
||||
self.assertEqual(self.opaque_data, other)
|
||||
|
@ -16,6 +16,7 @@
|
||||
"""
|
||||
Test cases for the passphrase class.
|
||||
"""
|
||||
from castellan.common import objects
|
||||
from castellan.common.objects import passphrase
|
||||
from castellan.tests import base
|
||||
|
||||
@ -80,3 +81,7 @@ class PassphraseTestCase(base.TestCase):
|
||||
def test___ne___data(self):
|
||||
other_phrase = passphrase.Passphrase(b"other passphrase", self.name)
|
||||
self.assertTrue(self.passphrase != other_phrase)
|
||||
|
||||
def test_to_and_from_dict(self):
|
||||
other = objects.from_dict(self.passphrase.to_dict())
|
||||
self.assertEqual(self.passphrase, other)
|
||||
|
@ -16,6 +16,7 @@
|
||||
"""
|
||||
Test cases for the private key class.
|
||||
"""
|
||||
from castellan.common import objects
|
||||
from castellan.common.objects import private_key
|
||||
from castellan.tests import base
|
||||
from castellan.tests import utils
|
||||
@ -116,3 +117,7 @@ class PrivateKeyTestCase(base.KeyTestCase):
|
||||
different_encoded,
|
||||
self.name)
|
||||
self.assertTrue(self.key != other_key)
|
||||
|
||||
def test_to_and_from_dict(self):
|
||||
other = objects.from_dict(self.key.to_dict())
|
||||
self.assertEqual(self.key, other)
|
||||
|
@ -16,6 +16,7 @@
|
||||
"""
|
||||
Test cases for the public key class.
|
||||
"""
|
||||
from castellan.common import objects
|
||||
from castellan.common.objects import public_key
|
||||
from castellan.tests import base
|
||||
from castellan.tests import utils
|
||||
@ -116,3 +117,7 @@ class PublicKeyTestCase(base.KeyTestCase):
|
||||
different_encoded,
|
||||
self.name)
|
||||
self.assertTrue(self.key != other_key)
|
||||
|
||||
def test_to_and_from_dict(self):
|
||||
other = objects.from_dict(self.key.to_dict())
|
||||
self.assertEqual(self.key, other)
|
||||
|
@ -16,6 +16,7 @@
|
||||
"""
|
||||
Test cases for the symmetric key class.
|
||||
"""
|
||||
from castellan.common import objects
|
||||
from castellan.common.objects import symmetric_key as sym_key
|
||||
from castellan.tests import base
|
||||
|
||||
@ -115,3 +116,7 @@ class SymmetricKeyTestCase(base.KeyTestCase):
|
||||
different_encoded,
|
||||
self.name)
|
||||
self.assertTrue(self.key != other_key)
|
||||
|
||||
def test_to_and_from_dict(self):
|
||||
other = objects.from_dict(self.key.to_dict())
|
||||
self.assertEqual(self.key, other)
|
||||
|
@ -16,6 +16,7 @@
|
||||
"""
|
||||
Test cases for the X.509 certificate class.
|
||||
"""
|
||||
from castellan.common import objects
|
||||
from castellan.common.objects import x_509
|
||||
from castellan.tests import base
|
||||
from castellan.tests import utils
|
||||
@ -80,3 +81,7 @@ class X509TestCase(base.CertificateTestCase):
|
||||
def test___ne___data(self):
|
||||
other_x509 = x_509.X509(b'\x00\x00\x00', self.name)
|
||||
self.assertTrue(self.cert != other_x509)
|
||||
|
||||
def test_to_and_from_dict(self):
|
||||
other = objects.from_dict(self.cert.to_dict())
|
||||
self.assertEqual(self.cert, other)
|
||||
|
@ -0,0 +1,10 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Historically, the vault key manager backend converts its managed objects
|
||||
to dictionaries in order to send them as a json object. To promote
|
||||
cross-backend compatibility, suck feature should be migrated to managed
|
||||
objects. Methods from_dict() and to_dict() added to class ManagedObject.
|
||||
The Method from_dict() is a class method to create instances based on a
|
||||
dictionary while the method to_dict() is an instance method to translate
|
||||
an instance to a dictionary.
|
Loading…
Reference in New Issue
Block a user