Merge "VPNaaS: Separate validation for Cisco impl"

This commit is contained in:
Jenkins 2014-07-25 20:24:15 +00:00 committed by Gerrit Code Review
commit 6a78fb926f
3 changed files with 214 additions and 151 deletions

View File

@ -12,32 +12,19 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import netaddr
from netaddr import core as net_exc
from neutron.common import exceptions
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.services.vpn.common import topics from neutron.services.vpn.common import topics
from neutron.services.vpn import service_drivers from neutron.services.vpn import service_drivers
from neutron.services.vpn.service_drivers import cisco_csr_db as csr_id_map from neutron.services.vpn.service_drivers import cisco_csr_db as csr_id_map
from neutron.services.vpn.service_drivers import cisco_validator
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
IPSEC = 'ipsec' IPSEC = 'ipsec'
BASE_IPSEC_VERSION = '1.0' BASE_IPSEC_VERSION = '1.0'
LIFETIME_LIMITS = {'IKE Policy': {'min': 60, 'max': 86400},
'IPSec Policy': {'min': 120, 'max': 2592000}}
MIN_CSR_MTU = 1500
MAX_CSR_MTU = 9192
class CsrValidationFailure(exceptions.BadRequest):
message = _("Cisco CSR does not support %(resource)s attribute %(key)s "
"with value '%(value)s'")
class CiscoCsrIPsecVpnDriverCallBack(n_rpc.RpcCallback): class CiscoCsrIPsecVpnDriverCallBack(n_rpc.RpcCallback):
@ -84,7 +71,9 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
"""Cisco CSR VPN Service Driver class for IPsec.""" """Cisco CSR VPN Service Driver class for IPsec."""
def __init__(self, service_plugin): def __init__(self, service_plugin):
super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin) super(CiscoCsrIPsecVPNDriver, self).__init__(
service_plugin,
cisco_validator.CiscoCsrVpnValidator(service_plugin))
self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)] self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)]
self.conn = n_rpc.create_connection(new=True) self.conn = n_rpc.create_connection(new=True)
self.conn.create_consumer( self.conn.create_consumer(
@ -97,79 +86,9 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
def service_type(self): def service_type(self):
return IPSEC return IPSEC
def validate_lifetime(self, for_policy, policy_info):
"""Ensure lifetime in secs and value is supported, based on policy."""
units = policy_info['lifetime']['units']
if units != 'seconds':
raise CsrValidationFailure(resource=for_policy,
key='lifetime:units',
value=units)
value = policy_info['lifetime']['value']
if (value < LIFETIME_LIMITS[for_policy]['min'] or
value > LIFETIME_LIMITS[for_policy]['max']):
raise CsrValidationFailure(resource=for_policy,
key='lifetime:value',
value=value)
def validate_ike_version(self, policy_info):
"""Ensure IKE policy is v1 for current REST API."""
version = policy_info['ike_version']
if version != 'v1':
raise CsrValidationFailure(resource='IKE Policy',
key='ike_version',
value=version)
def validate_mtu(self, conn_info):
"""Ensure the MTU value is supported."""
mtu = conn_info['mtu']
if mtu < MIN_CSR_MTU or mtu > MAX_CSR_MTU:
raise CsrValidationFailure(resource='IPSec Connection',
key='mtu',
value=mtu)
def validate_public_ip_present(self, vpn_service):
"""Ensure there is one gateway IP specified for the router used."""
gw_port = vpn_service.router.gw_port
if not gw_port or len(gw_port.fixed_ips) != 1:
raise CsrValidationFailure(resource='IPSec Connection',
key='router:gw_port:ip_address',
value='missing')
def validate_peer_id(self, ipsec_conn):
"""Ensure that an IP address is specified for peer ID."""
# TODO(pcm) Should we check peer_address too?
peer_id = ipsec_conn['peer_id']
try:
netaddr.IPAddress(peer_id)
except net_exc.AddrFormatError:
raise CsrValidationFailure(resource='IPSec Connection',
key='peer_id', value=peer_id)
def validate_ipsec_connection(self, context, ipsec_conn, vpn_service):
"""Validate attributes w.r.t. Cisco CSR capabilities."""
ike_policy = self.service_plugin.get_ikepolicy(
context, ipsec_conn['ikepolicy_id'])
ipsec_policy = self.service_plugin.get_ipsecpolicy(
context, ipsec_conn['ipsecpolicy_id'])
self.validate_lifetime('IKE Policy', ike_policy)
self.validate_lifetime('IPSec Policy', ipsec_policy)
self.validate_ike_version(ike_policy)
self.validate_mtu(ipsec_conn)
self.validate_public_ip_present(vpn_service)
self.validate_peer_id(ipsec_conn)
LOG.debug(_("IPSec connection %s validated for Cisco CSR"),
ipsec_conn['id'])
def create_ipsec_site_connection(self, context, ipsec_site_connection): def create_ipsec_site_connection(self, context, ipsec_site_connection):
vpnservice = self.service_plugin._get_vpnservice( vpnservice = self.service_plugin._get_vpnservice(
context, ipsec_site_connection['vpnservice_id']) context, ipsec_site_connection['vpnservice_id'])
try:
self.validate_ipsec_connection(context, ipsec_site_connection,
vpnservice)
except CsrValidationFailure:
with excutils.save_and_reraise_exception():
self.service_plugin.update_ipsec_site_conn_status(
context, ipsec_site_connection['id'], constants.ERROR)
csr_id_map.create_tunnel_mapping(context, ipsec_site_connection) csr_id_map.create_tunnel_mapping(context, ipsec_site_connection)
self.agent_rpc.vpnservice_updated(context, vpnservice['router_id'], self.agent_rpc.vpnservice_updated(context, vpnservice['router_id'],
reason='ipsec-conn-create') reason='ipsec-conn-create')

View File

@ -0,0 +1,116 @@
# Copyright 2014 Cisco Systems, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Paul Michali, Cisco Systems, Inc.
import netaddr
from netaddr import core as net_exc
from neutron.common import exceptions
from neutron.db.vpn import vpn_validator
from neutron.openstack.common import log as logging
LIFETIME_LIMITS = {'IKE Policy': {'min': 60, 'max': 86400},
'IPSec Policy': {'min': 120, 'max': 2592000}}
MIN_CSR_MTU = 1500
MAX_CSR_MTU = 9192
LOG = logging.getLogger(__name__)
class CsrValidationFailure(exceptions.BadRequest):
message = _("Cisco CSR does not support %(resource)s attribute %(key)s "
"with value '%(value)s'")
class CiscoCsrVpnValidator(vpn_validator.VpnReferenceValidator):
"""Validator methods for the Cisco CSR."""
def __init__(self, service_plugin):
self.service_plugin = service_plugin
super(CiscoCsrVpnValidator, self).__init__()
def validate_lifetime(self, for_policy, policy_info):
"""Ensure lifetime in secs and value is supported, based on policy."""
units = policy_info['lifetime']['units']
if units != 'seconds':
raise CsrValidationFailure(resource=for_policy,
key='lifetime:units',
value=units)
value = policy_info['lifetime']['value']
if (value < LIFETIME_LIMITS[for_policy]['min'] or
value > LIFETIME_LIMITS[for_policy]['max']):
raise CsrValidationFailure(resource=for_policy,
key='lifetime:value',
value=value)
def validate_ike_version(self, policy_info):
"""Ensure IKE policy is v1 for current REST API."""
version = policy_info['ike_version']
if version != 'v1':
raise CsrValidationFailure(resource='IKE Policy',
key='ike_version',
value=version)
def validate_mtu(self, conn_info):
"""Ensure the MTU value is supported."""
mtu = conn_info['mtu']
if mtu < MIN_CSR_MTU or mtu > MAX_CSR_MTU:
raise CsrValidationFailure(resource='IPSec Connection',
key='mtu',
value=mtu)
def validate_public_ip_present(self, vpn_service):
"""Ensure there is one gateway IP specified for the router used."""
gw_port = vpn_service.router.gw_port
if not gw_port or len(gw_port.fixed_ips) != 1:
raise CsrValidationFailure(resource='IPSec Connection',
key='router:gw_port:ip_address',
value='missing')
def validate_peer_id(self, ipsec_conn):
"""Ensure that an IP address is specified for peer ID."""
# TODO(pcm) Should we check peer_address too?
peer_id = ipsec_conn['peer_id']
try:
netaddr.IPAddress(peer_id)
except net_exc.AddrFormatError:
raise CsrValidationFailure(resource='IPSec Connection',
key='peer_id', value=peer_id)
def validate_ipsec_site_connection(self, context, ipsec_sitecon,
ip_version):
"""Validate IPSec site connection for Cisco CSR.
After doing reference validation, do additional checks that relate
to the Cisco CSR.
"""
super(CiscoCsrVpnValidator, self)._check_dpd(ipsec_sitecon)
ike_policy = self.service_plugin.get_ikepolicy(
context, ipsec_sitecon['ikepolicy_id'])
ipsec_policy = self.service_plugin.get_ipsecpolicy(
context, ipsec_sitecon['ipsecpolicy_id'])
vpn_service = self.service_plugin.get_vpnservice(
context, ipsec_sitecon['vpnservice_id'])
self.validate_lifetime('IKE Policy', ike_policy)
self.validate_lifetime('IPSec Policy', ipsec_policy)
self.validate_ike_version(ike_policy)
self.validate_mtu(ipsec_sitecon)
self.validate_public_ip_present(vpn_service)
self.validate_peer_id(ipsec_sitecon)
LOG.debug("IPSec connection %s validated for Cisco CSR",
ipsec_sitecon['id'])

View File

@ -13,20 +13,21 @@
# under the License. # under the License.
import mock import mock
# from oslo.config import cfg
from neutron import context as n_ctx from neutron import context as n_ctx
from neutron.db import api as dbapi from neutron.db import api as dbapi
from neutron.openstack.common import uuidutils from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants from neutron.plugins.common import constants
# from neutron.services.vpn import plugin as vpn_plugin
from neutron.services.vpn.service_drivers import cisco_csr_db as csr_db from neutron.services.vpn.service_drivers import cisco_csr_db as csr_db
from neutron.services.vpn.service_drivers import cisco_ipsec as ipsec_driver from neutron.services.vpn.service_drivers import cisco_ipsec as ipsec_driver
from neutron.services.vpn.service_drivers import cisco_validator as validator
from neutron.tests import base from neutron.tests import base
_uuid = uuidutils.generate_uuid _uuid = uuidutils.generate_uuid
FAKE_VPN_CONN_ID = _uuid() FAKE_VPN_CONN_ID = _uuid()
FAKE_VPN_CONNECTION = { FAKE_VPN_CONNECTION = {
'vpnservice_id': _uuid(), 'vpnservice_id': _uuid(),
'id': FAKE_VPN_CONN_ID, 'id': FAKE_VPN_CONN_ID,
@ -34,10 +35,39 @@ FAKE_VPN_CONNECTION = {
'ipsecpolicy_id': _uuid(), 'ipsecpolicy_id': _uuid(),
'tenant_id': _uuid() 'tenant_id': _uuid()
} }
FAKE_VPN_SERVICE = {
'router_id': _uuid() FAKE_SERVICE_ID = _uuid()
FAKE_VPN_CONNECTION = {
'vpnservice_id': FAKE_SERVICE_ID
} }
FAKE_ROUTER_ID = _uuid()
FAKE_VPN_SERVICE = {
'router_id': FAKE_ROUTER_ID
}
FAKE_HOST = 'fake_host' FAKE_HOST = 'fake_host'
IPV4 = 4
CISCO_IPSEC_SERVICE_DRIVER = ('neutron.services.vpn.service_drivers.'
'cisco_ipsec.CiscoCsrIPsecVPNDriver')
# class TestCiscoValidatorSelection(base.BaseTestCase):
#
# def setUp(self):
# super(TestCiscoValidatorSelection, self).setUp()
# vpnaas_provider = (constants.VPN + ':vpnaas:' +
# CISCO_IPSEC_SERVICE_DRIVER + ':default')
# cfg.CONF.set_override('service_provider',
# [vpnaas_provider],
# 'service_providers')
# mock.patch('neutron.common.rpc.create_connection').start()
# self.vpn_plugin = vpn_plugin.VPNDriverPlugin()
#
# def test_reference_driver_used(self):
# self.assertIsInstance(self.vpn_plugin._get_validator(),
# validator.CiscoCsrVpnValidator)
class TestCiscoIPsecDriverValidation(base.BaseTestCase): class TestCiscoIPsecDriverValidation(base.BaseTestCase):
@ -45,85 +75,93 @@ class TestCiscoIPsecDriverValidation(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestCiscoIPsecDriverValidation, self).setUp() super(TestCiscoIPsecDriverValidation, self).setUp()
mock.patch('neutron.common.rpc.create_connection').start() mock.patch('neutron.common.rpc.create_connection').start()
self.service_plugin = mock.Mock() self.l3_plugin = mock.Mock()
self.driver = ipsec_driver.CiscoCsrIPsecVPNDriver(self.service_plugin) mock.patch(
'neutron.manager.NeutronManager.get_service_plugins',
return_value={constants.L3_ROUTER_NAT: self.l3_plugin}).start()
self.core_plugin = mock.Mock()
mock.patch('neutron.manager.NeutronManager.get_plugin',
return_value=self.core_plugin).start()
self.context = n_ctx.Context('some_user', 'some_tenant') self.context = n_ctx.Context('some_user', 'some_tenant')
self.vpn_service = mock.Mock() self.vpn_service = mock.Mock()
self.service_plugin = mock.Mock()
self.validator = validator.CiscoCsrVpnValidator(self.service_plugin)
def test_ike_version_unsupported(self): def test_ike_version_unsupported(self):
"""Failure test that Cisco CSR REST API does not support IKE v2.""" """Failure test that Cisco CSR REST API does not support IKE v2."""
policy_info = {'ike_version': 'v2', policy_info = {'ike_version': 'v2',
'lifetime': {'units': 'seconds', 'value': 60}} 'lifetime': {'units': 'seconds', 'value': 60}}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_ike_version, policy_info) self.validator.validate_ike_version,
policy_info)
def test_ike_lifetime_not_in_seconds(self): def test_ike_lifetime_not_in_seconds(self):
"""Failure test of unsupported lifetime units for IKE policy.""" """Failure test of unsupported lifetime units for IKE policy."""
policy_info = {'lifetime': {'units': 'kilobytes', 'value': 1000}} policy_info = {'lifetime': {'units': 'kilobytes', 'value': 1000}}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_lifetime, self.validator.validate_lifetime,
"IKE Policy", policy_info) "IKE Policy", policy_info)
def test_ipsec_lifetime_not_in_seconds(self): def test_ipsec_lifetime_not_in_seconds(self):
"""Failure test of unsupported lifetime units for IPSec policy.""" """Failure test of unsupported lifetime units for IPSec policy."""
policy_info = {'lifetime': {'units': 'kilobytes', 'value': 1000}} policy_info = {'lifetime': {'units': 'kilobytes', 'value': 1000}}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_lifetime, self.validator.validate_lifetime,
"IPSec Policy", policy_info) "IPSec Policy", policy_info)
def test_ike_lifetime_seconds_values_at_limits(self): def test_ike_lifetime_seconds_values_at_limits(self):
"""Test valid lifetime values for IKE policy.""" """Test valid lifetime values for IKE policy."""
policy_info = {'lifetime': {'units': 'seconds', 'value': 60}} policy_info = {'lifetime': {'units': 'seconds', 'value': 60}}
self.driver.validate_lifetime('IKE Policy', policy_info) self.validator.validate_lifetime('IKE Policy', policy_info)
policy_info = {'lifetime': {'units': 'seconds', 'value': 86400}} policy_info = {'lifetime': {'units': 'seconds', 'value': 86400}}
self.driver.validate_lifetime('IKE Policy', policy_info) self.validator.validate_lifetime('IKE Policy', policy_info)
def test_ipsec_lifetime_seconds_values_at_limits(self): def test_ipsec_lifetime_seconds_values_at_limits(self):
"""Test valid lifetime values for IPSec policy.""" """Test valid lifetime values for IPSec policy."""
policy_info = {'lifetime': {'units': 'seconds', 'value': 120}} policy_info = {'lifetime': {'units': 'seconds', 'value': 120}}
self.driver.validate_lifetime('IPSec Policy', policy_info) self.validator.validate_lifetime('IPSec Policy', policy_info)
policy_info = {'lifetime': {'units': 'seconds', 'value': 2592000}} policy_info = {'lifetime': {'units': 'seconds', 'value': 2592000}}
self.driver.validate_lifetime('IPSec Policy', policy_info) self.validator.validate_lifetime('IPSec Policy', policy_info)
def test_ike_lifetime_values_invalid(self): def test_ike_lifetime_values_invalid(self):
"""Failure test of unsupported lifetime values for IKE policy.""" """Failure test of unsupported lifetime values for IKE policy."""
which = "IKE Policy" which = "IKE Policy"
policy_info = {'lifetime': {'units': 'seconds', 'value': 59}} policy_info = {'lifetime': {'units': 'seconds', 'value': 59}}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_lifetime, self.validator.validate_lifetime,
which, policy_info) which, policy_info)
policy_info = {'lifetime': {'units': 'seconds', 'value': 86401}} policy_info = {'lifetime': {'units': 'seconds', 'value': 86401}}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_lifetime, self.validator.validate_lifetime,
which, policy_info) which, policy_info)
def test_ipsec_lifetime_values_invalid(self): def test_ipsec_lifetime_values_invalid(self):
"""Failure test of unsupported lifetime values for IPSec policy.""" """Failure test of unsupported lifetime values for IPSec policy."""
which = "IPSec Policy" which = "IPSec Policy"
policy_info = {'lifetime': {'units': 'seconds', 'value': 119}} policy_info = {'lifetime': {'units': 'seconds', 'value': 119}}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_lifetime, self.validator.validate_lifetime,
which, policy_info) which, policy_info)
policy_info = {'lifetime': {'units': 'seconds', 'value': 2592001}} policy_info = {'lifetime': {'units': 'seconds', 'value': 2592001}}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_lifetime, self.validator.validate_lifetime,
which, policy_info) which, policy_info)
def test_ipsec_connection_with_mtu_at_limits(self): def test_ipsec_connection_with_mtu_at_limits(self):
"""Test IPSec site-to-site connection with MTU at limits.""" """Test IPSec site-to-site connection with MTU at limits."""
conn_info = {'mtu': 1500} conn_info = {'mtu': 1500}
self.driver.validate_mtu(conn_info) self.validator.validate_mtu(conn_info)
conn_info = {'mtu': 9192} conn_info = {'mtu': 9192}
self.driver.validate_mtu(conn_info) self.validator.validate_mtu(conn_info)
def test_ipsec_connection_with_invalid_mtu(self): def test_ipsec_connection_with_invalid_mtu(self):
"""Failure test of IPSec site connection with unsupported MTUs.""" """Failure test of IPSec site connection with unsupported MTUs."""
conn_info = {'mtu': 1499} conn_info = {'mtu': 1499}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_mtu, conn_info) self.validator.validate_mtu, conn_info)
conn_info = {'mtu': 9193} conn_info = {'mtu': 9193}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_mtu, conn_info) self.validator.validate_mtu, conn_info)
def simulate_gw_ip_available(self): def simulate_gw_ip_available(self):
"""Helper function indicating that tunnel has a gateway IP.""" """Helper function indicating that tunnel has a gateway IP."""
@ -137,43 +175,53 @@ class TestCiscoIPsecDriverValidation(base.BaseTestCase):
def test_have_public_ip_for_router(self): def test_have_public_ip_for_router(self):
"""Ensure that router for IPSec connection has gateway IP.""" """Ensure that router for IPSec connection has gateway IP."""
self.simulate_gw_ip_available() self.simulate_gw_ip_available()
self.driver.validate_public_ip_present(self.vpn_service) self.validator.validate_public_ip_present(self.vpn_service)
def test_router_with_missing_gateway_ip(self): def test_router_with_missing_gateway_ip(self):
"""Failure test of IPSec connection with missing gateway IP.""" """Failure test of IPSec connection with missing gateway IP."""
self.simulate_gw_ip_available() self.simulate_gw_ip_available()
self.vpn_service.router.gw_port = None self.vpn_service.router.gw_port = None
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_public_ip_present, self.validator.validate_public_ip_present,
self.vpn_service) self.vpn_service)
def test_peer_id_is_an_ip_address(self): def test_peer_id_is_an_ip_address(self):
"""Ensure peer ID is an IP address for IPsec connection create.""" """Ensure peer ID is an IP address for IPsec connection create."""
ipsec_conn = {'peer_id': '10.10.10.10'} ipsec_sitecon = {'peer_id': '10.10.10.10'}
self.driver.validate_peer_id(ipsec_conn) self.validator.validate_peer_id(ipsec_sitecon)
def test_peer_id_is_not_ip_address(self): def test_peer_id_is_not_ip_address(self):
"""Failure test of peer_id that is not an IP address.""" """Failure test of peer_id that is not an IP address."""
ipsec_conn = {'peer_id': 'some-site.com'} ipsec_sitecon = {'peer_id': 'some-site.com'}
self.assertRaises(ipsec_driver.CsrValidationFailure, self.assertRaises(validator.CsrValidationFailure,
self.driver.validate_peer_id, ipsec_conn) self.validator.validate_peer_id, ipsec_sitecon)
def test_validation_for_create_ipsec_connection(self): def test_validation_for_create_ipsec_connection(self):
"""Ensure all validation passes for IPSec site connection create.""" """Ensure all validation passes for IPSec site connection create."""
self.simulate_gw_ip_available() self.simulate_gw_ip_available()
# Provide the minimum needed items to validate
ipsec_conn = {'id': '1',
'ikepolicy_id': '123',
'ipsecpolicy_id': '2',
'mtu': 1500,
'peer_id': '10.10.10.10'}
self.service_plugin.get_ikepolicy = mock.Mock( self.service_plugin.get_ikepolicy = mock.Mock(
return_value={'ike_version': 'v1', return_value={'ike_version': 'v1',
'lifetime': {'units': 'seconds', 'value': 60}}) 'lifetime': {'units': 'seconds', 'value': 60}})
self.service_plugin.get_ipsecpolicy = mock.Mock( self.service_plugin.get_ipsecpolicy = mock.Mock(
return_value={'lifetime': {'units': 'seconds', 'value': 120}}) return_value={'lifetime': {'units': 'seconds', 'value': 120}})
self.driver.validate_ipsec_connection(self.context, ipsec_conn, self.service_plugin.get_vpnservice = mock.Mock(
self.vpn_service) return_value=self.vpn_service)
# Provide the minimum needed items to validate
ipsec_sitecon = {'id': '1',
'vpnservice_id': FAKE_SERVICE_ID,
'ikepolicy_id': '123',
'ipsecpolicy_id': '2',
'mtu': 1500,
'peer_id': '10.10.10.10'}
# Using defaults for DPD info
expected = {'dpd_action': 'hold',
'dpd_interval': 30,
'dpd_timeout': 120}
expected.update(ipsec_sitecon)
self.validator.assign_sensible_ipsec_sitecon_defaults(ipsec_sitecon)
self.validator.validate_ipsec_site_connection(self.context,
ipsec_sitecon, IPV4)
self.assertEqual(expected, ipsec_sitecon)
class TestCiscoIPsecDriverMapping(base.BaseTestCase): class TestCiscoIPsecDriverMapping(base.BaseTestCase):
@ -305,7 +353,6 @@ class TestCiscoIPsecDriver(base.BaseTestCase):
} }
self.db_update_mock = service_plugin.update_ipsec_site_conn_status self.db_update_mock = service_plugin.update_ipsec_site_conn_status
self.driver = ipsec_driver.CiscoCsrIPsecVPNDriver(service_plugin) self.driver = ipsec_driver.CiscoCsrIPsecVPNDriver(service_plugin)
self.driver.validate_ipsec_connection = mock.Mock()
mock.patch.object(csr_db, 'create_tunnel_mapping').start() mock.patch.object(csr_db, 'create_tunnel_mapping').start()
self.context = n_ctx.Context('some_user', 'some_tenant') self.context = n_ctx.Context('some_user', 'some_tenant')
@ -325,25 +372,6 @@ class TestCiscoIPsecDriver(base.BaseTestCase):
[FAKE_VPN_CONNECTION], [FAKE_VPN_CONNECTION],
{'reason': 'ipsec-conn-create'}) {'reason': 'ipsec-conn-create'})
def test_failure_validation_ipsec_connection(self):
"""Failure test of validation during IPSec site connection create.
Simulate a validation failure, and ensure that database is
updated to indicate connection is in error state.
TODO(pcm): FUTURE - remove test case, once vendor plugin
validation is done before database commit.
"""
self.driver.validate_ipsec_connection.side_effect = (
ipsec_driver.CsrValidationFailure(resource='IPSec Connection',
key='mtu', value=1000))
self.assertRaises(ipsec_driver.CsrValidationFailure,
self.driver.create_ipsec_site_connection,
self.context, FAKE_VPN_CONNECTION)
self.db_update_mock.assert_called_with(self.context,
FAKE_VPN_CONN_ID,
constants.ERROR)
def test_update_ipsec_site_connection(self): def test_update_ipsec_site_connection(self):
self._test_update(self.driver.update_ipsec_site_connection, self._test_update(self.driver.update_ipsec_site_connection,
[FAKE_VPN_CONNECTION, FAKE_VPN_CONNECTION], [FAKE_VPN_CONNECTION, FAKE_VPN_CONNECTION],