vmware-nsx/neutron/tests/unit/bigswitch/test_ssl.py
Kevin Benton e915ad3a15 BigSwitch: Add SSL Certificate Validation
This patch adds the option to use SSL certificate
validation on the backend controller using SSH-style
sticky authentication, individual trusted
certificates, and/or certificate authorities.
Also adds caching of connections to deal with
increased overhead of TLS/SSL handshake.

Default is now sticky-style enforcement.

Partial-Bug: 1188189
Implements: blueprint bsn-certificate-enforcement
Change-Id: If0bab196495c4944a53e0e394c956cca36269883
2014-03-05 15:08:39 -05:00

252 lines
10 KiB
Python

# Copyright 2014 Big Switch Networks, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Kevin Benton, kevin.benton@bigswitch.com
#
import os
import mock
from oslo.config import cfg
import webob.exc
from neutron.openstack.common import log as logging
from neutron.tests.unit.bigswitch import fake_server
from neutron.tests.unit.bigswitch import test_base
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_db_plugin as test_plugin
LOG = logging.getLogger(__name__)
SERVERMANAGER = 'neutron.plugins.bigswitch.servermanager'
HTTPS = SERVERMANAGER + '.HTTPSConnectionWithValidation'
CERTCOMBINER = SERVERMANAGER + '.ServerPool._combine_certs_to_file'
FILEPUT = SERVERMANAGER + '.ServerPool._file_put_contents'
GETCACERTS = SERVERMANAGER + '.ServerPool._get_ca_cert_paths'
GETHOSTCERT = SERVERMANAGER + '.ServerPool._get_host_cert_path'
FAKECERTGET = 'neutron.tests.unit.bigswitch.fake_server.get_cert_contents'
SSLGETCERT = 'ssl.get_server_certificate'
class test_ssl_certificate_base(test_plugin.NeutronDbPluginV2TestCase,
test_base.BigSwitchTestBase):
plugin_str = ('%s.NeutronRestProxyV2' %
test_base.RESTPROXY_PKG_PATH)
servername = None
cert_base = None
def _setUp(self):
self.servername = test_api_v2._uuid()
self.cert_base = cfg.CONF.RESTPROXY.ssl_cert_directory
self.host_cert_val = 'DUMMYCERTFORHOST%s' % self.servername
self.host_cert_path = os.path.join(
self.cert_base,
'host_certs',
'%s.pem' % self.servername
)
self.comb_cert_path = os.path.join(
self.cert_base,
'combined',
'%s.pem' % self.servername
)
self.ca_certs_path = os.path.join(
self.cert_base,
'ca_certs'
)
cfg.CONF.set_override('servers', ["%s:443" % self.servername],
'RESTPROXY')
self.setup_patches()
# Mock method SSL lib uses to grab cert from server
self.sslgetcert_m = mock.patch(SSLGETCERT, create=True).start()
self.sslgetcert_m.return_value = self.host_cert_val
# Mock methods that write and read certs from the file-system
self.fileput_m = mock.patch(FILEPUT, create=True).start()
self.certcomb_m = mock.patch(CERTCOMBINER, create=True).start()
self.getcacerts_m = mock.patch(GETCACERTS, create=True).start()
# this is used to configure what certificate contents the fake HTTPS
# lib should expect to receive
self.fake_certget_m = mock.patch(FAKECERTGET, create=True).start()
def setUp(self):
super(test_ssl_certificate_base, self).setUp(self.plugin_str)
class TestSslSticky(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', True, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSHostValidation)
self.httpsPatch.start()
self._setUp()
# Set fake HTTPS connection's expectation
self.fake_certget_m.return_value = self.host_cert_val
# No CA certs for this test
self.getcacerts_m.return_value = []
super(TestSslSticky, self).setUp()
def test_sticky_cert(self):
# SSL connection should be successful and cert should be cached
with self.network():
# CA certs should have been checked for
self.getcacerts_m.assert_has_calls([mock.call(self.ca_certs_path)])
# cert should have been fetched via SSL lib
self.sslgetcert_m.assert_has_calls(
[mock.call((self.servername, 443))]
)
# cert should have been recorded
self.fileput_m.assert_has_calls([mock.call(self.host_cert_path,
self.host_cert_val)])
# no ca certs, so host cert only for this combined cert
self.certcomb_m.assert_has_calls([mock.call([self.host_cert_path],
self.comb_cert_path)])
class TestSslHostCert(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', False, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSHostValidation)
self.httpsPatch.start()
self._setUp()
# Set fake HTTPS connection's expectation
self.fake_certget_m.return_value = self.host_cert_val
# No CA certs for this test
self.getcacerts_m.return_value = []
# Pretend host cert exists
self.hcertpath_p = mock.patch(GETHOSTCERT,
return_value=(self.host_cert_path, True),
create=True).start()
super(TestSslHostCert, self).setUp()
def test_host_cert(self):
# SSL connection should be successful because of pre-configured cert
with self.network():
self.hcertpath_p.assert_has_calls([
mock.call(os.path.join(self.cert_base, 'host_certs'),
self.servername)
])
# sticky is disabled, no fetching allowed
self.assertFalse(self.sslgetcert_m.call_count)
# no ca certs, so host cert is only for this combined cert
self.certcomb_m.assert_has_calls([mock.call([self.host_cert_path],
self.comb_cert_path)])
class TestSslCaCert(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', False, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSCAValidation)
self.httpsPatch.start()
self._setUp()
# pretend to have a few ca certs
self.getcacerts_m.return_value = ['ca1.pem', 'ca2.pem']
# Set fake HTTPS connection's expectation
self.fake_certget_m.return_value = 'DUMMYCERTIFICATEAUTHORITY'
super(TestSslCaCert, self).setUp()
def test_ca_cert(self):
# SSL connection should be successful because CA cert was present
# If not, attempting to create a network would raise an exception
with self.network():
# sticky is disabled, no fetching allowed
self.assertFalse(self.sslgetcert_m.call_count)
# 2 CAs and no host cert so combined should only contain both CAs
self.certcomb_m.assert_has_calls([mock.call(['ca1.pem', 'ca2.pem'],
self.comb_cert_path)])
class TestSslWrongHostCert(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', True, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSHostValidation)
self.httpsPatch.start()
self._setUp()
# Set fake HTTPS connection's expectation to something wrong
self.fake_certget_m.return_value = 'OTHERCERT'
# No CA certs for this test
self.getcacerts_m.return_value = []
# Pretend host cert exists
self.hcertpath_p = mock.patch(GETHOSTCERT,
return_value=(self.host_cert_path, True),
create=True).start()
super(TestSslWrongHostCert, self).setUp()
def test_error_no_cert(self):
# since there will already be a host cert, sticky should not take
# effect and there will be an error because the host cert's contents
# will be incorrect
tid = test_api_v2._uuid()
data = {}
data['network'] = {'tenant_id': tid, 'name': 'name',
'admin_state_up': True}
req = self.new_create_request('networks', data, 'json')
res = req.get_response(self.api)
self.assertEqual(res.status_int,
webob.exc.HTTPInternalServerError.code)
self.hcertpath_p.assert_has_calls([
mock.call(os.path.join(self.cert_base, 'host_certs'),
self.servername)
])
# sticky is enabled, but a host cert already exists so it shant fetch
self.assertFalse(self.sslgetcert_m.call_count)
# no ca certs, so host cert only for this combined cert
self.certcomb_m.assert_has_calls([mock.call([self.host_cert_path],
self.comb_cert_path)])
class TestSslNoValidation(test_ssl_certificate_base):
def setUp(self):
self.setup_config_files()
cfg.CONF.set_override('server_ssl', True, 'RESTPROXY')
cfg.CONF.set_override('ssl_sticky', False, 'RESTPROXY')
cfg.CONF.set_override('no_ssl_validation', True, 'RESTPROXY')
self.httpsPatch = mock.patch(HTTPS, create=True,
new=fake_server.HTTPSNoValidation)
self.httpsPatch.start()
self._setUp()
super(TestSslNoValidation, self).setUp()
def test_validation_disabled(self):
# SSL connection should be successful without any certificates
# If not, attempting to create a network will raise an exception
with self.network():
# no sticky grabbing and no cert combining with no enforcement
self.assertFalse(self.sslgetcert_m.call_count)
self.assertFalse(self.certcomb_m.call_count)