NSX|P: Fix certificate secret to use the correct password

DbCertificateStorageDriver should use the pk_password from the
nsx_p config section and not from the nsx_v3 one

Change-Id: Ibe843e9e994bb679bdae68b0683aa36e2c78d891
This commit is contained in:
asarfaty 2020-07-01 11:01:50 +02:00 committed by Adit Sarfaty
parent b662977ca2
commit d553f307ed
3 changed files with 73 additions and 67 deletions

View File

@ -41,73 +41,79 @@ PORT_SG_SCOPE = 'os-security-group'
NSX_NEUTRON_PLUGIN = 'NSX Neutron plugin'
class DbCertProvider(client_cert.ClientCertProvider):
"""Write cert data from DB to file and delete after use
def get_DbCertProvider(conf_path):
class DbCertProvider(client_cert.ClientCertProvider):
"""Write cert data from DB to file and delete after use
New provider object with random filename is created for each request.
This is not most efficient, but the safest way to avoid race conditions,
since backend connections can occur both before and after neutron
fork, and several concurrent requests can occupy the same thread.
Note that new cert filename for each request does not result in new
connection for each request (at least for now..)
"""
EXPIRATION_ALERT_DAYS = 30 # days prior to expiration
New provider object with random filename is created for each
request.
This is not most efficient, but the safest way to avoid race
conditions, since backend connections can occur both before and
after neutron fork, and several concurrent requests can occupy the
same thread.
Note that new cert filename for each request does not result in new
connection for each request (at least for now..)
"""
EXPIRATION_ALERT_DAYS = 30 # days prior to expiration
def __init__(self):
super(DbCertProvider, self).__init__(None)
random.seed()
self._filename = '/tmp/.' + str(random.randint(1, 10000000))
def __init__(self):
super(DbCertProvider, self).__init__(None)
random.seed()
self._filename = '/tmp/.' + str(random.randint(1, 10000000))
self.conf_path = conf_path
def _check_expiration(self, expires_in_days):
if expires_in_days > self.EXPIRATION_ALERT_DAYS:
return
def _check_expiration(self, expires_in_days):
if expires_in_days > self.EXPIRATION_ALERT_DAYS:
return
if expires_in_days < 0:
LOG.error("Client certificate has expired %d days ago.",
expires_in_days * -1)
else:
LOG.warning("Client certificate expires in %d days. "
"Once expired, service will become unavailable.",
expires_in_days)
if expires_in_days < 0:
LOG.error("Client certificate has expired %d days ago.",
expires_in_days * -1)
else:
LOG.warning("Client certificate expires in %d days. "
"Once expired, service will become unavailable.",
expires_in_days)
def __enter__(self):
try:
context = q_context.get_admin_context()
db_storage_driver = cert_utils.DbCertificateStorageDriver(
context)
with client_cert.ClientCertificateManager(
cert_utils.NSX_OPENSTACK_IDENTITY,
None,
db_storage_driver) as cert_manager:
if not cert_manager.exists():
msg = _("Unable to load from nsx-db")
raise nsx_exc.ClientCertificateException(err_msg=msg)
def __enter__(self):
try:
context = q_context.get_admin_context()
db_storage_driver = cert_utils.DbCertificateStorageDriver(
context, self.conf_path.nsx_client_cert_pk_password)
with client_cert.ClientCertificateManager(
cert_utils.NSX_OPENSTACK_IDENTITY,
None,
db_storage_driver) as cert_manager:
if not cert_manager.exists():
msg = _("Unable to load from nsx-db")
raise nsx_exc.ClientCertificateException(err_msg=msg)
filename = self._filename
if not os.path.exists(os.path.dirname(filename)):
if len(os.path.dirname(filename)) > 0:
fileutils.ensure_tree(os.path.dirname(filename))
cert_manager.export_pem(filename)
filename = self._filename
if not os.path.exists(os.path.dirname(filename)):
if len(os.path.dirname(filename)) > 0:
fileutils.ensure_tree(os.path.dirname(filename))
cert_manager.export_pem(filename)
expires_in_days = cert_manager.expires_in_days()
self._check_expiration(expires_in_days)
except Exception as e:
expires_in_days = cert_manager.expires_in_days()
self._check_expiration(expires_in_days)
except Exception as e:
self._on_exit()
raise e
return self
def _on_exit(self):
if os.path.isfile(self._filename):
os.remove(self._filename)
self._filename = None
def __exit__(self, type, value, traceback):
self._on_exit()
raise e
return self
def filename(self):
return self._filename
def _on_exit(self):
if os.path.isfile(self._filename):
os.remove(self._filename)
self._filename = None
def __exit__(self, type, value, traceback):
self._on_exit()
def filename(self):
return self._filename
return DbCertProvider
def get_client_cert_provider(conf_path=cfg.CONF.nsx_v3):
@ -123,18 +129,19 @@ def get_client_cert_provider(conf_path=cfg.CONF.nsx_v3):
if conf_path.nsx_client_cert_storage.lower() == 'nsx-db':
# Cert data is stored in DB, and written to file system only
# when new connection is opened, and deleted immediately after.
return DbCertProvider
return get_DbCertProvider(conf_path)
def get_nsxlib_wrapper(nsx_username=None, nsx_password=None, basic_auth=False,
plugin_conf=None, allow_overwrite_header=False):
if not plugin_conf:
plugin_conf = cfg.CONF.nsx_v3
client_cert_provider = None
if not basic_auth:
# if basic auth requested, dont use cert file even if provided
client_cert_provider = get_client_cert_provider()
client_cert_provider = get_client_cert_provider(conf_path=plugin_conf)
if not plugin_conf:
plugin_conf = cfg.CONF.nsx_v3
nsxlib_config = config.NsxLibConfig(
username=nsx_username or plugin_conf.nsx_api_user,
password=nsx_password or plugin_conf.nsx_api_password,

View File

@ -17,7 +17,6 @@ import base64
import hashlib
from cryptography import fernet
from oslo_config import cfg
from oslo_log import log as logging
from vmware_nsx.db import db as nsx_db
@ -55,12 +54,11 @@ def symmetric_decrypt(secret, ciphertext):
class DbCertificateStorageDriver(object):
"""Storage for certificate and private key in neutron DB"""
def __init__(self, context):
def __init__(self, context, cert_pk_password=None):
global _SECRET
self._context = context
if cfg.CONF.nsx_v3.nsx_client_cert_pk_password and not _SECRET:
_SECRET = generate_secret_from_password(
cfg.CONF.nsx_v3.nsx_client_cert_pk_password)
if cert_pk_password and not _SECRET:
_SECRET = generate_secret_from_password(cert_pk_password)
def store_cert(self, purpose, certificate, private_key):
# encrypt private key

View File

@ -56,7 +56,8 @@ def get_certificate_manager(plugin_conf, **kwargs):
LOG.info("Certificate storage is %s", storage_driver_type)
if storage_driver_type == 'nsx-db':
storage_driver = cert_utils.DbCertificateStorageDriver(
context.get_admin_context())
context.get_admin_context(),
plugin_conf.nsx_client_cert_pk_password)
elif storage_driver_type == 'none':
storage_driver = cert_utils.DummyCertificateStorageDriver()
# TODO(annak) - add support for barbican storage driver