Merge "NSXV3: Client certificate private key encryption"
This commit is contained in:
commit
d17efb9c99
@ -186,6 +186,7 @@ function neutron_plugin_configure_service {
|
||||
_nsxv3_ini_set nsx_use_client_auth "True"
|
||||
_nsxv3_ini_set nsx_client_cert_file "$CLIENT_CERT_FILE"
|
||||
_nsxv3_ini_set nsx_client_cert_storage "nsx-db"
|
||||
_nsxv3_ini_set nsx_client_cert_pk_password "openstack"
|
||||
fi
|
||||
}
|
||||
|
||||
|
@ -130,7 +130,7 @@ class NSXClient(object):
|
||||
headers['Accept'] = accept_type
|
||||
# allow admin user to delete entities created
|
||||
# under openstack principal identity
|
||||
headers['X-Allow-Overwrite'] = "True"
|
||||
headers['X-Allow-Overwrite'] = 'true'
|
||||
self.headers = headers
|
||||
|
||||
def get(self, endpoint=None, params=None):
|
||||
|
@ -279,6 +279,9 @@ nsx_v3_opts = [
|
||||
cfg.StrOpt('nsx_client_cert_file',
|
||||
default='',
|
||||
help=_("File to contain client certificate and private key")),
|
||||
cfg.StrOpt('nsx_client_cert_pk_password',
|
||||
default="",
|
||||
help=_("password for private key encryption")),
|
||||
cfg.StrOpt('nsx_client_cert_storage',
|
||||
default='nsx-db',
|
||||
choices=['nsx-db', 'none'],
|
||||
|
@ -13,24 +13,77 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import base64
|
||||
from cryptography import fernet
|
||||
import hashlib
|
||||
import logging
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from vmware_nsx._i18n import _LE
|
||||
from vmware_nsx.db import db as nsx_db
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
NSX_OPENSTACK_IDENTITY = "com.vmware.nsx.openstack"
|
||||
|
||||
# 32-byte base64-encoded secret for symmetric password encryption
|
||||
# generated on init based on password provided in configuration
|
||||
_SECRET = None
|
||||
|
||||
|
||||
def generate_secret_from_password(password):
|
||||
m = hashlib.md5()
|
||||
m.update(password.encode('ascii'))
|
||||
return base64.b64encode(m.hexdigest().encode('ascii'))
|
||||
|
||||
|
||||
def symmetric_encrypt(secret, plaintext):
|
||||
if not isinstance(plaintext, bytes):
|
||||
plaintext = plaintext.encode('ascii')
|
||||
return fernet.Fernet(secret).encrypt(plaintext).decode('ascii')
|
||||
|
||||
|
||||
def symmetric_decrypt(secret, ciphertext):
|
||||
if not isinstance(ciphertext, bytes):
|
||||
ciphertext = ciphertext.encode('ascii')
|
||||
return fernet.Fernet(secret).decrypt(ciphertext).decode('ascii')
|
||||
|
||||
|
||||
class DbCertificateStorageDriver(object):
|
||||
"""Storage for certificate and private key in neutron DB"""
|
||||
# TODO(annak): Add private key encryption
|
||||
def __init__(self, context):
|
||||
global _SECRET
|
||||
self._context = context
|
||||
if cfg.CONF.nsx_v3.nsx_client_cert_pk_password and not _SECRET:
|
||||
_SECRET = generate_secret_from_password(
|
||||
cfg.CONF.nsx_v3.nsx_client_cert_pk_password)
|
||||
|
||||
def store_cert(self, purpose, certificate, private_key):
|
||||
# ecrypt private key
|
||||
if _SECRET:
|
||||
private_key = symmetric_encrypt(_SECRET, private_key)
|
||||
|
||||
nsx_db.save_certificate(self._context.session, purpose,
|
||||
certificate, private_key)
|
||||
|
||||
def get_cert(self, purpose):
|
||||
return nsx_db.get_certificate(self._context.session, purpose)
|
||||
cert, private_key = nsx_db.get_certificate(self._context.session,
|
||||
purpose)
|
||||
if _SECRET and private_key:
|
||||
try:
|
||||
# Encrypted PK is stored in DB as string, while fernet expects
|
||||
# bytearray.
|
||||
private_key = symmetric_decrypt(_SECRET, private_key)
|
||||
except fernet.InvalidToken:
|
||||
# unable to decrypt - probably due to change of password
|
||||
# cert and PK are useless, need to delete them
|
||||
LOG.error(_LE("Unable to decrypt private key, possibly due "
|
||||
"to change of password. Certificate needs to be "
|
||||
"regenerated"))
|
||||
self.delete_cert(purpose)
|
||||
return None, None
|
||||
|
||||
return cert, private_key
|
||||
|
||||
def delete_cert(self, purpose):
|
||||
return nsx_db.delete_certificate(self._context.session, purpose)
|
||||
@ -47,7 +100,7 @@ class DummyCertificateStorageDriver(object):
|
||||
pass
|
||||
|
||||
def get_cert(self, purpose):
|
||||
pass
|
||||
return None, None
|
||||
|
||||
def delete_cert(self, purpose):
|
||||
pass
|
||||
|
@ -48,6 +48,7 @@ from oslo_utils import uuidutils
|
||||
|
||||
from vmware_nsx.common import exceptions as nsx_exc
|
||||
from vmware_nsx.common import utils
|
||||
from vmware_nsx.plugins.nsx_v3 import cert_utils
|
||||
from vmware_nsx.plugins.nsx_v3 import plugin as nsx_plugin
|
||||
from vmware_nsx.tests import unit as vmware
|
||||
from vmware_nsx.tests.unit.extensions import test_metadata
|
||||
@ -772,7 +773,7 @@ class NsxV3PluginClientCertTestCase(testlib_api.WebTestCase):
|
||||
|
||||
CERTFILE = '/tmp/client_cert.pem'
|
||||
|
||||
def _init_config(self):
|
||||
def _init_config(self, password=None):
|
||||
cfg.CONF.set_override('default_overlay_tz', NSX_TZ_NAME, 'nsx_v3')
|
||||
cfg.CONF.set_override('native_dhcp_metadata', False, 'nsx_v3')
|
||||
cfg.CONF.set_override('dhcp_profile',
|
||||
@ -784,22 +785,30 @@ class NsxV3PluginClientCertTestCase(testlib_api.WebTestCase):
|
||||
cfg.CONF.set_override('nsx_client_cert_file', self.CERTFILE, 'nsx_v3')
|
||||
cfg.CONF.set_override('nsx_client_cert_storage', 'nsx-db', 'nsx_v3')
|
||||
|
||||
def _init_plugin(self):
|
||||
if password:
|
||||
cfg.CONF.set_override('nsx_client_cert_pk_password',
|
||||
password, 'nsx_v3')
|
||||
|
||||
def _init_plugin(self, password=None):
|
||||
_mock_nsx_backend_calls()
|
||||
|
||||
self._tenant_id = test_plugin.TEST_TENANT_ID
|
||||
self._init_config()
|
||||
self._init_config(password)
|
||||
self.setup_coreplugin(PLUGIN_NAME, load_plugins=True)
|
||||
|
||||
def test_init_without_cert(self):
|
||||
"""Verify init fails if no cert is provided in client cert mode"""
|
||||
# certificate not generated - exception should be raised
|
||||
self.assertRaises(nsx_exc.ClientCertificateException,
|
||||
self._init_plugin)
|
||||
|
||||
def test_init_with_cert(self):
|
||||
"""Verify successful certificate load from storage"""
|
||||
|
||||
mock.patch(
|
||||
"vmware_nsx.db.db.get_certificate",
|
||||
return_value=(self.CERT, self.PKEY)).start()
|
||||
|
||||
_mock_nsx_backend_calls()
|
||||
self._init_plugin()
|
||||
|
||||
# verify cert data was exported to CERTFILE
|
||||
@ -812,5 +821,42 @@ class NsxV3PluginClientCertTestCase(testlib_api.WebTestCase):
|
||||
# delete CERTFILE
|
||||
os.remove(self.CERTFILE)
|
||||
|
||||
def test_init_with_cert_encrypted(self):
|
||||
"""Verify successful encrypted PK load from storage"""
|
||||
|
||||
password = 'topsecret'
|
||||
secret = cert_utils.generate_secret_from_password(password)
|
||||
encrypted_pkey = cert_utils.symmetric_encrypt(secret, self.PKEY)
|
||||
# db always returns string
|
||||
mock.patch(
|
||||
"vmware_nsx.db.db.get_certificate",
|
||||
return_value=(self.CERT, encrypted_pkey)).start()
|
||||
|
||||
self._init_plugin(password)
|
||||
|
||||
# verify cert data was exported to CERTFILE
|
||||
expected = self.CERT + self.PKEY
|
||||
with open(self.CERTFILE, 'r') as f:
|
||||
actual = f.read()
|
||||
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
# delete CERTFILE
|
||||
os.remove(self.CERTFILE)
|
||||
|
||||
def test_init_with_cert_decrypt_fails(self):
|
||||
"""Verify loading plaintext PK from storage fails in encrypt mode"""
|
||||
|
||||
mock.patch(
|
||||
"vmware_nsx.db.db.get_certificate",
|
||||
return_value=(self.CERT, self.PKEY)).start()
|
||||
|
||||
self._tenant_id = test_plugin.TEST_TENANT_ID
|
||||
self._init_config('topsecret')
|
||||
|
||||
# since PK in DB is not encrypted, we should fail to decrypt it on load
|
||||
self.assertRaises(nsx_exc.ClientCertificateException,
|
||||
self._init_plugin)
|
||||
|
||||
# TODO(annak): add test that verifies bad crypto data raises exception
|
||||
# when OPENSSL exception wrapper is available from NSXLIB
|
||||
|
Loading…
Reference in New Issue
Block a user