BigSwitch: Add SSL Certificate Validation

This patch adds the option to use SSL certificate
validation on the backend controller using SSH-style
sticky authentication, individual trusted
certificates, and/or certificate authorities.
Also adds caching of connections to deal with
increased overhead of TLS/SSL handshake.

Default is now sticky-style enforcement.

Partial-Bug: 1188189
Implements: blueprint bsn-certificate-enforcement
Change-Id: If0bab196495c4944a53e0e394c956cca36269883
This commit is contained in:
Kevin Benton 2014-02-02 20:46:18 -08:00 committed by Mark McClain
parent 095aa20e3e
commit e915ad3a15
12 changed files with 527 additions and 31 deletions

View File

@ -6,7 +6,10 @@
# The following parameters are supported:
# servers : <host:port>[,<host:port>]* (Error if not set)
# server_auth : <username:password> (default: no auth)
# server_ssl : True | False (default: False)
# server_ssl : True | False (default: True)
# ssl_cert_directory : <path> (default: /etc/neutron/plugins/bigswitch/ssl)
# no_ssl_validation : True | False (default: False)
# ssl_sticky : True | False (default: True)
# sync_data : True | False (default: False)
# auto_sync_on_failure : True | False (default: True)
# server_timeout : <integer> (default: 10 seconds)
@ -21,7 +24,20 @@ servers=localhost:8080
# server_auth=username:password
# Use SSL when connecting to the BigSwitch or Floodlight controller.
# server_ssl=False
# server_ssl=True
# Directory which contains the ca_certs and host_certs to be used to validate
# controller certificates.
# ssl_cert_directory=/etc/neutron/plugins/bigswitch/ssl/
# If a certificate does not exist for a controller, trust and store the first
# certificate received for that controller and use it to validate future
# connections to that controller.
# ssl_sticky=True
# Do not validate the controller certificates for SSL
# Warning: This will not provide protection against man-in-the-middle attacks
# no_ssl_validation=False
# Sync data on connect
# sync_data=False

View File

@ -0,0 +1,3 @@
Certificates in this folder will be used to
verify signatures for any controllers the plugin
connects to.

View File

@ -0,0 +1,6 @@
Certificates in this folder must match the name
of the controller they should be used to authenticate
with a .pem extension.
For example, the certificate for the controller
"192.168.0.1" should be named "192.168.0.1.pem".

View File

@ -39,9 +39,21 @@ restproxy_opts = [
cfg.StrOpt('server_auth', default=None, secret=True,
help=_("The username and password for authenticating against "
" the BigSwitch or Floodlight controller.")),
cfg.BoolOpt('server_ssl', default=False,
cfg.BoolOpt('server_ssl', default=True,
help=_("If True, Use SSL when connecting to the BigSwitch or "
"Floodlight controller.")),
cfg.BoolOpt('ssl_sticky', default=True,
help=_("Trust and store the first certificate received for "
"each controller address and use it to validate future "
"connections to that address.")),
cfg.BoolOpt('no_ssl_validation', default=False,
help=_("Disables SSL certificate validation for controllers")),
cfg.BoolOpt('cache_connections', default=True,
help=_("Re-use HTTP/HTTPS connections to the controller.")),
cfg.StrOpt('ssl_cert_directory',
default='/etc/neutron/plugins/bigswitch/ssl',
help=_("Directory containing ca_certs and host_certs "
"certificate directories.")),
cfg.BoolOpt('sync_data', default=False,
help=_("Sync data on connect")),
cfg.BoolOpt('auto_sync_on_failure', default=True,

View File

@ -27,13 +27,16 @@ of ServerProxy objects that correspond to individual backend controllers.
The following functionality is handled by this module:
- Translation of rest_* function calls to HTTP/HTTPS calls to the controllers
- Automatic failover between controllers
- SSL Certificate enforcement
- HTTP Authentication
"""
import base64
import httplib
import json
import os
import socket
import ssl
import time
import eventlet
@ -41,6 +44,7 @@ from oslo.config import cfg
from neutron.common import exceptions
from neutron.common import utils
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch.db import consistency_db as cdb
@ -85,7 +89,7 @@ class ServerProxy(object):
"""REST server proxy to a network controller."""
def __init__(self, server, port, ssl, auth, neutron_id, timeout,
base_uri, name, mypool):
base_uri, name, mypool, combined_cert):
self.server = server
self.port = port
self.ssl = ssl
@ -99,8 +103,11 @@ class ServerProxy(object):
self.capabilities = []
# enable server to reference parent pool
self.mypool = mypool
# cache connection here to avoid a SSL handshake for every connection
self.currentconn = None
if auth:
self.auth = 'Basic ' + base64.encodestring(auth).strip()
self.combined_cert = combined_cert
def get_capabilities(self):
try:
@ -114,7 +121,8 @@ class ServerProxy(object):
'cap': self.capabilities})
return self.capabilities
def rest_call(self, action, resource, data='', headers={}, timeout=None):
def rest_call(self, action, resource, data='', headers={}, timeout=False,
reconnect=False):
uri = self.base_uri + resource
body = json.dumps(data)
if not headers:
@ -125,6 +133,10 @@ class ServerProxy(object):
headers['Instance-ID'] = self.neutron_id
headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash
if 'keep-alive' in self.capabilities:
headers['Connection'] = 'keep-alive'
else:
reconnect = True
if self.auth:
headers['Authorization'] = self.auth
@ -136,26 +148,37 @@ class ServerProxy(object):
{'resource': resource, 'data': data, 'headers': headers,
'action': action})
conn = None
timeout = timeout or self.timeout
if self.ssl:
conn = httplib.HTTPSConnection(
self.server, self.port, timeout=timeout)
if conn is None:
LOG.error(_('ServerProxy: Could not establish HTTPS '
'connection'))
return 0, None, None, None
else:
conn = httplib.HTTPConnection(
self.server, self.port, timeout=timeout)
if conn is None:
LOG.error(_('ServerProxy: Could not establish HTTP '
'connection'))
return 0, None, None, None
# unspecified timeout is False because a timeout can be specified as
# None to indicate no timeout.
if timeout is False:
timeout = self.timeout
if timeout != self.timeout:
# need a new connection if timeout has changed
reconnect = True
if not self.currentconn or reconnect:
if self.currentconn:
self.currentconn.close()
if self.ssl:
self.currentconn = HTTPSConnectionWithValidation(
self.server, self.port, timeout=timeout)
self.currentconn.combined_cert = self.combined_cert
if self.currentconn is None:
LOG.error(_('ServerProxy: Could not establish HTTPS '
'connection'))
return 0, None, None, None
else:
self.currentconn = httplib.HTTPConnection(
self.server, self.port, timeout=timeout)
if self.currentconn is None:
LOG.error(_('ServerProxy: Could not establish HTTP '
'connection'))
return 0, None, None, None
try:
conn.request(action, uri, body, headers)
response = conn.getresponse()
self.currentconn.request(action, uri, body, headers)
response = self.currentconn.getresponse()
newhash = response.getheader(HASH_MATCH_HEADER)
if newhash:
self._put_consistency_hash(newhash)
@ -168,11 +191,20 @@ class ServerProxy(object):
# response was not JSON, ignore the exception
pass
ret = (response.status, response.reason, respstr, respdata)
except httplib.ImproperConnectionState:
# If we were using a cached connection, try again with a new one.
with excutils.save_and_reraise_exception() as ctxt:
if not reconnect:
ctxt.reraise = False
if self.currentconn:
self.currentconn.close()
return self.rest_call(action, resource, data, headers,
timeout=timeout, reconnect=True)
except (socket.timeout, socket.error) as e:
LOG.error(_('ServerProxy: %(action)s failure, %(e)r'),
{'action': action, 'e': e})
ret = 0, None, None, None
conn.close()
LOG.debug(_("ServerProxy: status=%(status)d, reason=%(reason)r, "
"ret=%(ret)s, data=%(data)r"), {'status': ret[0],
'reason': ret[1],
@ -187,7 +219,7 @@ class ServerProxy(object):
class ServerPool(object):
def __init__(self, timeout=10,
def __init__(self, timeout=False,
base_uri=BASE_URI, name='NeutronRestProxy'):
LOG.debug(_("ServerPool: initializing"))
# 'servers' is the list of network controller REST end-points
@ -200,8 +232,9 @@ class ServerPool(object):
self.base_uri = base_uri
self.name = name
self.timeout = cfg.CONF.RESTPROXY.server_timeout
self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
default_port = 8000
if timeout is not None:
if timeout is not False:
self.timeout = timeout
# Function to use to retrieve topology for consistency syncs.
@ -244,8 +277,99 @@ class ServerPool(object):
return self.capabilities
def server_proxy_for(self, server, port):
combined_cert = self._get_combined_cert_for_server(server, port)
return ServerProxy(server, port, self.ssl, self.auth, self.neutron_id,
self.timeout, self.base_uri, self.name, mypool=self)
self.timeout, self.base_uri, self.name, mypool=self,
combined_cert=combined_cert)
def _get_combined_cert_for_server(self, server, port):
# The ssl library requires a combined file with all trusted certs
# so we make one containing the trusted CAs and the corresponding
# host cert for this server
combined_cert = None
if self.ssl and not cfg.CONF.RESTPROXY.no_ssl_validation:
base_ssl = cfg.CONF.RESTPROXY.ssl_cert_directory
host_dir = os.path.join(base_ssl, 'host_certs')
ca_dir = os.path.join(base_ssl, 'ca_certs')
combined_dir = os.path.join(base_ssl, 'combined')
combined_cert = os.path.join(combined_dir, '%s.pem' % server)
if not os.path.exists(base_ssl):
raise cfg.Error(_('ssl_cert_directory [%s] does not exist. '
'Create it or disable ssl.') % base_ssl)
for automake in [combined_dir, ca_dir, host_dir]:
if not os.path.exists(automake):
os.makedirs(automake)
# get all CA certs
certs = self._get_ca_cert_paths(ca_dir)
# check for a host specific cert
hcert, exists = self._get_host_cert_path(host_dir, server)
if exists:
certs.append(hcert)
elif cfg.CONF.RESTPROXY.ssl_sticky:
self._fetch_and_store_cert(server, port, hcert)
certs.append(hcert)
if not certs:
raise cfg.Error(_('No certificates were found to verify '
'controller %s') % (server))
self._combine_certs_to_file(certs, combined_cert)
return combined_cert
def _combine_certs_to_file(certs, cfile):
'''
Concatenates the contents of each certificate in a list of
certificate paths to one combined location for use with ssl
sockets.
'''
with open(cfile, 'w') as combined:
for c in certs:
with open(c, 'r') as cert_handle:
combined.write(cert_handle.read())
def _get_host_cert_path(self, host_dir, server):
'''
returns full path and boolean indicating existence
'''
hcert = os.path.join(host_dir, '%s.pem' % server)
if os.path.exists(hcert):
return hcert, True
return hcert, False
def _get_ca_cert_paths(self, ca_dir):
certs = [os.path.join(root, name)
for name in [
name for (root, dirs, files) in os.walk(ca_dir)
for name in files
]
if name.endswith('.pem')]
return certs
def _fetch_and_store_cert(self, server, port, path):
'''
Grabs a certificate from a server and writes it to
a given path.
'''
try:
cert = ssl.get_server_certificate((server, port))
except Exception as e:
raise cfg.Error(_('Could not retrieve initial '
'certificate from controller %(server)s. '
'Error details: %(error)s'),
{'server': server, 'error': e.strerror})
LOG.warning(_("Storing to certificate for host %(server)s "
"at %(path)s") % {'server': server,
'path': path})
self._file_put_contents(path, cert)
return cert
def _file_put_contents(path, contents):
# Simple method to write to file.
# Created for easy Mocking
with open(path, 'w') as handle:
handle.write(contents)
def server_failure(self, resp, ignore_codes=[]):
"""Define failure codes as required.
@ -264,12 +388,13 @@ class ServerPool(object):
@utils.synchronized('bsn-rest-call')
def rest_call(self, action, resource, data, headers, ignore_codes,
timeout=None):
timeout=False):
good_first = sorted(self.servers, key=lambda x: x.failed)
first_response = None
for active_server in good_first:
ret = active_server.rest_call(action, resource, data, headers,
timeout)
timeout,
reconnect=self.always_reconnect)
# If inconsistent, do a full synchronization
if ret[0] == httplib.CONFLICT:
if not self.get_topo_function:
@ -309,7 +434,7 @@ class ServerPool(object):
return first_response
def rest_action(self, action, resource, data='', errstr='%s',
ignore_codes=[], headers={}, timeout=None):
ignore_codes=[], headers={}, timeout=False):
"""
Wrapper for rest_call that verifies success and raises a
RemoteRestError on failure with a provided error string
@ -427,3 +552,26 @@ class ServerPool(object):
# that will be handled by the rest_call.
time.sleep(polling_interval)
self.servers.rest_call('GET', HEALTH_PATH)
class HTTPSConnectionWithValidation(httplib.HTTPSConnection):
# If combined_cert is None, the connection will continue without
# any certificate validation.
combined_cert = None
def connect(self):
sock = socket.create_connection((self.host, self.port),
self.timeout, self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
if self.combined_cert:
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=self.combined_cert)
else:
self.sock = ssl.wrap_socket(sock, self.key_file,
self.cert_file,
cert_reqs=ssl.CERT_NONE)

View File

@ -0,0 +1,2 @@
ca_certs directory for SSL unit tests
No files will be generated here, but it should exist for the tests

View File

@ -0,0 +1,2 @@
combined certificates directory for SSL unit tests
No files will be created here, but it should exist for the tests

View File

@ -0,0 +1,2 @@
host_certs directory for SSL unit tests
No files will be created here, but it should exist for the tests

View File

@ -139,3 +139,48 @@ class VerifyMultiTenantFloatingIP(HTTPConnectionMock):
raise Exception(msg)
super(VerifyMultiTenantFloatingIP,
self).request(action, uri, body, headers)
class HTTPSMockBase(HTTPConnectionMock):
expected_cert = ''
combined_cert = None
def __init__(self, host, port=None, key_file=None, cert_file=None,
strict=None, timeout=None, source_address=None):
self.host = host
super(HTTPSMockBase, self).__init__(host, port, timeout)
def request(self, method, url, body=None, headers={}):
self.connect()
super(HTTPSMockBase, self).request(method, url, body, headers)
class HTTPSNoValidation(HTTPSMockBase):
def connect(self):
if self.combined_cert:
raise Exception('combined_cert set on NoValidation')
class HTTPSCAValidation(HTTPSMockBase):
expected_cert = 'DUMMYCERTIFICATEAUTHORITY'
def connect(self):
contents = get_cert_contents(self.combined_cert)
if self.expected_cert not in contents:
raise Exception('No dummy CA cert in cert_file')
class HTTPSHostValidation(HTTPSMockBase):
expected_cert = 'DUMMYCERTFORHOST%s'
def connect(self):
contents = get_cert_contents(self.combined_cert)
expected = self.expected_cert % self.host
if expected not in contents:
raise Exception(_('No host cert for %(server)s in cert %(cert)s'),
{'server': self.host, 'cert': contents})
def get_cert_contents(path):
raise Exception('METHOD MUST BE MOCKED FOR TEST')

View File

@ -45,6 +45,12 @@ class BigSwitchTestBase(object):
'restproxy.ini.test')]
self.addCleanup(cfg.CONF.reset)
config.register_config()
# Only try SSL on SSL tests
cfg.CONF.set_override('server_ssl', False, 'RESTPROXY')
cfg.CONF.set_override('ssl_cert_directory',
os.path.join(etc_path, 'ssl'), 'RESTPROXY')
# The mock interferes with HTTP(S) connection caching
cfg.CONF.set_override('cache_connections', False, 'RESTPROXY')
def setup_patches(self):
self.httpPatch = mock.patch(HTTPCON, create=True,

View File

@ -0,0 +1,251 @@
# Copyright 2014 Big Switch Networks, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Kevin Benton, kevin.benton@bigswitch.com
#
import os
import mock
from oslo.config import cfg
import webob.exc
from neutron.openstack.common import log as logging
from neutron.tests.unit.bigswitch import fake_server
from neutron.tests.unit.bigswitch import test_base
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_db_plugin as test_plugin
LOG = logging.getLogger(__name__)
SERVERMANAGER = 'neutron.plugins.bigswitch.servermanager'
HTTPS = SERVERMANAGER + '.HTTPSConnectionWithValidation'
CERTCOMBINER = SERVERMANAGER + '.ServerPool._combine_certs_to_file'
FILEPUT = SERVERMANAGER + '.ServerPool._file_put_contents'
GETCACERTS = SERVERMANAGER + '.ServerPool._get_ca_cert_paths'
GETHOSTCERT = SERVERMANAGER + '.ServerPool._get_host_cert_path'
FAKECERTGET = 'neutron.tests.unit.bigswitch.fake_server.get_cert_contents'
SSLGETCERT = 'ssl.get_server_certificate'
class test_ssl_certificate_base(test_plugin.NeutronDbPluginV2TestCase,
test_base.BigSwitchTestBase):
plugin_str = ('%s.NeutronRestProxyV2' %
test_base.RESTPROXY_PKG_PATH)
servername = None
cert_base = None
def _setUp(self):
self.servername = test_api_v2._uuid()
self.cert_base = cfg.CONF.RESTPROXY.ssl_cert_directory
self.host_cert_val = 'DUMMYCERTFORHOST%s' % self.servername
self.host_cert_path = os.path.join(
self.cert_base,
'host_certs',
'%s.pem' % self.servername
)
self.comb_cert_path = os.path.join(
self.cert_base,
'combined',
'%s.pem' % self.servername
)
self.ca_certs_path = os.path.join(
self.cert_base,
'ca_certs'
)
cfg.CONF.set_override('servers', ["%s:443" % self.servername],
'RESTPROXY')
self.setup_patches()
# Mock method SSL lib uses to grab cert from server
self.sslgetcert_m = mock.patch(SSLGETCERT, create=True).start()
self.sslgetcert_m.return_value = self.host_cert_val
# Mock methods that write and read certs from the file-system
self.fileput_m = mock.patch(FILEPUT, create=True).start()
self.certcomb_m = mock.patch(CERTCOMBINER, create=True).start()
self.getcacerts_m = mock.patch(GETCACERTS, create=True).start()
# this is used to configure what certificate contents the fake HTTPS
# lib should expect to receive
self.fake_certget_m = mock.patch(FAKECERTGET, create=True).start()
def setUp(self):
super(test_ssl_certificate_base, self).setUp(self.plugin_str)
class TestSslSticky(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', True, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSHostValidation)
self.httpsPatch.start()
self._setUp()
# Set fake HTTPS connection's expectation
self.fake_certget_m.return_value = self.host_cert_val
# No CA certs for this test
self.getcacerts_m.return_value = []
super(TestSslSticky, self).setUp()
def test_sticky_cert(self):
# SSL connection should be successful and cert should be cached
with self.network():
# CA certs should have been checked for
self.getcacerts_m.assert_has_calls([mock.call(self.ca_certs_path)])
# cert should have been fetched via SSL lib
self.sslgetcert_m.assert_has_calls(
[mock.call((self.servername, 443))]
)
# cert should have been recorded
self.fileput_m.assert_has_calls([mock.call(self.host_cert_path,
self.host_cert_val)])
# no ca certs, so host cert only for this combined cert
self.certcomb_m.assert_has_calls([mock.call([self.host_cert_path],
self.comb_cert_path)])
class TestSslHostCert(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', False, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSHostValidation)
self.httpsPatch.start()
self._setUp()
# Set fake HTTPS connection's expectation
self.fake_certget_m.return_value = self.host_cert_val
# No CA certs for this test
self.getcacerts_m.return_value = []
# Pretend host cert exists
self.hcertpath_p = mock.patch(GETHOSTCERT,
return_value=(self.host_cert_path, True),
create=True).start()
super(TestSslHostCert, self).setUp()
def test_host_cert(self):
# SSL connection should be successful because of pre-configured cert
with self.network():
self.hcertpath_p.assert_has_calls([
mock.call(os.path.join(self.cert_base, 'host_certs'),
self.servername)
])
# sticky is disabled, no fetching allowed
self.assertFalse(self.sslgetcert_m.call_count)
# no ca certs, so host cert is only for this combined cert
self.certcomb_m.assert_has_calls([mock.call([self.host_cert_path],
self.comb_cert_path)])
class TestSslCaCert(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', False, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSCAValidation)
self.httpsPatch.start()
self._setUp()
# pretend to have a few ca certs
self.getcacerts_m.return_value = ['ca1.pem', 'ca2.pem']
# Set fake HTTPS connection's expectation
self.fake_certget_m.return_value = 'DUMMYCERTIFICATEAUTHORITY'
super(TestSslCaCert, self).setUp()
def test_ca_cert(self):
# SSL connection should be successful because CA cert was present
# If not, attempting to create a network would raise an exception
with self.network():
# sticky is disabled, no fetching allowed
self.assertFalse(self.sslgetcert_m.call_count)
# 2 CAs and no host cert so combined should only contain both CAs
self.certcomb_m.assert_has_calls([mock.call(['ca1.pem', 'ca2.pem'],
self.comb_cert_path)])
class TestSslWrongHostCert(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', True, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSHostValidation)
self.httpsPatch.start()
self._setUp()
# Set fake HTTPS connection's expectation to something wrong
self.fake_certget_m.return_value = 'OTHERCERT'
# No CA certs for this test
self.getcacerts_m.return_value = []
# Pretend host cert exists
self.hcertpath_p = mock.patch(GETHOSTCERT,
return_value=(self.host_cert_path, True),
create=True).start()
super(TestSslWrongHostCert, self).setUp()
def test_error_no_cert(self):
# since there will already be a host cert, sticky should not take
# effect and there will be an error because the host cert's contents
# will be incorrect
tid = test_api_v2._uuid()
data = {}
data['network'] = {'tenant_id': tid, 'name': 'name',
'admin_state_up': True}
req = self.new_create_request('networks', data, 'json')
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPInternalServerError.code)
self.hcertpath_p.assert_has_calls([
mock.call(os.path.join(self.cert_base, 'host_certs'),
self.servername)
])
# sticky is enabled, but a host cert already exists so it shant fetch
self.assertFalse(self.sslgetcert_m.call_count)
# no ca certs, so host cert only for this combined cert
self.certcomb_m.assert_has_calls([mock.call([self.host_cert_path],
self.comb_cert_path)])
class TestSslNoValidation(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', False, 'RESTPROXY')
cfg.CONF.set_override('no_ssl_validation', True, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSNoValidation)
self.httpsPatch.start()
self._setUp()
super(TestSslNoValidation, self).setUp()
def test_validation_disabled(self):
# SSL connection should be successful without any certificates
# If not, attempting to create a network will raise an exception
with self.network():
# no sticky grabbing and no cert combining with no enforcement
self.assertFalse(self.sslgetcert_m.call_count)
self.assertFalse(self.certcomb_m.call_count)

View File

@ -47,7 +47,10 @@ data_files =
etc/neutron/rootwrap.d/ryu-plugin.filters
etc/neutron/rootwrap.d/vpnaas.filters
etc/init.d = etc/init.d/neutron-server
etc/neutron/plugins/bigswitch = etc/neutron/plugins/bigswitch/restproxy.ini
etc/neutron/plugins/bigswitch =
etc/neutron/plugins/bigswitch/restproxy.ini
etc/neutron/plugins/bigswitch/ssl/ca_certs/README
etc/neutron/plugins/bigswitch/ssl/host_certs/README
etc/neutron/plugins/brocade = etc/neutron/plugins/brocade/brocade.ini
etc/neutron/plugins/cisco = etc/neutron/plugins/cisco/cisco_plugins.ini
etc/neutron/plugins/hyperv = etc/neutron/plugins/hyperv/hyperv_neutron_plugin.ini